Lanzamiento de Spark Connector 1.0.0
Después de dos versiones preliminares para desarrolladores y una beta, estoy muy contento de anunciar la primera versión estable de nuestro Couchbase Spark Connector. El momento no es una coincidencia, ya que la próxima semana Cumbre Spark Europa 2015 se celebra en Ámsterdam. Estamos patrocinando el evento, por lo que nos encontrará a mí y a mis colegas en el stand de Couchbase.
Esta versión estable marca el final de los grandes cambios de última hora, aportando estabilidad a la API y un camino claro hacia el futuro. Si no has leído los anuncios anteriores, en la siguiente entrada encontrarás un resumen de las funciones y capacidades.
El conector se distribuye desde Maven Central (así como desde spark-packages.org), así que si quieres experimentar con él usando el spark-shell, esto es todo lo que necesitas para ponerte en marcha:
Para abrir el apetito, aquí tiene un ejemplo de código completo que puede ejecutar con nuestro conjunto de datos "travel-sample". Utiliza Spark SQL para crear un marco de datos para todas las aerolíneas (en función de un predicado que especifique) y, a continuación, selecciona algunos campos y aplica un orden y un límite:
Esto imprime:
En unas pocas líneas de código puede ejecutar todo tipo de consultas para el análisis de datos, ETL o aprendizaje automático sobre Couchbase. Para mí eso es bastante impresionante - si a ti también te gusta sigue leyendo para conocer todos los detalles.
Por cierto, la documentación completa se encuentra en aquí.
Spark Core - La base escalable
La API de usuario más baja en Spark son las RDD (conjuntos de datos distribuidos resistentes). Es básicamente una colección de datos, que Spark distribuye por todo el cluster. Dado que Spark es una máquina de procesamiento de big data, pero no una base de datos, necesita mecanismos para crear RDDs, así como para persistir RDDs al final de los cálculos. Para ayudar con esto, Couchbase proporciona:
- API para crear RDDs mediante KeyValue, Views y N1QL
- Persistir RDDs en un Bucket Couchbase a través de KeyValue
La documentación detallada de estas tareas está disponible en aquí. Los siguientes ejemplos de código muestran cómo crear RDDs fácilmente, así como persistir en ellos. Ten en cuenta que estos ejemplos solo esperan que haya un SparkContext disponible.
Y aquí un ejemplo más complicado que lee todas las aerolíneas, realiza un recuento clásico de palabras en sus nombres, agrega los resultados y los almacena en un documento de vuelta en el clúster Couchbase:
Como puedes imaginar, entre bastidores ocurren muchas cosas. La API se convierte en consultas a Couchbase, pero lo más importante es que el conector maneja los recursos de forma completamente transparente. Dado que tus cálculos se ejecutarán en trabajadores arbitrarios del clúster, el conector abre conexiones donde sea necesario de forma eficiente. Así que sólo tienes que decirle a Spark qué recuperar o persistir - el conector se encargará del resto.
Si ejecutas trabajadores Spark al lado de nodos Couchbase, el conector intenta indicar el trabajador adecuado para las operaciones KeyValue (de nuevo, de forma transparente). De esta forma se reducen las costosas operaciones de barajado de la red, lo que mejora aún más el rendimiento en este tipo de configuraciones. Ten en cuenta que esto es una optimización pura, puedes ejecutar cualquier topología que quieras y simplemente funcionará.
Spark SQL - Una historia de amor N1QL
Spark SQL es un módulo para trabajar con datos estructurados. Permite al usuario poner un esquema sobre un RDD, que pasa a llamarse DataFrame (anteriormente SchemaRDD). Como Spark dispone ahora de información sobre la estructura de los datos con los que trabaja, puede aplicar todo tipo de transformaciones y optimizaciones.
Couchbase Server 4.0 incluye el nuevo lenguaje de consulta N1QL, que se integra perfectamente en las API SQL de Spark. Sólo hay una pega: los documentos almacenados en Couchbase no están obligados a adherirse a un esquema específico - esa es una de sus características. ¿Cómo podemos estructurar un mundo sin esquemas?
La respuesta es la inferencia automática del esquema. Si creas un DataFrame sobre Couchbase, necesitas proporcionar un "schemaFilter" que a su vez creará internamente un predicado. Entonces cargaremos muchos documentos con ese predicado e inferiremos el esquema a partir de ahí. El siguiente ejemplo muestra cómo crear un DataFrame para aerolíneas en el bucket "travel-sample", que se identifican por su atributo type en el propio documento:
Esto imprime:
Si sus documentos son más o menos similares, este enfoque funciona bien. Si sus documentos carecen por completo de esquema, de modo que cada documento tiene un aspecto muy diferente, también puede proporcionar el esquema manualmente. De este modo, sólo especificará los campos que pueda necesitar:
Por último, si esto sigue sin funcionar, siempre puedes recurrir a una consulta RDD y generar un DataFrame a partir de los resultados:
Esto imprime:
Puedes ver cómo detecta incluso la estructura recursiva de los objetos JSON y las matrices. Esto también se puede utilizar en el momento de la consulta, lo que aporta flexibilidad tanto en el modelado de datos como en la consulta.
Ahora que ya tienes tu DataFrame creado, puedes realizar todo tipo de consultas sobre él:
Esto imprime:
Aquí tienes un ejemplo diferente que muestra cómo puedes crear un DataFrame desde HDFS y unirlo con filas de Couchbase:
Una parte importante de este proceso también se gestiona de forma encubierta: los campos y predicados necesarios se transfieren al motor de consulta N1QL del servidor, de modo que sólo se computan y transfieren los datos esenciales, lo que permite una gestión más eficiente de la red y de los recursos de la CPU.
Spark Streaming - In-N-Out en tiempo real (suave)
Spark Streaming aporta un enfoque de streaming microbatch a Spark, permitiéndote realizar aplicaciones tanto batching como streaming en un solo sistema. Couchbase te permite persistir tales flujos en Couchbase así como (de forma experimental) crear tal flujo a través de su protocolo interno de cambio de documentos (DCP).
La persistencia de un DStream funciona de la misma manera que la persistencia de un RDD - sólo tiene que utilizar la importación implícita correcta y convertirlo en una representación de documento. Los siguientes ejemplos muestran cómo persistir el contenido de los tweets en un feed de twitter en couchbase:
Encontrará más información sobre la compatibilidad con Spark Streaming en aquí.
El camino por recorrer
Era importante sacar adelante esta primera versión estable. La próxima versión (1.1) traerá compatibilidad oficial con Spark 1.5, así como otras mejoras y correcciones de estabilidad. Como siempre, por favor, prueba el conector y danos tu opinión sobre lo que crees que deberíamos mejorar.
¡Feliz pirateo, sin errores y con operaciones aleatorias rápidas!
Hola.
Cómo se vería este código en databricks porque si se ejecuta actualmente hay un error: los desarrolladores deberían utilizar el SparkContext compartido en lugar de crear uno utilizando el constructor. En los cuadernos de Scala y Python, se puede acceder al contexto compartido como sc. Al ejecutar un trabajo, puedes acceder al contexto compartido llamando a SparkContext.getOrCreate()
Código al que me refiero:
// Generar el contexto genérico de Spark
val sc = new SparkContext(new SparkConf().setAppName(\"ejemplo\")
.setMaster(\"local[*]\")
.set(\"com.couchbase.bucket.travel-sample\", \"\"))
// Configurar Spark SQL
val sql = nuevo SQLContext(sc)
// Crear un DataFrame con Inferencia de Esquema
val aerolineas = sql.read.couchbase(schemaFilter = EqualTo(\"tipo\", \"aerolinea\"))
// Realizar la consulta
líneas aéreas
.select(\"nombre\", \"iata\", \"icao\")
.sort(airlines(\"nombre\").asc)
.limit(5)
.mostrar()
Gracias,
Mark