A equipe do Couchbase Developer Experience dedica-se a fornecer aos usuários do Couchbase um rico ecossistema de código aberto para seus clusters. Recentemente, trabalhamos para atualizar a integração do Alpakka Couchbase com suporte para os recursos mais recentes do Couchbase Server.
A integração atualizada do Couchbase está disponível como parte da versão 10.0.1 do Alpakka. Nesta publicação, analisaremos um projeto simples que usa o conjunto de dados “travel-sample” para fornecer uma visão geral do uso básico da integração e de seus componentes.
As informações mais abrangentes sobre a integração e sua API, incluindo exemplos de uso adicionais, podem ser encontradas na documentação do Alpakka: https://doc.akka.io/libraries/alpakka/current/couchbase.html
Pré-requisitos
- Cluster do Couchbase com bucket de amostra de viagem
- CLI do Akka
- Java 21, Eclipse Adoptium é recomendado pelos desenvolvedores do Akka
- Apache Maven versão 3.9 ou posterior
- enrolar ferramenta de linha de comando
- Git ou GitHub Desktop
Observação: as dependências do Akka estão disponíveis no repositório seguro de bibliotecas do Akka. Para acessá-las, é necessário usar um URL seguro e tokenizado, conforme especificado em https://account.akka.io/token.
Para demonstrar o uso do Couchbase no Alpakka, o projeto usa o conjunto de dados de amostra de viagem que pode ser adicionado a qualquer cluster do Couchbase a partir de suas configurações. Mais informações sobre ele podem ser encontradas na documentação do Сouchbase: https://docs.couchbase.com/java-sdk/current/ref/travel-app-data-model.html
O projeto de exemplo implementa dois pontos de extremidade. Um deles nos permite pesquisar aeroportos por cidade, país, nome do aeroporto ou seus códigos internacionais e o outro retorna informações completas sobre um aeroporto por seu identificador.
O código-fonte do nosso projeto de exemplo pode ser encontrado neste URL: https://github.com/couchbaselabs/alpakka-example
O comando a seguir pode ser usado para cloná-lo com o git:
clone do git https://github.com/couchbaselabs/alpakka-example.git
Visão geral do Alpakka
O Projeto Alpakka é uma iniciativa para implementar pipelines de integração reativos e com reconhecimento de fluxo para Java e Scala. Ele foi desenvolvido com base no Fluxos Akka, e foi projetado desde o início para entender o streaming de forma nativa e fornecer uma DSL para programação reativa e orientada a streaming, com suporte integrado para backpressure.
Os fluxos Akka são estruturados em torno de três componentes principais: Source (fonte), Sink (coletor) e Flow (fluxo). Uma fonte emite dados, um coletor os consome e um fluxo transforma os dados à medida que eles se movem pelo fluxo, permitindo um processamento eficiente e assíncrono.
Nossa integração fornece todos os três tipos de componentes:
- O componente CouchbaseSource executa consultas SQL++ do Couchbase e usa os resultados como fonte de dados em um fluxo
- O CouchbaseFlow busca documentos por seus identificadores e também os insere, substitui e exclui
- O CouchbaseSink fornece conjuntos de operações semelhantes ao CouchbaseFlow que podem ser usados como última etapa em um fluxo
Configuração do Alpakka-Couchbase
O código-fonte do projeto contém o arquivo src/main/resources/application.conf com a seguinte configuração do couchbase:
|
1 2 3 4 5 6 7 |
alpakka.couchbase { session { nodes = ["localhost"] username = "Administrator" password = "password" } } |
Essa configuração deve funcionar com um cluster de desenvolvedor de nó único que usa as configurações padrão e é executado no mesmo computador que o aplicativo Alpakka. Como alternativa, ela pode ser modificada com credenciais para um cluster hospedado na nuvem do Couchbase Capella. Mais informações sobre credenciais e conexão com o Couchbase podem ser encontradas em Couchbase Capella (https://docs.couchbase.com/cloud/get-started/connect.html) e o Couchbase Server (https://docs.couchbase.com/server/current/guides/connect.html#connecting-via-client) documentação.
Controlador de aeroporto
O código de amostra que interage com uma conexão configurada do Couchbase pode ser encontrado no arquivo AirportController classe, localizada em src/main/java/travelsample/api/AirportController.java. A integração Alpakka-Couchbase usa Configurações do CouchbaseSession para fazer referência a conexões específicas do Couchbase. O objeto de configurações de sessão pode ser reutilizado entre diferentes pontos de extremidade e atores, portanto, é criado no construtor do controlador e armazenado em um campo:
|
1 2 3 4 |
public AirportController(ActorSystem system) { sessionSettings = CouchbaseSessionSettings.create(system) .withEnrichAsyncCs(DiscoverySupport.getNodes(system)); } |
O searchAirport nessa classe implementa o método /search?query= e demonstra o uso do SQL++ com argumentos posicionais no Alpakka:
|
1 2 3 4 5 6 7 |
String searchTerm = requestContext().queryParams().getString("query").orElseThrow(); return CouchbaseSource.fromQuery( sessionSettings, "travel-sample", "SELECT * FROM inventory.airport where airportname LIKE '%' $1 '%' OR faa = $1 or icao = $1 or city LIKE '%' $1 '%'", QueryOptions.queryOptions().parameters(JsonArray.from(searchTerm) )).runWith(Sink.seq(), system).toCompletableFuture().get(TIMEOUT, TimeUnit.SECONDS); |
Para executar uma consulta SQL++, primeiro criamos uma instância de CouchbaseSource e configurá-la com as informações necessárias para se conectar ao cluster (sessionSettings), a consulta que queremos executar e os argumentos para essa consulta passados como um objeto QueryOptions. Os argumentos posicionais e nomeados são suportados de acordo com a documentação do Couchbase Java SDK para o objeto Opções de consulta classe.
Em seguida, usamos o runSink na fonte criada para executar a consulta e extrair seus resultados como um fluxo de objetos.
O segundo método de controle, getById, implementa o /byId/{id} que recebe um identificador de aeroporto e retorna informações sobre um aeroporto armazenado em um documento com esse identificador.
|
1 2 3 4 5 6 7 8 9 10 11 |
return Source.from(Collections.singletonList(id)) .via(CouchbaseFlow.fromId( sessionSettings, "travel-sample", "inventory", "Airport" )) .runWith(Sink.seq(), system) .thenApply(List::getFirst) .thenApply(CouchbaseDocument::getDocument) .toCompletableFuture().get(TIMEOUT, TimeUnit.SECONDS); |
Podemos usar a função Fonte::de com uma lista singleton contendo o identificador solicitado para criar uma instância de origem.
Em seguida, usamos o Fonte: via para rotear o fluxo Akka de origem (nesse caso, de um único identificador) por meio de um método CouchbaseFlow que é configurado de forma semelhante ao objeto CouchbaseSource na classe searchAirport método. A instância de CouchbaseFlow aceitará o identificador solicitado do fluxo, buscará o documento correspondente da coleção configurada e o retornará ao fluxo, substituindo o identificador por uma instância de CouchbaseDocument que contém o identificador e o documento associado.
Combinação CouchbaseSink, CouchbaseFlow, e outros objetos documentados do Couchbase como etapas nos fluxos do Akka, juntamente com atores de outros componentes do Alpakka, permite a criação de pipelines que podem usar o Couchbase para armazenar e buscar informações, bem como usar seus recursos avançados para análise e pesquisa.
Documentação e controle de problemas
A documentação do Alpakka (https://doc.akka.io/libraries/alpakka/current/couchbase.html) contém mais informações sobre o uso dessa integração, incluindo upserting, modificando e exclusão documentos em um cluster do Couchbase.
Quaisquer sugestões sobre a integração ou problemas com ela podem ser compartilhados por meio do Página de problemas do Alpakka no GitHub.