Class RxRingBuffer

java.lang.Object
rx.internal.util.RxRingBuffer
All Implemented Interfaces:
Subscription

public class RxRingBuffer extends Object implements Subscription
This assumes Spsc or Spmc usage. This means only a single producer calling the on* methods. This is the Rx contract of an Observer (see http://reactivex.io/documentation/contract.html). Concurrent invocations of on* methods will not be thread-safe.
  • Field Details

    • ON

      private static final NotificationLite<Object> ON
      Queue implementation testing that led to current choices of data structures: With synchronized LinkedList
       
       Benchmark                                        Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.ringBufferAddRemove1       thrpt         5 19118392.046  1002814.238    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5    17891.641      252.747    ops/s
       
       With MpscPaddedQueue (single consumer, so failing 1 unit test)
       
       Benchmark                                        Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.ringBufferAddRemove1       thrpt         5 22164483.238  3035027.348    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5    23154.303      602.548    ops/s
       
       
       With ConcurrentLinkedQueue (tracking count separately)
       
       Benchmark                                        Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.ringBufferAddRemove1       thrpt         5 17353906.092   378756.411    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5    19224.411     1010.610    ops/s
       
       With ConcurrentLinkedQueue (using queue.size() method for count)
       
       Benchmark                                        Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.ringBufferAddRemove1       thrpt         5 23951121.098  1982380.330    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5     1142.351       33.592    ops/s
       
       With SynchronizedQueue (synchronized LinkedList ... no object pooling)
       
       r.i.RxRingBufferPerf.createUseAndDestroy1       thrpt         5 33231667.136   685757.510    ops/s
       r.i.RxRingBufferPerf.createUseAndDestroy1000    thrpt         5    74623.614     5493.766    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1       thrpt         5 22907359.257   707026.632    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5    22222.410      320.829    ops/s
       
       With ArrayBlockingQueue
       
       Benchmark                                            Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.createUseAndDestroy1       thrpt         5  2389804.664    68990.804    ops/s
       r.i.RxRingBufferPerf.createUseAndDestroy1000    thrpt         5    27384.274     1411.789    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1       thrpt         5 26497037.559    91176.247    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5    17985.144      237.771    ops/s
       
       With ArrayBlockingQueue and Object Pool
       
       Benchmark                                            Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.createUseAndDestroy1       thrpt         5 12465685.522   399070.770    ops/s
       r.i.RxRingBufferPerf.createUseAndDestroy1000    thrpt         5    27701.294      395.217    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1       thrpt         5 26399625.086   695639.436    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5    17985.427      253.190    ops/s
       
       With SpscArrayQueue (single consumer, so failing 1 unit test)
        - requires access to Unsafe
       
       Benchmark                                        Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.createUseAndDestroy1       thrpt         5  1922996.035    49183.766    ops/s
       r.i.RxRingBufferPerf.createUseAndDestroy1000    thrpt         5    70890.186     1382.550    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1       thrpt         5 80637811.605  3509706.954    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5    71822.453     4127.660    ops/s
       
       
       With SpscArrayQueue and Object Pool (object pool improves createUseAndDestroy1 by 10x)
       
       Benchmark                                        Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.createUseAndDestroy1       thrpt         5 25220069.264  1329078.785    ops/s
       r.i.RxRingBufferPerf.createUseAndDestroy1000    thrpt         5    72313.457     3535.447    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1       thrpt         5 81863840.884  2191416.069    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5    73140.822     1528.764    ops/s
       
       With SpmcArrayQueue
        - requires access to Unsafe
        
       Benchmark                                            Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1       thrpt         5 27630345.474   769219.142    ops/s
       r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1000    thrpt         5    80052.046     4059.541    ops/s
       r.i.RxRingBufferPerf.spmcRingBufferAddRemove1       thrpt         5 44449524.222   563068.793    ops/s
       r.i.RxRingBufferPerf.spmcRingBufferAddRemove1000    thrpt         5    65231.253     1805.732    ops/s
       
       With SpmcArrayQueue and ObjectPool (object pool improves createUseAndDestroy1 by 10x)
       
       Benchmark                                        Mode   Samples        Score  Score error    Units
       r.i.RxRingBufferPerf.createUseAndDestroy1       thrpt         5 18489343.061  1011872.825    ops/s
       r.i.RxRingBufferPerf.createUseAndDestroy1000    thrpt         5    46416.434     1439.144    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove        thrpt         5 38280945.847  1071801.279    ops/s
       r.i.RxRingBufferPerf.ringBufferAddRemove1000    thrpt         5    42337.663     1052.231    ops/s
       
       --------------
       
       When UnsafeAccess.isUnsafeAvailable() == true we can use the Spmc/SpscArrayQueue implementations.
       
        
    • queue

      private Queue<Object> queue
    • size

      private final int size
    • pool

      private final ObjectPool<Queue<Object>> pool
    • terminalState

      public volatile Object terminalState
      We store the terminal state separately so it doesn't count against the size. We don't just +1 the size since some of the queues require sizes that are a power of 2. This is a subjective thing ... wanting to keep the size (ie 1024) the actual number of onNext that can be sent rather than something like 1023 onNext + 1 terminal event. It also simplifies checking that we have received only 1 terminal event, as we don't need to peek at the last item or retain a boolean flag.
    • defaultSize

      static int defaultSize
      128 was chosen as the default based on the numbers below. A stream processing system may benefit from increasing to 512+.
       
       ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorObserveOnPerf.*'
       
       1024
       
       Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
       r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   100642.874    24676.478    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     4095.901       90.730    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        9.797        4.982    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 15536155.489   758579.454    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   156257.341     6324.176    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      157.099        7.143    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    16864.641     1826.877    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     4269.317      169.480    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       13.393        1.047    ops/s
       
       512
       
       Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
       r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5    98945.980    48050.282    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     4111.149       95.987    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       12.483        3.067    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16032469.143   620157.818    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   157997.290     5097.718    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      156.462        7.728    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    15813.984     8260.170    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     4358.334      251.609    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       13.647        0.613    ops/s
       
       256
       
       Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
       r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   108489.834     2688.489    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     4526.674      728.019    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       13.372        0.457    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16435113.709   311602.627    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   157611.204    13146.108    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      158.346        2.500    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    16976.775      968.191    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     6238.210     2060.387    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       13.465        0.566    ops/s
       
       128
       
       Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
       r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   106887.027    29307.913    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     6713.891      202.989    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       11.929        0.187    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16055774.724   350633.068    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   153403.821    17976.156    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      153.559       20.178    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    17172.274      236.816    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     7073.555      595.990    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       11.855        1.093    ops/s
       
       32
       
       Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
       r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   106128.589    20986.201    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     6396.607       73.627    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        7.643        0.668    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16012419.447   409004.521    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   157907.001     5772.849    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      155.308       23.853    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    16927.513      606.692    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     5191.084      244.876    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5        8.288        0.217    ops/s
       
       16
       
       Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
       r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   109974.741      839.064    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     4538.912      173.561    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        5.420        0.111    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16017466.785   768748.695    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   157934.065    13479.575    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      155.922       17.781    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    14903.686     3325.205    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     3784.776     1054.131    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5        5.624        0.130    ops/s
       
       2
       
       Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
       r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   112663.216      899.005    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5      899.737        9.460    ops/s
       r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        0.999        0.100    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16087325.336   783206.227    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   156747.025     4880.489    ops/s
       r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      156.645        3.810    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    15958.711      673.895    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5      884.624       47.692    ops/s
       r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5        1.173        0.100    ops/s
        
    • SIZE

      public static final int SIZE
    • SPSC_POOL

      public static final ObjectPool<Queue<Object>> SPSC_POOL
    • SPMC_POOL

      public static final ObjectPool<Queue<Object>> SPMC_POOL
  • Constructor Details

    • RxRingBuffer

      private RxRingBuffer(Queue<Object> queue, int size)
    • RxRingBuffer

      private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size)
    • RxRingBuffer

      RxRingBuffer()
  • Method Details

    • getSpscInstance

      public static RxRingBuffer getSpscInstance()
    • getSpmcInstance

      public static RxRingBuffer getSpmcInstance()
    • release

      public void release()
    • unsubscribe

      public void unsubscribe()
      Description copied from interface: Subscription
      Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.

      This allows unregistering an Subscriber before it has finished receiving all events (i.e. before onCompleted is called).

      Specified by:
      unsubscribe in interface Subscription
    • onNext

      public void onNext(Object o) throws MissingBackpressureException
      Parameters:
      o -
      Throws:
      MissingBackpressureException - if more onNext are sent than have been requested
    • onCompleted

      public void onCompleted()
    • onError

      public void onError(Throwable t)
    • available

      public int available()
    • capacity

      public int capacity()
    • count

      public int count()
    • isEmpty

      public boolean isEmpty()
    • poll

      public Object poll()
    • peek

      public Object peek()
    • isCompleted

      public boolean isCompleted(Object o)
    • isError

      public boolean isError(Object o)
    • getValue

      public Object getValue(Object o)
    • accept

      public boolean accept(Object o, Observer child)
    • asError

      public Throwable asError(Object o)
    • isUnsubscribed

      public boolean isUnsubscribed()
      Description copied from interface: Subscription
      Indicates whether this Subscription is currently unsubscribed.
      Specified by:
      isUnsubscribed in interface Subscription
      Returns:
      true if this Subscription is currently unsubscribed, false otherwise