In this post we’ll go reactive all the way!

Some Couchbase customers use Vert.x, a framework for writing fully asynchronous applications. The Couchbase Java SDK fits well into this picture: it is asynchronous from the ground up and exposes an RxJava-based asynchronous API.

So we’ll see how to get going fast with a Couchbase Verticle that spins up a connection to a Couchbase Cluster and Bucket then serves JSON documents from the database, using Java 8.

This blog post assumes that you are familiar with the basics of Vert.x.

Starting a new Vert.x project

Let’s start by creating a new Maven-based project: create a root folder for your project and initialize a Maven directory structure (or use your favorite Maven archetype). For instance, you can use the following command: “mkdir -p cbvertx/src/main/java/com/couchbase/demo/vertx/”.

Now let’s create the pom.xml at the root of the project:

As you can see, we’ll be using Vert.x version 3.1.0 and its RxJava bindings extension, Couchbase Java SDK version 2.2.2, and RxJava version 1.0.15.

Skeleton of the Verticle

We’ll base our CouchbaseVerticle on the AbstractVerticle in io.vertx.rxjava.core (from the vertx-rx-java extension). Let’s create its skeleton in the project:

The init phase that we’ll write right after this will show how to use the Vert.x configuration to determine at runtime which node(s) in the Couchbase cluster we’ll bootstrap from. Instantiation of the CouchbaseCluster is still lightweight enough that it can be done like this during init.

Add the following init method to the CouchbaseVerticle:
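The lookup logic at the heart of that init phase can also be tried in isolation. Here is a minimal JDK-only sketch of it, with a plain Map standing in for the Vert.x configuration. The key name "couchbase.seedNodes" and the "localhost" fallback are assumptions made for illustration, and no Vert.x or Couchbase class is involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** JDK-only stand-in for reading the bootstrap nodes from a Verticle configuration. */
class SeedNodeConfig {

    // Hypothetical configuration key, chosen for this sketch only.
    static final String SEED_NODES_KEY = "couchbase.seedNodes";

    /** Returns the configured seed nodes, or just "localhost" when none are configured. */
    static List<String> seedNodes(Map<String, Object> config) {
        Object raw = config.get(SEED_NODES_KEY);
        if (!(raw instanceof List) || ((List<?>) raw).isEmpty()) {
            return Collections.singletonList("localhost"); // assumed default
        }
        List<String> nodes = new ArrayList<>();
        for (Object node : (List<?>) raw) {
            nodes.add(String.valueOf(node));
        }
        return nodes;
    }

    public static void main(String[] args) {
        // With an empty configuration we fall back to the default node.
        System.out.println(seedNodes(Collections.emptyMap()));
    }
}
```

In the Verticle, the resulting list is what you would hand to the cluster factory, so that the bootstrap nodes can change at deployment time without touching the code.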

Asynchronously Obtaining a Bucket

The main entry point to the Couchbase API is the Bucket interface for the sync API, or AsyncBucket for the async API. Establishing the connection to the bucket (“opening” it) is much more heavyweight, so it should be done asynchronously.

Let’s see how we can start our Verticle by first opening the bucket we’ll use throughout the lifetime of the Verticle. We want to keep a reference to it, and use the start(Future<Void> startFuture) method to asynchronously notify Vert.x that the Verticle is ready:

Notice first that we get the name of the bucket (and the associated password if relevant) dynamically, from the Vert.x configuration. We open the bucket asynchronously, establishing the connections and resources internal to the SDK.

The doOnNext method is used to log the opening of the bucket.

Then we subscribe to our Observable stream and describe how we want to “consume” the final data:

  • upon receiving the bucket reference, we store it in a field for later use
  • if there is an error along the way, we fail the startup of the Verticle using the Future#fail method.
  • otherwise we notify Vert.x that the Verticle was successfully started using the Future#complete method.
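The three outcomes listed above follow a general pattern: open something asynchronously, keep the reference, then signal success or failure to whoever is waiting. Here is a rough JDK-only sketch of that pattern, swapping in CompletableFuture for both the RxJava Observable and the Vert.x Future. The class and method names are hypothetical, and a plain String plays the part of the bucket.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** JDK-only model of "open asynchronously, keep the reference, then signal readiness". */
class AsyncStartSketch {

    // Kept for later use, like the bucket reference in the Verticle.
    volatile String resource;

    /**
     * Opens a resource off the calling thread and reports the outcome on startFuture:
     * completed normally on success, completed exceptionally on failure.
     */
    void start(Supplier<String> opener, CompletableFuture<Void> startFuture) {
        CompletableFuture.supplyAsync(opener)
            .whenComplete((opened, error) -> {
                if (error != null) {
                    // Plays the role of Future#fail.
                    startFuture.completeExceptionally(error);
                } else {
                    // Store the reference, then play the role of Future#complete.
                    resource = opened;
                    startFuture.complete(null);
                }
            });
    }

    public static void main(String[] args) {
        AsyncStartSketch sketch = new AsyncStartSketch();
        CompletableFuture<Void> started = new CompletableFuture<>();
        sketch.start(() -> "default", started);
        started.join(); // blocking is only for this demo
        System.out.println("opened: " + sketch.resource);
    }
}
```

The important property is that start returns immediately: the caller learns about the outcome through the future, never by blocking on the opening itself.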

That’s a pretty good start!

Gracefully tearing down the SDK

When the Verticle stops, the resources created by the SDK should be properly disposed of. The Cluster object has a disconnect method that does this, recursively calling close on every Bucket it opened (close can be used to dispose of a single Bucket).

Also, since version 1.0.15, RxJava has a method for shutting down its internal threads: Schedulers.shutdown. It should be invoked only when RxJava won’t be used again in the application, so it might be a better idea to do that upon Vert.x shutdown.

Once again we’ll stop the Verticle asynchronously, using a Future to notify the framework of stop completion:

(We chose to shut down RxJava upon completion of the SDK disconnection.)
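The ordering matters here: shared threads should be released only once the disconnection has finished. As a loose JDK-only analogy, the sketch below runs a disconnect task on an ExecutorService and shuts that pool down afterwards. The pool stands in for RxJava's schedulers and the Supplier for the SDK disconnection. All names are hypothetical, and unlike the Verticle, this sketch also releases the pool when the disconnect fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** JDK-only model of "disconnect first, release shared threads last". */
class GracefulStopSketch {

    /**
     * Runs the disconnect task on the pool, then shuts the pool down once the task is done
     * (on success or failure). The returned future carries the "disconnected cleanly" flag,
     * or the failure.
     */
    static CompletableFuture<Boolean> stop(Supplier<Boolean> disconnect, ExecutorService sharedPool) {
        return CompletableFuture
            .supplyAsync(disconnect, sharedPool)
            .whenComplete((clean, error) -> sharedPool.shutdown());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        boolean clean = stop(() -> true, pool).join();
        System.out.println("disconnected cleanly: " + clean + ", pool shut down: " + pool.isShutdown());
    }
}
```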

Note: You can tune the SDK by passing in a CouchbaseEnvironment upon creation of the Cluster. In that case, it is up to you to also call shutdown on the environment when tearing down the whole SDK (that is, when all Clusters that used the environment, generally just the one, are shut down).

If you didn’t create a specific environment, the SDK will internally create one and shut it down properly, the result of which is seen above in the isDisconnectedCleanly variable.

Seeing it in action

Let’s create a quick main that embeds Vert.x, deploys the Verticle and then stops. Note this is a pretty naive implementation with CountDownLatches, where you would usually rather use the command line or Launcher as a main class.
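The latch technique itself is plain JDK, so it can be shown without Vert.x. In this minimal sketch a Runnable stands in for the asynchronous deploy and undeploy calls, and the main thread waits on a CountDownLatch until the completion callback fires. The names runAndWait and LatchMainSketch, as well as the five-second timeout, are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** JDK-only model of a main thread that waits on asynchronous deploy/undeploy callbacks. */
class LatchMainSketch {

    /** Runs the task asynchronously and blocks until it calls back or the timeout elapses. */
    static boolean runAndWait(Runnable asyncTask, long timeoutSeconds) {
        CountDownLatch latch = new CountDownLatch(1);
        CompletableFuture.runAsync(asyncTask)
            .whenComplete((ignored, error) -> latch.countDown()); // release on success or failure
        try {
            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        boolean deployed = runAndWait(() -> System.out.println("deploying"), 5);
        boolean undeployed = runAndWait(() -> System.out.println("undeploying"), 5);
        System.out.println("deployed=" + deployed + ", undeployed=" + undeployed);
    }
}
```

Counting down in the callback for both outcomes keeps the main thread from hanging when a deployment fails, which is handy when testing the wrong-password case.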

If you execute this, here is what you should see (notice the difference in timestamp format? 2015-12-11 ones are from the SDK while Dec 11, 2015 ones are from Vert.x):

How to verify the error behavior? We could simply change the password to one that is wrong, just to check the logs, which then look like:

So we successfully deployed (and stopped) our first Couchbase Verticle!
High five!

Warning: don’t forget to change the password back to the correct one.

Going further

Let’s try to do a little bit more with this Verticle. How about we try to prepare sample data in Couchbase and serve it in a REST endpoint managed by Vert.x?

Creating sample data in Couchbase on startup

We’ll create two sample documents in Couchbase during Verticle startup: users Alice and Bob.

One can store JSON in Couchbase using two Document implementations:

  • JsonDocument is the default one. It is based on a simple JSON representation provided by the SDK, the JsonObject.
  • RawJsonDocument is useful when you already have JSON marshalling/unmarshalling in your application (or another way of representing JSON like Vert.x’s own JsonObject). In this implementation what you pass in is the raw JSON String representation.

Here are Alice and Bob, created using both alternatives:

and

Now, the start method needs a little bit of adjustment. Instead of saving the reference to the bucket in the subscription, we’ll move that earlier in a doOnNext. After that, we’ll create the documents and make an Observable out of them using Observable.just. This can be forwarded to the SDK for insertion using flatMap:

The use of upsert here guarantees that the documents will be created, whether or not the key already exists in the database.
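To make the difference with a plain insert concrete, here is a small in-memory model of the two write semantics. It is only an analogy built on ConcurrentHashMap, not the Couchbase SDK: in the SDK an insert on an existing key fails with an exception, which this sketch represents as a false return value. The document body is a hypothetical placeholder.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory model of key-value write semantics; not the Couchbase SDK. */
class UpsertSketch {

    final Map<String, String> store = new ConcurrentHashMap<>();

    /** Insert only succeeds when the key is absent. */
    boolean insert(String id, String json) {
        return store.putIfAbsent(id, json) == null;
    }

    /** Upsert always writes: it creates the document or replaces the existing one. */
    void upsert(String id, String json) {
        store.put(id, json);
    }

    public static void main(String[] args) {
        UpsertSketch db = new UpsertSketch();
        String alice = "{\"name\":\"Alice\"}"; // hypothetical content
        db.upsert("user1", alice);
        db.upsert("user1", alice); // second startup: same key, no error
        System.out.println("insert on existing key succeeded: " + db.insert("user1", alice));
    }
}
```

This is why upsert suits sample data written at startup: the Verticle can be restarted any number of times without failing on documents left over from a previous run.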

Serving JSON data from Couchbase

Let’s modify our Verticle so that it doesn’t stop right away, instead spinning up an HTTP server that will try to retrieve a JSON document from the database and send it to a client when the route users/{id} is used:

Here is a quick and dirty way of using Vert.x’s Launcher to start the program (which will not stop the Vert.x core right away). Replace the content of our main method with the following:

Note: In a real application, Launcher would usually be made the main class of the jar and you’d pass in command line arguments directly.

Now let’s spin up an HTTP server on Verticle startup. Chain in the following code in the start method, right after the flatMap(doc -> bucket.upsert(doc)) call:

We need to create the handle method to set up that route:

Let’s test it: run the application and go to this URL: http://localhost:8080/users/user1. You should see Alice’s JSON, served directly from Couchbase!

For another key, you should see the exception in JSON form:

Stopping the Verticle via an HTTP endpoint

Let’s quickly add a route that stops Vert.x, for fun and profit :)

The handler for this route responds with the message “Closing Couchbase and Vertx. Note that running from a vertx Starter, this won’t kill the main thread”, then calls vertx.close().

Opening http://localhost:8080/stop will trigger the whole Vert.x application to stop, tearing down deployed Verticles.

Note: As stated in the message, this doesn’t kill the process when run from the IDE.

Conclusion

In this blog post, we’ve discovered how Vert.x and the Couchbase Java SDK can work together towards building a fully asynchronous application.

Happy asynchronous coding!

Author

Posted by Simon Baslé, Software Engineer, Pivotal

Simon Baslé is a Paris-based Software Engineer working in the Spring team at Pivotal. Previously, he worked in the Couchbase Java SDK team. His interests span software design aspects (OOP, design patterns, software architecture), rich clients, what lies beyond code (continuous integration, (D)VCS, best practices), and reactive programming. He is also an editor for the French version of InfoQ.com.
