# Handles a message that contains the "开始分级日志" marker text: builds a
# LogEvent for it and yields the event's to_dict() result.
# NOTE(review): this excerpt has lost its indentation and the enclosing
# generator function is not visible here -- confirm the real nesting of every
# statement below against the original file before changing anything.
if '开始分级日志' in message:
# Disabled (commented-out) clearing of two attributes on self.
#self.process_id_to_bus_seq.clear()
#self.gapslist.clear()
# Add the record to the cache (currently disabled).
#self.gapslist.add(message)
# The index name is suffixed with the current local date as YYYYMMDD.
# str() is redundant here because strftime() already returns a str.
date_str = datetime.now().strftime("%Y%m%d")
index_name = 'flink-log-clpf-gaps-' + str(date_str)
# Fixed id '0' for this event. Note: this local name shadows the builtin id().
id='0'
log_event = LogEvent(id, source, fileTag, fileName, serviceCode, appName, timestamp, offset, message, index_name)
# Execution pauses at the yield and only continues when the consumer asks the
# generator for its next item, so the print below runs at that later point.
yield log_event.to_dict()
# Debug marker, printed once the generator is resumed after the yield.
print('aaaaaaaaaaaaaaaaaaaaaa')
# Ends the generator; no further items are produced on this path.
return
# Debug marker. NOTE(review): presumably this line sits outside the `if` and
# runs only for non-matching messages; if it is actually inside the `if`, it
# is unreachable because of the `return` above -- confirm.
print('bbbbbbbbbbbbbbbbbbbbbb')
[root@kafka1 pyflink]# python test.py
aaaaaaaaaaaaaaaaaaaaaa
{'id': '0', 'source': '10.4.146.9', 'fileTag': 'gaps', 'fileName': '/opt/test/f3s_20230633_104.log', 'serviceCode': '00000', 'appName': 'clpf', 'timestamp': '1687419610160512025', 'offset': '0', 'message': '2023-06-20 20:04:03.472
--------------开始分级日志 PID:11493 PROC[MPP_SDS_F3S] DATE:20230620 TIME:200403---------------', 'index_name': 'flink-log-clpf-gaps-20230622'}