Class BackpressureUtils

java.lang.Object
rx.internal.operators.BackpressureUtils

public final class BackpressureUtils extends Object
Utility functions for use with backpressure.
  • Field Summary

    Fields
    Modifier and Type
    Field
    Description
    (package private) static final long
    Masks the most significant bit, i.e., 0x8000_0000_0000_0000L.
    (package private) static final long
    Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFF.
  • Constructor Summary

    Constructors
    Modifier
    Constructor
    Description
    private
    Utility class, no instances.
  • Method Summary

    Modifier and Type
    Method
    Description
    static long
    addCap(long a, long b)
    Adds two positive longs and caps the result at Long.MAX_VALUE.
    static <T> long
    getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n)
    Deprecated.
    Android has issues with reflection-based atomics
    static long
    getAndAddRequest(AtomicLong requested, long n)
    Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics).
    static long
    multiplyCap(long a, long b)
    Multiplies two positive longs and caps the result at Long.MAX_VALUE.
    static <T, R> void
    postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform)
    Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.
    static <T> void
    postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual)
    Signals the completion of the main sequence and switches to post-completion replay mode.
    (package private) static <T, R> void
    postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super R> subscriber, Func1<? super T,? extends R> exitTransform)
    Drains the queue based on the outstanding requests in post-completed mode (only!) and allows exit transformation on the queued values.
    static <T, R> boolean
    postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform)
    Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.
    static <T> boolean
    postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual)
    Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
    static long
    produced(AtomicLong requested, long n)
    Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
    static boolean
    validate(long n)
    Validates the requested amount and returns true if it is positive.

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Field Details

    • COMPLETED_MASK

      static final long COMPLETED_MASK
      Masks the most significant bit, i.e., 0x8000_0000_0000_0000L.
      See Also:
    • REQUESTED_MASK

      static final long REQUESTED_MASK
      Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFF.
      See Also:
  • Constructor Details

    • BackpressureUtils

      private BackpressureUtils()
      Utility class, no instances.
  • Method Details

    • getAndAddRequest

      @Deprecated public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n)
      Deprecated.
      Android has issues with reflection-based atomics
      Adds n to requested field and returns the value prior to addition once the addition is successful (uses CAS semantics). If overflows then sets requested field to Long.MAX_VALUE.
      Type Parameters:
      T - the type of the target object on which the field updater operates
      Parameters:
      requested - atomic field updater for a request count
      object - contains the field updated by the updater
      n - the number of requests to add to the requested count
      Returns:
      requested value just prior to successful addition
    • getAndAddRequest

      public static long getAndAddRequest(AtomicLong requested, long n)
      Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics). If overflows then sets requested field to Long.MAX_VALUE.
      Parameters:
      requested - atomic long that should be updated
      n - the number of requests to add to the requested count, positive (not validated)
      Returns:
      requested value just prior to successful addition
    • multiplyCap

      public static long multiplyCap(long a, long b)
      Multiplies two positive longs and caps the result at Long.MAX_VALUE.
      Parameters:
      a - the first value
      b - the second value
      Returns:
      the capped product of a and b
    • addCap

      public static long addCap(long a, long b)
      Adds two positive longs and caps the result at Long.MAX_VALUE.
      Parameters:
      a - the first value
      b - the second value
      Returns:
      the capped sum of a and b
    • postCompleteDone

      public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual)
      Signals the completion of the main sequence and switches to post-completion replay mode.

      Don't modify the queue after calling this method!

      Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed but in completed mode, requests no-longer reach the upstream but help in draining the queue.

      The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.

      Type Parameters:
      T - the value type to emit
      Parameters:
      requested - the holder of current requested amount
      queue - the queue holding values to be emitted after completion
      actual - the subscriber to receive the values
    • postCompleteRequest

      public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual)
      Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.

      Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed but in completed mode, requests no-longer reach the upstream but help in draining the queue.

      Type Parameters:
      T - the value type to emit
      Parameters:
      requested - the holder of current requested amount
      n - the value requested;
      queue - the queue holding values to be emitted after completion
      actual - the subscriber to receive the values
      Returns:
      true if in the active mode and the request amount of n can be relayed to upstream, false if in the post-completed mode and the queue is draining.
    • postCompleteDone

      public static <T, R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform)
      Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.

      Don't modify the queue after calling this method!

      Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed but in completed mode, requests no-longer reach the upstream but help in draining the queue.

      The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.

      Type Parameters:
      T - the value type in the queue
      R - the value type to emit
      Parameters:
      requested - the holder of current requested amount
      queue - the queue holding values to be emitted after completion
      actual - the subscriber to receive the values
      exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
    • postCompleteRequest

      public static <T, R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform)
      Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.

      Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed but in completed mode, requests no-longer reach the upstream but help in draining the queue.

      Type Parameters:
      T - the value type in the queue
      R - the value type to emit
      Parameters:
      requested - the holder of current requested amount
      n - the value requested;
      queue - the queue holding values to be emitted after completion
      actual - the subscriber to receive the values
      exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
      Returns:
      true if in the active mode and the request amount of n can be relayed to upstream, false if in the post-completed mode and the queue is draining.
    • postCompleteDrain

      static <T, R> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super R> subscriber, Func1<? super T,? extends R> exitTransform)
      Drains the queue based on the outstanding requests in post-completed mode (only!) and allows exit transformation on the queued values.
      Type Parameters:
      T - the value type in the queue
      R - the value type to emit
      Parameters:
      requested - the holder of current requested amount
      queue - the queue holding values to be emitted after completion
      subscriber - the subscriber to receive the values
      exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
    • produced

      public static long produced(AtomicLong requested, long n)
      Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
      Parameters:
      requested - the requested amount holder
      n - the value to subtract from the requested amount, has to be positive (not verified)
      Returns:
      the new requested amount
      Throws:
      IllegalStateException - if n is greater than the current requested amount, which indicates a bug in the request accounting logic
    • validate

      public static boolean validate(long n)
      Validates the requested amount and returns true if it is positive.
      Parameters:
      n - the requested amount
      Returns:
      true if n is positive
      Throws:
      IllegalArgumentException - if n is negative