Class BackpressureUtils
-
Field Summary
Fields
Modifier and Type | Field | Description
(package private) static final long | COMPLETED_MASK | Masks the most significant bit, i.e., 0x8000_0000_0000_0000L.
(package private) static final long | REQUESTED_MASK | Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFFL.
-
Constructor Summary
Constructors
Modifier | Constructor | Description
private | BackpressureUtils() | Utility class, no instances.
-
Method Summary
Modifier and Type | Method | Description
static long | addCap(long a, long b) | Adds two positive longs and caps the result at Long.MAX_VALUE.
static <T> long | getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) | Deprecated. Android has issues with reflection-based atomics.
static long | getAndAddRequest(AtomicLong requested, long n) | Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics).
static long | multiplyCap(long a, long b) | Multiplies two positive longs and caps the result at Long.MAX_VALUE.
static <T,R> void | postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform) | Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.
static <T> void | postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual) | Signals the completion of the main sequence and switches to post-completion replay mode.
(package private) static <T,R> void | postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super R> subscriber, Func1<? super T, ? extends R> exitTransform) | Drains the queue based on the outstanding requests in post-completed mode (only!) and allows exit transformation on the queued values.
static <T,R> boolean | postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform) | Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.
static <T> boolean | postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual) | Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
static long | produced(AtomicLong requested, long n) | Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
static boolean | validate(long n) | Validates the requested amount and returns true if it is positive.
-
Field Details
-
COMPLETED_MASK
static final long COMPLETED_MASK
Masks the most significant bit, i.e., 0x8000_0000_0000_0000L.
-
REQUESTED_MASK
static final long REQUESTED_MASK
Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFFL.
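The two masks split a single long into a completed flag (bit 63) and a request amount (bits 0-62). The following is a minimal sketch of that split; the class name RequestBits and both helper methods are hypothetical, and the constants are redeclared here only because the originals are package private.

```java
final class RequestBits {
    // Redeclared for illustration; values follow the field descriptions above.
    static final long COMPLETED_MASK = 0x8000_0000_0000_0000L;
    static final long REQUESTED_MASK = 0x7FFF_FFFF_FFFF_FFFFL;

    /** Hypothetical helper: true if the completed flag (bit 63) is set. */
    static boolean isCompleted(long state) {
        return (state & COMPLETED_MASK) != 0L;
    }

    /** Hypothetical helper: the outstanding request amount (bits 0-62). */
    static long requestedAmount(long state) {
        return state & REQUESTED_MASK;
    }

    public static void main(String[] args) {
        long state = 5L | COMPLETED_MASK; // completed, with 5 requests outstanding
        System.out.println(isCompleted(state));     // prints true
        System.out.println(requestedAmount(state)); // prints 5
    }
}
```

Because valid request amounts are never negative, bit 63 is free to carry the mode without a second atomic variable.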
-
-
Constructor Details
-
BackpressureUtils
private BackpressureUtils()
Utility class, no instances.
-
-
Method Details
-
getAndAddRequest
@Deprecated public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n)
Deprecated. Android has issues with reflection-based atomics.
Adds n to the requested field and returns the value prior to addition once the addition is successful (uses CAS semantics). If the addition overflows, sets the requested field to Long.MAX_VALUE.
Type Parameters:
T - the type of the target object on which the field updater operates
Parameters:
requested - atomic field updater for a request count
object - contains the field updated by the updater
n - the number of requests to add to the requested count
Returns:
requested value just prior to successful addition
-
getAndAddRequest
public static long getAndAddRequest(AtomicLong requested, long n)
Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics). If the addition overflows, sets requested to Long.MAX_VALUE.
Parameters:
requested - atomic long that should be updated
n - the number of requests to add to the requested count, positive (not validated)
Returns:
requested value just prior to successful addition
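The CAS semantics described above can be sketched as a retry loop. This is an illustration under stated assumptions, not the library's source: the class RequestAdder and the method name getAndAddCapped are hypothetical, and n is assumed to be positive.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestAdder {
    /**
     * Sketch of a capped get-and-add: retries a compare-and-set until it
     * succeeds and returns the value seen just before the addition.
     * Like the documented method, it does not validate n.
     */
    static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                // The sum of two non-negative longs wrapped around; cap it instead.
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(10L);
        long previous = getAndAddCapped(requested, 5L);
        System.out.println(previous + " -> " + requested.get()); // prints 10 -> 15
    }
}
```

Returning the previous value lets a caller detect the transition from zero outstanding requests, which is a common trigger for starting a drain loop.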
-
multiplyCap
public static long multiplyCap(long a, long b)
Multiplies two positive longs and caps the result at Long.MAX_VALUE.
Parameters:
a - the first value
b - the second value
Returns:
the capped product of a and b
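One way to cap a product is to test for overflow with a division before multiplying. The sketch below takes that approach; the class CappedMultiply and the method multiplyCapped are hypothetical names, and the library may detect overflow differently. The zero guard is a defensive addition, since the documented contract only covers positive inputs.

```java
final class CappedMultiply {
    /** Sketch: product of two non-negative longs, capped at Long.MAX_VALUE. */
    static long multiplyCapped(long a, long b) {
        if (a == 0L || b == 0L) {
            return 0L; // defensive: avoids dividing by zero below
        }
        if (a > Long.MAX_VALUE / b) {
            return Long.MAX_VALUE; // a * b would exceed Long.MAX_VALUE
        }
        return a * b;
    }

    public static void main(String[] args) {
        System.out.println(multiplyCapped(3L, 4L));                           // prints 12
        System.out.println(multiplyCapped(Long.MAX_VALUE, 2L) == Long.MAX_VALUE); // prints true
    }
}
```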
-
addCap
public static long addCap(long a, long b)
Adds two positive longs and caps the result at Long.MAX_VALUE.
Parameters:
a - the first value
b - the second value
Returns:
the capped sum of a and b
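For non-negative inputs, an overflowing sum wraps to a negative value, so a sign check is enough to detect it. A minimal sketch, with CappedAdd and addCapped as hypothetical names:

```java
final class CappedAdd {
    /** Sketch: sum of two non-negative longs, capped at Long.MAX_VALUE. */
    static long addCapped(long a, long b) {
        long sum = a + b;
        // With non-negative inputs, a negative sum can only mean overflow.
        return sum < 0L ? Long.MAX_VALUE : sum;
    }

    public static void main(String[] args) {
        System.out.println(addCapped(2L, 3L));                               // prints 5
        System.out.println(addCapped(Long.MAX_VALUE, 1L) == Long.MAX_VALUE); // prints true
    }
}
```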
-
postCompleteDone
public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual)
Signals the completion of the main sequence and switches to post-completion replay mode.
Don't modify the queue after calling this method!
Post-completion backpressure handles the case where a source produces values based on requests while it is active, but more values remain available after it completes. In this case, onCompleted() can't simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help drain the queue.
The algorithm uses the most significant bit (bit 63) of a long value (AtomicLong), since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
Type Parameters:
T - the value type to emit
Parameters:
requested - the holder of the current requested amount
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
-
postCompleteRequest
public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
Post-completion backpressure handles the case where a source produces values based on requests while it is active, but more values remain available after it completes. In this case, onCompleted() can't simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help drain the queue.
Type Parameters:
T - the value type to emit
Parameters:
requested - the holder of the current requested amount
n - the value requested
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
Returns:
true if in the active mode and the request amount of n can be relayed to upstream, false if in the post-completed mode and the queue is draining.
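The interplay between the active and completed modes can be sketched in a self-contained form. This is a simplified illustration, not the library's implementation: the Sink interface is a hypothetical stand-in for Subscriber, the method names done, request and drain are invented for the sketch, unsubscription checks are omitted, and n is required to be strictly positive.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

final class PostCompleteSketch {
    static final long COMPLETED_MASK = 0x8000_0000_0000_0000L;
    static final long REQUESTED_MASK = 0x7FFF_FFFF_FFFF_FFFFL;

    /** Hypothetical stand-in for Subscriber; not part of the documented API. */
    interface Sink<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Switches to completed mode once; drains if requests are already outstanding. */
    static <T> void done(AtomicLong requested, Queue<T> queue, Sink<? super T> sink) {
        for (;;) {
            long r = requested.get();
            if ((r & COMPLETED_MASK) != 0L) {
                return; // already in completed mode
            }
            if (requested.compareAndSet(r, r | COMPLETED_MASK)) {
                if (r != 0L) {
                    drain(requested, queue, sink);
                }
                return;
            }
        }
    }

    /** Returns true in active mode (relay n upstream), false in completed mode. */
    static <T> boolean request(AtomicLong requested, long n, Queue<T> queue, Sink<? super T> sink) {
        if (n <= 0L) {
            throw new IllegalArgumentException("n > 0 required but it was " + n);
        }
        for (;;) {
            long r = requested.get();
            long completedBit = r & COMPLETED_MASK;
            long sum = (r & REQUESTED_MASK) + n;
            if (sum < 0L) {
                sum = Long.MAX_VALUE; // cap on overflow
            }
            if (requested.compareAndSet(r, sum | completedBit)) {
                if (r == COMPLETED_MASK) {
                    // Completed with nothing outstanding: this caller starts the drain.
                    drain(requested, queue, sink);
                    return false;
                }
                return completedBit == 0L;
            }
        }
    }

    /** Emits queued values up to the outstanding amount; completes when the queue is empty. */
    static <T> void drain(AtomicLong requested, Queue<T> queue, Sink<? super T> sink) {
        long r = requested.get();
        long emitted = COMPLETED_MASK; // emission counter with the completed bit pre-set
        for (;;) {
            while (emitted != r) {
                T value = queue.poll();
                if (value == null) {
                    sink.onCompleted();
                    return;
                }
                sink.onNext(value);
                emitted++;
            }
            if (queue.isEmpty()) {
                sink.onCompleted();
                return;
            }
            r = requested.get();
            if (r == emitted) {
                // No new requests arrived: subtract what was emitted, stop if none remain.
                r = requested.addAndGet(-(emitted & REQUESTED_MASK));
                if (r == COMPLETED_MASK) {
                    return;
                }
                emitted = COMPLETED_MASK;
            }
        }
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        AtomicLong requested = new AtomicLong();
        Sink<Integer> printer = new Sink<Integer>() {
            @Override public void onNext(Integer value) { System.out.println("onNext " + value); }
            @Override public void onCompleted() { System.out.println("onCompleted"); }
        };
        done(requested, queue, printer);        // no requests yet: nothing is emitted
        request(requested, 2L, queue, printer); // emits 1 and 2
        request(requested, 5L, queue, printer); // emits 3, then completes
    }
}
```

In this sketch only one caller drains at a time: the drain is entered either by the completion call when requests are outstanding, or by the request call that moves the amount away from zero in completed mode.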
-
postCompleteDone
public static <T,R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform)
Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.
Don't modify the queue after calling this method!
Post-completion backpressure handles the case where a source produces values based on requests while it is active, but more values remain available after it completes. In this case, onCompleted() can't simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help drain the queue.
The algorithm uses the most significant bit (bit 63) of a long value (AtomicLong), since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
Type Parameters:
T - the value type in the queue
R - the value type to emit
Parameters:
requested - the holder of the current requested amount
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
-
postCompleteRequest
public static <T,R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.
Post-completion backpressure handles the case where a source produces values based on requests while it is active, but more values remain available after it completes. In this case, onCompleted() can't simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help drain the queue.
Type Parameters:
T - the value type in the queue
R - the value type to emit
Parameters:
requested - the holder of the current requested amount
n - the value requested
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
Returns:
true if in the active mode and the request amount of n can be relayed to upstream, false if in the post-completed mode and the queue is draining.
-
postCompleteDrain
static <T,R> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super R> subscriber, Func1<? super T, ? extends R> exitTransform)
Drains the queue based on the outstanding requests in post-completed mode (only!) and allows exit transformation on the queued values.
Type Parameters:
T - the value type in the queue
R - the value type to emit
Parameters:
requested - the holder of current requested amount
queue - the queue holding values to be emitted after completion
subscriber - the subscriber to receive the values
exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
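The exit transformation is applied to each value as it leaves the queue, so the queue can hold one type (T) while the subscriber receives another (R). The sketch below isolates just that step, assuming java.util.function.Function in place of Func1 and a plain List in place of the subscriber; the class ExitTransformSketch and the method drainUpTo are hypothetical, and the atomic request accounting is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

final class ExitTransformSketch {
    /**
     * Sketch: polls at most 'limit' values, maps each through exitTransform,
     * appends the result to 'out', and returns how many values were emitted.
     */
    static <T, R> long drainUpTo(Queue<T> queue, long limit,
            Function<? super T, ? extends R> exitTransform, List<? super R> out) {
        long emitted = 0L;
        while (emitted != limit) {
            T value = queue.poll();
            if (value == null) {
                break; // queue exhausted before the limit was reached
            }
            out.add(exitTransform.apply(value));
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        List<String> out = new ArrayList<>();
        Function<Integer, String> label = v -> "#" + v;
        long emitted = drainUpTo(queue, 2L, label, out);
        System.out.println(emitted + " " + out); // prints 2 [#1, #2]
    }
}
```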
-
produced
public static long produced(AtomicLong requested, long n)
Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
Parameters:
requested - the requested amount holder
n - the value to subtract from the requested amount, has to be positive (not verified)
Returns:
the new requested amount
Throws:
IllegalStateException - if n is greater than the current requested amount, which indicates a bug in the request accounting logic
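The subtraction described above can be sketched as another CAS loop, with Long.MAX_VALUE treated as an "unbounded" marker that is never decremented. The class ProducedSketch is a hypothetical name and the exception message is illustrative only.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ProducedSketch {
    /** Sketch: subtracts n from requested unless it holds Long.MAX_VALUE; returns the new amount. */
    static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // unbounded mode: leave the amount untouched
            }
            long next = current - n;
            if (next < 0L) {
                // More items were produced than were requested: an accounting bug.
                throw new IllegalStateException("More produced than requested: " + next);
            }
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(10L);
        System.out.println(produced(requested, 3L)); // prints 7
    }
}
```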
-
validate
public static boolean validate(long n)
Validates the requested amount and returns true if it is positive.
Parameters:
n - the requested amount
Returns:
true if n is positive
Throws:
IllegalArgumentException - if n is negative
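A minimal sketch of the validation contract follows. It assumes that a request of zero returns false without throwing, which the description implies but does not state outright; the class RequestValidation is a hypothetical name and the exception message is illustrative.

```java
final class RequestValidation {
    /** Sketch: rejects negative amounts and reports whether n is positive. */
    static boolean validate(long n) {
        if (n < 0L) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        return n != 0L; // assumption: zero is a no-op request, not an error
    }

    public static void main(String[] args) {
        System.out.println(validate(5L)); // prints true
        System.out.println(validate(0L)); // prints false
    }
}
```

A typical caller would guard its request handling with this check, adding to the requested amount and draining only when it returns true.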
-