Olá a todos! Eu sou Koji, um engenheiro de soluções que trabalha no Japão. Esta é minha primeira postagem no couchbase.com e estou muito animado!
Neste blog, vou explicar como você pode integrar o Couchbase Server com o Apache NiFi.
Índice:
-
Configuração de conexão do servidor Couchbase: CouchbaseClusterService
-
Exemplo de PutCouchbaseKey: Armazenar tweets no servidor Couchbase
-
Exemplo de GetCouchbaseKey: Baixar documentos específicos do Couchbase como um único arquivo Zip
O que é NiFi
Apache NiFi é um projeto de nível superior da Apache que oferece suporte a gráficos direcionados avançados e dimensionáveis de roteamento de dados, transformação e lógica de mediação do sistema. Recentemente, a Hortonworks anunciou que fornece Hortonworks DataFlow (HDF). O NiFi é usado no HDF como um mecanismo central de processamento de fluxo de dados para dar suporte a casos de uso de IoAT (Internet of Anything). Consulte esses links para obter mais informações.
NiFi, Couchbase e eu
Meu título oficial na Couchbase é "Engenheiro de soluções", e a pré-venda é minha principal tarefa. No entanto, eu também adoro escrever código. Escrever código mantém meu conhecimento técnico atualizado, o que, em última análise, me ajuda a oferecer soluções melhores aos nossos clientes.
Há alguns dias, um conjunto de processadores NiFi para acesso ao Couchbase Server foi adicionado à base de código do Nifi. A contribuição foi feita pelo ME! Foi uma ótima experiência trabalhar com os committers do NiFi durante os processos de revisão detalhados. O abrangente guia do desenvolvedor A documentação realmente me ajudou a entrar no projeto.
Caso você tenha interesse em saber como funcionava o processo de contribuição, aqui estão alguns links para consulta:
-
Solicitação de pull: NIFI-992: Adicionando o pacote nifi-couchbase
Ok, chega de introdução. Vamos nos aprofundar nas configurações do NiFi para descrever como integrar o Couchbase Server!
Componentes principais do NiFi
Depois de fazer o download do NiFi, você pode iniciá-lo e acessar o designer de fluxo de dados da GUI por meio do navegador. Aqui estão alguns dos principais componentes com os quais você deve se familiarizar:
-
FlowFile: Cada parte dos dados transmitidos no NiFi Flow é transferida como um objeto chamado FlowFile. Ele tem conteúdo opaco e um conjunto arbitrário de atributos. Sim, de fato, parece um arquivo.
-
Processador: Um pequeno módulo de processamento que deve ser bom em uma única tarefa, como um comando do Linux. Atualmente, há cerca de 80 processadores disponíveis. Eles executam funções como manipulação de arquivos, acesso a bancos de dados e manipulação de HTTP e outros protocolos.
-
Relacionamento: Cada processador é conectado por um tubo chamado Relacionamento. Alguns processadores têm vários relacionamentos, como sucesso, falha ou original. O FlowFile processado será transferido para o próximo processador por meio dessa relação.
Organize os fluxos de dados por grupo de processos
No NiFi Data Flow, um "Process Group" pode ser muito útil quando o fluxo se torna mais complexo. Ele permite que você organize vários fluxos e, em seguida, cada Process Group pode ser iniciado/interrompido individualmente. Neste fluxo de dados de demonstração, configurei dois Process Groups, "Tweets to Couchbase sample" e "Dump Couchbase Documents sample".
Configuração de conexão do servidor Couchbase: CouchbaseClusterService
Vou descrever como configurar uma conexão com um cluster do Couchbase Server.
Em um fluxo de dados NiFi realista, você terá de usar os processadores do Couchbase várias vezes para colocar e obter dados do cluster. Portanto, não seria uma boa ideia definir as configurações de conexão em cada processador. Se você fizesse isso, seria difícil alterar o cluster de destino porque as configurações do cluster estariam espalhadas por toda parte.
Para evitar esse problema, o NiFi fornece um mecanismo chamado ControllerService para configurar um componente central que pode ser compartilhado entre os processadores. O NiFi inclui alguns Controller Services existentes, como o que fornece pooling de conexão a um RDBMS. Portanto, segui o design e implementei o CouchbaseClusterService.
Permite definir a cadeia de conexão para especificar qual cluster do Couchbase Server deve ser acessado. Se os buckets exigirem uma senha, você também poderá defini-la aqui. A configuração do NiFi tem dois tipos de propriedades: estática e dinâmica. A "Connection String" é estática, e a "Bucket Password for {bucket_name}" é dinâmica. Você pode adicionar novas configurações de propriedade dinâmica clicando no botão "New property" para especificar senhas para diferentes buckets.
Portanto, mais uma vez, o importante é que toda a configuração em nível de cluster é gerenciada por esse CouchbaseClusterService. Se você quiser trabalhar com outro cluster do Couchbase, basta adicionar outro CouchbaseClusterService e configurá-lo adequadamente.
Exemplo de PutCouchbaseKey: Armazenar tweets no servidor Couchbase
O processamento do feed do Twitter é um exemplo comum que podemos usar para ilustrar o fluxo de dados de streaming. Com o NiFi e o Couchbase, isso é incrivelmente fácil, como mostra a imagem a seguir:
-
GetTwitter: A NiFi tem uma variedade de processadores úteis como esse, e pode ser facilmente integrado a outros sistemas.
-
PutCouchbaseKey: Cada Tweet é enviado como um FlowFile. Aqui, eu o armazeno usando o UUID do FlowFile como o ID do documento do Couchbase. Como mostra a imagem, PutCouchbaseKey tem uma relação de "repetição" própria. Se um FlowFile falhar com CouchbaseExceptions e puder ser tentado novamente, como pode acontecer com um erro temporário do lado do servidor, transfira-o para o relacionamento "retry". Se o erro não for recuperável, como uma configuração incorreta ou algum outro erro grave, esses FlowFiles serão transferidos para o relacionamento "failure" (falha).
-
Registro de atributos: Adicionei um processador LogAttribute no final do fluxo. O LogAttribute pode emitir mensagens de registro sobre as propriedades e o conteúdo de um FlowFile. Isso é útil para depurar quaisquer problemas que possam surgir.
Vamos dar uma olhada na configuração do PutCouchbaseKey:
-
O Couchbase Cluster Controller Service refere-se ao serviço de controlador centralizado do Couchbase descrito anteriormente.
-
Bucket Name (Nome do compartimento) é o nome do compartimento no qual você deseja armazenar o conteúdo.
-
O tipo de documento é Json ou Binário.
-
Deixei a propriedade Document Id em branco para permitir que o processador use o UUID do FlowFile como ID do documento. Como alternativa, você poderia especificar Linguagem de expressão NiFi aqui para usar outro valor de propriedade ou para calcular um ID de documento.
Agora que já configuramos o CouchbaseClusterService e os processadores, vamos iniciar o NiFi Data Flow. A única coisa que você precisa fazer é apertar o botão do triângulo verde. Então, você poderá confirmar que os Tweets estão sendo armazenados no Couchbase!
Exemplo de GetCouchbaseKey: Baixar documentos específicos do Couchbase como um único arquivo Zip
Talvez você queira fazer download de um determinado conjunto de documentos do Couchbase Server para enviá-los a outro sistema ou para fazer um backup parcial.
Para fazer isso, configurei o fluxo de dados como você vê na imagem a seguir. Ele é mais complexo do que o exemplo anterior do Twitter e usa alguns tipos diferentes de processadores:
Vou explicar o que cada processador faz:
-
GetFile: Observa o diretório especificado e, quando o arquivo de destino é colocado nele, transfere o conteúdo para o próximo processador.
-
SplitText: Divide o conteúdo do arquivo e envia cada linha como um FlowFile.
-
GetCouchbaseKey: Obtém um documento do Couchbase usando o conteúdo do FlowFile de entrada como um ID de documento.
-
UpdateAttribute: Para usar o ID do documento do Couchbase para o nome de arquivo real que é usado no arquivo Zip final, copiei o atributo "couchbase.doc.id" para "filename" aqui.
-
Mesclar conteúdo: Mescla e compacta vários FlowFiles em um único arquivo Zip.
-
UpdateAttribute: Define o nome do arquivo Zip como a data atual, usando a expressão "${now():format('yyyyMMdd_HHmmss')}.zip"
-
PutFile: Finalmente, coloca o arquivo Zip no diretório especificado.
Os diretórios e arquivos reais têm a aparência abaixo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# Diretório e arquivo drwxr-xr-x 2 koji roda 68B Outubro 2 16:19 couchbase-lixão-em/ drwxr-xr-x 2 koji roda 68B Outubro 2 16:29 couchbase-lixão-fora/ -rw-r--r-- 1 koji roda 111B Outubro 2 16:25 em.dados # Especificar IDs de documentos do Couchbase para obter koji@Kojis-MacBook-Profissional:tmp$ gato em.dados 000069ee-cf4d-46bb-a11d-de09a00cd82c 00021100-bb6c-4327-8cad-16474f5cd928 0004b561-1ea4-4e46-8455-2040481d638e # GetFile exclui o arquivo original para que ele não seja processado novamente. # Recomenda-se criar o arquivo em um diretório diferente, # e, em seguida, coloque o arquivo no diretório de entrada. # (opcionalmente, você pode manter o arquivo original) koji@Kojis-MacBook-Profissional:tmp$ cp em.dados couchbase-lixão-em/ # Após o processamento do NiFi, é criado um arquivo Zip. koji@Kojis-MacBook-Profissional:tmp$ ll couchbase-lixão-fora/ total 8 -rw-r--r-- 1 koji roda 3.8K Outubro 2 16:51 20151002_165136.zip # Extraia o arquivo Zip e confirme se os arquivos JSON estão armazenados nele. koji@Kojis-MacBook-Profissional:couchbase-lixão-fora$ descompactar 20151002_165136.zip Arquivo: 20151002_165136.zip inflando: 000069ee-cf4d-46bb-a11d-de09a00cd82c inflando: 00021100-bb6c-4327-8cad-16474f5cd928 inflando: 0004b561-1ea4-4e46-8455-2040481d638e |
Agora vamos dar uma olhada na configuração do GetCouchbaseKey:
-
Assim como o PutCouchbaseKey, uma conexão com o Couchbase é configurada no ControllerService
-
Deixei o Document Id em branco para permitir que ele usasse o conteúdo do FlowFile de entrada como id do documento. Você também pode especificar a Expression Language aqui para criar um ID de documento.
Conclusão
Não é fantástico que você possa automatizar tarefas como essas sem precisar escrever nenhum programa? Embora apenas processadores simples de acesso a chave/valor estejam disponíveis no momento, você pode usá-los de forma criativa! Estou planejando adicionar mais processadores para que você possa usar consultas View e N1QL do NiFi, e estou ansioso para vê-lo novamente com novas funcionalidades.
Obrigado e feliz processamento de dados!