En el anterior entrada del blog hablamos de cómo Couchbase puede integrarse perfectamente en un Arquitectura basada en eventos. Usando el Servicio de Eventos de Couchbase, las mutaciones de documentos pueden ser publicadas a un Solace PubSub+ desde donde los datos se ponen a disposición de los microservicios suscriptores casi en tiempo real.
Este post se centra en cómo los microservicios como Topic subscribers pueden aprovechar Couchbase como un almacén de datos escalable y resistente.
Visión general de la arquitectura y aplicación de ejemplo
En este solicitud de muestranos suscribimos a un tema Solace PubSub+, procesamos los datos y actualizamos el documento en Couchbase.
En nuestra aplicación de ejemplo, almacenamos los detalles del hotel en Couchbase. Las actualizaciones de la configuración del hotel se publican en un tema Solace PubSub+. Construimos una aplicación que se suscribe al Topic, recupera los datos actualizados y actualiza la configuración del hotel existente en Couchbase.
Utilizamos el viaje-muestra que contiene datos de muestra como vuelos, rutas y otros. En nuestro ejemplo, nos centramos en el hotel documentos almacenados en el inventario ámbito en el hotel colección. Todos los documentos de hotel constan de un documento JSON con varios atributos.
En este tutorial sólo actualizaremos un pequeño subconjunto de atributos, como por ejemplo mascotas_permitidas y internet_libre.

La aplicación de ejemplo se suscribe a un Solace Topic y actualiza los datos del hotel en Couchbase. Otras aplicaciones pueden proporcionar los cambios de configuración del hotel. Utilizamos un servicio simple que publica actualizaciones en el Topic.
Leyenda: La aplicación de ejemplo se suscribe a un Solace Topic y actualiza los datos del hotel en Couchbase. Otras aplicaciones pueden proporcionar los cambios de configuración del hotel. Usamos un servicio simple que publica actualizaciones al Topic.
Requisitos previos
Revise los requisitos previos para esta solicitud de muestra:
Couchbase Capella
Crear un Couchbase Capella y siga las instrucciones para aprovisionar su clúster de prueba. Esto sólo le llevará unos minutos y le proporcionará un clúster de Couchbase completo, incluyendo los módulos Datos y el conjunto de datos de muestra, muestra-viaje, que utilizaremos a lo largo de esta entrada del blog.
Puede utilizar este tutorial cuando no utilice nuestra oferta gestionada Couchbase Capella. El viaje-muestra bucket forma parte del producto del servidor Couchbase y puede importarse utilizando la consola de Couchbase.
Nube de intermediarios de eventos Solace PubSub
Inscríbete gratis Prueba en la nube de Solace y crear un Servicio/VPN. Utilizaremos la VPN para conectarnos a un tema de Solace.
Revisión del código - Suscriptor y editor de Solace
Para suscribirse a los mensajes de Solace Topic utilizamos la API Java de Solace. Este ejemplo de implementación se deriva de los ejemplos de la documentación de Solace. Para más detalles sobre la implementación, consulte la documentación de Solace.
|
1 2 3 4 5 6 7 8 9 |
final JCSMPProperties properties = new JCSMPProperties(); properties.setProperty(JCSMPProperties.HOST, "tcps://yourhost.messaging.solace.cloud:55443"); properties.setProperty(JCSMPProperties.USERNAME, "solace-cloud-client"); properties.setProperty(JCSMPProperties.PASSWORD, "password"); properties.setProperty(JCSMPProperties.VPN_NAME, "couchbasedemo"); JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties); session.connect(); final Topic topic = JCSMPFactory.onlyInstance().createTopic("tutorial/topic"); |
Configuramos el JCSMPProperties con la información de acceso pertinente disponible en la interfaz web de Solace. A continuación, creamos un JCSMPSession y un Tema.
Para suscribirnos a los mensajes del Topic implementamos un XMLMessageConsumer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// ### Message Receiver - Subscribe to Topic ### final XMLMessageConsumer cons = session.getMessageConsumer(new XMLMessageListener() { @Override public void onReceive(BytesXMLMessage msg) { if (msg instanceof TextMessage) { System.out.printf("SUBSCRIBE: Message received: '%s'%n", ((TextMessage) msg).getText()); UpdateHotelData.getInstance().upsertHotelData(((TextMessage) msg).getText()); } else { System.out.println("Message received."); } } @Override public void onException(JCSMPException e) { System.out.printf("Consumer received exception: %s%n", e); } }); session.addSubscription(topic); System.out.println("SUBSCRIBE: Connected. Awaiting message..."); cons.start(); |
En el onReceive tomamos el mensaje, lo convertimos en una cadena y llamamos al método upsertHotelData método de ActualizarDatosHotel que contiene la implementación de Couchbase de la que hablaremos más adelante.
Añadimos la suscripción al tema a la sesión e iniciamos el consumidor de mensajes.
Para poder probar la implementación, también creamos un publicador de mensajes. Para ello, primero implementamos un XMLMessageProducer.
|
1 2 3 4 5 6 7 8 9 10 11 12 |
// ### Message Producer - Send message to Topic XMLMessageProducer prod = session.getMessageProducer(new JCSMPStreamingPublishCorrelatingEventHandler() { @Override public void responseReceivedEx(Object key) { System.out.println("Producer received response for msg: " + key.toString()); } @Override public void handleErrorEx(Object key, JCSMPException cause, long timestamp) { System.out.printf("Producer received error for msg: %s@%s - %s%n", key.toString(), timestamp, cause); } }); |
Con el productor de mensajes en su lugar, ahora podemos enviar mensajes como se ve a continuación.
|
1 2 3 4 5 6 7 8 9 |
// Send messages. Here we loop through and send the same message multiple times. for (int msgsSent = 0; msgsSent < 2; ++msgsSent) { TextMessage msg = JCSMPFactory.onlyInstance().createMessage(TextMessage.class); msg.setText(data); System.out.printf("PUBLISH: Sending message '%s' to topic '%s'...%n", data, topic.getName()); prod.send(msg, topic); //Gives us some time to follow the console logs Thread.sleep(3000); } |
Inserción de los datos en Couchbase
Con la lógica para suscribirse y publicar mensajes en su lugar, ahora vamos a centrarnos en cómo actualizar los documentos del hotel en Couchbase.
Para este ejemplo, supondremos que el mensaje recuperado del tema tiene el formato JSON que se indica a continuación. Incluyendo un hotel_id que utilizaremos como id de documento de los documentos del hotel almacenados en Couchbase, así como una serie de atributos que podemos modificar:
|
1 2 3 4 5 6 7 |
{ "hotel_id": "hotel_10025", "pets_ok": true, "free_breakfast": true, "free_internet": true, "free_parking": true } |
Para establecer una conexión segura desde nuestra aplicación a Couchbase Capella, necesitamos:
-
- Cree un usuario de base de datos y concédale acceso a los datos del hotel.
- Lista blanca de la dirección IP del host que ejecuta la aplicación
- Capturar la URL de conexión segura
Puedes gestionar todos estos detalles en el plano de gestión de Capella. Conectarse a Capella en el navegador:
- Abra el Juicio - Cluster y vaya a la sección Conectar tab
- Desplácese hasta el Acceso a bases de datos y haga clic en Gestionar credenciales

- Haga clic en Crear credenciales de base de datos y proporcione un nombre de usuario y una contraseña para la base de datos. Configure el nivel de acceso a la base de datos inventario ámbito en el viaje-muestra cubo y conceder acceso de Lectura/Escritura.

- Lista blanca de la dirección IP de la aplicación haciendo clic en Gestionar IP permitidas y luego Añadir IP permitida. Cuando ejecute la aplicación desde su máquina local, simplemente haga clic en Añadir mi IP y tu dirección IP externa será descubierta y añadida automáticamente.

- En el Red de área extensa copie la URL de conexión.
Revisión del código - conexión a Couchbase y actualización de documentos
Nos conectamos a Couchbase Capella como primer paso creando un objeto cluster. Proporcionamos la URL del endpoint capturada en el paso anterior, así como las credenciales de usuario de la base de datos.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// Update this to your cluster String endpoint = "cb.8c-f5di7pqrfjy8.cloud.couchbase.com"; String username = "dbuser"; String password = "Passw0rd!"; // User Input ends here. String bucketName = "travel-sample"; // Cluster Environment configuration ClusterEnvironment env = ClusterEnvironment.builder() .securityConfig( SecurityConfig.enableTls(true).trustManagerFactory(InsecureTrustManagerFactory.INSTANCE)) .ioConfig(IoConfig.enableDnsSrv(true)).build(); // Initialize the Connection and provide database user credentials Cluster cluster = Cluster.connect(endpoint, ClusterOptions.clusterOptions(username, password).environment(env)); // Create bucket object bucket = cluster.bucket(bucketName); bucket.waitUntilReady(Duration.parse("PT10S")); // create collection object for the 'hotel' collection located in the 'inventory' scope collection = bucket.scope("inventory").collection("hotel"); |
Una vez creado el objeto bucket, podemos recuperar el objeto hotel y utilizarlo más tarde para insertar los cambios en el documento. El sitio hotel está dentro del ámbito inventario alcance de la viaje-muestra cubo.
Tenga en cuenta que utilizo el patrón singleton para utilizar el mismo objeto de colección en todas las solicitudes posteriores. Ver el ejemplo de implementación para más detalles en mi proyecto GitHub.
Veamos ahora cómo podemos actualizar los documentos del hotel con los datos recibidos del Tema.
El mensaje recibido es un String por lo que como primer paso necesitamos convertirlo en un objeto JSON.
|
1 2 3 4 |
public void upsertHotelData(String content) { … hotelData = JsonObject.fromJson(content); … |
Dado que los datos de entrada del Tema sólo contienen un subconjunto de todos los atributos del documento del hotel, debemos estudiar cómo actualizar los campos del documento de la forma más eficaz.
Hay dos opciones posibles:
-
- Utilizamos el ID del documento para recuperar el documento completo de Couchbase, actualizar los atributos y volver a escribir el documento.
- Utilizamos la API de sub-documento en Couchbase para actualizar sólo los campos dedicados dentro del documento en lugar de reemplazar todo el documento.
La primera opción se utiliza habitualmente para actualizar o insertar documentos. Sin embargo, si trabajamos con documentos más grandes y sólo queremos actualizar una pequeña parte de ellos, no es necesario enviar todo el documento a través de la red. En nuestra implementación a continuación, utilizaremos la API de subdocumento para resolver y actualizar campos en el documento del hotel.
|
1 2 3 4 5 6 7 8 9 |
//Instead of resolving the entire document we use the sub-document API //to only resolve the relevant parts of the hotel document prior to the update. //Here we can define the path to the relevant attributes of the document. //In our scenario 'pets_ok' and 'free_internet' are at the document root. LookupInResult result = collection.lookupIn(hotelData.getString("hotel_id"), Arrays.asList(get("pets_ok"), get("free_internet"))); //Display the current values prior to the document update System.out.println("Values prior to update: 'pets_ok': " + result.contentAs(0, String.class) + ", 'free_internet': " + result.contentAs(1, String.class)); |
Utilizamos el buscarEn para consultar el documento en busca de determinadas rutas. Los distintos componentes de la ruta se separan mediante puntos (.). La dirección mascotas_ok y internet_libre se encuentran en la raíz del documento. A continuación, recorremos el resultado y mostramos los valores actuales.
|
1 2 3 4 5 6 |
//Even here we use the sub-document API. //Instead of updating the entire document we only update the relevant attributes. //The UPSERT method will either update the attributes if they already exist or create them in case they do not exist in within the document MutateInResult upsertResult = collection.mutateIn(hotelData.getString("hotel_id"), Arrays.asList(upsert("pets_ok", hotelData.getBoolean("pets_ok")), upsert("free_internet", hotelData.getBoolean("free_internet")))); |
En mutateIn permite la actualización de una o más rutas en un documento. Identificamos el documento en Couchbase por su ID y proporcionamos un array de rutas y valores que queremos establecer.
Por último, pero no por ello menos importante, ejecutamos el buscarEn para comprobar que la actualización se ha realizado correctamente.
|
1 2 3 4 |
//Resolve and display the updated attributes LookupInResult result1 = collection.lookupIn(hotelData.getString("hotel_id"), Arrays.asList(get("pets_ok"), get("free_internet"))); System.out.println("Values after the update: 'pets_ok': " + result1.contentAs(0, String.class) + ", 'free_internet': " + result1.contentAs(1, String.class)); |
Próximos pasos
Este artículo nos enseñó a integrar el broker de eventos Solace con Couchbase usando los SDKs Java de Couchbase y Solace.
- El ejemplo de código completo está disponible en mi repositorio GitHub para este proyecto.
- Aprender a enviar mensajes de Couchbase a un tema de Solace en esta entrada del blog (Parte 1).
- Más información SDK de Couchbase aquí.
- Utiliza el Zona de juegos de Couchbase para empezar a desarrollarse rápidamente.
- Pruebe Solace PubSub+ Nube con un juicio.
- Empiece a utilizar Couchbase hoy mismo con el Base de datos en la nube Couchbase Capella.