Package rx

Class Scheduler

java.lang.Object
rx.Scheduler
Direct Known Subclasses:
CachedThreadScheduler, EventLoopsScheduler, ExecutorScheduler, ImmediateScheduler, ImmediateScheduler, NewThreadScheduler, NewThreadScheduler, SchedulerWhen, TestScheduler, TrampolineScheduler, TrampolineScheduler

public abstract class Scheduler extends Object
A Scheduler is an object that schedules units of work. You can find common implementations of this class in Schedulers.
  • Field Details

    • CLOCK_DRIFT_TOLERANCE_NANOS

      static final long CLOCK_DRIFT_TOLERANCE_NANOS
      The tolerance for clock drift, in nanoseconds, beyond which the periodic scheduler will rebase.

      The associated system parameter, rx.scheduler.drift-tolerance, expects its value in minutes.
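
      The unit mismatch above (property in minutes, constant in nanoseconds) can be illustrated with a minimal sketch. The class name DriftToleranceSketch, the helper toleranceNanos, and the fallback value of 15 minutes are assumptions made for this illustration; they are not taken from the text above.

```java
import java.util.concurrent.TimeUnit;

public class DriftToleranceSketch {

    // Hypothetical helper: converts a tolerance expressed in minutes
    // (the unit of the system property) into nanoseconds (the unit of the constant).
    static long toleranceNanos(long minutes) {
        return TimeUnit.MINUTES.toNanos(minutes);
    }

    public static void main(String[] args) {
        // Reads the property named in the documentation above.
        // The fallback of 15 minutes is an assumption of this sketch.
        long minutes = Long.getLong("rx.scheduler.drift-tolerance", 15L);
        System.out.println(toleranceNanos(minutes) + " ns");
    }
}
```

      Because the field is static final, the property presumably has to be set before the class is loaded, for example with -Drx.scheduler.drift-tolerance=5 on the JVM command line.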

  • Constructor Details

    • Scheduler

      public Scheduler()
  • Method Details

    • createWorker

      public abstract Scheduler.Worker createWorker()
      Retrieves or creates a new Scheduler.Worker that represents serial execution of actions.

      When the work is completed, the Worker should be unsubscribed using Subscription.unsubscribe().

      Work on a Scheduler.Worker is guaranteed to be sequential.

      Returns:
      a Worker representing a serial queue of actions to be executed
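
      The lifecycle described above (create a worker, schedule actions, unsubscribe when done) might look like the following sketch. It assumes RxJava 1.x on the classpath and Java 8 lambdas; the class name WorkerSketch, the method runInOrder, and the five-second timeout are hypothetical choices for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.schedulers.Schedulers;

public class WorkerSketch {

    // Schedules two actions on one Worker and returns the order in which they ran.
    static List<String> runInOrder() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);

        Scheduler.Worker worker = Schedulers.computation().createWorker();
        try {
            // Actions on the same Worker run sequentially, in submission order.
            worker.schedule(() -> { log.add("first"); done.countDown(); });
            worker.schedule(() -> { log.add("second"); done.countDown(); });

            // Wait for both actions; the timeout is an arbitrary safety net.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Release the Worker once the work is completed.
            worker.unsubscribe();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder());
    }
}
```

      Unsubscribing in a finally block is a design choice of this sketch, so that the Worker is released even if waiting is interrupted.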
    • now

      public long now()
      Gets the current time, in milliseconds, according to this Scheduler.
      Returns:
      the scheduler's notion of current absolute time in milliseconds
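    • when

      public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine)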
      Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done. This makes it possible to layer additional behavior on this Scheduler. The only parameter is a function that flattens an Observable of Observables of Completables into just one Completable. There must be a chain of operators connecting the returned value to the source Observable; otherwise, any work scheduled on the returned Scheduler will not be executed.

      When createWorker() is invoked, an Observable of Completables is onNext'd to the combinator to be flattened. If the inner Observable is not immediately subscribed to, any calls to Scheduler.Worker.schedule(rx.functions.Action0) are buffered. Once the Observable is subscribed to, actions are then onNext'd as Completables.

      Finally, the actions are scheduled on the parent Scheduler when the innermost Completables are subscribed to.

      When the Scheduler.Worker is unsubscribed, the Completable emits an onComplete and triggers any behavior in the flattening operator. The Observable and all Completables given to the flattening function never emit onError.

      Limit the amount of concurrency to two actions at a time without creating a new fixed-size thread pool:

       Scheduler limitSched = Schedulers.computation().when(workers -> {
              // use merge's maxConcurrent argument to limit the number of
              // concurrent callbacks to two at a time
              return Completable.merge(Observable.merge(workers), 2);
       });
       

      This is a slightly different way to limit the concurrency, with some interesting benefits and drawbacks compared to the method above. It works by limiting the number of concurrent Scheduler.Workers rather than individual actions. Generally, each Observable uses its own Scheduler.Worker, so this essentially limits the number of concurrent subscribes. The danger comes from using operators like Observable.zip(Observable, Observable, rx.functions.Func2), where subscribing to the first Observable could deadlock the subscription to the second.

       Scheduler limitSched = Schedulers.computation().when(workers -> {
              // use merge's maxConcurrent argument to limit the number of
              // concurrent Observables to two at a time
              return Completable.merge(Observable.merge(workers, 2));
       });
       
      Slow down the rate to no more than one per second. This suffers from the same problem as the example above; the author could not find an Observable operator that limits the rate without dropping values (that is, a leaky bucket algorithm).
       Scheduler slowSched = Schedulers.computation().when(workers -> {
              // use concatenate to make each worker happen one at a time.
              return Completable.concat(workers.map(actions -> {
                      // delay the starting of the next worker by 1 second.
                      return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
              }));
       });
       
      Type Parameters:
      S - a Scheduler and a Subscription
      Parameters:
      combine - the function that takes a two-level nested Observable sequence of Completables and returns the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
      Returns:
      the Scheduler with the customized execution behavior