Bulk delete from Scala triggering "LEAK: ByteBuf.release() was not called..."

Hey all! I'm accessing Couchbase from Scala and have reached a point where I want to remove a bunch of documents. I have the documents handy and just need to get rid of them quickly and efficiently. To that end, I enlisted the aid of RxScala and started using the following code (which works, but emits the LEAK message shown below).

  private def purgeDocuments(documents: List[JsonDocument]): Unit = {
    Observable
      .from(documents.map(_.id))
      .map(id => {
        CouchbaseConnection.bucket.async.remove(id, PersistTo.MASTER)
      })
      .toList
      .toBlocking
      .single
  }

To take RxScala out of the picture, I also tried just walking the list the normal way, which produced the same log message.

  private def purgeDocuments(documents: List[JsonDocument]): Unit = {
    documents.foreach { d =>
      CouchbaseConnection.bucket.async.remove(d.id, PersistTo.MASTER)
    }
  }

Using the Java SDK v2.1.4, the code triggers the following leak report (I used “-Dcom.couchbase.client.deps.io.netty.leakDetectionLevel=paranoid” to get at the stack trace).

Interestingly, using version 2.2.4, the code silently does nothing.

2016-02-18 17:14:40 ERROR ResourceLeakDetector:171 - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 0
Created at:
    com.couchbase.client.deps.io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:60)
    com.couchbase.client.deps.io.netty.buffer.Unpooled.compositeBuffer(Unpooled.java:359)
    com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator.decode(BinaryMemcacheObjectAggregator.java:66)
    com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator.decode(BinaryMemcacheObjectAggregator.java:39)
    com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:182)
    com.couchbase.client.deps.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
    com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
    com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    java.lang.Thread.run(Thread.java:745)

Is there an obvious fix for this? Should I be pursuing another way to accomplish bulk deletes?

Hi,

You should use flatMap instead of map in your RxScala snippet (bucket.async().remove() returns an Observable). Similarly, in the “classical” version you should either remove the .async() or chain a toBlocking().single() at the end, in both cases reverting to blocking behavior.
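
For example, the second option could look something like this (untested sketch, reusing your CouchbaseConnection helper and the imports from your snippets):

  private def purgeDocuments(documents: List[JsonDocument]): Unit = {
    documents.foreach { d =>
      // toBlocking.single waits for each removal to complete before moving on
      CouchbaseConnection.bucket.async
        .remove(d.id, PersistTo.MASTER)
        .toBlocking
        .single
    }
  }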

It does something with 2.1.4 because SDK 2.1.x still triggered the operation as soon as the method on the bucket was called. 2.2.x, on the other hand, defers actually performing the operation until a Subscription is made on the Observable (i.e. someone is ready to consume). That has the added benefit of being compatible with operators like retry().
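
Roughly (a sketch with a made-up document id, just to illustrate the 2.2.x behavior):

  // In 2.2.x this call only builds the Observable; nothing is sent to the server yet
  val pendingRemove = CouchbaseConnection.bucket.async.remove("some-doc-id", PersistTo.MASTER)

  // The remove is actually performed once something subscribes to (or blocks on) it
  pendingRemove.toBlocking.single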

With that change, I think 2.2.4 may not be subject to the leak. Can you try it and report back?

Awesome; thank you for the assist!

Working solution with no leak (using 2.2.4)…

  import rx.lang.scala.JavaConversions.toScalaObservable

  private def purgeDocuments(documents: List[JsonDocument]): Unit = {
    Observable
      .from(documents.map(_.id))
      .flatMap(id =>
        // convert the Java Observable returned by the async remove into an RxScala one
        toScalaObservable(CouchbaseConnection.bucket.async.remove(id, PersistTo.MASTER))
      )
      .subscribe
  }
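
For the record, since .subscribe returns immediately, purgeDocuments may return before the removals have actually finished. If anyone needs it to block until everything is gone, a variant like this should do it (same imports and assumptions as above):

  private def purgeDocuments(documents: List[JsonDocument]): Unit = {
    Observable
      .from(documents.map(_.id))
      .flatMap(id =>
        toScalaObservable(CouchbaseConnection.bucket.async.remove(id, PersistTo.MASTER))
      )
      .toBlocking
      .foreach(_ => ()) // block until every removal has completed
  }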