Class AsyncOnSubscribe<S,T>

java.lang.Object
rx.observables.AsyncOnSubscribe<S,T>
Type Parameters:
S - the type of the user-define state used in generateState(S) , next(S, Long, Observer), and onUnsubscribe(S).
T - the type of Subscribers that will be compatible with this.
All Implemented Interfaces:
Action, Action1<Subscriber<? super T>>, Function, Observable.OnSubscribe<T>
Direct Known Subclasses:
AsyncOnSubscribe.AsyncOnSubscribeImpl

@Experimental public abstract class AsyncOnSubscribe<S,T> extends Object implements Observable.OnSubscribe<T>
A utility class to create OnSubscribe<T> functions that respond correctly to back pressure requests from subscribers. This is an improvement over Observable.create(OnSubscribe) which does not provide any means of managing back pressure requests out-of-the-box. This variant of an OnSubscribe function allows for the asynchronous processing of requests.
  • Constructor Details

    • AsyncOnSubscribe

      public AsyncOnSubscribe()
  • Method Details

    • generateState

      protected abstract S generateState()
      Executed once when subscribed to by a subscriber (via call(Subscriber)) to produce a state value. This value is passed into next(S state, Observer observer) on the first iteration. Subsequent iterations of next will receive the state returned by the previous invocation of next.
      Returns:
      the initial state value
    • next

      protected abstract S next(S state, long requested, Observer<Observable<? extends T>> observer)
      Called to produce data to the downstream subscribers. To emit data to a downstream subscriber call observer.onNext(t). To signal an error condition call observer.onError(throwable) or throw an Exception. To signal the end of a data stream call observer.onCompleted(). Implementations of this method must follow the following rules.
      • Must not call observer.onNext(t) more than 1 time per invocation.
      • Must not call observer.onNext(t) concurrently.
      The value returned from an invocation of this method will be passed in as the state argument of the next invocation of this method.
      Parameters:
      state - the state value (from generateState() on the first invocation or the previous invocation of this method.
      requested - the amount of data requested. An observable emitted to the observer should not exceed this amount.
      observer - the observer of data emitted by
      Returns:
      the next iteration's state value
    • onUnsubscribe

      protected void onUnsubscribe(S state)
      Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed. This method will be invoked exactly once.
      Parameters:
      state - the last state value returned from next(S, Long, Observer) or generateState() at the time when a terminal event is emitted from next(Object, long, Observer) or unsubscribing.
    • createSingleState

      @Experimental public static <S, T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S,Long,? super Observer<Observable<? extends T>>> next)
      Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
      Type Parameters:
      S - the type of the associated state with each Subscriber
      T - the type of the generated values
      Parameters:
      generator - generates the initial state value (see generateState())
      next - produces data to the downstream subscriber (see next(S, long, Observer))
      Returns:
      an AsyncOnSubscribe that emits data in a protocol compatible with back-pressure.
    • createSingleState

      @Experimental public static <S, T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S,Long,? super Observer<Observable<? extends T>>> next, Action1<? super S> onUnsubscribe)
      Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a AsyncOnSubscribe without an explicit clean up step.
      Type Parameters:
      S - the type of the associated state with each Subscriber
      T - the type of the generated values
      Parameters:
      generator - generates the initial state value (see generateState())
      next - produces data to the downstream subscriber (see next(S, long, Observer))
      onUnsubscribe - clean up behavior (see onUnsubscribe(S))
      Returns:
      an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
    • createStateful

      @Experimental public static <S, T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S,Long,? super Observer<Observable<? extends T>>,? extends S> next, Action1<? super S> onUnsubscribe)
      Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
      Type Parameters:
      S - the type of the associated state with each Subscriber
      T - the type of the generated values
      Parameters:
      generator - generates the initial state value (see generateState())
      next - produces data to the downstream subscriber (see next(S, long, Observer))
      onUnsubscribe - clean up behavior (see onUnsubscribe(S))
      Returns:
      an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
    • createStateful

      @Experimental public static <S, T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S,Long,? super Observer<Observable<? extends T>>,? extends S> next)
      Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
      Type Parameters:
      S - the type of the associated state with each Subscriber
      T - the type of the generated values
      Parameters:
      generator - generates the initial state value (see generateState())
      next - produces data to the downstream subscriber (see next(S, long, Observer))
      Returns:
      an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
    • createStateless

      @Experimental public static <T> AsyncOnSubscribe<Void,T> createStateless(Action2<Long,? super Observer<Observable<? extends T>>> next)
      Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over it's state.
      Type Parameters:
      T - the type of the generated values
      Parameters:
      next - produces data to the downstream subscriber (see next(S, long, Observer))
      Returns:
      an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
    • createStateless

      @Experimental public static <T> AsyncOnSubscribe<Void,T> createStateless(Action2<Long,? super Observer<Observable<? extends T>>> next, Action0 onUnsubscribe)
      Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over it's state.
      Type Parameters:
      T - the type of the generated values
      Parameters:
      next - produces data to the downstream subscriber (see next(S, long, Observer))
      onUnsubscribe - clean up behavior (see onUnsubscribe(S))
      Returns:
      an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
    • call

      public final void call(Subscriber<? super T> actualSubscriber)
      Specified by:
      call in interface Action1<S>