package rx.subjects; import java.util.concurrent.ConcurrentHashMap; import rx.Observable; import rx.Observer; import rx.Subscription; import rx.util.AtomicObservableSubscription; import rx.util.SynchronizedObserver; import rx.util.functions.Func1; public class Subject extends Observable implements Observer { public static Subject create() { final ConcurrentHashMap> observers = new ConcurrentHashMap>(); Func1, Subscription> onSubscribe = new Func1, Subscription>() { @Override public Subscription call(Observer observer) { final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); subscription.wrap(new Subscription() { @Override public void unsubscribe() { // on unsubscribe remove it from the map of outbound observers to notify observers.remove(subscription); } }); // on subscribe add it to the map of outbound observers to notify observers.put(subscription, new SynchronizedObserver(observer, subscription)); return subscription; } }; return new Subject(onSubscribe, observers); } private final ConcurrentHashMap> observers; protected Subject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) { super(onSubscribe); this.observers = observers; } @Override public void onCompleted() { for (Observer observer : observers.values()) { observer.onCompleted(); } } @Override public void onError(Exception e) { for (Observer observer : observers.values()) { observer.onError(e); } } @Override public void onNext(T args) { for (Observer observer : observers.values()) { observer.onNext(args); } } }