getBulk memory leak?

I wrapped bulk reads in a getBulk method like this:

public Map<String, Object> getBulk(Collection<String> ids) {
    Observable<Map<String, Object>> asyncResult = Observable.from(ids)
        .flatMap(new Func1<String, Observable<BinaryDocument>>() {
            @Override
            public Observable<BinaryDocument> call(String id) {
                return bucket.async().get(id, BinaryDocument.class);
            }
        })
        .toMap(
            new Func1<BinaryDocument, String>() {
                @Override
                public String call(BinaryDocument bd) {
                    return bd.id();
                }
            },
            new Func1<BinaryDocument, Object>() {
                @Override
                public Object call(BinaryDocument bd) {
                    return bd.content();
                }
            });

    Map<String, Object> syncResult = asyncResult.toBlocking().single();
    return syncResult;
}

When I used getBulk, I got the following log:

2017-04-13 20:05:11,835 cb-io-5-3 com.couchbase.client.deps.io.netty.util.ResourceLeakDetector [ERROR]: LEAK: ByteBuf.release() was not called before it’s garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information

I’m fairly sure the cause is that I never call ReferenceCountUtil.release in getBulk, because when I write a get method like this, the log above does not appear:

public byte[] get(String key) {
    BinaryDocument res = bucket.get(BinaryDocument.create(key));
    if (res == null) {
        return null;
    }
    ByteBuf content = res.content();
    try {
        // Copy only the readable bytes; capacity() can be larger than the payload.
        byte[] data = new byte[content.readableBytes()];
        content.getBytes(content.readerIndex(), data);
        return data;
    } finally {
        ReferenceCountUtil.release(content); // quite important !!
    }
}

The problem is that I don’t know where to add the release call inside getBulk.
Please help me write a safer getBulk that doesn’t trigger the leak warning in the log.
Thanks.

By the way, my Java SDK version is 2.3.2.

Yeah, if you use the BinaryDocument it’s up to you to release the buffers; otherwise you leak memory and Netty complains.

In your code, you could convert the content into a byte[] in your toMap function and store that instead of the ByteBuf. The other option is to pass it along as a ByteBuf, but then make sure you release the buffer once you are done with it.
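To illustrate the first option, here is a minimal sketch of the copy-then-release pattern. It is self-contained so it can run without the SDK: FakeBuf and FakeDoc are hypothetical stand-ins for Netty's ByteBuf and the SDK's BinaryDocument, and copyAndRelease and toByteArrayMap are illustrative names, not SDK methods. In the real getBulk, the same copy would presumably go in the value selector passed to toMap, using ByteBuf's readableBytes(), getBytes(int, byte[]) and release().

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CopyThenRelease {

    // Hypothetical stand-in for a reference-counted buffer such as Netty's ByteBuf.
    static final class FakeBuf {
        private final byte[] data;
        private int refCnt = 1; // a fresh buffer starts with one reference

        FakeBuf(byte[] data) {
            this.data = data.clone();
        }

        int readableBytes() {
            return data.length;
        }

        void getBytes(int index, byte[] dst) {
            if (refCnt == 0) {
                throw new IllegalStateException("buffer already released");
            }
            System.arraycopy(data, index, dst, 0, dst.length);
        }

        boolean release() {
            if (refCnt == 0) {
                throw new IllegalStateException("buffer already released");
            }
            refCnt--;
            return refCnt == 0;
        }

        int refCnt() {
            return refCnt;
        }
    }

    // Hypothetical stand-in for BinaryDocument: an id plus a reference-counted body.
    static final class FakeDoc {
        final String id;
        final FakeBuf content;

        FakeDoc(String id, byte[] body) {
            this.id = id;
            this.content = new FakeBuf(body);
        }
    }

    // Copy the payload onto the heap, then release the buffer even if the copy fails.
    static byte[] copyAndRelease(FakeBuf buf) {
        try {
            byte[] out = new byte[buf.readableBytes()];
            buf.getBytes(0, out);
            return out;
        } finally {
            buf.release();
        }
    }

    // Build an id -> byte[] map; no reference-counted buffer escapes this method.
    static Map<String, byte[]> toByteArrayMap(List<FakeDoc> docs) {
        Map<String, byte[]> result = new LinkedHashMap<>();
        for (FakeDoc doc : docs) {
            result.put(doc.id, copyAndRelease(doc.content));
        }
        return result;
    }

    public static void main(String[] args) {
        List<FakeDoc> docs = Arrays.asList(
            new FakeDoc("a", new byte[] {1, 2}),
            new FakeDoc("b", new byte[] {3}));
        Map<String, byte[]> result = toByteArrayMap(docs);
        for (FakeDoc doc : docs) {
            System.out.println(doc.id + ": " + result.get(doc.id).length
                + " bytes, refCnt=" + doc.content.refCnt());
        }
    }
}
```

Because the caller only ever receives plain byte[] values, nothing downstream has to remember to release anything. The cost is one extra copy per document.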

Ah, you mean I should process the BinaryDocument the same way I did in the get method?
That is, convert the BinaryDocument to a byte[] in the toMap call function and release bd.content() at the same time.

If it’s not a BinaryDocument, is it still necessary to release manually?

No, only with the BinaryDocument. All the other document types handle this for you. The BinaryDocument is designed to give you the lowest level of access, but that also means more responsibility :)

Now I get it. Thank you. :)