/** * Copyright 2013 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package rx; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import rx.observables.GroupedObservable; import rx.operators.OperationAll; import rx.operators.OperationConcat; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; import rx.operators.OperationMergeDelayError; import rx.operators.OperationMostRecent; import rx.operators.OperationNext; import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSynchronize; import rx.operators.OperationTake; import rx.operators.OperationTakeLast; import rx.operators.OperationTakeWhile; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; import rx.operators.OperationToObservableSortedList; import rx.operators.OperationWhere; import rx.operators.OperationZip; import rx.operators.OperatorGroupBy; import rx.operators.OperatorTakeUntil; import rx.operators.OperatorToIterator; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.AtomicObserver; import rx.util.Range; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; import rx.util.functions.Func3; import rx.util.functions.Func4; import rx.util.functions.FuncN; import rx.util.functions.Function; import rx.util.functions.FunctionLanguageAdaptor; import rx.util.functions.Functions; /** * The Observable interface that implements the Reactive Pattern. *

* It provides overloaded methods for subscribing as well as delegate methods to the various operators. *

* The documentation for this interface makes use of marble diagrams. The following legend explains * these diagrams: *

* *

* For more information see the RxJava Wiki * * @param */ public class Observable { private final static RxJavaObservableExecutionHook HOOK = RxJavaPlugins.getInstance().getObservableExecutionHook(); private final Func1, Subscription> onSubscribe; protected Observable() { this(null); } /** * Construct an Observable with Function to execute when subscribed to. *

* NOTE: Generally you're better off using {@link #create(Func1)} to create an Observable instead of using inheritance. * * @param onSubscribe * {@link Func1} to be executed when {@link #subscribe(Observer)} is called. */ protected Observable(Func1, Subscription> onSubscribe) { this.onSubscribe = onSubscribe; } /** * an {@link Observer} must call an Observable's subscribe method in order to register itself * to receive push-based notifications from the Observable. A typical implementation of the * subscribe method does the following: *

* It stores a reference to the Observer in a collection object, such as a List * object. *

* It returns a reference to the {@link Subscription} interface. This enables * Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has * finished sending them and has called the Observer's {@link Observer#onCompleted()} method. *

* At any given time, a particular instance of an Observable implementation is * responsible for accepting all subscriptions and notifying all subscribers. Unless the * documentation for a particular Observable implementation indicates otherwise, * Observers should make no assumptions about the Observable implementation, such * as the order of notifications that multiple Observers will receive. *

* For more information see the RxJava Wiki * * * @param observer * @return a {@link Subscription} reference that allows observers * to stop receiving notifications before the provider has finished sending them */ public Subscription subscribe(Observer observer) { // allow the hook to intercept and/or decorate Func1, Subscription> onSubscribeFunction = HOOK.onSubscribeStart(this, onSubscribe); // validate and proceed if (onSubscribeFunction == null) { throw new IllegalStateException("onSubscribe function can not be null."); // the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception } try { /** * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ if (isInternalImplementation(observer)) { Subscription s = onSubscribeFunction.call(observer); if (s == null) { // this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens // we want to gracefully handle it the same as AtomicObservableSubscription does return HOOK.onSubscribeReturn(this, Subscriptions.empty()); } else { return HOOK.onSubscribeReturn(this, s); } } else { AtomicObservableSubscription subscription = new AtomicObservableSubscription(); subscription.wrap(onSubscribeFunction.call(new AtomicObserver<>(subscription, observer))); return HOOK.onSubscribeReturn(this, subscription); } } catch (Exception e) { // if an unhandled error occurs executing the onSubscribe we will propagate it try { observer.onError(HOOK.onSubscribeError(this, e)); } catch (Exception e2) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); HOOK.onSubscribeError(this, r); throw r; } return Subscriptions.empty(); } } /** * an {@link Observer} must call an Observable's subscribe method in order to register itself * to receive push-based notifications from the Observable. A typical implementation of the * subscribe method does the following: *

* It stores a reference to the Observer in a collection object, such as a List * object. *

* It returns a reference to the {@link Subscription} interface. This enables * Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has * finished sending them and has called the Observer's {@link Observer#onCompleted()} method. *

* At any given time, a particular instance of an Observable implementation is * responsible for accepting all subscriptions and notifying all subscribers. Unless the * documentation for a particular Observable implementation indicates otherwise, * Observers should make no assumptions about the Observable implementation, such * as the order of notifications that multiple Observers will receive. *

* For more information see the RxJava Wiki * * * @param observer * @param scheduler * The {@link Scheduler} that the sequence is subscribed to on. * @return a {@link Subscription} reference that allows observers * to stop receiving notifications before the provider has finished sending them */ public Subscription subscribe(Observer observer, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(observer); } /** * Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance. *

* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ private Subscription protectivelyWrapAndSubscribe(Observer o) { AtomicObservableSubscription subscription = new AtomicObservableSubscription(); return subscription.wrap(subscribe(new AtomicObserver<>(subscription, o))); } @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Map callbacks) { // lookup and memoize onNext Object _onNext = callbacks.get("onNext"); if (_onNext == null) { throw new RuntimeException("onNext must be implemented"); } final FuncN onNext = Functions.from(_onNext); /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { Object onComplete = callbacks.get("onCompleted"); if (onComplete != null) { Functions.from(onComplete).call(); } } @Override public void onError(Exception e) { handleError(e); Object onError = callbacks.get("onError"); if (onError != null) { Functions.from(onError).call(e); } } @Override public void onNext(Object args) { onNext.call(args); } }); } public Subscription subscribe(final Map callbacks, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(callbacks); } @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object o) { if (o instanceof Observer) { // in case a dynamic language is not correctly handling the overloaded methods and we receive an Observer just forward to the correct method. return subscribe((Observer) o); } // lookup and memoize onNext if (o == null) { throw new RuntimeException("onNext must be implemented"); } final FuncN onNext = Functions.from(o); /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Exception e) { handleError(e); // no callback defined } @Override public void onNext(Object args) { onNext.call(args); } }); } public Subscription subscribe(final Object o, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(o); } public Subscription subscribe(final Action1 onNext) { /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Exception e) { handleError(e); // no callback defined } @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } onNext.call(args); } }); } public Subscription subscribe(final Action1 onNext, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext); } @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object onNext, final Object onError) { // lookup and memoize onNext if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } final FuncN onNextFunction = Functions.from(onNext); /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Exception e) { handleError(e); if (onError != null) { Functions.from(onError).call(e); } } @Override public void onNext(Object args) { onNextFunction.call(args); } }); } public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError); } public Subscription subscribe(final Action1 onNext, final Action1 onError) { /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Exception e) { handleError(e); if (onError != null) { onError.call(e); } } @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } onNext.call(args); } }); } public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError); } @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) { // lookup and memoize onNext if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } final FuncN onNextFunction = Functions.from(onNext); /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { if (onComplete != null) { Functions.from(onComplete).call(); } } @Override public void onError(Exception e) { handleError(e); if (onError != null) { Functions.from(onError).call(e); } } @Override public void onNext(Object args) { onNextFunction.call(args); } }); } public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); } public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { onComplete.call(); } @Override public void onError(Exception e) { handleError(e); if (onError != null) { onError.call(e); } } @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } onNext.call(args); } }); } public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); } /** * Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated. *

* NOTE: This will block even if the Observable is asynchronous. *

* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods. * * @param onNext * {@link Action1} * @throws RuntimeException * if error occurs */ public void forEach(final Action1 onNext) { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exceptionFromOnError = new AtomicReference<>(); /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { latch.countDown(); } @Override public void onError(Exception e) { /* * If we receive an onError event we set the reference on the outer thread * so we can git it and throw after the latch.await(). * * We do this instead of throwing directly since this may be on a different thread and the latch is still waiting. */ exceptionFromOnError.set(e); latch.countDown(); } @Override public void onNext(T args) { onNext.call(args); } }); // block until the subscription completes and then return try { latch.await(); } catch (InterruptedException e) { // set the interrupted flag again so callers can still get it // for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780 Thread.currentThread().interrupt(); // using Runtime so it is not checked throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); } if (exceptionFromOnError.get() != null) { if (exceptionFromOnError.get() instanceof RuntimeException) { throw (RuntimeException) exceptionFromOnError.get(); } else { throw new RuntimeException(exceptionFromOnError.get()); } } } /** * Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated. *

* NOTE: This will block even if the Observable is asynchronous. *

* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods. * * @param o * {@link Action1} * @throws RuntimeException * if error occurs */ @SuppressWarnings({ "rawtypes", "unchecked" }) public void forEach(final Object o) { if (o instanceof Action1) { // in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method. forEach((Action1) o); } // lookup and memoize onNext if (o == null) { throw new RuntimeException("onNext must be implemented"); } final FuncN onNext = Functions.from(o); forEach(onNext::call); } /** * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. * * @return The single element in the observable sequence. */ public T single() { return single(this); } /** * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. * * @param predicate * A predicate function to evaluate for elements in the sequence. * @return The single element in the observable sequence. */ public T single(Func1 predicate) { return single(this, predicate); } /** * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. * * @param predicate * A predicate function to evaluate for elements in the sequence. * @return The single element in the observable sequence. */ public T single(Object predicate) { return single(this, predicate); } /** * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. * * @param defaultValue * default value for a sequence. * @return The single element in the observable sequence, or a default value if no value is found. */ public T singleOrDefault(T defaultValue) { return singleOrDefault(this, defaultValue); } /** * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. * * @param defaultValue * default value for a sequence. * @param predicate * A predicate function to evaluate for elements in the sequence. * @return The single element in the observable sequence, or a default value if no value is found. */ public T singleOrDefault(T defaultValue, Func1 predicate) { return singleOrDefault(this, defaultValue, predicate); } /** * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. * * @param defaultValue * default value for a sequence. * @param predicate * A predicate function to evaluate for elements in the sequence. * @return The single element in the observable sequence, or a default value if no value is found. */ public T singleOrDefault(T defaultValue, Object predicate) { return singleOrDefault(this, defaultValue, predicate); } /** * Allow the {@link RxJavaErrorHandler} to receive the exception from onError. * * @param e */ private void handleError(Exception e) { // onError should be rare so we'll only fetch when needed RxJavaPlugins.getInstance().getErrorHandler().handleError(e); } /** * An Observable that never sends any information to an {@link Observer}. * * This Observable is useful primarily for testing purposes. * * @param * the type of item emitted by the Observable */ private static class NeverObservable extends Observable { public NeverObservable() { super((Observer t1) -> Subscriptions.empty()); } } /** * an Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes. * * @param * the type of object returned by the Observable */ private static class ThrowObservable extends Observable { public ThrowObservable(final Exception exception) { /** * Accepts an {@link Observer} and calls its onError method. * * @param observer * an {@link Observer} of this Observable * @return a reference to the subscription */ super((Observer observer) -> { observer.onError(exception); return Subscriptions.empty(); }); } } /** * Creates an Observable that will execute the given function when a {@link Observer} subscribes to it. *

* Write the function you pass to create so that it behaves as an Observable - calling the passed-in * onNext, onError, and onCompleted methods appropriately. *

* A well-formed Observable must call either the {@link Observer}'s onCompleted method exactly once or its onError method exactly once. *

* See Rx Design Guidelines (PDF) for detailed information. * * @param * the type emitted by the Observable sequence * @param func * a function that accepts an Observer and calls its onNext, onError, and onCompleted methods * as appropriate, and returns a {@link Subscription} to allow canceling the subscription (if applicable) * @return an Observable that, when an {@link Observer} subscribes to it, will execute the given function */ public static Observable create(Func1, Subscription> func) { return new Observable<>(func); } /** * Creates an Observable that will execute the given function when a {@link Observer} subscribes to it. *

* This method accept {@link Object} to allow different languages to pass in closures using {@link FunctionLanguageAdaptor}. *

* Write the function you pass to create so that it behaves as an Observable - calling the passed-in * onNext, onError, and onCompleted methods appropriately. *

* A well-formed Observable must call either the {@link Observer}'s onCompleted method exactly once or its onError method exactly once. *

* See Rx Design Guidelines (PDF) for detailed information. * * @param * the type emitted by the Observable sequence * @param func * a function that accepts an Observer and calls its onNext, onError, and onCompleted methods * as appropriate, and returns a {@link Subscription} to allow canceling the subscription (if applicable) * @return an Observable that, when an {@link Observer} subscribes to it, will execute the given function */ public static Observable create(final Object func) { final FuncN _f = Functions.from(func); return create((Observer t1) -> (Subscription) _f.call(t1)); } /** * Returns an Observable that returns no data to the {@link Observer} and immediately invokes its onCompleted method. *

* * * @param * the type of item emitted by the Observable * @return an Observable that returns no data to the {@link Observer} and immediately invokes the {@link Observer}'s onCompleted method */ public static Observable empty() { return toObservable(new ArrayList()); } /** * Returns an Observable that calls onError when an {@link Observer} subscribes to it. *

* * @param exception * the error to throw * @param * the type of object returned by the Observable * @return an Observable object that calls onError when an {@link Observer} subscribes */ public static Observable error(Exception exception) { return new ThrowObservable<>(exception); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param that * the Observable to filter * @param predicate * a function that evaluates the items emitted by the source Observable, returning true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */ public static Observable filter(Observable that, Func1 predicate) { return create(OperationFilter.filter(that, predicate)); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param that * the Observable to filter * @param function * a function that evaluates the items emitted by the source Observable, returning true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */ public static Observable filter(Observable that, final Object function) { final FuncN _f = Functions.from(function); return filter(that, (T t1) -> (Boolean) _f.call(t1)); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param that * the Observable to filter * @param predicate * a function that evaluates the items emitted by the source Observable, returning true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */ public static Observable where(Observable that, Func1 predicate) { return create(OperationWhere.where(that, predicate)); } /** * Converts an {@link Iterable} sequence to an Observable sequence. * * @param iterable * the source {@link Iterable} sequence * @param * the type of items in the {@link Iterable} sequence and the type emitted by the resulting Observable * @return an Observable that emits each item in the source {@link Iterable} sequence * @see {@link #toObservable(Iterable)} */ public static Observable from(Iterable iterable) { return toObservable(iterable); } /** * Converts an Array to an Observable sequence. * * @param items * the source Array * @param * the type of items in the Array, and the type of items emitted by the resulting Observable * @return an Observable that emits each item in the source Array * @see {@link #toObservable(Object...)} */ public static Observable from(T... items) { return toObservable(items); } /** * Generates an observable sequence of integral numbers within a specified range. * * @param start * The value of the first integer in the sequence * @param count * The number of sequential integers to generate. * * @return An observable sequence that contains a range of sequential integral numbers. * * @see Observable.Range Method (Int32, Int32) */ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } /** * Asynchronously subscribes and unsubscribes observers on the specified scheduler. * * @param source * the source observable. * @param scheduler * the scheduler to perform subscription and unsubscription actions on. * @param * the type of observable. * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. */ public static Observable subscribeOn(Observable source, Scheduler scheduler) { return create(OperationSubscribeOn.subscribeOn(source, scheduler)); } /** * Asynchronously notify observers on the specified scheduler. * * @param source * the source observable. * @param scheduler * the scheduler to notify observers on. * @param * the type of observable. * @return the source sequence whose observations happen on the specified scheduler. */ public static Observable observeOn(Observable source, Scheduler scheduler) { return create(OperationObserveOn.observeOn(source, scheduler)); } /** * Returns an observable sequence that invokes the observable factory whenever a new observer subscribes. * The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer * subscribes to the sequence. This is useful to allow an observer to easily obtain an updates or refreshed version * of the sequence. * * @param observableFactory * the observable factory function to invoke for each observer that subscribes to the resulting sequence. * @param * the type of the observable. * @return the observable sequence whose observers trigger an invocation of the given observable factory function. */ public static Observable defer(Func0> observableFactory) { return create(OperationDefer.defer(observableFactory)); } /** * Returns an observable sequence that invokes the observable factory whenever a new observer subscribes. * The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer * subscribes to the sequence. This is useful to allow an observer to easily obtain an updates or refreshed version * of the sequence. * * @param observableFactory * the observable factory function to invoke for each observer that subscribes to the resulting sequence. * @param * the type of the observable. * @return the observable sequence whose observers trigger an invocation of the given observable factory function. */ public static Observable defer(Object observableFactory) { final FuncN _f = Functions.from(observableFactory); return create(OperationDefer.defer(() -> (Observable) _f.call())); } /** * Returns an Observable that notifies an {@link Observer} of a single value and then completes. *

* To convert any object into an Observable that emits that object, pass that object into the just method. *

* This is similar to the {@link #toObservable} method, except that toObservable will convert * an {@link Iterable} object into an Observable that emits each of the items in the {@link Iterable}, one * at a time, while the just method would convert the {@link Iterable} into an Observable * that emits the entire {@link Iterable} as a single item. *

* * * @param value * the value to pass to the Observer's onNext method * @param * the type of the value * @return an Observable that notifies an {@link Observer} of a single value and then completes */ public static Observable just(T value) { List list = new ArrayList<>(); list.add(value); return toObservable(list); } /** * Returns the last element of an observable sequence with a specified source. * * @param that * the source Observable * @return the last element in the observable sequence. */ public static T last(final Observable that) { T result = null; for (T value : that.toIterable()) { result = value; } return result; } /** * Returns the last element of an observable sequence that matches the predicate. * * @param that * the source Observable * @param predicate * a predicate function to evaluate for elements in the sequence. * @return the last element in the observable sequence. */ public static T last(final Observable that, final Func1 predicate) { return last(that.filter(predicate)); } /** * Returns the last element of an observable sequence that matches the predicate. * * @param that * the source Observable * @param predicate * a predicate function to evaluate for elements in the sequence. * @return the last element in the observable sequence. */ public static T last(final Observable that, final Object predicate) { return last(that.filter(predicate)); } /** * Returns the last element of an observable sequence, or a default value if no value is found. * * @param source * the source observable. * @param defaultValue * a default value that would be returned if observable is empty. * @param * the type of source. * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. */ public static T lastOrDefault(Observable source, T defaultValue) { boolean found = false; T result = null; for (T value : source.toIterable()) { found = true; result = value; } if (!found) { return defaultValue; } return result; } /** * Returns the last element of an observable sequence that matches the predicate, or a default value if no value is found. * * @param source * the source observable. * @param defaultValue * a default value that would be returned if observable is empty. * @param predicate * a predicate function to evaluate for elements in the sequence. * @param * the type of source. * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. */ public static T lastOrDefault(Observable source, T defaultValue, Func1 predicate) { return lastOrDefault(source.filter(predicate), defaultValue); } /** * Returns the last element of an observable sequence that matches the predicate, or a default value if no value is found. * * @param source * the source observable. * @param defaultValue * a default value that would be returned if observable is empty. * @param predicate * a predicate function to evaluate for elements in the sequence. * @param * the type of source. * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. */ public static T lastOrDefault(Observable source, T defaultValue, Object predicate) { final FuncN _f = Functions.from(predicate); return lastOrDefault(source, defaultValue, (T args) -> (Boolean) _f.call(args)); } /** * Applies a function of your choosing to every notification emitted by an Observable, and returns * this transformation as a new Observable sequence. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item in the sequence emitted by the source Observable * @param * the type of items emitted by the the source Observable * @param * the type of items returned by map function * @return an Observable that is the result of applying the transformation function to each item * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, Func1 func) { return create(OperationMap.map(sequence, func)); } /** * Applies a function of your choosing to every notification emitted by an Observable, and returns * this transformation as a new Observable sequence. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item in the sequence emitted by the source Observable * @param * the type of items emitted by the the source Observable * @param * the type of items returned by map function * @return an Observable that is the result of applying the transformation function to each item * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, final Object func) { final FuncN _f = Functions.from(func); return map(sequence, (T t1) -> (R) _f.call(t1)); } /** * Creates a new Observable sequence by applying a function that you supply to each object in the * original Observable sequence, where that function is itself an Observable that emits objects, * and then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item emitted by the source Observable, generating a * Observable * @param * the type emitted by the source Observable * @param * the type emitted by the Observables emitted by func * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item emitted by the source Observable and merging the results of * the Observables obtained from this transformation */ public static Observable mapMany(Observable sequence, Func1> func) { return create(OperationMap.mapMany(sequence, func)); } /** * Creates a new Observable sequence by applying a function that you supply to each object in the * original Observable sequence, where that function is itself an Observable that emits objects, * and then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item emitted by the source Observable, generating a * Observable * @param * the type emitted by the source Observable * @param * the type emitted by the Observables emitted by func * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item emitted by the source Observable and merging the results of * the Observables obtained from this transformation */ public static Observable mapMany(Observable sequence, final Object func) { final FuncN _f = Functions.from(func); return mapMany(sequence, (T t1) -> (Observable) _f.call(t1)); } /** * Materializes the implicit notifications of an observable sequence as explicit notification values. *

* * * @param sequence * An observable sequence of elements to project. * @return An observable sequence whose elements are the result of materializing the notifications of the given sequence. * @see MSDN: Observable.Materialize */ public static Observable> materialize(final Observable sequence) { return create(OperationMaterialize.materialize(sequence)); } /** * Dematerializes the explicit notification values of an observable sequence as implicit notifications. * * @param sequence * An observable sequence containing explicit notification values which have to be turned into implicit notifications. * @return An observable sequence exhibiting the behavior corresponding to the source sequence's notification values. * @see MSDN: Observable.Dematerialize */ public static Observable dematerialize(final Observable> sequence) { return create(OperationDematerialize.dematerialize(sequence)); } /** * Flattens the Observable sequences from a list of Observables into one Observable sequence * without any transformation. You can combine the output of multiple Observables so that they * act like a single Observable, by using the merge method. *

* * * @param source * a list of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the source list of Observables * @see MSDN: Observable.Merge */ public static Observable merge(List> source) { return create(OperationMerge.merge(source)); } /** * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a * Observable into one Observable sequence without any transformation. You can combine the output * of multiple Observables so that they act like a single Observable, by using the merge method. *

* * * @param source * an Observable that emits Observables * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the Observables emitted by the source Observable * @see MSDN: Observable.Merge Method */ public static Observable merge(Observable> source) { return create(OperationMerge.merge(source)); } /** * Flattens the Observable sequences from a series of Observables into one Observable sequence * without any transformation. You can combine the output of multiple Observables so that they * act like a single Observable, by using the merge method. *

* * * @param source * a series of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the source Observables * @see MSDN: Observable.Merge Method */ public static Observable merge(Observable... source) { return create(OperationMerge.merge(source)); } /** * Returns the values from the source observable sequence until the other observable sequence produces a value. * * @param source * the source sequence to propagate elements for. * @param other * the observable sequence that terminates propagation of elements of the source sequence. * @param * the type of source. * @param * the other type. * @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation. */ public static Observable takeUntil(final Observable source, final Observable other) { return OperatorTakeUntil.takeUntil(source, other); } /** * Combines the objects emitted by two or more Observables, and emits the result as a single Observable, * by using the concat method. *

* * * @param source * a series of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of combining the * output from the source Observables * @see MSDN: Observable.Concat Method */ public static Observable concat(Observable... source) { return create(OperationConcat.concat(source)); } /** * Emits the same objects as the given Observable, calling the given action * when it calls onComplete or onError. * * @param source * an observable * @param action * an action to be called when the source completes or errors. * @return an Observable that emits the same objects, then calls the action. * @see MSDN: Observable.Finally Method */ public static Observable finallyDo(Observable source, Action0 action) { return create(OperationFinally.finallyDo(source, action)); } /** * Groups the elements of an observable and selects the resulting elements by using a specified function. * * @param source * an observable whose elements to group. * @param keySelector * a function to extract the key for each element. * @param elementSelector * a function to map each source element to an element in an observable group. * @param * the key type. * @param * the source type. * @param * the resulting observable type. * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. */ public static Observable> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) { return create(OperatorGroupBy.groupBy(source, keySelector, elementSelector)); } /** * Groups the elements of an observable according to a specified key selector function and * * @param source * an observable whose elements to group. * @param keySelector * a function to extract the key for each element. * @param * the key type. * @param * the source type. * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. */ public static Observable> groupBy(Observable source, final Func1 keySelector) { return create(OperatorGroupBy.groupBy(source, keySelector)); } /** * Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error. *

* Only the first onError received will be sent. *

* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented. *

* * * @param source * a list of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the source list of Observables * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(List> source) { return create(OperationMergeDelayError.mergeDelayError(source)); } /** * Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error. *

* Only the first onError received will be sent. *

* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented. *

* * * @param source * an Observable that emits Observables * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the Observables emitted by the source Observable * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(Observable> source) { return create(OperationMergeDelayError.mergeDelayError(source)); } /** * Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error. *

* Only the first onError received will be sent. *

* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented. *

* * * @param source * a series of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the source Observables * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(Observable... source) { return create(OperationMergeDelayError.mergeDelayError(source)); } /** * Returns an Observable that never sends any information to an {@link Observer}. * * This observable is useful primarily for testing purposes. * * @param * the type of item (not) emitted by the Observable * @return an Observable that never sends any information to an {@link Observer} */ public static Observable never() { return new NeverObservable<>(); } /** * Instruct an Observable to pass control to another Observable (the return value of a function) * rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer, * the Observable calls its {@link Observer}'s onError function, and then quits without calling any more * of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a * function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method, * if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it * will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if * it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may * never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. *

* * * @param that * the source Observable * @param resumeFunction * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Func1> resumeFunction) { return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)); } /** * Instruct an Observable to pass control to another Observable (the return value of a function) * rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer, * the Observable calls its {@link Observer}'s onError function, and then quits without calling any more * of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a * function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method, * if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it * will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if * it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may * never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. *

* * * @param that * the source Observable * @param resumeFunction * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Object resumeFunction) { final FuncN _f = Functions.from(resumeFunction); return onErrorResumeNext(that, (Exception e) -> (Observable) _f.call(e)); } /** * Instruct an Observable to pass control to another Observable rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer, * the Observable calls its {@link Observer}'s onError function, and then quits without calling any more * of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a * function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method, * if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it * will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if * it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may * never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. *

* * * @param that * the source Observable * @param resumeSequence * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Observable resumeSequence) { return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence)); } /** * Instruct an Observable to emit a particular item to its Observer's onNext function * rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected item to its {@link Observer}, the Observable calls its {@link Observer}'s onError * function, and then quits * without calling any more of its {@link Observer}'s closures. The onErrorReturn method changes * this behavior. If you pass a function (resumeFunction) to an Observable's onErrorReturn * method, if the original Observable encounters an error, instead of calling its {@link Observer}'s * onError function, it will instead pass the return value of resumeFunction to the {@link Observer}'s onNext method. *

* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. * * @param that * the source Observable * @param resumeFunction * a function that returns a value that will be passed into an {@link Observer}'s onNext function if the Observable encounters an error that would * otherwise cause it to call onError * @return the source Observable, with its behavior modified as described */ public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) { return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject * method that does a similar operation on lists. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be used in the next accumulator call (if applicable) * * @return an Observable that emits a single element that is the result of accumulating the * output from applying the accumulator to the sequence of items emitted by the source * Observable * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, Func2 accumulator) { return takeLast(create(OperationScan.scan(sequence, accumulator)), 1); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject * method that does a similar operation on lists. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be used in the next accumulator call (if applicable) * * @return an Observable that emits a single element that is the result of accumulating the * output from applying the accumulator to the sequence of items emitted by the source * Observable * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(final Observable sequence, final Object accumulator) { final FuncN _f = Functions.from(accumulator); return reduce(sequence, (T t1, T t2) -> (T) _f.call(t1, t2)); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject * method that does a similar operation on lists. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param initialValue * a seed passed into the first execution of the accumulator function * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be used in the next accumulator call (if applicable) * * @return an Observable that emits a single element that is the result of accumulating the * output from applying the accumulator to the sequence of items emitted by the source * Observable * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) { return takeLast(create(OperationScan.scan(sequence, initialValue, accumulator)), 1); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject * method that does a similar operation on lists. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param initialValue * a seed passed into the first execution of the accumulator function * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be used in the next accumulator call (if applicable) * @return an Observable that emits a single element that is the result of accumulating the * output from applying the accumulator to the sequence of items emitted by the source * Observable * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(final Observable sequence, final T initialValue, final Object accumulator) { final FuncN _f = Functions.from(accumulator); return reduce(sequence, initialValue, (T t1, T t2) -> (T) _f.call(t1, t2)); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the result of each of these iterations as its own sequence. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be emitted and used in the next accumulator call (if applicable) * @return an Observable that emits a sequence of items that are the result of accumulating the * output from the sequence emitted by the source Observable * @see MSDN: Observable.Scan */ public static Observable scan(Observable sequence, Func2 accumulator) { return create(OperationScan.scan(sequence, accumulator)); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the result of each of these iterations as its own sequence. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be emitted and used in the next accumulator call (if applicable) * @return an Observable that emits a sequence of items that are the result of accumulating the * output from the sequence emitted by the source Observable * @see MSDN: Observable.Scan */ public static Observable scan(final Observable sequence, final Object accumulator) { final FuncN _f = Functions.from(accumulator); return scan(sequence, (T t1, T t2) -> (T) _f.call(t1, t2)); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the result of each of these iterations as its own sequence. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param initialValue * the initial (seed) accumulator value * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be emitted and used in the next accumulator call (if applicable) * @return an Observable that emits a sequence of items that are the result of accumulating the * output from the sequence emitted by the source Observable * @see MSDN: Observable.Scan */ public static Observable scan(Observable sequence, T initialValue, Func2 accumulator) { return create(OperationScan.scan(sequence, initialValue, accumulator)); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the result of each of these iterations as its own sequence. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param initialValue * the initial (seed) accumulator value * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be emitted and used in the next accumulator call (if applicable) * @return an Observable that emits a sequence of items that are the result of accumulating the * output from the sequence emitted by the source Observable * @see MSDN: Observable.Scan */ public static Observable scan(final Observable sequence, final T initialValue, final Object accumulator) { final FuncN _f = Functions.from(accumulator); return scan(sequence, initialValue, (T t1, T t2) -> (T) _f.call(t1, t2)); } /** * Determines whether all elements of an observable sequence satisfies a condition. * * @param sequence * an observable sequence whose elements to apply the predicate to. * @param predicate * a function to test each element for a condition. * @param * the type of observable. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public static Observable all(final Observable sequence, final Func1 predicate) { return create(OperationAll.all(sequence, predicate)); } /** * Determines whether all elements of an observable sequence satisfies a condition. * * @param sequence * an observable sequence whose elements to apply the predicate to. * @param predicate * a function to test each element for a condition. * @param * the type of observable. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public static Observable all(final Observable sequence, Object predicate) { final FuncN _f = Functions.from(predicate); return all(sequence, (T t) -> (Boolean) _f.call(t)); } /** * Returns an Observable that skips the first num items emitted by the source * Observable. You can ignore the first num items emitted by an Observable and attend * only to those items that come after, by modifying the Observable with the skip method. *

* * * @param items * the source Observable * @param num * the number of items to skip * @return an Observable that emits the same sequence of items emitted by the source Observable, * except for the first num items * @see MSDN: Observable.Skip Method */ public static Observable skip(final Observable items, int num) { return create(OperationSkip.skip(items, num)); } /** * Accepts an Observable and wraps it in another Observable that ensures that the resulting * Observable is chronologically well-behaved. *

* A well-behaved observable ensures onNext, onCompleted, or onError calls to its subscribers are not interleaved, onCompleted and * onError are only called once respectively, and no * onNext calls follow onCompleted and onError calls. * * @param observable * the source Observable * @param * the type of item emitted by the source Observable * @return an Observable that is a chronologically well-behaved version of the source Observable */ public static Observable synchronize(Observable observable) { return create(OperationSynchronize.synchronize(observable)); } /** * Returns an Observable that emits the first num items emitted by the source * Observable. *

* You can choose to pay attention only to the first num values emitted by an Observable by calling its take method. This method returns an Observable that will call a * subscribing Observer's onNext function a * maximum of num times before calling onCompleted. *

* * * @param items * the source Observable * @param num * the number of items from the start of the sequence emitted by the source * Observable to emit * @return an Observable that only emits the first num items emitted by the source * Observable */ public static Observable take(final Observable items, final int num) { return create(OperationTake.take(items, num)); } /** * Returns an Observable that emits the last count items emitted by the source * Observable. * * @param items * the source Observable * @param count * the number of items from the end of the sequence emitted by the source * Observable to emit * @return an Observable that only emits the last count items emitted by the source * Observable */ public static Observable takeLast(final Observable items, final int count) { return create(OperationTakeLast.takeLast(items, count)); } /** * Returns a specified number of contiguous values from the start of an observable sequence. * * @param items * @param predicate * a function to test each source element for a condition * @return */ public static Observable takeWhile(final Observable items, Func1 predicate) { return create(OperationTakeWhile.takeWhile(items, predicate)); } /** * Returns a specified number of contiguous values from the start of an observable sequence. * * @param items * @param predicate * a function to test each source element for a condition * @return */ public static Observable takeWhile(final Observable items, Object predicate) { final FuncN _f = Functions.from(predicate); return takeWhile(items, (T t) -> (Boolean) _f.call(t)); } /** * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. * * @param items * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. * @return */ public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { return create(OperationTakeWhile.takeWhileWithIndex(items, predicate)); } public static Observable takeWhileWithIndex(final Observable items, Object predicate) { final FuncN _f = Functions.from(predicate); return create(OperationTakeWhile.takeWhileWithIndex(items, (T t, Integer integer) -> (Boolean) _f.call(t, integer))); } /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. *

* Normally, an Observable that returns multiple items will do so by calling its Observer's onNext function for each such item. You can change this behavior, instructing the * Observable * to * compose a list of all of these multiple items and * then to call the Observer's onNext function once, passing it the entire list, by calling the Observable object's toList method prior to calling its * subscribe * method. *

* * * @param that * the source Observable * @return an Observable that emits a single item: a List containing all of the * items emitted by the source Observable */ public static Observable> toList(final Observable that) { return create(OperationToObservableList.toObservableList(that)); } /** * Converts an observable sequence to an Iterable. * * @param that * the source Observable * @return Observable converted to Iterable. */ public static Iterable toIterable(final Observable that) { return new Iterable() { @Override public Iterator iterator() { return getIterator(that); } }; } /** * Returns an iterator that iterates all values of the observable. * * @param that * an observable sequence to get an iterator for. * @param * the type of source. * @return the iterator that could be used to iterate over the elements of the observable. */ public static Iterator getIterator(Observable that) { return OperatorToIterator.toIterator(that); } /** * Samples the next value (blocking without buffering) from in an observable sequence. * * @param items * the source observable sequence. * @param * the type of observable. * @return iterable that blocks upon each iteration until the next element in the observable source sequence becomes available. */ public static Iterable next(Observable items) { return OperationNext.next(items); } /** * Samples the most recent value in an observable sequence. * * @param source * the source observable sequence. * @param * the type of observable. * @param initialValue * the initial value that will be yielded by the enumerable sequence if no element has been sampled yet. * @return the iterable that returns the last sampled element upon each iteration. */ public static Iterable mostRecent(Observable source, T initialValue) { return OperationMostRecent.mostRecent(source, initialValue); } /** * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. * * @param that * the source Observable * @return The single element in the observable sequence. * @throws IllegalStateException * if there is not exactly one element in the observable sequence */ public static T single(Observable that) { return singleOrDefault(that, false, null); } /** * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. * * @param that * the source Observable * @param predicate * A predicate function to evaluate for elements in the sequence. * @return The single element in the observable sequence. * @throws IllegalStateException * if there is not exactly one element in the observable sequence that matches the predicate */ public static T single(Observable that, Func1 predicate) { return single(that.filter(predicate)); } /** * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. * * @param that * the source Observable * @param predicate * A predicate function to evaluate for elements in the sequence. * @return The single element in the observable sequence. * @throws IllegalStateException * if there is not exactly one element in the observable sequence that matches the predicate */ public static T single(Observable that, Object predicate) { final FuncN _f = Functions.from(predicate); return single(that, (T t) -> (Boolean) _f.call(t)); } /** * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. * * @param that * the source Observable * @param defaultValue * default value for a sequence. * @return The single element in the observable sequence, or a default value if no value is found. */ public static T singleOrDefault(Observable that, T defaultValue) { return singleOrDefault(that, true, defaultValue); } /** * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. * * @param that * the source Observable * @param defaultValue * default value for a sequence. * @param predicate * A predicate function to evaluate for elements in the sequence. * @return The single element in the observable sequence, or a default value if no value is found. */ public static T singleOrDefault(Observable that, T defaultValue, Func1 predicate) { return singleOrDefault(that.filter(predicate), defaultValue); } /** * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. * * @param that * the source Observable * @param defaultValue * default value for a sequence. * @param predicate * A predicate function to evaluate for elements in the sequence. * @return The single element in the observable sequence, or a default value if no value is found. */ public static T singleOrDefault(Observable that, T defaultValue, Object predicate) { final FuncN _f = Functions.from(predicate); return singleOrDefault(that, defaultValue, (T t) -> (Boolean) _f.call(t)); } private static T singleOrDefault(Observable that, boolean hasDefault, T defaultVal) { Iterator it = that.toIterable().iterator(); if (!it.hasNext()) { if (hasDefault) { return defaultVal; } throw new IllegalStateException("Expected single entry. Actually empty stream."); } T result = it.next(); if (it.hasNext()) { throw new IllegalStateException("Expected single entry. Actually more than one entry."); } return result; } /** * Converts an Iterable sequence to an Observable sequence. * * Any object that supports the Iterable interface can be converted into an Observable that emits * each iterable item in the object, by passing the object into the toObservable method. *

* * * @param iterable * the source Iterable sequence * @param * the type of items in the iterable sequence and the type emitted by the resulting * Observable * @return an Observable that emits each item in the source Iterable sequence */ public static Observable toObservable(Iterable iterable) { return create(OperationToObservableIterable.toObservableIterable(iterable)); } /** * Converts an Future to an Observable sequence. * * Any object that supports the {@link Future} interface can be converted into an Observable that emits * the return value of the get() method in the object, by passing the object into the toObservable method. *

* This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. * * @param future * the source {@link Future} * @param * the type of of object that the future's returns and the type emitted by the resulting * Observable * @return an Observable that emits the item from the source Future */ public static Observable toObservable(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } /** * Converts an Future to an Observable sequence. * * Any object that supports the {@link Future} interface can be converted into an Observable that emits * the return value of the get() method in the object, by passing the object into the toObservable method. * The subscribe method on this synchronously so the Subscription returned doesn't nothing. *

* This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. * * @param future * the source {@link Future} * @param timeout * the maximum time to wait * @param unit * the time unit of the time argument * @param * the type of of object that the future's returns and the type emitted by the resulting * Observable * @return an Observable that emits the item from the source Future */ public static Observable toObservable(Future future, long timeout, TimeUnit unit) { return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } /** * Converts an Array sequence to an Observable sequence. * * An Array can be converted into an Observable that emits each item in the Array, by passing the * Array into the toObservable method. *

* * * @param items * the source Array * @param * the type of items in the Array, and the type of items emitted by the resulting * Observable * @return an Observable that emits each item in the source Array */ public static Observable toObservable(T... items) { return toObservable(Arrays.asList(items)); } /** * Sort T objects by their natural order (object must implement Comparable). *

* * * @param sequence * @throws ClassCastException * if T objects do not implement Comparable * @return */ public static Observable> toSortedList(Observable sequence) { return create(OperationToObservableSortedList.toSortedList(sequence)); } /** * Sort T objects using the defined sort function. *

* * * @param sequence * @param sortFunction * @return */ public static Observable> toSortedList(Observable sequence, Func2 sortFunction) { return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); } /** * Sort T objects using the defined sort function. *

* * * @param sequence * @param sortFunction * @return */ public static Observable> toSortedList(Observable sequence, final Object sortFunction) { final FuncN _f = Functions.from(sortFunction); return create(OperationToObservableSortedList.toSortedList(sequence, (T t1, T t2) -> (Integer) _f.call(t1, t2))); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by two other Observables, with the results of this function becoming the * sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0 * and the first item emitted by w1; the * second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0 and the second item emitted by w1; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param reduceFunction * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Func2 reduceFunction) { return create(OperationZip.zip(w0, w1, reduceFunction)); } /** * Determines whether two sequences are equal by comparing the elements pairwise. * * @param first * observable to compare * @param second * observable to compare * @param * type of sequence * @return sequence of booleans, true if two sequences are equal by comparing the elements pairwise; otherwise, false. */ public static Observable sequenceEqual(Observable first, Observable second) { return sequenceEqual(first, second, Object::equals); } /** * Determines whether two sequences are equal by comparing the elements pairwise using a specified equality function. * * @param first * observable sequence to compare * @param second * observable sequence to compare * @param equality * a function used to compare elements of both sequences * @param * type of sequence * @return sequence of booleans, true if two sequences are equal by comparing the elements pairwise; otherwise, false. */ public static Observable sequenceEqual(Observable first, Observable second, Func2 equality) { return zip(first, second, equality); } /** * Determines whether two sequences are equal by comparing the elements pairwise using a specified equality function. * * @param first * observable sequence to compare * @param second * observable sequence to compare * @param equality * a function used to compare elements of both sequences * @param * type of sequence * @return sequence of booleans, true if two sequences are equal by comparing the elements pairwise; otherwise, false. */ public static Observable sequenceEqual(Observable first, Observable second, Object equality) { return zip(first, second, equality); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by two other Observables, with the results of this function becoming the * sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0 * and the first item emitted by w1; the * second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0 and the second item emitted by w1; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param function * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, final Object function) { final FuncN _f = Functions.from(function); return zip(w0, w1, (T0 t0, T1 t1) -> (R) _f.call(t0, t1)); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by three other Observables, with the results of this function becoming * the sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0, * the first item emitted by w1, and the * first item emitted by w2; the second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0, the second item * emitted by w1, and the second item * emitted by w2; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param w2 * a third source Observable * @param function * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Func3 function) { return create(OperationZip.zip(w0, w1, w2, function)); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by three other Observables, with the results of this function becoming * the sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0, * the first item emitted by w1, and the * first item emitted by w2; the second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0, the second item * emitted by w1, and the second item * emitted by w2; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param w2 * a third source Observable * @param function * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, final Object function) { final FuncN _f = Functions.from(function); return zip(w0, w1, w2, (T0 t0, T1 t1, T2 t2) -> (R) _f.call(t0, t1, t2)); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by four other Observables, with the results of this function becoming * the sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0, * the first item emitted by w1, the * first item emitted by w2, and the first item emitted by w3; the second item emitted by the new Observable will be the result of the function applied to the second item * emitted by each of those Observables; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param w2 * a third source Observable * @param w3 * a fourth source Observable * @param reduceFunction * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by four other Observables, with the results of this function becoming * the sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0, * the first item emitted by w1, the * first item emitted by w2, and the first item emitted by w3; the second item emitted by the new Observable will be the result of the function applied to the second item * emitted by each of those Observables; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param w2 * a third source Observable * @param w3 * a fourth source Observable * @param function * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, final Object function) { final FuncN _f = Functions.from(function); return zip(w0, w1, w2, w3, (T0 t0, T1 t1, T2 t2, T3 t3) -> (R) _f.call(t0, t1, t2, t3)); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param predicate * a function that evaluates the items emitted by the source Observable, returning * true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter * evaluates as true */ public Observable filter(Func1 predicate) { return filter(this, predicate); } /** * Registers an action to be called when this observable calls * onComplete or onError. * * @param action * an action to be called when this observable completes or errors. * @return an Observable that emits the same objects as this observable, then calls the action. * @see MSDN: Observable.Finally Method */ public Observable finallyDo(Action0 action) { return create(OperationFinally.finallyDo(this, action)); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param callback * a function that evaluates the items emitted by the source Observable, returning * true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter * evaluates as "true" */ public Observable filter(final Object callback) { final FuncN _f = Functions.from(callback); return filter(this, (T t1) -> (Boolean) _f.call(t1)); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param predicate * a function that evaluates the items emitted by the source Observable, returning * true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter * evaluates as true */ public Observable where(Func1 predicate) { return where(this, predicate); } public final Observable flatMap(Func1> func) { return merge(map(func)); } /** * Returns the last element of an observable sequence with a specified source. * * @return the last element in the observable sequence. */ public T last() { return last(this); } /** * Returns the last element of an observable sequence that matches the predicate. * * @param predicate * a predicate function to evaluate for elements in the sequence. * @return the last element in the observable sequence. */ public T last(final Func1 predicate) { return last(this, predicate); } /** * Returns the last element of an observable sequence that matches the predicate. * * @param predicate * a predicate function to evaluate for elements in the sequence. * @return the last element in the observable sequence. */ public T last(final Object predicate) { final FuncN _f = Functions.from(predicate); return last(this, (T args) -> (Boolean) _f.call(args)); } /** * Returns the last element, or a default value if no value is found. * * @param defaultValue * a default value that would be returned if observable is empty. * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. */ public T lastOrDefault(T defaultValue) { return lastOrDefault(this, defaultValue); } /** * Returns the last element that matches the predicate, or a default value if no value is found. * * @param defaultValue * a default value that would be returned if observable is empty. * @param predicate * a predicate function to evaluate for elements in the sequence. * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. */ public T lastOrDefault(T defaultValue, Func1 predicate) { return lastOrDefault(this, defaultValue, predicate); } /** * Returns the last element that matches the predicate, or a default value if no value is found. * * @param defaultValue * a default value that would be returned if observable is empty. * @param predicate * a predicate function to evaluate for elements in the sequence. * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. */ public T lastOrDefault(T defaultValue, Object predicate) { return lastOrDefault(this, defaultValue, predicate); } /** * Applies a function of your choosing to every item emitted by an Observable, and returns this * transformation as a new Observable sequence. *

* * * @param func * a function to apply to each item in the sequence. * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item in the sequence emitted by the input Observable. */ public Observable map(Func1 func) { return map(this, func); } /** * Applies a function of your choosing to every item emitted by an Observable, and returns this * transformation as a new Observable sequence. *

* * * @param callback * a function to apply to each item in the sequence. * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item in the sequence emitted by the input Observable. */ public Observable map(final Object callback) { final FuncN _f = Functions.from(callback); return map(this, (T t1) -> (R) _f.call(t1)); } /** * Creates a new Observable sequence by applying a function that you supply to each item in the * original Observable sequence, where that function is itself an Observable that emits items, and * then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* * * @param func * a function to apply to each item in the sequence, that returns an Observable. * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item in the input sequence and merging the results of the * Observables obtained from this transformation. */ public Observable mapMany(Func1> func) { return mapMany(this, func); } /** * Creates a new Observable sequence by applying a function that you supply to each item in the * original Observable sequence, where that function is itself an Observable that emits items, and * then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* * * @param callback * a function to apply to each item in the sequence that returns an Observable. * @return an Observable that emits a sequence that is the result of applying the transformation' * function to each item in the input sequence and merging the results of the * Observables obtained from this transformation. */ public Observable mapMany(final Object callback) { final FuncN _f = Functions.from(callback); return mapMany(this, (T t1) -> (Observable) _f.call(t1)); } /** * Materializes the implicit notifications of this observable sequence as explicit notification values. *

* * * @return An observable sequence whose elements are the result of materializing the notifications of the given sequence. * @see MSDN: Observable.materialize */ public Observable> materialize() { return materialize(this); } /** * Asynchronously subscribes and unsubscribes observers on the specified scheduler. * * @param scheduler * the scheduler to perform subscription and unsubscription actions on. * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. */ public Observable subscribeOn(Scheduler scheduler) { return subscribeOn(this, scheduler); } /** * Asynchronously notify observers on the specified scheduler. * * @param scheduler * the scheduler to notify observers on. * @return the source sequence whose observations happen on the specified scheduler. */ public Observable observeOn(Scheduler scheduler) { return observeOn(this, scheduler); } /** * Dematerializes the explicit notification values of an observable sequence as implicit notifications. * * @return An observable sequence exhibiting the behavior corresponding to the source sequence's notification values. * @see MSDN: Observable.dematerialize * @throws Exception * if attempted on Observable not of type {@code Observable>}. */ public Observable dematerialize() { return dematerialize((Observable>) this); } /** * Instruct an Observable to pass control to another Observable rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * item to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorResumeNext method changes this behavior. If you pass another Observable * (resumeFunction) to an Observable's onErrorResumeNext method, if the * original Observable encounters an error, instead of calling its Observer's * onErrort function, it will instead relinquish control to * resumeFunction which will call the Observer's onNext method if it * is able to do so. In such a case, because no Observable necessarily invokes * onError, the Observer may never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. *

* * * @param resumeFunction * @return the original Observable, with appropriately modified behavior */ public Observable onErrorResumeNext(final Func1> resumeFunction) { return onErrorResumeNext(this, resumeFunction); } /** * Instruct an Observable to emit a particular item rather than calling onError if * it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * item to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorResumeNext method changes this behavior. If you pass another Observable * (resumeFunction) to an Observable's onErrorResumeNext method, if the * original Observable encounters an error, instead of calling its Observer's * onError function, it will instead relinquish control to * resumeFunction which will call the Observer's onNext method if it * is able to do so. In such a case, because no Observable necessarily invokes * onError, the Observer may never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. *

* * * @param resumeFunction * @return the original Observable with appropriately modified behavior */ public Observable onErrorResumeNext(final Object resumeFunction) { final FuncN _f = Functions.from(resumeFunction); return onErrorResumeNext(this, (Exception e) -> (Observable) _f.call(e)); } /** * Instruct an Observable to pass control to another Observable rather than calling * onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * item to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorResumeNext method changes this behavior. If you pass another Observable * (resumeSequence) to an Observable's onErrorResumeNext method, if the * original Observable encounters an error, instead of calling its Observer's * onError function, it will instead relinquish control to * resumeSequence which will call the Observer's onNext method if it * is able to do so. In such a case, because no Observable necessarily invokes * onError, the Observer may never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. *

* * * @param resumeSequence * @return the original Observable, with appropriately modified behavior */ public Observable onErrorResumeNext(final Observable resumeSequence) { return onErrorResumeNext(this, resumeSequence); } /** * Instruct an Observable to emit a particular item rather than calling onError if * it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * object to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorReturn method changes this behavior. If you pass a function * (resumeFunction) to an Observable's onErrorReturn method, if the * original Observable encounters an error, instead of calling its Observer's * onError function, it will instead call pass the return value of * resumeFunction to the Observer's onNext method. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. * * @param resumeFunction * @return the original Observable with appropriately modified behavior */ public Observable onErrorReturn(Func1 resumeFunction) { return onErrorReturn(this, resumeFunction); } /** * Instruct an Observable to emit a particular item rather than calling onError if * it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * object to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorReturn method changes this behavior. If you pass a function * (resumeFunction) to an Observable's onErrorReturn method, if the * original Observable encounters an error, instead of calling its Observer's * onError function, it will instead call pass the return value of * resumeFunction to the Observer's onNext method. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. * * @param resumeFunction * @return the original Observable with appropriately modified behavior */ public Observable onErrorReturn(final Object resumeFunction) { final FuncN _f = Functions.from(resumeFunction); return onErrorReturn(this, (Exception e) -> (T) _f.call(e)); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," * "compress," or "inject" in other programming contexts. Groovy, for instance, has an * inject method that does a similar operation on lists. *

* * * @param accumulator * An accumulator function to be invoked on each element from the sequence, whose result * will be used in the next accumulator call (if applicable). * * @return An observable sequence with a single element from the result of accumulating the * output from the list of Observables. * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Func2 accumulator) { return reduce(this, accumulator); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," * "compress," or "inject" in other programming contexts. Groovy, for instance, has an * inject method that does a similar operation on lists. *

* * * @param accumulator * An accumulator function to be invoked on each element from the sequence, whose result * will be used in the next accumulator call (if applicable). * * @return an Observable that emits a single element from the result of accumulating the output * from the list of Observables. * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Object accumulator) { return reduce(this, accumulator); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," * "compress," or "inject" in other programming contexts. Groovy, for instance, has an * inject method that does a similar operation on lists. *

* * * @param initialValue * The initial (seed) accumulator value. * @param accumulator * An accumulator function to be invoked on each element from the sequence, whose * result will be used in the next accumulator call (if applicable). * * @return an Observable that emits a single element from the result of accumulating the output * from the list of Observables. * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(T initialValue, Func2 accumulator) { return reduce(this, initialValue, accumulator); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," * "compress," or "inject" in other programming contexts. Groovy, for instance, has an * inject method that does a similar operation on lists. *

* * * @param initialValue * The initial (seed) accumulator value. * @param accumulator * An accumulator function to be invoked on each element from the sequence, whose * result will be used in the next accumulator call (if applicable). * @return an Observable that emits a single element from the result of accumulating the output * from the list of Observables. * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(T initialValue, Object accumulator) { return reduce(this, initialValue, accumulator); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the result of each of these iterations. It emits the result of * each of these iterations as a sequence from the returned Observable. This sort of function is * sometimes called an accumulator. *

* * * @param accumulator * An accumulator function to be invoked on each element from the sequence whose * result will be sent via onNext and used in the next accumulator call * (if applicable). * @return an Observable sequence whose elements are the result of accumulating the output from * the list of Observables. * @see MSDN: Observable.Scan */ public Observable scan(Func2 accumulator) { return scan(this, accumulator); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the result of each of these iterations. It emits the result of * each of these iterations as a sequence from the returned Observable. This sort of function is * sometimes called an accumulator. *

* * * @param accumulator * An accumulator function to be invoked on each element from the sequence whose * result will be sent via onNext and used in the next accumulator call * (if applicable). * * @return an Observable sequence whose elements are the result of accumulating the output from * the list of Observables. * @see MSDN: Observable.Scan */ public Observable scan(final Object accumulator) { return scan(this, accumulator); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the result of each of these iterations. This sort of function is * sometimes called an accumulator. *

* * * @param initialValue * The initial (seed) accumulator value. * @param accumulator * An accumulator function to be invoked on each element from the sequence whose * result will be sent via onNext and used in the next accumulator call * (if applicable). * @return an Observable sequence whose elements are the result of accumulating the output from * the list of Observables. * @see MSDN: Observable.Scan */ public Observable scan(T initialValue, Func2 accumulator) { return scan(this, initialValue, accumulator); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, then feeds the result of that function along with the * third item into the same function, and so on, emitting the result of each of these * iterations. This sort of function is sometimes called an accumulator. *

* * * @param initialValue * The initial (seed) accumulator value. * @param accumulator * An accumulator function to be invoked on each element from the sequence whose result * will be sent via onNext and used in the next accumulator call (if * applicable). * @return an Observable sequence whose elements are the result of accumulating the output from * the list of Observables. * @see MSDN: Observable.Scan */ public Observable scan(final T initialValue, final Object accumulator) { return scan(this, initialValue, accumulator); } /** * Determines whether all elements of an observable sequence satisfies a condition. * * @param predicate * a function to test each element for a condition. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public Observable all(Func1 predicate) { return all(this, predicate); } /** * Determines whether all elements of an observable sequence satisfies a condition. * * @param predicate * a function to test each element for a condition. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public Observable all(Object predicate) { return all(this, predicate); } /** * Returns an Observable that skips the first num items emitted by the source * Observable. * You can ignore the first num items emitted by an Observable and attend only to * those items that come after, by modifying the Observable with the skip method. *

* * * @param num * The number of items to skip * @return an Observable sequence that is identical to the source Observable except that it does * not emit the first num items from that sequence. */ public Observable skip(int num) { return skip(this, num); } /** * Returns an Observable that emits the first num items emitted by the source * Observable. * * You can choose to pay attention only to the first num values emitted by a * Observable by calling its take method. This method returns an Observable that will * call a subscribing Observer's onNext function a maximum of num times * before calling onCompleted. *

* * * @param num * @return an Observable that emits only the first num items from the source * Observable, or all of the items from the source Observable if that Observable emits * fewer than num items. */ public Observable take(final int num) { return take(this, num); } /** * Returns an Observable that items emitted by the source Observable as long as a specified condition is true. * * @param predicate * a function to test each source element for a condition * @return */ public Observable takeWhile(final Func1 predicate) { return takeWhile(this, predicate); } /** * Returns a specified number of contiguous values from the start of an observable sequence. * * @param predicate * a function to test each source element for a condition * @return */ public Observable takeWhile(final Object predicate) { return takeWhile(this, predicate); } /** * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. * * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. * @return */ public Observable takeWhileWithIndex(final Func2 predicate) { return takeWhileWithIndex(this, predicate); } /** * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. * * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. * @return */ public Observable takeWhileWithIndex(final Object predicate) { return takeWhileWithIndex(this, predicate); } /** * Returns an Observable that emits the last count items emitted by the source * Observable. * * @param count * the number of items from the end of the sequence emitted by the source * Observable to emit * @return an Observable that only emits the last count items emitted by the source * Observable */ public Observable takeLast(final int count) { return takeLast(this, count); } /** * Returns the values from the source observable sequence until the other observable sequence produces a value. * * @param other * the observable sequence that terminates propagation of elements of the source sequence. * @param * the other type. * @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation. */ public Observable takeUntil(Observable other) { return takeUntil(this, other); } /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. * * Normally, an Observable that returns multiple items will do so by calling its Observer's * onNext function for each such item. You can change this behavior, instructing * the Observable to compose a list of all of these multiple items and then to call the * Observer's onNext function once, passing it the entire list, by calling the * Observable object's toList method prior to calling its subscribe * method. *

* * * @return an Observable that emits a single item: a List containing all of the items emitted by * the source Observable. */ public Observable> toList() { return toList(this); } /** * Sort T objects by their natural order (object must implement Comparable). *

* * * @throws ClassCastException * if T objects do not implement Comparable * @return */ public Observable> toSortedList() { return toSortedList(this); } /** * Sort T objects using the defined sort function. *

* * * @param sortFunction * @return */ public Observable> toSortedList(Func2 sortFunction) { return toSortedList(this, sortFunction); } /** * Sort T objects using the defined sort function. *

* * * @param sortFunction * @return */ public Observable> toSortedList(final Object sortFunction) { return toSortedList(this, sortFunction); } /** * Converts an observable sequence to an Iterable. * * @return Observable converted to Iterable. */ public Iterable toIterable() { return toIterable(this); } @SuppressWarnings("unchecked") public Observable startWith(T... values) { return concat(Observable. from(values), this); } /** * Groups the elements of an observable and selects the resulting elements by using a specified function. * * @param keySelector * a function to extract the key for each element. * @param elementSelector * a function to map each source element to an element in an observable group. * @param * the key type. * @param * the resulting observable type. * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. */ public Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) { return groupBy(this, keySelector, elementSelector); } /** * Groups the elements of an observable according to a specified key selector function and * * @param keySelector * a function to extract the key for each element. * @param * the key type. * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. */ public Observable> groupBy(final Func1 keySelector) { return groupBy(this, keySelector); } /** * Returns an iterator that iterates all values of the observable. * * @return the iterator that could be used to iterate over the elements of the observable. */ public Iterator getIterator() { return getIterator(this); } /** * Samples the next value (blocking without buffering) from in an observable sequence. * * @return iterable that blocks upon each iteration until the next element in the observable source sequence becomes available. */ public Iterable next() { return next(this); } /** * Samples the most recent value in an observable sequence. * * @param initialValue * the initial value that will be yielded by the enumerable sequence if no element has been sampled yet. * @return the iterable that returns the last sampled element upon each iteration. */ public Iterable mostRecent(T initialValue) { return mostRecent(this, initialValue); } /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

* For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" * * NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface. * * @param f * @return */ private boolean isInternalImplementation(Object o) { if (o == null) { return true; } // prevent double-wrapping (yeah it happens) if (o instanceof AtomicObserver) return true; // we treat the following package as "internal" and don't wrap it return o.getClass().getPackage().getName().startsWith("rx.operators"); } }