En el anterior entrada del blog hablamos de cómo Couchbase puede integrarse perfectamente en un Arquitectura basada en eventos. Usando el Servicio de Eventos de Couchbase, las mutaciones de documentos pueden ser publicadas a un Solace PubSub+ desde donde los datos se ponen a disposición de los microservicios suscriptores casi en tiempo real.
Este post se centra en cómo los microservicios como Topic subscribers pueden aprovechar Couchbase como un almacén de datos escalable y resistente.
Visión general de la arquitectura y aplicación de ejemplo
En este solicitud de muestranos suscribimos a un tema Solace PubSub+, procesamos los datos y actualizamos el documento en Couchbase.
En nuestra aplicación de ejemplo, almacenamos los detalles del hotel en Couchbase. Las actualizaciones de la configuración del hotel se publican en un tema Solace PubSub+. Construimos una aplicación que se suscribe al Topic, recupera los datos actualizados y actualiza la configuración del hotel existente en Couchbase.
Utilizamos el viaje-muestra que contiene datos de muestra como vuelos, rutas y otros. En nuestro ejemplo, nos centramos en el hotel documentos almacenados en el inventario ámbito en el hotel colección. Todos los documentos de hotel constan de un documento JSON con varios atributos.
En este tutorial sólo actualizaremos un pequeño subconjunto de atributos, como por ejemplo mascotas_permitidas y internet_libre.

La aplicación de ejemplo se suscribe a un Solace Topic y actualiza los datos del hotel en Couchbase. Otras aplicaciones pueden proporcionar los cambios de configuración del hotel. Utilizamos un servicio simple que publica actualizaciones en el Topic.
Leyenda: La aplicación de ejemplo se suscribe a un Solace Topic y actualiza los datos del hotel en Couchbase. Otras aplicaciones pueden proporcionar los cambios de configuración del hotel. Usamos un servicio simple que publica actualizaciones al Topic.
Requisitos previos
Revise los requisitos previos para esta solicitud de muestra:
Couchbase Capella
Crear un Couchbase Capella y siga las instrucciones para aprovisionar su clúster de prueba. Esto sólo le llevará unos minutos y le proporcionará un clúster de Couchbase completo, incluyendo los módulos Datos y el conjunto de datos de muestra, muestra-viaje, que utilizaremos a lo largo de esta entrada del blog.
Puede utilizar este tutorial cuando no utilice nuestra oferta gestionada Couchbase Capella. El viaje-muestra bucket forma parte del producto del servidor Couchbase y puede importarse utilizando la consola de Couchbase.
Nube de intermediarios de eventos Solace PubSub
Inscríbete gratis Prueba en la nube de Solace y crear un Servicio/VPN. Utilizaremos la VPN para conectarnos a un tema de Solace.
Revisión del código - Suscriptor y editor de Solace
Para suscribirse a los mensajes de Solace Topic utilizamos la API Java de Solace. Este ejemplo de implementación se deriva de los ejemplos de la documentación de Solace. Para más detalles sobre la implementación, consulte la documentación de Solace.
1 2 3 4 5 6 7 8 9 |
final JCSMPProperties propiedades = nuevo JCSMPProperties(); propiedades.setProperty(JCSMPProperties.ANFITRIÓN, "tcps://yourhost.messaging.solace.cloud:55443"); propiedades.setProperty(JCSMPProperties.NOMBRE, "solace-cloud-client"); propiedades.setProperty(JCSMPProperties.CONTRASEÑA, "contraseña"); propiedades.setProperty(JCSMPProperties.VPN_NAME, "couchbasedemo"); JCSMPSession sesión = JCSMPFactory.onlyInstance().crearSesión(propiedades); sesión.conecte(); final Tema tema = JCSMPFactory.onlyInstance().crearTema("tutorial/tema"); |
Configuramos el JCSMPProperties con la información de acceso pertinente disponible en la interfaz web de Solace. A continuación, creamos un JCSMPSession y un Tema.
Para suscribirnos a los mensajes del Topic implementamos un XMLMessageConsumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// ### Receptor de mensajes - Suscribirse al tema ### final XMLMessageConsumer contras = sesión.getMessageConsumer(nuevo XMLMessageListener() { @Override público void onReceive(BytesXMLMessage msg) { si (msg instanceof Mensaje de texto) { Sistema.fuera.printf("SUBSCRIBE: Mensaje recibido: '%s'%n", ((Mensaje de texto) msg).getText()); ActualizarDatosHotel.getInstance().upsertHotelData(((Mensaje de texto) msg).getText()); } si no { Sistema.fuera.println("Mensaje recibido."); } } @Override público void onException(JCSMPException e) { Sistema.fuera.printf("Consumidor recibió excepción: %s%n", e); } }); sesión.addSubscription(tema); Sistema.fuera.println("SUBSCRIBE: Conectado. Esperando mensaje..."); contras.iniciar(); |
En el onReceive tomamos el mensaje, lo convertimos en una cadena y llamamos al método upsertHotelData método de ActualizarDatosHotel que contiene la implementación de Couchbase de la que hablaremos más adelante.
Añadimos la suscripción al tema a la sesión e iniciamos el consumidor de mensajes.
Para poder probar la implementación, también creamos un publicador de mensajes. Para ello, primero implementamos un XMLMessageProducer.
1 2 3 4 5 6 7 8 9 10 11 12 |
// ### Message Producer - Enviar mensaje a Topic XMLMessageProducer prod = sesión.getMessageProducer(nuevo JCSMPStreamingPublishCorrelatingEventHandler() { @Override público void responseReceivedEx(Objeto clave) { Sistema.fuera.println("Productor recibió respuesta para msg: " + clave.toString()); } @Override público void handleErrorEx(Objeto clave, JCSMPException causa, largo marca de tiempo) { Sistema.fuera.printf("Productor recibido error para msg: %s@%s - %s%n", clave.toString(), marca de tiempo, causa); } }); |
Con el productor de mensajes en su lugar, ahora podemos enviar mensajes como se ve a continuación.
1 2 3 4 5 6 7 8 9 |
// Enviar mensajes. Aquí hacemos un bucle y enviamos el mismo mensaje varias veces. para (int msgsEnviado = 0; msgsEnviado < 2; ++msgsEnviado) { Mensaje de texto msg = JCSMPFactory.onlyInstance().crearMensaje(Mensaje de texto.clase); msg.setText(datos); Sistema.fuera.printf("PUBLICAR: Enviando mensaje '%s' a tema '%s'...%n", datos, tema.getName()); prod.enviar(msg, tema); //Nos da tiempo a seguir los registros de la consola Hilo.dormir(3000); } |
Inserción de los datos en Couchbase
Con la lógica para suscribirse y publicar mensajes en su lugar, ahora vamos a centrarnos en cómo actualizar los documentos del hotel en Couchbase.
Para este ejemplo, supondremos que el mensaje recuperado del tema tiene el formato JSON que se indica a continuación. Incluyendo un hotel_id que utilizaremos como id de documento de los documentos del hotel almacenados en Couchbase, así como una serie de atributos que podemos modificar:
1 2 3 4 5 6 7 |
{ "hotel_id": "hotel_10025", "pets_ok": verdadero, "desayuno_gratuito": verdadero, "free_internet": verdadero, "free_parking": verdadero } |
Para establecer una conexión segura desde nuestra aplicación a Couchbase Capella, necesitamos:
-
- Cree un usuario de base de datos y concédale acceso a los datos del hotel.
- Lista blanca de la dirección IP del host que ejecuta la aplicación
- Capturar la URL de conexión segura
Puedes gestionar todos estos detalles en el plano de gestión de Capella. Conectarse a Capella en el navegador:
- Abra el Juicio - Cluster y vaya a la sección Conectar tab
- Desplácese hasta el Acceso a bases de datos y haga clic en Gestionar credenciales
- Haga clic en Crear credenciales de base de datos y proporcione un nombre de usuario y una contraseña para la base de datos. Configure el nivel de acceso a la base de datos inventario ámbito en el viaje-muestra cubo y conceder acceso de Lectura/Escritura.
- Lista blanca de la dirección IP de la aplicación haciendo clic en Gestionar IP permitidas y luego Añadir IP permitida. Cuando ejecute la aplicación desde su máquina local, simplemente haga clic en Añadir mi IP y tu dirección IP externa será descubierta y añadida automáticamente.
- En el Red de área extensa copie la URL de conexión.
Revisión del código - conexión a Couchbase y actualización de documentos
Nos conectamos a Couchbase Capella como primer paso creando un objeto cluster. Proporcionamos la URL del endpoint capturada en el paso anterior, así como las credenciales de usuario de la base de datos.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// Actualiza esto a tu cluster Cadena punto final = "cb.8c-f5di7pqrfjy8.cloud.couchbase.com"; Cadena nombre de usuario = "dbuser"; Cadena contraseña = "¡Passw0rd!"; // La entrada del usuario termina aquí. Cadena bucketName = "viaje-muestra"; // Configuración del entorno de clúster ClusterEnvironment env = ClusterEnvironment.constructor() .securityConfig( SecurityConfig.enableTls(verdadero).trustManagerFactory(InsecureTrustManagerFactory.INSTANCIA)) .ioConfig(IoConfig.enableDnsSrv(verdadero)).construya(); // Inicializar la conexión y proporcionar las credenciales de usuario de la base de datos Grupo grupo = Grupo.conecte(punto final, ClusterOptions.clusterOptions(nombre de usuario, contraseña).medio ambiente(env)); // Crear objeto cubo cubo = grupo.cubo(bucketName); cubo.waitUntilReady(Duración.analizar("PT10S")); // crear objeto de colección para la colección 'hotel' ubicada en el ámbito 'inventario colección = cubo.alcance("inventario").colección("hotel"); |
Una vez creado el objeto bucket, podemos recuperar el objeto hotel y utilizarlo más tarde para insertar los cambios en el documento. El sitio hotel está dentro del ámbito inventario alcance de la viaje-muestra cubo.
Tenga en cuenta que utilizo el patrón singleton para utilizar el mismo objeto de colección en todas las solicitudes posteriores. Ver el ejemplo de implementación para más detalles en mi proyecto GitHub.
Veamos ahora cómo podemos actualizar los documentos del hotel con los datos recibidos del Tema.
El mensaje recibido es un String por lo que como primer paso necesitamos convertirlo en un objeto JSON.
1 2 3 4 |
público void upsertHotelData(Cadena contenido) { ... hotelData = JsonObject.fromJson(contenido); ... |
Dado que los datos de entrada del Tema sólo contienen un subconjunto de todos los atributos del documento del hotel, debemos estudiar cómo actualizar los campos del documento de la forma más eficaz.
Hay dos opciones posibles:
-
- Utilizamos el ID del documento para recuperar el documento completo de Couchbase, actualizar los atributos y volver a escribir el documento.
- Utilizamos la API de sub-documento en Couchbase para actualizar sólo los campos dedicados dentro del documento en lugar de reemplazar todo el documento.
La primera opción se utiliza habitualmente para actualizar o insertar documentos. Sin embargo, si trabajamos con documentos más grandes y sólo queremos actualizar una pequeña parte de ellos, no es necesario enviar todo el documento a través de la red. En nuestra implementación a continuación, utilizaremos la API de subdocumento para resolver y actualizar campos en el documento del hotel.
1 2 3 4 5 6 7 8 9 |
//En lugar de resolver el documento completo utilizamos la API de subdocumentos //para resolver sólo las partes relevantes del documento del hotel antes de la actualización. //Aquí podemos definir la ruta a los atributos relevantes del documento. //En nuestro escenario 'pets_ok' y 'free_internet' están en la raíz del documento. LookupInResult resultado = colección.buscarEn(hotelData.getString("hotel_id"), Matrices.asList(consiga("pets_ok"), consiga("free_internet"))); //Mostrar los valores actuales antes de la actualización del documento Sistema.fuera.println("Valores anteriores a la actualización: 'pets_ok': " + resultado.contentAs(0, Cadena.clase) + ", 'free_internet': " + resultado.contentAs(1, Cadena.clase)); |
Utilizamos el buscarEn para consultar el documento en busca de determinadas rutas. Los distintos componentes de la ruta se separan mediante puntos (.). La dirección mascotas_ok y internet_libre se encuentran en la raíz del documento. A continuación, recorremos el resultado y mostramos los valores actuales.
1 2 3 4 5 6 |
//En este caso utilizamos la API de subdocumentos. //En lugar de actualizar todo el documento sólo actualizamos los atributos relevantes. //El método UPSERT actualizará los atributos si ya existen o los creará en caso de que no existan en el documento. MutarEnResultado upsertResultado = colección.mutateIn(hotelData.getString("hotel_id"), Matrices.asList(upsert("pets_ok", hotelData.getBoolean("pets_ok")), upsert("free_internet", hotelData.getBoolean("free_internet")))); |
En mutateIn permite la actualización de una o más rutas en un documento. Identificamos el documento en Couchbase por su ID y proporcionamos un array de rutas y valores que queremos establecer.
Por último, pero no por ello menos importante, ejecutamos el buscarEn para comprobar que la actualización se ha realizado correctamente.
1 2 3 4 |
//Resolver y mostrar los atributos actualizados LookupInResult resultado1 = colección.buscarEn(hotelData.getString("hotel_id"), Matrices.asList(consiga("pets_ok"), consiga("free_internet"))); Sistema.fuera.println("Valores tras la actualización: 'pets_ok': " + resultado1.contentAs(0, Cadena.clase) + ", 'free_internet': " + resultado1.contentAs(1, Cadena.clase)); |
Próximos pasos
Este artículo nos enseñó a integrar el broker de eventos Solace con Couchbase usando los SDKs Java de Couchbase y Solace.
- El ejemplo de código completo está disponible en mi repositorio GitHub para este proyecto.
- Aprender a enviar mensajes de Couchbase a un tema de Solace en esta entrada del blog (Parte 1).
- Más información SDK de Couchbase aquí.
- Utiliza el Zona de juegos de Couchbase para empezar a desarrollarse rápidamente.
- Pruebe Solace PubSub+ Nube con un juicio.
- Empiece a utilizar Couchbase hoy mismo con el Base de datos en la nube Couchbase Capella.