Temos o prazer de anunciar a disponibilidade geral (GA) do suporte a Python para Conector do Couchbase Sparktrazendo uma integração de primeira classe entre o Couchbase Server e o Apache Spark para engenheiros de dados Python. Essa versão GA significa que o conector está pronto para a produção e é totalmente compatível, permitindo que os aplicativos PySpark leiam e gravem perfeitamente no Couchbase. Com o banco de dados NoSQL de alto desempenho do Couchbase (com linguagem de consulta SQL++/SQL++) e o mecanismo de processamento distribuído do Spark, os engenheiros de dados agora podem combinar facilmente essas tecnologias para criar pipelines de dados e fluxos de trabalho de análise rápidos e dimensionáveis. Em suma, o Couchbase Spark Connector for PySpark permite a integração de dados paralela e eficiente, permitindo que você utilize o Spark para ETL/ELT, análise em tempo real, aprendizado de máquina e muito mais em dados armazenados no Couchbase.

Nesta postagem, abordaremos como começar a usar o conector PySpark, demonstraremos operações básicas de leitura/gravação (baseadas em chave-valor e em consulta) para o banco de dados operacional Couchbase e para os bancos de dados Capella Columnar e compartilharemos dicas de ajuste de desempenho para obter a melhor taxa de transferência. Se você já usa o conector do Couchbase Spark em Scala ou se é novo na integração Couchbase-Spark, este guia o ajudará a aumentar rapidamente o uso do PySpark para suas necessidades de engenharia de dados.

Por que o PySpark?

A inclusão do suporte ao PySpark no conector Spark do Couchbase foi motivada pela crescente demanda de engenheiros de dados e desenvolvedores que preferem Python por sua simplicidade e pelo enorme ecossistema Python ML para Spark em fluxos de trabalho de ciência de dados e engenharia. Esse suporte garante que as equipes que já usam Python possam agora integrar o Couchbase (quer você esteja usando Couchbase Capella (DBaaS), banco de dados operacional autogerenciado ou Capella Columnar ) em fluxos de trabalho Spark baseados em Python, permitindo uma adoção mais ampla e processos de dados simplificados.

O domínio do Python em casos de uso de IA/ML, apoiado por estruturas como SparkML, PyTorch, TensorFlow, H2O, DataRobot, scikit-learn e SageMaker, além de ferramentas populares de análise exploratória de dados, como Matplotlib e Plotly, ressalta ainda mais a necessidade de integração com o PySpark. Além disso, a compatibilidade com o PySpark desbloqueia pipelines acelerados de ETL e ML, aproveitando a aceleração de GPU (Spark RAPIDS), e facilita tarefas sofisticadas de engenharia de recursos e de manipulação de dados usando bibliotecas amplamente adotadas, como Pandas, NumPy e as APIs de engenharia de recursos incorporadas do Spark. Esse novo suporte simplifica significativamente os processos de dados e expande as oportunidades de adoção do Couchbase nas equipes de ciência de dados e engenharia.

Introdução ao Couchbase PySpark

Os primeiros passos são simples. O Couchbase Spark Connector é distribuído como um único JAR (arquivo Java) que você adiciona ao seu ambiente Spark. Você pode obter o conector no site oficial do Site de download do Couchbase ou via Coordenadas do Maven. Depois que você tiver o JAR, usá-lo no PySpark é tão simples quanto configurar sua sessão do Spark com as configurações de conexão do conector e do Couchbase.

1. Obtenha ou crie um banco de dados operacional Couchbase ou um banco de dados Capella Columnar

A maneira mais rápida de começar a usar o Couchbase é usar nosso Capella DBaaS. Uma vez lá, você pode encontrar seu banco de dados existente ou criar um operacional ou colunar (para análise). Como alternativa, você pode usar nosso Couchbase autogerenciado.

2. Instale o PySpark (se ainda não o tiver feito)

Se estiver trabalhando em um ambiente Python, instale o PySpark usando o pip. Por exemplo, em um ambiente virtual:

Isso instalará o Apache Spark para uso com o Python. Se você estiver executando em um cluster Spark ou Databricks existente, o PySpark já poderá estar disponível.

3. Incluir o JAR do conector do Couchbase Spark

Baixar o spark-connector-assembly-.jar para obter a versão mais recente do conector. Em seguida, ao criar sua sessão do Spark ou enviar seu trabalho, forneça esse JAR na configuração. Você pode fazer isso definindo o parâmetro --jarros opção em spark-submit ou por meio do construtor SparkSession no código (como mostrado abaixo).

4. Configurar a conexão com o Couchbase

Você precisa especificar a string de conexão e as credenciais do cluster do Couchbase (nome de usuário e senha). No Capella, você pode encontrar isso na guia "Connect" (Conectar) para os sistemas operacionais e Configurações->Conexão String para colunar. Opcionalmente, especifique um bucket ou escopo padrão, se necessário (embora você também possa especificar bucket/escopo por operação).

Abaixo está uma exemplo rápido do PySpark que configura um SparkSession para se conectar a um cluster do Couchbase e, em seguida, lê alguns dados:

No código acima, configuramos a sessão do Spark para incluir o JAR do conector do Couchbase e apontá-lo para um cluster do Couchbase. Em seguida, criamos um DataFrame df lendo o nome_do_balde (especificamente o balde scope_name.collection_name coleção) por meio do serviço Query.

No restante deste documento, presumimos que você tenha carregado nosso conjunto de dados de amostra amostra de viagem que pode ser feito para o Couchbase Capella operacional ou Colunar com muita facilidade.

Leitura/gravação no Couchbase usando o PySpark

Quando sua sessão do Spark estiver conectada ao Couchbase, você poderá executar operações de valor-chave (para gravações) e operações de consulta (usando SQL++ para leitura e gravação) por meio de DataFrames.

A tabela a seguir mostra o formato suportado pelo conector Sparks para ler e gravar em bancos de dados Couchbase e colunares:

Banco de dados operacional do Couchbase/Capella Banco de dados colunar Capella
Operações de leitura read.format("couchbase.query") read.format("couchbase.columnar")
Operações de gravação (recomendável usar o serviço de dados)

write.format("couchbase.kv")

write.format("couchbase.query")

write.format("couchbase.columnar")

Leitura do Couchbase com um DataFrame de consulta

O Couchbase Spark Connector permite que você carregue dados de um bucket do Couchbase como um Spark DataFrame por meio de consultas SQL++. Usando o leitor de DataFrame com formato couchbase.querySe você quiser ler documentos de uma coleção, pode especificar um bucket (e escopo/coleção) e parâmetros de consulta opcionais. Por exemplo, para ler todos os documentos de uma coleção ou um subconjunto definido por um filtro:

Neste exemplo, companhias aéreas_df carrega todos os documentos do travel-sample.inventory.airline em um DataFrame do Spark. Em seguida, aplicamos um filtro para encontrar companhias aéreas sediadas nos Estados Unidos. O conector tentará empurrar para baixo para o Couchbase para que dados desnecessários não sejam transferidos (ou seja, ele incluirá os filtros WHERE country = 'United States' (país = 'Estados Unidos') na consulta SQL++ que ele executa, se possível). O resultado, usa_airlines_dfpode ser usado como qualquer outro DataFrame no Spark (por exemplo, você pode uni-lo a outros DataFrames, aplicar agregações etc.).

Sob o capô, o partições de conectores os resultados da consulta em várias tarefas, se configurado (mais sobre isso em Ajuste de desempenho abaixo) e usa o serviço Query do Couchbase (alimentado pelo mecanismo SQL++) para recuperar os dados. Cada partição do Spark corresponde a um subconjunto de dados recuperados por uma consulta SQL++ equivalente. Isso permite leituras paralelas do Couchbase, aproveitando a natureza distribuída do Spark e do Couchbase.

Gravação no Couchbase com operações de chave-valor (KV) (recomendado)

O conector também oferece suporte à gravação de dados no Couchbase, seja por meio do Serviço de dados (KV) ou por meio do serviço Query (executando o SQL++ INSERT/UPSERT comandos para você). O recomendado para a maioria dos casos de uso é usar o Fonte de dados chave-valor (formato("couchbase.kv")) para melhor desempenho. No modo de valor-chave, cada tarefa do Spark gravará documentos diretamente nos nós de dados do Couchbase.

Ao gravar um DataFrame no Couchbase, você deve garantir que haja um ID exclusivo para cada documento (já que o Couchbase exige um ID de documento). Por padrão, o conector procura uma coluna chamada __META_ID (ou META_ID em versões mais recentes) no DataFrame para o ID do documento. Você também pode especificar um campo de ID personalizado por meio da opção IdFieldName opção.

Por exemplo, suponha que tenhamos um DataFrame do Spark new_airlines_df que queremos gravar no Couchbase. Ele tem uma coluna identificação da companhia aérea que deve servir como a chave do documento do Couchbase, e o restante das colunas é o conteúdo do documento:

Gravação no Couchbase com operações de consulta (SQL++)

Embora recomendemos o uso do serviço de dados (KV) como acima, pois ele é normalmente mais rápido do que o serviço de consulta, se preferir, você também pode gravar por meio do serviço de consulta usando format("couchbase.query") na gravação. Isso executará internamente instruções SQL++ UPSERT para cada linha. Isso pode ser útil se você precisar aproveitar um recurso do SQL++ (por exemplo, transformações no lado do servidor), mas para inserções/atualizações simples, a abordagem KV é mais eficiente.

Na próxima seção, vamos modificar esses casos básicos de leitura/gravação para o mais recente produto de análise do Couchbase, o Capella Columnar.

Suporte do PySpark para Capella Columnar

Um dos principais novos recursos do Couchbase Spark Connector GA é o suporte ao Capella Columnar. O Capella Columnar é um serviço de banco de dados analítico nativo de JSON no Couchbase Capella que armazena dados em um formato orientado por coluna para análise de alto desempenho

Leitura de dados em formato colunar com o PySpark

A leitura de dados de um cluster do Couchbase Capella Columnar no PySpark é semelhante ao cluster operacional do Couchbase, exceto por três alterações:

  1. Use o formato("couchbase.columnar") para especificar que a conexão é para serviço colunar.
  2. A string de conexão para o columnar pode ser recuperada na interface do usuário do Capella.
  3. Você também especificará o conjunto de dados a ser carregado, fornecendo os nomes do banco de dados, do escopo e da coleção (análogo a bucket/scope/collection no Couchbase) como opções

Depois que o Spark estiver configurado, você poderá usar a API de leitura do Spark DataFrame para carregar dados do serviço colunar:

Neste exemplo, o resultado companhias aéreas_df é um Spark DataFrame normal - você pode inspecioná-lo, executar transformações e realizar ações como .count() ou .show() como de costume. Por exemplo, airlines_df.show(5) imprimirá alguns documentos da companhia aérea e airlines_df.count() retornará o número de documentos na coleção. Nos bastidores, o conector infere automaticamente um esquema para os documentos JSON por amostragem até um determinado número de registros (por padrão, 1000). Todos os campos que aparecem consistentemente nos documentos amostrados tornam-se colunas no DataFrame, com os tipos de dados Spark apropriados.

Observe que, se seus documentos tiverem esquemas variados, a inferência poderá produzir um esquema que inclua a união de todos os campos (os campos não presentes em alguns documentos serão nulos nessas linhas). Nos casos em que o esquema está evoluindo ou em que você deseja restringir quais registros são considerados, é possível fornecer um filtro explícito (predicado) ao leitor, conforme descrito a seguir.

Consulta de um conjunto de dados colunares no Couchbase via Spark

Muitas vezes, talvez você não queira carregar uma coleção inteira, especialmente se ela for grande. Você pode otimizar o desempenho enviando os predicados de filtro diretamente para o serviço Capella Columnar ao carregar dados, evitando a transferência desnecessária de dados. Use .opção("filtro", "") para aplicar uma cláusula SQL++ WHERE durante a operação de leitura. Por exemplo, para carregar somente companhias aéreas baseadas nos Estados Unidos:

O conector executa esse filtro diretamente na origem, recuperando apenas os documentos relevantes. Você também pode fazer push down de projeções (selecionando campos específicos) e agregações em alguns casos - o conector descarregará agregados simples como CONTAGEM, MIN, MAXe SUM para o mecanismo Columnar sempre que possível, em vez de computá-los no Spark, para melhorar o desempenho

Depois que os dados são carregados em um DataFrame, você pode executar as funções padrão Transformações do Spark, junções e agregações. Por exemplo, para contar as companhias aéreas por país usando o Spark SQL, você pode até criar uma visualização temporária para executar consultas do Spark SQL nos dados da seguinte forma:

Essa consulta é executada inteiramente no mecanismo Spark, oferecendo flexibilidade para integrar perfeitamente os dados do Couchbase em fluxos de trabalho analíticos complexos.

Tendo abordado as leituras e gravações básicas, vamos ver como você pode ajustar o desempenho ao mover grandes volumes de dados entre o Couchbase e o Spark.

Dicas de ajuste de desempenho

Para maximizar o rendimento e a eficiência ao usar o conector PySpark do Couchbase, considere as práticas recomendadas a seguir.

Ajuste de suas operações de leitura

Usar o particionamento de consultas para paralelismo
(Couchbase Capella (DBaaS), banco de dados operacional autogerenciado ou Capella Columnar)

Ao ler por meio do serviço Query para banco de dados operacional ou colunar, aproveite a capacidade do conector de particionar os resultados da consulta. Você pode especificar um partitionCount (e um campo de particionamento numérico com limites inferior/superior) para a leitura do DataFrame. Uma boa regra geral é definir partitionCount para pelo menos o número total de núcleos de CPU do serviço de consulta disponíveis em seu cluster do Couchbase. Isso garante que o Spark executará várias consultas em paralelo, aproveitando todos os nós de consulta. Por exemplo, se o serviço de consulta do seu cluster do Couchbase tiver 8 núcleos no total, defina partitionCount >= 8 de modo que pelo menos 8 consultas SQL++ paralelas sejam emitidas. Isso pode aumentar drasticamente a taxa de transferência de leitura ao utilizar todos os nós de consulta simultaneamente. Observe que também é necessário ter núcleos suficientes no cluster do Spark para executar esse número de consultas paralelas.

Aproveite os índices de cobertura para aumentar a eficiência das consultas
(Couchbase Capella (DBaaS), banco de dados operacional autogerenciado)

Se estiver usando consultas SQL++, tente consultar por meio de índices de cobertura sempre que possível. Um índice de cobertura é um índice que inclui todos campos de que sua consulta precisa, de modo que a consulta possa ser atendida inteiramente a partir do índice, sem buscar no serviço de dados. As consultas cobertas evitam o salto extra na rede para buscar documentos completos, portanto proporcionando melhor desempenho. Projete seus índices secundários do Couchbase para incluir os campos que você filtra e os campos que você retorna, se possível. Isso pode significar a criação de índices específicos para seus trabalhos Spark que cubram exatamente os dados necessários.

Garantir réplicas de índice para evitar gargalos
(Couchbase Capella (DBaaS), banco de dados operacional autogerenciado)

Além de usar índices de cobertura, certifique-se de que seus índices sejam replicados em vários nós de índice. Replicação de índices não apenas fornece alta disponibilidade, mas também permite que as consultas sejam balanceamento de carga entre cópias de índices em diferentes nós para aumentar a taxa de transferência. Na prática, se você tiver (por exemplo) 3 nós de índice, a replicação de índices importantes entre eles significa que as consultas paralelas do conector do Spark podem atingir diferentes nós de índice, em vez de atingir um único nó.

Ajuste de suas operações de gravação

Prefira o serviço Data para gravações em massa
(Couchbase Capella (DBaaS), banco de dados operacional autogerenciado)

Recomendamos usar a fonte de dados de valor-chave (Serviço de dados) em vez do serviço Query para operações de gravação. A gravação por meio do serviço Data (upserts diretos de KV) é normalmente várias vezes mais rápido do que fazer inserções baseadas em SQL++. De fato, benchmarks internos mostraram que a gravação via KV pode ser cerca de 3x mais rápido do que usar o SQL++ em trabalhos do Spark. Isso ocorre porque o serviço de dados pode ingerir documentos em paralelo diretamente nos nós responsáveis, com menor latência por operação. Observe que os índices são atualizados separadamente, se necessário, para esses novos documentos, pois as gravações KV não acionam automaticamente as atualizações de índice além do índice primário.

Aumentar as partições de gravação para gravações do serviço de consulta
(Couchbase Capella (DBaaS), banco de dados operacional autogerenciado)

Embora não seja recomendado, se você decidir usar o couchbase.query para gravação (por exemplo, se estiver executando transformações no lado do servidor durante a gravação), otimize o desempenho usando um número alto de partições de gravação. Você pode reparticionar seu DataFrame antes da gravação para que o Spark execute muitas tarefas de gravação simultâneas. Uma diretriz aproximada é usar na ordem de centenas de partições para gravações em grande escala via SQL++. Por exemplo, usando cerca de 128 partições por nó de consulta CPU é um ponto de partida que alguns usuários consideram eficaz. Isso significa que, se você tiver 8 núcleos de consulta, tente ~1024 partições. A ideia é inundar o serviço de consulta com instruções UPSERT paralelas suficientes para maximizar a taxa de transferência. Seja cauteloso e encontre o equilíbrio certo para seu cluster - uma simultaneidade muito alta pode sobrecarregar o serviço de consulta. Monitore a taxa de transferência de consultas do Couchbase e faça os ajustes necessários.

Seguindo essas dicas de ajuste - alinhando a contagem de partições com os recursos do cluster, indexando de forma inteligente e escolhendo o serviço certo para o trabalho - você pode obter o desempenho ideal para a integração Couchbase-Spark. Fique de olho nas métricas de trabalho do Spark e nas estatísticas de desempenho do Couchbase (disponíveis na interface do usuário e nos registros do Couchbase) para identificar quaisquer gargalos (por exemplo, se um nó de consulta estiver fazendo todo o trabalho ou se a rede estiver saturada) e ajuste a configuração conforme necessário.

Comunidade e suporte

O suporte ao PySpark do Couchbase foi desenvolvido com base no Conector Spark do Couchbase para o Couchbase e no é de código abertoe incentivamos você a contribuir, fornecer feedback e participar da conversa. Você pode acessar nosso abrangente documentação, junte-se ao Fóruns do Couchbase ou Discórdia do Couchbase.

Leitura adicional

Para obter mais informações e documentação detalhada, consulte o site oficial do Documentação do conector do Couchbase Spark e a seção relevante sobre PySpark:

Boa codificação!

A equipe do Couchbase



Autor

Postado por Vishal Dhiman, gerente sênior de produtos Gerente de produtos

Deixar uma resposta