网络安全-华为华三交换机防火墙日志解析示例

ops/2024/11/14 6:38:44/

DEF_SYSLOG_SWITCH_HUAWEI.py

华为交换机日志解析示例

# -*- coding: utf8 -*-
import time
from DEF_COLOR import *   ## 终端显示颜色def 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):#时间文本格式 = '%Y-%m-%d %H:%M:%S'时间类 = time.localtime(时间戳)时间文本 = time.strftime(时间文本格式, 时间类)return(时间文本)## 适用于需要在SYSLOG的时间上加上时区才能和本地时间相等的情况
def 日志时间转时间戳(时间文本, 加时区):时间戳 = time.mktime(time.strptime(时间文本, '%Y-%m-%dT%H:%M:%S'))+(3600*加时区)return(时间戳)## 2022-11-28T19:00:51+08:00 主机名 %%01SHELL/5/LOGOUT(s)[3918]: The user succeeded in logging out of VTY0. (UserType=SSH, UserName=xxx, Ip=xxx.xxx.xxx.xxx, VpnName=)
def LOG_TYPE(LINE_TEXT):A = LINE_TEXT.find('%%')if A != -1:#打印_黄(LINE_TEXT[A:])B = LINE_TEXT.find('(', A)if B != -1:#打印_绿(LINE_TEXT[B:])C = LINE_TEXT.find(' ', B)  ## 找日志正文开始位置标志if C != -1:#打印_蓝(LINE_TEXT[C:])#打印_青(LINE_TEXT[A+4:B])SP = LINE_TEXT[A+4:B].split('/')return((SP[0], SP[2], C+1))else:打印_红(f"找位置标志' '失败(日志正文开始位) {LINE_TEXT}")return((' -1', LINE_TEXT[A:B], -1))else:打印_红(f"找位置标志'('失败 {LINE_TEXT}")return((LINE_TEXT[:A], '(-1', -1))else:A2 = LINE_TEXT.find(': OID')if A2 != -1:SP = LINE_TEXT[:A2].split()[-1].split('/')return((SP[0], SP[2], A2+1))else:打印_红(f"找位置标志'%%'失败且查找': OID'也失败 {LINE_TEXT}")return(('%-1', LINE_TEXT, -1))# %%01ACLE/4/ACLLOG(l)[xxx]: Acl 3997 deny GigabitEthernet0/0/10 xxxx-xxxx-xxxx -> xxxx-xxxx-xxxx udp x.x.x.x(63877) -> 239.255.255.250(1900) (1 packet).
# %%01ACLE/4/ACLLOG(l)[xxx]: Acl 3997 deny GigabitEthernet0/0/10 xxxx-xxxx-xxxx -> xxxx-xxxx-xxxx igmp x.x.x.x -> 224.0.0.22 (4 packets).
# %%01ACLE/4/ACLLOG(l)[xxx]: Acl      deny GigabitEthernet0/0/14 xxxx-xxxx-xxxx -> xxxx-xxxx-xxxx tcp x.x.x.x(60559) -> x.x.x.x(80) (1 packet).
def ACLE_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LOG_MSG):SP = LOG_MSG.split()if len(SP) == 11:SP = ['X'] + SP     # 偶尔会丢个 acl id 补齐列表长度 'GigabitEthernet0/0/14 Acl'ACL_ID = SP[1]ACL_ST = SP[2]ACL_IF = SP[3]SMAC = SP[4]DMAC = SP[6]PT  = SP[7]SIP = SP[8].split('(')[0]   # 不记录源端口号DIP = SP[10].split('(')[0]  # 不记录目的端口号COUNT = int(SP[11][1:])K = (ACL_IF, ACL_ST, PT, f"{SIP}({SMAC})", f"{DIP}({DMAC})", ACL_ID)if K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE']:D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K] += COUNTelse:D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K] = COUNT# SACL/4/ACLLOG(l)[9487]: Acl 3996 applied Interface GigabitEthernet0/0/6 permit (15591 packets).
# SACL/4/ACLLOG(l)[6146]: Acl 3992 applied Interface  permit (2 packets).
# SACL/4/ACLLOG(l)[6112]: Acl 3992 applied Interface GigabitEthernet0/0/14 permit (174757578 packets).
def SACL_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LOG_MSG):try:SP = LOG_MSG.split()if len(SP) == 7:ACL_ID = SP[1]ACL_IF = SP[4]ACL_ST = SP[5]COUNT = int(SP[6][1:])elif len(SP) == 6:ACL_ID = SP[1]ACL_IF = 'un'ACL_ST = SP[4]COUNT = int(SP[5][1:])else:打印_红(f"ERROR LOG_MSG={LOG_MSG} SP={SP} len(SP)={len(SP)}")K = (ACL_IF, ACL_ST, ACL_ID)if K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL']:D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K] += COUNTelse:D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K] = COUNTexcept Exception as e:打印_红(f"ERROR {e} LOG_MSG={LOG_MSG} SP={SP} len(SP)={len(SP)}")else:pass## 解析SYSLOG日志一行内容
def LINE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT, TIME_LOCAL):TP1, TP2, X = LOG_TYPE(LINE_TEXT)if TP1 == 'ACLE':if TP2 == 'ACLLOG':#打印_黄(LINE_TEXT[X:-1])ACLE_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT[X:-10])else:return((1, TP1, TP2))   ## 终止,不做解析,返回标识代码及日志类型信息elif TP1 == 'SACL':if TP2 == 'ACLLOG':SACL_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT[X:-10])else:return((1, TP1, TP2))elif TP1 == 'SHELL':if TP2 == 'LOGIN':                                 # The user succeeded in logging in to VTY0. (UserType=SSH, UserName=xxx, AuthenticationMethod="Local-user", Ip=xxx.xxx.xxx.xxx, VpnName=)SP = LINE_TEXT[X:-1].split(',')UserType = SP[0].split('=')[-1]UserName = SP[1].split('=')[-1]IP = SP[3].split('=')[-1]D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'].append((TIME_LOCAL, 'LOGIN', UserType, UserName, IP))elif TP2 == 'LOGOUT':                              # The user succeeded in logging out of VTY0. (UserType=SSH, UserName=xxx, Ip=xxx.xxx.xxx.xxx, VpnName=)#print(TP1,TP2,LINE_TEXT[X:-1])SP = LINE_TEXT[X:-1].split(',')UserType = SP[0].split('=')[-1]UserName = SP[1].split('=')[-1]IP = SP[2].split('=')[-1]D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'].append((TIME_LOCAL, 'LOGOUT', UserType, UserName, IP))elif TP2 in ('DISPLAY_CMDRECORD', 'CMDRECORD'):    # %%01SHELL/6/DISPLAY_CMDRECORD(s)[1869]: Recorded display command information. (Task=VT0, Ip=x.x.x.x, VpnName=, User=xx, AuthenticationMethod="Local-user", Command="display stp brief")SP = LINE_TEXT[X:].split(',')IP = SP[1].split('=')[-1]USER = SP[3].split('=')[-1]CMD = SP[5].split('=')[-1][:-2]#打印_青(f"(SHELL, {TP2}) {TIME_LOCAL} {IP} {USER} {CMD}")D_SYSLOG_SWITCH_HUAWEI['L_CMD'].append((TIME_LOCAL, IP, USER, CMD))elif TP2 == 'CMDCONFIRM_UNIFORMRECORD':            # %%01SHELL/6/CMDCONFIRM_UNIFORMRECORD(s)[1867]: Record command information. (Task=VT0, IP=x.x.x.x, VpnName=, User=xx, Command="", PromptInfo="The password needs to be changed. Change now? [Y/N]:", UserInput=N)SP = LINE_TEXT[X:].split(',')IP = SP[1].split('=')[-1]USER = SP[3].split('=')[-1]PromptInfo = SP[5].split('=')[-1]UserInput = SP[6].split('=')[-1][:-2]CMD = f"{PromptInfo} {UserInput}"#打印_青(f"(SHELL, {TP2}) {TIME_LOCAL} {IP} {USER} {CMD}")D_SYSLOG_SWITCH_HUAWEI['L_CMD'].append((TIME_LOCAL, IP, USER, CMD))else:return((1, TP1, TP2))elif TP1 == 'IFPDT':if TP2 == 'PKT_OUTDISCARD_ABNL':    ## 端口【出】方向丢包达到报警阈值SP = LINE_TEXT[X:].split(',')#for i in SP:#    print(i)SW_IF = SP[0].split('=')[-1]SW_DROP = SP[1].split('=')[-1]#print(SW_IF, SW_DROP, 'OUT')D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'].append((TIME_LOCAL, SW_IF, SW_DROP, 'OUT', '丢包超过阈值'))elif TP2 == 'PKT_OUTDISCARD_NL':    ## 端口【出】方向恢复正常SP = LINE_TEXT[X:].split(',')SW_IF = SP[0].split('=')[-1]SW_DROP = SP[1].split('=')[-1]#print(SW_IF, SW_DROP, 'OUT')D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'].append((TIME_LOCAL, SW_IF, SW_DROP, 'OUT', '恢复'))elif TP2 == 'PKT_INDISCARD_ABNL':   ## 端口【入】方向丢包达到报警阈值SP = LINE_TEXT[X:].split(',')#for i in SP:#    print(i)SW_IF = SP[0].split('=')[-1]SW_DROP = SP[1].split('=')[-1]#print(SW_IF, SW_DROP, 'IN')D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'].append((TIME_LOCAL, SW_IF, SW_DROP, 'IN', '丢包超过阈值'))elif TP2 == 'PKT_INDISCARD_NL':     ## 端口【入】方向恢复正常SP = LINE_TEXT[X:].split(',')SW_IF = SP[0].split('=')[-1]SW_DROP = SP[1].split('=')[-1]#print(SW_IF, SW_DROP, 'IN')D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'].append((TIME_LOCAL, SW_IF, SW_DROP, 'IN', '恢复'))elif TP2 == 'IF_STATE':SP = LINE_TEXT[X:].split()IF_ID = SP[1]IF_ST = SP[5]if IF_ID not in D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY']:D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][IF_ID] = []D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][IF_ID].append((TIME_LOCAL, IF_ST))else:return((1, TP1, TP2))elif TP1 == 'SECE':D_SYSLOG_SWITCH_HUAWEI['L_SECE'].append(LINE_TEXT)elif TP1 == 'MSTP':D_SYSLOG_SWITCH_HUAWEI['L_STP'].append((TIME_LOCAL, LINE_TEXT[X:-1]))elif TP1 == 'IFNET':print('IFNET', TP2, TIME_LOCAL, LINE_TEXT[X:-1])elif TP1 == 'IFADP':print('IFADP', TP2, TIME_LOCAL, LINE_TEXT[X:-1])elif TP1 == 'SSH':if TP2 == 'SSH_TRANS_FILE_FINISH':D_SYSLOG_SWITCH_HUAWEI['L_FTP'].append((TIME_LOCAL, LINE_TEXT[X:-1]))else:return((1, TP1, TP2))elif TP1 == 'VTY':print('VTY', LINE_TEXT)else:return((1, TP1, TP2))return((0, TP1, TP2))## 解析SYSLOG日志文件
def FILE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX):#print("RUN", FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)TIME_S = time.time()TOT_N = 0SELECT_N = 0解析数量 = 0for LINE_BYTES in open(FILE_PATH, mode='br'):TOT_N += 1try:LINE_TEXT = LINE_BYTES.decode('UTF-8')except Exception as e:打印_红(f"ERROR {TOT_N} LINE_BYTES={LINE_BYTES} {e}")else:TIME_UTC = LINE_TEXT[:19]#TIME_STAMP = time.mktime(time.strptime(TIME_UTC, '%Y-%m-%dT%H:%M:%S'))+28800TIME_STAMP = 日志时间转时间戳(TIME_UTC, 8)TIME_LOCAL = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(TIME_STAMP))if TIME_STAMP_MIN < TIME_STAMP < TIME_STAMP_MAX:SELECT_N += 1#print(TIME_UTC, TIME_LOCAL, 'RUN')ST,TP1,TP2 = LINE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT, TIME_LOCAL)if ST != 0:#打印_红(f"{TOT_N} 未知LOG")#breakif (TP1,TP2) in D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER']:D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'][(TP1,TP2)] += 1else:D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'][(TP1,TP2)] = 1else:解析数量 += 1else:#print(TIME_UTC, TIME_LOCAL, 'PASS')passTIME_RUN = time.time() - TIME_S#打印_绿(f"{FILE_PATH} 完成 处理日志数量 {SELECT_N}/{TOT_N} 筛选数/总日志数 用时={TIME_RUN:.2f}秒")return((FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN))def SYSLOG_SWITCH_HUAWEI(FILE_PATH, SHOW=0):D_SYSLOG_SWITCH_HUAWEI = {}D_SYSLOG_SWITCH_HUAWEI['D_ACL'] = {'ACLE':{}, 'SACL':{}} # ACL规则匹配记录D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'] = {}  # 接口物理断开记录D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK'] = {} # 接口链路断开记录D_SYSLOG_SWITCH_HUAWEI['L_CMD'] = []     # 用户执行命令记录D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'] = []   # 登录信息D_SYSLOG_SWITCH_HUAWEI['L_STP'] = []     # 生成树信息D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'] = [] # 接口丢包D_SYSLOG_SWITCH_HUAWEI['L_SECE'] = []    # 安全事件D_SYSLOG_SWITCH_HUAWEI['L_FTP'] = []D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'] = {} # 未解析日志记录TIME_STAMP_MIN = 0TIME_STAMP_MAX = time.time()    # 默认为当前时间戳R = FILE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)if SHOW == 1:打印_黄("端口丢包 (D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'])")for TIME, SW_IF, SW_DROP, IN_OUT, PS in D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP']:if PS == '恢复':打印_绿(f"    {TIME} {SW_IF} {SW_DROP} {IN_OUT:3s} {PS}")else:打印_红(f"    {TIME} {SW_IF} {SW_DROP} {IN_OUT:3s} {PS}")打印_黄("设备安全 (D_SYSLOG_SWITCH_HUAWEI['L_SECE'])")for i in D_SYSLOG_SWITCH_HUAWEI['L_SECE']:打印_红(f"    {i}")打印_黄("备份配置 (D_SYSLOG_SWITCH_HUAWEI['L_FTP'])")for i in D_SYSLOG_SWITCH_HUAWEI['L_FTP']:打印_青(f"    {i}")打印_黄("生成树   (D_SYSLOG_SWITCH_HUAWEI['L_STP'])")for i in D_SYSLOG_SWITCH_HUAWEI['L_STP']:打印_红(f"    {i}")打印_黄("访问规则 (D_SYSLOG_SWITCH_HUAWEI['D_ACL'])")for K1 in D_SYSLOG_SWITCH_HUAWEI['D_ACL']:if K1 == 'ACLE':L_K2 = [i for i in D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1]]L_K2.sort()for K2 in L_K2:ACL_IF, ACL_ST, PT, SIPSMAC, DIPDMAC, ACL_ID = K2if ACL_ST == 'deny':打印_红(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ID:4s} {ACL_ST:5s} {PT:4s} {SIPSMAC:31s} -> {DIPDMAC}")else:打印_绿(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ID:4s} {ACL_ST:5s} {PT:4s} {SIPSMAC:31s} -> {DIPDMAC}")elif K1 == 'SACL':L_K2 = [i for i in D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1]]L_K2.sort()for K2 in L_K2:ACL_IF, ACL_ST, ACL_ID = K2if ACL_ST == 'deny':打印_红(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ST:5s}")else:打印_绿(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ST:5s}")打印_黄("接口物理状态 (D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'])")for K in D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY']:print(f"    {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][K])}")for TIME_LOCAL, IF_ST in D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][K]:if IF_ST == 'DOWN':打印_红(f"        {TIME_LOCAL} DOWN")else:打印_绿(f"        {TIME_LOCAL} {IF_ST}")#打印_黄("接口逻辑状态 (D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK'])")#for K in D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK']:#    print(f"    {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK'][K])}")打印_黄("登录登出详细 (D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'])")for TIME_LOCAL, ST, UserType, UserName, IP in D_SYSLOG_SWITCH_HUAWEI['L_LOGIN']:if ST == 'LOGIN':打印_青(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {UserType} {UserName}")elif ST == 'LOGOUT':打印_蓝(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {UserType} {UserName}")else:打印_红(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {UserType} {UserName}")打印_黄("操作命令 (D_SYSLOG_SWITCH_HUAWEI['L_CMD'])")for TIME_LOCAL, IP, USER, CMD in D_SYSLOG_SWITCH_HUAWEI['L_CMD']:打印_青(f"    {TIME_LOCAL} {IP:15s} {USER:8s} {CMD}")打印_黄("忽略解析日志 (D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'])")L_K = [i for i in D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER']]L_K.sort()for K in L_K:print(f"{D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'][K]:5} {K}")FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN = R时间文本格式 = '%Y-%m-%d %H:%M:%S'打印_红(f"日志路径 : {FILE_PATH}")打印_青(f"开始/结束: {时间戳_2_时间文本(TIME_STAMP_MIN, 时间文本格式)} / {时间戳_2_时间文本(TIME_STAMP_MAX, 时间文本格式)}")打印_绿(f"解析/筛选/总数: {解析数量}/{SELECT_N}/{TOT_N} {(解析数量/TOT_N)*100:.2f}(%)/{(SELECT_N/TOT_N)*100:.2f}(%)/100(%)  用时: {TIME_RUN:.2f}秒")## 返回需要的信息(ACL放行或阻止统计)D_RETURN = {'PERMIT':{}, 'DENY':{}, 'OTHER':{}}#print(D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'])for K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE']:ACL_IF = K[0]ACL_ST = K[1]ACL_ID = K[5]KEY_NEW = f"{ACL_IF} {ACL_ID}"if ACL_ST.upper() == 'PERMIT':if KEY_NEW not in D_RETURN['PERMIT']:D_RETURN['PERMIT'][KEY_NEW] = 0D_RETURN['PERMIT'][KEY_NEW] += D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K]elif ACL_ST.upper() == 'DENY':if KEY_NEW not in D_RETURN['DENY']:D_RETURN['DENY'][KEY_NEW] = 0D_RETURN['DENY'][KEY_NEW] += D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K]else:if KEY_NEW not in D_RETURN['OTHER']:D_RETURN['OTHER'][KEY_NEW] = 0D_RETURN['OTHER'][KEY_NEW] += D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K]#print(D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'])for K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL']:ACL_IF, ACL_ST, ACL_ID = KKEY_NEW = f"{ACL_IF} {ACL_ID}"if ACL_ST.upper() == 'PERMIT':D_RETURN['PERMIT'][KEY_NEW] = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K]elif ACL_ST.upper() == 'DENY':D_RETURN['DENY'][KEY_NEW] = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K]else:D_RETURN['OTHER'][KEY_NEW] = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K]return(D_RETURN)if __name__ == '__main__':FILE_PATH = '华为交换机日志文件路径'SHOW = 1    # 查看重要日志信息D_ACL_INFO = SYSLOG_SWITCH_HUAWEI(FILE_PATH, SHOW)print(D_ACL_INFO)

DEF_SYSLOG_SWITCH_H3C.py

华三交换机日志示例

# -*- coding: utf8 -*-
import time
from DEF_COLOR import *   ## 终端显示颜色def 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):#时间文本格式 = '%Y-%m-%d %H:%M:%S'时间类 = time.localtime(时间戳)时间文本 = time.strftime(时间文本格式, 时间类)return(时间文本)def 日志时间转时间戳(时间文本):时间戳 = time.mktime(time.strptime(时间文本, '%Y-%m-%dT%H:%M:%S'))return(时间戳)def 参数时间转时间戳(TEXT):SP_DATE = TEXT.split('T')if len(SP_DATE) == 2:DATE = SP_DATE[0]SP_TIME = SP_DATE[1].split(':')if len(SP_TIME) == 1:M_TIME = '%H'elif len(SP_TIME) == 2:M_TIME = '%H:%M'elif len(SP_TIME) == 3:M_TIME = '%H:%M:%S'DATE_TIME = TEXTelse:DATE = time.strftime('%Y-%m-%d')SP_TIME = TEXT.split(':')if len(SP_TIME) == 1:M_TIME = '%H'elif len(SP_TIME) == 2:M_TIME = '%H:%M'elif len(SP_TIME) == 3:M_TIME = '%H:%M:%S'DATE_TIME = f"{DATE}T{TEXT}"#print(f"TEXT={TEXT}")#print(f"DATE={DATE}")#print(f"M_TIME={M_TIME}")#print(f"DATE_TIME={DATE_TIME}")M_DATE_TIME = f"%Y-%m-%dT{M_TIME}"TIME_STAMP = time.mktime(time.strptime(DATE_TIME, M_DATE_TIME))#print(f"TIME_STAMP={TIME_STAMP}")return(TIME_STAMP)def 转时间戳(TEXT):SP_DATE = TEXT.split('T')if len(SP_DATE) == 2:DATE = SP_DATE[0]SP_TIME = SP_DATE[1].split(':')if len(SP_TIME) == 1:M_TIME = '%H'elif len(SP_TIME) == 2:M_TIME = '%H:%M'elif len(SP_TIME) == 3:M_TIME = '%H:%M:%S'DATE_TIME = TEXTelse:DATE = time.strftime('%Y-%m-%d')SP_TIME = TEXT.split(':')if len(SP_TIME) == 1:M_TIME = '%H'elif len(SP_TIME) == 2:M_TIME = '%H:%M'elif len(SP_TIME) == 3:M_TIME = '%H:%M:%S'DATE_TIME = f"{DATE}T{TEXT}"#print(f"TEXT={TEXT}")#print(f"DATE={DATE}")#print(f"M_TIME={M_TIME}")print(f"DATE_TIME={DATE_TIME}")M_DATE_TIME = f"%Y-%m-%dT{M_TIME}"TIME_STAMP = time.mktime(time.strptime(DATE_TIME, M_DATE_TIME))#print(f"TIME_STAMP={TIME_STAMP}")return(TIME_STAMP)def LOG_TYPE(LINE_TEXT):A = LINE_TEXT.find('%%')         # 主机名 %%10ACL/6/PFILTER_STATIS_INFO: GigabitEthernet1/0/25 (outbound): Packet-filter if A != -1:#打印_黄(LINE_TEXT[A:])B = LINE_TEXT.find(':', A)   # 找日志类型结尾标识符号if B != -1:#打印_绿(LINE_TEXT[B:])SP = LINE_TEXT[A+4:B].split('/')    # CFGMAN/4/TRAP(t)if SP[2][-1] == ')':IDX = SP[2].find('(')if IDX != -1:return((SP[0], SP[2][:IDX], B+1))else:return((SP[0], SP[2], B+1))else:return((SP[0], SP[2], B+1))else:打印_红(f"找位置标志':'失败(日志类型结尾标识) {LINE_TEXT}")return((A, ':-1', -1))else:打印_红(f"找位置标志'%%' {LINE_TEXT}")return(('%%-1', LINE_TEXT, -1))##GigabitEthernet1/0/3 (outbound): Packet-filter 2205 rule 97 deny source 192.168.0.0 0.0.255.255 logging 14 packet(s).
##GigabitEthernet1/0/3 (inbound):  Packet-filter name in_log rule 40 permit ip destination 192.168.0.0 0.0.255.255 logging 20 packet(s).
def ACL_PFILTER_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LOG_MSG):try:IDX_1 = LOG_MSG.index(': Packet-filter ')IDX_2 = LOG_MSG.index('logging')IDX_3 = LOG_MSG.index(' packet')except:打印_红(f"ERR {LOG_MSG}")else:SW_IF = LOG_MSG[:IDX_1]if LOG_MSG[IDX_1+16:IDX_1+20] == 'name':RULE = LOG_MSG[IDX_1+21:IDX_2-1]else:RULE = LOG_MSG[IDX_1+16:IDX_2-1]NCOUNT = int(LOG_MSG[IDX_2+8:IDX_3])#打印_绿(f"|{SW_IF}|{RULE}|{NCOUNT}|")if (SW_IF, RULE) in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4']:D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][(SW_IF, RULE)] += NCOUNTelse:D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][(SW_IF, RULE)] = NCOUNT##GigabitEthernet1/0/5 (inbound): Packet-filter IPv6 name ipv6_deny_in rule 0 deny logging 129 packet(s).
##GigabitEthernet1/0/5 (inbound): Packet-filter IPv6 name ipv6_deny_in rule 0 deny logging 752 packet(s).
##GigabitEthernet1/0/2 (inbound): Packet-filter IPv6 name ipv6_deny_in rule 0 deny logging 12 packet(s).
def ACL_PFILTER_IPV6_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LOG_MSG):##打印_黄(LOG_MSG)try:IDX_1 = LOG_MSG.index(': Packet-filter ')IDX_2 = LOG_MSG.index('logging')IDX_3 = LOG_MSG.index(' packet')except:打印_红(f"ERR {LOG_MSG}")else:SW_IF = LOG_MSG[:IDX_1]#print(f"SW_IF={SW_IF}")if LOG_MSG[IDX_1+21:IDX_1+25] == 'name':#print(f"LOG_MSG[IDX_1+26:IDX_2-1]={LOG_MSG[IDX_1+26:IDX_2-1]}")RULE = LOG_MSG[IDX_1+26:IDX_2-1]else:RULE = LOG_MSG[IDX_1+21:IDX_2-1]NCOUNT = int(LOG_MSG[IDX_2+8:IDX_3])#打印_绿(f"|{SW_IF}|{RULE}|{NCOUNT}|")if (SW_IF, RULE) in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6']:D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][(SW_IF, RULE)] += NCOUNTelse:D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][(SW_IF, RULE)] = NCOUNT## ARP 超过阈值
##The ARP packet rate(6 pps) exceeded the rate limit(5 pps) on interface GigabitEthernet1/0/19 in the last 60 seconds.
##The ARP packet rate(118 pps) exceeded the rate limit(100 pps) on interface GigabitEthernet1/0/25 in the last 60 seconds.
##The ARP packet rate(100 pps) exceeded the rate limit(100 pps) on interface GigabitEthernet1/0/25 in the last 60 seconds.
def ARP_RATE_EXCEEDED(D_SYSLOG_SWITCH_H3C, LOG_MSG):SP = LOG_MSG.split()实际 = SP[3]限制 = SP[8]接口 = SP[12]实际值 = int(实际[5:])限制值 = int(限制[6:])#print(f"{实际}|{实际值}|{限制}|{限制值}|{接口}")if (接口,限制值) in D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED']:D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'][(接口,限制值)]+= 实际值else:D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'][(接口,限制值)] = 实际值## 广播超阈值次数(广播风暴)
##GigabitEthernet1/0/21 is in normal status, BC flux exceeds its upper threshold 5 pps.
def STORM_CONSTRAIN_EXCEED(D_SYSLOG_SWITCH_H3C, LOG_MSG):接口 = LOG_MSG.split()[0]if 接口 in D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED']:D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'][接口]+= 1else:D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'][接口] = 1## 解析SYSLOG日志一行内容
def LINE_H3C(D_SYSLOG_SWITCH_H3C, LINE_TEXT, TIME_LOCAL):TP1, TP2, X = LOG_TYPE(LINE_TEXT)if TP1 == 'ACL':if TP2 == 'PFILTER_STATIS_INFO':#打印_黄(LINE_TEXT[X:-1])ACL_PFILTER_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])elif TP2 == 'PFILTER_IPV6_STATIS_INFO':ACL_PFILTER_IPV6_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])else:return((1, TP1, TP2))           ## 终止,不做解析,返回标识代码及日志类型信息elif TP1 == 'IFNET':if TP2 == 'PHY_UPDOWN':             ## Physical state on the interface GigabitEthernet1/0/8 changed to down.#打印_黄(LINE_TEXT[X+1:-1])SP = LINE_TEXT[X+1:-1].split()IF_ID = SP[5]IF_ST = SP[-1]#print(f'PHY  {IF_ID:21s} {IF_ST:5s} {TIME_LOCAL}')if IF_ID not in D_SYSLOG_SWITCH_H3C['D_IF_PHY']:D_SYSLOG_SWITCH_H3C['D_IF_PHY'][IF_ID] = []D_SYSLOG_SWITCH_H3C['D_IF_PHY'][IF_ID].append((TIME_LOCAL, IF_ST))elif TP2 == 'LINK_UPDOWN':          ## Line protocol state on the interface GigabitEthernet1/0/8 changed to down.#打印_黄(LINE_TEXT[X+1:-1])SP = LINE_TEXT[X+1:-1].split()IF_ID = SP[6]IF_ST = SP[-1]#print(f'LINK {IF_ID:21s} {IF_ST:5s} {TIME_LOCAL}')if IF_ID not in D_SYSLOG_SWITCH_H3C['D_IF_LINK']:D_SYSLOG_SWITCH_H3C['D_IF_LINK'][IF_ID] = []D_SYSLOG_SWITCH_H3C['D_IF_LINK'][IF_ID].append((TIME_LOCAL, IF_ST))elif TP2 == 'STORM_CONSTRAIN_EXCEED':STORM_CONSTRAIN_EXCEED(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])elif TP2 == 'STORM_CONSTRAIN_BELOW':#print(LINE_TEXT)passelse:return((1, TP1, TP2))elif TP1 == 'LLDP':if TP2 == 'LLDP_CREATE_NEIGHBOR':   ## Nearest bridge agent neighbor created on port GigabitEthernet1/0/10 (IfIndex 10), neighbor's chassis ID is xxxx-xxxx-xxxx, port ID is xxxx-xxxx-xxxx.SP = LINE_TEXT[X+1:-1].split()IF_ID  = SP[7]IF_MAC = SP[14]D_SYSLOG_SWITCH_H3C['L_LLDP'].append((TIME_LOCAL, 'CREATE', IF_ID, IF_MAC))elif TP2 == 'LLDP_DELETE_NEIGHBOR': ## Nearest bridge agent neighbor deleted on port GigabitEthernet1/0/6 (IfIndex 6), neighbor's chassis ID is xxxx-xxxx-xxxx, port ID is xxxx-xxxx-xxxx.SP = LINE_TEXT[X+1:-1].split()IF_ID  = SP[7]IF_MAC = SP[14]D_SYSLOG_SWITCH_H3C['L_LLDP'].append((TIME_LOCAL, 'DELETE', IF_ID, IF_MAC))else:return((1, TP1, TP2))elif TP1 == 'SSHS':if TP2 == 'SSHS_LOG':               ## Accepted password for xxx from xxx.xxx.xxx.xxx port 55598.pass#打印_红(LINE_TEXT.rstrip('\n'))elif TP2 == 'SSHS_SFTP_OPER':#打印_黄(LINE_TEXT[X+1:-1])D_SYSLOG_SWITCH_H3C['L_FTP'].append((TIME_LOCAL, LINE_TEXT[X+1:-1]))else:return((1, TP1, TP2))elif TP1 == 'SHELL':if TP2 == 'SHELL_CMD':               ## -Line=vty0-IPAddr=xxx.xxx.xxx.xxx-User=xxx; Command is dis cuIndex_IP = LINE_TEXT.index('-IPAddr=')Index_USER = LINE_TEXT.index('-User=')IP = LINE_TEXT[Index_IP+8:Index_USER]LOG_CMD = LINE_TEXT[Index_USER+6:].rstrip('\n')USER = LOG_CMD.split(';')[0]CMD = LOG_CMD[len(USER)+13:]##打印_红(f"{USER} {CMD}")D_SYSLOG_SWITCH_H3C['L_CMD'].append((TIME_LOCAL, IP, USER, CMD))elif TP2 == 'SHELL_LOGIN':           ## xxx logged in from xxx.xxx.xxx.xxx.SP = LINE_TEXT[X+1:-1].split()USER = SP[0]IP = SP[4]D_SYSLOG_SWITCH_H3C['L_LOGIN'].append(('LOGIN', TIME_LOCAL, USER, IP))elif TP2 == 'SHELL_LOGOUT':          ## xxx logged out from xxx.xxx.xxx.xxx.SP = LINE_TEXT[X+1:-1].split()USER = SP[0]IP = SP[4]D_SYSLOG_SWITCH_H3C['L_LOGIN'].append(('LOGOUT', TIME_LOCAL, USER, IP))else:return((1, TP1, TP2))elif TP1 == 'CFGMAN':if TP2 == 'CFGMAN_CFGCHANGED':       ## -EventIndex=74-CommandSource=snmp-ConfigSource=startup-ConfigDestination=running; Configuration changed.D_SYSLOG_SWITCH_H3C['L_CFGCHANGED'].append(TIME_LOCAL)else:return((1, TP1, TP2))elif TP1 == 'ARP':#print(f"LINE_TEXT={LINE_TEXT}")if TP2 == 'ARP_RATE_EXCEEDED':      ## %%10ARP/4/ARP_RATE_EXCEEDED: The ARP packet rate(100 pps) exceeded the rate limit(100 pps) on interface GigabitEthernet1/0/25 in the last 60 secondsARP_RATE_EXCEEDED(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])else:return((1, TP1, TP2))else:return((1, TP1, TP2))return((0, TP1, TP2))## 解析SYSLOG日志文件
def FILE_H3C(D_SYSLOG_SWITCH_H3C, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX):#print("RUN", FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)TIME_S = time.time()TOT_N = 0SELECT_N = 0解析数量 = 0for LINE_BYTES in open(FILE_PATH, mode='br'):TOT_N += 1try:LINE_TEXT = LINE_BYTES.decode('UTF-8')except Exception as e:打印_红(f"ERROR {TOT_N} LINE_BYTES={LINE_BYTES} {e}")else:TIME_UTC = LINE_TEXT[:19]TIME_STAMP = time.mktime(time.strptime(TIME_UTC, '%Y-%m-%dT%H:%M:%S'))TIME_LOCAL = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(TIME_STAMP))if TIME_STAMP_MIN < TIME_STAMP < TIME_STAMP_MAX:SELECT_N += 1#print(TIME_UTC, TIME_LOCAL, 'RUN')ST,TP1,TP2 = LINE_H3C(D_SYSLOG_SWITCH_H3C, LINE_TEXT, TIME_LOCAL)if ST != 0:#打印_红(f"{TOT_N} 未知LOG")#breakif (TP1,TP2) in D_SYSLOG_SWITCH_H3C['D_LOG_OTHER']:D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'][(TP1,TP2)] += 1else:D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'][(TP1,TP2)] = 1else:解析数量 += 1else:#print(TIME_UTC, TIME_LOCAL, 'PASS')passTIME_RUN = time.time() - TIME_Sreturn((FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN))def SYSLOG_SWITCH_H3C(FILE_PATH, SHOW=0):D_SYSLOG_SWITCH_H3C = {}D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'] = {} # 重点ACL规则匹配记录D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'] = {} # 重点ACL规则匹配记录D_SYSLOG_SWITCH_H3C['D_IF_PHY']  = {} # 接口物理断开记录D_SYSLOG_SWITCH_H3C['D_IF_LINK'] = {} # 接口链路断开记录D_SYSLOG_SWITCH_H3C['L_LLDP']    = [] # 邻接设备记录D_SYSLOG_SWITCH_H3C['D_IF_MAC']  = {} # 接口电脑MAC记录D_SYSLOG_SWITCH_H3C['L_CMD']     = [] # 用户执行命令记录D_SYSLOG_SWITCH_H3C['L_LOGIN']   = [] # 登录信息D_SYSLOG_SWITCH_H3C['L_FTP']     = []D_SYSLOG_SWITCH_H3C['L_CFGCHANGED'] = []  # 配置被修改时间记录D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'] = {} # 未解析日志记录D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'] = {} # ARP限制D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'] = {}   #广播风暴达到阈值次数TIME_STAMP_MIN = 0TIME_STAMP_MAX = time.time()    # 默认为当前时间戳R = FILE_H3C(D_SYSLOG_SWITCH_H3C, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)if SHOW == 1:打印_黄("D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'] 匹配ACL日志")L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4']]L_K.sort()for K in L_K:PORT, RULE = KSP_RULE = RULE.split()if len(SP_RULE) > 2:if SP_RULE[3] == 'permit':打印_绿(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]:8}  {PORT:33s} {RULE}")elif SP_RULE[3] == 'deny':打印_红(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]:8}  {PORT:33s} {RULE}")else:print(f"ERR1 SP_RULE[3]={SP_RULE[3]} {K} {FILE_PATH}")else:print(f"ERR2 {K} {FILE_PATH}")打印_黄("D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'] 匹配ACL日志")L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6']]L_K.sort()for K in L_K:PORT, RULE = KSP_RULE = RULE.split()if len(SP_RULE) > 2:if SP_RULE[3] == 'permit':打印_绿(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][K]:8}  {PORT:33s} {RULE}")elif SP_RULE[3] == 'deny':打印_红(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][K]:8}  {PORT:33s} {RULE}")else:print(f"ERR1 SP_RULE[3]={SP_RULE[3]} {K} {FILE_PATH}")else:print(f"ERR2 {K} {FILE_PATH}")打印_黄("D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'] ARP 发包超过阈值")L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED']]L_K.sort()for K in L_K:合计 = D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'][K]接口,限制值 = K打印_青(f"{合计:8}  {接口:22s}({限制值})")打印_黄("D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'] 广播风暴发包超过阈值")L = [(D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'][i],i) for i in D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED']]L.sort()for 次数,接口 in L:打印_红(f"{次数:8}  {接口:22s}")打印_黄("D_SYSLOG_SWITCH_H3C['L_LLDP'] 邻居设备变化")for TIME_LOCAL, ST, IF_ID, IF_MAC in D_SYSLOG_SWITCH_H3C['L_LLDP']:if ST == 'CREATE':打印_绿(f"    {TIME_LOCAL} {IF_ID} {IF_MAC} {ST}")else:打印_红(f"    {TIME_LOCAL} {IF_ID} {IF_MAC} {ST}")打印_黄("D_SYSLOG_SWITCH_H3C['D_IF_PHY'] 端口物理线路变化")for K in D_SYSLOG_SWITCH_H3C['D_IF_PHY']:打印_红(f"    {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_H3C['D_IF_PHY'][K])}")#打印_黄('D_SYSLOG_SWITCH_H3C['D_IF_LINK']')#for K in D_SYSLOG_SWITCH_H3C['D_IF_LINK']:#    print(f"    {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_H3C['D_IF_LINK'][K])}")打印_黄("D_SYSLOG_SWITCH_H3C['L_CMD']")for TIME_LOCAL, IP, USER, CMD in D_SYSLOG_SWITCH_H3C['L_CMD']:打印_青(f"    {TIME_LOCAL} {IP:15s} {USER:8s} '{CMD}'")打印_黄("D_SYSLOG_SWITCH_H3C['L_LOGIN'] 登录登出详细")for ST, TIME_LOCAL, USER, IP in D_SYSLOG_SWITCH_H3C['L_LOGIN']:if ST == 'LOGIN':打印_青(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {USER}")elif ST == 'LOGOUT':打印_蓝(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {USER}")else:打印_红(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {USER}")打印_黄("D_LOGIN 登录汇总")D_LOGIN = {}for ST, TIME_LOCAL, USER, IP in D_SYSLOG_SWITCH_H3C['L_LOGIN']:if ST == 'LOGIN':if (IP, USER) not in D_LOGIN:D_LOGIN[(IP, USER)] = []D_LOGIN[(IP, USER)].append(TIME_LOCAL)for K in D_LOGIN:打印_红(f"    {K} {D_LOGIN[K]}")打印_黄("D_SYSLOG_SWITCH_H3C['L_CFGCHANGED'] 配置被修改时间记录")for i in D_SYSLOG_SWITCH_H3C['L_CFGCHANGED']:打印_紫(f"    {i}")打印_黄("D_SYSLOG_SWITCH_H3C['L_FTP']")for TIME_LOCAL, LOG_TEXT in D_SYSLOG_SWITCH_H3C['L_FTP']:打印_青(f"    {TIME_LOCAL} {LOG_TEXT}")打印_黄("D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'] 未解析日志记录")L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_LOG_OTHER']]L_K.sort()for K in L_K:print(f"{D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'][K]:5} {K}")FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN = R时间文本格式 = '%Y-%m-%d %H:%M:%S'打印_红(f"日志路径 : {FILE_PATH}")打印_青(f"开始/结束: {时间戳_2_时间文本(TIME_STAMP_MIN, 时间文本格式)} / {时间戳_2_时间文本(TIME_STAMP_MAX, 时间文本格式)}")打印_绿(f"解析/筛选/总数: {解析数量}/{SELECT_N}/{TOT_N} {(解析数量/TOT_N)*100:.2f}(%)/{(SELECT_N/TOT_N)*100:.2f}(%)/100(%)  用时: {TIME_RUN:.2f}秒")## 返回需要的信息D_RETURN = {'PERMIT':{}, 'DENY':{}, 'OTHER':{}}#print(D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'])for K in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4']:PORT, RULE = KSP_RULE = RULE.split()KEY_NEW = f"{PORT} {SP_RULE[0]}"if len(SP_RULE) > 2:if SP_RULE[3] == 'permit':if KEY_NEW not in D_RETURN['PERMIT']:D_RETURN['PERMIT'][KEY_NEW] = 0D_RETURN['PERMIT'][KEY_NEW] += D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]elif SP_RULE[3] == 'deny':if KEY_NEW not in D_RETURN['DENY']:D_RETURN['DENY'][KEY_NEW] = 0D_RETURN['DENY'][KEY_NEW] += D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]else:print(f"ERR1 SP_RULE[3]={SP_RULE[3]} {K} {FILE_PATH}")else:print(f"ERR2 {K} {FILE_PATH}")return(D_RETURN)if __name__ == '__main__':FILE_PATH = '华三交换机日志文件路径'SHOW = 1    # 查看重要日志信息D_ACL_INFO = SYSLOG_SWITCH_H3C(FILE_PATH, SHOW)print(D_ACL_INFO)

DEF_SYSLOG_USG.py

华为USG防火墙

# -*- coding: utf8 -*-
import time
from DEF_COLOR import *   ## 终端显示颜色def 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):#时间文本格式 = '%Y-%m-%d %H:%M:%S'时间类 = time.localtime(时间戳)时间文本 = time.strftime(时间文本格式, 时间类)return(时间文本)def USG_LOG_TYPE(LINE_TEXT):A = LINE_TEXT.find('%')     ## 2022-12-11T16:03:31+08:00 IPS6515E %%01SECIF/6/STREAM(l)[136014]: In Last Five Minutesif A != -1:#打印_黄(LINE_TEXT[A:])B = LINE_TEXT.find('(', A)if B != -1:#打印_绿(LINE_TEXT[B:])C = LINE_TEXT.find(':', B)  # 找日志正文开始位置标志if C != -1:#打印_蓝(LINE_TEXT[C:])#打印_青(LINE_TEXT[A+4:B])SP = LINE_TEXT[A+4:B].split('/')return((SP[0], SP[2], C+1))else:打印_红(f"找位置标志':'失败(日志正文开始位) {LINE_TEXT}")return((':-1', LINE_TEXT[A:B], -1))else:打印_红(f"找位置标志'('失败 {LINE_TEXT}")return((LINE_TEXT[:A], '(-1', -1))else:#打印_红(f"找位置标志'%'失败 {LINE_TEXT}")A2 = LINE_TEXT.find('DS/4/DATASYNC_CFGCHANGE')if A2 != -1:return(('DS', 'DATASYNC_CFGCHANGE', A2+1))else:打印_红(f"找位置标志'%'失败 {LINE_TEXT}")return(('%-1', LINE_TEXT, -1))## 解析SYSLOG日志一行内容
# POLICY 规则匹配日志 规则内配置 policy logging 产生
# SECLOG/4/PACKET_DENY 丢包信息 规则内配置 session logging 产生
def USG_LOG_LINE(D_LOG_USG, LINE_TEXT, TIME_LOCAL):TP1, TP2, X = USG_LOG_TYPE(LINE_TEXT)if TP1 == 'POLICY':if TP2 in ('POLICYDENY', 'POLICYPERMIT'):#print(TP1, TP2, LINE_TEXT[X:-1])SP = LINE_TEXT[X:-1].split(',')PT = SP[1].split('=')[-1]SIP = SP[2].split('=')[-1]SPORT = SP[3].split('=')[-1]DIP   = SP[4].split('=')[-1]DPORT = SP[5].split('=')[-1]TIME  = SP[6].split('=')[-1]SZONE = SP[7].split('=')[-1]DZONE = SP[8].split('=')[-1]RULE_NAME = SP[9].split('=')[-1][:-1]if PT == '6':PT = 'TCP'elif PT == '17':PT = 'UDP'D_LOG_USG['L_SEC_RULE'].append((TIME, PT, SIP, SPORT, DIP, DPORT, SZONE, DZONE, RULE_NAME, TP2[6:]))else:return((TP1, TP2))elif TP1 == 'SECLOG':if TP2 == 'PACKET_DENY':    # IPVer=4,Protocol=tcp,SourceIP=89.248.164.165,DestinationIP=183.129.153.43,SourcePort=48397,DestinationPort=8888,DestinationNatIP=192.168.200.112,DestinationNatPort=8888,BeginTime=1671059311,EndTime=1671059311,SourceVpnID=0,DestinationVpnID=0,SourceZone=untrust,DestinationZone=trust,PolicyName=HW,CloseReason=policy-deny.#print(TP1, TP2, LINE_TEXT[X:-1])SP = LINE_TEXT[X:-1].split(',')IPVer = SP[0].split('=')[-1]Protocol = SP[1].split('=')[-1]SIP = SP[2].split('=')[-1]DIP   = SP[3].split('=')[-1]SPORT = SP[4].split('=')[-1]DPORT = SP[5].split('=')[-1]D_NAT_IP = SP[6].split('=')[-1]D_NAT_PORT = SP[7].split('=')[-1]BeginTime = int(SP[8].split('=')[-1])EndTime = int(SP[9].split('=')[-1])SZONE = SP[12].split('=')[-1]DZONE = SP[13].split('=')[-1]PolicyName = SP[14].split('=')[-1]CloseReason = SP[15].split('=')[-1][:-1]if CloseReason == 'policy-deny':丢包类型 = '安全策略丢包'elif CloseReason == 'default-policy-deny':丢包类型 = '缺省包过滤丢包'elif CloseReason == 'session miss':丢包类型 = '未命中会话丢包'elif CloseReason == 'others':丢包类型 = '其他类型丢包'else:丢包类型 = f'未知类型丢包:{CloseReason}'D_LOG_USG['L_SEC_PACKET'].append((TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, 丢包类型))elif TP2 == 'SESSION_TEARDOWN':SP = LINE_TEXT[X:-1].split(',')IPVer = SP[0].split('=')[-1]Protocol = SP[1].split('=')[-1]SIP = SP[2].split('=')[-1]DIP   = SP[3].split('=')[-1]SPORT = SP[4].split('=')[-1]DPORT = SP[5].split('=')[-1]D_NAT_IP = SP[6].split('=')[-1]D_NAT_PORT = SP[7].split('=')[-1]BeginTime = int(SP[8].split('=')[-1])EndTime = int(SP[9].split('=')[-1])SendPkts =  int(SP[10].split('=')[-1])SendBytes = int(SP[11].split('=')[-1])RcvPkts =   int(SP[12].split('=')[-1])RcvBytes =  int(SP[13].split('=')[-1])SZONE = SP[16].split('=')[-1]DZONE = SP[17].split('=')[-1]PolicyName = SP[18].split('=')[-1]CloseReason = SP[19].split('=')[-1][:-1]D_LOG_USG['L_SEC_SESSION'].append((TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, CloseReason))else:return((TP1, TP2))elif TP1 == 'SHELL':if TP2 in ('LOGIN', 'LOGOUT'):#SP = LINE_TEXT[X:-1].split(',')#print(SP)#用户类型 = SP[1].split('=')[-1]#用户名 = SP[2].split('=')[-1]#用户地址 = SP[4].split('=')[-1]#D_LOG_USG['L_LOGIN'].append((TIME_LOCAL, TP2, 用户类型, 用户名, 用户地址))D_LOG_USG['L_LOGIN'].append((TIME_LOCAL, TP1, TP2, LINE_TEXT[X:-1]))else:return((TP1, TP2))elif TP1 == 'PHY':if TP2 in ('STATUSDOWN', 'STATUSUP'):接口号 = LINE_TEXT[X:-1].split(':')[0]IF_ST = TP2[6:]if 接口号 in D_LOG_USG['D_IF_PHY']:D_LOG_USG['D_IF_PHY'][接口号].append((TIME_LOCAL, IF_ST))else:D_LOG_USG['D_IF_PHY'][接口号] = [(TIME_LOCAL, IF_ST)]else:return((TP1, TP2))else:if TP1 == '':打印_紫(LINE_TEXT[X:-1])return((TP1, TP2))return(0)## 解析SYSLOG日志文件
def USG_LOG_FILE(D_LOG_USG, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX):#print("RUN", FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)TIME_S = time.time()TOT_N = 0SELECT_N = 0for LINE_BYTES in open(FILE_PATH, mode='br'):TOT_N += 1try:LINE_TEXT = LINE_BYTES.decode('GB2312')except Exception as e:打印_红(f"ERROR {TOT_N} LINE_BYTES={LINE_BYTES} {e}")else:TIME_UTC = LINE_TEXT[:19]TIME_STAMP = time.mktime(time.strptime(TIME_UTC, '%Y-%m-%dT%H:%M:%S'))#TIME_STAMP = 日志时间转时间戳(TIME_UTC, 8)TIME_LOCAL = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(TIME_STAMP))if TIME_STAMP_MIN < TIME_STAMP < TIME_STAMP_MAX:SELECT_N += 1#print(TIME_UTC, TIME_LOCAL, 'RUN')R = USG_LOG_LINE(D_LOG_USG, LINE_TEXT, TIME_LOCAL)if R != 0:#打印_红(f"{TOT_N} 未知LOG")#breakif R in D_LOG_USG['D_LOG_OTHER']:D_LOG_USG['D_LOG_OTHER'][R] += 1else:D_LOG_USG['D_LOG_OTHER'][R] = 1else:#print(TIME_UTC, TIME_LOCAL, 'PASS')passTIME_RUN = time.time() - TIME_S#打印_绿(f"{FILE_PATH} 完成 处理日志数量 {SELECT_N}/{TOT_N} 筛选数/总日志数 用时={TIME_RUN:.2f}秒")return((FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, TIME_RUN))def 秒数转时长表示(总秒数):天数 = 总秒数//86400剩余秒数 = 总秒数%86400时数 = 剩余秒数//3600剩余秒数 = 剩余秒数%3600分数 = 剩余秒数//60剩余秒数 = 剩余秒数%60#print(f"{天数}:{时数}:{分数}:{剩余秒数}(天:时:分:秒)")return(f"{天数:02}:{时数:02}:{分数:02}:{剩余秒数:02}")def USG(PATH_SYSLOG_FILE, SHOW=0):D_LOG_USG = {}D_LOG_USG['L_SEC_RULE'] = []D_LOG_USG['L_SEC_PACKET'] = []D_LOG_USG['L_SEC_SESSION'] = []D_LOG_USG['D_IF_PHY']  = {} # 接口物理断开记录D_LOG_USG['D_IF_LINK'] = {} # 接口链路断开记录D_LOG_USG['L_CMD'] = []     # 用户执行命令记录D_LOG_USG['L_LOGIN'] = []   # 登录信息D_LOG_USG['D_LOG_OTHER'] = {} # 未解析日志记录TIME_STAMP_MIN = 0TIME_STAMP_MAX = time.time()    # 默认为当前时间戳R = USG_LOG_FILE(D_LOG_USG, PATH_SYSLOG_FILE, TIME_STAMP_MIN, TIME_STAMP_MAX)if SHOW == 1:打印_黄("L_SEC_RULE")D_SEC_RULE = {}for TIME, PT, SIP, SPORT, DIP, DPORT, SZONE, DZONE, RULE_NAME, DoP in D_LOG_USG['L_SEC_RULE']:if DoP == 'DENY':打印_红(f"    {TIME}  {RULE_NAME:8s} {DoP:6s} {PT:4s} {SIP:15s} {SPORT:5s} -> {DIP:15s} {DPORT:5s} {SZONE} -> {DZONE}")#else:#    打印_绿(f"    {TIME}  {RULE_NAME:8s} {DoP:6s} {PT:4s} {SIP:15s} {SPORT:5s} -> {DIP:15s} {DPORT:5s} {SZONE} -> {DZONE}")KEY = (RULE_NAME, DoP)if KEY in D_SEC_RULE:D_SEC_RULE[KEY] += 1else:D_SEC_RULE[KEY] = 1for KEY in D_SEC_RULE:if KEY[1] == 'DENY':打印_红(f"    {KEY[0]:8s} {KEY[1]:6s} {D_SEC_RULE[KEY]}")else:打印_绿(f"    {KEY[0]:8s} {KEY[1]:6s} {D_SEC_RULE[KEY]}")打印_黄("L_SEC_SESSION")for TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, CloseReason in D_LOG_USG['L_SEC_SESSION']:会话秒数 = EndTime-BeginTimeif 会话秒数 < 30:打印_灰(f"    {TIME}  {PolicyName:8s} {Protocol:4s} {SIP:15s} {SPORT:5s} -> 用时:{秒数转时长表示(会话秒数)}(天:时:分:秒) {会话秒数}秒")else:打印_紫(f"    {TIME}  {PolicyName:8s} {Protocol:4s} {SIP:15s} {SPORT:5s} -> 用时:{秒数转时长表示(会话秒数)}(天:时:分:秒) {会话秒数}秒")打印_黄("L_SEC_PACKET")D_SEC_PACKET = {}for TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, 丢包类型 in D_LOG_USG['L_SEC_PACKET']:#打印_红(f"    {TIME_LOCAL} [{时间戳_2_时间文本(BeginTime)} {时间戳_2_时间文本(EndTime)}] {丢包类型} {PolicyName:8s} {Protocol}.{IPVer} {SIP:15s} {SPORT:5s} -> {DIP:15s} {DPORT:5s} -> {D_NAT_IP}:{D_NAT_PORT} {SZONE} -> {DZONE}")KEY = (PolicyName, 丢包类型)if KEY in D_SEC_PACKET:D_SEC_PACKET[KEY] += 1else:D_SEC_PACKET[KEY] = 1for KEY in D_SEC_PACKET:打印_红(f"    {KEY[0]:8s} {KEY[1]} {D_SEC_PACKET[KEY]}")打印_黄("D_IF_PHY")for K in D_LOG_USG['D_IF_PHY']:print(f"    {K:21s} DOWN/UP 次数 {len(D_LOG_USG['D_IF_PHY'][K])}")for TIME_LOCAL, IF_ST in D_LOG_USG['D_IF_PHY'][K]:if IF_ST == 'DOWN':打印_红(f"        {TIME_LOCAL} DOWN")else:打印_绿(f"        {TIME_LOCAL} {IF_ST}")#print("D_IF_LINK")#for K in D_LOG_USG['D_IF_LINK']:#    print(f"    {K:21s} DOWN/UP 次数 {len(D_LOG_USG['D_IF_LINK'][K])}")打印_黄("L_LOGIN")for i in D_LOG_USG['L_LOGIN']:打印_红(f"    {' '.join(i)}")打印_黄("D_LOG_OTHER")L_K = [i for i in D_LOG_USG['D_LOG_OTHER']]L_K.sort()for K in L_K:print(f"{D_LOG_USG['D_LOG_OTHER'][K]:5} {K}")FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, TIME_RUN = R时间文本格式 = '%Y-%m-%d %H:%M:%S'打印_紫(f"日志路径 : {FILE_PATH}")打印_青(f"开始/结束: {时间戳_2_时间文本(TIME_STAMP_MIN, 时间文本格式)} / {时间戳_2_时间文本(TIME_STAMP_MAX, 时间文本格式)}")打印_绿(f"解析/总数: {SELECT_N}/{TOT_N} {(SELECT_N/TOT_N)*100:.2f}(%)  用时: {TIME_RUN:.2f}秒")## 返回需要的信息(防火墙规则放行和阻止的统计)D_RETURN = {'PERMIT':{}, 'DENY':{}, 'OTHER':{}}for TIME, PT, SIP, SPORT, DIP, DPORT, SZONE, DZONE, RULE_NAME, DoP in D_LOG_USG['L_SEC_RULE']:if DoP == 'PERMIT':if RULE_NAME not in D_RETURN['PERMIT']:D_RETURN['PERMIT'][RULE_NAME] = 0D_RETURN['PERMIT'][RULE_NAME] += 1elif DoP == 'DENY':if RULE_NAME not in D_RETURN['DENY']:D_RETURN['DENY'][RULE_NAME] = 0D_RETURN['DENY'][RULE_NAME] += 1else:if RULE_NAME not in D_RETURN['OTHER']:D_RETURN['OTHER'][RULE_NAME] = 0D_RETURN['OTHER'][RULE_NAME] += 1return(D_RETURN)if __name__ == '__main__':FILE_PATH = '华为防火墙日志文件路径'SHOW = 1    # 查看重要日志信息D_RULE_INFO = USG(PATH_SYSLOG_FILE, SHOW)print(D_RULE_INFO)

DEF_COLOR.py

给点颜色看看

# -*- coding: utf8 -*-
import os## 终端显示颜色
if os.name == 'nt':       # Windowsimport ctypes,sysSTD_OUTPUT_HANDLE = -11# Windows CMD命令行 字体颜色定义 text colors黑字 = 0x00 # black.暗蓝字 = 0x01 # dark blue.暗绿字 = 0x02 # dark green.暗青字 = 0x03 # dark skyblue.暗红字 = 0x04 # dark red.暗紫字 = 0x05 # dark pink.暗黄字 = 0x06 # dark yellow.暗白字 = 0x07 # dark white.灰字 = 0x08 # dark gray.蓝字 = 0x09 # blue.绿字 = 0x0a # green.青字 = 0x0b # skyblue.红字 = 0x0c # red.紫字 = 0x0d # pink.黄字 = 0x0e # yellow.白字 = 0x0f # white.# Windows CMD命令行 背景颜色定义 background colors暗蓝底 = 0x10 # dark blue.暗绿底 = 0x20 # dark green.暗青底 = 0x30 # dark skyblue.暗红底 = 0x40 # dark red.暗紫底 = 0x50 # dark pink.暗黄底 = 0x60 # dark yellow.暗白底 = 0x70 # dark white.灰底 = 0x80 # dark gray.蓝底 = 0x90 # blue.绿底 = 0xa0 # green.青底 = 0xb0 # skyblue.红底 = 0xc0 # red.紫底 = 0xd0 # pink.黄底 = 0xe0 # yellow.白底 = 0xf0 # white.std_out_handle = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)def set_cmd_text_color(color, handle=std_out_handle):Bool = ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)return Booldef resetColor():set_cmd_text_color(红字 | 绿字 | 蓝字)def 打印_黑(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黑字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(灰字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_蓝(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(蓝字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_绿(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(绿字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_青(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(青字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_红(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(红字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_紫(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(紫字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_黄(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黄字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(白字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗蓝(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗蓝字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗绿(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗绿字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗青(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗青字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗红(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗红字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗紫(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗紫字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗黄(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗黄字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗白(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗白字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底黑字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黑字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底灰字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(灰字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底红字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(红字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底绿字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(绿字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底黄字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黄字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底蓝字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(蓝字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底紫字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(紫字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底青字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(青字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底红字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(红字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底蓝字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(蓝字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底绿字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(绿字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_蓝底黄字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黄字 | 蓝底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_蓝底白字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(白字 | 蓝底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底青字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(青字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()
elif os.name == 'posix':  # Linux'''格式: print('\033[显示方式;前景颜色;背景颜色m ..........\033[0m')print('\033[31;42m 123\033[0m')显示方式0 默认1 高亮显示4 下划线5 闪烁7 反白显示8 不可见颜色 前景色 背景色黑色     30     40红色     31     41绿色     32     42黄色     33     43蓝色     34     44紫色     35     45青色     36     46白色     37     47'''def 打印_灰(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;30;1m{TEXT}\033[0m")def 打印_红(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;31;1m{TEXT}\033[0m")def 打印_绿(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;32;1m{TEXT}\033[0m")def 打印_黄(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;33;1m{TEXT}\033[0m")def 打印_蓝(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;34;1m{TEXT}\033[0m")def 打印_紫(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;35;1m{TEXT}\033[0m")def 打印_青(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;36;1m{TEXT}\033[0m")def 打印_白(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;37;1m{TEXT}\033[0m")def 打印_白底黑字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;30;47m{TEXT}\033[0m")def 打印_白底红字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;31;47m{TEXT}\033[0m")def 打印_白底绿字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;32;47m{TEXT}\033[0m")def 打印_白底黄字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;33;47m{TEXT}\033[0m")def 打印_白底蓝字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;34;47m{TEXT}\033[0m")def 打印_白底紫字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;35;47m{TEXT}\033[0m")def 打印_白底青字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;36;47m{TEXT}\033[0m")


http://www.ppmy.cn/ops/85893.html

相关文章

快速安装torch-gpu和Tensorflow-gpu(自用,Ubuntu)

要更详细的教程可以参考Tensorflow PyTorch 安装&#xff08;CPU GPU 版本&#xff09;&#xff0c;这里是有基础之后的快速安装。 一、Pytorch 安装 conda create -n torch_env python3.10.13 conda activate torch_env conda install cudatoolkit11.8 -c nvidia pip ins…

go 并发

一、问题 1.1描述 我想要检测一组url的运行状态&#xff0c;如果ok则返回true&#xff0c;结果返回的结果是空的 func CheckWebsites(wc WebsiteChecker, urls []string) map[string]bool {results : make(map[string]bool)for _, url : range urls {go func() {results[url…

MongoDB教程(二十一):MongoDB大文件存储GridFS

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; 文章目录 引言一、GridFS…

外卖霸王餐系统架构怎么选?

在当今日益繁荣的外卖市场中&#xff0c;外卖霸王餐作为一种独特的营销策略&#xff0c;受到了众多商家的青睐。然而&#xff0c;要想成功实施外卖霸王餐活动&#xff0c;一个安全、稳定且高效的架构选择至关重要。本文将深入探讨外卖霸王餐架构的选择&#xff0c;以期为商家提…

优化冗余代码:提升前端项目开发效率的实用方法

目录 前言代码复用与组件化模块化开发与代码分割工具辅助与自动化结束语 前言 在前端开发中&#xff0c;我们常常会遇到代码冗余的问题&#xff0c;这不仅增加了代码量&#xff0c;还影响了项目的可维护性和开发效率。还有就是有时候会接到紧急业务需求&#xff0c;要求立马完…

泄露的基准测试表明Meta Llama 3.1 405B模型的性能可能超过OpenAI GPT-4o

2024 年 4 月&#xff0c;Meta 推出了新一代最先进的开源大型语言模型Llama 3。前两个模型 Llama 3 8B 和 Llama 3 70B为同类规模的 LLM 树立了新的基准。然而&#xff0c;在短短三个月内&#xff0c;其他几个 LLM 的性能已经超过了它们。 Meta 已经透露&#xff0c;其最大的 L…

Python3网络爬虫开发实战(2)爬虫基础库

文章目录 一、urllib1. urlparse 实现 URL 的识别和分段2. urlunparse 用于构造 URL3. urljoin 用于两个链接的拼接4. urlencode 将 params 字典序列化为 params 字符串5. parse_qs 和 parse_qsl 用于将 params 字符串反序列化为 params 字典或列表6. quote 和 unquote 对 URL的…

PyTorch 的各个核心模块和它们的功能

1. torch 核心功能 张量操作&#xff1a;PyTorch 的张量是一个多维数组&#xff0c;类似于 NumPy 的 ndarray&#xff0c;但支持 GPU 加速。数学运算&#xff1a;提供了各种数学运算&#xff0c;包括线性代数操作、随机数生成等。自动微分&#xff1a;torch.autograd 模块用于…