/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import rx.observables.GroupedObservable;
import rx.operators.OperationAll;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSubscribeOn;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
import rx.operators.OperationToObservableList;
import rx.operators.OperationToObservableSortedList;
import rx.operators.OperationWhere;
import rx.operators.OperationZip;
import rx.operators.OperatorGroupBy;
import rx.operators.OperatorTakeUntil;
import rx.operators.OperatorToIterator;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
import rx.util.Range;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.util.functions.Func3;
import rx.util.functions.Func4;
import rx.util.functions.FuncN;
import rx.util.functions.Function;
import rx.util.functions.FunctionLanguageAdaptor;
import rx.util.functions.Functions;
/**
* The Observable interface that implements the Reactive Pattern.
*
* It provides overloaded methods for subscribing as well as delegate methods to the various operators.
*
* The documentation for this interface makes use of marble diagrams. The following legend explains
* these diagrams:
*
*
*
* For more information see the RxJava Wiki
*
* @param
*/
public class Observable {
private final static RxJavaObservableExecutionHook HOOK = RxJavaPlugins.getInstance().getObservableExecutionHook();
private final Func1, Subscription> onSubscribe;
protected Observable() {
this(null);
}
/**
* Construct an Observable with Function to execute when subscribed to.
*
* NOTE: Generally you're better off using {@link #create(Func1)} to create an Observable instead of using inheritance.
*
* @param onSubscribe
* {@link Func1} to be executed when {@link #subscribe(Observer)} is called.
*/
protected Observable(Func1, Subscription> onSubscribe) {
this.onSubscribe = onSubscribe;
}
/**
* an {@link Observer} must call an Observable's subscribe method in order to register itself
* to receive push-based notifications from the Observable. A typical implementation of the
* subscribe method does the following:
*
* It stores a reference to the Observer in a collection object, such as a List
* object.
*
* It returns a reference to the {@link Subscription} interface. This enables
* Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has
* finished sending them and has called the Observer's {@link Observer#onCompleted()} method.
*
* At any given time, a particular instance of an Observable implementation is
* responsible for accepting all subscriptions and notifying all subscribers. Unless the
* documentation for a particular Observable implementation indicates otherwise,
* Observers should make no assumptions about the Observable implementation, such
* as the order of notifications that multiple Observers will receive.
*
* For more information see the RxJava Wiki
*
*
* @param observer
* @return a {@link Subscription} reference that allows observers
* to stop receiving notifications before the provider has finished sending them
*/
public Subscription subscribe(Observer observer) {
// allow the hook to intercept and/or decorate
Func1, Subscription> onSubscribeFunction = HOOK.onSubscribeStart(this, onSubscribe);
// validate and proceed
if (onSubscribeFunction == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
// the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
}
try {
/**
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
if (isInternalImplementation(observer)) {
Subscription s = onSubscribeFunction.call(observer);
if (s == null) {
// this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens
// we want to gracefully handle it the same as AtomicObservableSubscription does
return HOOK.onSubscribeReturn(this, Subscriptions.empty());
} else {
return HOOK.onSubscribeReturn(this, s);
}
} else {
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
subscription.wrap(onSubscribeFunction.call(new AtomicObserver<>(subscription, observer)));
return HOOK.onSubscribeReturn(this, subscription);
}
} catch (Exception e) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
observer.onError(HOOK.onSubscribeError(this, e));
} catch (Exception e2) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
HOOK.onSubscribeError(this, r);
throw r;
}
return Subscriptions.empty();
}
}
/**
* an {@link Observer} must call an Observable's subscribe method in order to register itself
* to receive push-based notifications from the Observable. A typical implementation of the
* subscribe method does the following:
*
* It stores a reference to the Observer in a collection object, such as a List
* object.
*
* It returns a reference to the {@link Subscription} interface. This enables
* Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has
* finished sending them and has called the Observer's {@link Observer#onCompleted()} method.
*
* At any given time, a particular instance of an Observable implementation is
* responsible for accepting all subscriptions and notifying all subscribers. Unless the
* documentation for a particular Observable implementation indicates otherwise,
* Observers should make no assumptions about the Observable implementation, such
* as the order of notifications that multiple Observers will receive.
*
* For more information see the RxJava Wiki
*
*
* @param observer
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @return a {@link Subscription} reference that allows observers
* to stop receiving notifications before the provider has finished sending them
*/
public Subscription subscribe(Observer observer, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(observer);
}
/**
* Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
private Subscription protectivelyWrapAndSubscribe(Observer o) {
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
return subscription.wrap(subscribe(new AtomicObserver<>(subscription, o)));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Map callbacks) {
// lookup and memoize onNext
Object _onNext = callbacks.get("onNext");
if (_onNext == null) {
throw new RuntimeException("onNext must be implemented");
}
final FuncN onNext = Functions.from(_onNext);
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
Object onComplete = callbacks.get("onCompleted");
if (onComplete != null) {
Functions.from(onComplete).call();
}
}
@Override
public void onError(Exception e) {
handleError(e);
Object onError = callbacks.get("onError");
if (onError != null) {
Functions.from(onError).call(e);
}
}
@Override
public void onNext(Object args) {
onNext.call(args);
}
});
}
public Subscription subscribe(final Map callbacks, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(callbacks);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object o) {
if (o instanceof Observer) {
// in case a dynamic language is not correctly handling the overloaded methods and we receive an Observer just forward to the correct method.
return subscribe((Observer) o);
}
// lookup and memoize onNext
if (o == null) {
throw new RuntimeException("onNext must be implemented");
}
final FuncN onNext = Functions.from(o);
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
// do nothing
}
@Override
public void onError(Exception e) {
handleError(e);
// no callback defined
}
@Override
public void onNext(Object args) {
onNext.call(args);
}
});
}
public Subscription subscribe(final Object o, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(o);
}
public Subscription subscribe(final Action1 onNext) {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
// do nothing
}
@Override
public void onError(Exception e) {
handleError(e);
// no callback defined
}
@Override
public void onNext(T args) {
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
}
onNext.call(args);
}
});
}
public Subscription subscribe(final Action1 onNext, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError) {
// lookup and memoize onNext
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
}
final FuncN onNextFunction = Functions.from(onNext);
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
// do nothing
}
@Override
public void onError(Exception e) {
handleError(e);
if (onError != null) {
Functions.from(onError).call(e);
}
}
@Override
public void onNext(Object args) {
onNextFunction.call(args);
}
});
}
public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}
public Subscription subscribe(final Action1 onNext, final Action1 onError) {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
// do nothing
}
@Override
public void onError(Exception e) {
handleError(e);
if (onError != null) {
onError.call(e);
}
}
@Override
public void onNext(T args) {
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
}
onNext.call(args);
}
});
}
public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) {
// lookup and memoize onNext
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
}
final FuncN onNextFunction = Functions.from(onNext);
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
if (onComplete != null) {
Functions.from(onComplete).call();
}
}
@Override
public void onError(Exception e) {
handleError(e);
if (onError != null) {
Functions.from(onError).call(e);
}
}
@Override
public void onNext(Object args) {
onNextFunction.call(args);
}
});
}
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}
public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
onComplete.call();
}
@Override
public void onError(Exception e) {
handleError(e);
if (onError != null) {
onError.call(e);
}
}
@Override
public void onNext(T args) {
if (onNext == null) {
throw new RuntimeException("onNext must be implemented");
}
onNext.call(args);
}
});
}
public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}
/**
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
*
* NOTE: This will block even if the Observable is asynchronous.
*
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
*
* @param onNext
* {@link Action1}
* @throws RuntimeException
* if error occurs
*/
public void forEach(final Action1 onNext) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference exceptionFromOnError = new AtomicReference<>();
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Exception e) {
/*
* If we receive an onError event we set the reference on the outer thread
* so we can git it and throw after the latch.await().
*
* We do this instead of throwing directly since this may be on a different thread and the latch is still waiting.
*/
exceptionFromOnError.set(e);
latch.countDown();
}
@Override
public void onNext(T args) {
onNext.call(args);
}
});
// block until the subscription completes and then return
try {
latch.await();
} catch (InterruptedException e) {
// set the interrupted flag again so callers can still get it
// for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780
Thread.currentThread().interrupt();
// using Runtime so it is not checked
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
}
if (exceptionFromOnError.get() != null) {
if (exceptionFromOnError.get() instanceof RuntimeException) {
throw (RuntimeException) exceptionFromOnError.get();
} else {
throw new RuntimeException(exceptionFromOnError.get());
}
}
}
/**
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
*
* NOTE: This will block even if the Observable is asynchronous.
*
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
*
* @param o
* {@link Action1}
* @throws RuntimeException
* if error occurs
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public void forEach(final Object o) {
if (o instanceof Action1) {
// in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method.
forEach((Action1) o);
}
// lookup and memoize onNext
if (o == null) {
throw new RuntimeException("onNext must be implemented");
}
final FuncN onNext = Functions.from(o);
forEach(onNext::call);
}
/**
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
*
* @return The single element in the observable sequence.
*/
public T single() {
return single(this);
}
/**
* Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence.
*
* @param predicate
* A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence.
*/
public T single(Func1 predicate) {
return single(this, predicate);
}
/**
* Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence.
*
* @param predicate
* A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence.
*/
public T single(Object predicate) {
return single(this, predicate);
}
/**
* Returns the only element of an observable sequence, or a default value if the observable sequence is empty.
*
* @param defaultValue
* default value for a sequence.
* @return The single element in the observable sequence, or a default value if no value is found.
*/
public T singleOrDefault(T defaultValue) {
return singleOrDefault(this, defaultValue);
}
/**
* Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found.
*
* @param defaultValue
* default value for a sequence.
* @param predicate
* A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence, or a default value if no value is found.
*/
public T singleOrDefault(T defaultValue, Func1 predicate) {
return singleOrDefault(this, defaultValue, predicate);
}
/**
* Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found.
*
* @param defaultValue
* default value for a sequence.
* @param predicate
* A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence, or a default value if no value is found.
*/
public T singleOrDefault(T defaultValue, Object predicate) {
return singleOrDefault(this, defaultValue, predicate);
}
/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from onError.
*
* @param e
*/
private void handleError(Exception e) {
// onError should be rare so we'll only fetch when needed
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
}
/**
* An Observable that never sends any information to an {@link Observer}.
*
* This Observable is useful primarily for testing purposes.
*
* @param
* the type of item emitted by the Observable
*/
private static class NeverObservable extends Observable {
public NeverObservable() {
super((Observer t1) -> Subscriptions.empty());
}
}
/**
* an Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes.
*
* @param
* the type of object returned by the Observable
*/
private static class ThrowObservable extends Observable {
public ThrowObservable(final Exception exception) {
/**
* Accepts an {@link Observer} and calls its onError method.
*
* @param observer
* an {@link Observer} of this Observable
* @return a reference to the subscription
*/
super((Observer observer) -> {
observer.onError(exception);
return Subscriptions.empty();
});
}
}
/**
* Creates an Observable that will execute the given function when a {@link Observer} subscribes to it.
*
* Write the function you pass to create so that it behaves as an Observable - calling the passed-in
* onNext, onError, and onCompleted methods appropriately.
*
* A well-formed Observable must call either the {@link Observer}'s onCompleted method exactly once or its onError method exactly once.
*
* See Rx Design Guidelines (PDF) for detailed information.
*
* @param
* the type emitted by the Observable sequence
* @param func
* a function that accepts an Observer and calls its onNext, onError, and onCompleted methods
* as appropriate, and returns a {@link Subscription} to allow canceling the subscription (if applicable)
* @return an Observable that, when an {@link Observer} subscribes to it, will execute the given function
*/
public static Observable create(Func1, Subscription> func) {
return new Observable<>(func);
}
/**
* Creates an Observable that will execute the given function when a {@link Observer} subscribes to it.
*
* This method accept {@link Object} to allow different languages to pass in closures using {@link FunctionLanguageAdaptor}.
*
* Write the function you pass to create so that it behaves as an Observable - calling the passed-in
* onNext, onError, and onCompleted methods appropriately.
*
* A well-formed Observable must call either the {@link Observer}'s onCompleted method exactly once or its onError method exactly once.
*
* See Rx Design Guidelines (PDF) for detailed information.
*
* @param
* the type emitted by the Observable sequence
* @param func
* a function that accepts an Observer and calls its onNext, onError, and onCompleted methods
* as appropriate, and returns a {@link Subscription} to allow canceling the subscription (if applicable)
* @return an Observable that, when an {@link Observer} subscribes to it, will execute the given function
*/
public static Observable create(final Object func) {
final FuncN _f = Functions.from(func);
return create((Observer t1) -> (Subscription) _f.call(t1));
}
/**
* Returns an Observable that returns no data to the {@link Observer} and immediately invokes its onCompleted method.
*
*
*
* @param
* the type of item emitted by the Observable
* @return an Observable that returns no data to the {@link Observer} and immediately invokes the {@link Observer}'s onCompleted method
*/
public static Observable empty() {
return toObservable(new ArrayList());
}
/**
* Returns an Observable that calls onError when an {@link Observer} subscribes to it.
*
*
* @param exception
* the error to throw
* @param
* the type of object returned by the Observable
* @return an Observable object that calls onError when an {@link Observer} subscribes
*/
public static Observable error(Exception exception) {
return new ThrowObservable<>(exception);
}
/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
*
*
*
* @param that
* the Observable to filter
* @param predicate
* a function that evaluates the items emitted by the source Observable, returning true if they pass the filter
* @return an Observable that emits only those items in the original Observable that the filter evaluates as true
*/
public static Observable filter(Observable that, Func1 predicate) {
return create(OperationFilter.filter(that, predicate));
}
/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
*
*
*
* @param that
* the Observable to filter
* @param function
* a function that evaluates the items emitted by the source Observable, returning true if they pass the filter
* @return an Observable that emits only those items in the original Observable that the filter evaluates as true
*/
public static Observable filter(Observable that, final Object function) {
final FuncN _f = Functions.from(function);
return filter(that, (T t1) -> (Boolean) _f.call(t1));
}
/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
*
*
*
* @param that
* the Observable to filter
* @param predicate
* a function that evaluates the items emitted by the source Observable, returning true if they pass the filter
* @return an Observable that emits only those items in the original Observable that the filter evaluates as true
*/
public static Observable where(Observable that, Func1 predicate) {
return create(OperationWhere.where(that, predicate));
}
/**
* Converts an {@link Iterable} sequence to an Observable sequence.
*
* @param iterable
* the source {@link Iterable} sequence
* @param
* the type of items in the {@link Iterable} sequence and the type emitted by the resulting Observable
* @return an Observable that emits each item in the source {@link Iterable} sequence
* @see {@link #toObservable(Iterable)}
*/
public static Observable from(Iterable iterable) {
return toObservable(iterable);
}
/**
* Converts an Array to an Observable sequence.
*
* @param items
* the source Array
* @param
* the type of items in the Array, and the type of items emitted by the resulting Observable
* @return an Observable that emits each item in the source Array
* @see {@link #toObservable(Object...)}
*/
public static Observable from(T... items) {
return toObservable(items);
}
/**
* Generates an observable sequence of integral numbers within a specified range.
*
* @param start
* The value of the first integer in the sequence
* @param count
* The number of sequential integers to generate.
*
* @return An observable sequence that contains a range of sequential integral numbers.
*
* @see Observable.Range Method (Int32, Int32)
*/
public static Observable range(int start, int count) {
return from(Range.createWithCount(start, count));
}
/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to perform subscription and unsubscription actions on.
* @param
* the type of observable.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public static Observable subscribeOn(Observable source, Scheduler scheduler) {
return create(OperationSubscribeOn.subscribeOn(source, scheduler));
}
/**
* Asynchronously notify observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to notify observers on.
* @param
* the type of observable.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public static Observable observeOn(Observable source, Scheduler scheduler) {
return create(OperationObserveOn.observeOn(source, scheduler));
}
/**
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
* subscribes to the sequence. This is useful to allow an observer to easily obtain an updates or refreshed version
* of the sequence.
*
* @param observableFactory
* the observable factory function to invoke for each observer that subscribes to the resulting sequence.
* @param
* the type of the observable.
* @return the observable sequence whose observers trigger an invocation of the given observable factory function.
*/
public static Observable defer(Func0> observableFactory) {
return create(OperationDefer.defer(observableFactory));
}
/**
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
* subscribes to the sequence. This is useful to allow an observer to easily obtain an updates or refreshed version
* of the sequence.
*
* @param observableFactory
* the observable factory function to invoke for each observer that subscribes to the resulting sequence.
* @param
* the type of the observable.
* @return the observable sequence whose observers trigger an invocation of the given observable factory function.
*/
public static Observable defer(Object observableFactory) {
final FuncN _f = Functions.from(observableFactory);
return create(OperationDefer.defer(() -> (Observable) _f.call()));
}
/**
* Returns an Observable that notifies an {@link Observer} of a single value and then completes.
*
* To convert any object into an Observable that emits that object, pass that object into the just method.
*
* This is similar to the {@link #toObservable} method, except that toObservable will convert
* an {@link Iterable} object into an Observable that emits each of the items in the {@link Iterable}, one
* at a time, while the just method would convert the {@link Iterable} into an Observable
* that emits the entire {@link Iterable} as a single item.
*
*
*
* @param value
* the value to pass to the Observer's onNext method
* @param
* the type of the value
* @return an Observable that notifies an {@link Observer} of a single value and then completes
*/
public static Observable just(T value) {
List list = new ArrayList<>();
list.add(value);
return toObservable(list);
}
/**
* Returns the last element of an observable sequence with a specified source.
*
* @param that
* the source Observable
* @return the last element in the observable sequence.
*/
public static T last(final Observable that) {
T result = null;
for (T value : that.toIterable()) {
result = value;
}
return result;
}
/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param that
* the source Observable
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public static T last(final Observable that, final Func1 predicate) {
return last(that.filter(predicate));
}
/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param that
* the source Observable
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public static T last(final Observable that, final Object predicate) {
return last(that.filter(predicate));
}
/**
* Returns the last element of an observable sequence, or a default value if no value is found.
*
* @param source
* the source observable.
* @param defaultValue
* a default value that would be returned if observable is empty.
* @param
* the type of source.
* @return the last element of an observable sequence that matches the predicate, or a default value if no value is found.
*/
public static T lastOrDefault(Observable source, T defaultValue) {
boolean found = false;
T result = null;
for (T value : source.toIterable()) {
found = true;
result = value;
}
if (!found) {
return defaultValue;
}
return result;
}
/**
* Returns the last element of an observable sequence that matches the predicate, or a default value if no value is found.
*
* @param source
* the source observable.
* @param defaultValue
* a default value that would be returned if observable is empty.
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @param
* the type of source.
* @return the last element of an observable sequence that matches the predicate, or a default value if no value is found.
*/
public static T lastOrDefault(Observable source, T defaultValue, Func1 predicate) {
return lastOrDefault(source.filter(predicate), defaultValue);
}
/**
* Returns the last element of an observable sequence that matches the predicate, or a default value if no value is found.
*
* @param source
* the source observable.
* @param defaultValue
* a default value that would be returned if observable is empty.
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @param
* the type of source.
* @return the last element of an observable sequence that matches the predicate, or a default value if no value is found.
*/
public static T lastOrDefault(Observable source, T defaultValue, Object predicate) {
final FuncN _f = Functions.from(predicate);
return lastOrDefault(source, defaultValue, (T args) -> (Boolean) _f.call(args));
}
/**
* Applies a function of your choosing to every notification emitted by an Observable, and returns
* this transformation as a new Observable sequence.
*
*
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item in the sequence emitted by the source Observable
* @param
* the type of items emitted by the the source Observable
* @param
* the type of items returned by map function
* @return an Observable that is the result of applying the transformation function to each item
* in the sequence emitted by the source Observable
*/
public static Observable map(Observable sequence, Func1 func) {
return create(OperationMap.map(sequence, func));
}
/**
* Applies a function of your choosing to every notification emitted by an Observable, and returns
* this transformation as a new Observable sequence.
*
*
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item in the sequence emitted by the source Observable
* @param
* the type of items emitted by the the source Observable
* @param
* the type of items returned by map function
* @return an Observable that is the result of applying the transformation function to each item
* in the sequence emitted by the source Observable
*/
public static Observable map(Observable sequence, final Object func) {
final FuncN _f = Functions.from(func);
return map(sequence, (T t1) -> (R) _f.call(t1));
}
/**
* Creates a new Observable sequence by applying a function that you supply to each object in the
* original Observable sequence, where that function is itself an Observable that emits objects,
* and then merges the results of that function applied to every item emitted by the original
* Observable, emitting these merged results as its own sequence.
*
*
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item emitted by the source Observable, generating a
* Observable
* @param
* the type emitted by the source Observable
* @param
* the type emitted by the Observables emitted by func
* @return an Observable that emits a sequence that is the result of applying the transformation
* function to each item emitted by the source Observable and merging the results of
* the Observables obtained from this transformation
*/
public static Observable mapMany(Observable sequence, Func1> func) {
return create(OperationMap.mapMany(sequence, func));
}
/**
* Creates a new Observable sequence by applying a function that you supply to each object in the
* original Observable sequence, where that function is itself an Observable that emits objects,
* and then merges the results of that function applied to every item emitted by the original
* Observable, emitting these merged results as its own sequence.
*
*
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item emitted by the source Observable, generating a
* Observable
* @param
* the type emitted by the source Observable
* @param
* the type emitted by the Observables emitted by func
* @return an Observable that emits a sequence that is the result of applying the transformation
* function to each item emitted by the source Observable and merging the results of
* the Observables obtained from this transformation
*/
public static Observable mapMany(Observable sequence, final Object func) {
final FuncN _f = Functions.from(func);
return mapMany(sequence, (T t1) -> (Observable) _f.call(t1));
}
/**
* Materializes the implicit notifications of an observable sequence as explicit notification values.
*
*
*
* @param sequence
* An observable sequence of elements to project.
* @return An observable sequence whose elements are the result of materializing the notifications of the given sequence.
* @see MSDN: Observable.Materialize
*/
public static Observable> materialize(final Observable sequence) {
return create(OperationMaterialize.materialize(sequence));
}
/**
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
*
* @param sequence
* An observable sequence containing explicit notification values which have to be turned into implicit notifications.
* @return An observable sequence exhibiting the behavior corresponding to the source sequence's notification values.
* @see MSDN: Observable.Dematerialize
*/
public static Observable dematerialize(final Observable> sequence) {
return create(OperationDematerialize.dematerialize(sequence));
}
/**
* Flattens the Observable sequences from a list of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
* act like a single Observable, by using the merge method.
*
*
*
* @param source
* a list of Observables that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the source list of Observables
* @see MSDN: Observable.Merge
*/
public static Observable merge(List> source) {
return create(OperationMerge.merge(source));
}
/**
* Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a
* Observable into one Observable sequence without any transformation. You can combine the output
* of multiple Observables so that they act like a single Observable, by using the merge method.
*
*
*
* @param source
* an Observable that emits Observables
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the Observables emitted by the source Observable
* @see MSDN: Observable.Merge Method
*/
public static Observable merge(Observable> source) {
return create(OperationMerge.merge(source));
}
/**
* Flattens the Observable sequences from a series of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
* act like a single Observable, by using the merge method.
*
*
*
* @param source
* a series of Observables that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the source Observables
* @see MSDN: Observable.Merge Method
*/
public static Observable merge(Observable... source) {
return create(OperationMerge.merge(source));
}
/**
* Returns the values from the source observable sequence until the other observable sequence produces a value.
*
* @param source
* the source sequence to propagate elements for.
* @param other
* the observable sequence that terminates propagation of elements of the source sequence.
* @param
* the type of source.
* @param
* the other type.
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
*/
public static Observable takeUntil(final Observable source, final Observable other) {
return OperatorTakeUntil.takeUntil(source, other);
}
/**
* Combines the objects emitted by two or more Observables, and emits the result as a single Observable,
* by using the concat method.
*
*
*
* @param source
* a series of Observables that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of combining the
* output from the source Observables
* @see MSDN: Observable.Concat Method
*/
public static Observable concat(Observable... source) {
return create(OperationConcat.concat(source));
}
/**
* Emits the same objects as the given Observable, calling the given action
* when it calls onComplete or onError.
*
* @param source
* an observable
* @param action
* an action to be called when the source completes or errors.
* @return an Observable that emits the same objects, then calls the action.
* @see MSDN: Observable.Finally Method
*/
public static Observable finallyDo(Observable source, Action0 action) {
return create(OperationFinally.finallyDo(source, action));
}
/**
* Groups the elements of an observable and selects the resulting elements by using a specified function.
*
* @param source
* an observable whose elements to group.
* @param keySelector
* a function to extract the key for each element.
* @param elementSelector
* a function to map each source element to an element in an observable group.
* @param
* the key type.
* @param
* the source type.
* @param
* the resulting observable type.
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*/
public static Observable> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) {
return create(OperatorGroupBy.groupBy(source, keySelector, elementSelector));
}
/**
* Groups the elements of an observable according to a specified key selector function and
*
* @param source
* an observable whose elements to group.
* @param keySelector
* a function to extract the key for each element.
* @param
* the key type.
* @param
* the source type.
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*/
public static Observable> groupBy(Observable source, final Func1 keySelector) {
return create(OperatorGroupBy.groupBy(source, keySelector));
}
/**
* Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error.
*
* Only the first onError received will be sent.
*
* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented.
*
*
*
* @param source
* a list of Observables that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the source list of Observables
* @see MSDN: Observable.Merge Method
*/
public static Observable mergeDelayError(List> source) {
return create(OperationMergeDelayError.mergeDelayError(source));
}
/**
* Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error.
*
* Only the first onError received will be sent.
*
* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented.
*
*
*
* @param source
* an Observable that emits Observables
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the Observables emitted by the source Observable
* @see MSDN: Observable.Merge Method
*/
public static Observable mergeDelayError(Observable> source) {
return create(OperationMergeDelayError.mergeDelayError(source));
}
/**
* Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error.
*
* Only the first onError received will be sent.
*
* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented.
*
*
*
* @param source
* a series of Observables that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the source Observables
* @see MSDN: Observable.Merge Method
*/
public static Observable mergeDelayError(Observable... source) {
return create(OperationMergeDelayError.mergeDelayError(source));
}
/**
* Returns an Observable that never sends any information to an {@link Observer}.
*
* This observable is useful primarily for testing purposes.
*
* @param
* the type of item (not) emitted by the Observable
* @return an Observable that never sends any information to an {@link Observer}
*/
public static Observable never() {
return new NeverObservable<>();
}
/**
* Instruct an Observable to pass control to another Observable (the return value of a function)
* rather than calling onError if it encounters an error.
*
* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer,
* the Observable calls its {@link Observer}'s onError function, and then quits without calling any more
* of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a
* function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method,
* if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it
* will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if
* it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may
* never know that an error happened.
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered.
*
*
*
* @param that
* the source Observable
* @param resumeFunction
* a function that returns an Observable that will take over if the source Observable
* encounters an error
* @return the source Observable, with its behavior modified as described
*/
public static Observable onErrorResumeNext(final Observable that, final Func1> resumeFunction) {
return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction));
}
/**
* Instruct an Observable to pass control to another Observable (the return value of a function)
* rather than calling onError if it encounters an error.
*
* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer,
* the Observable calls its {@link Observer}'s onError function, and then quits without calling any more
* of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a
* function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method,
* if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it
* will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if
* it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may
* never know that an error happened.
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered.
*
*
*
* @param that
* the source Observable
* @param resumeFunction
* a function that returns an Observable that will take over if the source Observable
* encounters an error
* @return the source Observable, with its behavior modified as described
*/
public static Observable onErrorResumeNext(final Observable that, final Object resumeFunction) {
final FuncN _f = Functions.from(resumeFunction);
return onErrorResumeNext(that, (Exception e) -> (Observable) _f.call(e));
}
/**
* Instruct an Observable to pass control to another Observable rather than calling onError if it encounters an error.
*
* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer,
* the Observable calls its {@link Observer}'s onError function, and then quits without calling any more
* of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a
* function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method,
* if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it
* will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if
* it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may
* never know that an error happened.
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered.
*
*
*
* @param that
* the source Observable
* @param resumeSequence
* a function that returns an Observable that will take over if the source Observable
* encounters an error
* @return the source Observable, with its behavior modified as described
*/
public static Observable onErrorResumeNext(final Observable that, final Observable resumeSequence) {
return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence));
}
/**
* Instruct an Observable to emit a particular item to its Observer's onNext function
* rather than calling onError if it encounters an error.
*
* By default, when an Observable encounters an error that prevents it from emitting the expected item to its {@link Observer}, the Observable calls its {@link Observer}'s onError
* function, and then quits
* without calling any more of its {@link Observer}'s closures. The onErrorReturn method changes
* this behavior. If you pass a function (resumeFunction) to an Observable's onErrorReturn
* method, if the original Observable encounters an error, instead of calling its {@link Observer}'s
* onError function, it will instead pass the return value of resumeFunction to the {@link Observer}'s onNext method.
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered.
*
* @param that
* the source Observable
* @param resumeFunction
* a function that returns a value that will be passed into an {@link Observer}'s onNext function if the Observable encounters an error that would
* otherwise cause it to call onError
* @return the source Observable, with its behavior modified as described
*/
public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) {
return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
}
/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
* by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole
* output.
*
* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject
* method that does a similar operation on lists.
*
*
*
* @param
* the type item emitted by the source Observable
* @param sequence
* the source Observable
* @param accumulator
* an accumulator function to be invoked on each element from the sequence, whose
* result will be used in the next accumulator call (if applicable)
*
* @return an Observable that emits a single element that is the result of accumulating the
* output from applying the accumulator to the sequence of items emitted by the source
* Observable
* @see MSDN: Observable.Aggregate
* @see Wikipedia: Fold (higher-order function)
*/
public static Observable reduce(Observable sequence, Func2 accumulator) {
return takeLast(create(OperationScan.scan(sequence, accumulator)), 1);
}
/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
* by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole
* output.
*
* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject
* method that does a similar operation on lists.
*
*
*
* @param
* the type item emitted by the source Observable
* @param sequence
* the source Observable
* @param accumulator
* an accumulator function to be invoked on each element from the sequence, whose
* result will be used in the next accumulator call (if applicable)
*
* @return an Observable that emits a single element that is the result of accumulating the
* output from applying the accumulator to the sequence of items emitted by the source
* Observable
* @see MSDN: Observable.Aggregate
* @see Wikipedia: Fold (higher-order function)
*/
public static Observable reduce(final Observable sequence, final Object accumulator) {
final FuncN _f = Functions.from(accumulator);
return reduce(sequence, (T t1, T t2) -> (T) _f.call(t1, t2));
}
/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
* by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole
* output.
*
* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject
* method that does a similar operation on lists.
*
*
*
* @param
* the type item emitted by the source Observable
* @param sequence
* the source Observable
* @param initialValue
* a seed passed into the first execution of the accumulator function
* @param accumulator
* an accumulator function to be invoked on each element from the sequence, whose
* result will be used in the next accumulator call (if applicable)
*
* @return an Observable that emits a single element that is the result of accumulating the
* output from applying the accumulator to the sequence of items emitted by the source
* Observable
* @see MSDN: Observable.Aggregate
* @see Wikipedia: Fold (higher-order function)
*/
public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) {
return takeLast(create(OperationScan.scan(sequence, initialValue, accumulator)), 1);
}
/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
* by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole
* output.
*
* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject
* method that does a similar operation on lists.
*
*
*
* @param
* the type item emitted by the source Observable
* @param sequence
* the source Observable
* @param initialValue
* a seed passed into the first execution of the accumulator function
* @param accumulator
* an accumulator function to be invoked on each element from the sequence, whose
* result will be used in the next accumulator call (if applicable)
* @return an Observable that emits a single element that is the result of accumulating the
* output from applying the accumulator to the sequence of items emitted by the source
* Observable
* @see MSDN: Observable.Aggregate
* @see Wikipedia: Fold (higher-order function)
*/
public static Observable reduce(final Observable sequence, final T initialValue, final Object accumulator) {
final FuncN _f = Functions.from(accumulator);
return reduce(sequence, initialValue, (T t1, T t2) -> (T) _f.call(t1, t2));
}
/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
* by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the result of each of these iterations as its own sequence.
*
*