ipfs pubsub代码解读

news/2025/2/13 22:43:35/

Pubsub: Publish-subscribe发布订阅模式

运行环境

版本:go-ipfs@v0.4.23 go-libp2p-pubsub@v0.0.3

本文运行两个节点,一个在ubuntu,另外一个在windows,下文用ipfs1代表ubuntu端的ipfs,用ipfs2代表windows端的ipfs。两个节点将彼此的地址添加到彼此的bootstrap中,形成由2个节点组成的测试网。

两者的peerID分别为:

ipfs1: QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o
ipfs2: Qmco9fPhEC9aYsFxY3ZekoUMZucmNa9soWDXh6xgr6FsJy

两个节点启动daemon的时候都添加参数:–enable-pubsub-experiment

$ ipfs daemon --enable-pubsub-experiment

两个节点都将bitswap/dht/namesys/pubsub的log等级设置为debug

$ ipfs log level bitswap debug & ipfs log level dht debug & ipfs log level namesys debug & ipfs log level pubsub debug

ipfs pubsub命令实操

$ ipfs pubsub
USAGEipfs pubsub - An experimental publish-subscribe system on ipfs.ipfs pubsubipfs pubsub allows you to publish messages to a given topic, and also tosubscribe to new messages on a given topic.This is an experimental feature. It is not intended in its current stateto be used in a production environment.To use, the daemon must be run with '--enable-pubsub-experiment'.SUBCOMMANDSipfs pubsub ls                    - List subscribed topics by name.ipfs pubsub peers [<topic>]       - List peers we are currently pubsubbing with.ipfs pubsub pub <topic> <data>... - Publish a message to a given pubsub topic.ipfs pubsub sub <topic>           - Subscribe to messages on a given topic.For more information about each command, use:'ipfs pubsub <subcmd> --help'

首先ipfs2订阅"phone"这个topic

$ ipfs pubsub sub phone

接着ipfs1 publish", topic为"phone",内容为"Moring"

$ ipfs pubsub pub phone "Moring"

最后看ipfs2的终端

$ ipfs pubsub sub phone
Moring

在ipfs2中另外开一个终端,查询节点当前订阅的topic

$ ipfs pubsub ls
phone

ipfs1查找订阅"phone"这个topic的peer,如果topic为空,默认是查找所有的topic

ipfs pubsub peers phone
QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o

代码讲解

ipfs config文件中关于pubsub的默认配置

"Pubsub": {"DisableSigning": false,"Router": "","StrictSignatureVerification": false}

DisableSigning为false,表示本节点publish消息时需要签名,StrictSignatureVerification为false,表明接受不带签名的publish消息。

Router有3种,默认使用的是FloodSubRouter,RandomSubRouter目前没有被ipfs使用。

FloodSubRouter
GossipSubRouter
RandomSubRouter

顾名思义,Flood是洪泛的意思,即向所有订阅节点发布(publish);gossip是私语的意思,即向6个订阅节点发布;random是随机的意思,即随机向6个订阅节点发布。
值得注意的是,FloodSubRouter是基础协议,GossipSubRouter和RandomSubRouter都支持FloodSubRouter。

显然,GossipSubRouter最好用,也最复杂,接下来主要讲解GossipSubRouter

GossipSubRouter

GossipSubRouter结构:

type GossipSubRouter struct {p       *PubSubpeers   map[peer.ID]protocol.ID         // peer protocolsmesh    map[string]map[peer.ID]struct{} // topic meshesfanout  map[string]map[peer.ID]struct{} // topic fanoutlastpub map[string]int64                // last publish time for fanout topicsgossip  map[peer.ID][]*pb.ControlIHave  // pending gossipcontrol map[peer.ID]*pb.ControlMessage  // pending control messagesmcache  *MessageCache
}

GossipSub以topic为单位维护两个网络,分别为mesh和fanout,两者都包含订阅该topic的节点,如果自身订阅了一个topic,则mesh[topic]不为空,fanout为空;如果没有订阅,则mesh[topic]为空。

publish过程

publish的时候,首先会判断自己有没有订阅这个topic,即判断mesh[topic]是否为空,如果不为空,就向mesh[topic]中的节点发送publish消息;如果为空,就判断fanout[topic]是否为空,如果不为空,就向fanout[topic]中的节点发送publish消息;如果为空,就从pubsub.topic[topic]中向fanout[topic]补充节点至多6个节点,最后向这些节点发送publish消息。

其它节点收到publish消息后,会首先判断是否已经收到过这个publish消息,如果已经收到就将其丢弃;如果之前没收到,首先校验消息(如果publish消息附带签名,就检验签名),除了推送给本地的订阅了topic的对象,还会将该消息原封不动publish出去,过程同上,以此完成消息传递过程,这是一个典型的gossip过程。

为了回收空间,对于一个topic,如果1分钟不publish,就会delete fanout[topic]。

go-libp2p-pubsub/gossipsub.go

	// overlay parametersGossipSubD   = 6GossipSubDlo = 4GossipSubDhi = 12// gossip parametersGossipSubHistoryLength = 5GossipSubHistoryGossip = 3// heartbeat intervalGossipSubHeartbeatInitialDelay = 100 * time.MillisecondGossipSubHeartbeatInterval     = 1 * time.Second// fanout ttlGossipSubFanoutTTL = 60 * time.Second

Subscribe过程

当Subscribe一个topic时,首先会判断自己有没有订阅这个topic,即判断pubsub.myTopics[topic]是否为空,如果不为空,表明自己已经订阅,直接return; 如果为空,则join gossip mesh,首先mesh[topic]是否为空,如果不为空,表明之前已经join了,直接return; 如果为空,就判断fanout[topic]是否为空,如果不为空,表明自己在1min内publish过这个topic,则将fanout[topic]移出到mesh[topic],并delete fanout[topic]。 如果为空,从pubsub.topics[topic]补充6个节点到mesh[topic],并且向这些补充进来的节点发送iGraft消息,告诉它我把它加入到mesh[topic],其它节点收到iGraft消息后,也会把我加入到mesh[topic]中。

取消订阅时,首先会判断自己有没有订阅这个topic,即判断mesh[topic]是否为空,如果为空,表明自己没有订阅,直接return;如果不为空,表明自己已经订阅,则删除mesh[topic],并且向那些被剔除的节点发送iPrune消息,告诉它我把它从mesh[topic]网络剔除了,其它节点收到iPrune消息后,也会把我从mesh[topic]剔除。

消息交换&网络控制

为了确保网络节点在线,gossip每1S就会发送一个心跳包,以维护网络。心跳包的另外一个功能是交换信息,如iHave、iGraft和iPrune。

其它节点收到心跳包后,会判断iHave是否有自己想要的消息,如果有,那么向其发送iWant消息。

gossipSub每1S都会检查mesh[topic]的节点个数,如果少于6个节点则从pubsub.topics[topic]中补充,并且向这些补充进来的节点发送iGraft消息,告诉它我把它加入到mesh[topic],其它节点收到iGraft消息后,也会把我加入到mesh[topic]中;

如果多于6个节点,就会随机剔除多余的节点,并且向那些被剔除的节点发送iPrune消息,告诉它我把它从mesh[topic]网络剔除了,其它节点收到iPrune消息后,也会把我从mesh[topic]剔除。

RPC消息

pubsub使用RPC通信

// pubsub.go
type RPC struct {pb.RPC// unexported on purpose, not sending this over the wirefrom peer.ID
}// rpc.pb.go
type RPC struct {Subscriptions        []*RPC_SubOpts  `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"`Publish              []*Message      `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"`Control              *ControlMessage `protobuf:"bytes,3,opt,name=control" json:"control,omitempty"`
}type RPC_SubOpts struct {Subscribe            *bool    `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"`Topicid              *string  `protobuf:"bytes,2,opt,name=topicid" json:"topicid,omitempty"`
}type Message struct {From                 []byte   `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"`Data                 []byte   `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`Seqno                []byte   `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"`TopicIDs             []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"`Signature            []byte   `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"`
}type ControlMessage struct {Ihave                []*ControlIHave `protobuf:"bytes,1,rep,name=ihave" json:"ihave,omitempty"`Iwant                []*ControlIWant `protobuf:"bytes,2,rep,name=iwant" json:"iwant,omitempty"`Graft                []*ControlGraft `protobuf:"bytes,3,rep,name=graft" json:"graft,omitempty"`Prune                []*ControlPrune `protobuf:"bytes,4,rep,name=prune" json:"prune,omitempty"`
}type ControlIHave struct {TopicID              *string  `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`MessageIDs           []string `protobuf:"bytes,2,rep,name=messageIDs" json:"messageIDs,omitempty"`
}type ControlIWant struct {MessageIDs           []string `protobuf:"bytes,1,rep,name=messageIDs" json:"messageIDs,omitempty"`
}type ControlGraft struct {TopicID              *string  `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}type ControlPrune struct {TopicID              *string  `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}

RPC_SubOpts封装的是订阅信息,即是否订阅某一topic。当有新peer连接的时候,ipfs1会和新peer握手(Helo),握手内容是Subscriptions,即当前订阅的topic。其它节点收到ipfs1发过来的hello(rpc)后,会把其peerID加入到pubsub.topics[topic]中,以便后续publish或者Subscribe该topic的时候,找得到订阅的peer。

Message封装的是publish消息,其中From是publisher;Data是内容数据;Seqno是序列号,用于标识版本,每次publish,Seqno加1(pubsub.counter++);TopicIDs是topic集;Signature是签名。

ControlMessage封装的是控制消息,分别是IWant, IHave、IGraft和IPrune, 其中后3者以topic为单位。


http://www.ppmy.cn/news/646574.html

相关文章

嵌入式linux之go语言开发(四)go语言8583协议报文解析

原来的pos用c语言开发的&#xff0c;与银联后台通信走的是8583协议。那么用go来做&#xff0c;得实现个go语言8583协议报文解析 且若想在电脑上跑交易&#xff0c;做个工具。用c语音处理起来不方便。用go还可以在电脑上跑交易。 于是用go语言做一个8583解析&#xff0c;方便使…

Go语言银联8583报文解析库,支持联小额免密付和银商聚合支付

很早之前就整过一个Go语言版的银联8583报文解析库&#xff0c;当时仅是8583报文的解析。 最近整合了进了银联小额双免交易和银商的聚合支付交易通道&#xff0c;这可以是网上最简单的8583报文解析库了。 银联双免支付通道支持银行卡免密和云闪付二维码交易&#xff0c;而银商…

华为oj: 图片整理

<pre name"code" class"cpp">/* Lily上课时使用字母数字图片教小朋友们学习英语单词&#xff0c; 每次都需要把这些图片按照大小&#xff08;ASCII码值从小到大&#xff09;排列收好。请大家给Lily帮忙&#xff0c;通过C语言解决。 eg:输入&#xff…

BACnet协议简要说明及组网简介

主题概要BACnet协议BACnet协议简要说明&#xff0c;组网简介编辑时间新建20160217序号参考资料1BACnet协议正文1995版2http://www.bacnet.org/Tutorial/BACnetIP/ 1 协议说明 BACnet协议(A Data Communication Protocol for Building Automation and Control Networks)&#…

linux 软盘启动程序,Linux 引导过程及原理-从软盘启动GRUB

制作启动盘后&#xff0c;可以用软盘启动引导硬盘上的操作系统。插入制作好的启动软盘&#xff0c;进入BIOS并设置用软盘启动。软盘启动成功后就会进入GRUB的命令行模式&#xff1a; grub> 要启动一个操作系统&#xff0c;首先指定引导哪个分区上的系统&#xff0c;例如要引…

英语口语练习十四之It's the most...that I have ever...(这个……是我最……)的用法

It’s the most…that I have ever… 这个……是我最……的 用法透视 这个句型用来表达某事物是你在同类事物经历中的之最。 支持范例 It’s the best movie I have ever seen. 这是我看过的最好的电影。You’re one of the nicest persons that I’ve ever met. 你是我见…

Hive实现先排序后分段分组功能,(按某一列,连续相同的值合并)

需求 元数据如&#xff1a;&#xff0c;需要整合成为 如果需将多行合并成一行&#xff0c;按A列排序后&#xff0c;再将B列合并连续值可以使用这个办法。 比如 我们测试了一些路线点&#xff0c;针对这些路线点的数据&#xff0c;可以筛选出某人的超速路段&#xff0c;如上图B…

从《雷神3》洛基下落30分钟学英语时态

前言 从洛基的下落开始说起&#xff1a; 洛基使用了现在完成进行时来描述他的下落&#xff0c;I have been falling for 30 minutes. 在英语时态中&#xff0c;“时“指动作发生的时间&#xff0c;”态“指动作的样子和状态。 “时”一共有4种&#xff1a;现在&#xff0c;…