RxJava es una herramienta impresionante para la programación reactiva que también es útil como lector CSV de Java.

En caso de que nunca haya utilizado o incluso oído hablar de RxJava, es una forma de programar con flujos de datos asíncronos. También es una programación basada en eventos donde puedes observar o escuchar datos de flujo y hacer algo cuando dicho flujo es descubierto. Los flujos de datos pueden ser cualquier cosa, desde variables a estructuras de datos.

Para este ejemplo, digamos que una línea de datos CSV es un flujo de datos. Muchos desarrolladores se encuentran con la necesidad de cargar datos CSV en su base de datos más a menudo de lo que esperan. En la mayoría de los escenarios no se puede simplemente leer las líneas de un archivo CSV y soltarlo en una base de datos. En este ejemplo, vamos a utilizar RxJava para suscribirnos a un flujo de datos y transformarlo en Java para leer los archivos CSV que se guardarán en Couchbase.

Requisitos

No hay demasiados requisitos para poner en marcha este proyecto. Como mínimo, necesitarás lo siguiente:

Todo el desarrollo y el trabajo se llevará a cabo con el JDK 1.8 y Maven, y esto incluye la ejecución de la aplicación.

Comprender el conjunto de datos y el modelo de datos

Una gran manera de mojarse los pies cuando se trata de RxJava es conseguir un conjunto de datos de muestra para jugar. Para simplificar, nos inventaremos nuestro propio archivo de valores separados por comas (CSV), pero si quieres algo más extravagante, puedes visitar el sitio web de ciencia de datos, Kaggle.

Supongamos que nuestro sencillo conjunto de datos CSV tiene las siguientes columnas por fila:

  1. id
  2. nombre
  3. apellido
  4. twitter

Desde una perspectiva de consulta y análisis, trabajar con los datos en formato CSV es casi imposible. En su lugar, estos datos van a ser almacenados como datos NoSQL para que puedan ser procesados posteriormente. No vamos a entrar en el cálculo de números y consultas aquí, pero vendrá en un futuro artículo. En este momento sólo queremos ponerlo en formato NoSQL.

Cuando se cargue en Couchbase, cada fila del CSV tendrá un aspecto similar al siguiente:

Sí, el trozo de datos anterior es un documento JSON, que es lo que soporta Couchbase. Ahora que conocemos los objetivos de los datos, podemos empezar a cargar los datos CSV en Couchbase con RxJava.

Transformación de los datos brutos y escritura en Couchbase

Para utilizar RxJava para cargar datos CSV a través de una aplicación Java, es necesario incluir algunas dependencias. Necesitamos incluir RxJava, OpenCSV, y el Couchbase Java SDK. Dado que estamos usando Maven, todas pueden ser incluidas a través de Maven pom.xml archivo. Para incluir RxJava, incluya la siguiente dependencia en su archivo Maven:

Dado que los datos en bruto estarán en forma de CSV, podemos utilizar la biblioteca CSV de código abierto para Java llamada OpenCSV. La dependencia de Maven para OpenCSV se puede añadir así:

Por último, Java debe conectarse a Servidor Couchbase. Esto puede hacerse a través del SDK Java de Couchbase. Para añadir esta dependencia a su proyecto Maven, añada lo siguiente a su proyecto pom.xml file:

Todas las dependencias del proyecto están listas.

Para cargar, pero no leer, el archivo CSV, creará un nuevo archivo CSVReader como sigue:

Dado que estos datos se procesarán finalmente en Couchbase, debemos conectarnos a nuestro servidor y abrir nuestro bucket.

Lo anterior asume que Couchbase se está ejecutando localmente y los datos se guardarán en el bucket por defecto sin contraseña.

Para procesar el conjunto de datos CSV, se puede crear un RxJava Observable:

Para desglosar lo que ocurre en el Observable, se dan los siguientes pasos.

En CSVReader crea un Iterable. El Observable utilizará el Iterable como fuente de datos utilizando el .de método.

Los datos leídos serán una matriz de cadenas, no algo que pueda almacenarse directamente en la base de datos. Si se utiliza la función .mapa el array de cadenas puede transformarse en lo que decidamos. En este caso, el objetivo es mapear cada línea del CSV a un documento Couchbase. Durante este proceso de mapeo podemos hacer más limpieza de datos. Por ejemplo, podríamos hacer algo como csvRow[*].trim() para eliminar los espacios en blanco iniciales y finales de cada columna CSV.

Por último, cada línea de lectura procesada debe guardarse en Couchbase. El sitio .suscribirse se suscribirá a las notificaciones que emita el Observable, en este caso los datos manipulados.

Conclusión

Acabas de probar a cargar datos CSV sucios en Couchbase usando RxJava y el Couchbase Java SDK. Haciendo programación reactiva puedes tomar acciones en cualquier flujo de datos que estés observando. En nuestro escenario sólo queríamos cargar un archivo CSV en Couchbase.

Si tienes un conjunto de datos masivo, puedes llevar este tutorial al siguiente nivel utilizando Apache Spark. Escribí un cargador CSV muy similar encontrado aquí que hace uso de Spark.

Autor

Publicado por Nic Raboy, Defensor del Desarrollador, Couchbase

Nic Raboy es un defensor de las tecnologías modernas de desarrollo web y móvil. Tiene experiencia en Java, JavaScript, Golang y una variedad de frameworks como Angular, NativeScript y Apache Cordova. Nic escribe sobre sus experiencias de desarrollo relacionadas con hacer el desarrollo web y móvil más fácil de entender.

Dejar una respuesta