Package rx.observables
Class SyncOnSubscribe<S,T>
java.lang.Object
rx.observables.SyncOnSubscribe<S,T>
- Type Parameters:
S
- the type of the user-define state used ingenerateState(S)
,next(S, Subscriber)
, andonUnsubscribe(S)
.T
- the type ofSubscribers
that will be compatible withthis
.
- All Implemented Interfaces:
Action
,Action1<Subscriber<? super T>>
,Function
,Observable.OnSubscribe<T>
- Direct Known Subclasses:
SyncOnSubscribe.SyncOnSubscribeImpl
@Beta
public abstract class SyncOnSubscribe<S,T>
extends Object
implements Observable.OnSubscribe<T>
A utility class to create
OnSubscribe<T>
functions that respond correctly to back
pressure requests from subscribers. This is an improvement over
Observable.create(OnSubscribe)
which does not provide
any means of managing back pressure requests out-of-the-box.-
Nested Class Summary
Nested ClassesModifier and TypeClassDescription(package private) static final class
Contains the producer loop that reacts to downstream requests of work.(package private) static final class
An implementation of SyncOnSubscribe that delegatesinvalid reference
SyncOnSubscribe#next(Object, Subscriber)
generateState()
, andonUnsubscribe(Object)
to provided functions/closures. -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionfinal void
call
(Subscriber<? super T> subscriber) static <S,
T> SyncOnSubscribe <S, T> createSingleState
(Func0<? extends S> generator, Action2<? super S, ? super Observer<? super T>> next) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers.static <S,
T> SyncOnSubscribe <S, T> createSingleState
(Func0<? extends S> generator, Action2<? super S, ? super Observer<? super T>> next, Action1<? super S> onUnsubscribe) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers.static <S,
T> SyncOnSubscribe <S, T> createStateful
(Func0<? extends S> generator, Func2<? super S, ? super Observer<? super T>, ? extends S> next) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers.static <S,
T> SyncOnSubscribe <S, T> createStateful
(Func0<? extends S> generator, Func2<? super S, ? super Observer<? super T>, ? extends S> next, Action1<? super S> onUnsubscribe) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers.static <T> SyncOnSubscribe
<Void, T> createStateless
(Action1<? super Observer<? super T>> next) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers.static <T> SyncOnSubscribe
<Void, T> createStateless
(Action1<? super Observer<? super T>> next, Action0 onUnsubscribe) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers.protected abstract S
Executed once when subscribed to by a subscriber (viacall(Subscriber)
) to produce a state value.protected abstract S
Called to produce data to the downstream subscribers.protected void
onUnsubscribe
(S state) Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed.
-
Constructor Details
-
SyncOnSubscribe
public SyncOnSubscribe()
-
-
Method Details
-
call
-
generateState
Executed once when subscribed to by a subscriber (viacall(Subscriber)
) to produce a state value. This value is passed intonext(S state, Observer
on the first iteration. Subsequent iterations ofobserver) next
will receive the state returned by the previous invocation ofnext
.- Returns:
- the initial state value
-
next
Called to produce data to the downstream subscribers. To emit data to a downstream subscriber callobserver.onNext(t)
. To signal an error condition callobserver.onError(throwable)
or throw an Exception. To signal the end of a data stream callobserver.onCompleted()
. Implementations of this method must follow the following rules.- Must not call
observer.onNext(t)
more than 1 time per invocation. - Must not call
observer.onNext(t)
concurrently.
state
argument of the next invocation of this method.- Parameters:
state
- the state value (fromgenerateState()
on the first invocation or the previous invocation of this method.observer
- the observer of data emitted by- Returns:
- the next iteration's state value
- Must not call
-
onUnsubscribe
Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed. This method will be invoked exactly once.- Parameters:
state
- the last state value prior fromgenerateState()
ornext(S, Observer<T>)
before unsubscribe.
-
createSingleState
@Beta public static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S, ? super Observer<? super T>> next) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers.- Type Parameters:
S
- the type of the associated state with each SubscriberT
- the type of the generated values- Parameters:
generator
- generates the initial state value (seegenerateState()
)next
- produces data to the downstream subscriber (seenext(S, Subscriber)
)- Returns:
- a SyncOnSubscribe that emits data in a protocol compatible with back-pressure.
-
createSingleState
@Beta public static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S, ? super Observer<? super T>> next, Action1<? super S> onUnsubscribe) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers. This overload creates a SyncOnSubscribe without an explicit clean up step.- Type Parameters:
S
- the type of the associated state with each SubscriberT
- the type of the generated values- Parameters:
generator
- generates the initial state value (seegenerateState()
)next
- produces data to the downstream subscriber (seenext(S, Subscriber)
)onUnsubscribe
- clean up behavior (seeonUnsubscribe(S)
)- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateful
@Beta public static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S, ? super Observer<? super T>, ? extends S> next, Action1<? super S> onUnsubscribe) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers.- Type Parameters:
S
- the type of the associated state with each SubscriberT
- the type of the generated values- Parameters:
generator
- generates the initial state value (seegenerateState()
)next
- produces data to the downstream subscriber (seenext(S, Subscriber)
)onUnsubscribe
- clean up behavior (seeonUnsubscribe(S)
)- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateful
@Beta public static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S, ? super Observer<? super T>, ? extends S> next) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers.- Type Parameters:
S
- the type of the associated state with each SubscriberT
- the type of the generated values- Parameters:
generator
- generates the initial state value (seegenerateState()
)next
- produces data to the downstream subscriber (seenext(S, Subscriber)
)- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateless
@Beta public static <T> SyncOnSubscribe<Void,T> createStateless(Action1<? super Observer<? super T>> next) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers. This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when thenext
function closes over it's state.- Type Parameters:
T
- the type of the generated values- Parameters:
next
- produces data to the downstream subscriber (seenext(S, Subscriber)
)- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateless
@Beta public static <T> SyncOnSubscribe<Void,T> createStateless(Action1<? super Observer<? super T>> next, Action0 onUnsubscribe) Generates a synchronousSyncOnSubscribe
that calls the providednext
function to generate data to downstream subscribers. This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when thenext
function closes over it's state.- Type Parameters:
T
- the type of the generated values- Parameters:
next
- produces data to the downstream subscriber (seenext(S, Subscriber)
)onUnsubscribe
- clean up behavior (seeonUnsubscribe(S)
)- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-