Package rx.subjects

Class ReplaySubject<T>

java.lang.Object
rx.Observable<T>
rx.subjects.Subject<T,T>
rx.subjects.ReplaySubject<T>
Type Parameters:
T - the type of items observed and emitted by the Subject
All Implemented Interfaces:
Observer<T>

public final class ReplaySubject<T> extends Subject<T,T>
Subject that buffers all items it observes and replays them to any Observer that subscribes.

Example usage:

 

  ReplaySubject<Object> subject = ReplaySubject.create();
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");
  subject.onCompleted();

  // both of the following will get the onNext/onCompleted calls from above
  subject.subscribe(observer1);
  subject.subscribe(observer2);
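
The replay behavior shown above can be modelled in plain Java without the RxJava dependency. The class below is a hypothetical stand-in, not the library's implementation: `ReplayModel`, its fields, and its two-argument `subscribe` are names invented for this sketch, and it assumes that items arriving after the terminal event are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of an unbounded replay subject; NOT the RxJava implementation.
final class ReplayModel<T> {
    private final List<T> history = new ArrayList<>();
    private final List<Consumer<T>> onNextHandlers = new ArrayList<>();
    private final List<Runnable> onCompletedHandlers = new ArrayList<>();
    private boolean completed;

    void onNext(T item) {
        if (completed) {
            return; // assumption of this sketch: nothing is recorded after completion
        }
        history.add(item);
        for (Consumer<T> handler : onNextHandlers) {
            handler.accept(item);
        }
    }

    void onCompleted() {
        if (completed) {
            return;
        }
        completed = true;
        for (Runnable handler : onCompletedHandlers) {
            handler.run();
        }
    }

    // Replays the whole history first, then either completes or registers for live items.
    void subscribe(Consumer<T> onNext, Runnable onCompleted) {
        for (T item : history) {
            onNext.accept(item);
        }
        if (completed) {
            onCompleted.run();
        } else {
            onNextHandlers.add(onNext);
            onCompletedHandlers.add(onCompleted);
        }
    }

    public static void main(String[] args) {
        ReplayModel<String> subject = new ReplayModel<>();
        subject.onNext("one");
        subject.onNext("two");
        subject.onNext("three");
        subject.onCompleted();

        // Both late subscribers receive the full history followed by completion.
        List<String> seen1 = new ArrayList<>();
        List<String> seen2 = new ArrayList<>();
        subject.subscribe(seen1::add, () -> seen1.add("<completed>"));
        subject.subscribe(seen2::add, () -> seen2.add("<completed>"));
        System.out.println(seen1);
        System.out.println(seen2);
    }
}
```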

   
  • Field Details

    • state

      The state storing the history and the references.
    • EMPTY_ARRAY

      private static final Object[] EMPTY_ARRAY
      An empty array to trigger getValues() to return a new array.
  • Constructor Details

  • Method Details

    • create

      public static <T> ReplaySubject<T> create()
      Creates an unbounded replay subject.

      The internal buffer is backed by an ArrayList and starts with an initial capacity of 16. Once the number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the number of items grows, this causes frequent array reallocation and copying, and may hurt performance and latency. This can be avoided with the create(int) overload which takes an initial capacity parameter and can be tuned to reduce the array reallocation frequency as needed.

      Type Parameters:
      T - the type of items observed and emitted by the Subject
      Returns:
      the created subject
    • create

      public static <T> ReplaySubject<T> create(int capacity)
      Creates an unbounded replay subject with the specified initial buffer capacity.

      Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new items. For example, if you know that the buffer will hold 32k items, you can ask the ReplaySubject to preallocate its internal array with a capacity to hold that many items. Once the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead due to frequent array-copying.

      Type Parameters:
      T - the type of items observed and emitted by the Subject
      Parameters:
      capacity - the initial buffer capacity
      Returns:
      the created subject
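
      To get a feel for the reallocation cost described for create() and the saving from create(int), the growth policy can be simulated with simple arithmetic. This is a sketch under the assumption stated in the text (start at 16, grow by 50%); the exact growth factor is an ArrayList implementation detail, and `BufferGrowth` and `reallocations` are names invented for this example.

```java
// Simulates a "grow by 50%" buffer; an approximation of the policy described
// in the text, not a measurement of any particular JDK's ArrayList.
final class BufferGrowth {

    /** Counts how many reallocations are needed before the buffer can hold itemCount items. */
    static int reallocations(int initialCapacity, int itemCount) {
        int capacity = initialCapacity;
        int count = 0;
        while (capacity < itemCount) {
            // grow by half the current capacity, but always by at least one slot
            capacity += Math.max(1, capacity >> 1);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        int items = 32 * 1024;
        System.out.println("initial capacity 16: " + reallocations(16, items) + " reallocations");
        System.out.println("preallocated:        " + reallocations(items, items) + " reallocations");
    }
}
```

      Under these assumptions, buffering 32k items from the default capacity takes 19 reallocations, each copying the whole array, whereas a buffer preallocated for 32k items needs none.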
    • createUnbounded

      static <T> ReplaySubject<T> createUnbounded()
      Creates an unbounded replay subject with the bounded implementation for testing purposes.

      This variant behaves like the regular unbounded ReplaySubject created via create() but uses the data structures of the bounded implementation. It is not intended to replace the original array-backed, unbounded ReplaySubject, because the linked-list-based internal buffer adds overhead. Its sole purpose is to allow testing and reasoning about the behavior of the bounded implementations without interference from the eviction policies.

      Type Parameters:
      T - the type of items observed and emitted by the Subject
      Returns:
      the created subject
    • createUnboundedTime

      static <T> ReplaySubject<T> createUnboundedTime()
      Creates an unbounded replay subject with the time-bounded implementation for testing purposes.

      This variant behaves like the regular unbounded ReplaySubject created via create() but uses the data structures of the time-bounded implementation. It is not intended to replace the original array-backed, unbounded ReplaySubject, because the linked-list-based internal buffer adds overhead. Its sole purpose is to allow testing and reasoning about the behavior of the bounded implementations without interference from the eviction policies.

      Type Parameters:
      T - the type of items observed and emitted by the Subject
      Returns:
      the created subject
    • createWithSize

      public static <T> ReplaySubject<T> createWithSize(int size)
      Creates a size-bounded replay subject.

      In this setting, the ReplaySubject holds at most size items in its internal buffer and discards the oldest item whenever a new item would exceed that limit.

      When observers subscribe to a terminated ReplaySubject, they are guaranteed to see at most size onNext events followed by a termination event.

      If an observer subscribes while the ReplaySubject is active, it will observe all items in the buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to the size constraint in the meantime. In other words, once an observer subscribes, it will receive items without gaps in the sequence.

      Type Parameters:
      T - the type of items observed and emitted by the Subject
      Parameters:
      size - the maximum number of buffered items
      Returns:
      the created subject
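
      The size-bounded behavior described above can be sketched with a plain deque. This is a hypothetical model rather than the RxJava implementation: `SizeBoundedReplayModel` and its methods are invented for illustration, termination is left out, and because ArrayDeque rejects null elements the sketch does too.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a size-bounded replay buffer; NOT the RxJava implementation.
final class SizeBoundedReplayModel<T> {
    private final int maxSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    SizeBoundedReplayModel(int maxSize) {
        this.maxSize = maxSize;
    }

    void onNext(T item) {
        buffer.addLast(item);
        if (buffer.size() > maxSize) {
            buffer.removeFirst(); // discard the oldest buffered item
        }
        // Live subscribers receive every item, regardless of later eviction.
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    void subscribe(Consumer<T> subscriber) {
        for (T item : buffer) {
            subscriber.accept(item); // replay what is buffered right now
        }
        subscribers.add(subscriber);
    }

    public static void main(String[] args) {
        SizeBoundedReplayModel<Integer> subject = new SizeBoundedReplayModel<>(2);
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3); // evicts 1

        List<Integer> early = new ArrayList<>();
        subject.subscribe(early::add); // replays 2 and 3
        subject.onNext(4);             // evicts 2, but "early" still sees 4
        subject.onNext(5);             // evicts 3, but "early" still sees 5

        List<Integer> late = new ArrayList<>();
        subject.subscribe(late::add);  // only the two newest items remain

        System.out.println("early = " + early);
        System.out.println("late  = " + late);
    }
}
```

      The early subscriber observes a gap-free sequence from its first replayed item onwards, while the late subscriber starts from whatever the buffer still holds.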
    • createWithTime

      public static <T> ReplaySubject<T> createWithTime(long time, TimeUnit unit, Scheduler scheduler)
      Creates a time-bounded replay subject.

      In this setting, the ReplaySubject internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.

      Once the subject is terminated, observers subscribing to it will receive items that remained in the buffer after the terminal event, regardless of their age.

      If an observer subscribes while the ReplaySubject is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once an observer subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.

      Note that terminal notifications (onError and onCompleted) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onCompleted notification arrives at T=10. If an observer subscribes at T=11, it will find an empty ReplaySubject with just an onCompleted notification.

      Type Parameters:
      T - the type of items observed and emitted by the Subject
      Parameters:
      time - the maximum age of the contained items
      unit - the time unit of time
      scheduler - the Scheduler that provides the current time
      Returns:
      the created subject
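
      The timing rules above are easier to follow with an explicit clock. The sketch below is a hypothetical model, not the RxJava implementation: the `now` arguments stand in for the timestamps a Scheduler would supply, and `TimeBoundedReplayModel`, `Timed`, and `replayAt` are names invented for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of time-bounded replay; NOT the RxJava implementation.
final class TimeBoundedReplayModel<T> {

    private static final class Timed<V> {
        final V value;
        final long timestamp;

        Timed(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long maxAge;
    private final Deque<Timed<T>> buffer = new ArrayDeque<>();
    private boolean terminated;

    TimeBoundedReplayModel(long maxAge) {
        this.maxAge = maxAge;
    }

    // Drops items from the head whose age has reached maxAge.
    private void evict(long now) {
        while (!buffer.isEmpty() && now - buffer.peekFirst().timestamp >= maxAge) {
            buffer.removeFirst();
        }
    }

    void onNext(T item, long now) {
        if (terminated) {
            return;
        }
        evict(now);
        buffer.addLast(new Timed<>(item, now));
    }

    void onCompleted(long now) {
        if (terminated) {
            return;
        }
        evict(now); // terminal notifications trigger eviction as well
        terminated = true;
    }

    /** Items that would be replayed to an observer subscribing at time now. */
    List<T> replayAt(long now) {
        List<T> replayed = new ArrayList<>();
        for (Timed<T> timed : buffer) {
            // After termination the remaining items are replayed regardless of age.
            if (terminated || now - timed.timestamp < maxAge) {
                replayed.add(timed.value);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        TimeBoundedReplayModel<String> subject = new TimeBoundedReplayModel<>(5);
        subject.onNext("first", 0);
        subject.onCompleted(10);                  // "first" has age 10 and is evicted
        System.out.println(subject.replayAt(11)); // nothing left to replay
    }
}
```

      With the real API, an rx.schedulers.TestScheduler passed as the scheduler argument should allow the same scenario to be driven deterministically.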
    • createWithTimeAndSize

      public static <T> ReplaySubject<T> createWithTimeAndSize(long time, TimeUnit unit, int size, Scheduler scheduler)
      Creates a time- and size-bounded replay subject.

      In this setting, the ReplaySubject internally tags each received item with a timestamp value supplied by the Scheduler and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or if the buffer exceeds its size limit.

      When observers subscribe to a terminated ReplaySubject, they observe the items that remained in the buffer after the terminal notification, regardless of their age, but at most size items.

      If an observer subscribes while the ReplaySubject is active, it will observe only those items from within the buffer that have an age less than the specified time, and each subsequent item, even if the buffer evicts items due to the time or size constraint in the meantime. In other words, once an observer subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.

      Note that terminal notifications (onError and onCompleted) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onCompleted notification arrives at T=10. If an observer subscribes at T=11, it will find an empty ReplaySubject with just an onCompleted notification.

      Type Parameters:
      T - the type of items observed and emitted by the Subject
      Parameters:
      time - the maximum age of the contained items
      unit - the time unit of time
      size - the maximum number of buffered items
      scheduler - the Scheduler that provides the current time
      Returns:
      the created subject
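
      The combined eviction rule can be written as a single loop over the head of the buffer. The sketch below is hypothetical and works on bare timestamps to keep it short; it assumes an item becomes eligible for eviction once its age reaches the maximum age, in line with the T=0 / max age 5 example given for createWithTime. `TimeAndSizeEviction` and `evict` are names invented here.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical sketch of combined time- and size-based eviction; NOT the RxJava implementation.
final class TimeAndSizeEviction {

    /** Removes head entries while the buffer is over maxSize or the head is too old. */
    static void evict(Deque<Long> timestamps, long now, long maxAge, int maxSize) {
        while (!timestamps.isEmpty()
                && (timestamps.size() > maxSize || now - timestamps.peekFirst() >= maxAge)) {
            timestamps.removeFirst();
        }
    }

    public static void main(String[] args) {
        // Size limit dominates: five recent items, room for two.
        Deque<Long> crowded = new ArrayDeque<>(Arrays.asList(0L, 1L, 2L, 3L, 4L));
        evict(crowded, 4, 100, 2);
        System.out.println(crowded);

        // Age limit dominates: plenty of room, but the head is outdated.
        Deque<Long> stale = new ArrayDeque<>(Arrays.asList(0L, 6L, 7L));
        evict(stale, 8, 5, 10);
        System.out.println(stale);
    }
}
```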
    • onNext

      public void onNext(T t)
      Description copied from interface: Observer
      Provides the Observer with a new item to observe.

      The Observable may call this method 0 or more times.

      The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).

      Parameters:
      t - the item emitted by the Observable
    • onError

      public void onError(Throwable e)
      Description copied from interface: Observer
      Notifies the Observer that the Observable has experienced an error condition.

      If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onCompleted().

      Parameters:
      e - the exception encountered by the Observable
    • onCompleted

      public void onCompleted()
      Description copied from interface: Observer
      Notifies the Observer that the Observable has finished sending push-based notifications.

      The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).

    • subscriberCount

      int subscriberCount()
      Returns:
      the number of subscribers
    • hasObservers

      public boolean hasObservers()
      Description copied from class: Subject
      Indicates whether the Subject has Observers subscribed to it.
      Specified by:
      hasObservers in class Subject<T,T>
      Returns:
      true if there is at least one Observer subscribed to this Subject, false otherwise
    • hasThrowable

      @Beta public boolean hasThrowable()
      Check if the Subject has terminated with an exception.
      Returns:
      true if the subject has received a throwable through onError.
    • hasCompleted

      @Beta public boolean hasCompleted()
      Check if the Subject has terminated normally.
      Returns:
      true if the subject completed normally via onCompleted
    • getThrowable

      @Beta public Throwable getThrowable()
      Returns the Throwable that terminated the Subject.
      Returns:
      the Throwable that terminated the Subject or null if the subject hasn't terminated yet or it terminated normally.
    • size

      @Beta public int size()
      Returns the current number of items (non-terminal events) available for replay.
      Returns:
      the number of items available
    • hasAnyValue

      @Beta public boolean hasAnyValue()
      Returns:
      true if the Subject holds at least one non-terminal event available for replay
    • hasValue

      @Beta public boolean hasValue()
    • getValues

      @Beta public T[] getValues(T[] a)
      Copies a snapshot of the currently buffered non-terminal events into the provided array a, or returns a new array if a does not have enough capacity.
      Parameters:
      a - the array to fill in
      Returns:
      the array a if it had enough capacity or a new array containing the available values
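
      The "fill the given array or allocate a new one" contract described here is the same one that java.util.Collection.toArray(T[]) follows, so the standard library can illustrate it. This sketch uses a plain List as a stand-in for the subject's buffer; `SnapshotIntoArray` and `snapshot` are names invented for the example, and it makes no claim about how ReplaySubject implements the method internally.

```java
import java.util.Arrays;
import java.util.List;

// Illustrates the array-reuse contract with List.toArray(T[]); a stand-in, not ReplaySubject itself.
final class SnapshotIntoArray {

    /** Returns a if it is large enough to hold the buffer, otherwise a newly allocated array. */
    static <T> T[] snapshot(List<T> buffer, T[] a) {
        return buffer.toArray(a);
    }

    public static void main(String[] args) {
        List<String> buffer = Arrays.asList("one", "two", "three");

        String[] tooSmall = new String[0];
        String[] roomy = new String[5];

        // A too-small array is left alone and a new array is returned instead.
        System.out.println(snapshot(buffer, tooSmall) == tooSmall);
        // An array with enough capacity is filled and returned as-is.
        System.out.println(snapshot(buffer, roomy) == roomy);
    }
}
```

      Passing a zero-length array is a common way to request a correctly typed array of exactly the right size.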
    • getValues

      @Beta public Object[] getValues()
      Returns a snapshot of the currently buffered non-terminal events.

      The operation is threadsafe.

      Returns:
      a snapshot of the currently buffered non-terminal events.
    • getValue

      @Beta public T getValue()