fixed crashing when removing mods
This commit is contained in:
@@ -9,29 +9,24 @@ import rx.util.SynchronizedObserver;
|
||||
import rx.util.functions.Func1;
|
||||
|
||||
public class Subject<T> extends Observable<T> implements Observer<T> {
|
||||
|
||||
public static <T> Subject<T> create() {
|
||||
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
|
||||
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<>();
|
||||
|
||||
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
|
||||
@Override
|
||||
public Subscription call(Observer<T> observer) {
|
||||
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
|
||||
Func1<Observer<T>, Subscription> onSubscribe = observer -> {
|
||||
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
|
||||
|
||||
subscription.wrap(new Subscription() {
|
||||
@Override
|
||||
public void unsubscribe() {
|
||||
// on unsubscribe remove it from the map of outbound observers to notify
|
||||
observers.remove(subscription);
|
||||
}
|
||||
});
|
||||
subscription.wrap(() -> {
|
||||
// on unsubscribe remove it from the map of outbound observers to notify
|
||||
observers.remove(subscription);
|
||||
});
|
||||
|
||||
// on subscribe add it to the map of outbound observers to notify
|
||||
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
|
||||
return subscription;
|
||||
}
|
||||
// on subscribe add it to the map of outbound observers to notify
|
||||
observers.put(subscription, new SynchronizedObserver<>(observer, subscription));
|
||||
return subscription;
|
||||
};
|
||||
|
||||
return new Subject<T>(onSubscribe, observers);
|
||||
return new Subject<>(onSubscribe, observers);
|
||||
}
|
||||
|
||||
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
|
||||
@@ -43,23 +38,20 @@ public class Subject<T> extends Observable<T> implements Observer<T> {
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
for (Observer<T> observer : observers.values()) {
|
||||
for (Observer<T> observer : observers.values())
|
||||
observer.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Exception e) {
|
||||
for (Observer<T> observer : observers.values()) {
|
||||
for (Observer<T> observer : observers.values())
|
||||
observer.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T args) {
|
||||
for (Observer<T> observer : observers.values()) {
|
||||
for (Observer<T> observer : observers.values())
|
||||
observer.onNext(args);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user