Hola a todos Soy Koji, un ingeniero de soluciones que trabaja en Japón. Este es mi primer post en couchbase.com y estoy muy emocionado.
En este blog, voy a explicar cómo puedes integrar Couchbase Server con Apache NiFi.
Índice:
-
Configuración de la conexión al servidor Couchbase: CouchbaseClusterService
-
Ejemplo de PutCouchbaseKey: Almacenar Tweets en Couchbase Server
-
GetCouchbaseKey ejemplo: Descargar documentos específicos de Couchbase en un único archivo Zip
Qué es NiFi
Apache NiFi es un proyecto Apache de alto nivel que soporta grafos dirigidos potentes y escalables de enrutamiento de datos, transformación y lógica de mediación de sistemas. Recientemente, Hortonworks anunció que proporciona Hortonworks DataFlow (HDF). NiFi se utiliza en HDF como motor central de procesamiento de flujos de datos para dar soporte a los casos de uso de IoAT (Internet of Anything). Para más información, consulte estos enlaces.
NiFi, Couchbase y yo
Mi título oficial en Couchbase es 'Ingeniero de Soluciones', y la preventa es mi tarea principal. Sin embargo, también me encanta escribir código. Escribir código mantiene frescos mis conocimientos técnicos, lo que en última instancia me ayuda a ofrecer mejores soluciones a nuestros clientes.
Hace unos días, un conjunto de procesadores NiFi para el acceso a Couchbase Server fue añadido al código base de Nifi. ¡La contribución fue hecha por ME! Fue una gran experiencia trabajar con los committers de NiFi a través de los detallados procesos de revisión. La exhaustiva guía del desarrollador documentación me ayudó mucho a meterme en el proyecto.
En caso de que le interese saber cómo funcionaba el proceso de contribución, aquí tiene algunos enlaces que puede consultar:
-
Solicitud de extracción: NIFI-992: Añadir nifi-couchbase-bundle
Vale, basta de introducción. Vamos a sumergirnos en las configuraciones de NiFi para describir cómo integrar Couchbase Server.
Componentes clave de NiFi
Después de descargar NiFi, puede iniciarlo y acceder al diseñador de flujos de datos GUI a través de su navegador. Estos son algunos de los componentes clave con los que debe familiarizarse:
-
FlowFile: Cada dato transmitido dentro del flujo NiFi se transfiere como un objeto denominado FlowFile. Tiene un contenido opaco y un conjunto arbitrario de atributos. Sí, parece un archivo.
-
Procesador: Un pequeño módulo de procesamiento que se supone que es bueno en una sola tarea, algo así como un comando de Linux. Actualmente hay unos 80 procesadores disponibles. Realizan funciones como el manejo de archivos, el acceso a bases de datos y el manejo de HTTP y otros protocolos.
-
Relación: Cada procesador está conectado por una tubería llamada Relación. Algunos procesadores tienen múltiples relaciones como éxito, fracaso u original. El FlowFile procesado será transferido al siguiente procesador a través de esta relación.
Organizar los flujos de datos por grupo de procesos
En NiFi Data Flow, un "Grupo de Procesos" puede ser muy útil cuando el flujo se vuelve más complejo. Te permite organizar múltiples flujos, entonces cada Grupo de Proceso puede ser iniciado/detenido individualmente. En este flujo de datos de demostración, he creado dos grupos de procesos, "Tweets to Couchbase sample" y "Dump Couchbase Documents sample".
Configuración de la conexión al servidor Couchbase: CouchbaseClusterService
Permítanme describir cómo configurar una conexión a un clúster de Couchbase Server.
Dentro de un flujo de datos NiFi realista, tendrás que usar procesadores Couchbase varias veces para poner y obtener datos del cluster. Así que no sería una buena idea configurar los ajustes de conexión en cada procesador. Si hicieras eso, sería difícil cambiar el cluster de destino porque los ajustes del cluster estarían dispersos por todas partes.
Para evitar este problema, NiFi proporciona un mecanismo llamado ControllerService para configurar un componente central que puede ser compartido entre procesadores. NiFi incluye algunos Controller Services existentes como el que proporciona connection pooling a un RDBMS. Así que seguí el diseño e implementé CouchbaseClusterService.
Te permite establecer la Cadena de Conexión para especificar a qué cluster de Couchbase Server acceder. Si los buckets requieren una contraseña, también puedes establecerla aquí. La configuración de NiFi tiene dos tipos de propiedades, estáticas y dinámicas. "Connection String" es estática, y "Bucket Password for {bucket_name}" es dinámica. Puede añadir nuevas propiedades dinámicas haciendo clic en el botón "Nueva propiedad" para especificar contraseñas para diferentes cubos.
Así que, de nuevo, lo importante es que toda la configuración a nivel de cluster es gestionada por este CouchbaseClusterService. Si quieres trabajar con otro cluster de Couchbase, simplemente añade otro CouchbaseClusterService y configúralo apropiadamente.
Ejemplo de PutCouchbaseKey: Almacenar Tweets en Couchbase Server
El procesamiento de feeds de Twitter es un ejemplo común que podemos utilizar para ilustrar el flujo de datos en streaming. Con NiFi y Couchbase, es increíblemente fácil, como se muestra en la siguiente imagen:
-
GetTwitter: NiFi ha una variedad de procesadores útiles como éste, y puede integrarse fácilmente con otros sistemas.
-
PutCouchbaseKey: Cada Tweet es enviado como un FlowFile. Aquí, lo almaceno usando el UUID del FlowFile como el ID del documento de Couchbase. Como muestra la imagen, PutCouchbaseKey tiene una relación de auto "reintento". Si un FlowFile falla con CouchbaseExceptions y puede ser reintentado, como podría suceder con un error temporal del lado del servidor, entonces transfiérelo a la relación "retry". Si el error no es recuperable, como un error de configuración o algún otro error duro, entonces esos FlowFiles son transferidos a la relación "failure".
-
LogAttibute: He añadido un procesador LogAttribute al final del flujo. El LogAttribute puede emitir mensajes de registro sobre las propiedades y contenidos de un FlowFile. Esto es útil para depurar cualquier problema que pueda surgir.
Veamos la configuración de PutCouchbaseKey:
-
Couchbase Cluster Controller Service se refiere al servicio controlador centralizado de Couchbase que se describió anteriormente.
-
Nombre del cubo es el nombre del cubo en el que desea almacenar el contenido.
-
El tipo de documento es Json o Binario.
-
He dejado la propiedad Id Documento en blanco para que el procesador utilice el UUID de FlowFile como id del documento. Alternativamente, puede especificar Lenguaje de expresión NiFi aquí para utilizar otro valor de propiedad o para calcular un id de documento.
Ahora que hemos configurado CouchbaseClusterService y los procesadores, vamos a iniciar NiFi Data Flow. Lo único que tienes que hacer es pulsar el botón del triángulo verde. ¡Entonces podrás confirmar que los Tweets están siendo almacenados en Couchbase!
GetCouchbaseKey ejemplo: Descargar documentos específicos de Couchbase en un único archivo Zip
Es posible que desee descargar un conjunto determinado de documentos de Couchbase Server para enviarlos a otro sistema o para hacer una copia de seguridad parcial.
Para ello, he configurado el flujo de datos como se ve en la siguiente imagen. Es más complejo que el ejemplo anterior de Twitter y utiliza algunos tipos diferentes de procesadores:
Voy a explicar qué hace cada procesador:
-
GetFile: Vigila el directorio especificado y, una vez que se introduce en él el archivo de destino, transfiere el contenido al siguiente procesador.
-
SplitText: Divide el contenido del fichero y envía cada línea como un FlowFile.
-
GetCouchbaseKey: Obtiene un documento de Couchbase usando el contenido entrante de FlowFile como id de documento.
-
ActualizarAtributo: Con el fin de utilizar Couchbase documento id para el nombre de archivo real que se utiliza en el archivo Zip final, he copiado "couchbase.doc.id" atributo a "nombre de archivo" aquí.
-
FusionarContenido: Fusiona y comprime múltiples FlowFiles en un único archivo Zip.
-
ActualizarAtributo: Establece el nombre del archivo Zip en la fecha actual, utilizando la expresión "${now():format('yyyyMMdd_HHmmss')}.zip".
-
PonerArchivo: Por último, coloca el archivo Zip en el directorio especificado.
Los directorios y archivos reales tienen el siguiente aspecto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# Directorio y archivo drwxr-xr-x 2 koji rueda 68B Octubre 2 16:19 couchbase-vertedero-en/ drwxr-xr-x 2 koji rueda 68B Octubre 2 16:29 couchbase-vertedero-fuera/ -rw-r--r-- 1 koji rueda 111B Octubre 2 16:25 en.dat # Especifique los identificadores de documento de Couchbase que desea obtener koji@Kojis-MacBook-Pro:tmp$ cat en.dat 000069ee-cf4d-46bb-a11d-de09a00cd82c 00021100-bb6c-4327-8cad-16474f5cd928 0004b561-1ea4-4e46-8455-2040481d638e # GetFile borra el fichero original para que no vuelva a ser procesado. # Se recomienda crear el archivo en diferentes dir, # luego poner el archivo en el dir de entrada. # (Opcionalmente, puede conservar el archivo original) koji@Kojis-MacBook-Pro:tmp$ cp en.dat couchbase-vertedero-en/ # Tras el procesamiento NiFi, se crea un archivo Zip. koji@Kojis-MacBook-Pro:tmp$ ll couchbase-vertedero-fuera/ total 8 -rw-r--r-- 1 koji rueda 3.8K Octubre 2 16:51 20151002_165136.zip # Extraiga el archivo Zip y confirme que contiene archivos JSON. koji@Kojis-MacBook-Pro:couchbase-vertedero-fuera$ descomprimir 20151002_165136.zip Archivo: 20151002_165136.zip inflando: 000069ee-cf4d-46bb-a11d-de09a00cd82c inflando: 00021100-bb6c-4327-8cad-16474f5cd928 inflando: 0004b561-1ea4-4e46-8455-2040481d638e |
Ahora echemos un vistazo a la configuración de GetCouchbaseKey:
-
Al igual que PutCouchbaseKey, la conexión a Couchbase se configura en ControllerService
-
Dejé el Id de Documento en blanco, para permitirle usar el contenido entrante de FlowFile como id de documento. También puede especificar Expression Language aquí para construir un id de documento.
Conclusión
¿No es fantástico poder automatizar tareas como éstas sin tener que escribir ningún programa? Aunque de momento sólo están disponibles procesadores de acceso simple clave/valor, ¡puedes utilizarlo de forma creativa! Estoy planeando añadir más procesadores para que puedas utilizar consultas View y N1QL desde NiFi, y estoy deseando volver a verte con nuevas funcionalidades.
Gracias, y ¡feliz tratamiento de los datos!