Is it possible to access Couchbase Capella via DCP client

hey folks,

Is it possible to connect to Couchbase Capella via a DCP client? If so, kindly provide sample code.

I get "Exception in thread 'main' com.couchbase.client.dcp.error.BootstrapException: Could not connect to Cluster/Bucket" while using the code below:

package org.example;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.StreamFrom;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.FlowControlMode;
import com.couchbase.client.dcp.highlevel.Mutation;
import com.couchbase.client.dcp.highlevel.StreamFailure;

/**
 * Hello world!
 */
public class App
{
    public static void main( String[] args )
    {
        Client client = Client.builder().seedNodes("couchbases://cb.s9ptsvomaiatw6q9.cloud.couchbase.com")
                .credentials("admin", "**")
                .collectionsAware(true)
                .bucket("travel-sample")
                .collectionNames("inventory.airline")
                .flowControl(64 * 1024 * 1024)
                .build();
        client.listener(new DatabaseChangeListener() {
            @Override
            public void onFailure(StreamFailure streamFailure) {

            }

            @Override
            public void onMutation(Mutation mutation) {
                System.out.println("m");
            }
        }, FlowControlMode.AUTOMATIC);

        client.connect().block();
        client.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
        client.startStreaming().block();
    }
}

Is there anything I need to do differently when connecting to Capella, compared to what I have done for Couchbase Server?

Hi Karthikeyan. I think Capella is really great, and I’m glad you’re using it.

To connect to Capella using the DCP client, there are two additional steps:

  1. Connect using the individual node addresses instead of the cluster address.

The cluster address in the “Connect” tab is a DNS SRV address, which the DCP client cannot currently resolve (I’m working on improving this). As a workaround, use one or more node addresses from the “Nodes” tab instead. Make sure at least one of the nodes you use for this is running the “Data” service.
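Not part of the original workaround, but a sketch of how you might automate the lookup (all hostnames here are hypothetical): resolve the SRV record yourself (named "_couchbases._tcp.<cluster-host>", for example via a JNDI DNS lookup), then convert the answers, which arrive as "priority weight port target" strings, into host:port seeds for seedNodes(...):

```java
import java.util.ArrayList;
import java.util.List;

public class SrvSeeds {
    // Convert DNS SRV answers ("priority weight port target") into "host:port"
    // seed strings suitable for the DCP client's seedNodes(...) method.
    static List<String> toSeedNodes(List<String> srvAnswers) {
        List<String> seeds = new ArrayList<>();
        for (String answer : srvAnswers) {
            String[] parts = answer.trim().split("\\s+");
            String host = parts[3];
            if (host.endsWith(".")) { // strip the trailing root dot, if present
                host = host.substring(0, host.length() - 1);
            }
            seeds.add(host + ":" + parts[2]); // parts[2] is the port
        }
        return seeds;
    }

    public static void main(String[] args) {
        // Hypothetical answers, as a DNS lookup of
        // "_couchbases._tcp.cb.example.cloud.couchbase.com" might return them.
        List<String> answers = List.of(
                "0 0 11207 node1.example.cloud.couchbase.com.",
                "0 0 11207 node2.example.cloud.couchbase.com.");
        System.out.println(toSeedNodes(answers));
        // [node1.example.cloud.couchbase.com:11207, node2.example.cloud.couchbase.com:11207]
    }
}
```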

  2. Enable TLS and trust the Capella CA certificate.

If you’re reading this in the future, please get the CA certificate from the “Connect” tab in case it has changed. Here’s the certificate to use today:

// All Capella clusters use the same Certificate Authority.
private static final String capellaCaCert = "" +
    "-----BEGIN CERTIFICATE-----\n" +
    "MIIDFTCCAf2gAwIBAgIRANLVkgOvtaXiQJi0V6qeNtswDQYJKoZIhvcNAQELBQAw\n" +
    "JDESMBAGA1UECgwJQ291Y2hiYXNlMQ4wDAYDVQQLDAVDbG91ZDAeFw0xOTEyMDYy\n" +
    "MjEyNTlaFw0yOTEyMDYyMzEyNTlaMCQxEjAQBgNVBAoMCUNvdWNoYmFzZTEOMAwG\n" +
    "A1UECwwFQ2xvdWQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCfvOIi\n" +
    "enG4Dp+hJu9asdxEMRmH70hDyMXv5ZjBhbo39a42QwR59y/rC/sahLLQuNwqif85\n" +
    "Fod1DkqgO6Ng3vecSAwyYVkj5NKdycQu5tzsZkghlpSDAyI0xlIPSQjoORA/pCOU\n" +
    "WOpymA9dOjC1bo6rDyw0yWP2nFAI/KA4Z806XeqLREuB7292UnSsgFs4/5lqeil6\n" +
    "rL3ooAw/i0uxr/TQSaxi1l8t4iMt4/gU+W52+8Yol0JbXBTFX6itg62ppb/Eugmn\n" +
    "mQRMgL67ccZs7cJ9/A0wlXencX2ohZQOR3mtknfol3FH4+glQFn27Q4xBCzVkY9j\n" +
    "KQ20T1LgmGSngBInAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYE\n" +
    "FJQOBPvrkU2In1Sjoxt97Xy8+cKNMA4GA1UdDwEB/wQEAwIBhjANBgkqhkiG9w0B\n" +
    "AQsFAAOCAQEARgM6XwcXPLSpFdSf0w8PtpNGehmdWijPM3wHb7WZiS47iNen3oq8\n" +
    "m2mm6V3Z57wbboPpfI+VEzbhiDcFfVnK1CXMC0tkF3fnOG1BDDvwt4jU95vBiNjY\n" +
    "xdzlTP/Z+qr0cnVbGBSZ+fbXstSiRaaAVcqQyv3BRvBadKBkCyPwo+7svQnScQ5P\n" +
    "Js7HEHKVms5tZTgKIw1fbmgR2XHleah1AcANB+MAPBCcTgqurqr5G7W2aPSBLLGA\n" +
    "fRIiVzm7VFLc7kWbp7ENH39HVG6TZzKnfl9zJYeiklo5vQQhGSMhzBsO70z4RRzi\n" +
    "DPFAN/4qZAgD5q3AFNIq2WWADFQGSwVJhg==\n" +
    "-----END CERTIFICATE-----";

When building the client:

Client client = Client.builder()
    .securityConfig(SecurityConfig.builder()
        .enableTls(true)
        .trustCertificates(SecurityConfig.decodeCertificates(
            singletonList(capellaCaCert))
        )
    )
    // ...
    .build();
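As a side note (my addition, not guaranteed to match the library internals): you can sanity-check that the PEM string parses by decoding it with the JDK's own CertificateFactory, which is presumably similar to what SecurityConfig.decodeCertificates does under the hood. Printing the subject lets you eyeball that it is really the Capella CA:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

public class CertCheck {
    public static void main(String[] args) throws Exception {
        String capellaCaCert = "-----BEGIN CERTIFICATE-----\n" +
            "MIIDFTCCAf2gAwIBAgIRANLVkgOvtaXiQJi0V6qeNtswDQYJKoZIhvcNAQELBQAw\n" +
            "JDESMBAGA1UECgwJQ291Y2hiYXNlMQ4wDAYDVQQLDAVDbG91ZDAeFw0xOTEyMDYy\n" +
            "MjEyNTlaFw0yOTEyMDYyMzEyNTlaMCQxEjAQBgNVBAoMCUNvdWNoYmFzZTEOMAwG\n" +
            "A1UECwwFQ2xvdWQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCfvOIi\n" +
            "enG4Dp+hJu9asdxEMRmH70hDyMXv5ZjBhbo39a42QwR59y/rC/sahLLQuNwqif85\n" +
            "Fod1DkqgO6Ng3vecSAwyYVkj5NKdycQu5tzsZkghlpSDAyI0xlIPSQjoORA/pCOU\n" +
            "WOpymA9dOjC1bo6rDyw0yWP2nFAI/KA4Z806XeqLREuB7292UnSsgFs4/5lqeil6\n" +
            "rL3ooAw/i0uxr/TQSaxi1l8t4iMt4/gU+W52+8Yol0JbXBTFX6itg62ppb/Eugmn\n" +
            "mQRMgL67ccZs7cJ9/A0wlXencX2ohZQOR3mtknfol3FH4+glQFn27Q4xBCzVkY9j\n" +
            "KQ20T1LgmGSngBInAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYE\n" +
            "FJQOBPvrkU2In1Sjoxt97Xy8+cKNMA4GA1UdDwEB/wQEAwIBhjANBgkqhkiG9w0B\n" +
            "AQsFAAOCAQEARgM6XwcXPLSpFdSf0w8PtpNGehmdWijPM3wHb7WZiS47iNen3oq8\n" +
            "m2mm6V3Z57wbboPpfI+VEzbhiDcFfVnK1CXMC0tkF3fnOG1BDDvwt4jU95vBiNjY\n" +
            "xdzlTP/Z+qr0cnVbGBSZ+fbXstSiRaaAVcqQyv3BRvBadKBkCyPwo+7svQnScQ5P\n" +
            "Js7HEHKVms5tZTgKIw1fbmgR2XHleah1AcANB+MAPBCcTgqurqr5G7W2aPSBLLGA\n" +
            "fRIiVzm7VFLc7kWbp7ENH39HVG6TZzKnfl9zJYeiklo5vQQhGSMhzBsO70z4RRzi\n" +
            "DPFAN/4qZAgD5q3AFNIq2WWADFQGSwVJhg==\n" +
            "-----END CERTIFICATE-----";

        // Parse the PEM string into an X509Certificate using the standard JDK API.
        X509Certificate cert = (X509Certificate) CertificateFactory.getInstance("X.509")
                .generateCertificate(new ByteArrayInputStream(
                        capellaCaCert.getBytes(StandardCharsets.UTF_8)));

        // Print the subject and expiry so you can confirm it's the expected CA.
        System.out.println(cert.getSubjectX500Principal().getName());
        System.out.println("Expires: " + cert.getNotAfter());
    }
}
```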

Oh, and because everybody always forgets: make sure you’ve added your IP address to the list of allowed IPs. From the “Connect” tab, click “Manage Allowed IP” to edit the list.

Thanks,
David


Hi David,

public class App
{
    private static final String capellaCaCert = "" +
        "-----BEGIN CERTIFICATE-----\n" +
        "MIIDFTCCAf2gAwIBAgIRANLVkgOvtaXiQJi0V6qeNtswDQYJKoZIhvcNAQELBQAw\n" +
        "JDESMBAGA1UECgwJQ291Y2hiYXNlMQ4wDAYDVQQLDAVDbG91ZDAeFw0xOTEyMDYy\n" +
        "MjEyNTlaFw0yOTEyMDYyMzEyNTlaMCQxEjAQBgNVBAoMCUNvdWNoYmFzZTEOMAwG\n" +
        "A1UECwwFQ2xvdWQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCfvOIi\n" +
        "enG4Dp+hJu9asdxEMRmH70hDyMXv5ZjBhbo39a42QwR59y/rC/sahLLQuNwqif85\n" +
        "Fod1DkqgO6Ng3vecSAwyYVkj5NKdycQu5tzsZkghlpSDAyI0xlIPSQjoORA/pCOU\n" +
        "WOpymA9dOjC1bo6rDyw0yWP2nFAI/KA4Z806XeqLREuB7292UnSsgFs4/5lqeil6\n" +
        "rL3ooAw/i0uxr/TQSaxi1l8t4iMt4/gU+W52+8Yol0JbXBTFX6itg62ppb/Eugmn\n" +
        "mQRMgL67ccZs7cJ9/A0wlXencX2ohZQOR3mtknfol3FH4+glQFn27Q4xBCzVkY9j\n" +
        "KQ20T1LgmGSngBInAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYE\n" +
        "FJQOBPvrkU2In1Sjoxt97Xy8+cKNMA4GA1UdDwEB/wQEAwIBhjANBgkqhkiG9w0B\n" +
        "AQsFAAOCAQEARgM6XwcXPLSpFdSf0w8PtpNGehmdWijPM3wHb7WZiS47iNen3oq8\n" +
        "m2mm6V3Z57wbboPpfI+VEzbhiDcFfVnK1CXMC0tkF3fnOG1BDDvwt4jU95vBiNjY\n" +
        "xdzlTP/Z+qr0cnVbGBSZ+fbXstSiRaaAVcqQyv3BRvBadKBkCyPwo+7svQnScQ5P\n" +
        "Js7HEHKVms5tZTgKIw1fbmgR2XHleah1AcANB+MAPBCcTgqurqr5G7W2aPSBLLGA\n" +
        "fRIiVzm7VFLc7kWbp7ENH39HVG6TZzKnfl9zJYeiklo5vQQhGSMhzBsO70z4RRzi\n" +
        "DPFAN/4qZAgD5q3AFNIq2WWADFQGSwVJhg==\n" +
        "-----END CERTIFICATE-----";

    public static void main( String[] args )
    {
        // cb.s9ptsvomaiatw6q9.cloud.couchbase.com

        Client client = Client.builder().seedNodes("aisgscfvptu1bftz.s9ptsvomaiatw6q9.cloud.couchbase.com")
                .securityConfig(SecurityConfig.builder()
                        .enableTls(true)
                        .trustCertificates(SecurityConfig.decodeCertificates(
                                singletonList(capellaCaCert))
                        )
                )
                .credentials("admin", "00")
                .collectionsAware(true)
                .bucket("travel-sample")
                .collectionNames("inventory.airline")
                .flowControl(64 * 1024 * 1024)
                .build();

        client.listener(new DatabaseChangeListener() {
            @Override
            public void onFailure(StreamFailure streamFailure) {

            }

            @Override
            public void onMutation(Mutation mutation) {
                System.out.println("m");
            }
        }, FlowControlMode.AUTOMATIC);

        client.connect().block();
        client.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
        client.startStreaming().block();
    }
}

I still get the “could not connect to cluster or bucket” issue. I have added my IP to the allowed list, and added the certificate too.

Could this be anything to do with roles and permissions?

Did you create a database user with permission to read the bucket?

Hey David, I have created a credential with read and write permission. I am able to use that credential to query data from Capella using the java-client, but unable to connect to Capella using the same credential via the DCP client.

Is there any documentation for connecting to Capella via the java-client and the DCP client?

Select your cluster in the Capella admin web UI, then the “Connect” tab, then the “SDK Examples” (in the “Connection” section). You should see sample code for connecting to Capella using the various supported SDKs. If you have issues connecting using the Java SDK, take a look at Troubleshooting Cloud Connections.

The DCP client is unsupported and undocumented. If you’d like to share a screenshot of your database user account settings from the Capella web UI, the code (without the actual password), and the log output, I will take a look when I have a moment.

Thanks,
David

Sample Code

private static final String capellaCaCert = "-----BEGIN CERTIFICATE-----\n" +
            "MIIDFTCCAf2gAwIBAgIRANLVkgOvtaXiQJi0V6qeNtswDQYJKoZIhvcNAQELBQAw\n" +
            "JDESMBAGA1UECgwJQ291Y2hiYXNlMQ4wDAYDVQQLDAVDbG91ZDAeFw0xOTEyMDYy\n" +
            "MjEyNTlaFw0yOTEyMDYyMzEyNTlaMCQxEjAQBgNVBAoMCUNvdWNoYmFzZTEOMAwG\n" +
            "A1UECwwFQ2xvdWQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCfvOIi\n" +
            "enG4Dp+hJu9asdxEMRmH70hDyMXv5ZjBhbo39a42QwR59y/rC/sahLLQuNwqif85\n" +
            "Fod1DkqgO6Ng3vecSAwyYVkj5NKdycQu5tzsZkghlpSDAyI0xlIPSQjoORA/pCOU\n" +
            "WOpymA9dOjC1bo6rDyw0yWP2nFAI/KA4Z806XeqLREuB7292UnSsgFs4/5lqeil6\n" +
            "rL3ooAw/i0uxr/TQSaxi1l8t4iMt4/gU+W52+8Yol0JbXBTFX6itg62ppb/Eugmn\n" +
            "mQRMgL67ccZs7cJ9/A0wlXencX2ohZQOR3mtknfol3FH4+glQFn27Q4xBCzVkY9j\n" +
            "KQ20T1LgmGSngBInAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYE\n" +
            "FJQOBPvrkU2In1Sjoxt97Xy8+cKNMA4GA1UdDwEB/wQEAwIBhjANBgkqhkiG9w0B\n" +
            "AQsFAAOCAQEARgM6XwcXPLSpFdSf0w8PtpNGehmdWijPM3wHb7WZiS47iNen3oq8\n" +
            "m2mm6V3Z57wbboPpfI+VEzbhiDcFfVnK1CXMC0tkF3fnOG1BDDvwt4jU95vBiNjY\n" +
            "xdzlTP/Z+qr0cnVbGBSZ+fbXstSiRaaAVcqQyv3BRvBadKBkCyPwo+7svQnScQ5P\n" +
            "Js7HEHKVms5tZTgKIw1fbmgR2XHleah1AcANB+MAPBCcTgqurqr5G7W2aPSBLLGA\n" +
            "fRIiVzm7VFLc7kWbp7ENH39HVG6TZzKnfl9zJYeiklo5vQQhGSMhzBsO70z4RRzi\n" +
            "DPFAN/4qZAgD5q3AFNIq2WWADFQGSwVJhg==\n" +
            "-----END CERTIFICATE-----";
    public static void main( String[] args ) throws InterruptedException {
        //cb.s9ptsvomaiatw6q9.cloud.couchbase.com

        Client client = Client.builder()
                .securityConfig(SecurityConfig.builder()
                        .enableTls(true)
                        .trustCertificates(SecurityConfig.decodeCertificates(
                                singletonList(capellaCaCert))
                        )
                )
                .seedNodes("aisgscfvptu1bftz.s9ptsvomaiatw6q9.cloud.couchbase.com")
                .credentials("admin","")
                .collectionsAware(true)
                .bucket("travel-sample")
                .collectionNames("inventory.airline")
                .flowControl(64 * 1024 * 1024)
                .build();

        client.listener(new DatabaseChangeListener() {
            @Override
            public void onFailure(StreamFailure streamFailure) {

            }

            @Override
            public void onMutation(Mutation mutation) {
                System.out.println("m");
            }
        }, FlowControlMode.AUTOMATIC);

        client.connect().block();
        client.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
        client.startStreaming().block();
    }

The database Credentials are:

error log:
Exception in thread "main" com.couchbase.client.dcp.error.BootstrapException: Could not connect to Cluster/Bucket
at com.couchbase.client.dcp.Client.lambda$connect$3(Client.java:461)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
at reactor.core.publisher.MonoRunnable.subscribe(MonoRunnable.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.Mono.block(Mono.java:1730)
at org.example.App.main(App.java:73)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:139)
at reactor.core.publisher.Mono.block(Mono.java:1731)
… 1 more
Caused by: java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timed out after waiting 7000 MILLISECONDS for latch.
at com.couchbase.client.dcp.conductor.Conductor.lambda$await$2(Conductor.java:136)
at reactor.core.publisher.MonoRunnable.subscribe(MonoRunnable.java:49)
… 3 more
Caused by: java.util.concurrent.TimeoutException: Timed out after waiting 7000 MILLISECONDS for latch.
… 5 more

Process finished with exit code 1

kindly help in this regard

Hi @Karthikeyan

Thanks for providing this info.

Is there more to the log? I’m wondering what happened before the client gave up and reported failure.

Thanks,
David

I’m attaching the stack trace for your reference, David.

Before we dig deeper, can you please triple-check that the IP address of the machine running the DCP client is present in the cluster’s set of “Allowed IPs” ?

Yes David, it is present in Allowed IPs.

For your info, I am able to connect to Capella using the java-client and query data.

this code works fine

  public static void main( String[] args )
    {


        Cluster cluster = Cluster.connect("couchbases://cb.s9ptsvomaiatw6q9.cloud.couchbase.com","admin","");
        Bucket bucket = cluster.bucket("travel-sample");
        bucket.waitUntilReady(Duration.ofSeconds(10));
        Collection collection = bucket.scope("inventory").collection("airline");
        System.out.println(collection.get("airline_10"));

    }

Hey David,
Is there any update available in this regard?

Hi Karthikeyan,

I’m still scratching my head about what could be happening.

Please can you share a screenshot of the nodes page from the Capella admin UI?

It would also be good to make sure you have an SLF4J binding on your class path so you get log output from the DCP client. It would be useful to see what the DCP client is doing before that exception occurs.

Thanks,
David

Hi David,

Here are the logs you requested from the user. He posted them on another thread (Permissions on free trial version of Capella). Looping it back for you to check further.


Thanks Mark. The first lines of that log output (the ones that start with “SLF4J:”) indicate there is no SLF4J binding on the class path. @Karthikeyan You’ll need to add one so we can see what the DCP client is doing.

hey David,

I got some info on this issue. I tried switching off the VPN and connecting via DCP, and I got connected after it threw 3-4 ConnectTimeoutExceptions. Sometimes it now connects even when the VPN is turned on. I’m just confused by this scenario.

Does the VPN actually matter anywhere in this issue? I’m sure I have whitelisted the IP in the cluster.

One more doubt: I get a connection timeout exception several times before it actually connects via DCP. I’m adding the log for your reference.



CODE

        Client client = Client.builder()
                .securityConfig(SecurityConfig.builder()
                        .enableTls(true)
                        .trustCertificates(singletonList((X509Certificate) certificate)
                        )
                )
                .seedNodes("aisgscfvptu1bftz.s9ptsvomaiatw6q9.cloud.couchbase.com:11207")
                .credentials("admin6","")
                .collectionsAware(true)
                .bucket("travel-sample")
                .collectionNames("inventory.airline")
                .flowControl(64 * 1024 * 1024)
                .build();

        client.listener(new DatabaseChangeListener() {
            @Override
            public void onFailure(StreamFailure streamFailure) {
                System.out.println("mw");
            }

            @Override
            public void onMutation(Mutation mutation) {
                System.out.println("m");
            }
            @Override
            public void onDeletion(Deletion deletion)
            {
                System.out.println("d");
            }
        }, FlowControlMode.AUTOMATIC);
        
        client.connect().block();
        Thread.sleep(10000);
        client.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
        int a =  client.numPartitions();
        client.startStreaming().block();
    }

I get this exception both when I specify the port and when I don’t.

Since you’re connecting over a wide-area network, you might need to increase some timeout values.

.bootstrapTimeout(Duration.ofSeconds(30)) // default is 5 seconds
.socketConnectTimeout(TimeUnit.SECONDS.toMillis(30)) // default is one second

Incidentally, you might want to decrease the flow control buffer. The Couchbase connectors default to 16 MiB nowadays instead of 64.

This solved the problem, David. Thank you so much.

What should be the ideal value of flowControl when connecting to a local Couchbase Server and to Capella?

Right now I use flowControl(64 * 1024 * 1024).

The flow control buffer size affects the client’s peak heap memory usage. Multiply this value by the number of nodes in the cluster to get a rough idea of the client’s heap requirement under load. A smaller flow control buffer means lower heap requirements, but there’s a tradeoff with performance. There’s no one ideal value; the best approach is to lower it until you notice a negative impact on performance under load.
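To make that concrete (my arithmetic, with a hypothetical 3-node cluster and the 16 MiB connector default):

```java
public class FlowControlHeap {
    public static void main(String[] args) {
        long bufferBytes = 16L * 1024 * 1024; // 16 MiB flow control buffer per node
        int dataNodes = 3;                    // hypothetical cluster size
        long peakBytes = bufferBytes * dataNodes;
        // Rough peak heap attributable to flow control buffers under load.
        System.out.println(peakBytes / (1024 * 1024) + " MiB"); // 48 MiB
    }
}
```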
