场景:
新需求还未开发时,使用mock提早介入测试,等后边开发后,进行调试
- 三方接口返回效率低,使用mock技术走通流程
1.mock方式
(1)如果会写django或flask,可以写简单对应的代码
(2)一些接口工具具备这个功能(postman)
(3)一些现成的框架直接使用:Moco框架
构造一个依赖的服务,并给予他预期的服务返回值,适用范围广,更加适合 集成测试
Moco框架
类似一个Mock的工具框架,一个简单搭建模拟服务器的程序库/工具,下载就是一个jar包
特点:
- 简单的配置request、response等即可满足要求
- 支持http、https、socket协议,可以说是非常的灵活性
- 支持在request中设置Headers,Cookies,StatusCode等
- 对get\post\put\delete等请求方式都支持
- 无需环境配置,有Java环境即可
- 修改配置后,立即生效。只需要维护接口,也就是契约即可
- 支持多种数据格式,如JSON\Text\XML\File等
- 可与其他工具集成,如Junit\Maven等
2.Mock服务搭建
(1)需要安装jdk
(2)下载moco的jar包
moco下载
提取码:8eem
(3)启动服务,jar包名称根据下载的jar包写
http代表这个模拟的是http请求,
-p 9090是定义端口号
-c test.json是编辑的json文件名
java -jar moco-runner-0.11.0-standalone.jar http -p 9090 -c test.json
如果cmd乱码就用:`
``java -jar -Dfile.encoding=utf-8 moco-runner-0.11.0-standalone.jar http -p 9090 -c demo.json```
(4)验证
/demo1:是json文件中定义的uri
访问:http://localhost:9090/demo1
3.Mock请求构建
start.bat
@echo onjava -jar -Dfile.encoding=utf-8 moco-runner-0.11.0-standalone.jar http -p 9090 -c demo.json
@echo offpause
demo.json
[{"description":"11","request":{"uri":"/shop","method":"Get"},"response":{"text":"Hello,baby"} }
]
浏览器输入:http://127.0.0.1:9090/shop
4.配置不同的请求
(1)约定uri
[{"description":"约定uri","request":{"uri":"/shop"},"response":{"text":"Hello,baby"} }
]
(2)约定请求参数
[{"description":"约定请求参数","request":{"queries":{"key1":"123","key2":"cvf"}},"response":{"text":"Hello,baby"} }
]
(3)约定请求方法
[{"description":"约定请求方法","request":{"method":"DELETE"},"response":{"text":"Hello,baby"} }
]
(4)约定请求头
[{"description":"约定请求头","request":{"headers":{"Content-Type":"application/xml"}},"response":{"text":"Hello,baby"} }
]
(5)约定请求体参数-form
[{"description":"约定请求体参数-form","request":{"forms":{"key1":"anc"}},"response":{"text":"Hello,baby"} }
]
(6)约定请求体参数-json
[{"description":"约定请求体参数-json","request":{"json":{"key1":"anc","key2":"anc"}},"response":{"text":"Hello,baby"} }
]
(7)返回响应头和json
[{"description":"返回响应头和json","request":{"json":{"key1":"anc","key2":"anc"}},"response":{"headers":{"Content-Type":"application/json"},"json":{"code":"200"}} }
]
代码测试
HOST = 'http://127.0.0.1:9090'
import requests
def test():url=f'{HOST}/shop_up'payload={'key':'abc'}# form格式:data=payload json格式:json=payloadresp=requests.post(url,json=payload)print(resp.text)if __name__ == '__main__':test()
要先执行jar包启动服务,在运行上述代码
5.异步接口实战
(1)同步和异步
同步:不利于性能提升,需要等待响应
异步:发送请求后,响应后续给出,可以先做其他的
eg:体检拿结果,可以A项做完后,直接做B项的同时等A的结果,不用先等A的结果,再去做B。
(2)异步接口实现
- 业务场景
店铺向平台申请退单请求
平台接受请求,核实信息
平台3个工作日内告知结果 - 具体实现
通过提交申请的接口给服务端
服务器立即返回这个申请id
后续使用查询的接口,带上id查询结果
看是否有返回结果 - mock+异步查询技术
拿到申请和查询订单结果接口的文档,获取url、参数、响应等信息
(1)申请退单接口
HOST = 'http://127.0.0.1:9090'
import requests
import time
# ----------------1.创建申诉接口---------------
def create_order(inData):url = f'{HOST}/api/order/create/'payload = inDataresp = requests.post(url, json=payload)return resp.json()
# -------2.查询结果接口--------
"""
查询接口注意事项:
1.使用id去查--id是creat_order创建返回的
2.次数/频率--频率 interval
3.超时机制--timeout
"""
def get_order_result(orderID, interval=5, timeout=30):""":param orderID: 订单id:param interval: 查询频率:param timeout: 超时时间:return: 查询结果"""url = f'{HOST}/api/order/get_result/'payload = {"order_id": orderID}# 获取开始查询的时间和结束时间starTime = time.time() # 单位是sendTime = starTime + timeoutcnt=0#查询的次数--初始值为0# 判断查询时间是否超时while time.time()<endTime:resp = requests.get(url, params=payload)cnt+=1# 如果有响应数据,直接结束循环,breakif resp.text:print(f"第{cnt}次查询,已有结果,退出查询>>>",resp.text)breakelse:print(f'第{cnt}次查询,暂无结果,请耐心等待>>>')time.sleep(5)#等5sprint('====该查询函数执行完毕===')return resp.text
if __name__ == '__main__':testData = {"user_id": "zz123","goods_id": "20240424","num": 1,"amount": 200.6}res = create_order(testData)print(res)id = res['order_id']# 2.调用查询接口res2 = get_order_result(id)print(res2)
结果:异步接口已经实现,但执行效率低
原因:
(1)查询接口使用sleep–5s,导致自动化测试效率低
优化:
(1)代码层面原因:当执行sleep()时,代码不会处理其他事情
requests.xx方法和sleep()属于io阻塞
深挖细节
cpu:
- io阻塞模式
- cpu密集性
(2)具体解决方案
改为多线程,等待的时候,做其他接口,等待时间到了之后,再回来继续执行下一步操作
主线程:main–下面的代码
子线程:就是get_order_result函数
HOST = 'http://127.0.0.1:9090'
import requests
import time
import threading #多线程--内置库# ----------------1.创建申诉接口---------------
def create_order(inData):url = f'{HOST}/api/order/create/'payload = inDataresp = requests.post(url, json=payload)return resp.json()# -------2.查询结果接口--------
"""
查询接口注意事项:
1.使用id去查--id是creat_order创建返回的
2.次数/频率--频率 interval
3.超时机制--timeout
"""
def get_order_result(orderID, interval=5, timeout=30):""":param orderID: 订单id:param interval: 查询频率:param timeout: 超时时间:return: 查询结果"""url = f'{HOST}/api/order/get_result/'payload = {"order_id": orderID}# 获取开始查询的时间和结束时间starTime = time.time() # 单位是sendTime = starTime + timeoutcnt=0#查询的次数--初始值为0# 判断查询时间是否超时while time.time()<endTime:resp = requests.get(url, params=payload)cnt+=1# 如果有响应数据,直接结束循环,breakif resp.text:print(f"第{cnt}次查询,已有结果,退出查询>>>",resp.text)breakelse:print(f'第{cnt}次查询,暂无结果,请耐心等待>>>')time.sleep(5)#等5sprint('====该查询函数执行完毕===')return resp.text
if __name__ == '__main__':testData = {"user_id": "zz123","goods_id": "20240424","num": 1,"amount": 200.6}res = create_order(testData)print(res)id = res['order_id']#--------多线程操作---------#创建子线程"""threading.Thread(target,args)target:需要把哪一个函数做为子线程执行的任务,就写该函数名args:该函数需要传入的参数-以元组形式"""t1=threading.Thread(target=get_order_result,args=(id,))# 主线程如果结束或异常退出,子线程就直接退出t1.setDaemon(True)#守护线程t1.start()#---------这些是模拟其他接口的测试操作for one in range(20):time.sleep(1)#模拟其他接口执行的时间print(f'{one}---正在执行其他接口')#--------------------------