RxJava es una herramienta impresionante para la programación reactiva que también es útil como lector CSV de Java.
En caso de que nunca haya utilizado o incluso oído hablar de RxJava, es una forma de programar con flujos de datos asíncronos. También es una programación basada en eventos donde puedes observar o escuchar datos de flujo y hacer algo cuando dicho flujo es descubierto. Los flujos de datos pueden ser cualquier cosa, desde variables a estructuras de datos.
Para este ejemplo, digamos que una línea de datos CSV es un flujo de datos. Muchos desarrolladores se encuentran con la necesidad de cargar datos CSV en su base de datos más a menudo de lo que esperan. En la mayoría de los escenarios no se puede simplemente leer las líneas de un archivo CSV y soltarlo en una base de datos. En este ejemplo, vamos a utilizar RxJava para suscribirnos a un flujo de datos y transformarlo en Java para leer los archivos CSV que se guardarán en Couchbase.
Requisitos
No hay demasiados requisitos para poner en marcha este proyecto. Como mínimo, necesitarás lo siguiente:
- JDK 1.8+
- Apache Maven 3.3+
- Servidor Couchbase 4.1+
Todo el desarrollo y el trabajo se llevará a cabo con el JDK 1.8 y Maven, y esto incluye la ejecución de la aplicación.
Comprender el conjunto de datos y el modelo de datos
Una gran manera de mojarse los pies cuando se trata de RxJava es conseguir un conjunto de datos de muestra para jugar. Para simplificar, nos inventaremos nuestro propio archivo de valores separados por comas (CSV), pero si quieres algo más extravagante, puedes visitar el sitio web de ciencia de datos, Kaggle.
Supongamos que nuestro sencillo conjunto de datos CSV tiene las siguientes columnas por fila:
- id
- nombre
- apellido
Desde una perspectiva de consulta y análisis, trabajar con los datos en formato CSV es casi imposible. En su lugar, estos datos van a ser almacenados como datos NoSQL para que puedan ser procesados posteriormente. No vamos a entrar en el cálculo de números y consultas aquí, pero vendrá en un futuro artículo. En este momento sólo queremos ponerlo en formato NoSQL.
Cuando se cargue en Couchbase, cada fila del CSV tendrá un aspecto similar al siguiente:
1 2 3 4 5 6 7 8 |
{ "id": 1, "nombre": "Nic", "apellido": "Raboy", "twitter": "nraboy" } |
Sí, el trozo de datos anterior es un documento JSON, que es lo que soporta Couchbase. Ahora que conocemos los objetivos de los datos, podemos empezar a cargar los datos CSV en Couchbase con RxJava.
Transformación de los datos brutos y escritura en Couchbase
Para utilizar RxJava para cargar datos CSV a través de una aplicación Java, es necesario incluir algunas dependencias. Necesitamos incluir RxJava, OpenCSV, y el Couchbase Java SDK. Dado que estamos usando Maven, todas pueden ser incluidas a través de Maven pom.xml archivo. Para incluir RxJava, incluya la siguiente dependencia en su archivo Maven:
1 2 3 4 5 6 7 |
io.reactivex rxjava 1.1.2 |
Dado que los datos en bruto estarán en forma de CSV, podemos utilizar la biblioteca CSV de código abierto para Java llamada OpenCSV. La dependencia de Maven para OpenCSV se puede añadir así:
1 2 3 4 5 6 7 |
com.opencsv opencsv 3.7 |
Por último, Java debe conectarse a Servidor Couchbase. Esto puede hacerse a través del SDK Java de Couchbase. Para añadir esta dependencia a su proyecto Maven, añada lo siguiente a su proyecto pom.xml file:
1 2 3 4 5 6 7 |
com.couchbase.cliente java-cliente 2.2.0 |
Todas las dependencias del proyecto están listas.
Para cargar, pero no leer, el archivo CSV, creará un nuevo archivo CSVReader
como sigue:
1 2 3 |
CSVReader lector = nuevo CSVReader(nuevo Lector de archivos("RUTA_AL_ARCHIVO_CSV")); |
Dado que estos datos se procesarán finalmente en Couchbase, debemos conectarnos a nuestro servidor y abrir nuestro bucket.
1 2 3 |
Cubo cubo = CouchbaseCluster.crear("http://localhost:8091").openBucket("por defecto", ""); |
Lo anterior asume que Couchbase se está ejecutando localmente y los datos se guardarán en el bucket por defecto sin contraseña.
Para procesar el conjunto de datos CSV, se puede crear un RxJava Observable:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
Observable .de(lector) .mapa( csvRow -> { JsonObject objeto = JsonObject.crear(); objeto .poner("nombre", csvRow[1]) .poner("apellido", csvRow[2]) .poner("twitter", csvRow[3]); devolver JsonDocument.crear(csvRow[0], objeto); } ) .suscríbase a(documento -> cubo.upsert(documento), error -> Sistema.fuera.println(error)); |
Para desglosar lo que ocurre en el Observable, se dan los siguientes pasos.
En CSVReader
crea un Iterable
. El Observable utilizará el Iterable
como fuente de datos utilizando el .de
método.
Los datos leídos serán una matriz de cadenas, no algo que pueda almacenarse directamente en la base de datos. Si se utiliza la función .mapa
el array de cadenas puede transformarse en lo que decidamos. En este caso, el objetivo es mapear cada línea del CSV a un documento Couchbase. Durante este proceso de mapeo podemos hacer más limpieza de datos. Por ejemplo, podríamos hacer algo como csvRow[*].trim()
para eliminar los espacios en blanco iniciales y finales de cada columna CSV.
Por último, cada línea de lectura procesada debe guardarse en Couchbase. El sitio .suscribirse
se suscribirá a las notificaciones que emita el Observable, en este caso los datos manipulados.
Conclusión
Acabas de probar a cargar datos CSV sucios en Couchbase usando RxJava y el Couchbase Java SDK. Haciendo programación reactiva puedes tomar acciones en cualquier flujo de datos que estés observando. En nuestro escenario sólo queríamos cargar un archivo CSV en Couchbase.
Si tienes un conjunto de datos masivo, puedes llevar este tutorial al siguiente nivel utilizando Apache Spark. Escribí un cargador CSV muy similar encontrado aquí que hace uso de Spark.