Limit Query not working with spring data couchbase 4.1.5

Hi

I have written the following query. When I execute an equivalent query in the web console, I get only one result.
But when the same query runs from the code base, it returns more than one document. It looks like the LIMIT in the query is not being honored. Kindly suggest a solution.

    @Query("#{#n1ql.selectEntity} WHERE #{#n1ql.filter} AND `policyId` = $1 AND effectiveDateTime <= $2 ORDER BY effectiveDateTime DESC LIMIT 1")
    Mono<PolicySnapshot> findPolicySnapshotByPolicyIdAndEffectiveDateTime(String policyId, long effectiveDateTime);

Regards,
Venkat

Hi @sri_ram - I cannot reproduce this. Can you show the calling code and the exception? (There will be an exception for multiple entries in a Mono<>)

@Query("#{#n1ql.selectEntity} WHERE #{#n1ql.filter} and icao != $1 ORDER BY effectiveDateTime DESC LIMIT 1")
Mono<Airport> findPolicySnapshotByPolicyIdAndEffectiveDateTime(String policyId, long effectiveDateTime);

@Query("#{#n1ql.selectEntity} WHERE #{#n1ql.filter} ORDER BY effectiveDateTime DESC")
Flux<Airport> findPolicySnapshotAll();
Airport vie = new Airport("airports::vie", "vie", "loww");
airportRepository.save(vie).block();
airportRepository.save(vie.withId(UUID.randomUUID().toString())).block();
airportRepository.findAll().collectList().block(); // findAll has QueryScanConsistency;
Mono<Airport> airport = airportRepository.findPolicySnapshotByPolicyIdAndEffectiveDateTime("any", 0);
System.out.println("------------------------------");
System.out.println(airport.block());
System.out.println("------------------------------");
Flux<Airport> airports = airportRepository.findPolicySnapshotAll();
System.out.println(airports.collectList().block());
System.out.println("------------------------------");
2021-03-10 08:43:25,912 DEBUG .query.ReactiveStringBasedCouchbaseQuery:  86 - Created query 
------------------------------
{"id":"33e469e2-a323-47b3-809d-1d188c024f33","iata":"vie","icao":"loww","version":1615383418054311936}
------------------------------
2021-03-10 08:43:25,977 DEBUG .query.ReactiveStringBasedCouchbaseQuery:  86 - Created query 
[{"id":"33e469e2-a323-47b3-809d-1d188c024f33","iata":"vie","icao":"loww","version":1615383418054311936}, {"id":"43e3580a-47c7-4d1e-b024-8f5ead4f5ea0","iata":"vie","icao":"loww","version":1615383738678837248}]
------------------------------
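
For readers unfamiliar with the "exception for multiple entries in a Mono<>" mentioned above, here is a minimal plain-Java sketch of that single-item contract. It does not use Reactor or Couchbase: `SingleItemContract` and `single` are hypothetical names made up for illustration, and the behavior for an empty source is an assumption of this sketch. Only the "more than one item" message is taken from the kind of error a Mono reports when its source emits several items.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class SingleItemContract {

    // Hypothetical helper (not a Reactor API): returns the only element of
    // the source and fails if the source holds more than one element.
    static <T> T single(Iterable<T> source) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            // Assumption of this sketch: an empty source is treated as an error.
            throw new NoSuchElementException("Source was empty");
        }
        T first = it.next();
        if (it.hasNext()) {
            // A second element violates the "at most one item" contract.
            throw new IndexOutOfBoundsException("Source emitted more than one item");
        }
        return first;
    }

    public static void main(String[] args) {
        // One element: returned as-is.
        System.out.println(single(List.of("policy-1")));

        // Two elements: the contract is violated and an exception is thrown.
        try {
            single(List.of("policy-1", "policy-2"));
        } catch (IndexOutOfBoundsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `policy-1` followed by `Source emitted more than one item`. The point of the sketch is that such an error says something upstream delivered several items to a single-valued consumer; it does not by itself say which step in the chain produced them.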

Thanks for the reply. Below is the calling method:

public Mono<Policy> getPolicyByIdAndEffectiveDateTime(String policyId, Instant effectiveDateTime) {
    return repository.findPolicySnapshotByPolicyIdAndEffectiveDateTime(policyId, effectiveDateTime.toEpochMilli())
            .map(PolicySnapshot::getEntity)
            .doOnError(error ->
                    log.error("MSG='Exception happened while retrieving Policy by Id and effectiveDateTime', " +
                                    "policyId={}, effectiveDateTime={}",
                            policyId, effectiveDateTime, error));
}

And my repository is defined as follows:

@Repository
public interface PolicySnapshotCBRepository extends ReactiveSortingRepository<PolicySnapshot, String> {

    @Query("#{#n1ql.selectEntity} WHERE #{#n1ql.filter} AND `policyId` = $1 AND effectiveDateTime <= $2 ORDER BY effectiveDateTime DESC LIMIT 1")
    Mono<PolicySnapshot> findPolicySnapshotByPolicyIdAndEffectiveDateTime(String policyId, long effectiveDateTime);
}

Please find below the error that we are receiving:

2021-03-10 20:07:06.146 ERROR policy-api:local:INML71H8JG5J [fbf861385866730d,-,-] 86918 --- [cb-io-http-2-2] c.t.a.p.s.PolicySnapshotRepositoryImpl : MSG='Exception happened while retrieving Policy by id', policyId=570f526b-ec08-41e2-b9d5-a8635f4e5f6e

java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:136) ~[reactor-core-3.4.0.jar:3.4.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.0.jar:3.4.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250) ~[reactor-core-3.4.0.jar:3.4.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:478) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.EmitterProcessor.subscribe(EmitterProcessor.java:196) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Flux.subscribe(Flux.java:8095) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195) ~[reactor-core-3.4.0.jar:3.4.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.0.jar:3.4.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.0.jar:3.4.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130) ~[reactor-core-3.4.0.jar:3.4.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.0.jar:3.4.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130) ~[reactor-core-3.4.0.jar:3.4.0]
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
at com.couchbase.client.core.Reactor$SilentMonoCompletionStage.lambda$subscribe$0(Reactor.java:178) ~[core-io-2.0.11.jar:na]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[na:na]
at com.couchbase.client.core.msg.BaseRequest.succeed(BaseRequest.java:143) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.io.netty.chunk.ChunkedMessageHandler.completeInitialResponse(ChunkedMessageHandler.java:251) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.io.netty.chunk.ChunkedMessageHandler.handleHttpContent(ChunkedMessageHandler.java:238) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.io.netty.chunk.ChunkedMessageHandler.channelRead(ChunkedMessageHandler.java:187) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:381) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:211) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:289) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[core-io-2.0.11.jar:na]
at com.couchbase.client.core.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[core-io-2.0.11.jar:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Regards,
Venkat

I’m not able to reproduce this. Have you tried removing the map(PolicySnapshot::getEntity) call?

   		Mono<Airport> ap = getPolicyByIdAndEffectiveDateTime("x",Instant.now());
		System.out.println(ap.block());

	}
	public Mono<Airport> getPolicyByIdAndEffectiveDateTime(String policyId, Instant effectiveDateTime) {
		return airportRepository.findPolicySnapshotByPolicyIdAndEffectiveDateTime(policyId, effectiveDateTime.toEpochMilli())
				//.map(Airport::getEntity)
				.doOnError(error ->
						System.out.println("MSG='Exception happened while retrieving Policy by Id and effectiveDateTime', " +
										"policyId={}, effectiveDateTime={}"));
	}

2021-03-10 12:15:51,135 DEBUG .query.ReactiveStringBasedCouchbaseQuery:  86 - Created query 
------------------------------
{"id":"026b1dcb-1c41-4db4-aa41-df30afa0b6f1","iata":"iata","icao":"icao","version":1615395748752916480}
------------------------------
2021-03-10 12:15:51,199 DEBUG .query.ReactiveStringBasedCouchbaseQuery:  86 - Created query 
[{"id":"026b1dcb-1c41-4db4-aa41-df30afa0b6f1","iata":"iata","icao":"icao","version":1615395748752916480}, {"id":"02999d7b-455e-4b84-9c4c-32cd5eb03334","iata":"iata","icao":"icao","version":1615395748679647232}, 
------------------------------
2021-03-10 12:15:51,256 DEBUG .query.ReactiveStringBasedCouchbaseQuery:  86 - Created query 
{"id":"026b1dcb-1c41-4db4-aa41-df30afa0b6f1","iata":"iata","icao":"icao","version":1615395748752916480}  << ap.block()

Hi @mreiche

Many thanks for your response.

Apologies for the confusion. The reactor chain was not written properly in the controller, and the problem had nothing to do with the Couchbase SDK. It's working fine now.

Regards,
Venkat