Class OnSubscribePublishMulticast<T>
- Type Parameters:
T - the input and output type
- All Implemented Interfaces:
Serializable, Action, Action1<Subscriber<? super T>>, Function, Observable.OnSubscribe<T>, Observer<T>, Subscription
Unlike OperatorPublish, this class does not consume the upstream while there are no child subscribers; it waits for them to show up. In addition, once the upstream source terminates, late subscribers are terminated immediately with the same terminal event, whereas OperatorPublish just waits for the next connection.
The class extends AtomicInteger, which serves as the work-in-progress gate for the drain loop that serializes subscriptions and child request changes.
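The work-in-progress gate described above can be illustrated with a minimal, JDK-only sketch. The class WipDrainSketch, its queue and its out buffer are hypothetical names invented for this illustration and are not part of RxJava; the sketch shows only the general pattern of using an AtomicInteger counter so that at most one thread runs the drain loop at a time, not the actual implementation of this class.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of RxJava.
final class WipDrainSketch extends AtomicInteger {
    private static final long serialVersionUID = 1L;

    // Thread-safe queue that producers append to.
    final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    // Touched only from inside the drain loop, so it needs no extra locking.
    final StringBuilder out = new StringBuilder();

    void offer(String item) {
        queue.offer(item);
        drain();
    }

    void drain() {
        // Only the caller that moves the counter from 0 to 1 enters the loop;
        // every other caller just records that more work has arrived.
        if (getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            String item;
            while ((item = queue.poll()) != null) {
                out.append(item);
            }
            // Subtract the signals handled so far. A non-zero result means
            // another caller signalled in the meantime, so loop once more.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

Because the counter itself is the gate, no separate lock object is needed, which is presumably why the documented class extends AtomicInteger directly.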
Nested Class Summary
Nested Classes
(package private) static final class OnSubscribePublishMulticast.ParentSubscriber<T>
- The subscriber that must be used for subscribing to the upstream source.
(package private) static final class OnSubscribePublishMulticast.PublishProducer<T>
- A Producer and Subscription that wraps a child Subscriber and manages its backpressure requests along with its unsubscription from the parent class.
Field Summary
Fields
(package private) final boolean delayError
- Delays the error delivery to happen only after all values have been consumed.
(package private) boolean done
- Indicates the upstream has completed.
(package private) static final OnSubscribePublishMulticast.PublishProducer<?>[] EMPTY
- Represents an empty array of subscriber wrappers; helps avoid allocating an empty array all the time.
(package private) Throwable error
- Holds onto the upstream's exception if done is true and this field is non-null.
(package private) final OnSubscribePublishMulticast.ParentSubscriber<T> parent
- The subscriber that can be 'connected' to the upstream source.
(package private) final int prefetch
- The number of items to prefetch from the upstream source.
(package private) Producer producer
- Holds the upstream producer if any, set through the parent subscriber.
queue
- The prefetch queue holding onto a fixed amount of items until all child subscribers have requested something.
private static final long serialVersionUID
(package private) OnSubscribePublishMulticast.PublishProducer<T>[] subscribers
- A copy-on-write array of the wrappers of the currently subscribed child subscribers.
(package private) static final OnSubscribePublishMulticast.PublishProducer<?>[] TERMINATED
- Represents a final state for this class that prevents new subscribers from subscribing to it.
Constructor Summary
Constructors
OnSubscribePublishMulticast(int prefetch, boolean delayError)
- Constructor, initializes the fields.
Method Summary
(package private) boolean add
- Atomically adds the given wrapper of a child Subscriber to the subscribers array.
void call(Subscriber<? super T> t)
(package private) boolean checkTerminated(boolean d, boolean empty)
- Given the current source state, terminates all child subscribers.
(package private) void drain()
- The serialization loop that determines the minimum request of all subscribers and tries to emit as many items from the queue as are available.
boolean isUnsubscribed()
- Indicates whether this Subscription is currently unsubscribed.
void onCompleted()
- Notifies the Observer that the Observable has finished sending push-based notifications.
void onError
- Notifies the Observer that the Observable has experienced an error condition.
void onNext
- Provides the Observer with a new item to observe.
(package private) void remove
- Atomically removes the given wrapper, if present, from the subscribers array.
(package private) void setProducer
- Sets the main producer and issues the prefetch amount.
subscriber
- Returns the input subscriber of this class that must be subscribed to the upstream source.
(package private) OnSubscribePublishMulticast.PublishProducer<T>[] terminate()
- Atomically swaps in the terminated state.
void unsubscribe()
- Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.
Methods inherited from class java.util.concurrent.atomic.AtomicInteger
accumulateAndGet, addAndGet, compareAndExchange, compareAndExchangeAcquire, compareAndExchangeRelease, compareAndSet, decrementAndGet, doubleValue, floatValue, get, getAcquire, getAndAccumulate, getAndAdd, getAndDecrement, getAndIncrement, getAndSet, getAndUpdate, getOpaque, getPlain, incrementAndGet, intValue, lazySet, longValue, set, setOpaque, setPlain, setRelease, toString, updateAndGet, weakCompareAndSet, weakCompareAndSetAcquire, weakCompareAndSetPlain, weakCompareAndSetRelease, weakCompareAndSetVolatile
Methods inherited from class java.lang.Number
byteValue, shortValue
-
Field Details
-
serialVersionUID
private static final long serialVersionUID
-
queue
The prefetch queue holding onto a fixed amount of items until all child subscribers have requested something.
-
prefetch
final int prefetch
The number of items to prefetch from the upstream source.
-
delayError
final boolean delayError
Delays the error delivery to happen only after all values have been consumed.
-
parent
The subscriber that can be 'connected' to the upstream source.
-
done
volatile boolean done
Indicates the upstream has completed.
-
error
Throwable error
Holds onto the upstream's exception if done is true and this field is non-null.
This field must be read after done, or when subscribers == TERMINATED, to establish a proper happens-before relationship.
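The read-ordering rule for error and done can be sketched as follows. TerminalStateSketch and its describe method are hypothetical names used only for this illustration; the sketch assumes the writer stores the exception before setting the volatile flag, which is the usual way such a pairing is made safe under the Java memory model, and is not taken from the actual source of this class.

```java
// Hypothetical illustration class; not part of RxJava.
final class TerminalStateSketch {
    Throwable error;       // plain field, published through 'done'
    volatile boolean done; // volatile write/read pair establishes happens-before

    void onError(Throwable e) {
        error = e;   // 1. plain write first
        done = true; // 2. volatile write publishes the plain write above
    }

    void onCompleted() {
        done = true; // no exception: 'error' stays null
    }

    /** Returns "running", "completed", or "failed: " plus the exception message. */
    String describe() {
        if (!done) {         // 1. volatile read first
            return "running";
        }
        Throwable e = error; // 2. plain read now sees the write made before 'done'
        return e == null ? "completed" : "failed: " + e.getMessage();
    }
}
```

Reading error before done would allow a reader to observe done as true while still holding a stale null for error, and so to mistake a failure for a normal completion.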
-
producer
Holds the upstream producer if any, set through the parent subscriber.
-
subscribers
A copy-on-write array of the wrappers of the currently subscribed child subscribers.
-
EMPTY
Represents an empty array of subscriber wrappers; helps avoid allocating an empty array all the time.
-
TERMINATED
Represents a final state for this class that prevents new subscribers from subscribing to it.
-
-
Constructor Details
-
OnSubscribePublishMulticast
public OnSubscribePublishMulticast(int prefetch, boolean delayError)
Constructor, initializes the fields.
- Parameters:
prefetch
- the prefetch amount, > 0 required
delayError
- delay the error delivery until after the normal items?
- Throws:
IllegalArgumentException
- if prefetch <= 0
-
-
Method Details
-
call
-
onNext
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
-
onError
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onCompleted().
-
onCompleted
public void onCompleted()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
- Specified by:
onCompleted in interface Observer<T>
-
setProducer
Sets the main producer and issues the prefetch amount.
- Parameters:
p
- the producer to set
-
drain
void drain()
The serialization loop that determines the minimum request of all subscribers and tries to emit as many items from the queue as are available.
The execution of the drain-loop is guaranteed to be thread-safe.
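The "minimum request of all subscribers" idea can be sketched in a simplified, single-threaded form. LockstepSketch, Child and drainOnce are hypothetical names invented here, and plain long counters stand in for the child request accounting; the sketch illustrates the lockstep principle only and makes no claim about how the real drain loop is written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration classes; not part of RxJava.
final class LockstepSketch {
    static final class Child {
        long requested; // outstanding demand of this child
        final List<Integer> received = new ArrayList<>();

        Child(long requested) {
            this.requested = requested;
        }
    }

    /**
     * One pass of a lockstep emission: every child receives the same items,
     * so the child with the smallest outstanding request bounds the batch.
     * Returns the number of items emitted in this pass.
     */
    static long drainOnce(Queue<Integer> queue, List<Child> children) {
        if (children.isEmpty()) {
            return 0; // no consumers: leave the queue untouched
        }
        long min = Long.MAX_VALUE;
        for (Child c : children) {
            min = Math.min(min, c.requested);
        }
        long emitted = 0;
        while (emitted != min) {
            Integer item = queue.poll();
            if (item == null) {
                break; // queue ran dry before the demand was satisfied
            }
            for (Child c : children) {
                c.received.add(item);
            }
            emitted++;
        }
        for (Child c : children) {
            // Long.MAX_VALUE is treated as "unbounded" and left as is.
            if (c.requested != Long.MAX_VALUE) {
                c.requested -= emitted;
            }
        }
        return emitted;
    }
}
```

With one child requesting 2 items and another requesting 4, a pass over a queue of five items emits only 2, and the second child keeps an outstanding demand of 2 until the first child requests more.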
-
checkTerminated
boolean checkTerminated(boolean d, boolean empty)
Given the current source state, terminates all child subscribers.
- Parameters:
d
- the source-done indicator
empty
- the queue-emptiness indicator
- Returns:
- true if the class reached its terminal state
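One plausible reading of how the done flag, the queue-emptiness flag and delayError combine is sketched below as a pure decision function. TerminationSketch, Outcome and check are hypothetical names; the delayed branch follows the documented meaning of delayError, while the non-delayed branch (an error is delivered at once, ahead of any queued values) is an assumption made for this illustration rather than something this page states.

```java
// Hypothetical illustration class; not part of RxJava.
final class TerminationSketch {
    enum Outcome { CONTINUE, COMPLETE, FAIL }

    static Outcome check(boolean done, boolean empty, Throwable error, boolean delayError) {
        if (!done) {
            return Outcome.CONTINUE; // upstream is still active
        }
        if (delayError) {
            if (!empty) {
                return Outcome.CONTINUE; // deliver the queued values first
            }
            return error != null ? Outcome.FAIL : Outcome.COMPLETE;
        }
        if (error != null) {
            return Outcome.FAIL; // assumed: error cuts ahead of queued values
        }
        return empty ? Outcome.COMPLETE : Outcome.CONTINUE;
    }
}
```

In this sketch only COMPLETE and FAIL correspond to a return value of true from the documented method; CONTINUE corresponds to false.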
-
terminate
OnSubscribePublishMulticast.PublishProducer<T>[] terminate()
Atomically swaps in the terminated state.
- Returns:
- the last set of subscribers before the state change or an empty array
-
add
Atomically adds the given wrapper of a child Subscriber to the subscribers array.
- Parameters:
inner
- the wrapper
- Returns:
- true if successful, false if the terminal state has been reached in the meantime
-
remove
Atomically removes the given wrapper, if present, from the subscribers array.
- Parameters:
inner
- the wrapper to remove
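The terminate, add and remove operations described above follow a copy-on-write pattern with a sentinel terminal state. The sketch below is a JDK-only illustration under stated assumptions: CowRegistrySketch is a hypothetical class, String stands in for the subscriber wrapper type, and an AtomicReference with compare-and-set is used to obtain atomicity. The actual class may achieve atomicity by other means, such as a volatile field guarded by a lock.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not part of RxJava.
final class CowRegistrySketch {
    static final String[] EMPTY = new String[0];
    // A distinct array instance, recognised by identity rather than content.
    static final String[] TERMINATED = new String[0];

    final AtomicReference<String[]> subscribers = new AtomicReference<>(EMPTY);

    /** Returns false if the terminal state has already been reached. */
    boolean add(String inner) {
        for (;;) {
            String[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;
            }
            String[] b = new String[a.length + 1];
            System.arraycopy(a, 0, b, 0, a.length);
            b[a.length] = inner;
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
            // Lost a race with another writer: retry on the fresh array.
        }
    }

    /** Removes the wrapper if present; does nothing otherwise. */
    void remove(String inner) {
        for (;;) {
            String[] a = subscribers.get();
            if (a == EMPTY || a == TERMINATED) {
                return;
            }
            int j = -1;
            for (int i = 0; i < a.length; i++) {
                if (a[i].equals(inner)) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }
            String[] b;
            if (a.length == 1) {
                b = EMPTY; // reuse the shared empty array
            } else {
                b = new String[a.length - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, a.length - j - 1);
            }
            if (subscribers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    /** Swaps in the terminal state and returns the previous subscribers, or EMPTY. */
    String[] terminate() {
        String[] previous = subscribers.getAndSet(TERMINATED);
        return previous == TERMINATED ? EMPTY : previous;
    }
}
```

Readers can iterate over a snapshot of the array without locking because an array is never mutated after it has been published; each change installs a new array instead.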
-
subscriber
Returns the input subscriber of this class that must be subscribed to the upstream source.
- Returns:
- the subscriber instance
-
unsubscribe
public void unsubscribe()
Description copied from interface: Subscription
Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.
This allows unregistering a Subscriber before it has finished receiving all events (i.e. before onCompleted is called).
- Specified by:
unsubscribe in interface Subscription
-
isUnsubscribed
public boolean isUnsubscribed()
Description copied from interface: Subscription
Indicates whether this Subscription is currently unsubscribed.
- Specified by:
isUnsubscribed in interface Subscription
- Returns:
true if this Subscription is currently unsubscribed, false otherwise
-