Package rx.internal.util.unsafe
Class MpmcArrayQueue<E>
java.lang.Object
java.util.AbstractCollection<E>
java.util.AbstractQueue<E>
rx.internal.util.unsafe.ConcurrentCircularArrayQueueL0Pad<E>
rx.internal.util.unsafe.ConcurrentCircularArrayQueue<E>
rx.internal.util.unsafe.ConcurrentSequencedCircularArrayQueue<E>
rx.internal.util.unsafe.MpmcArrayQueueL1Pad<E>
rx.internal.util.unsafe.MpmcArrayQueueProducerField<E>
rx.internal.util.unsafe.MpmcArrayQueueL2Pad<E>
rx.internal.util.unsafe.MpmcArrayQueueConsumerField<E>
rx.internal.util.unsafe.MpmcArrayQueue<E>
- Type Parameters:
E
- type of the element stored in the Queue
- All Implemented Interfaces:
Iterable<E>, Collection<E>, Queue<E>, MessagePassingQueue<E>
A Multi-Producer-Multi-Consumer queue based on a ConcurrentCircularArrayQueue. This implies that any and all threads may call the offer/poll/peek methods and correctness is maintained. This implementation follows patterns documented on the package level for False Sharing protection.
The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov. The original algorithm uses an array of structs, which should offer nice locality properties but is sadly not possible in Java (waiting on Value Types or similar). The alternative explored here uses 2 arrays, one for each field of the struct. There is a further alternative in the experimental project which uses iteration phase markers to achieve the same algorithm and is structurally closer to the original, but it does not perform as well as this implementation.
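The two-array layout described above can be illustrated with a minimal sketch. It is not the class's actual code: it assumes `java.util.concurrent.atomic` arrays in place of the `Unsafe`-based access and padding used in this package, and the class name `TwoArrayMpmcSketch` is hypothetical. One array holds the elements, the other holds a per-slot sequence number that tells producers and consumers whose turn it is.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch of a Vyukov-style bounded MPMC queue split over two arrays.
final class TwoArrayMpmcSketch<E> {
    private final int mask;
    private final AtomicReferenceArray<E> elements; // the "data" field of each struct
    private final AtomicLongArray sequences;        // the "sequence" field of each struct
    private final AtomicLong producerIndex = new AtomicLong();
    private final AtomicLong consumerIndex = new AtomicLong();

    TwoArrayMpmcSketch(int capacity) {
        // The algorithm needs at least 2 slots; a power of 2 allows index masking.
        if (capacity < 2 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of 2 and >= 2");
        }
        mask = capacity - 1;
        elements = new AtomicReferenceArray<>(capacity);
        sequences = new AtomicLongArray(capacity);
        for (int i = 0; i < capacity; i++) {
            sequences.set(i, i); // slot i is initially free for producer index i
        }
    }

    boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException("null elements are not allowed");
        }
        while (true) {
            long p = producerIndex.get();
            int slot = (int) (p & mask);
            long delta = sequences.get(slot) - p;
            if (delta == 0) {
                // Slot is free for this index: try to claim it.
                if (producerIndex.compareAndSet(p, p + 1)) {
                    elements.set(slot, e);
                    sequences.set(slot, p + 1); // publish to consumers
                    return true;
                }
            } else if (delta < 0) {
                return false; // slot still holds an unconsumed element: full
            }
            // Otherwise another producer claimed the slot first: retry.
        }
    }

    E poll() {
        while (true) {
            long c = consumerIndex.get();
            int slot = (int) (c & mask);
            long delta = sequences.get(slot) - (c + 1);
            if (delta == 0) {
                // Slot has been published for this index: try to claim it.
                if (consumerIndex.compareAndSet(c, c + 1)) {
                    E e = elements.get(slot);
                    elements.set(slot, null);
                    sequences.set(slot, c + mask + 1); // free the slot for the next lap
                    return e;
                }
            } else if (delta < 0 && c == producerIndex.get()) {
                // Nothing published and no producer has claimed this index: empty.
                return null;
            }
            // Otherwise a producer or another consumer is mid-operation: retry.
        }
    }
}
```

The producer-index comparison in `poll` reflects the note in the method details that a `null` return must mean the queue is empty, not merely that the next element is not yet visible.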
Tradeoffs to keep in mind:
- Padding for false sharing: counter fields and queue fields are all padded, as are both sides of both arrays. We are trading memory to avoid false sharing (active and passive).
- 2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the elements array. This doubles or triples the memory allocated for the buffer.
- Power of 2 capacity: The actual size of the elements buffer (and sequence buffer) is the closest power of 2 larger than or equal to the requested capacity.
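The power-of-2 rounding in the last tradeoff can be sketched as follows. The class and method names (`Pow2Sketch`, `roundToPowerOfTwo`, `slotFor`) are hypothetical and only illustrate the arithmetic; the package's own helper may differ.

```java
// Hypothetical helper illustrating power-of-2 capacity rounding and index masking.
final class Pow2Sketch {
    private Pow2Sketch() {
    }

    /** Returns the smallest power of 2 that is >= value, for 1 <= value <= 2^30. */
    static int roundToPowerOfTwo(int value) {
        if (value < 1 || value > (1 << 30)) {
            throw new IllegalArgumentException("capacity out of range: " + value);
        }
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }

    /** Maps an ever-increasing index onto a slot of a power-of-2 sized buffer. */
    static int slotFor(long index, int powerOfTwoCapacity) {
        return (int) (index & (powerOfTwoCapacity - 1));
    }
}
```

With a power-of-2 capacity, the slot for an index is a single bitwise AND rather than a modulo, which is the usual motivation for this rounding. A requested capacity of 1000 would therefore allocate 1024 slots in each array.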
-
Field Summary
Fields
Modifier and Type        Field
(package private) long   p30
(package private) long   p31
(package private) long   p32
(package private) long   p33
(package private) long   p34
(package private) long   p35
(package private) long   p36
(package private) long   p37
(package private) long   p40
(package private) long   p41
(package private) long   p42
(package private) long   p43
(package private) long   p44
(package private) long   p45
(package private) long   p46
Fields inherited from class rx.internal.util.unsafe.MpmcArrayQueueL2Pad
p20, p21, p22, p23, p24, p25, p26
Fields inherited from class rx.internal.util.unsafe.MpmcArrayQueueL1Pad
p10, p11, p12, p13, p14, p15, p16
Fields inherited from class rx.internal.util.unsafe.ConcurrentSequencedCircularArrayQueue
sequenceBuffer
Fields inherited from class rx.internal.util.unsafe.ConcurrentCircularArrayQueue
buffer, BUFFER_PAD, mask, SPARSE_SHIFT
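The `p`-prefixed `long` fields listed above are padding. A minimal sketch of the inheritance-based padding pattern, assuming 64-byte cache lines and a JVM that lays out superclass fields before subclass fields, might look like this. All class names here are hypothetical; they only mirror the L1Pad / ProducerField / L2Pad / ConsumerField shape of the hierarchy shown at the top of this page.

```java
// Hypothetical sketch: each hot counter is separated from its neighbours by
// 7 unused longs (56 bytes), so that together with the counter's own 8 bytes
// it is unlikely to share a 64-byte cache line with the other counter.
abstract class PadSketchL1 {
    long p10, p11, p12, p13, p14, p15, p16;
}

abstract class ProducerIndexSketch extends PadSketchL1 {
    volatile long producerIndex; // written by producers
}

abstract class PadSketchL2 extends ProducerIndexSketch {
    long p20, p21, p22, p23, p24, p25, p26;
}

abstract class ConsumerIndexSketch extends PadSketchL2 {
    volatile long consumerIndex; // written by consumers
}

final class PaddedCountersSketch extends ConsumerIndexSketch {
    long p30, p31, p32, p33, p34, p35, p36;
}
```

Splitting the fields across a class hierarchy, rather than declaring them in one class, is a way to discourage the JVM from reordering the padding away from the counters it is meant to protect. Actual field layout is JVM-specific, so this is a convention rather than a guarantee.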
-
Constructor Summary
Constructors
MpmcArrayQueue(int capacity)
-
Method Summary
Modifier and Type, Method, Description
boolean isEmpty()
This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
boolean offer(E e)
Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
E peek()
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
E poll()
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
int size()
This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value.
Methods inherited from class rx.internal.util.unsafe.MpmcArrayQueueConsumerField
casConsumerIndex, lvConsumerIndex
Methods inherited from class rx.internal.util.unsafe.MpmcArrayQueueProducerField
casProducerIndex, lvProducerIndex
Methods inherited from class rx.internal.util.unsafe.ConcurrentSequencedCircularArrayQueue
calcSequenceOffset, lvSequence, soSequence
Methods inherited from class rx.internal.util.unsafe.ConcurrentCircularArrayQueue
calcElementOffset, calcElementOffset, clear, iterator, lpElement, lpElement, lvElement, lvElement, soElement, soElement, spElement, spElement
Methods inherited from class java.util.AbstractQueue
add, addAll, element, remove
Methods inherited from class java.util.AbstractCollection
contains, containsAll, remove, removeAll, retainAll, toArray, toArray, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface java.util.Collection
contains, containsAll, equals, hashCode, parallelStream, remove, removeAll, removeIf, retainAll, spliterator, stream, toArray, toArray, toArray
-
Field Details
- p40
long p40
- p41
long p41
- p42
long p42
- p43
long p43
- p44
long p44
- p45
long p45
- p46
long p46
- p30
long p30
- p31
long p31
- p32
long p32
- p33
long p33
- p34
long p34
- p35
long p35
- p36
long p36
- p37
long p37
-
-
Constructor Details
-
MpmcArrayQueue
public MpmcArrayQueue(int capacity)
-
-
Method Details
-
offer
public boolean offer(E e)
Description copied from interface: MessagePassingQueue
Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
- Parameters:
e
- Returns:
- true if element was inserted into the queue, false iff full
-
poll
public E poll()
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
Because a null return indicates that the queue is empty, we cannot simply rely on next-element visibility for poll and must test the producer index when the next element is not visible.
- Returns:
- a message from the queue if one is available, null iff empty
-
peek
public E peek()
Description copied from interface: MessagePassingQueue
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
- Returns:
- a message from the queue if one is available, null iff empty
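The return-value contract of offer, peek, and poll described above is that of `java.util.Queue`. As a small illustration, the sketch below exercises that contract against any bounded `Queue`. To stay self-contained it is meant to be run with `java.util.concurrent.ArrayBlockingQueue` as a stand-in, which is an assumption made for the example and not the class documented here; `QueueContractSketch` and `demo` are hypothetical names.

```java
import java.util.Queue;

// Hypothetical demo of the non-blocking Queue contract: offer returns false
// when the queue is full; peek and poll return null when it is empty.
final class QueueContractSketch {
    private QueueContractSketch() {
    }

    static String demo(Queue<String> queue) {
        StringBuilder log = new StringBuilder();
        log.append(queue.offer("a")).append(',');
        log.append(queue.offer("b")).append(',');
        log.append(queue.offer("c")).append(','); // rejected if capacity is 2
        log.append(queue.peek()).append(',');     // head stays in the queue
        log.append(queue.poll()).append(',');
        log.append(queue.poll()).append(',');
        log.append(queue.poll());                 // null once the queue is empty
        return log.toString();
    }
}
```

Calling `QueueContractSketch.demo(new java.util.concurrent.ArrayBlockingQueue<>(2))` returns `true,true,false,a,a,b,null`. None of these calls block, so callers are expected to handle the `false` and `null` results themselves.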
-
size
public int size()
Description copied from interface: MessagePassingQueue
This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
- Specified by:
size in interface Collection<E>
- Specified by:
size in interface MessagePassingQueue<E>
- Specified by:
size in class AbstractCollection<E>
- Returns:
- number of messages in the queue, between 0 and queue capacity or Integer.MAX_VALUE if not bounded
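One way such a best-effort size could be computed from a producer index and a consumer index is sketched below. It assumes two `AtomicLong` counters and the hypothetical class name `SizeEstimateSketch`, and it is not taken from this class's source. The consumer index is read before and after the producer index, and the subtraction is only trusted when the consumer index did not move in between.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a best-effort size estimate over two counters.
final class SizeEstimateSketch {
    final AtomicLong producerIndex = new AtomicLong();
    final AtomicLong consumerIndex = new AtomicLong();

    int size() {
        long after = consumerIndex.get();
        while (true) {
            long before = after;
            long currentProducerIndex = producerIndex.get();
            after = consumerIndex.get();
            if (before == after) {
                // Consumer index was stable around the producer read, so the
                // difference cannot be negative.
                return (int) (currentProducerIndex - after);
            }
        }
    }

    boolean isEmpty() {
        // Also only a snapshot: either index may change right after the reads.
        return consumerIndex.get() == producerIndex.get();
    }
}
```

The result is still only a snapshot: by the time the caller looks at it, other threads may already have offered or polled further elements.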
-
isEmpty
public boolean isEmpty()
Description copied from interface: MessagePassingQueue
This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
- Specified by:
isEmpty in interface Collection<E>
- Specified by:
isEmpty in interface MessagePassingQueue<E>
- Overrides:
isEmpty in class AbstractCollection<E>
- Returns:
- true if empty, false otherwise
-