flink自动加全局流水

news/2024/11/30 10:28:38/

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction

import re
import redis


# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/elink_20230603')
# 对每行数据添加字符串 'aaaa'
class MyMapFunction(MapFunction):
   def open(self, runtime_context: RuntimeContext):
     self.r = redis.Redis(host='127.0.0.1', port=6379)

   def map(self,line):
    process_id='';
    bus_seq=''
    if not line.startswith("ES"):
        return
    if '<Serial>' in line:
       pat=re.compile(r"<Serial>(\d+)</Serial>")
       bus_seq=pat.findall(line)
       process_id=line.split()[1]
       self.r.set(process_id,bus_seq[0])
    process_id=line.split()[1]
    if not len(process_id)==6 :
        process_id=line.split()[2]
     
    bus_seq=self.r.get(process_id) 
    if not bus_seq:
        return
    #self.r.delete(process_id)
    return(bus_seq.decode('UTF-8')+'->'+line)
new_stream = data_stream.map(MyMapFunction()).set_parallelism(1)

# 输出到控制台
new_stream.print()

# 执行任务
env.execute('Add "aaaa" to each line')


http://www.ppmy.cn/news/202433.html

相关文章

做自媒体怎样赚钱?

不想露脸&#xff1f;不想自己拍摄视频&#xff1f;能做自媒体赚钱吗&#xff1f; 可以&#xff01; 今天这期内容来给粉丝们分享一波&#xff0c;抓紧点赞收藏&#xff01; 1、素材 如果我们不自己拍摄视频素材&#xff0c;就需要我们从网络中找一些现成的剪辑素材 手机&…

Java反编译工具Jad的下载与使用示例

场景 Java反编译工具-JD-GUI下载以及使用&#xff1a; Java反编译工具-JD-GUI下载以及使用_霸道流氓气质的博客-CSDN博客 上面讲过Java反编译工具JD-GUI的使用&#xff0c;如果使用jad并通过命令如何实现反编译。 为了验证Java开发手册中为什么不推荐使用进行字符串拼接&…

求助帖——关于更新驱动失败导致的案例

前言&#xff1a;2022年5月16日&#xff0c;我的一位朋友求助我。她的电脑出现问题&#xff0c;大概意思是&#xff1a;她的电脑出现驱动更新提示&#xff0c;她就使用“驱动人生”去尝试更新&#xff0c;但是问题就是出现在这里&#xff0c;更新程序在过程中崩溃&#xff0c;导…

联想笔记本电脑V110拆机、清灰详细步骤

联想笔记本电脑V110拆机、清灰教程 1、首先准备拆机工具&#xff0c;我用的是电脑拆机工具盒&#xff0c;里面有各种型号的螺丝刀&#xff0c;还有刷子、撬棍、垫片等。&#xff08;一定要找合适的螺丝刀&#xff0c;不然螺丝很容易拧滑丝&#xff09;这个型号的电脑还是比较好…

《React后台管理系统实战:五》产品管理(二):产品添加页面及验证等、富文本编辑器、提交商品

一、产品添加基础部分 1 home.jsx点添加按钮动作跳转到添加商品页 点击&#xff1a;onClick{() > this.props.history.push(/product/add-update)}> //card右侧内容const extra(<Button typeprimary onClick{() > this.props.history.push(/product/add-update)…

《React后台管理系统实战:五》产品管理(三)商品列表页(产品搜索及分页)、商品详情组件、商品上下架

《React后台管理系统实战&#xff1a;五》产品管理&#xff08;三&#xff09;商品列表页 一、静态产品列表页 第1步&#xff0c;product/home.jsx import React,{Component} from react import {Card,Select,Input,Table,Icon,Button,message } from antd import LinkButto…

《React后台管理系统实战:五》产品管理(一)

一、概述 1.1目录结构及功能 src/pages/admin/product/add-update.jsx //添加及更新产品detail.jsx //产品详情home.jsx //产品默认页index.jsx //产品路由页index.less //产品样式二、路由搭建 2.1 index.jsx 为防止不能匹配到product/xxx&#xff0c;加上exact 如果以上都…

电脑安装系统出错蓝屏报错为 STOP 0xc0000020 ,什么原因?

安装系统盘后一段时间&#xff0c;还未拷贝数据就开始出现蓝屏&#xff0c;报错信息 0xc0000020 &#xff0c;请大家帮忙看什么硬件出了问题&#xff1f; 问题补充&#xff1a;***STOP: 0x0000006F (0xc0000020,0x00000000,0x00000000,0x00000000) SESSION3_INITIALIZATION_FA…