Package rx.internal.operators
Class OperatorMerge.MergeSubscriber<T>
- Type Parameters:
T - the value type
- All Implemented Interfaces:
Observer<Observable<? extends T>>, Subscription
- Enclosing class:
OperatorMerge<T>
The subscriber that observes Observables.
-
Field Summary
Fields (modifier and type, field, description)
(package private) final Subscriber<? super T> child
(package private) final boolean delayErrors
(package private) boolean done
(package private) boolean emitting - Guarded by this.
(package private) static final OperatorMerge.InnerSubscriber<?>[] EMPTY - An empty array to avoid creating new empty arrays in removeInner.
(package private) ConcurrentLinkedQueue<Throwable> errors - Due to the emission loop, we need to store errors somewhere if !delayErrors.
(package private) final Object innerGuard
(package private) OperatorMerge.InnerSubscriber<?>[] innerSubscribers - Copy-on-write array, guarded by innerGuard.
(package private) long lastId - Which was the last InnerSubscriber that emitted? Accessed if emitting == true.
(package private) int lastIndex - What was its index in the innerSubscribers array? Accessed if emitting == true.
(package private) final int maxConcurrent
(package private) boolean missed - Guarded by this.
(package private) final NotificationLite<T> nl
(package private) OperatorMerge.MergeProducer<T> producer
(package private) int scalarEmissionCount
(package private) final int scalarEmissionLimit
(package private) CompositeSubscription subscriptions - Tracks the active subscriptions to sources.
(package private) long uniqueId - Used to generate unique InnerSubscriber IDs.
-
Constructor Summary
Constructors
MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent)
-
Method Summary
Methods (modifier and type, method, description)
(package private) void addInner(OperatorMerge.InnerSubscriber<T> inner)
(package private) boolean checkTerminate() - Check if the operator reached some terminal state: the child unsubscribed, or an error was reported and errors are not delayed.
(package private) void emit()
(package private) void emitEmpty()
(package private) void emitLoop() - The standard emission loop serializing events and requests.
protected void emitScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value, long r)
protected void emitScalar(T value, long r)
(package private) CompositeSubscription getOrCreateComposite()
void onCompleted() - Notifies the Observer that the Observable has finished sending push-based notifications.
void onError(Throwable e) - Notifies the Observer that the Observable has experienced an error condition.
void onNext(Observable<? extends T> t) - Provides the Observer with a new item to observe.
protected void queueScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value)
protected void queueScalar(T value)
(package private) void removeInner
private void reportError()
void requestMore(long n)
(package private) void tryEmit(OperatorMerge.InnerSubscriber<T> subscriber, T value) - Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
(package private) void tryEmit(T value) - Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Methods inherited from class rx.Subscriber
add, isUnsubscribed, onStart, request, setProducer, unsubscribe
-
Field Details
-
child
-
delayErrors
final boolean delayErrors
-
maxConcurrent
final int maxConcurrent
-
producer
OperatorMerge.MergeProducer<T> producer
-
queue
-
subscriptions
Tracks the active subscriptions to sources.
-
errors
Due to the emission loop, we need to store errors somewhere if !delayErrors.
-
nl
-
done
volatile boolean done
-
emitting
boolean emitting
Guarded by this.
-
missed
boolean missed
Guarded by this.
-
innerGuard
-
innerSubscribers
Copy-on-write array, guarded by innerGuard.
-
uniqueId
long uniqueId
Used to generate unique InnerSubscriber IDs. Modified from onNext only.
-
lastId
long lastId
Which was the last InnerSubscriber that emitted? Accessed if emitting == true.
-
lastIndex
int lastIndex
What was its index in the innerSubscribers array? Accessed if emitting == true.
-
EMPTY
An empty array to avoid creating new empty arrays in removeInner.
-
scalarEmissionLimit
final int scalarEmissionLimit
-
scalarEmissionCount
int scalarEmissionCount
-
-
Constructor Details
-
MergeSubscriber
-
-
Method Details
-
getOrCreateErrorQueue
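The method name suggests that the errors queue is created lazily, on first use. The following is a minimal sketch of one common way to do that (double-checked locking on a volatile field). The class name LazyErrorQueueSketch, the hasErrors helper, and the volatile modifier are assumptions made for illustration; this page does not show the actual method body.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for the part of a merge subscriber that owns the error queue. */
final class LazyErrorQueueSketch {
    // Assumed volatile so the unsynchronized first read below is safe.
    private volatile ConcurrentLinkedQueue<Throwable> errors;

    /** Returns the existing queue or creates it on first use. */
    Queue<Throwable> getOrCreateErrorQueue() {
        ConcurrentLinkedQueue<Throwable> q = errors;
        if (q == null) {
            synchronized (this) {
                q = errors;               // re-check under the lock
                if (q == null) {
                    q = new ConcurrentLinkedQueue<>();
                    errors = q;
                }
            }
        }
        return q;
    }

    /** Hypothetical helper: true once at least one error has been stored. */
    boolean hasErrors() {
        ConcurrentLinkedQueue<Throwable> q = errors;
        return q != null && !q.isEmpty();
    }
}
```

Creating the queue only when an error actually arrives avoids allocating it for the common case where every source completes normally.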
-
getOrCreateComposite
CompositeSubscription getOrCreateComposite()
-
onNext
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
- Parameters:
t - the item emitted by the Observable
-
emitEmpty
void emitEmpty()
-
reportError
private void reportError()
-
onError
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onCompleted().
- Parameters:
e - the exception encountered by the Observable
-
onCompleted
public void onCompleted()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
-
addInner
-
removeInner
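The field descriptions say that innerSubscribers is a copy-on-write array guarded by innerGuard, and that EMPTY exists so removeInner does not allocate new empty arrays. A minimal sketch of that pattern follows. It uses plain Object elements instead of InnerSubscriber, and the class name CopyOnWriteInnersSketch and the volatile modifier are assumptions; the actual method bodies are not shown on this page.

```java
import java.util.Arrays;

/** Hypothetical sketch of a copy-on-write array guarded by a lock object. */
final class CopyOnWriteInnersSketch {
    static final Object[] EMPTY = new Object[0];   // shared empty array
    final Object innerGuard = new Object();
    // Writers replace the whole array under innerGuard; readers take a snapshot.
    volatile Object[] innerSubscribers = EMPTY;

    void addInner(Object inner) {
        synchronized (innerGuard) {
            Object[] a = innerSubscribers;
            Object[] b = Arrays.copyOf(a, a.length + 1);
            b[a.length] = inner;
            innerSubscribers = b;
        }
    }

    void removeInner(Object inner) {
        synchronized (innerGuard) {
            Object[] a = innerSubscribers;
            int j = -1;
            for (int i = 0; i < a.length; i++) {
                if (a[i] == inner) { j = i; break; }
            }
            if (j < 0) {
                return;                       // not present, nothing to do
            }
            if (a.length == 1) {
                innerSubscribers = EMPTY;     // reuse the shared empty array
                return;
            }
            Object[] b = new Object[a.length - 1];
            System.arraycopy(a, 0, b, 0, j);
            System.arraycopy(a, j + 1, b, j, a.length - j - 1);
            innerSubscribers = b;
        }
    }
}
```

Because an array is never modified after it is published, an emission loop can iterate over a snapshot without holding innerGuard.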
-
tryEmit
Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Since the scalar-value queue optimization applies to both the main source and the inner subscribers, we handle things in a shared manner.
- Parameters:
subscriber -
value -
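The "emit directly if nobody else is emitting" idea, together with the emitting and missed flags documented as guarded by this, can be illustrated with a simplified emitter loop. This is a sketch under stated assumptions, not the operator's implementation: it ignores backpressure (the r and requestMore parameters), uses a Consumer as a stand-in for the child Subscriber, and the class name EmitterLoopSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical, simplified emitter loop: one caller at a time delivers to the child. */
final class EmitterLoopSketch<T> {
    private boolean emitting;                          // guarded by this
    private boolean missed;                            // guarded by this
    private final Queue<T> queue = new ArrayDeque<>(); // guarded by this (in this sketch)
    private final Consumer<T> child;                   // stand-in for the child Subscriber

    EmitterLoopSketch(Consumer<T> child) {
        this.child = child;
    }

    void tryEmit(T value) {
        synchronized (this) {
            if (emitting) {
                // Someone else is emitting: hand the value over and signal missed work.
                queue.offer(value);
                missed = true;
                return;
            }
            emitting = true;                           // we won the right to emit
        }
        child.accept(value);                           // fast path, outside the lock
        emitLoop();
    }

    private void emitLoop() {
        for (;;) {
            List<T> batch;
            synchronized (this) {
                if (!missed) {
                    emitting = false;                  // nothing arrived meanwhile: release
                    return;
                }
                missed = false;
                batch = new ArrayList<>(queue);
                queue.clear();
            }
            for (T t : batch) {
                child.accept(t);                       // deliver queued values in order
            }
        }
    }
}
```

The child is always called outside the synchronized blocks, so a slow or reentrant child does not block other callers; they only enqueue and return.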
-
queueScalar
-
emitScalar
-
requestMore
public void requestMore(long n)
-
tryEmit
Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Since the scalar-value queue optimization applies to both the main source and the inner subscribers, we handle things in a shared manner.
- Parameters:
value -
subscriber -
-
queueScalar
-
emitScalar
-
emit
void emit()
-
emitLoop
void emitLoop()
The standard emission loop serializing events and requests.
-
checkTerminate
boolean checkTerminate()
Check if the operator reached some terminal state: the child unsubscribed, or an error was reported and errors are not delayed.
- Returns:
true if the child unsubscribed or there are errors available and merge doesn't delay errors.
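The return condition above can be written as a small predicate. The sketch below only mirrors the documented condition; the class name TerminalCheckSketch and the childUnsubscribed flag are hypothetical stand-ins, and any side effects the real method may have (such as reporting the stored errors) are not shown on this page and are left out.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of the documented terminal-state check. */
final class TerminalCheckSketch {
    final boolean delayErrors;
    volatile boolean childUnsubscribed;   // stand-in for child.isUnsubscribed()
    final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();

    TerminalCheckSketch(boolean delayErrors) {
        this.delayErrors = delayErrors;
    }

    /** True if the child unsubscribed, or errors exist and they are not delayed. */
    boolean checkTerminate() {
        if (childUnsubscribed) {
            return true;
        }
        return !delayErrors && !errors.isEmpty();
    }
}
```

With delayErrors set, a stored error alone does not stop the loop; the remaining sources are expected to finish first.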
-