Na versão anterior postagem no blog discutimos como o Couchbase pode se integrar perfeitamente em uma Arquitetura orientada a eventos. Usando o Couchbase Eventing Service, as mutações de documentos podem ser publicadas em um Solace PubSub+ de onde os dados são disponibilizados para os microsserviços assinantes quase em tempo real.
Esta postagem se concentra em como os microsserviços como assinantes de tópicos podem aproveitar o Couchbase como um armazenamento de dados escalável e resiliente.
Visão geral da arquitetura e aplicativo de amostra
Neste aplicativo de amostraEm seguida, assinamos um tópico do Solace PubSub+, processamos os dados e atualizamos o documento no Couchbase.
Em nosso aplicativo de exemplo, armazenamos os detalhes do hotel no Couchbase. As atualizações da configuração do hotel são publicadas em um tópico do Solace PubSub+. Criamos um aplicativo que assina o tópico, recupera os dados atualizados e atualiza a configuração do hotel existente no Couchbase.
Usamos o amostra de viagem que contém dados de amostra, como voos, rotas e outros. Em nosso exemplo, focamos no hotel documentos armazenados no inventário escopo no hotel coleção. Todos os documentos de hotel consistem em um documento JSON com vários atributos.
Atualizaremos apenas um pequeno subconjunto de atributos para este tutorial, como animais_permitidos e internet_livre.

O aplicativo de exemplo assina um tópico do Solace e atualiza os dados do hotel no Couchbase. Outros aplicativos podem fornecer as alterações na configuração do hotel. Usamos um serviço simples que publica atualizações no tópico.
Legenda: O aplicativo de amostra assina um tópico do Solace e atualiza os dados do hotel no Couchbase. Outros aplicativos podem fornecer as alterações na configuração do hotel. Usamos um serviço simples que publica atualizações no tópico.
Pré-requisitos
Consulte os pré-requisitos para este aplicativo de amostra:
Couchbase Capella
Criar uma conta gratuita Couchbase Capella e siga as instruções para provisionar seu cluster de avaliação. Isso leva apenas alguns minutos e fornece a você um cluster do Couchbase completo, incluindo o Dados e o conjunto de dados de amostra, amostra de viagem, que usaremos ao longo desta postagem do blog.
Você pode usar este tutorial quando não estiver usando nossa oferta gerenciada do Couchbase Capella. O amostra de viagem faz parte do produto do servidor Couchbase e pode ser importado usando o console do Couchbase.
Nuvem do corretor de eventos Solace PubSub+
Inscreva-se gratuitamente Avaliação do Solace na nuvem e criar um serviço/VPN. Usaremos a VPN para nos conectarmos a um Solace Topic.
Revisão do código - Assinante e editor do Solace
Para assinar mensagens do tópico do Solace, usamos a API Java do Solace. Este exemplo de implementação é derivado de exemplos na documentação do Solace. Para obter detalhes sobre a implementação, consulte a documentação do Solace.
|
1 2 3 4 5 6 7 8 9 |
final JCSMPProperties properties = new JCSMPProperties(); properties.setProperty(JCSMPProperties.HOST, "tcps://yourhost.messaging.solace.cloud:55443"); properties.setProperty(JCSMPProperties.USERNAME, "solace-cloud-client"); properties.setProperty(JCSMPProperties.PASSWORD, "password"); properties.setProperty(JCSMPProperties.VPN_NAME, "couchbasedemo"); JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties); session.connect(); final Topic topic = JCSMPFactory.onlyInstance().createTopic("tutorial/topic"); |
Configuramos o JCSMPProperties com as informações de acesso relevantes disponíveis na interface da Web do Solace. Em seguida, criamos um JCSMPSession e um Tópico.
Para assinar as mensagens no tópico, implementamos um XMLMessageConsumer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// ### Message Receiver - Subscribe to Topic ### final XMLMessageConsumer cons = session.getMessageConsumer(new XMLMessageListener() { @Override public void onReceive(BytesXMLMessage msg) { if (msg instanceof TextMessage) { System.out.printf("SUBSCRIBE: Message received: '%s'%n", ((TextMessage) msg).getText()); UpdateHotelData.getInstance().upsertHotelData(((TextMessage) msg).getText()); } else { System.out.println("Message received."); } } @Override public void onException(JCSMPException e) { System.out.printf("Consumer received exception: %s%n", e); } }); session.addSubscription(topic); System.out.println("SUBSCRIBE: Connected. Awaiting message..."); cons.start(); |
No onReceive pegamos a mensagem, convertemos em uma string e, em seguida, chamamos o método upsertHotelData método de UpdateHotelData que contém a implementação do Couchbase discutida um pouco mais adiante nesta postagem do blog.
Adicionamos a assinatura do tópico à sessão e iniciamos o consumidor de mensagens.
Para podermos testar a implementação, também criamos um editor de mensagens. Para isso, primeiro implementamos um XMLMessageProducer.
|
1 2 3 4 5 6 7 8 9 10 11 12 |
// ### Message Producer - Send message to Topic XMLMessageProducer prod = session.getMessageProducer(new JCSMPStreamingPublishCorrelatingEventHandler() { @Override public void responseReceivedEx(Object key) { System.out.println("Producer received response for msg: " + key.toString()); } @Override public void handleErrorEx(Object key, JCSMPException cause, long timestamp) { System.out.printf("Producer received error for msg: %s@%s - %s%n", key.toString(), timestamp, cause); } }); |
Com o produtor de mensagens instalado, agora podemos enviar mensagens, conforme mostrado abaixo.
|
1 2 3 4 5 6 7 8 9 |
// Send messages. Here we loop through and send the same message multiple times. for (int msgsSent = 0; msgsSent < 2; ++msgsSent) { TextMessage msg = JCSMPFactory.onlyInstance().createMessage(TextMessage.class); msg.setText(data); System.out.printf("PUBLISH: Sending message '%s' to topic '%s'...%n", data, topic.getName()); prod.send(msg, topic); //Gives us some time to follow the console logs Thread.sleep(3000); } |
Inserção dos dados no Couchbase
Com a lógica para assinar e publicar mensagens implementada, vamos nos concentrar agora em como atualizar os documentos do hotel no Couchbase.
Para este exemplo, supomos que a mensagem recuperada do tópico tenha o formato JSON abaixo. Incluindo um hotel_id que usaremos como o ID do documento dos documentos do hotel armazenados no Couchbase, bem como vários atributos que podemos alterar:
|
1 2 3 4 5 6 7 |
{ "hotel_id": "hotel_10025", "pets_ok": true, "free_breakfast": true, "free_internet": true, "free_parking": true } |
Para estabelecer uma conexão segura de nosso aplicativo com o Couchbase Capella, precisamos:
-
- Crie um usuário de banco de dados e conceda a esse usuário acesso aos dados do hotel.
- Coloque na lista de permissões o endereço IP do host que está executando o aplicativo
- Capture o URL da conexão segura
Você pode gerenciar todos esses detalhes no plano de gerenciamento do Capella. Faça login na Capella no navegador:
- Abra o Teste - Cluster e navegue até o cluster Conectar guia
- Role para baixo até a seção Acesso ao banco de dados e clique em Gerenciar credenciais

- Clique em Criar credenciais de banco de dados e forneça um nome de usuário e uma senha do banco de dados. Configure o acesso em nível de Bucket para o inventário escopo no amostra de viagem e conceder acesso de leitura/gravação.

- Coloque o endereço IP do aplicativo na lista de permissões clicando em Gerenciar IPs permitidos e depois Adicionar IP permitido. Ao executar o aplicativo a partir de sua máquina local, basta clicar em Adicionar meu IP e seu endereço IP externo será descoberto e adicionado automaticamente.

- No Rede de longa distância copie o URL da conexão.
Revisão de código - conectar-se ao Couchbase e atualizar documentos
Nós nos conectamos ao Couchbase Capella como primeira etapa, criando um objeto de cluster. Fornecemos o URL do endpoint capturado na etapa anterior, bem como as credenciais de usuário do banco de dados.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// Update this to your cluster String endpoint = "cb.8c-f5di7pqrfjy8.cloud.couchbase.com"; String username = "dbuser"; String password = "Passw0rd!"; // User Input ends here. String bucketName = "travel-sample"; // Cluster Environment configuration ClusterEnvironment env = ClusterEnvironment.builder() .securityConfig( SecurityConfig.enableTls(true).trustManagerFactory(InsecureTrustManagerFactory.INSTANCE)) .ioConfig(IoConfig.enableDnsSrv(true)).build(); // Initialize the Connection and provide database user credentials Cluster cluster = Cluster.connect(endpoint, ClusterOptions.clusterOptions(username, password).environment(env)); // Create bucket object bucket = cluster.bucket(bucketName); bucket.waitUntilReady(Duration.parse("PT10S")); // create collection object for the 'hotel' collection located in the 'inventory' scope collection = bucket.scope("inventory").collection("hotel"); |
Depois que o objeto bucket for criado, poderemos recuperar o hotel e usá-lo posteriormente para inserir as alterações no documento. As hotel está dentro do escopo inventário escopo do amostra de viagem balde.
Observe que eu uso o padrão singleton para usar o mesmo objeto de coleção em todas as solicitações subsequentes. Veja a implementação de exemplo para obter detalhes em meu projeto no GitHub.
Vamos ver agora como podemos atualizar os documentos do hotel com os dados recebidos do Topic.
A mensagem recebida é uma string, portanto, como primeira etapa, precisamos convertê-la em um objeto JSON.
|
1 2 3 4 |
public void upsertHotelData(String content) { … hotelData = JsonObject.fromJson(content); … |
Como os dados de entrada do tópico contêm apenas um subconjunto de todos os atributos do documento do hotel, precisamos considerar como atualizar os campos do documento com mais eficiência.
Há duas opções possíveis:
-
- Usamos o ID do documento para recuperar o documento completo do Couchbase, atualizar os atributos e gravar o documento novamente.
- Usamos a API de subdocumento no Couchbase para atualizar apenas os campos dedicados dentro do documento, em vez de substituir o documento inteiro.
A primeira opção é comumente usada para atualizar ou fazer upsert de documentos. No entanto, se trabalharmos com documentos maiores e quisermos atualizar apenas uma pequena parte deles, não há necessidade de enviar o documento inteiro pela rede. Em nossa implementação abaixo, usaremos a API de subdocumento para resolver e atualizar campos no documento do hotel.
|
1 2 3 4 5 6 7 8 9 |
//Instead of resolving the entire document we use the sub-document API //to only resolve the relevant parts of the hotel document prior to the update. //Here we can define the path to the relevant attributes of the document. //In our scenario 'pets_ok' and 'free_internet' are at the document root. LookupInResult result = collection.lookupIn(hotelData.getString("hotel_id"), Arrays.asList(get("pets_ok"), get("free_internet"))); //Display the current values prior to the document update System.out.println("Values prior to update: 'pets_ok': " + result.contentAs(0, String.class) + ", 'free_internet': " + result.contentAs(1, String.class)); |
Usamos o lookupIn para consultar o documento em relação a determinados caminhos. Nesse caso, os diferentes componentes do caminho são separados por pontos (.). A operação pets_ok e internet_livre estão localizados na raiz do documento em nosso cenário. Em seguida, percorremos o resultado e exibimos os valores atuais.
|
1 2 3 4 5 6 |
//Even here we use the sub-document API. //Instead of updating the entire document we only update the relevant attributes. //The UPSERT method will either update the attributes if they already exist or create them in case they do not exist in within the document MutateInResult upsertResult = collection.mutateIn(hotelData.getString("hotel_id"), Arrays.asList(upsert("pets_ok", hotelData.getBoolean("pets_ok")), upsert("free_internet", hotelData.getBoolean("free_internet")))); |
O mutateIn permite a atualização de um ou mais caminhos em um documento. Identificamos o documento no Couchbase por seu ID e fornecemos uma matriz de caminhos e valores que queremos definir.
Por último, mas não menos importante, executamos o lookupIn para verificar se a atualização foi bem-sucedida.
|
1 2 3 4 |
//Resolve and display the updated attributes LookupInResult result1 = collection.lookupIn(hotelData.getString("hotel_id"), Arrays.asList(get("pets_ok"), get("free_internet"))); System.out.println("Values after the update: 'pets_ok': " + result1.contentAs(0, String.class) + ", 'free_internet': " + result1.contentAs(1, String.class)); |
Próximas etapas
Este artigo nos ensinou a integrar o corretor de eventos Solace ao Couchbase usando os SDKs Java do Couchbase e do Solace.
- O exemplo de código completo está disponível em meu repositório do GitHub para este projeto.
- Aprenda a enviar mensagens de Couchbase em um tópico do Solace nesta postagem do blog (Parte 1).
- Leia sobre os recursos disponíveis SDKs do Couchbase aqui.
- Use o Playground do Couchbase para começar a se desenvolver rapidamente.
- Tente Solace PubSub+ Nuvem com um teste.
- Comece a usar o Couchbase hoje mesmo com o Banco de dados em nuvem Couchbase Capella.