Sin categoría

Kafka y Couchbase: Puesta en marcha con el consumidor Kafka de ejemplo en 10 minutos

Introducción

Couchbase Kafka Connector 1.2.0 acaba de ser enviado. Junto con las diversas correcciones de errores, hay un nuevo código de ejemplo para un consumidor Kafka, además del productor Kafka que estaba disponible anteriormente. Para revisar rápidamente las condiciones:

  • Un productor de Kafka escribe datos en Kafka, por lo que es una fuente de mensajes desde la perspectiva de Kafka.
  • Un consumidor en la terminología de Kafka es un proceso que se suscribe a temas y luego hace algo con la alimentación de mensajes publicados que se emiten desde un clúster Kafka. Es básicamente un sumidero.

En este blog, pondrás en marcha un ejemplo de consumidor Kafka al estilo "¡Hola Mundo!" que escribe en Couchbase. En el camino, también obtendrás un entorno sandbox con un broker Kafka y un único nodo Couchbase Server para que puedas ejecutar y modificar el consumidor y productor de ejemplo.

Instalación Requisitos previos

En muestras forman parte del Conector Kafka de Couchbase árbol fuente. Para obtenerlos, basta con clonar todo el repositorio:

Ahora, vamos a configurar tu entorno de pruebas usando imágenes pre-configuradas de Kafka y Couchbase Server. Tienes que instalar Vagrant, VirtualBox y Ansible para configurarlos localmente. Si tienes estos servicios instalados en otro lugar, asegúrate de ajustar las direcciones de host a lo largo de esta guía apropiadamente.

Compruebe las versiones de las dependencias:

Puede asignar nombres legibles por humanos a las cajas utilizando el plugin para Vagrant. Si aún no lo tienes instalado, utiliza el siguiente comando:

Ahora ya puedes aprovisionar los servidores y ponerte en marcha:

Nota: Si un servidor no se instala debido a los tiempos de espera, vuelva a intentar "vagrant up" después de unos minutos y puede que funcione.

Compruebe que los hosts responden:

Si navegas a deberías poder ver tu Couchbase Server de nodo único configurado con credenciales Administrador/contraseña.

Construcción de las muestras

Para evitar cualquier problema de classpath, utilice maven para crear un archivo JAR autónomo para cada aplicación de ejemplo.

La aplicación generadora es una aplicación CLI mínima. Utiliza el SDK Java de Couchbase para envolver líneas de entrada desde STDIN en documentos JSON y los envía al bucket "por defecto" en Couchbase Server:

El productor se conecta a Couchbase Server y transmite todas las mutaciones a Kafka. Esta aplicación utiliza el proyecto couchbase-kafka-connector entre bastidores.

El consumidor es un consumidor típico de Kafka, que por defecto envía cualquier mensaje entrante en el tema "default" a STDOUT.

Ejecución de las muestras

Ahora que lo tienes todo preparado, es el momento de ejecutar todas las muestras. Necesitarás tres sesiones shell diferentes porque cada una de ellas ejecuta un proceso hasta que se detiene. Asumiremos que estás en la sesión  /tmp/conector-kafka/muestras directorio.

En primer lugar, pon en marcha el generador:

Debería mostrar la configuración de la conexión y, a continuación, pasar al símbolo del sistema >. Puede escribir cualquier cosa allí y verificar que se está creando correctamente mirando en la interfaz de usuario de administración del servidor Couchbase:

Documentos del generador en el cubo

En este momento puedes ejecutar el ejemplo de conector

Por cada línea que escriba en el generador, verá una línea del productor como ésta:

La muestra lo escribe justo antes de enviar la carga útil a Kafka, en la implementación de la clase filtro. Comprobemos cómo recibe Kafka estos mensajes.

Puedes seguir jugando con él mientras los tres servicios estén en marcha.

Desarrollando con Couchbase Kafka Connector

Sigamos adelante y echemos un vistazo al código. Las tres aplicaciones son bastante amigables para los experimentos, por ejemplo, el generador cabe en unas pocas líneas:

Básicamente, el generador abre una conexión al bucket "default" en tu instancia "couchbase1.vagrant" y escribe tus mensajes en claves aleatorias. Puedes extenderlo para enviar otros tipos de eventos. Otra cosa que puedes intentar es eliminar claves.

Por defecto, Couchbase Connector para Kafka se ejecuta en modo servidor, donde toma prestado el hilo activo y escucha activamente a Couchbase Server en busca de nuevos eventos. Hay varios puntos donde puedes aplicar tus ideas o cambios. El más obvio es el constructor de configuración, donde no sólo especificas las credenciales y direcciones de los servicios a los que te conectas, sino que también puedes especificar varias clases de serializadores y filtros.

La aplicación de ejemplo implementa varias de ellas. La clase Filter es la más sencilla:

Aquí puedes poner las comprobaciones personalizadas que quieras, y si pasar() devuelve  falsoel conector descarta el mensaje y no lo envía a Kafka.

Default Encoder, que viene con la distribución del conector, intenta representar cada mensaje como JSON, pero eso probablemente no es lo que necesitas, así que puedes aplicar y conversión a Evento DCPEvent y devuelve una matriz de bytes que se almacenará en Kafka. En este ejemplo, solo convertimos los eventos a su representación de cadena.

Un ajuste más avanzado es SerializadorEstado de Couchbase. Implementándolo, puedes controlar cómo la librería rastreará los cursores de flujo (es decir, los números de secuencia para cada partición dentro de Couchbase Server), y si se reanudará después de reiniciar el conector. Hay una implementación Zookeeper del serializador de estado en la distribución. Aquí en el ejemplo, hemos implementado NullStateSerializer que no persiste nada, pero muestra una implementación mínima.

El último componente de su clúster de demostración es el consumidor Kafka ConsumidorAbstractoque es un ejemplo bastante típico de consumidor. Consta de dos partes: , que implementa el arranque y el posicionamiento en el tema Kafka, y ImprimirConsumidorque lleva su "lógica de negocio", o simplemente emite cada mensaje que se le pasa por ConsumidorAbstracto:

Como en los otros ejemplos aquí, puedes jugar modificando el consumidor de ejemplo. Incluso puedes cerrar el circuito enviando todo de vuelta a Couchbase Server. Kafka es software distribuido, al igual que Couchbase Server, así que tenlo en cuenta cuando ejecutes en tu propio cluster y ajusta la función main() en consecuencia. En nuestro ejemplo, sólo tenemos una única partición, partition (0) en Kafka, por lo que nuestro principal tiene este aspecto:

Por supuesto, en un clúster de producción se ejecutará más de una partición.

Conclusión

Espero que esto te ayude a empezar con buen pie con Couchbase y Kafka. Gracias.

Comparte este artículo
Recibe actualizaciones del blog de Couchbase en tu bandeja de entrada
Este campo es obligatorio.

Autor

Publicado por Sergey Avseyev, Ingeniero SDK, Couchbase

Sergey Avseyev es Ingeniero SDK en Couchbase. Sergey Avseyev es responsable del desarrollo del conector Kafka, y la biblioteca subyacente, que implementa DCP, el protocolo de replicación de Couchbase. También mantiene PHP SDK para Couchbase.

Deja un comentario

¿Listo para empezar con Couchbase Capella?

Empezar a construir

Consulte nuestro portal para desarrolladores para explorar NoSQL, buscar recursos y empezar con tutoriales.

Utilizar Capella gratis

Ponte manos a la obra con Couchbase en unos pocos clics. Capella DBaaS es la forma más fácil y rápida de empezar.

Póngase en contacto

¿Quieres saber más sobre las ofertas de Couchbase? Permítanos ayudarle.