/** * Copyright 2013 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package rx.operators; import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import rx.Observable; import rx.Observer; import rx.Subscription; import rx.util.AtomicObservableSubscription; import rx.util.SynchronizedObserver; import rx.util.functions.Func1; public final class OperationMerge { /** * Flattens the observable sequences from the list of Observables into one * observable sequence without any transformation. * * @param source An observable sequence of elements to project. * * @return An observable sequence whose elements are the result of * flattening the output from the list of Observables. * * @see http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx */ public static Func1, Subscription> merge(final Observable> source) { // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. return new MergeObservable(source)::call; } public static Func1, Subscription> merge(final Observable... sequences) { return merge(Arrays.asList(sequences)); } public static Func1, Subscription> merge(final List> sequences) { return merge(Observable.create(new Func1>, Subscription>() { private volatile boolean unsubscribed = false; @Override public Subscription call(Observer> observer) { for (Observable o : sequences) if (!unsubscribed) observer.onNext(o); else // break out of the loop if we are unsubscribed break; if (!unsubscribed) observer.onCompleted(); return () -> { unsubscribed = true; }; } })); } /** * This class is NOT thread-safe if invoked and referenced multiple times. * In other words, don't subscribe to it multiple times from different * threads. *

* It IS thread-safe from within it while receiving onNext events from * multiple threads. *

* This should all be fine as long as it's kept as a private class and a new * instance created from static factory method above. *

* Note how the take() factory method above protects us from a single * instance being exposed with the Observable wrapper handling the subscribe * flow. * * @param */ private static final class MergeObservable implements Func1, Subscription> { private final Observable> sequences; private final MergeSubscription ourSubscription = new MergeSubscription(); private final AtomicBoolean stopped = new AtomicBoolean(false); private volatile boolean parentCompleted = false; private final ConcurrentHashMap childObservers = new ConcurrentHashMap<>(); private final ConcurrentHashMap childSubscriptions = new ConcurrentHashMap<>(); private MergeObservable(Observable> sequences) { this.sequences = sequences; } @Override public Subscription call(Observer actualObserver) { /** * We must synchronize a merge because we subscribe to multiple * sequences in parallel that will each be emitting. *

* The calls from each sequence must be serialized. *

* Bug report: https://github.com/Netflix/RxJava/issues/200 */ AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription); SynchronizedObserver synchronizedObserver = new SynchronizedObserver<>(actualObserver, subscription); /** * Subscribe to the parent Observable to get to the children * Observables */ sequences.subscribe(new ParentObserver(synchronizedObserver)); /* * return our subscription to allow unsubscribing */ return subscription; } /** * Manage the internal subscription with a thread-safe means of * stopping/unsubscribing so we don't unsubscribe twice. *

* Also has the stop() method returning a boolean so callers know if * their thread "won" and should perform further actions. */ private class MergeSubscription implements Subscription { @Override public void unsubscribe() { stop(); } public boolean stop() { // try setting to false unless another thread beat us boolean didSet = stopped.compareAndSet(false, true); if (didSet) { // this thread won the race to stop, so unsubscribe from the actualSubscription for (Subscription _s : childSubscriptions.values()) _s.unsubscribe(); return true; } else // another thread beat us return false; } } /** * Subscribe to the top level Observable to receive the sequence of * Observable children. * * @param */ private class ParentObserver implements Observer> { private final Observer actualObserver; public ParentObserver(Observer actualObserver) { this.actualObserver = actualObserver; } @Override public void onCompleted() { parentCompleted = true; // this *can* occur before the children are done, so if it does we won't send onCompleted // but will let the child worry about it // if however this completes and there are no children processing, then we will send onCompleted if (childObservers.isEmpty()) if (!stopped.get()) if (ourSubscription.stop()) actualObserver.onCompleted(); } @Override public void onError(Exception e) { actualObserver.onError(e); } @Override public void onNext(Observable childObservable) { if (stopped.get()) // we won't act on any further items return; if (childObservable == null) throw new IllegalArgumentException("Observable can not be null."); /** * For each child Observable we receive we'll subscribe with a * separate Observer that will each then forward their sequences * to the actualObserver. *

* We use separate child Observers for each sequence to simplify * the onComplete/onError handling so each sequence has its own * lifecycle. */ ChildObserver _w = new ChildObserver(actualObserver); childObservers.put(_w, _w); Subscription _subscription = childObservable.subscribe(_w); // remember this Observer and the subscription from it childSubscriptions.put(_w, _subscription); } } /** * Subscribe to each child Observable and forward their sequence of * data to the actualObserver * */ private class ChildObserver implements Observer { private final Observer actualObserver; public ChildObserver(Observer actualObserver) { this.actualObserver = actualObserver; } @Override public void onCompleted() { // remove self from map of Observers childObservers.remove(this); // if there are now 0 Observers left, so if the parent is also completed we send the onComplete to the actualObserver // if the parent is not complete that means there is another sequence (and child Observer) to come if (!stopped.get()) if (childObservers.isEmpty() && parentCompleted) if (ourSubscription.stop()) // this thread 'won' the race to unsubscribe/stop so let's send onCompleted actualObserver.onCompleted(); } @Override public void onError(Exception e) { if (!stopped.get()) if (ourSubscription.stop()) // this thread 'won' the race to unsubscribe/stop so let's send the error actualObserver.onError(e); } @Override public void onNext(T args) { // in case the Observable is poorly behaved and doesn't listen to the unsubscribe request // we'll ignore anything that comes in after we've unsubscribed if (!stopped.get()) actualObserver.onNext(args); } } } }