quic-go源码二---server accept请求

devtools/2024/10/23 3:23:50/

本篇是上一篇的 看点2 内容。本该放上篇,但是由于上篇内容已经不少,所以单独拆开。另外还是主动说明下:我使用quic-go版本是github.com/quic-go/quic-go v0.47.0
虽然我在上篇截图过程中就尽量带上了quic-go版本信息,但在此还是再次说明。
本篇分析代码如下:

for {conn, err := listener.Accept(context.Background())// ... use conn...
}

看下面官方源码和注释:Accept返回新的连接,本方法应该在循环中调用。
都给出使用说明了:

// Accept returns new connections. It should be called in a loop.
func (l *Listener) Accept(ctx context.Context) (Connection, error) {return l.baseServer.Accept(ctx)
}// Accept returns connections that already completed the handshake.
// It is only valid if acceptEarlyConns is false.
// 返回已完成握手的连接
// 只有当 acceptEarlyConns标识(上篇有说明)为false的时候,返回的连接才是有效的
func (s *baseServer) Accept(ctx context.Context) (Connection, error) {return s.accept(ctx)
}func (s *baseServer) accept(ctx context.Context) (quicConn, error) {select {case <-ctx.Done():return nil, ctx.Err()case conn := <-s.connQueue: // 从这个chan中读取连接return conn, nilcase <-s.errorChan:return nil, s.closeErr}
}

在上篇我们知道connQueue是一个长度为32的quicConn chan,
那么问题来了,谁往baseServer的connQueue扔连接呢?
server.go第555行:

func (s *baseServer) handleInitialImpl(p receivedPacket, hdr *wire.Header) error

方法内的723行:
在这里插入图片描述
这个方法内容很多,回头我们分析,先看看是谁调用了它?
server.go第377行:

func (s *baseServer) handlePacketImpl(p receivedPacket) bool /* is the buffer still in use? */

方法内的第459行:
在这里插入图片描述
这个handlePacketImpl方法内容同样不少,那么是谁调用了它呢?
在这里插入图片描述
闭环了朋友们,在上篇提到了上图第299行代码很重要,但是没有解释为什么,现在~
在继续分析之前,我们先来一张粗略图:
在这里插入图片描述
流程清楚了后:知道了如何接收到数据包后,接下来就好说了,去看看
func (s *baseServer) handlePacketImpl(p receivedPacket) bool里的逻辑:

客户端请求下,代码如下:

package mainimport ("context""crypto/tls""encoding/binary""fmt""io""log""os""path/filepath""strconv""time""github.com/quic-go/quic-go"
)const addr = "127.0.0.1:9000"func main() {tlsConf := &tls.Config{InsecureSkipVerify: true,NextProtos:         []string{"HLD"},}conn, err := quic.DialAddr(context.Background(), addr, tlsConf, nil)if err != nil {log.Fatalf("Error dialing address: %v", err)}defer conn.CloseWithError(0, "")stream, err := conn.OpenStreamSync(context.Background())if err != nil {log.Fatalf("Error opening stream: %v", err)}defer stream.Close()// 发送数据var sendFlag = 0go func() {for {// 发送业务类型bizType := []byte{0x01} // 文本err = sendData(stream, bizType)if err != nil {fmt.Errorf("error writing text biz type to stream: %v", err)break}sendFlag++sendBuffer := make([]byte, 1024)numberStr := "HLD_" + strconv.Itoa(sendFlag)copy(sendBuffer, numberStr)log.Printf("Send: %v", string(sendBuffer))time.Sleep(2 * time.Second)err = sendData(stream, sendBuffer[:len(numberStr)])if err != nil {fmt.Errorf("Error writing to stream: %v", err)break}// 为了方便调试,间隔设置得很大!time.Sleep(10000 * time.Second)}}()// 接收数据go func() {for {recvBuffer := make([]byte, 1024)recvBuffer, err = receiveData(stream, len(recvBuffer))if err != nil {fmt.Errorf("Error reading from stream: %v", err)break}log.Printf("Recv: %v", string(recvBuffer))}}()select {}
}func sendData(stream quic.Stream, data []byte) error {_, err := stream.Write(data)if err != nil {return fmt.Errorf("error writing to stream: %v", err)}return nil
}func receiveData(stream quic.Stream, expectedLen int) ([]byte, error) {readBuf := make([]byte, expectedLen)n, err := stream.Read(readBuf)if err != nil {return nil, fmt.Errorf("error reading from stream: %v", err)}return readBuf[:n], nil
}

先debug启动server(在本篇最后附上完整server代码),再运行client:按照上面分析,在func (t *Transport) listen(conn rawConn)方法内:读取Packet后,再t.handlePacket(p),然后程序往下走到如下所示代码行:
在这里插入图片描述
往下:
在这里插入图片描述
早有“人”在等着呢!从chan中读取数据:
在这里插入图片描述
下面我们看看func (s *baseServer) handlePacketImpl(p receivedPacket) bool

// 。。。 省略。。。
v, err := wire.ParseVersion(p.data) // Version1       Version = 0x1
// quic version校验。。。
// 判断是否是  0-RTT 包。。额外特殊分支处理。。。咱们这个不是
hdr, _, _, err := wire.ParsePacket(p.data) // 解析包...

在这里插入图片描述

s.handleInitialImpl(p, hdr) // 进入看看

在这里插入图片描述
在这里插入图片描述
上面来回是一个循环,一直重复重复,然后你在客户端控制台可能会发现报错了:timeout: no recent network activity
所以我们不再一行一行debug了,直接按照咱们分析,在func (s *baseServer) handleInitialImpl(p receivedPacket, hdr *wire.Header) error方法内的第718行打断点,如下所示:
在这里插入图片描述
在这里插入图片描述
因为我们没有在其它地方打断点,所以放行后,发现有结果输出:
在这里插入图片描述
经过30秒后(MaxIdleTimeout默认值),发生timeout: no recent network activity
interface.go中第288行:

// MaxIdleTimeout is the maximum duration that may pass without any incoming network activity.
// The actual value for the idle timeout is the minimum of this value and the peer's.
// This value only applies after the handshake has completed.
// If the timeout is exceeded, the connection is closed.
// If this value is zero, the timeout is set to 30 seconds.
MaxIdleTimeout time.Duration

在这里插入图片描述
至此,咱们简略分析到了case s.connQueue <- conn:
那么谁去从connQueue 这个chan中读取conn呢?查找后发现在下面:
在这里插入图片描述
这不就是咱们server代码中的Accept嘛:
在这里插入图片描述
所以你看流程是不是通了。。。有兴趣的朋友可以打断点试试,我刚验证是这样的。
总结:本篇简单分析了获取conn连接的过程,transport 和 server之间的交互。

server端完整代码如下

package mainimport ("bufio""context""crypto/rand""crypto/rsa""crypto/tls""crypto/x509""encoding/binary""encoding/pem""fmt""io""log""math/big""os""path/filepath""strconv""strings""time""github.com/quic-go/quic-go"
)
const addr = "127.0.0.1:9000"func main() {quicConf := &quic.Config{InitialStreamReceiveWindow:     1 << 20,  // 1 MBMaxStreamReceiveWindow:         6 << 20,  // 6 MBInitialConnectionReceiveWindow: 2 << 20,  // 2 MBMaxConnectionReceiveWindow:     12 << 20, // 12 MB}listener, err := quic.ListenAddr(addr, generateTLSConfig(), quicConf)if err != nil {log.Fatalf("Error listening on address: %v", err)}defer listener.Close()for {conn, err := listener.Accept(context.Background())if err != nil {log.Printf("Error accepting connection: %v", err)continue}go handleConnection(conn)fmt.Println("New client connected")}
}func handleConnection(conn quic.Connection) {for {// 接收数据流stream, err := conn.AcceptStream(context.Background())if err != nil {log.Printf("Error accepting stream: %v", err)return}fmt.Printf("New stream opened:%+v\n", stream.StreamID())remoteAddr := conn.RemoteAddr().String()fmt.Printf("Client connected from: %s\n", remoteAddr)go func() {defer stream.Close()defer fmt.Println("video recv ok, ready to close stream for video...")for {// 读取业务类型bizType := make([]byte, 1)_, err = stream.Read(bizType)if err != nil {log.Printf("Error reading from stream: %v", err)return}log.Printf("Recv: biz type %v\n", bizType)if bizType[0] == byte(0x05) { // 视频handleVideo(stream)break} else {// 文本data := make([]byte, 1024)nr, err := stream.Read(data)if err != nil {log.Printf("Error reading from stream: %v", err)return}log.Printf("Recv: %v\n", string(data))time.Sleep(2 * time.Second)nw, err := stream.Write(data[:nr])if err != nil {log.Printf("Error writing to stream: %v", err)return}log.Printf("Send: %v, size: %d\n", string(data[:nr]), nw)}}}()}
}func handleVideo(stream quic.Stream) {//defer stream.Close()var err error// 读取文件信息headLenBuf := make([]byte, 4)n, err := stream.Read(headLenBuf)if err != nil || n != 4 {fmt.Println("video Error reading:", err.Error())return}nn := binary.BigEndian.Uint32(headLenBuf)bufferFileName := make([]byte, nn)n, err = io.ReadFull(stream, bufferFileName)if err != nil {fmt.Println("video Error reading:", err.Error())return}fileInfo := strings.Split(string(bufferFileName[:n]), "|")fileName, fileSizeStr := fileInfo[0], fileInfo[1]fileSize, _ := strconv.ParseInt(fileSizeStr, 10, 64)// 创建文件newFile, err := os.Create(filepath.Join("./", "received_"+fileName))if err != nil {fmt.Println("video Error creating file:", err)return}writer := bufio.NewWriter(newFile)defer writer.Flush()defer newFile.Close()if fileSize == 0 {fmt.Println("video read file meta info err...")return}var receivedBytes int64 = 0var bufSize int64 = 1024 * 8for receivedBytes < fileSize {_, err := io.CopyN(writer, stream, bufSize)if err != nil {if err != io.EOF {fmt.Println("Error copying:", err)}break}receivedBytes += bufSize}fmt.Println("File received successfully", fileName)
}func generateTLSConfig() *tls.Config {key, err := rsa.GenerateKey(rand.Reader, 1024)if err != nil {panic(err)}template := x509.Certificate{SerialNumber: big.NewInt(1)}certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)if err != nil {panic(err)}keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)})certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)if err != nil {panic(err)}return &tls.Config{Certificates: []tls.Certificate{tlsCert},NextProtos:   []string{"HLD"},}
}

http://www.ppmy.cn/devtools/128035.html

相关文章

【C语言】循环结构while循环do...while循环

while循环&#xff1a; 初始化循环变量 while(循环条件) {循环变量控制;循环体; }do while循环&#xff1a; do{循环变量控制;循环体; }while(循环条件)#include <stdio.h> #include <math.h> /* 功能&#xff1a;循环结构&#xff08;while,do while&#xff09…

在 Vue 3 中实现电子签名组件

在 Vue 3 中实现一个简单的电子签名组件&#xff0c;并解决一个常见问题&#xff1a;当签名组件放在弹窗内时&#xff0c;鼠标绘制会出现偏移的问题。 项目环境&#xff1a; Vue 3&#xff1a;前端框架Element Plus&#xff1a;UI 组件库 电子签名组件功能 画布绘制&#x…

判断 HTTP/2 多路复用是否在服务器上实现

要判断 HTTP/2 多路复用是否在服务器上实现&#xff0c;并确保浏览器正在使用多路复用来加载资源&#xff0c;您可以使用以下几种方法进行验证&#xff1a; 1. 使用浏览器开发者工具 大多数现代浏览器&#xff08;如 Chrome、Firefox、Edge&#xff09;提供了开发者工具&…

Python进阶

面向对象编程&#xff08;OOP&#xff09; 1.1 类和对象 类 是一个模板&#xff0c;用来描述一类对象的属性和行为。通过定义类&#xff0c;你可以创建对象&#xff08;也称为类的实例&#xff09;。对象 是类的实例&#xff0c;通过类创建的具体实例对象。 示例&#xff1a…

基于Springboot在线视频网站的设计与实现

基于Springboot视频网站的设计与实现 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;idea 源码获取&#xff1a;https://do…

架构设计笔记-21-案例分析

1.遗留系统策略 / 数据迁移 / REST和RPC风格 2.分布式系统 / 分布式对象调用 3.开放式架构 / GOA 4.ESB 5.FMEA故障分析 6. 加密 / 公钥体系机制 / 加解密API和透明加密 7.嵌入式系统故障 / 故障滤波算法 / 容错算法 8.开源框架struts / spring / Hibenate 9.企业应用集成 10.T…

西门子嵌入式面试题及参考答案(万字长文)

RAM 和 ROM 的各种总线协议 RAM(随机存取存储器)和 ROM(只读存储器)在嵌入式系统中起着重要的作用,它们通常使用不同的总线协议与其他设备进行通信。 一、RAM 的常见总线协议 SRAM(静态随机存取存储器)常用的总线协议有异步 SRAM 协议和同步 SRAM 协议。 异步 SRAM 协议…

2024软考网络工程师笔记 - 第10章.组网技术

文章目录 交换机基础1️⃣交换机分类2️⃣其他分类方式3️⃣级联和堆叠4️⃣堆叠优劣势5️⃣交换机性能参数 &#x1f551;路由器基础1️⃣路由器接口2️⃣交换机路由器管理方式2️⃣交换机路由器管理方式 交换机基础 1️⃣交换机分类 1.根据交换方式分 存储转发式交换(Store…