Need help with reactive document copy: getting OOM

Hi,

We are trying to copy around 1 million documents from one bucket to another using reactive programming, and we are getting an OutOfMemoryError in the piece of code below. I am no RxJava expert and need help preventing the OOM. I suspect the reads are happening faster than the writes, so a buffer fills up and causes the OOM. The code is as follows:

    CountDownLatch countDownLatch5 = new CountDownLatch(1);
    Observable
            .from(n1qlKeysForDocsGPC)
            .flatMap(new Func1<String, Observable<JsonDocument>>() {
                @Override
                public Observable<JsonDocument> call(String key) {
                    return readPrimaryMainAsyncBucket
                            .get(key, 10, TimeUnit.SECONDS)
                            .onErrorResumeNext(readPrimaryMainAsyncBucket.get(key, 10, TimeUnit.SECONDS))
                            .retry(50)
                            .switchIfEmpty(Observable.empty())
                            .onErrorResumeNext(Observable.empty());
                }
            })
            .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
                @Override
                public Observable<JsonDocument> call(JsonDocument jsonDocument) {
                    return readPrimaryBackupAsyncBucket.upsert(jsonDocument, 10, TimeUnit.SECONDS).retry(50);
                }
            })
            .last()
            .doOnTerminate(new Action0() {
                @Override
                public void call() {
                    countDownLatch5.countDown();
                }
            })
            .subscribe();
    try {
        countDownLatch5.await();
        logger.info("DataRecoverySchedulers | Completed countDownLatch5");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

Thanks,
Himanshu

Hi Himanshu,

Looks like this question was cross-posted to Stack Overflow. Did Hod’s answer solve the problem for you?
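For anyone who finds this thread later: the usual fix for this pattern is to bound how many documents are in flight at once, so the reads cannot outrun the writes (in RxJava 1.x, the `flatMap` overload that takes a `maxConcurrent` argument does this). The underlying idea can be sketched in plain Java, with no Couchbase or RxJava dependency; the class name, item count, and concurrency limit below are illustrative only:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedCopy {
    /**
     * Copies totalDocs "documents", never allowing more than maxInFlight
     * to be processed concurrently. The Semaphore makes the producer loop
     * block once the limit is reached, instead of buffering without bound.
     */
    public static int copyAll(int totalDocs, int maxInFlight) throws InterruptedException {
        Semaphore inFlight = new Semaphore(maxInFlight);
        ExecutorService pool = Executors.newFixedThreadPool(maxInFlight);
        AtomicInteger copied = new AtomicInteger();
        for (int i = 0; i < totalDocs; i++) {
            inFlight.acquire(); // blocks the "reader" when maxInFlight items are pending
            pool.execute(() -> {
                try {
                    copied.incrementAndGet(); // stand-in for the upsert into the backup bucket
                } finally {
                    inFlight.release();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return copied.get();
    }
}
```

The key point is the same in both worlds: memory stays flat because backpressure is applied at the source (the acquire, or the `maxConcurrent` cap) rather than letting every read queue up ahead of the slower writes.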

Thanks,
David

Yes, it did resolve the issue.