Conectores

Cree aplicaciones AI/ML altamente escalables con Couchbase y PySpark

Nos complace anunciar la disponibilidad general (GA) de la compatibilidad de Python con Conector Spark de Couchbaseque aporta una integración de primera clase entre Couchbase Server y Apache Spark a los ingenieros de datos de Python. Esta versión GA significa que el conector está listo para producción y totalmente soportado, permitiendo a las aplicaciones PySpark leer y escribir sin problemas en Couchbase. Con la base de datos NoSQL de alto rendimiento de Couchbase (con lenguaje de consulta SQL++/SQL++) y el motor de procesamiento distribuido de Spark, los ingenieros de datos pueden ahora combinar fácilmente estas tecnologías para construir pipelines de datos y flujos de trabajo analíticos rápidos y escalables. En resumen, el conector Spark de Couchbase para PySpark desbloquea la integración de datos eficiente y paralela, lo que le permite aprovechar Spark para ETL/ELT, análisis en tiempo real, aprendizaje automático y mucho más en los datos almacenados en Couchbase.

En este post, cubriremos cómo empezar con el conector PySpark, demostraremos operaciones básicas de lectura/escritura (tanto clave-valor como basadas en consultas) para bases de datos operacionales Couchbase y bases de datos Columnar Capella; y compartiremos consejos de ajuste de rendimiento para obtener el mejor rendimiento. Tanto si has estado utilizando el conector Spark de Couchbase en Scala, como si eres nuevo en la integración Couchbase-Spark, esta guía te ayudará a utilizar rápidamente PySpark para tus necesidades de ingeniería de datos.

¿Por qué PySpark?

La incorporación de PySpark al conector Spark de Couchbase se debe a la creciente demanda de ingenieros de datos y desarrolladores que prefieren Python por su sencillez y el ecosistema masivo de Python ML para Spark en los flujos de trabajo de ingeniería y ciencia de datos. Este soporte garantiza que los equipos que ya utilizan Python puedan ahora integrar Couchbase (tanto si utiliza Couchbase Capella (DBaaS), base de datos operativa autogestionada o Capella Columnar ) en flujos de trabajo Spark basados en Python, lo que permite una adopción más amplia y procesos de datos racionalizados.

El dominio de Python en los casos de uso de IA/ML, respaldado por marcos como SparkML, PyTorch, TensorFlow, H2O, DataRobot, scikit-learn y SageMaker, así como herramientas populares de análisis exploratorio de datos como Matplotlib y Plotly, subraya aún más la necesidad de integración de PySpark. Además, la compatibilidad con PySpark permite acelerar los procesos ETL y ML aprovechando la aceleración de la GPU (Spark RAPIDS) y facilita sofisticadas tareas de ingeniería de características y manipulación de datos utilizando bibliotecas ampliamente adoptadas como Pandas, NumPy y las API de ingeniería de características integradas en Spark. Este nuevo soporte agiliza significativamente los procesos de datos y amplía las oportunidades de adopción de Couchbase en equipos de ingeniería y ciencia de datos.

Primeros pasos con Couchbase PySpark

Empezar es muy sencillo. El conector Spark de Couchbase se distribuye como un único archivo JAR (Java archive) que puedes añadir a tu entorno Spark. Puedes obtener el conector en la página oficial Sitio de descarga de Couchbase o a través de Coordenadas Maven. Una vez que tengas el JAR, usarlo en PySpark es tan sencillo como configurar tu sesión Spark con la configuración del conector y la conexión a Couchbase.

1. Obtener o crear una base de datos operativa Couchbase o una base de datos Columnar Capella.

La forma más rápida de empezar con Couchbase es utilizar nuestra aplicación Capella DBaaS. Una vez allí, puede buscar su base de datos existente o crear una operativo o columnar (para análisis). También puede utilizar nuestra Couchbase autogestionado.

2. Instale PySpark (si no lo está ya)

Si estás trabajando en un entorno Python, instala PySpark usando pip. Por ejemplo, en un entorno virtual:

Esto instalará Apache Spark para su uso con Python. Si está ejecutando en un clúster Spark o Databricks existente, es posible que PySpark ya esté disponible.

3. Incluye el JAR del conector Spark de Couchbase

Descargar el spark-connector-assembly-.jar para la última versión del conector. Entonces, cuando crees tu sesión Spark o envíes tu trabajo, proporciona este JAR en la configuración. Puedes hacerlo estableciendo el parámetro --jarras opción en spark-submit o a través del constructor SparkSession en código (como se muestra a continuación).

4. Configurar la conexión Couchbase

Es necesario especificar la cadena de conexión del clúster Couchbase y las credenciales (nombre de usuario y contraseña). En Capella, puede encontrar esto en la pestaña "Conectar" para las aplicaciones operativas y de gestión. Configuración->Cadena de conexión para columnar. Opcionalmente, especifique un cubo o ámbito por defecto si es necesario (aunque también puede especificar cubo/ámbito por operación).

A continuación ejemplo rápido de PySpark que establece un SparkSession para conectarse a un clúster Couchbase y luego lee algunos datos:

En el código anterior, configuramos la sesión de Spark para incluir el conector JAR de Couchbase y apuntarlo a un cluster de Couchbase. A continuación creamos un DataFrame df leyendo del nombre_cubo cubo (concretamente el scope_name.collection_name ) a través del servicio de consulta.

Para el resto de este documento, supondremos que ha cargado nuestro conjunto de datos de muestra viaje-muestra que se puede hacer para Couchbase Capella operativa o Columnar muy fácilmente.

Lectura/escritura en Couchbase usando PySpark

Una vez que tu sesión de Spark está conectada a Couchbase, puedes realizar ambas operaciones operaciones clave-valor (para escrituras) y operaciones de consulta (utilizando SQL++ tanto para lectura como para escritura) a través de DataFrames.

La siguiente tabla muestra el formato que soporta el conector Sparks para leer y escribir en Couchbase y bases de datos columnares:

Base de datos operativa Couchbase/Capella Capella Base de datos columnar
Leer las operaciones read.format("couchbase.query") read.format("couchbase.columnar")
Operaciones de escritura (se recomienda utilizar el Servicio de Datos)

write.format("couchbase.kv")

write.format("couchbase.query")

write.format("couchbase.columnar")

Lectura desde Couchbase con un DataFrame de consulta

El conector Spark de Couchbase te permite cargar datos desde un bucket de Couchbase como un Spark DataFrame a través de consultas SQL++. Uso del lector DataFrame con formato couchbase.querypuede especificar un bucket (y ámbito/colección) y parámetros de consulta opcionales. Por ejemplo, para leer todos los documentos de una colección o un subconjunto definido por un filtro:

En este ejemplo, aerolíneas_df carga todos los documentos del viajes-muestra.inventario.aerolinea en un Spark DataFrame. A continuación, aplicamos un filtro para encontrar las aerolíneas con sede en Estados Unidos. El conector intentará empujar hacia abajo a Couchbase para que no se transfieran datos innecesarios (es decir, incluirá los filtros WHERE país = "Estados Unidos en la consulta SQL++ que ejecuta, si es posible). El resultado, usa_airlines_dfse puede utilizar como cualquier otro DataFrame en Spark (por ejemplo, se puede unir con otros DataFrames, aplicar agregaciones, etc.).

Bajo el capó, el tabiques conectores los resultados de la consulta en varias tareas si se ha configurado (más información al respecto en Ajuste del rendimiento más abajo), y utiliza el servicio de consultas de Couchbase (impulsado por el motor SQL++) para recuperar los datos. Cada partición de Spark corresponde a un subconjunto de datos recuperados por una consulta SQL++ equivalente. Esto permite lecturas paralelas desde Couchbase, aprovechando la naturaleza distribuida tanto de Spark como de Couchbase.

Escritura en Couchbase con operaciones clave-valor (KV) (recomendado)

El conector también admite la escritura de datos en Couchbase, ya sea a través de la aplicación Servicio de datos (KV) o a través del servicio Query (ejecución de SQL++ INSERTAR/UPSERTAR comandos para usted). En recomendado para la mayoría de los casos de uso es utilizar el Fuente de datos clave-valor (format("couchbase.kv")) para mejor rendimiento. En el modo clave-valor, cada tarea Spark escribirá documentos directamente en los nodos de datos de Couchbase.

Al escribir un DataFrame en Couchbase, debes asegurarte de que existe un ID único para cada documento (ya que Couchbase requiere un ID de documento). Por defecto, el conector busca una columna llamada __META_ID (o META_ID en versiones más recientes) en el DataFrame para el ID del documento. También puede especificar un campo ID personalizado a través de la opción IdFieldName opción.

Por ejemplo, supongamos que tenemos un Spark DataFrame nuevas_lineas_aereas_df que queremos escribir en Couchbase. Tiene una columna airline_id que debe servir como clave del documento de Couchbase, y el resto de columnas son el contenido del documento:

Escribir en Couchbase con operaciones de consulta (SQL++)

Aunque recomendamos utilizar el servicio de datos (KV) como se ha indicado anteriormente, ya que suele ser más rápido que el servicio de consulta, si lo prefiere, también puede escribir a través del servicio de consulta utilizando format("couchbase.query") en escritura. Esto ejecutará internamente sentencias UPSERT de SQL++ para cada fila. Esto puede ser útil si necesita aprovechar una característica de SQL++ (por ejemplo, transformaciones del lado del servidor), pero para inserciones/actualizaciones directas, el enfoque KV es más eficiente.

En la siguiente sección, vamos a modificar estos casos básicos de lectura/escritura para el último producto analítico de Couchbase - Capella Columnar.

Compatibilidad de PySpark con Capella Columnar

Una de las principales novedades de Couchbase Spark Connector GA es el soporte de Capella Columnar. Capella Columnar es un servicio de base de datos analítica nativa JSON en Couchbase Capella que almacena datos en un formato orientado a columnas para analítica de alto rendimiento.

Lectura de datos en formato columnar con PySpark

La lectura de datos de un cluster Couchbase Capella Columnar en PySpark es similar al cluster operacional couchbase excepto por tres cambios:

  1. Utiliza el format("couchbase.columnar") para especificar que la conexión es para servicio en columna.
  2. La cadena de conexión para columnar puede recuperarse desde Capella UI.
  3. También puede especificar el conjunto de datos que desea cargar proporcionando como opciones los nombres de la base de datos, el ámbito y la colección (de forma análoga a bucket/scope/collection en Couchbase)

Una vez configurado Spark, puede utilizar la API del lector Spark DataFrame para cargar datos del servicio columnar:

En este ejemplo, el resultado aerolíneas_df es un DataFrame de Spark normal - puedes inspeccionarlo, ejecutar transformaciones y realizar acciones como .count() o .mostrar() como de costumbre. Por ejemplo, airlines_df.show(5) imprimirá algunos documentos de la aerolínea, y airlines_df.count() devolverá el número de documentos de la colección. Bajo el capó, el conector infiere automáticamente un esquema para los documentos JSON muestreando hasta un cierto número de registros (por defecto 1000). Todos los campos que aparecen de forma consistente en los documentos muestreados se convierten en columnas en el DataFrame, con los tipos de datos Spark apropiados.

Tenga en cuenta que si sus documentos tienen esquemas variables, la inferencia puede producir un esquema que incluya la unión de todos los campos (los campos no presentes en algunos documentos serán nulos en esas filas). En los casos en los que el esquema evoluciona o en los que desea restringir los registros que se tienen en cuenta, puede proporcionar un filtro explícito (predicado) al lector, como se describe a continuación.

Consulta de un conjunto de datos columnares en Couchbase a través de Spark

A menudo puede que no desee cargar una colección completa, especialmente si es grande. Puede optimizar el rendimiento enviando predicados de filtro directamente al servicio Columnar de Capella al cargar los datos, evitando la transferencia innecesaria de datos. Utilice .option("filtro", "") para aplicar una cláusula WHERE de SQL++ durante la operación de lectura. Por ejemplo, para cargar solo las aerolíneas con sede en Estados Unidos:

El conector ejecuta este filtro directamente en la fuente, recuperando sólo los documentos relevantes. También puede realizar proyecciones (seleccionando campos específicos) y agregaciones en algunos casos. agregados simples como CONTAR, MIN, MAXy SUM al motor Columnar siempre que sea posible, en lugar de calcularlos en Spark, para mejorar el rendimiento.

Una vez cargados los datos en un DataFrame, puede realizar las operaciones estándar de Transformaciones Sparky agregaciones. Por ejemplo, para contar las aerolíneas por país utilizando Spark SQL, puede incluso crear una vista temporal para ejecutar consultas Spark SQL en los datos de la siguiente manera:

Esta consulta se ejecuta íntegramente en el motor Spark, lo que aporta flexibilidad para integrar los datos de Couchbase sin problemas en flujos de trabajo analíticos complejos.

Habiendo cubierto las lecturas y escrituras básicas, pasemos a cómo puedes ajustar el rendimiento al mover grandes volúmenes de datos entre Couchbase y Spark.

Consejos para mejorar el rendimiento

Para maximizar el rendimiento y la eficiencia al utilizar el conector PySpark de Couchbase, ten en cuenta las siguientes prácticas recomendadas.

Ajuste de las operaciones de lectura

Utilizar el particionamiento de consultas para el paralelismo
(Couchbase Capella (DBaaS), base de datos operativa autogestionada o Capella Columnar)

Cuando lea a través del servicio de consulta para bases de datos operativas o columnares, aproveche la capacidad del conector para particionar los resultados de la consulta. Puede especificar una partitionCount (y un campo de partición numérico con límites inferior/superior) para la lectura del DataFrame. Una buena regla general es establecer partitionCount a al menos el número total de núcleos de CPU del servicio de consulta disponibles en tu clúster Couchbase. Esto asegura que Spark ejecutará múltiples consultas en paralelo, aprovechando todos los nodos de consulta. Por ejemplo, si el servicio de consultas de tu cluster Couchbase tiene 8 núcleos en total, establece partitionCount >= 8 para que se emitan al menos 8 consultas SQL++ paralelas. Esto puede aumentar drásticamente el rendimiento de lectura mediante la utilización de todos los nodos de consulta simultáneamente. Tenga en cuenta que debe tener suficientes núcleos en su clúster Spark para ejecutar tantas consultas paralelas.

Aprovechar los índices de cobertura para mejorar la eficacia de las consultas
(Couchbase Capella (DBaaS), base de datos operativa autogestionada)

Si utiliza consultas SQL++, intente realizar la consulta a través de índices de cobertura siempre que sea posible. Un índice de cobertura es un índice que incluye todos que necesita la consulta, de modo que ésta pueda realizarse íntegramente a partir del índice sin tener que recurrir al servicio de datos. Las consultas cubiertas evitan el salto de red adicional para obtener documentos completos, por lo que mejores resultados. Diseñe sus índices secundarios de Couchbase para incluir los campos sobre los que filtra y los campos que devuelve, si es factible. Esto podría significar la creación de índices específicos para tus trabajos Spark que cubran exactamente los datos necesarios.

Garantizar réplicas de índices para evitar cuellos de botella
(Couchbase Capella (DBaaS), base de datos operativa autogestionada)

Además de utilizar índices de cobertura, asegúrese de que sus índices están replicados en varios nodos de índice. Replicación de índices no sólo proporciona alta disponibilidad, sino que también permite que las consultas sean carga equilibrada entre copias de índices en diferentes nodos para un mayor rendimiento. En la práctica, si tienes (por ejemplo) 3 nodos de índice, replicar índices importantes entre ellos significa que las consultas paralelas del conector Spark pueden golpear diferentes nodos de índice en lugar de golpear todos un solo nodo.

Ajuste de las operaciones de escritura

Prefiera el servicio Datos para escrituras masivas
(Couchbase Capella (DBaaS), base de datos operativa autogestionada)

Recomendamos utilizar la fuente de datos clave-valor (Servicio de datos) en lugar del servicio de consulta para las operaciones de escritura. La escritura a través del servicio Data (KV upserts directos) suele ser varias veces más rápido que las inserciones basadas en SQL++. De hecho, las pruebas comparativas internas han demostrado que escribir a través de KV puede rondar el 3 veces más rápido que utilizando SQL++ en los trabajos de Spark. Esto se debe a que el servicio de datos puede ingerir documentos en paralelo directamente en los nodos responsables, con una latencia menor por operación. Ten en cuenta que los índices se actualizan por separado, si es necesario, para esos nuevos documentos, ya que las escrituras KV no activarán automáticamente actualizaciones de índices más allá del índice primario.

Aumentar las particiones de escritura para las escrituras del servicio de consulta
(Couchbase Capella (DBaaS), base de datos operativa autogestionada)

Aunque no es recomendable, si decide utilizar couchbase.query para escribir (por ejemplo, si se realizan transformaciones del lado del servidor mientras se escribe), optimiza el rendimiento utilizando un número elevado de particiones de escritura. Puedes reparticionar tu DataFrame antes de escribir para que Spark ejecute muchas tareas de escritura concurrentes. Una pauta aproximada es utilizar del orden de cientos de particiones para escrituras a gran escala mediante SQL++. Por ejemplo, utilizando 128 particiones por nodo de consulta CPU es un punto de partida que algunos usuarios han encontrado efectivo. Esto significa que si tiene 8 núcleos de consulta, pruebe con ~1024 particiones. La idea es inundar el servicio de consulta con suficientes sentencias UPSERT paralelas para maximizar el rendimiento. Ten cuidado y encuentra el equilibrio adecuado para tu cluster - una concurrencia demasiado alta podría sobrecargar el servicio de consultas. Monitoriza el rendimiento de consultas de Couchbase y ajústalo en consecuencia.

Siguiendo estos consejos de ajuste - alineando los recuentos de particiones con los recursos del clúster, indexando inteligentemente y eligiendo el servicio adecuado para el trabajo - puedes lograr un rendimiento óptimo para la integración Couchbase-Spark. No pierdas de vista las métricas de trabajo de Spark y las estadísticas de rendimiento de Couchbase (disponibles en la interfaz de usuario y los registros de Couchbase) para identificar cualquier cuello de botella (por ejemplo, si un nodo de consulta está haciendo todo el trabajo, o si la red está saturada) y ajusta la configuración según sea necesario.

Comunidad y apoyo

El soporte de Couchbase PySpark se basa en Couchbase Spark Connector para Couchbase y es de código abiertoy le animamos a que contribuya, nos haga llegar sus comentarios y se una a la conversación. documentaciónúnete al Foros de Couchbase o Couchbase Discord.

Para saber más

Para más información y documentación detallada, consulte la página oficial Documentación de Couchbase Spark Connector y la sección correspondiente sobre PySpark:

¡Feliz codificación!

El equipo de Couchbase



Comparte este artículo
Recibe actualizaciones del blog de Couchbase en tu bandeja de entrada
Este campo es obligatorio.

Autor

Publicado por Vishal Dhiman, Director Director de Producto

Deja un comentario

¿Listo para empezar con Couchbase Capella?

Empezar a construir

Consulte nuestro portal para desarrolladores para explorar NoSQL, buscar recursos y empezar con tutoriales.

Utilizar Capella gratis

Ponte manos a la obra con Couchbase en unos pocos clics. Capella DBaaS es la forma más fácil y rápida de empezar.

Póngase en contacto

¿Quieres saber más sobre las ofertas de Couchbase? Permítanos ayudarle.