Na versão anterior postagem no blog discutimos como o Couchbase pode se integrar perfeitamente em uma Arquitetura orientada a eventos. Usando o Couchbase Eventing Service, as mutações de documentos podem ser publicadas em um Solace PubSub+ de onde os dados são disponibilizados para os microsserviços assinantes quase em tempo real.
Esta postagem se concentra em como os microsserviços como assinantes de tópicos podem aproveitar o Couchbase como um armazenamento de dados escalável e resiliente.
Visão geral da arquitetura e aplicativo de amostra
Neste aplicativo de amostraEm seguida, assinamos um tópico do Solace PubSub+, processamos os dados e atualizamos o documento no Couchbase.
Em nosso aplicativo de exemplo, armazenamos os detalhes do hotel no Couchbase. As atualizações da configuração do hotel são publicadas em um tópico do Solace PubSub+. Criamos um aplicativo que assina o tópico, recupera os dados atualizados e atualiza a configuração do hotel existente no Couchbase.
Usamos o amostra de viagem que contém dados de amostra, como voos, rotas e outros. Em nosso exemplo, focamos no hotel documentos armazenados no inventário escopo no hotel coleção. Todos os documentos de hotel consistem em um documento JSON com vários atributos.
Atualizaremos apenas um pequeno subconjunto de atributos para este tutorial, como animais_permitidos e internet_livre.

O aplicativo de exemplo assina um tópico do Solace e atualiza os dados do hotel no Couchbase. Outros aplicativos podem fornecer as alterações na configuração do hotel. Usamos um serviço simples que publica atualizações no tópico.
Legenda: O aplicativo de amostra assina um tópico do Solace e atualiza os dados do hotel no Couchbase. Outros aplicativos podem fornecer as alterações na configuração do hotel. Usamos um serviço simples que publica atualizações no tópico.
Pré-requisitos
Consulte os pré-requisitos para este aplicativo de amostra:
Couchbase Capella
Criar uma conta gratuita Couchbase Capella e siga as instruções para provisionar seu cluster de avaliação. Isso leva apenas alguns minutos e fornece a você um cluster do Couchbase completo, incluindo o Dados e o conjunto de dados de amostra, amostra de viagem, que usaremos ao longo desta postagem do blog.
Você pode usar este tutorial quando não estiver usando nossa oferta gerenciada do Couchbase Capella. O amostra de viagem faz parte do produto do servidor Couchbase e pode ser importado usando o console do Couchbase.
Nuvem do corretor de eventos Solace PubSub+
Inscreva-se gratuitamente Avaliação do Solace na nuvem e criar um serviço/VPN. Usaremos a VPN para nos conectarmos a um Solace Topic.
Revisão do código - Assinante e editor do Solace
Para assinar mensagens do tópico do Solace, usamos a API Java do Solace. Este exemplo de implementação é derivado de exemplos na documentação do Solace. Para obter detalhes sobre a implementação, consulte a documentação do Solace.
1 2 3 4 5 6 7 8 9 |
final JCSMPProperties propriedades = novo JCSMPProperties(); propriedades.setProperty(JCSMPProperties.HOST, "tcps://yourhost.messaging.solace.cloud:55443"); propriedades.setProperty(JCSMPProperties.NOME DE USUÁRIO, "solace-cloud-client"); propriedades.setProperty(JCSMPProperties.SENHA, "senha"); propriedades.setProperty(JCSMPProperties.VPN_NAME, "couchbasedemo"); JCSMPSession sessão = JCSMPFactory.onlyInstance().createSession(propriedades); sessão.conectar(); final Tópico tópico = JCSMPFactory.onlyInstance().createTopic("tutorial/topic" (tutorial/tópico)); |
Configuramos o JCSMPProperties com as informações de acesso relevantes disponíveis na interface da Web do Solace. Em seguida, criamos um JCSMPSession e um Tópico.
Para assinar as mensagens no tópico, implementamos um XMLMessageConsumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// ### Receptor de mensagens - Assinar o tópico ### final XMLMessageConsumer contras = sessão.getMessageConsumer(novo Ouvinte de XMLMessageListener() { @Override público vazio onReceive(BytesXMLMessage mensagem) { se (mensagem instância de TextMessage) { Sistema.fora.printf("SUBSCRIBE: Mensagem recebida: '%s'%n", ((TextMessage) mensagem).getText()); UpdateHotelData.getInstance().upsertHotelData(((TextMessage) mensagem).getText()); } mais { Sistema.fora.println("Mensagem recebida".); } } @Override público vazio onException(JCSMPException e) { Sistema.fora.printf("O consumidor recebeu uma exceção: %s%n", e); } }); sessão.addSubscription(tópico); Sistema.fora.println("SUBSCRIBE: Conectado. Aguardando mensagem..."); contras.iniciar(); |
No onReceive pegamos a mensagem, convertemos em uma string e, em seguida, chamamos o método upsertHotelData método de UpdateHotelData que contém a implementação do Couchbase discutida um pouco mais adiante nesta postagem do blog.
Adicionamos a assinatura do tópico à sessão e iniciamos o consumidor de mensagens.
Para podermos testar a implementação, também criamos um editor de mensagens. Para isso, primeiro implementamos um XMLMessageProducer.
1 2 3 4 5 6 7 8 9 10 11 12 |
// ### Produtor de mensagens - Enviar mensagem ao tópico XMLMessageProducer prod = sessão.getMessageProducer(novo JCSMPStreamingPublishCorrelatingEventHandler() { @Override público vazio responseReceivedEx(Objeto chave) { Sistema.fora.println("O produtor recebeu resposta para a msg: " + chave.toString()); } @Override público vazio handleErrorEx(Objeto chave, JCSMPException causa, longo carimbo de data/hora) { Sistema.fora.printf("O produtor recebeu um erro para a msg: %s@%s - %s%n", chave.toString(), carimbo de data/hora, causa); } }); |
Com o produtor de mensagens instalado, agora podemos enviar mensagens, conforme mostrado abaixo.
1 2 3 4 5 6 7 8 9 |
// Enviar mensagens. Aqui, fazemos um loop e enviamos a mesma mensagem várias vezes. para (int msgsSent = 0; msgsSent < 2; ++msgsSent) { TextMessage mensagem = JCSMPFactory.onlyInstance().createMessage(TextMessage.classe); mensagem.setText(dados); Sistema.fora.printf("PUBLISH: Enviando a mensagem '%s' para o tópico '%s'...%n", dados, tópico.getName()); prod.enviar(mensagem, tópico); /Dá-nos algum tempo para acompanhar os logs do console Tópico.dormir(3000); } |
Inserção dos dados no Couchbase
Com a lógica para assinar e publicar mensagens implementada, vamos nos concentrar agora em como atualizar os documentos do hotel no Couchbase.
Para este exemplo, supomos que a mensagem recuperada do tópico tenha o formato JSON abaixo. Incluindo um hotel_id que usaremos como o ID do documento dos documentos do hotel armazenados no Couchbase, bem como vários atributos que podemos alterar:
1 2 3 4 5 6 7 |
{ "hotel_id": "hotel_10025", "pets_ok": verdadeiro, "free_breakfast" (café da manhã gratuito): verdadeiro, "free_internet": verdadeiro, "free_parking" (estacionamento gratuito): verdadeiro } |
Para estabelecer uma conexão segura de nosso aplicativo com o Couchbase Capella, precisamos:
-
- Crie um usuário de banco de dados e conceda a esse usuário acesso aos dados do hotel.
- Coloque na lista de permissões o endereço IP do host que está executando o aplicativo
- Capture o URL da conexão segura
Você pode gerenciar todos esses detalhes no plano de gerenciamento do Capella. Faça login na Capella no navegador:
- Abra o Teste - Cluster e navegue até o cluster Conectar guia
- Role para baixo até a seção Acesso ao banco de dados e clique em Gerenciar credenciais
- Clique em Criar credenciais de banco de dados e forneça um nome de usuário e uma senha do banco de dados. Configure o acesso em nível de Bucket para o inventário escopo no amostra de viagem e conceder acesso de leitura/gravação.
- Coloque o endereço IP do aplicativo na lista de permissões clicando em Gerenciar IPs permitidos e depois Adicionar IP permitido. Ao executar o aplicativo a partir de sua máquina local, basta clicar em Adicionar meu IP e seu endereço IP externo será descoberto e adicionado automaticamente.
- No Rede de longa distância copie o URL da conexão.
Revisão de código - conectar-se ao Couchbase e atualizar documentos
Nós nos conectamos ao Couchbase Capella como primeira etapa, criando um objeto de cluster. Fornecemos o URL do endpoint capturado na etapa anterior, bem como as credenciais de usuário do banco de dados.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// Atualize isso para seu cluster Cordas ponto final = "cb.8c-f5di7pqrfjy8.cloud.couchbase.com"; Cordas nome de usuário = "dbuser"; Cordas senha = "Passw0rd!"; // A entrada do usuário termina aqui. Cordas bucketName = "amostra de viagem"; // Configuração do ambiente de cluster ClusterEnvironment env = ClusterEnvironment.construtor() .securityConfig( SecurityConfig.enableTls(verdadeiro).trustManagerFactory(InsecureTrustManagerFactory.INSTÂNCIA)) .ioConfig(IoConfig.enableDnsSrv(verdadeiro)).construir(); // Inicialize a conexão e forneça as credenciais de usuário do banco de dados Aglomerado agrupamento = Aglomerado.conectar(ponto final, ClusterOptions.clusterOptions(nome de usuário, senha).ambiente(env)); // Criar objeto bucket balde = agrupamento.balde(bucketName); balde.waitUntilReady(Duração.analisar("PT10S")); // criar objeto de coleção para a coleção 'hotel' localizada no escopo 'inventory' coleção = balde.escopo("inventário").coleção("hotel"); |
Depois que o objeto bucket for criado, poderemos recuperar o hotel e usá-lo posteriormente para inserir as alterações no documento. As hotel está dentro do escopo inventário escopo do amostra de viagem balde.
Observe que eu uso o padrão singleton para usar o mesmo objeto de coleção em todas as solicitações subsequentes. Veja a implementação de exemplo para obter detalhes em meu projeto no GitHub.
Vamos ver agora como podemos atualizar os documentos do hotel com os dados recebidos do Topic.
A mensagem recebida é uma string, portanto, como primeira etapa, precisamos convertê-la em um objeto JSON.
1 2 3 4 |
público vazio upsertHotelData(Cordas conteúdo) { ... hotelData = JsonObject.fromJson(conteúdo); ... |
Como os dados de entrada do tópico contêm apenas um subconjunto de todos os atributos do documento do hotel, precisamos considerar como atualizar os campos do documento com mais eficiência.
Há duas opções possíveis:
-
- Usamos o ID do documento para recuperar o documento completo do Couchbase, atualizar os atributos e gravar o documento novamente.
- Usamos a API de subdocumento no Couchbase para atualizar apenas os campos dedicados dentro do documento, em vez de substituir o documento inteiro.
A primeira opção é comumente usada para atualizar ou fazer upsert de documentos. No entanto, se trabalharmos com documentos maiores e quisermos atualizar apenas uma pequena parte deles, não há necessidade de enviar o documento inteiro pela rede. Em nossa implementação abaixo, usaremos a API de subdocumento para resolver e atualizar campos no documento do hotel.
1 2 3 4 5 6 7 8 9 |
//Em vez de resolver o documento inteiro, usamos a API de subdocumento //para resolver apenas as partes relevantes do documento do hotel antes da atualização. //Aqui podemos definir o caminho para os atributos relevantes do documento. //Em nosso cenário, 'pets_ok' e 'free_internet' estão na raiz do documento. LookupInResult resultado = coleção.lookupIn(hotelData.getString("hotel_id"), Matrizes.asList(obter("pets_ok"), obter("free_internet"))); /Exibe os valores atuais antes da atualização do documento Sistema.fora.println("Valores anteriores à atualização: 'pets_ok': " + resultado.contentAs(0, Cordas.classe) + ", 'free_internet': " + resultado.contentAs(1, Cordas.classe)); |
Usamos o lookupIn para consultar o documento em relação a determinados caminhos. Nesse caso, os diferentes componentes do caminho são separados por pontos (.). A operação pets_ok e internet_livre estão localizados na raiz do documento em nosso cenário. Em seguida, percorremos o resultado e exibimos os valores atuais.
1 2 3 4 5 6 |
/Mesmo aqui usamos a API de subdocumento. //Em vez de atualizar o documento inteiro, atualizamos apenas os atributos relevantes. //O método UPSERT atualizará os atributos se eles já existirem ou os criará caso eles não existam no documento MutateInResult upsertResult = coleção.mutateIn(hotelData.getString("hotel_id"), Matrizes.asList(upsert("pets_ok", hotelData.getBoolean("pets_ok")), upsert("free_internet", hotelData.getBoolean("free_internet")))); |
O mutateIn permite a atualização de um ou mais caminhos em um documento. Identificamos o documento no Couchbase por seu ID e fornecemos uma matriz de caminhos e valores que queremos definir.
Por último, mas não menos importante, executamos o lookupIn para verificar se a atualização foi bem-sucedida.
1 2 3 4 |
//Resolver e exibir os atributos atualizados LookupInResult resultado1 = coleção.lookupIn(hotelData.getString("hotel_id"), Matrizes.asList(obter("pets_ok"), obter("free_internet"))); Sistema.fora.println("Valores após a atualização: 'pets_ok': " + resultado1.contentAs(0, Cordas.classe) + ", 'free_internet': " + resultado1.contentAs(1, Cordas.classe)); |
Próximas etapas
Este artigo nos ensinou a integrar o corretor de eventos Solace ao Couchbase usando os SDKs Java do Couchbase e do Solace.
- O exemplo de código completo está disponível em meu repositório do GitHub para este projeto.
- Aprenda a enviar mensagens de Couchbase em um tópico do Solace nesta postagem do blog (Parte 1).
- Leia sobre os recursos disponíveis SDKs do Couchbase aqui.
- Use o Playground do Couchbase para começar a se desenvolver rapidamente.
- Tente Solace PubSub+ Nuvem com um teste.
- Comece a usar o Couchbase hoje mesmo com o Banco de dados em nuvem Couchbase Capella.