RxJavaのflatMap(mapper, combine)でリストデータをそれぞれ別スレッドで非同期処理する
例えばRSSのリストがあり、それを別々のスレッドで処理して全部の処理が終わったらUIを更新したいとする。
RxJavaのflatMap(mapper, combine)
を使えばリストのデータを1つ1つのObservable/Flowable
に変換して処理できる。
まず第一引数のmapper
部分でそれぞれのデータを受け取ってObservable/Flowable
を作る。
このときsubscribeOn(Schedulers.io())
などでスレッドを分けておかないと全て同じワーカースレッドで実行されてしまうので注意。
ArrayList<RSS> rssList = ... Flowable.fromIterable(rssList) .subscribeOn(Schedulers.io()) .flatMap(new Function<RSS, Publisher<? extends RSS>>() { @Override public Publisher<? extends RSS> apply(RSS rss) throws Exception { return Flowable.just(rss).subscribeOn(Schedulers.io()); } },
次に第二引数のcombine
部分で更新処理を行う
Flowable.fromIterable(rssList) .subscribeOn(Schedulers.io()) .flatMap(new Function<RSS, Publisher<? extends RSS>>() { ... }, new BiFunction<RSS, RSS, RSS>() { @Override public RSS apply(RSS rss, RSS rss2) throws Exception { Log.d("test", "BiFunction, Thread:" + Thread.currentThread().getName() + ", rss:" + rss2.title()); // RSSの更新処理 ... return rss2; } })
あとはUIスレッドで結果を受け取って処理をすればよい。
ArrayList<RSS> rssList = ... Flowable.fromIterable(rssList) .subscribeOn(Schedulers.io()) .flatMap(new Function<RSS, Publisher<? extends RSS>>() { ... }, new BiFunction<RSS, RSS, RSS>() { ... }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<RSS>() { @Override public void onSubscribe(Subscription s) { s.request(rssList.size()); } @Override public void onNext(RSS rss) { // 進捗更新 } @Override public void onError(Throwable t) { } @Override public void onComplete() { // 終了処理 } });
全体のコード
Java(ラムダなし)
ArrayList<RSS> rssList = ... Flowable.fromIterable(rssList) .subscribeOn(Schedulers.io()) .filter(new Predicate<RSS>() { @Override public boolean test(RSS rss) throws Exception { return rss.id() > 0; } }) .flatMap(new Function<RSS, Publisher<? extends RSS>>() { @Override public Publisher<? extends RSS> apply(RSS rss) throws Exception { return Flowable.just(rss).subscribeOn(Schedulers.io()); } }, new BiFunction<RSS, RSS, RSS>() { @Override public RSS apply(RSS rss, RSS rss2) throws Exception { Log.d("test", "BiFunction, Thread:" + Thread.currentThread().getName() + ", rss:" + rss2.title()); // RSSの更新処理 ... return rss2; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<RSS>() { @Override public void onSubscribe(Subscription s) { s.request(rssList.size()); } @Override public void onNext(RSS rss) { // 進捗更新 } @Override public void onError(Throwable t) { } @Override public void onComplete() { // 終了処理 } });
Java(ラムダあり)
ArrayList<RSS> rssList = ... Flowable.fromIterable(rssList) .subscribeOn(Schedulers.io()) .flatMap(rss -> Flowable.just(rss).subscribeOn(Schedulers.io()), (rss, rss2) -> { Log.d("test", "BiFunction, Thread:" + Thread.currentThread().getName() + ", rss:" + rss2.title()); // RSSの更新処理 ... return rss2; }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<RSS>() { @Override public void onSubscribe(Subscription s) { s.request(rssList.size()); } @Override public void onNext(RSS rss) { // 進捗更新 } @Override public void onError(Throwable t) { } @Override public void onComplete() { // 終了処理 } });
Kotlin
Flowable.fromIterable<RSS>(rsss) .subscribeOn(Schedulers.io()) .filter { rss -> rss.id > 0 } .flatMap({ data -> Flowable.just(data).subscribeOn(Schedulers.io()) }) { _, rss2 -> Log.d("test", "BiFunction, Thread:" + Thread.currentThread().name + ", rss:" + rss2.title) // RSSの更新処理 ... rss2 } .observeOn(AndroidSchedulers.mainThread()) .subscribe(object: Subscriber<RSS> { override fun onSubscribe(s: Subscription?) { s!!.request(rssList.size.toLong()) } override fun onNext(t: RSS?) { // 進捗更新 } override fun onError(t: Throwable?) { } override fun onComplete() { // 終了処理 } })