¡En este post iremos reactivos hasta el final!
Algunos clientes de Couchbase utilizan Vert.xun marco para escribir aplicaciones totalmente asíncronas, sabiendo que el SDK Java de Couchbase encaja bien en este cuadro, siendo asíncrono desde el principio y exponiendo un RxJava-API asíncrona.
Así que vamos a ver cómo ponerse en marcha rápidamente con un Couchbase Vertical que establece una conexión con Couchbase Grupo y Cubo sirve documentos JSON desde la base de datos, utilizando Java 8.
Esta entrada del blog asume que usted está familiarizado con el fundamentos de Vert.x. He aquí una breve tabla de contenidos:
- Iniciar un nuevo proyecto Vert.x
- Obtención asíncrona de un cubo
- Desmontando con elegancia el SDK
- Verlo en acción
- Ir más lejos
- Conclusión
Iniciar un nuevo proyecto Vert.x
Empecemos por crear un nuevo proyecto basado en Maven: crea una carpeta raíz para tu proyecto e inicializa una estructura de directorios maven (o utiliza tu arquetipo Maven favorito). Por ejemplo, puedes utilizar el siguiente comando "mkdir -p cbvertx/src/main/java/com/couchbase/demo/vertx/“.
Ahora vamos a iniciar el pom.xml en la raíz del proyecto:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
<!--?xml version="1.0" encoding="UTF-8"?--> 4.0.0 com.couchbase.demo cbvertx 1.0-SNAPSHOT io.vertx vertx-core 3.1.0 io.vertx vertx-rx-java 3.1.0 com.couchbase.client java-client 2.2.2 <!-- this is actually already a transitive dependency of the Java SDK--> io.reactivex rxjava 1.0.15 log4j log4j 1.2.17 maven-compiler-plugin 3.3 1.8 1.8 |
Como puede ver, utilizaremos Vert.x versión 3.1.0 y su extensión para bindings en RxJava, SDK Java de Couchbase versión 2.2.2 y RxJava versión 1.0.15…
Esqueleto de la vértebra
Basaremos nuestra CouchbaseVerticle en el ArtículoAbstracto en io.vertx.rxjava.core (de la extensión vertx-rx-java). Vamos a crear su esqueleto en el proyecto:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package com.couchbase.demo.vertx; import java.util.ArrayList; import java.util.List; import com.couchbase.client.java.CouchbaseAsyncCluster; import io.vertx.core.Context; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.rxjava.core.AbstractVerticle; public class CouchbaseVerticle extends AbstractVerticle { private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseVerticle.class); private CouchbaseAsyncCluster cluster; } |
La fase init que escribiremos justo después mostrará cómo usar la configuración Vert.x para determinar en tiempo de ejecución desde qué nodo(s) del cluster Couchbase arrancaremos. Instanciación del CouchbaseCluster sigue siendo lo suficientemente ligero como para que se pueda hacer así durante el init.
Añade el siguiente método init a la directiva CouchbaseVerticle:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Override public void init(Vertx vertx, Context context) { super.init(vertx, context); //getting the configuration JSON JsonObject config = context.config(); //getting the bootstrap node, as a JSON array (default to localhost) JsonArray seedNodeArray = config.getJsonArray("couchbase.seedNodes", new JsonArray().add("localhost")); //convert to a List List seedNodes = new ArrayList<>(seedNodeArray.size()); for (Object seedNode : seedNodeArray) { seedNodes.add((String) seedNode); } //use that to bootstrap the Cluster this.cluster = CouchbaseAsyncCluster.create(seedNodes); } |
Obtención asíncrona de un cubo
El principal punto de entrada a la API de Couchbase es la aplicación Cubo para la API de sincronización, o AsyncBucket para la API asíncrona. Establecer la conexión con el cubo ("abrirlo") es mucho más pesado, por lo que debe hacerse de forma asíncrona.
Veamos cómo podemos iniciar nuestra Verticula abriendo primero el cubo que utilizaremos durante toda la vida de la Verticula. Queremos mantener una referencia a él, y utilizar la función start(Futuro startFuturo) para notificar de forma asíncrona a Vert.x que la Verticula está lista:
|
1 2 3 4 5 6 7 8 9 10 11 |
private volatile AsyncBucket bucket; @Override public void start(Future startFuture) throws Exception { cluster.openBucket(config().getString("couchbase.bucketName", "default"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.info("Bucket opened " + openedBucket.name())) .subscribe( openedBucket -> bucket = openedBucket, startFuture::fail, startFuture::complete); } |
Observa que primero obtenemos el nombre del bucket (y la contraseña asociada si es relevante) dinámicamente, desde la configuración de Vert.x. Abrimos el bucket de forma asíncrona, estableciendo las conexiones y recursos internos del SDK.
En doOnNext se utiliza para registrar la apertura del cubo.
A continuación, nos suscribimos a nuestro Observable y describir cómo queremos "consumir" los datos finales:
- al recibir la referencia del cubo, la almacenamos en un campo para su uso posterior
- si hay un error en el camino, fallamos el arranque del Verticle usando el comando
Futuro1TP5Fallométodo. - de lo contrario, notificamos a Vert.x que la Verticle se ha iniciado correctamente mediante el comando
Futuro#completométodo.
Es un buen comienzo.
Desmontando con elegancia el SDK
Cuando el Verticle se detiene, los recursos creados por el SDK deben eliminarse correctamente. El sitio Grupo tiene un objeto desconectar que hace esto, llamando recursivamente a cerrar en cada Cubo que abrió (close se puede utilizar para deshacerse de un solo Bucket).
También desde 1.0.15 RxJava tiene un método para apagar sus Threads internos: Programadores.apagado. Esto debe ser invocado sólo cuando no habrá un uso posterior de RxJava en la aplicación, por lo que podría ser una mejor idea hacerlo al cerrar Vert.x...
Una vez más, detendremos la Verticle de forma asíncrona, utilizando un comando Futuro para notificar al marco la finalización de la parada:
|
1 2 3 4 5 6 7 8 9 |
@Override public void stop(Future stopFuture) throws Exception { cluster.disconnect() .doOnNext(isDisconnectedCleanly -> LOGGER.info("Disconnected Cluster (cleaned threads: " + isDisconnectedCleanly + ")")) .subscribe( isDisconnectedCleanly -> stopFuture.complete(), stopFuture::fail, Schedulers::shutdown); } |
(hemos optado por apagar RxJava al finalizar la desconexión del SDK)
Nota Puede ajustar el SDK pasando un parámetro
CouchbaseEnvironmenttras la creación delGrupo. En ese caso, le corresponde a usted también llamar acierreen el entorno cuando se cierra todo el SDK (es decir, cuando se cierran todos los Clusters en los que se utilizó el entorno, generalmente sólo uno).Si no ha creado un entorno específico, el SDK creará internamente uno y lo cerrará correctamente, cuyo resultado se ve arriba en la imagen
isDisconnectedCleanlyvariable.
Verlo en acción
Vamos a crear un principal que incrusta Vert.x, despliega el Verticle y luego se detiene. Tenga en cuenta que esta es una implementación bastante ingenua con CountDownLatches, donde normalmente preferiría utilizar la línea de comandos o Lanzador como clase principal.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public static void main(String[] args) throws InterruptedException { Vertx vertx = Vertx.vertx(); final CountDownLatch startLatch = new CountDownLatch(1); vertx.deployVerticle(new CouchbaseVerticle(), event -> { if (event.succeeded()) LOGGER.info("Verticle Deployed - " + event.result()); else LOGGER.error("Verticle deployment error", event.cause()); startLatch.countDown(); }); startLatch.await(); final CountDownLatch stopLatch = new CountDownLatch(1); vertx.close(event -> { if (event.succeeded()) LOGGER.info("Vert.x Stopped - " + event.result()); else LOGGER.error("Vert.x stopping error", event.cause()); stopLatch.countDown(); }); stopLatch.await(); } |
Si ejecutas esto, esto es lo que deberías ver (¿notas la diferencia en el formato de la marca de tiempo? 2015-12-11 son del SDK mientras que 11 de diciembre de 2015 son de Vert.x):
|
1 2 3 4 5 6 7 8 9 10 11 12 |
2015-12-11 16:21:20 INFO Node:135 - Connected to Node localhost 2015-12-11 16:21:20 INFO ConfigurationProvider:263 - Opened bucket default Dec 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Bucket opened default Dec 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Verticle Deployed - caf06dd3-c8d1-4b89-8de0-58f09467b805 2015-12-11 16:21:20 INFO ConfigurationProvider:284 - Closed bucket default 2015-12-11 16:21:20 INFO Node:145 - Disconnected from Node localhost Dec 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Disconnected Cluster (cleaned threads: true) Dec 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Stopped - null |
¿Cómo verificar el comportamiento del error? Podríamos simplemente cambiar la contraseña por una que sea incorrecta, sólo para comprobar los registros, que entonces se ven como:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2015-12-11 16:25:45 WARN Endpoint:283 - [null][KeyValueEndpoint]: Authentication Failure. 2015-12-11 16:25:45 WARN ResponseStatusConverter:129 - Unknown ResponseStatus with Protocol HTTP: 401 2015-12-11 16:25:45 WARN ResponseStatusConverter:129 - Unknown ResponseStatus with Protocol HTTP: 401 Dec 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle SEVERE: Verticle deployment error com.couchbase.client.java.error.InvalidPasswordException: Passwords for bucket "default" do not match. at com.couchbase.client.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:210) at com.couchbase.client.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:200) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:99) at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48) at rx.observers.Subscribers$5.onError(Subscribers.java:229) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:197) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170) at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Dec 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Stopped - null |
Así que hemos desplegado (y detenido) con éxito nuestro primer Couchbase Verticle.
¡Choca esos cinco!
/! no olvides volver a cambiar la contraseña por la correcta
Ir más lejos
Intentemos hacer un poco más con este Verticle. ¿Qué tal si intentamos preparar datos de ejemplo en Couchbase y servirlos en un endpoint REST gestionado por Vert.x?
Creación de datos de muestra en Couchbase al inicio
Crearemos dos documentos de ejemplo en Couchbase durante el arranque del Verticle, los usuarios Alice y Bob.
Uno puede almacenar JSON en Couchbase usando dos Documento implementaciones:
JsonDocumentes la predeterminada. Se basa en una representación JSON sencilla proporcionada por el SDK, la representaciónJsonObject.Documento RawJsones útil cuando ya tienes JSON marshalling/unmarshalling en tu aplicación (u otra forma de representar JSON como el propio Vert.x'sJsonObject). En esta implementación lo que se pasa es la representación JSON String sin procesar.
Aquí están Alice y Bob, creados utilizando ambas alternativas:
|
1 2 |
JsonDocument.create("user1", com.couchbase.client.java.document.json.JsonObject.create() .put("name", "Alice").put("age", 26)) |
y
|
1 |
RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) |
Ahora, el método de inicio necesita un pequeño ajuste. En lugar de guardar la referencia a la cubeta en la suscripción, vamos a mover que antes en un doOnNext. Después, crearemos los documentos y los convertiremos en observables utilizando Observable.just. Esto se puede enviar al SDK para su inserción utilizando flatMap:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Override public void start(Future startFuture) throws Exception { cluster.openBucket(config().getString("couchbase.bucketName", "default"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.info("Bucket opened " + openedBucket.name())) .doOnNext(openedBucket -> bucket = openedBucket) .flatMap(nowIgnoreBucket -> Observable.just( JsonDocument.create("user1", com.couchbase.client.java.document.json.JsonObject.create() .put("name", "Alice").put("age", 26)), RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) )) .flatMap(doc -> bucket.upsert(doc)) .subscribe(Actions.empty(), startFuture::fail, startFuture::complete); } |
El uso de upsert garantiza que los documentos se crearán, tanto si la clave ya existe en la base de datos como si no.
Servir datos JSON desde Couchbase
Modifiquemos nuestra verticula para que no se detenga de inmediato, en su lugar giraremos un Servidor HTTP que intentará recuperar un documento json de la base de datos y enviarlo a un cliente cuando la ruta usuario/{id} se utiliza:
He aquí una forma rápida y sucia de utilizar Vert.x's Lanzador para iniciar el programa (que no detendrá el núcleo Vert.x de inmediato). Sustituye el contenido de nuestro principal con lo siguiente:
|
1 2 |
String[] vertxArgs = new String[] { "run", "com.couchbase.demo.vertx.CouchbaseVerticle" }; Launcher.main(vertxArgs); |
Nota: En una aplicación real,
Lanzadornormalmente se convertiría en la clase principal del tarro y se pasarían los argumentos de la línea de comandos directamente.
Ahora hagamos girar un Servidor HTTP en el arranque de Verticle. Encadena el siguiente código en el directorio iniciar justo después del método flatMap(doc -> bucket.upsert(doc)) llamar:
|
1 2 3 4 |
.last() .flatMap(ignore -> vertx.createHttpServer() .requestHandler(this::handle) .listenObservable(8080)) |
Tenemos que crear el asa para configurar esa ruta:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private void handle(HttpServerRequest r) { String[] path = r.path().split("/"); if (path.length == 3 && "users".equals(path[1])) { bucket.get(path[2], RawJsonDocument.class) .switchIfEmpty(Observable.error(new DocumentDoesNotExistException(path[2]))) .subscribe(doc -> r.response() .putHeader("content-type", "application/json") .end(doc.content()), error -> { r.response() .putHeader("content-type", "application/json") .setStatusCode(500).setStatusMessage(error.toString()) .end("{"error": "" + error.toString() + ""}"); }); } } |
Vamos a probarlo: ejecuta la aplicación y ve a esta url: https://localhost:8080/users/user1. Deberías ver el JSON de Alice, ¡servido directamente desde Couchbase!
|
1 2 3 4 |
{ "name": "Alice", "age": 26 } |
Para otra llavedebería ver la excepción en formato JSON:
|
1 2 3 |
{ "error": "com.couchbase.client.java.error.DocumentDoesNotExistException: user3" } |
Detención del Verticle a través de un punto final HTTP
Añadamos rápidamente una ruta que pare Vert.x, por diversión y beneficio :)
|
1 2 3 4 |
//...replacing from the last line in `handle` } else if (r.path().equals("/stop")) { r.response() .end(" |
Cierre de Couchbase y Vertx
Tenga en cuenta que si se ejecuta desde un Inicio vertx, esto no matará el hilo principal
"); vertx.close(); }
Apertura https://localhost:8080/stop hará que toda la aplicación Vert.x se detenga, destruyendo los Verticles desplegados.
Nota: Como se indica en el mensaje, esto no mata el proceso cuando se ejecuta desde el IDE.
Conclusión
En esta entrada de blog, hemos descubierto cómo Vert.x y el SDK Java de Couchbase pueden trabajar juntos para construir una aplicación totalmente asíncrona.
¡Feliz codificación asíncrona!
¿tiene un enlace github repo donde la aplicación anterior está disponible