使用 RxJava 实现的 RxBus
添加依赖（implementation）
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.4'
准备
/**
 * Event payload carrying a single text message.
 *
 * <p>The text is fixed at construction time and never changes afterwards, so an
 * instance can be handed to several subscribers without defensive copying.
 */
public class MessageEvent {
    /** Message text supplied by the sender; stored as given (no validation is applied). */
    private final String text;

    /**
     * Creates an event wrapping the given text.
     *
     * @param text the message text to carry
     */
    public MessageEvent(String text) {
        this.text = text;
    }

    /**
     * Returns the text passed to the constructor.
     *
     * @return the message text
     */
    public String getText() {
        return text;
    }
}
核心代码
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
/**
 * A minimal event bus backed by an RxJava 2 {@link PublishSubject}.
 *
 * <p>Events are pushed in with {@link #send(Object)} and observed, filtered by class,
 * through {@link #toObservable(Class)}. A single shared instance is available from
 * {@link #getDefault()}.
 *
 * <p>Created by HanHailong on 15/10/9.
 */
public class RxBus {
    /** Shared instance; volatile so the unsynchronized first read in getDefault() is safe. */
    private static volatile RxBus defaultInstance;

    /** Subject every event flows through; wrapped with toSerialized() before use. */
    private final Subject<Object> bus = PublishSubject.<Object>create().toSerialized();

    /** Private: instances are obtained through {@link #getDefault()}. */
    private RxBus() {
    }

    /**
     * Returns the shared bus, creating it on first use.
     *
     * <p>Uses double-checked locking on {@code RxBus.class}: the lock is only taken
     * while the instance has not been created yet.
     *
     * @return the shared {@code RxBus} instance
     */
    public static RxBus getDefault() {
        RxBus instance = defaultInstance;
        if (instance == null) {
            synchronized (RxBus.class) {
                instance = defaultInstance;
                if (instance == null) {
                    instance = new RxBus();
                    defaultInstance = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Publishes an event to the bus.
     *
     * @param o the event to publish; must not be null
     * @throws NullPointerException if {@code o} is null
     */
    public void send(Object o) {
        // RxJava 2 does not accept null items. Reject the call here so a bad sender
        // cannot push null into the subject shared by all subscribers (depending on the
        // RxJava version that may signal an error to everyone subscribed).
        if (o == null) {
            throw new NullPointerException("RxBus.send: event must not be null");
        }
        bus.onNext(o);
    }

    /**
     * Returns a stream of the events that are instances of the given class.
     *
     * @param classType the event class to filter on
     * @param <T> the event type
     * @return an observable emitting only events of type {@code classType}
     */
    public <T> Observable<T> toObservable(Class<T> classType) {
        return bus.ofType(classType);
    }
}
接收事件
// Subscribe to MessageEvent instances on the default bus. The returned Disposable is
// kept so the subscription can be cancelled later.
// NOTE(review): no observeOn(...) is applied, so the callback presumably runs on the
// thread that calls send(); it touches views, so confirm senders are on the main thread.
Disposable disposable = RxBus.getDefault()
        .toObservable(MessageEvent.class)
        .subscribe(new Consumer<MessageEvent>() {
            @Override
            public void accept(MessageEvent messageEvent) throws Exception {
                // Read the payload once and reuse it for the toast and the text view.
                final String text = messageEvent.getText();
                Toast.makeText(MainActivity.this, TAG + text, Toast.LENGTH_LONG).show();
                tv.setText(text);
                Log.d(TAG, "onMessageEvent: ");
            }
        });
发送事件
RxBus.getDefault().send(new MessageEvent(" hello rx bus"));
取消订阅
// Cancel the subscription only if one exists and it is still active.
// The check must be negated: calling dispose() only when isDisposed() is already true
// would leave an active subscription running.
if (disposable != null && !disposable.isDisposed()) {
    disposable.dispose();
}