from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction, RuntimeContext, KeyedProcessFunction
import re
import redis
# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/elink_20230603')
# 对每行数据添加字符串 'aaaa'
class MyMapFunction(MapFunction):
def open(self, runtime_context: RuntimeContext):
self.r = redis.Redis(host='127.0.0.1', port=6379)
def map(self,line):
process_id='';
bus_seq=''
if not line.startswith("ES"):
return
if '<Serial>' in line:
pat=re.compile(r"<Serial>(\d+)</Serial>")
bus_seq=pat.findall(line)
process_id=line.split()[1]
self.r.set(process_id,bus_seq[0])
process_id=line.split()[1]
if not len(process_id)==6 :
process_id=line.split()[2]
bus_seq=self.r.get(process_id)
if not bus_seq:
return
#self.r.delete(process_id)
return(bus_seq.decode('UTF-8')+'->'+line)
new_stream = data_stream.map(MyMapFunction()).set_parallelism(1)
# 输出到控制台
new_stream.print()
# 执行任务
env.execute('Add "aaaa" to each line')