Problem description
I'm currently using RxJava 2 and have a use case where multiple Observables need to be consumed by a single Camel route subscriber. Using the solution from "RxJava - Merged Observable that accepts more Observables at any time?" as a reference, I have a partly working solution.
I'm planning to use a PublishProcessor<T> that is subscribed to by one Camel reactive streams subscriber, and to maintain a concurrent Set<Flowable<T>> to which I can dynamically add new Flowables.
I'm currently stuck on how to add and manage Flowable<T> instances with the PublishProcessor.
I'm really new to RxJava, so any help is appreciated! This is what I have so far:
PublishProcessor<T> publishProcessor = PublishProcessor.create();
CamelReactiveStreamsService camelReactiveStreamsService = CamelReactiveStreams.get(camelContext);
Subscriber<T> subscriber = camelReactiveStreamsService.streamSubscriber("t-class", T.class);

Set<Flowable<T>> flowableSet = Collections.newSetFromMap(new ConcurrentHashMap<Flowable<T>, Boolean>());

public void add(Flowable<T> flowableOrder) {
    flowableSet.add(flowableOrder);
}

public void subscribe() {
    publishProcessor.flatMap(x -> flowableSet.forEach(/* TODO */))
            .subscribe(subscriber);
}
Recommended answer
You can have a single Processor and subscribe to more than one observable stream. You would need to manage the subscriptions by adding and removing them as you add and remove observables.
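import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.processors.PublishProcessor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// If sources emit on different threads, consider PublishProcessor.<T>create().toSerialized().
PublishProcessor<T> publishProcessor = PublishProcessor.create();
Map<Flowable<T>, Disposable> subscriptions = new ConcurrentHashMap<>();

void addObservable(Flowable<T> flowable) {
    // subscribe(Subscriber) returns void in RxJava 2, so use the lambda overload,
    // which returns a Disposable. onComplete is deliberately not forwarded, so one
    // finished source does not terminate the processor; an error still does.
    subscriptions.computeIfAbsent(flowable,
            key -> key.subscribe(publishProcessor::onNext, publishProcessor::onError));
}

void removeObservable(Flowable<T> flowable) {
    Disposable d = subscriptions.remove(flowable);
    if (d != null) {
        d.dispose();
    }
}

void close() {
    // Dispose every remaining subscription and forget it.
    for (Disposable d : subscriptions.values()) {
        d.dispose();
    }
    subscriptions.clear();
}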
Use the flowable as the key to the map, and add or remove subscriptions.
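To see the bookkeeping in isolation, here is a minimal, library-free sketch of the same map-of-subscriptions pattern. It does not use RxJava: `Source`, `MergeHub`, and `demo` are hypothetical names made up for illustration, with `Source` standing in for a hot Flowable and the `Runnable` cancel handle standing in for a Disposable. Treat it as an illustration of the idea under those assumptions, not as the RxJava implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class MergeHubSketch {

    /** Hypothetical stand-in for a hot source; subscribe returns a cancel handle. */
    static final class Source<T> {
        private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

        Runnable subscribe(Consumer<T> listener) {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        void emit(T item) {
            for (Consumer<T> listener : listeners) {
                listener.accept(item);
            }
        }
    }

    /** Hypothetical hub: one downstream consumer, sources added and removed at any time. */
    static final class MergeHub<T> {
        private final Consumer<T> downstream;
        private final Map<Source<T>, Runnable> subscriptions = new ConcurrentHashMap<>();

        MergeHub(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        void add(Source<T> source) {
            // computeIfAbsent subscribes only if this source is not already registered.
            subscriptions.computeIfAbsent(source, s -> s.subscribe(downstream));
        }

        void remove(Source<T> source) {
            Runnable cancel = subscriptions.remove(source);
            if (cancel != null) {
                cancel.run();
            }
        }

        void close() {
            subscriptions.values().forEach(Runnable::run);
            subscriptions.clear();
        }
    }

    /** Adds two sources, removes one, closes the hub, and returns what the sink saw. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Consumer<String> sink = received::add;
        MergeHub<String> hub = new MergeHub<>(sink);
        Source<String> a = new Source<>();
        Source<String> b = new Source<>();

        hub.add(a);
        hub.add(a);      // second add of the same source is a no-op
        hub.add(b);
        a.emit("a1");
        b.emit("b1");
        hub.remove(a);
        a.emit("a2");    // dropped: a was removed
        b.emit("b2");
        hub.close();
        b.emit("b3");    // dropped: hub was closed
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a1, b1, b2]
    }
}
```

In this sketch `Source` does not override `equals` or `hashCode`, so the map keys on object identity, and removal only works with the same instance that was added. Adding the same source twice does not create a second subscription, which is the point of using the source itself as the key.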