一、前置配置
1.1 进入我的企业页面,记录下企业ID。
1.2 创建企微应用,记录下应用的 AgentId 和 Secret。
1.3 设置应用的企业可信IP,将服务器公网 IP 填入即可。
1.4 设置应用接收消息API
填入服务器 API 地址,并记录下随机获取的 Token 和 EncodingAESKey。完成后,先不要点击保存,后续等服务端应用启动后再保存,即可完成校验。
二、服务端部署
2.1 企业应用消息收发流程拓扑
2.2 企微相关开发者文档说明
①、消息接收概述(主要说明了 消息加解密方法、消息收发协议、消息收发格式等)
概述 - 文档 - 企业微信开发者中心 (qq.com)
②、消息加解密官方库(包含多种代码语言,本文使用的是python库,解压使用的文件如下:)
加解密库下载与返回码 - 文档 - 企业微信开发者中心 (qq.com)
注意:需要使用 WXBizMsgCrypt3.py
这个文件。
③、企微应用主动发送消息(被动方式回复消息的格式不支持markdown和文件类型,为使回复内容更美观,可以采用主动发送消息的方式进行指定回复。)
发送应用消息 - 文档 - 企业微信开发者中心 (qq.com)
2.3 安装python相关依赖库。
pip3 install -r requirements.txt
requirements 内容如下:
bcrypt==4.1.1
blinker==1.8.2
certifi==2024.8.30
cffi==1.17.0
charset-normalizer==3.3.2
click==8.1.7
colorama==0.4.6
crypto==1.4.1
cryptography==36.0.2
flask==3.0.3
idna==3.8
importlib-metadata==8.4.0
itsdangerous==2.2.0
jinja2==3.1.4
MarkupSafe==2.1.5
Naked==0.1.32
paramiko==3.0.0
pycparser==2.22
pycryptodome==3.20.0
PyNaCl==1.5.0
PyYAML==6.0.2
requests==2.32.3
shellescape==3.8.1
urllib3==2.2.2
werkzeug==3.0.4
zipp==3.20.1
2.4 主程序 app.py
内容:
# -*- coding: utf-8 -*-
from flask import Flask, request, make_response
from WXBizMsgCrypt3 import WXBizMsgCrypt
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import fromstring# help_list
from help_list import help_list# funny
from funny.help_funny_list import help_funny_list
from funny.get_weather import get_weather
from funny.get_myrb import get_myrb
from funny.get_music import get_music
from funny.get_fortune import get_fortune
from funny.get_tellocal import get_tel
from funny.get_express import get_express# ops_tools
from ops_tools.get_ops import get_opsapp = Flask(__name__)def printXML(xml_content):# 创建XML元素element = ET.XML(xml_content)# 使用indent()函数进行格式化打印ET.indent(element)print(ET.tostring(element, encoding='unicode'))# 对应接受消息回调模式中的token,EncodingAESKey 和 企业信息中的企业id
qy_api = [WXBizMsgCrypt("***************", "**************************", "*********************"), ]# 开启消息接受模式时验证接口连通性
def signature(request, i):msg_signature = request.args.get('msg_signature', '')timestamp = request.args.get('timestamp', '')nonce = request.args.get('nonce', '')echo_str = request.args.get('echostr', '')ret, sEchoStr = qy_api[i].VerifyURL(msg_signature, timestamp, nonce, echo_str)if (ret != 0):print("ERR: VerifyURL ret: " + str(ret))return ("failed")else:return (sEchoStr)# 接收用户消息,可进行被动响应
def handle_user_message(request, i):user_message = request.dataprintXML(user_message)msg_signature = request.args.get('msg_signature', '')timestamp = request.args.get('timestamp', '')nonce = request.args.get('nonce', '')ret, sMsg = qy_api[i].DecryptMsg(user_message.decode('utf-8'), msg_signature, timestamp, nonce)decrypt_data = {}for node in list(fromstring(sMsg.decode('utf-8'))):decrypt_data[node.tag] = node.text# 解析后得到的decrypt_data: {"ToUserName":"企业号", "FromUserName":"发送者用户名", "CreateTime":"发送时间", "Content":"用户发送的内容", "MsgId":"唯一id,需要针对此id做出响应", "AagentID": "应用id"}# 用户应根据Content的内容自定义要做出的行为,包括响应返回数据,如下例子,如果发送的是123,就返回hello worldcontent_text = decrypt_data.get('Content', '')to_username_text = decrypt_data.get('ToUserName', '')from_username_text = decrypt_data.get('FromUserName', '')create_time_text = decrypt_data.get('CreateTime', '')# 主菜单if content_text == '#help':sRespData = help_list(to_username_text, from_username_text, create_time_text)# 生活菜单if content_text == '#help02':sRespData = help_funny_list(to_username_text, from_username_text, create_time_text)# 天气查询if content_text == '#天气查询':sRespData = get_weather(to_username_text, from_username_text, create_time_text)# 摸鱼日报if content_text == '#摸鱼日报':sRespData = get_myrb(to_username_text, from_username_text, create_time_text)# 随机点歌if content_text == '#随机点歌':sRespData = get_music(to_username_text, from_username_text, create_time_text)# 星座运势if "#星座运势#" in content_text:sRespData = get_fortune(content_text, to_username_text, from_username_text, create_time_text)# 电话查询if "#电话查询#" in content_text:sRespData = get_tel(content_text, to_username_text, from_username_text, create_time_text)# 快递查询if "#快递查询#" in content_text:sRespData = get_express(content_text, to_username_text, from_username_text, create_time_text)# OPS工具if "#ops#" in content_text:sRespData = get_ops(content_text, to_username_text, from_username_text, create_time_text)ret, send_msg = qy_api[i].EncryptMsg(sReplyMsg=sRespData, sNonce=nonce)if ret == 0:return send_msgelse:print(send_msg)@app.route('/company_wechat', methods=['GET', 'POST'])
def company_wechat():if request.method == 'GET':return signature(request, 0)else:print("收到请求......")return handle_user_message(request, 0)# Flask服务端口,可自定义
if __name__ == '__main__':app.run(host='0.0.0.0', port=6969, debug=True)
将刚刚记录下的 Token
,EncodingAESKey
和 企业ID
分别替换至该段:
2.5 解密库 WXBizMsgCrypt3.py
内容:
# -*- encoding:utf-8 -*-""" 对企业微信发送给企业后台的消息加解密示例代码.
@copyright: Copyright (c) 1998-2014 Tencent Inc."""
# ------------------------------------------------------------------------
import logging
import base64
import random
import hashlib
import time
import struct
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
import socket
import ierror"""
关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案
请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
下载后,按照README中的“Installation”小节的提示进行pycrypto安装。
"""class FormatException(Exception):passdef throw_exception(message, exception_class=FormatException):"""my define raise exception function"""raise exception_class(message)class SHA1:"""计算企业微信的消息签名接口"""def getSHA1(self, token, timestamp, nonce, encrypt):"""用SHA1算法生成安全签名@param token: 票据@param timestamp: 时间戳@param encrypt: 密文@param nonce: 随机字符串@return: 安全签名"""try:sortlist = [token, timestamp, nonce, encrypt]sortlist.sort()sha = hashlib.sha1()sha.update("".join(sortlist).encode())return ierror.WXBizMsgCrypt_OK, sha.hexdigest()except Exception as e:logger = logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_ComputeSignature_Error, Noneclass XMLParse:"""提供提取消息格式中的密文及生成回复消息格式的接口"""# xml消息模板AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
<MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
<TimeStamp>%(timestamp)s</TimeStamp>
<Nonce><![CDATA[%(nonce)s]]></Nonce>
</xml>"""def extract(self, xmltext):"""提取出xml数据包中的加密消息@param xmltext: 待提取的xml字符串@return: 提取出的加密消息字符串"""try:xml_tree = ET.fromstring(xmltext)encrypt = xml_tree.find("Encrypt")return ierror.WXBizMsgCrypt_OK, encrypt.textexcept Exception as e:logger = logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_ParseXml_Error, Nonedef generate(self, encrypt, signature, timestamp, nonce):"""生成xml消息@param encrypt: 加密后的消息密文@param signature: 安全签名@param timestamp: 时间戳@param nonce: 随机字符串@return: 生成的xml字符串"""resp_dict = {'msg_encrypt': encrypt,'msg_signaturet': signature,'timestamp': timestamp,'nonce': nonce,}resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dictreturn resp_xmlclass PKCS7Encoder():"""提供基于PKCS7算法的加解密接口"""block_size = 32def encode(self, text):""" 对需要加密的明文进行填充补位@param text: 需要进行填充补位操作的明文@return: 补齐明文字符串"""text_length = len(text)# 计算需要填充的位数amount_to_pad = self.block_size - (text_length % self.block_size)if amount_to_pad == 0:amount_to_pad = self.block_size# 获得补位所用的字符pad = chr(amount_to_pad)return text + (pad * amount_to_pad).encode()def decode(self, decrypted):"""删除解密后明文的补位字符@param decrypted: 解密后的明文@return: 删除补位字符后的明文"""pad = ord(decrypted[-1])if pad < 1 or pad > 32:pad = 0return decrypted[:-pad]class Prpcrypt(object):"""提供接收和推送给企业微信消息的加解密接口"""def __init__(self, key):# self.key = base64.b64decode(key+"=")self.key = key# 设置加解密模式为AES的CBC模式self.mode = AES.MODE_CBCdef encrypt(self, text, receiveid):"""对明文进行加密@param text: 需要加密的明文@return: 加密得到的字符串"""# 16位随机字符串添加到明文开头text = text.encode()text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + receiveid.encode()# 使用自定义的填充方式对明文进行补位填充pkcs7 = PKCS7Encoder()text = pkcs7.encode(text)# 加密cryptor = AES.new(self.key, self.mode, self.key[:16])try:ciphertext = cryptor.encrypt(text)# 使用BASE64对加密后的字符串进行编码return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)except Exception as e:logger = logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_EncryptAES_Error, Nonedef decrypt(self, text, receiveid):"""对解密后的明文进行补位删除@param text: 密文@return: 删除填充补位后的明文"""try:cryptor = AES.new(self.key, self.mode, self.key[:16])# 使用BASE64对密文进行解码,然后AES-CBC解密plain_text = cryptor.decrypt(base64.b64decode(text))except Exception as e:logger = logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_DecryptAES_Error, Nonetry:pad = plain_text[-1]# 去掉补位字符串# pkcs7 = PKCS7Encoder()# plain_text = pkcs7.encode(plain_text)# 去除16位随机字符串content = plain_text[16:-pad]xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])xml_content = content[4: xml_len + 4]from_receiveid = content[xml_len + 4:]except Exception as e:logger = logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_IllegalBuffer, Noneif from_receiveid.decode('utf8') != receiveid:return ierror.WXBizMsgCrypt_ValidateCorpid_Error, Nonereturn 0, xml_contentdef get_random_str(self):""" 随机生成16位字符串@return: 16位字符串"""return str(random.randint(1000000000000000, 9999999999999999)).encode()class WXBizMsgCrypt(object):# 构造函数def __init__(self, sToken, sEncodingAESKey, sReceiveId):try:self.key = base64.b64decode(sEncodingAESKey + "=")assert len(self.key) == 32except:throw_exception("[error]: EncodingAESKey unvalid !", FormatException)# return ierror.WXBizMsgCrypt_IllegalAesKey,Noneself.m_sToken = sTokenself.m_sReceiveId = sReceiveId# 验证URL# @param sMsgSignature: 签名串,对应URL参数的msg_signature# @param sTimeStamp: 时间戳,对应URL参数的timestamp# @param sNonce: 随机串,对应URL参数的nonce# @param sEchoStr: 随机串,对应URL参数的echostr# @param sReplyEchoStr: 解密之后的echostr,当return返回0时有效# @return:成功0,失败返回对应的错误码def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):sha1 = SHA1()ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)if ret != 0:return ret, Noneif not signature == sMsgSignature:return ierror.WXBizMsgCrypt_ValidateSignature_Error, Nonepc = Prpcrypt(self.key)ret, sReplyEchoStr = pc.decrypt(sEchoStr, self.m_sReceiveId)return ret, sReplyEchoStrdef EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):# 将企业回复用户的消息加密打包# @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串# @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间# @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce# sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,# return:成功0,sEncryptMsg,失败返回对应的错误码Nonepc = Prpcrypt(self.key)ret, encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId)encrypt = encrypt.decode('utf8')if ret != 0:return ret, Noneif timestamp is None:timestamp = str(int(time.time()))# 生成安全签名sha1 = SHA1()ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt)if ret != 0:return ret, NonexmlParse = XMLParse()return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):# 检验消息的真实性,并且获取解密后的明文# @param sMsgSignature: 签名串,对应URL参数的msg_signature# @param sTimeStamp: 时间戳,对应URL参数的timestamp# @param sNonce: 随机串,对应URL参数的nonce# @param sPostData: 密文,对应POST请求的数据# xml_content: 解密后的原文,当return返回0时有效# @return: 成功0,失败返回对应的错误码# 验证安全签名xmlParse = XMLParse()ret, encrypt = xmlParse.extract(sPostData)if ret != 0:return ret, Nonesha1 = SHA1()ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)if ret != 0:return ret, Noneif not signature == sMsgSignature:return ierror.WXBizMsgCrypt_ValidateSignature_Error, Nonepc = Prpcrypt(self.key)ret, xml_content = pc.decrypt(encrypt, self.m_sReceiveId)return ret, xml_content
2.6 启动应用,测试收发
nohup python3 app.py > /dev/null 2>&1 &
测试收发(旧图):
三、菜单功能示例
3.1 help_funny_list.py
菜单内容:
(注:改用了主动发送消息的方式,将回复内容设为markdown,并发送至指定成员ID)
import requests
import json# 帮助菜单
def help_funny_list(to_username_text, from_username_text, create_time_text):# 获取access_tokentoken_api = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'params = {'corpid': "******************",'corpsecret': "******************"}access_token = requests.get(token_api, params=params).json()['access_token']print(access_token)# 主动发送消息send_api = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'payload = json.dumps({"touser": from_username_text,"msgtype": "markdown","agentid": 1000003,"markdown": {"content": "# 【其他功能菜单】\n "">**【`#天气查询`】:查询实时天气信息**\n\n\n "">**【`#电话查询`】:查询手机号归属地信息**\n "">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#电话查询#手机号码</font>\n"">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">##电话查询#15200000000</font>\n\n\n"">**【`#快递查询`】:查询实时快递物流信息**\n "">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#快递查询#快递公司#手机尾号#快递单号</font>\n"">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">#快递查询#京东快递#95**#JD01425**************</font>\n""→[点击查看可用快递列表](http://work.weixin.qq.com/api/doc)\n\n\n"">**【`#星座运势`】:查询当日十二星座运势**\n "">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#星座运势#星座名称</font>\n"">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">#星座运势#金牛座</font>\n\n\n"">**【`#随机点歌`】:随机获取网易云在线歌曲**\n\n\n "">**【`#摸鱼日报`】:获取当日宜忌事项、历史事件、热点新闻**\n\n\n "">**【`#help`】:获取主菜单**"}})headers = {'User-Agent': 'Apifox/1.0.0 (https://apifox.com)','Content-Type': 'application/json','Accept': '*/*','Host': 'qyapi.weixin.qq.com','Connection': 'keep-alive'}response = requests.post(send_api, headers=headers, data=payload, timeout=15)if response.status_code == 200:print('ok')
将刚刚记录下来的 企业ID
和 Secret
分别替换至该段:
效果如下:
3.2 通过 paramiko
交互远程服务器,回复服务器信息
get_ops.py
内容如下:
import paramiko
import osdef get_ops(content_text, to_username_text, from_username_text, create_time_text):# 使用 split 以#分割字符串parts = content_text.split('#')# 检查分割后的列表是否有足够的分段if len(parts) >= 4:ip_address = parts[2] # 获取ip地址command = parts[3] # 获取命令client = paramiko.SSHClient()# 添加服务器密钥,如果使用的是密钥形式client.set_missing_host_key_policy(paramiko.AutoAddPolicy())# 连接SSH服务端client.connect(ip_address, port=22, username='root', password='********')# 执行命令stdin, stdout, stderr = client.exec_command(command)# 获取命令执行结果result = stdout.read().decode('utf-8', errors='ignore')sRespData = """<xml><ToUserName>{to_username}</ToUserName><FromUserName>{from_username}</FromUserName><CreateTime>{create_time}</CreateTime><MsgType>text</MsgType><Content>{content}</Content></xml>""".format(to_username=to_username_text,from_username=from_username_text,create_time=create_time_text,content=result, )return sRespData
效果如下:
3.3 查询物流信息,被动回复纯文本格式
。
get_express.py
内容如下:
import requests
import json
import redef get_express(content_text, to_username_text, from_username_text, create_time_text):# 将接收到的消息内容以#进行分割parts = content_text.split('#')if len(parts) >= 4:com_str = parts[2]phone_int = parts[3]no_str = parts[4]# 查询本地json文件中com_str对应的NO值with open('funny/exp.json', 'r', encoding='utf-8') as f:data = json.load(f)["result"]for item in data:if item["com"] == com_str:com_no = (item["no"])breakelse:error_msg = '输入有误,未找到物流信息'# 聚合平台物流查询接口,接口文档:https://www.juhe.cn/docs/api/id/43api_url = "http://v.juhe.cn/exp/index"params = {"key": "*********************************","com": com_no,"no": no_str,"receiverPhone": phone_int}response = requests.get(api_url, params=params)json_data = response.json()# 提取result中的值exp_info = {"company": json_data["result"].get("company"),"no": json_data["result"].get("no"),"status_detail": json_data["result"].get("status_detail")}# 从 list 中提取每一项的 datetime 和 remark ,然后格式化为字符串list_items = "\n\n".join("【物流时间】:{}\n【物流详情】:{}".format(item.get("datetime"), item.get("remark"))for item in json_data["result"].get("list", []))# 构造最终的回复字符串,包括所有物流详情reply = ('【物流公司】:{company}\n【物流单号】:{no}\n【物流状态】:{status_detail}\n{list_items}'.format(**exp_info,list_items=list_items))sRespData = """<xml><ToUserName>{to_username}</ToUserName><FromUserName>{from_username}</FromUserName><CreateTime>{create_time}</CreateTime><MsgType>text</MsgType><Content>{content}</Content></xml>""".format(to_username=to_username_text,from_username=from_username_text,create_time=create_time_text,content=reply, )return sRespData
效果如下:
3.4 获取随机音乐链接,被动回复图文格式
。
get_music.py
内容如下:
import requests
import json
import redef get_music(to_username_text, from_username_text, create_time_text):api_url = "https://api.52vmy.cn/api/music/wy/rand"response = requests.get(api_url)data = response.json().get('data', {})music_info = {key: data.get(key) for key in ['song', 'singer', 'cover', 'Music']}sRespData = """<xml><ToUserName>{to_username}</ToUserName><FromUserName>{from_username}</FromUserName><CreateTime>{create_time}</CreateTime><MsgType>news</MsgType><ArticleCount>1</ArticleCount><Articles><item><Title>歌曲名:{title}</Title><Description>演唱者:{description}</Description><PicUrl>{picurl}</PicUrl><Url>{url}</Url></item></Articles></xml>""".format(to_username=to_username_text,from_username=from_username_text,create_time=create_time_text,title=music_info['song'],description=music_info['singer'],picurl=music_info['cover'],url=music_info['Music'])return sRespData
效果如下:
3.5 其他问题
由于应用被动回复消息的格式不支持文件类型,如需将文件回复至企微可以采用2种方式:
①、在服务端配置nginx静态目录,通过静态页面路径 + 文件名 的形式拼接出完整的文件 url 地址,再通过图文类型的消息格式带入回复。
②、改用主动发送消息的方式,通过企微素材上传接口上传文件,并获取对应的 media_id ,再通过文件类型消息带入 media_id 指定对应成员完成发送,示例:
# ===================== 【获取access_token】 ==========================# API 地址
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
# 企业ID
corpid = '*****************'
# SECRET
corpsecret = '******************************'params = {'corpid': corpid,'corpsecret': corpsecret
}access_token = requests.get(url, params=params).json()['access_token']
print(access_token)# ================== 【上传素材获取media_id】 =======================# API 地址
url = 'https://qyapi.weixin.qq.com/cgi-bin/media/upload'params = {'access_token': access_token,'type': 'file'
}# 要上传的文件
files = {'media': ('111.xlsx', open(r'C:\Users\Looper\Desktop\111.xlsx', 'rb'), 'application/octet-stream')
}media_id = requests.post(url, params=params, files=files).json()['media_id']
print(media_id)
到此,企微应用的消息收发实施,全部测试完毕。