1. Ring Queues
The ring queue (circular buffer) is a staple data structure. Most developers already know it, and anyone who doesn't can pick it up from any textbook. The point here is not to explain the basic implementation; there is plenty of material on that already, and no need to rehash it.
In practice a ring queue is most often built on an array (or a linked list) whose tail wraps around to its head under some condition, either by taking indices modulo the size or by wrapping a pointer. Usually one slot is kept permanently empty so that the front and rear pointers never actually meet; this is what makes the completely-empty and completely-full states distinguishable. Under this scheme the queue is full when (rear + 1) % N == front and empty when rear == front, so the usable capacity is fixed at N - 1.
In other words, a ring queue is a ring only logically, not physically: computer memory is flat, and flat memory cannot bend around a corner.
There is also another variant seen in practice. When the ring serves as a data buffer whose contents are refreshed periodically and the position of individual items does not matter, the empty sentinel slot can be dropped: the write index simply advances as rear++ % N, and front == rear no longer needs to be disambiguated. The benefit is simpler handling: the data from the slot after the current index around to the current index is, by definition, the entire contents of the buffer.
2. The Ring Queue in DPDK
With that quick refresher done, what does DPDK's ring buffer look like? DPDK's ring is a lock-free ring queue that borrows from the Linux kernel's kfifo lock-free queue; as the name suggests, rte_ring is a lock-free FIFO (first-in, first-out) queue. Its main features are:
1. Many-to-many: enqueue can be multi-producer or single-producer, and dequeue can be multi-consumer or single-consumer.
2. Lock-free: enqueue and dequeue are implemented with CAS (compare-and-swap) instead of locks.
3. Bulk enqueue and dequeue.
4. Burst enqueue and dequeue.
It also has all the usual ring-queue properties. The part worth dwelling on is enqueue and dequeue: rte_ring's implementation looks complicated, but it follows a pattern. Much of what circulates online is little more than a paraphrase of the official documentation, so here is a first-pass summary:
1. The ring holds two pairs of head/tail indices, one pair for the producer side and one for the consumer side. Why pairs rather than two single indices? Because of bulk operations: if elements were only ever inserted one at a time, the pairs would be unnecessary. That is also why, for single-element operations, the head and tail of a pair end up equal.
2. Why does starting a produce operation touch only the head of the producer pair (and likewise for the consumer pair)? The head is advanced by the number of elements being inserted, reserving that region; it always moves forward, and the tail is advanced afterwards to publish the result. On the logical ring, producer and consumer indices all travel in the same direction (say, clockwise), which is what yields FIFO behavior. The remaining indices of each pair can then be deduced from how production and consumption actually proceed.
3. Note that a circular queue needs a sentinel (an always-empty slot); per the official documentation, the slot at the head index does not store data and plays the role of the sentinel.
4. Multi-core enqueue and dequeue raise the problem of keeping each core's local variables consistent with the shared rte_ring fields; this is solved with CAS (look up compare-and-swap if it is unfamiliar). With a single core (one process or thread) the problem is trivial: update the index and you are done. CAS can be thought of as a hardware lock (which, at bottom, it is). When two cores finish enqueuing (or dequeuing) at the same time and both try to publish their update, if core 1's CAS succeeds then core 2's must fail; but once core 1 has published, core 2 reloads the state and its next CAS attempt can succeed. Extending the argument to N contending cores, up to N CAS attempts may be needed.
5. Why does the ring have a mask? To get cheap, automatic modulo arithmetic when the indices overflow. In the official example, (uint16_t)(6000 - 59000) = 12536: subtraction of unsigned 16-bit values wraps modulo 65536, i.e. 65536 - (59000 - 6000) = 12536, so the distance between two indices stays correct even across overflow.
6. Details such as alignment can safely be ignored on a first reading.
3. Source Code Analysis
With the logic above in mind, the code becomes much easier to follow.
First, the definitions:
/* structure to hold a pair of head/tail values and other metadata */
struct rte_ring_headtail {
	volatile uint32_t head;  /**< Prod/consumer head. */
	volatile uint32_t tail;  /**< Prod/consumer tail. */
	uint32_t single;         /**< True if single prod/cons */
};

/**
 * An RTE ring structure.
 *
 * The producer and the consumer have a head and a tail index. The particularity
 * of these index is that they are not between 0 and size(ring). These indexes
 * are between 0 and 2^32, and we mask their value when we access the ring[]
 * field. Thanks to this assumption, we can do subtractions between 2 index
 * values in a modulo-32bit base: that's why the overflow of the indexes is not
 * a problem.
 */
struct rte_ring {
	/*
	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
	 * next time the ABI changes
	 */
	char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned;
	/**< Name of the ring. */
	int flags;               /**< Flags supplied at creation. */
	const struct rte_memzone *memzone;
			/**< Memzone, if any, containing the rte_ring */
	uint32_t size;           /**< Size of ring. */
	uint32_t mask;           /**< Mask (size-1) of ring. */
	uint32_t capacity;       /**< Usable size of ring */

	char pad0 __rte_cache_aligned; /**< empty cache line */

	/** Ring producer status. */
	struct rte_ring_headtail prod __rte_cache_aligned;
	char pad1 __rte_cache_aligned; /**< empty cache line */

	/** Ring consumer status. */
	struct rte_ring_headtail cons __rte_cache_aligned;
	char pad2 __rte_cache_aligned; /**< empty cache line */
};
Next, initialization and creation:
int
rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
	unsigned flags)
{
	int ret;

	/* compilation-time checks */
	RTE_BUILD_BUG_ON((sizeof(struct rte_ring) &
			  RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) &
			  RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
			  RTE_CACHE_LINE_MASK) != 0);

	/* init the ring structure */
	memset(r, 0, sizeof(*r));
	ret = strlcpy(r->name, name, sizeof(r->name));
	if (ret < 0 || ret >= (int)sizeof(r->name))
		return -ENAMETOOLONG;
	r->flags = flags;
	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;

	if (flags & RING_F_EXACT_SZ) {
		r->size = rte_align32pow2(count + 1);
		r->mask = r->size - 1;
		r->capacity = count;
	} else {
		if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
			RTE_LOG(ERR, RING,
				"Requested size is invalid, must be power of 2, and not exceed the size limit %u\n",
				RTE_RING_SZ_MASK);
			return -EINVAL;
		}
		r->size = count;
		r->mask = count - 1;
		r->capacity = r->mask;
	}
	r->prod.head = r->cons.head = 0;
	r->prod.tail = r->cons.tail = 0;

	return 0;
}

/* create the ring */
struct rte_ring *
rte_ring_create(const char *name, unsigned count, int socket_id,
		unsigned flags)
{
	char mz_name[RTE_MEMZONE_NAMESIZE];
	struct rte_ring *r;
	struct rte_tailq_entry *te;
	const struct rte_memzone *mz;
	ssize_t ring_size;
	int mz_flags = 0;
	struct rte_ring_list *ring_list = NULL;
	const unsigned int requested_count = count;
	int ret;

	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);

	/* for an exact size ring, round up from count to a power of two */
	if (flags & RING_F_EXACT_SZ)
		count = rte_align32pow2(count + 1);

	ring_size = rte_ring_get_memsize(count);
	if (ring_size < 0) {
		rte_errno = -ring_size;
		return NULL;
	}

	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
		RTE_RING_MZ_PREFIX, name);
	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
		rte_errno = ENAMETOOLONG;
		return NULL;
	}

	te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0);
	if (te == NULL) {
		RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
		rte_errno = ENOMEM;
		return NULL;
	}

	rte_mcfg_tailq_write_lock();

	/* reserve a memory zone for this ring. If we can't get rte_config or
	 * we are secondary process, the memzone_reserve function will set
	 * rte_errno for us appropriately - hence no check in this function */
	mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id,
					 mz_flags, __alignof__(*r));
	if (mz != NULL) {
		r = mz->addr;
		/* no need to check return value here, we already checked the
		 * arguments above */
		rte_ring_init(r, name, requested_count, flags);

		te->data = (void *) r;
		r->memzone = mz;

		TAILQ_INSERT_TAIL(ring_list, te, next);
	} else {
		r = NULL;
		RTE_LOG(ERR, RING, "Cannot reserve memory\n");
		rte_free(te);
	}
	rte_mcfg_tailq_write_unlock();

	return r;
}
Now the core of it, enqueue and dequeue, i.e. produce and consume (in lib/librte_ring):
1. Single-producer enqueue:
/**
 * Enqueue one object on a ring (NOT multi-producers safe).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj
 *   A pointer to the object to be added.
 * @return
 *   - 0: Success; objects enqueued.
 *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
 */
static __rte_always_inline int
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{
	return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}
static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
			 unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
			__IS_SP, free_space);
}
static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
		 unsigned int n, enum rte_ring_queue_behavior behavior,
		 unsigned int is_sp, unsigned int *free_space)
{
	uint32_t prod_head, prod_next;
	uint32_t free_entries;

	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
			&prod_head, &prod_next, &free_entries);
	if (n == 0)
		goto end;

	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);

	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);

end:
	if (free_space != NULL)
		*free_space = free_entries - n;
	return n;
}
#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
	unsigned int i; \
	const uint32_t size = (r)->size; \
	uint32_t idx = prod_head & (r)->mask; \
	obj_type *ring = (obj_type *)ring_start; \
	if (likely(idx + n < size)) { \
		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
			ring[idx] = obj_table[i]; \
			ring[idx + 1] = obj_table[i + 1]; \
			ring[idx + 2] = obj_table[i + 2]; \
			ring[idx + 3] = obj_table[i + 3]; \
		} \
		switch (n & 0x3) { \
		case 3: \
			ring[idx++] = obj_table[i++]; /* fallthrough */ \
		case 2: \
			ring[idx++] = obj_table[i++]; /* fallthrough */ \
		case 1: \
			ring[idx++] = obj_table[i++]; \
		} \
	} else { \
		for (i = 0; idx < size; i++, idx++) \
			ring[idx] = obj_table[i]; \
		for (idx = 0; i < n; i++, idx++) \
			ring[idx] = obj_table[i]; \
	} \
} while (0)
The call chain bottoms out in this macro, which copies elements in groups of four (an unrolled loop) and finishes the remaining 0-3 elements in the switch; if the batch would run past the end of the array, the else branch splits the copy into two segments, one up to the end and one restarting from the beginning.
2. Single-consumer dequeue:
/**
 * Dequeue one object from a ring (NOT multi-consumers safe).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_p
 *   A pointer to a void * pointer (object) that will be filled.
 * @return
 *   - 0: Success; objects dequeued.
 *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
 *     dequeued.
 */
static __rte_always_inline int
rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
{
	return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
}
static __rte_always_inline unsigned int
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
		unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
			__IS_SC, available);
}
static __rte_always_inline unsigned int
__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
		 unsigned int n, enum rte_ring_queue_behavior behavior,
		 unsigned int is_sc, unsigned int *available)
{
	uint32_t cons_head, cons_next;
	uint32_t entries;

	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
			&cons_head, &cons_next, &entries);
	if (n == 0)
		goto end;

	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);

	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

end:
	if (available != NULL)
		*available = entries - n;
	return n;
}
#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
	unsigned int i; \
	uint32_t idx = cons_head & (r)->mask; \
	const uint32_t size = (r)->size; \
	obj_type *ring = (obj_type *)ring_start; \
	if (likely(idx + n < size)) { \
		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) { \
			obj_table[i] = ring[idx]; \
			obj_table[i + 1] = ring[idx + 1]; \
			obj_table[i + 2] = ring[idx + 2]; \
			obj_table[i + 3] = ring[idx + 3]; \
		} \
		switch (n & 0x3) { \
		case 3: \
			obj_table[i++] = ring[idx++]; /* fallthrough */ \
		case 2: \
			obj_table[i++] = ring[idx++]; /* fallthrough */ \
		case 1: \
			obj_table[i++] = ring[idx++]; \
		} \
	} else { \
		for (i = 0; idx < size; i++, idx++) \
			obj_table[i] = ring[idx]; \
		for (idx = 0; i < n; i++, idx++) \
			obj_table[i] = ring[idx]; \
	} \
} while (0)
3. Multi-producer enqueue:
/**
 * Enqueue one object on a ring (multi-producers safe).
 *
 * This function uses a "compare and set" instruction to move the
 * producer index atomically.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj
 *   A pointer to the object to be added.
 * @return
 *   - 0: Success; objects enqueued.
 *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
 */
static __rte_always_inline int
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
	return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}
static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
			 unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
			__IS_MP, free_space);
}
So the mp and sp paths ultimately converge on the same code.
4. Multi-consumer dequeue:
static __rte_always_inline int
rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
{
	return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
}
static __rte_always_inline unsigned int
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
		unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
			__IS_MC, available);
}
The burst variants are easy enough to read on your own.
The key point is how the mp/sp flag changes the code path when moving the producer head:
static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *free_entries)
{
	const uint32_t capacity = r->capacity;
	unsigned int max = n;
	int success;

	do {
		/* Reset n to the initial burst count */
		n = max;

		*old_head = r->prod.head;

		/* add rmb barrier to avoid load/load reorder in weak
		 * memory model. It is noop on x86
		 */
		rte_smp_rmb();

		/*
		 * The subtraction is done between two unsigned 32bits value
		 * (the result is always modulo 32 bits even if we have
		 * *old_head > cons_tail). So 'free_entries' is always between 0
		 * and capacity (which is < size).
		 */
		*free_entries = (capacity + r->cons.tail - *old_head);

		/* check that we have enough room in ring */
		if (unlikely(n > *free_entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ?
					0 : *free_entries;

		if (n == 0)
			return 0;

		*new_head = *old_head + n;
		if (is_sp)
			r->prod.head = *new_head, success = 1;
		else
			success = rte_atomic32_cmpset(&r->prod.head,
					*old_head, *new_head);
	} while (unlikely(success == 0));
	return n;
}
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
		uint32_t single, uint32_t enqueue)
{
	RTE_SET_USED(enqueue);

	/*
	 * If there are other enqueues/dequeues in progress that preceded us,
	 * we need to wait for them to complete
	 */
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();

	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}
The handling of is_sp above, and of !single in update_tail, is where the CAS-based design shows up. The consumer side has the same structure, so that code is not repeated here.
4. Summary
Once you have the basic memory-management background and the application scenarios in hand, the internal design logic of DPDK's ring queue follows easily. Work from the design down to the code and let the two confirm each other; that yields twice the result for half the effort. Do not dive straight into the sea of code: it looks like diligence but returns little, and it discourages many beginners into quitting early.
Grasp the whole, scrutinize the details, and trace the interfaces; push all three forward together, and reading other people's code becomes much simpler.