Hey all! I'm accessing Couchbase from Scala and have a point where I want to remove a bunch of documents. I have the documents handy and just need to get rid of them quickly and efficiently. To that end, I enlisted the aid of rxScala and started using the following code (which works, but happens to emit the LEAK message shown further down).
private def purgeDocuments(documents: List[JsonDocument]): Unit = {
  Observable
    .from(documents.map(_.id))
    .map(id => {
      CouchbaseConnection.bucket.async.remove(id, PersistTo.MASTER)
    })
    .toList
    .toBlocking
    .single
}
To get rxScala out of the picture, I also tried just walking the list the normal way, and got the same log message.
private def purgeDocuments(documents: List[JsonDocument]): Unit = {
  documents.foreach { d =>
    CouchbaseConnection.bucket.async.remove(d.id, PersistTo.MASTER)
  }
}
Using the Java SDK v2.1.4, the code causes the problem shown in the log below (I used “-Dcom.couchbase.client.deps.io.netty.leakDetectionLevel=paranoid” to get at the stack trace).
Interestingly, using version 2.2.4, the code silently does nothing.
2016-02-18 17:14:40 ERROR ResourceLeakDetector:171 - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 0
Created at:
com.couchbase.client.deps.io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:60)
com.couchbase.client.deps.io.netty.buffer.Unpooled.compositeBuffer(Unpooled.java:359)
com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator.decode(BinaryMemcacheObjectAggregator.java:66)
com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator.decode(BinaryMemcacheObjectAggregator.java:39)
com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:182)
com.couchbase.client.deps.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
java.lang.Thread.run(Thread.java:745)
Is there an obvious fix for this? Should I be pursuing another way to accomplish bulk deletes?