并发容器Java SDK提供了2个有界队列:ArrayBlockingQueue和LinkedBlockingQueue,都是基于ReentrantLock实现的,在高并发场景下,锁的效率并不高,替代品:Disruptor。
Disruptor是一款高性能的有界内存队列。
- 内存分配更加合理,使用RingBuffer数据结构,数组元素在初始化时一次性全部创建,提升缓存命中率;对象循环利用,避免频繁GC。
- 能够避免伪共享,提升缓存利用率。
- 采用无锁算法,避免频繁加锁、解锁的性能消耗。
- 支持批量消费,消费者可以无锁方式消费多个消息。
1 | //自定义Event |
Disruptor的使用比Java SDK提供BlockingQueue要复杂一些,但总体思路一致:
- 在Disruptor中,生产者生产的对象(也就是消费者消费的对象)称为Event,使用Disruptor必须自定义Event,如示例代码的自定义Event是LongEvent。
- 构建Disruptor对象除了要指定队列大小外,还需要传入一个EventFactory,示例代码中传入的是LongEvent::new。
- 消费Disruptor中的Event需要通过handleEventsWith()方法注册一个事件处理器,发布Event则需要通过publishEvent()方法。
RingBuffer提升性能
Java SDK中ArrayBlockingQueue使用数组作为底层的数据存储,而Disruptor是使用RingBuffer作为数据存储。RingBuffer本质上也是数组,但是Disruptor在RingBuffer的基础上还做了很多优化,包括内存分配相关的优化。
程序的局部性原理:指的是在一段时间内程序的执行会限定在一个局部范围内。这里的局限性可以从两个方面来理解,一个是时间局部性,一个是空间局部性。时间局部性指的是程序中的某条指令一旦被执行,不久之后这条指令很可能再次被执行;如果魔偶天数据被访问,不久之后这条数据很可能被再次访问。而空间局部性是指某块内存一旦被访问,不久之后这块内存附近的内存也可能被访问。
CPU的缓存利用了程序的局部性原理:CPU从内存中加载数据X时,会将数据X缓存在高速缓存Cache中,实际上CPU缓存X的同时,还缓存了X周围的数据,因为根据程序具备局部性原理,X周围的数据也很有可能被访问。
从另外一个角度看,如果程序能够很好的体现出局部性原理,也就能更好的利用CPU缓存,从而提升程序的性能。
对比ArrayBlockingQueue,分析Disruptor中的RingBuffer。
生产者线程向ArrayBlockingQueue增加一个元素,每次增加元素E之前,都需要创建一个对象E。
ArrayBlockingQueue内部的多个元素都是由生产者线程创建的,由于创建这些元素的时间基本上是离散的,所以这些元素的内存地址大概率也不是连续的。
Disruptor内部的RingBuffer也是用数组实现的,但是这个数组中的所有元素在初始化时是一次性全部创建的,所以这些元素的内存地址大概率是连续的。
1 | for(int i = 0; i < bufferSize; i++) { |

Disruptor内的数组中所有元素内存地址连续能够提升性能。消费者线程在消费的时候,是遵循空间局部性原理的,消费完第1个元素,很快就会消费第2个元素;当消费第1个元素E1的时候,CPU会把内存E1后面的数据也加载进Cache,如果E1和E2在内存中的地址是连续的,那么E2也就会被加载进Cache中,然后当消费第2个元素的时候,由于E2已经在Cache中了,就不需要从内存中加载了,这样大大提升了性能。
在Disruptor中,生产者线程通过publishEvent()发布Event的时候,并不是创建一个新的Event,而是通过event.set()方法修改Event,即RingBuffer创建的Event是可以循环利用的(循环数组),这样还能避免频繁创建、删除Event导致的频繁GC问题。
避免伪共享
高效利用Cache能够大大提升性能,一方面要努力构建能够高效利用Cache的内存结构,另一方面要努力避免不能高效利用Cache的内存结构。
伪共享(False sharing)的内存布局会使Cache失效:伪共享和CPU内部的Cache有关;CPU内部是按照缓存行(Cache Ling)管理的,缓存行的大小通常是64个字节;CPU从内存中加载数据X,会同时加载X后面(64-size(X))个字节的数据。
1 | //队列数组 |
Java SDK中ArrayBlockingQueue,其内部维护了4个成员变量,分别是队列数组item、出队索引takeIndex、入队索引putIndex以及队列中的元素总数count。
当CPU从内存中加载takeIndex的时候,会同时将putIndex以及count都加载进Cache。

某个时刻CPU中Cache的状况(缓存行中仅列出了takeIndex和putIndex,简化了count)。
假设线程A运行在CPU-1上,执行入队操作,入队操作会修改putIndex,而修改putIndex会导致其所在的所有核上的缓存行均失效;此时假设运行在CPU-2上的线程执行出队操作,出队操作需要读取takeIndex,由于takeIndex所在的缓存行已经失效,所以CPU-2必须从内存中重新读取。
入队操作本不会修改takeIndex,但是由于takeIndex和putIndex共享的是一个缓存行,就导致出队操作不能很好的利用Cache,这就是伪共享。伪共享指的是由于共享缓存行导致缓存无效的场景。

ArrayBlockingQueue的入队和出队操作是用锁来保证互斥的,所以入队和出队不会同时发生。如果允许入队和出队同时发生,那就会导致线程A和线程B争用同一个缓存行,这样也会导致性能问题。为了更好的利用缓存,必须避免伪共享。
每个变量独占一个缓存行、不共享缓存行就可以避免伪共享,具体的技术是缓存行填充。想让takeIndex独占一个缓存行,可以在takeIndex的前后各填充56个字节,这样就一定能保证takeIndex独占一个缓存行。
1 | //前:填充56字节 |
如Disruptor中示例代码,Sequence对象中的value属性就能避免伪共享,这个属性前后都填充了56个字节。
Disruptor中很多对象,如RingBuffer、RingBuffer内部的数组都用到了这种填充技术来避免伪共享。
无锁算法
ArrayBlockingQueue是利用管程实现的,生产、消费操作都需要加锁,实现起来简单,但性能不理想。
Disruptor采用的是无锁算法,复杂但核心无非是生产和消费两个操作。
对于入队操作,最关键的要求是不能覆盖没有消费的元素;对于出队操作,最关键的要求是不能读取没有写入的元素,所以Disruptor中也一定会维护类似出队索引和入队索引这样两个关键变量。
Disruptor中的RingBuffer维护了入队索引,但是并没有维护出队索引,这是因为在Disruptor中多个消费者可以同时消费,每个消费者都会有一个出队索引,所以RingBuffer的出队索引是所有消费者里面最小的那个。
1 | //生产者获取n个写入位置 |
如果没有足够的空余位置,就出让CPU使用权,然后重新计算,反之则用CAS设置入队索引。
总结
Disruptor在优化并发性能方面做到了极致,优化的思路大体是两个方面,一个是利用无锁算法避免锁的争用,另外一个则是将硬件(CPU)的性能发挥到极致(优化内存布局)。
Java8中提供了避免伪共享的注解:@sun.misc.Contended(需要设置JVM参数-XX:RestrictContended),可以避免伪共享,但是以牺牲内存为代价的。