0%

高性能队列Disruptor

Disruptor是线程之间用于消息传递的队列。比另一种内存消息队列ArrayBlockingQueue性能高一个数量级。

基于循环队列的“生产者-消费者模型”

生产者生产数据,并将数据放到一个中心存储容器中。
消费者从中心存储容器中,取出数据消费。

实现中心存储容器最常用的数据结构是队列。队列支持数据的先进先出,使得数据被消费的顺序性可以得到保证,早被生产的数据就会早被消费。

队列有两种实现思路:
基于链表实现的链式队列

无界队列,队列的大小事先不确定,理论上可以支持无线大。
基于数组实现的顺序队列
有界队列,队列的大小事先确定,当队列中数据满了之后,生产者就需要等待,直到消费者消费了数据,队列有空闲位置的时候,生产者才能将数据放入。
循环队列,特殊的顺序队列。
非循环的顺序队列在添加、删除数据的工程中,会涉及数据的搬移操作,导致性能变差。

基于加锁的并发“生产者-消费者模型”

多个生产者并发往队列中写入数据,多个消费者并发从队列中消费数据。

  • 多个生产者写入的数据可能会互相覆盖;
  • 多个消费者可能会读取重复的数据。

两个线程同时往队列中添加数据,相当于两个线程同时执行Queue中的add()函数。add函数中的操作并非原子操作。导致判断后插入数据时被覆盖。
消费时同理。

简单处理方法:给代码加锁,同一时间只允许一个线程执行add()函数。加锁将并行改成串行,导致多个生产者同时生产数据的时候执行效率下降。

优化处理方法:CAS(compare and swap,比较并交换),减少加锁的粒度。

基于无锁的并发“生产者-消费者模型”

Disruptor采用的是RingBuffer和AvailableBuffer这两个结构实现并发的“生产者-消费者模型”。

对生产者来说,它往队列中添加数据之前,先申请可用空闲存储单元,并且是批量地申请连续的n个(n>=1)存储单元。当申请到这组连续的存储单元之后,后续往队列中添加元素,就可以不用加锁了,因为这组存储单元是这个线程独享的。不过,申请存储单元的过程是需要加锁的。

对消费者来说,先去申请一批连续可读的存储单元(这个申请的过程也是需要加锁的),当申请到这批存储单元之后,后续的读取操作就可以不用加锁了。

如果生产者A申请到了一组连续的存储单元,假设是下标为3-6的存储单元,生产者B紧跟着申请到了下标7-9的存储单元,在3-6没有完全写入数据之前,7-9的数据是无法读取的。

常见的内存队列通常采用循环队列来实现。
Disruptor采用两阶段写入的方法解决并发产生的问题。在写入数据之前,先加锁申请批量的空闲存储单元,之后往队列中写入数据的操作就不需要加锁了,写入的性能提高。在消费数据之前,先加锁申请批量的可读取的存储单元,之后从队列中读取数据的操作也就不需要加锁了,读取的性能提高。