La necesidad
Ah, migraciones de bases de datos. Después de haber migrado a Couchbase, donde la representación de datos de documentos JSON es mucho más flexible, no necesitarás tan a menudo retorcer tu proceso de desarrollo a través de la tubería de solicitud de cambio seguido por el tiempo de inactividad, los posibles errores y sólo la incomodidad general cuando se une a filas y columnas.
Dicho esto, sólo porque estés en Couchbase no significa que siempre estés libre de necesitar hacer transformación de datos con datos en Couchbase. Simplemente será mucho menos común.
¡Este es realmente un lugar interesante para Apache Spark!
Un comentario...
Con el conector Spark y Couchbase 4.0 tienes no sólo una, sino cuatro interfaces que son relevantes para los que usan Spark. Se trata de la interfaz K-V, la interfaz de streaming Database Change Protocol (también conocida como DCP), la interfaz de consulta N1QL a través de Spark SQL y la interfaz View Query.
Estos se pueden combinar con una serie de diferentes fuentes de datos del ecosistema Spark para reunir y manipular los datos de varias maneras. Por ejemplo, es posible que desees transmitir los datos desde Couchbase a través de DCP, mezclarlos con una fuente de datos HDFS y volver a colocar los resultados de destino en un bucket de Couchbase diferente.
La solución...
Tomando un caso simple, ¿cómo podemos usar Spark para escribir algún código para transformar eficientemente un conjunto de datos dentro de Couchbase?
Imagina que has adquirido un nuevo conjunto de datos sobre jugadores en formato JSON. Todos ellos van a jugar pronto a su nuevo juego FizzBuzz, y los perfiles proceden de un socio. Los perfiles entrantes se parecen un poco a esto:
|
1 2 3 4 5 6 |
{ "nombre": "Joel", "apellido": "Smith", "correo electrónico": "joelsmith@g00glemail.com", "ficha de derecho": 78238743 } |
El problema es que los perfiles de FizzBuzz tienen todos este aspecto:
|
1 2 3 4 5 6 |
{ "fname": "Matt", "lname": "Ingenthron", "correo electrónico": "matt@couchbase.com", "currentscore": 1000000 } |
Normalmente, si tuvieras otra forma para los datos, añadirías un poco de lógica para el mapeo en el momento de la lectura y la escritura. Sin embargo, esta transición en particular es un proceso de una sola vez y viene con una arruga adicional. Ese "entitlementtoken" tiene que buscarse en una copia de seguridad de la base de datos MySQL que también tienes. No querrá tener que aprovisionar o mantener un gran despliegue MySQL para gestionar el tráfico del día del lanzamiento, por lo que es mejor una transformación única previa al lanzamiento.
Lo ideal sería hacer streaming de los datos, encontrar los que tienen la 'forma' que queremos y luego transformarlos con Spark a partir de una consulta SQL.
Primero, necesitamos configurar nuestra conexión y transmitir los datos, buscando la forma del JSON ya importado. Esto utilizará la interfaz DCP de Couchbase para transmitir los datos.
|
1 2 3 4 5 6 |
val ssc = nuevo StreamingContext(sc, Segundos(5)) ssc.couchbaseStream("transformadora") .filtro(_.isInstanceOf[Mutación]) .mapa(m => (nuevo Cadena(m.asInstanceOf[Mutación].clave), nuevo Cadena(m.asInstanceOf[Mutación].contenido))) |
Una limitación actual es que el DStream nunca se detiene, pero podemos simplemente monitorizar cuando no vemos más datos siendo transformados como una solución para este simple caso.
Luego, para cada elemento, debemos aplicar una transformación basada en esta búsqueda de MySQL. Para ello, tendremos que cargar los datos de MySQL. Suponiendo que la tabla MySQL se ve así:
|
1 2 3 4 5 6 7 8 9 10 |
mysql> describa perfiles; +------------------+-------------+------+-----+---------+-------+ | Campo | Tipo | Nulo | Clave | Por defecto | Extra | +------------------+-------------+------+-----+---------+-------+ | nombre | varchar(20) | SÍ | | NULL | | | apellido | varchar(20) | SÍ | | NULL | | | correo electrónico | varchar(20) | SÍ | | NULL | | | ficha de derecho | int(11) | SÍ | | NULL | | +------------------+-------------+------+-----+---------+-------+ 4 filas en configure (0.00 sec) |
Querremos cargar los datos de MySQL como un DataFrame. Dado que el StreamingContext nos da RDDs a los que unirnos, lo convertiremos en un conjunto de RDDs para un posterior join dentro del stream. Spark 1.6 puede hacer esto más fácil. Esa conversión se parece a esto (extraído a una función para redablility):
|
1 2 3 4 5 6 |
/** Devuelve un RDD basado en la dirección de correo electrónico extraída del documento */ def CreateMappableRdd(s: (Cadena, Cadena)): (Cadena, JsonDocument) = { val documento_de_retorno = JsonDocument.crear(s._1, JsonObject.fromJson(s._2)) (documento_de_retorno.contenido().getString("email"), documento_de_retorno) } |
También tenemos que añadir la nueva ficha de derechos (también extraída):
|
1 2 3 4 5 6 7 8 |
/** Devuelve un JsonDocument enriquecido con el token de asignación de derechos */ def mergeIntoDoc(t: (Cadena, (JsonDocument, Entero))): JsonDocument = { val jsonToEnrich = t._2._1.contenido() val entitlementFromJoin = t._2._2 jsonToEnrich.poner("entitlementtoken", entitlementFromJoin) t._2._1 } |
Al final tenemos una bonita descripción fluida de nuestra transformación modificando los RDDs en vuelo que necesitan cambios. Esto finalmente escribe los datos transformados de nuevo en Couchbase, sobrescribiendo los elementos usando la interfaz K-V.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
// cargar el DataFrame de todos los usuarios desde MySQL. // Nota, añadir .cache() puede tener sentido aquí (o no) dependiendo de la cantidad de datos. val derechos = mysqlReader.carga() /* cargando esto: +---------+-----------+-----------------+----------------+ |Nombre, apellidos, dirección de correo electrónico, código de acceso. +---------+-----------+-----------------+----------------+ | Matt| Ingenthron| matt@email.com| 11211| | Michael|Nitschinger|michael@email.com| 11210| +---------+-----------+-----------------+----------------+ */ val derechosSinEsquema = derechos.rdd.mapa[(Cadena, Entero)](f => (f.getAs[Cadena]("email"), f.getAs[Entero]("entitlementtoken"))) val ssc = nuevo StreamingContext(sc, Segundos(5)) ssc.couchbaseStream("transformadora") .filtro(_.isInstanceOf[Mutación]) .mapa(m => (nuevo Cadena(m.asInstanceOf[Mutación].clave), nuevo Cadena(m.asInstanceOf[Mutación].contenido))) .mapa(s => CreateMappableRdd(s)) .filtro(_._2.contenido().consiga("entitlementtoken").eq(null)) .foreachRDD(rdd => { rdd .únase a(derechosSinEsquema) .mapa(mergeIntoDoc) //.foreach(println) // un buen lugar para ver el efecto .saveToCouchbase("transformadora") }) ssc.iniciar() ssc.awaitTermination() |
En el ejemplo completo se encuentra en couchbase-spark-samples repositorio.
La belleza de este ejemplo es que es fácil de entender lo que está pasando y bastante trivial a escala. Lo más probable es que tu propia transformación sea más compleja, pero esto debería darte una idea de lo que es posible y algo en lo que basarte.
Siempre se puede mejorar.
Un problema es que el MySQL podría ser más grande de lo que quiero cargar en la memoria. Spark lo tiene en cuenta dándote una forma de dividir los DataFrames. Yo no necesitaba eso aquí y quería que la muestra fuera legible. La otra cosa que puede ayudar a esto es la capacidad de hacer referencia a un SparkContext desde dentro de un StreamingContext existente. Spark no lo permite por el momento por buenas razones, pero yo diría que este simple caso de uso de hacer un solo registro de búsqueda desde dentro de la corriente no tiene sentido.
En el Conector Couchbase, por el momento, la interfaz DCP está clasificada como volátil y debería considerarse experimental. Además, el ejemplo anterior es muy rápido pero necesita algo de ayuda para escalar. Una próxima actualización de mi colega Sergey Avseyev permitirá dividir los flujos DCP entre trabajadores Spark para paralelizar esta transformación.
Para concluir
Spark es una gran herramienta nueva para este tipo de transformación. Las mismas técnicas pueden aplicarse a la migración a Couchbase desde una fuente de datos diferente, como una base de datos relacional. La técnica puede incluso ampliarse con el aprendizaje automático de Spark para construir un modelo en torno al flujo de datos desde Couchbase para anticipar resultados.