一、前言
1.1 基本介绍
NNG/nanomsg 是最近项目上使用到的一个通信库,用来实现进程间过程调用和线程间通信,很是方便。
NNG 是 nanomsg 的继任版本,而 nanomsg 则是流行的 ZMQ (一个简单好用的传输层,像框架一样的一个 socket library)的 C 重写版。
NNG 将通信使用的协议和传输分离,同一个协议可以工作在不同的传输层上,类似与 TCP/IP 的应用层和传输层的分层,同时接口上屏蔽了底层细节,统一用字符串 URL 来描述传输模式。这样当使用场景修改时,可以通过简单修改 URL 来实现适应,极具灵活性。
同时如 NNG 描述所言 “light-weight brokerless messaging”,NNG 中的通信各方是不需要第三方程序介入的,这与 MQTT/Redis 通信需要服务器不同。这样很适合作为通信库来使用而没有其他依赖。
1.2 通讯协议
- PAIR 一对一双向通信。
- PIPELINE(PUSH/PULL) 单向通信,类似与生产者消费者模型的消息队列。
- PUB/SUB 单向广播。
- REQ/REP 请求-应答模式,类似与 RPC 模式。
- BUS 网状连接通信,每个加入节点都可以发送/接受广播消息。
- SURVEY 用于多节点表决或者服务发现。
1.3 传输模式
- inproc 进程内线程间传输
- ipc 主机内进程间传输
- tcp 网络内主机间传输
1.4 通讯模式
通信协议里除了 PAIR 之外,基本都是一对多的通信模式,这点需要注意,以 PIPELINE 和 PUB/SUB 为例:
- PIPELINE 的 PUSH 端是 client,一个 PUSH 可以连接多个 PULL 端,发送数据时会选择其中一个可用的发送;PULL 端是 server,一个 PULL 可以接收多个 PUSH 连接和数据。
- PUB/SUB 的 SUB 端是 client,一个 SUB 可以连接多个不同的 PUB 端,接收多个 PUB 端广播的数据;PUB 端是 server,一个 PUB 可以接收多个 SUB 连接并广播数据。
基于以上,多个程序是没办法共用一个 PUB/SUB 通道来广播数据的,这与 ROS 里的 topic 和 LCM 中的 channel 模式不同。如果要实现类似功能,则可以使用 PIPELINE + PUB/SUB 来处理:
- 独立一个话题发布的程序,拥有一个 PULL 和 PUB。
- PULL 约定一个 URL,所有需要发布该话题的程序都 PUSH 数据到该 URL 上。
- PUB 约定一个 URL,所有需要获取该话题的程序都 SUB 到该 URL 上。
- 程序内部循环将 PULL 读取的数据发送到 PUB 上。
以上则可以模拟出 ROS topic 数据合并 或者 LCM 中 channel 的类似功能。
整体上看,NNG 的 API 很简约,主要是 4 个,open/recv/send/close,open 根据协议不同使用的函数会不同。配置则是 setopt/getopt,与 UNIX API 类似。API 中没有上下文环境(context-less)依赖,只需要一个 nng_socket,这种设计和实现方法值得去学习一下(初步揣测应该是使用指针值作为handle,如果要强制编译器做类型检测,则会套上一层 struct,如 typedef struct { _nng_xxx_socket * p } nng_socket;
)。
NNG 协议基本上囊括了常见的通信需求,一些特殊的需求,也可以通过组合协议来实现,比如上面的模拟 ROS topic 或者 LCM channel 的方法。这样一来,如果在程序中使用 NNG,不管是多进程,还是多线程,通过设计,可以进一步增强模块化,同时不乏灵活性。如果环境变化,程序不管是由多进程改成多线程,还是由多线程改成多主机,都很容易实现。
常见模块/进程/线程间通信,可以依据具体需求来使用 PIPELINE(消息队列) 还是 REQ/REP(过程调用),而不是锁+全局变量,每个模块单元只需要做单一相关的具体事务,无需知晓全局状态。
1.5 代码结构
nng.h:
nng对外暴露的 api 接口
transport.h:
通信层定义,主要是为了暴露给用户以实现扩展,但目前包含了utils下的相关头文件,其中inproc.h/ipc.h/tcp.h是对应的transport
protocol.h:
协议层定义,也是为了暴露给用户以实现扩展,其中reqrep.h/pubsub.h/bus.h/pair.h/pipeline.h/survey.h是对应的protocol
utils/:
实用工具包,包含基本数据结构(list/queue/hash)、互斥及原子操作(mutex/atomic)等
transports/:
通信层实现,包括(inproc:进程内通信;ipc:进程间通信;tcp:tcp通信)
protocols/:
协议层实现,包括(REQREP:请求响应;PUBSUB:订阅发布等)
core/:
通用代码
aio/:
线程池模拟的异步操作,带状态机的事件驱动等
二 结构介绍
2.1 nng_aio
一个异步 I/O 句柄。这个 aio 结构的细节是 AIO 框架私有的。该结构具有公共名称 (nng_aio),以便我们最大限度地减少公共 API 命名空间中的污染。 AIO 框架之外的任何东西访问这些成员中的任何一个都是一个编码错误——这里提供定义是为了方便内联,但这应该是唯一的用途。
2.2 nni_id_map
我们发现我们经常希望有一个由数字 ID 列出的事物列表,它通常是单调递增的。这通常是管道 ID。为了帮助保持这些事物的集合由它们的 ID(可能从一个非常大的值开始)索引,我们提供了一个哈希表。哈希表使用开放寻址,但我们使用更好的探针(取自 Python)以避免命中相同的位置。我们的哈希算法只是低位,我们使用的表大小是 2 的幂。请注意,散列项必须为非 NULL。该表受内部锁保护。
三、数据传输
3.1 发送数据
nng_sendmsg
nng_aio_set_timeout
nng_aio_set_msg
nng_send_aio
nni_aio_get_msg
nni_sock_find
nni_sock_send --> sock_send
nni_sock_rele
nng_aio_wait
nng_aio_result
3.2 接收数据
nng_recvmsg
nng_aio_set_timeout
nng_recv_aio
nni_sock_find
nni_sock_recv --> sock_recv
nni_sock_rele
nng_aio_wait
nng_aio_result
nng_aio_free
四、AIO
4.1 AIO 状态
AIO 结构可以携带最多 4 个不同的输入值,最多 4 个不同的输出值,以及最多 4 个不同的“私有状态”值。 输入和输出的含义由被调用的 I/O 函数决定。
typedef enum {
NNG_INIT_RECV = 0,
NNG_RECV_RET_SEND,
NNG_SEND_RET_RECV,
NNG_RECV_RET_RECV,
} nng_aio_state_t;
4.2 AIO 介绍
AIO 只能由调用者“完成”,调用者必须调用 nni_aio_finish 。在发生这种情况之前,调用者保证 AIO 有效。调用者必须保证一个 AIO 将“完成”(通过调用 nni_aio_finish )。
请注意,取消例程可能会被框架多次调用。框架(或消费者)保证 AIO 将在这些调用中保持有效,以便提供者可以自由地检查 aio 的列表成员资格等。但是提供者不能多次调用完成。
nni_aio_lk 用于保护 AIO 上的标志以及 AIO 上的过期列表。 如果到期未完成,我们将不允许将 AIO 标记为已完成。
为了与过期同步,我们将 aio 记录为过期,并在销毁它之前等待该记录被清除(或至少不等于 aio)。
aio 框架与 taskq 框架紧密结合。当调用者将 aio 标记为开始(使用 nni_aio_begin)时,我们为 aio“准备”任务,并将任务标记为忙碌。然后,当我们想知道操作本身是否完成时,我们所要做的就是等待任务完成(忙碌标志被清除)。
为了防止在拆卸期间 aio 重用,我们设置了 a_stop 标志。在该点之后为新操作初始化的任何尝试都将失败,并且调用者将获得 NNG_ECANCELED 指示这一点。调用 nni_aio_begin() 的提供者必须检查返回值,如果返回非零值 (NNG_ECANCELED),那么它必须简单地丢弃请求并返回。
调用 nni_aio_wait 等待当前未完成的操作完成,但不会阻止另一个操作在同一个 aio 上启动。要同步停止 aio 并防止在其上启动任何进一步的操作,请调用 nni_aio_stop。为了防止操作开始,而无需等待任何现有操作完成,请调用 nni_aio_close。
在某些地方,我们想检查 aio 是否未使用。从技术上讲,如果这些检查通过,那么它们就不需要用锁来完成,因为调用者应该拥有对它们的唯一引用。然而,竞争检测器不一定知道这个语义,并且可能会抱怨潜在的数据竞争。要抑制误报,请定义 NNG_RACE_DETECTOR。注意这会导致获取额外的锁,影响性能,所以不要在生产中使用它。