在上一节中如果并行的客户端连接数超过了默认开启进程的数量,那么后来的客户端请求将会阻塞,为了不阻塞新的客户端,我们可以将进程的单线程改成多线程即可。
服务端代码:
import json
import struct
import socket
import threading
import multiprocessingdef handle_conn(conn, ip, handlers):print("{} connect ...".format(ip))# 循环读写while True:length_prefix = conn.recv(4) # 请求长度前缀if not length_prefix: # 连接关闭了conn.close()print("{} close ...".format(ip))break # 退出循环,处理下一个连接length, = struct.unpack("I", length_prefix)body = conn.recv(length) # 请求消息体request = json.loads(body)client_method = request['client']client_parameter = request['params']print("client request method is {}, params is {}".format(client_method, client_parameter))handler = handlers[client_method] # 查找请求处理器handler(conn, client_parameter) # 处理请求def process_request(sock, handlers):while True:conn, host_ip = sock.accept() # 接收连接handle_conn(conn, host_ip, handlers) # 处理连接def multi_thread(sock, handlers):threads = []thread_nums = 2for i in range(thread_nums):t = threading.Thread(target=process_request, args=(sock, handlers))threads.append(t)for i in range(thread_nums):threads[i].start()for i in range(thread_nums):threads[i].join()def func1(conn, params):res = json.dumps({"response": "OK", "result": params}) # 响应消息体length_prefix = struct.pack("I", len(res)) # 响应长度前缀conn.sendall(length_prefix)# conn.sendall(response) # python2conn.sendall(str.encode(res)) # python3def main():s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个 TCP 套接字s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 打开 reuse addr 选项s.bind(("localhost", 8080)) # 绑定端口s.listen(1) # 监听客户端连接handlers = {"func1": func1 # 注册请求处理器}# process_request(s, handlers) # 进入服务循环process = []process_nums = 2for i in range(process_nums):p = multiprocessing.Process(target=multi_thread, args=(s, handlers))process.append(p)for i in range(process_nums):process[i].start()for i in range(process_nums):process[i].join()if __name__ == '__main__':main()
示例代码中开启 2 个进程,每个进程中又开了 2 个线程,那么此时最多可以同时开启 4 个客户端,超过 4 个客户端后多余的请求会被阻塞处理。