AndroidとかiOSとかモバイル多め。その他技術的なことも書いていきます。

RxJavaのflatMap(mapper, combine)でリストデータをそれぞれ別スレッドで非同期処理する

例えばRSSのリストがあり、それを別々のスレッドで処理して全部の処理が終わったらUIを更新したいとする。 RxJavaのflatMap(mapper, combine)を使えばリストのデータを1つ1つのObservable/Flowableに変換して処理できる。

まず第一引数のmapper部分でそれぞれのデータを受け取ってObservable/Flowableを作る。 このときsubscribeOn(Schedulers.io())などでスレッドを分けておかないと全て同じワーカースレッドで実行されてしまうので注意。

ArrayList<RSS> rssList = ...
Flowable.fromIterable(rssList)
    .subscribeOn(Schedulers.io())
    .flatMap(new Function<RSS, Publisher<? extends RSS>>() {
                 @Override
                 public Publisher<? extends RSS> apply(RSS rss) throws Exception {
                    return Flowable.just(rss).subscribeOn(Schedulers.io());
                 }
             },

次に第二引数のcombine部分で更新処理を行う

Flowable.fromIterable(rssList)
    .subscribeOn(Schedulers.io())
    .flatMap(new Function<RSS, Publisher<? extends RSS>>() {
                 ...
             },
            new BiFunction<RSS, RSS, RSS>() {
                @Override
                public RSS apply(RSS rss, RSS rss2) throws Exception {
                    Log.d("test", "BiFunction, Thread:" + Thread.currentThread().getName() + ", rss:" + rss2.title());
                    // RSSの更新処理
                    ...
                    return rss2;
                }
            })

あとはUIスレッドで結果を受け取って処理をすればよい。

ArrayList<RSS> rssList = ...
Flowable.fromIterable(rssList)
    .subscribeOn(Schedulers.io())
    .flatMap(new Function<RSS, Publisher<? extends RSS>>() {
                 ...
             },
            new BiFunction<RSS, RSS, RSS>() {
                ...
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<RSS>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(rssList.size());
        }

        @Override
        public void onNext(RSS rss) {
            // 進捗更新 
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {
            // 終了処理
        }
    });

全体のコード

Java(ラムダなし)

ArrayList<RSS> rssList = ...
Flowable.fromIterable(rssList)
    .subscribeOn(Schedulers.io())
    .filter(new Predicate<RSS>() {
        @Override
        public boolean test(RSS rss) throws Exception {
            return rss.id() > 0;
        }
    })
    .flatMap(new Function<RSS, Publisher<? extends RSS>>() {
                 @Override
                 public Publisher<? extends RSS> apply(RSS rss) throws Exception {
                    return Flowable.just(rss).subscribeOn(Schedulers.io());
                 }
             },
            new BiFunction<RSS, RSS, RSS>() {
                @Override
                public RSS apply(RSS rss, RSS rss2) throws Exception {
                    Log.d("test", "BiFunction, Thread:" + Thread.currentThread().getName() + ", rss:" + rss2.title());
                    // RSSの更新処理
                    ...
                    return rss2;
                }
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<RSS>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(rssList.size());
        }

        @Override
        public void onNext(RSS rss) {
            // 進捗更新 
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {
              // 終了処理
        }
    });

Java(ラムダあり)

ArrayList<RSS> rssList = ...
Flowable.fromIterable(rssList)
    .subscribeOn(Schedulers.io())
    .flatMap(rss -> Flowable.just(rss).subscribeOn(Schedulers.io()),
    (rss, rss2) -> {
        Log.d("test", "BiFunction, Thread:" + Thread.currentThread().getName() + ", rss:" + rss2.title());
        // RSSの更新処理
             ...
        return rss2;
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<RSS>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(rssList.size());
        }
    
        @Override
        public void onNext(RSS rss) {
                // 進捗更新 
        }
    
        @Override
        public void onError(Throwable t) {
    
        }
    
        @Override
        public void onComplete() {
            // 終了処理
        }
    });

Kotlin

Flowable.fromIterable<RSS>(rsss)
    .subscribeOn(Schedulers.io())
    .filter { rss -> rss.id > 0 }
    .flatMap({ data -> Flowable.just(data).subscribeOn(Schedulers.io()) })
        { _, rss2 ->
        Log.d("test", "BiFunction, Thread:" + Thread.currentThread().name + ", rss:" + rss2.title)
        // RSSの更新処理
             ...
        rss2
    }
    .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object: Subscriber<RSS> {
            override fun onSubscribe(s: Subscription?) {
                s!!.request(rssList.size.toLong())
            }

            override fun onNext(t: RSS?) {
                // 進捗更新 
            }

            override fun onError(t: Throwable?) {
            }

            override fun onComplete() {
                // 終了処理
            }
    })