¡En este post iremos reactivos hasta el final!
Algunos clientes de Couchbase utilizan Vert.x
un marco para escribir aplicaciones totalmente asíncronas, sabiendo que el SDK Java de Couchbase
encaja bien en este cuadro, siendo asíncrono desde el principio y exponiendo un RxJava
-API asíncrona.
Así que vamos a ver cómo ponerse en marcha rápidamente con un Couchbase Vertical
que establece una conexión con Couchbase Grupo
y Cubo
sirve documentos JSON desde la base de datos, utilizando Java 8.
Esta entrada del blog asume que usted está familiarizado con el fundamentos de Vert.x. He aquí una breve tabla de contenidos:
- Iniciar un nuevo proyecto Vert.x
- Obtención asíncrona de un cubo
- Desmontando con elegancia el SDK
- Verlo en acción
- Ir más lejos
- Conclusión
Iniciar un nuevo proyecto Vert.x
Empecemos por crear un nuevo proyecto basado en Maven: crea una carpeta raíz para tu proyecto e inicializa una estructura de directorios maven (o utiliza tu arquetipo Maven favorito). Por ejemplo, puedes utilizar el siguiente comando "mkdir -p cbvertx/src/main/java/com/couchbase/demo/vertx/
“.
Ahora vamos a iniciar el pom.xml
en la raíz del proyecto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
<!--?xml versión="1.0" codificación="UTF-8"?--> 4.0.0 com.couchbase.demo cbvertx 1.0-SNAPSHOT io.vertx vertx-núcleo 3.1.0 io.vertx vertx-rx-java 3.1.0 com.couchbase.cliente java-cliente 2.2.2 <!-- este es en realidad ya a transitivo dependencia de el Java SDK--> io.reactivex rxjava 1.0.15 log4j log4j 1.2.17 maven-compilador-plugin 3.3 1.8 1.8 |
Como puede ver, utilizaremos Vert.x
versión 3.1.0
y su extensión para bindings en RxJava, SDK Java de Couchbase
versión 2.2.2
y RxJava
versión 1.0.15
…
Esqueleto de la vértebra
Basaremos nuestra CouchbaseVerticle
en el ArtículoAbstracto
en io.vertx.rxjava.core
(de la extensión vertx-rx-java). Vamos a crear su esqueleto en el proyecto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
paquete com.couchbase.demo.vertx; importar java.util.ArrayList; importar java.util.Lista; importar com.couchbase.cliente.java.CouchbaseAsyncCluster; importar io.vertx.núcleo.Contexto; importar io.vertx.núcleo.Vertx; importar io.vertx.núcleo.json.JsonArray; importar io.vertx.núcleo.json.JsonObject; importar io.vertx.núcleo.registro.Registrador; importar io.vertx.núcleo.registro.LoggerFactory; importar io.vertx.rxjava.núcleo.ArtículoAbstracto; público clase CouchbaseVerticle extiende ArtículoAbstracto { privado estático final Registrador LOGGER = LoggerFactory.getLogger(CouchbaseVerticle.clase); privado CouchbaseAsyncCluster grupo; } |
La fase init que escribiremos justo después mostrará cómo usar la configuración Vert.x para determinar en tiempo de ejecución desde qué nodo(s) del cluster Couchbase arrancaremos. Instanciación del CouchbaseCluster
sigue siendo lo suficientemente ligero como para que se pueda hacer así durante el init.
Añade el siguiente método init a la directiva CouchbaseVerticle
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Anular público void init(Vertx vertx, Contexto contexto) { super.init(vertx, contexto); //obtener la configuración JSON JsonObject config = contexto.config(); //obtener el nodo bootstrap, como un array JSON (por defecto localhost) JsonArray seedNodeArray = config.getJsonArray("couchbase.seedNodes", nuevo JsonArray().añada("localhost")); //convertir en lista Lista seedNodes = nuevo ArrayList<>(seedNodeArray.talla()); para (Objeto seedNode : seedNodeArray) { seedNodes.añada((Cadena) seedNode); } //utilízalo para arrancar el Cluster este.grupo = CouchbaseAsyncCluster.crear(seedNodes); } |
Obtención asíncrona de un cubo
El principal punto de entrada a la API de Couchbase es la aplicación Cubo
para la API de sincronización, o AsyncBucket
para la API asíncrona. Establecer la conexión con el cubo ("abrirlo") es mucho más pesado, por lo que debe hacerse de forma asíncrona.
Veamos cómo podemos iniciar nuestra Verticula abriendo primero el cubo que utilizaremos durante toda la vida de la Verticula. Queremos mantener una referencia a él, y utilizar la función start(Futuro startFuturo)
para notificar de forma asíncrona a Vert.x que la Verticula está lista:
1 2 3 4 5 6 7 8 9 10 11 |
privado volátil AsyncBucket cubo; @Anular público void iniciar(Futuro startFuture) lanza Excepción { grupo.openBucket(config().getString("couchbase.bucketName", "por defecto"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openBucket -> LOGGER.información("Cubo abierto" + openBucket.nombre())) .suscríbase a( openBucket -> cubo = openBucket, startFuture::falla, startFuture::completa); } |
Observa que primero obtenemos el nombre del bucket (y la contraseña asociada si es relevante) dinámicamente, desde la configuración de Vert.x. Abrimos el bucket de forma asíncrona, estableciendo las conexiones y recursos internos del SDK.
En doOnNext
se utiliza para registrar la apertura del cubo.
A continuación, nos suscribimos a nuestro Observable
y describir cómo queremos "consumir" los datos finales:
- al recibir la referencia del cubo, la almacenamos en un campo para su uso posterior
- si hay un error en el camino, fallamos el arranque del Verticle usando el comando
Futuro1TP5Fallo
método. - de lo contrario, notificamos a Vert.x que la Verticle se ha iniciado correctamente mediante el comando
Futuro#completo
método.
Es un buen comienzo.
Desmontando con elegancia el SDK
Cuando el Verticle se detiene, los recursos creados por el SDK deben eliminarse correctamente. El sitio Grupo
tiene un objeto desconectar
que hace esto, llamando recursivamente a cerrar
en cada Cubo
que abrió (close se puede utilizar para deshacerse de un solo Bucket).
También desde 1.0.15
RxJava tiene un método para apagar sus Threads internos: Programadores.apagado
. Esto debe ser invocado sólo cuando no habrá un uso posterior de RxJava en la aplicación, por lo que podría ser una mejor idea hacerlo al cerrar Vert.x...
Una vez más, detendremos la Verticle de forma asíncrona, utilizando un comando Futuro
para notificar al marco la finalización de la parada:
1 2 3 4 5 6 7 8 9 |
@Anular público void stop(Futuro stopFuture) lanza Excepción { grupo.desconectar() .doOnNext(isDisconnectedCleanly -> LOGGER.información("Cluster desconectado (hilos limpios: " + isDisconnectedCleanly + ")")) .suscríbase a( isDisconnectedCleanly -> stopFuture.completa(), stopFuture::falla, Programadores::cierre); } |
(hemos optado por apagar RxJava al finalizar la desconexión del SDK)
Nota Puede ajustar el SDK pasando un parámetro
CouchbaseEnvironment
tras la creación delGrupo
. En ese caso, le corresponde a usted también llamar acierre
en el entorno cuando se cierra todo el SDK (es decir, cuando se cierran todos los Clusters en los que se utilizó el entorno, generalmente sólo uno).Si no ha creado un entorno específico, el SDK creará internamente uno y lo cerrará correctamente, cuyo resultado se ve arriba en la imagen
isDisconnectedCleanly
variable.
Verlo en acción
Vamos a crear un principal
que incrusta Vert.x, despliega el Verticle y luego se detiene. Tenga en cuenta que esta es una implementación bastante ingenua con CountDownLatches, donde normalmente preferiría utilizar la línea de comandos o Lanzador
como clase principal.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
público estático void principal(Cadena[] args) lanza InterruptedException { Vertx vertx = Vertx.vertx(); final CountDownLatch startLatch = nuevo CountDownLatch(1); vertx.deployVerticle(nuevo CouchbaseVerticle(), evento -> { si (evento.sucedió a()) LOGGER.información("Verticle Deployed - " + evento.resultado()); si no LOGGER.error("Error de despliegue vertical", evento.causa()); startLatch.countDown(); }); startLatch.await(); final CountDownLatch stopLatch = nuevo CountDownLatch(1); vertx.cerrar(evento -> { si (evento.sucedió a()) LOGGER.información("Vert.x Detenido - " + evento.resultado()); si no LOGGER.error("Error de parada de Vert.x", evento.causa()); stopLatch.countDown(); }); stopLatch.await(); } |
Si ejecutas esto, esto es lo que deberías ver (¿notas la diferencia en el formato de la marca de tiempo? 2015-12-11
son del SDK mientras que 11 de diciembre de 2015
son de Vert.x):
1 2 3 4 5 6 7 8 9 10 11 12 |
2015-12-11 16:21:20 INFO Nodo:135 - Conectado a Nodo localhost 2015-12-11 16:21:20 INFO ConfigurationProvider:263 - Abierto cubo por defecto Diciembre 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Cubo abierto por defecto Diciembre 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vertical Desplegado - caf06dd3-c8d1-4b89-8de0-58f09467b805 2015-12-11 16:21:20 INFO ConfigurationProvider:284 - Cerrado cubo por defecto 2015-12-11 16:21:20 INFO Nodo:145 - Desconectado de Nodo localhost Diciembre 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Desconectado Grupo (limpiado hilos: verdadero) Diciembre 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Detenido - null |
¿Cómo verificar el comportamiento del error? Podríamos simplemente cambiar la contraseña por una que sea incorrecta, sólo para comprobar los registros, que entonces se ven como:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2015-12-11 16:25:45 AVISO Punto final:283 - [null][KeyValueEndpoint]: Autenticación Fallo. 2015-12-11 16:25:45 AVISO Convertidor de ResponseStatus:129 - Desconocido ResponseStatus con Protocolo HTTP: 401 2015-12-11 16:25:45 AVISO Convertidor de ResponseStatus:129 - Desconocido ResponseStatus con Protocolo HTTP: 401 Diciembre 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle SEVERE: Vertical despliegue error com.couchbase.cliente.java.error.InvalidPasswordException: Contraseñas para cubo "por defecto" do no match. en com.couchbase.cliente.java.CouchbaseAsyncCluster$1 llamada(CouchbaseAsyncCluster.java:210) en com.couchbase.cliente.java.CouchbaseAsyncCluster$1 llamada(CouchbaseAsyncCluster.java:200) en rx.interno.operadores.OperadorEnErrorReanudarSiguienteViaFunción$1.onError(OperadorEnErrorReanudarSiguienteViaFunción.java:99) en rx.interno.operadores.OperadorMapa$1.onError(OperadorMapa.java:48) en rx.observadores.Abonados$5.onError(Abonados.java:229) en rx.interno.operadores.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:197) en rx.interno.operadores.OperatorObserveOn$ObserveOnSubscriber$2.llamada(OperatorObserveOn.java:170) en rx.interno.programadores.Acción programada.ejecute(Acción programada.java:55) en java.util.concurrente.Ejecutores$Adaptador ejecutable.llame a(Ejecutores.java:511) en java.util.concurrente.FutureTask.ejecute(FutureTask.java:266) en java.util.concurrente.ScheduledThreadPoolExecutor$ScheduledFutureTask.acceda a$201(ScheduledThreadPoolExecutor.java:180) en java.util.concurrente.ScheduledThreadPoolExecutor$ScheduledFutureTask.ejecute(ScheduledThreadPoolExecutor.java:293) en java.util.concurrente.Ejecutor de ThreadPool.runWorker(Ejecutor de ThreadPool.java:1142) en java.util.concurrente.Ejecutor de ThreadPool$Trabajador.ejecute(Ejecutor de ThreadPool.java:617) en java.lang.Hilo.ejecute(Hilo.java:745) Diciembre 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Detenido - null |
Así que hemos desplegado (y detenido) con éxito nuestro primer Couchbase Verticle.
¡Choca esos cinco!
/! no olvides volver a cambiar la contraseña por la correcta
Ir más lejos
Intentemos hacer un poco más con este Verticle. ¿Qué tal si intentamos preparar datos de ejemplo en Couchbase y servirlos en un endpoint REST gestionado por Vert.x?
Creación de datos de muestra en Couchbase al inicio
Crearemos dos documentos de ejemplo en Couchbase durante el arranque del Verticle, los usuarios Alice y Bob.
Uno puede almacenar JSON en Couchbase usando dos Documento
implementaciones:
JsonDocument
es la predeterminada. Se basa en una representación JSON sencilla proporcionada por el SDK, la representaciónJsonObject
.Documento RawJson
es útil cuando ya tienes JSON marshalling/unmarshalling en tu aplicación (u otra forma de representar JSON como el propio Vert.x'sJsonObject
). En esta implementación lo que se pasa es la representación JSON String sin procesar.
Aquí están Alice y Bob, creados utilizando ambas alternativas:
1 2 |
JsonDocument.crear("usuario1", com.couchbase.cliente.java.documento.json.JsonObject.crear() .poner("nombre", "Alice").poner("edad", 26)) |
y
1 |
Documento RawJson.crear("usuario2", nuevo JsonObject().poner("nombre", "Bob").poner("edad", 31).codificar()) |
Ahora, el método de inicio necesita un pequeño ajuste. En lugar de guardar la referencia a la cubeta en la suscripción, vamos a mover que antes en un doOnNext
. Después, crearemos los documentos y los convertiremos en observables utilizando Observable.just
. Esto se puede enviar al SDK para su inserción utilizando flatMap
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Anular público void iniciar(Futuro startFuture) lanza Excepción { grupo.openBucket(config().getString("couchbase.bucketName", "por defecto"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openBucket -> LOGGER.información("Cubo abierto" + openBucket.nombre())) .doOnNext(openBucket -> cubo = openBucket) .flatMap(nowIgnoreBucket -> Observable.sólo( JsonDocument.crear("usuario1", com.couchbase.cliente.java.documento.json.JsonObject.crear() .poner("nombre", "Alice").poner("edad", 26)), Documento RawJson.crear("usuario2", nuevo JsonObject().poner("nombre", "Bob").poner("edad", 31).codificar()) )) .flatMap(doc -> cubo.upsert(doc)) .suscríbase a(Acciones.vacío(), startFuture::falla, startFuture::completa); } |
El uso de upsert
garantiza que los documentos se crearán, tanto si la clave ya existe en la base de datos como si no.
Servir datos JSON desde Couchbase
Modifiquemos nuestra verticula para que no se detenga de inmediato, en su lugar giraremos un Servidor HTTP que intentará recuperar un documento json de la base de datos y enviarlo a un cliente cuando la ruta usuario/{id}
se utiliza:
He aquí una forma rápida y sucia de utilizar Vert.x's Lanzador
para iniciar el programa (que no detendrá el núcleo Vert.x de inmediato). Sustituye el contenido de nuestro principal
con lo siguiente:
1 2 |
Cadena[] vertxArgs = nuevo Cadena[] { "correr", "com.couchbase.demo.vertx.CouchbaseVerticle" }; Lanzador.principal(vertxArgs); |
Nota: En una aplicación real,
Lanzador
normalmente se convertiría en la clase principal del tarro y se pasarían los argumentos de la línea de comandos directamente.
Ahora hagamos girar un Servidor HTTP en el arranque de Verticle. Encadena el siguiente código en el directorio iniciar
justo después del método flatMap(doc -> bucket.upsert(doc))
llamar:
1 2 3 4 |
.último() .flatMap(ignore -> vertx.createHttpServer() .requestHandler(este::asa) .listenObservable(8080)) |
Tenemos que crear el asa
para configurar esa ruta:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
privado void asa(HttpServerRequest r) { Cadena[] ruta = r.ruta().dividir("/"); si (ruta.longitud == 3 && "usuarios".es igual a(ruta[1])) { cubo.consiga(ruta[2], Documento RawJson.clase) .switchIfEmpty(Observable.error(nuevo DocumentDoesNotExistException(ruta[2]))) .suscríbase a(doc -> r.respuesta() .putHeader("tipo de contenido", "application/json") .fin(doc.contenido()), error -> { r.respuesta() .putHeader("tipo de contenido", "application/json") .setStatusCode(500).setMensajeDeEstado(error.toString()) .fin("{"error": "" + error.toString() + ""}"); }); } } |
Vamos a probarlo: ejecuta la aplicación y ve a esta url: http://localhost:8080/users/user1. Deberías ver el JSON de Alice, ¡servido directamente desde Couchbase!
1 2 3 4 |
{ "nombre": "Alice", "edad": 26 } |
Para otra llavedebería ver la excepción en formato JSON:
1 2 3 |
{ "error": "com.couchbase.client.java.error.DocumentDoesNotExistException: user3" } |
Detención del Verticle a través de un punto final HTTP
Añadamos rápidamente una ruta que pare Vert.x, por diversión y beneficio :)
1 2 3 4 |
//...reemplazando desde la última línea en `handle`. } si no si (r.ruta().es igual a("/stop")) { r.respuesta() .fin(" |
Cierre de Couchbase y Vertx
Tenga en cuenta que si se ejecuta desde un Inicio vertx, esto no matará el hilo principal
"); vertx.close(); }
Apertura http://localhost:8080/stop hará que toda la aplicación Vert.x se detenga, destruyendo los Verticles desplegados.
Nota: Como se indica en el mensaje, esto no mata el proceso cuando se ejecuta desde el IDE.
Conclusión
En esta entrada de blog, hemos descubierto cómo Vert.x
y el SDK Java de Couchbase
pueden trabajar juntos para construir una aplicación totalmente asíncrona.
¡Feliz codificación asíncrona!
¿tiene un enlace github repo donde la aplicación anterior está disponible