Nesta postagem, vamos abordar o tema reativo até o fim!
Alguns clientes do Couchbase usam Vert.xuma estrutura para escrever aplicativos totalmente assíncronos, sabendo que o SDK Java do Couchbase se encaixa bem nesse quadro, sendo assíncrono desde o início e expondo um RxJava-API assíncrona com base em dados.
Portanto, veremos como começar a trabalhar rapidamente com um Couchbase Vertical que inicia uma conexão com um Couchbase Aglomerado e Balde e, em seguida, serve documentos JSON do banco de dados, usando o Java 8.
Esta postagem do blog pressupõe que você esteja familiarizado com a noções básicas do Vert.x. Aqui está uma breve tabela de conteúdo:
- Início de um novo projeto Vert.x
- Obtenção assíncrona de um bucket
- Desmontando o SDK de forma graciosa
- Vendo-o em ação
- Indo além
- Conclusão
Início de um novo projeto Vert.x
Vamos começar criando um novo projeto baseado em Maven: crie uma pasta raiz para o seu projeto e inicialize uma estrutura de diretório maven (ou use seu arquétipo Maven favorito). Por exemplo, você pode usar o seguinte comando: "mkdir -p cbvertx/src/main/java/com/couchbase/demo/vertx/“.
Agora vamos iniciar o pom.xml na raiz do projeto:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
<!--?xml versão="1.0" codificação="UTF-8"?--> 4.0.0 com.couchbase.demonstração cbvertx 1.0-FOTOGRAFIA io.vertx vertx-núcleo 3.1.0 io.vertx vertx-rx-java 3.1.0 com.couchbase.cliente java-cliente 2.2.2 <!-- este é de fato já a transitivo dependência de o Java SDK--> io.reativox rxjava 1.0.15 log4j log4j 1.2.17 mentor-compilador-plug-in 3.3 1.8 1.8 |
Como você pode ver, usaremos Vert.x versão 3.1.0 e sua extensão para associações no RxJava, SDK Java do Couchbase versão 2.2.2 e RxJava versão 1.0.15…
Esqueleto da partícula
Basearemos nossa CouchbaseVerticle no AbstractVerticle em io.vertx.rxjava.core (da extensão vertx-rx-java). Vamos criar seu esqueleto no projeto:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
pacote com.couchbase.demonstração.vertx; importação java.util.ArrayList; importação java.util.Lista; importação com.couchbase.cliente.java.CouchbaseAsyncCluster; importação io.vertx.núcleo.Contexto; importação io.vertx.núcleo.Vertx; importação io.vertx.núcleo.json.JsonArray; importação io.vertx.núcleo.json.JsonObject; importação io.vertx.núcleo.registro.Registrador; importação io.vertx.núcleo.registro.Fábrica de registradores; importação io.vertx.rxjava.núcleo.AbstractVerticle; público classe CouchbaseVerticle se estende AbstractVerticle { privado estático final Registrador LOGGER = Fábrica de registradores.getLogger(CouchbaseVerticle.classe); privado CouchbaseAsyncCluster agrupamento; } |
A fase de inicialização que escreveremos logo após isso mostrará como usar a configuração do Vert.x para determinar, no tempo de execução, de quais nós do cluster do Couchbase faremos o bootstrap. A instanciação do CouchbaseCluster ainda é leve o suficiente para que possa ser feito dessa forma durante o init.
Adicione o seguinte método init ao CouchbaseVerticle:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Substituir público vazio inicial(Vertx vertx, Contexto contexto) { super.inicial(vertx, contexto); //obtenção do JSON de configuração JsonObject configuração = contexto.configuração(); //obtenção do nó bootstrap, como uma matriz JSON (padrão para localhost) JsonArray seedNodeArray = configuração.getJsonArray("couchbase.seedNodes", novo JsonArray().adicionar("localhost")); //converter em uma lista Lista seedNodes = novo ArrayList<>(seedNodeArray.tamanho()); para (Objeto seedNode : seedNodeArray) { seedNodes.adicionar((Cordas) seedNode); } //use isso para inicializar o Cluster este.agrupamento = CouchbaseAsyncCluster.criar(seedNodes); } |
Obtenção assíncrona de um bucket
O principal ponto de entrada para a API do Couchbase é o Balde para a API de sincronização, ou AsyncBucket para a API assíncrona. Estabelecer a conexão com o bucket ("abri-lo") é muito mais pesado, portanto, deve ser feito de forma assíncrona.
Vamos ver como podemos iniciar nossa partícula abrindo primeiro o bucket que usaremos durante toda a vida útil da partícula. Queremos manter uma referência a ele e usar a função start(Future startFuture) para notificar de forma assíncrona o Vert.x de que o Verticle está pronto:
|
1 2 3 4 5 6 7 8 9 10 11 |
privado volátil AsyncBucket balde; @Substituir público vazio iniciar(Futuro startFuture) lançamentos Exceção { agrupamento.openBucket(configuração().getString("couchbase.bucketName", "default"), configuração().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.informações("Balde aberto" + openedBucket.nome())) .assinar( openedBucket -> balde = openedBucket, startFuture::falhar, startFuture::completo); } |
Observe que primeiro obtemos o nome do bucket (e a senha associada, se relevante) dinamicamente, a partir da configuração do Vert.x. Abrimos o bucket de forma assíncrona, estabelecendo as conexões e os recursos internos do SDK.
O doOnNext é usado para registrar a abertura do balde.
Em seguida, assinamos nosso Observável e descrever como queremos "consumir" os dados finais:
- Ao receber a referência do bucket, nós a armazenamos em um campo para uso posterior
- Se houver um erro no caminho, falharemos na inicialização do Verticle usando o
Future#failmétodo. - Caso contrário, notificamos o Vert.x de que o Verticle foi iniciado com sucesso usando o comando
Future#completemétodo.
Esse é um bom começo!
Desmontando o SDK de forma graciosa
Quando o Verticle parar, os recursos criados pelo SDK deverão ser descartados adequadamente. O Aglomerado tem um objeto desconectar que faz isso, chamando recursivamente o método próximo em cada Balde que ele abriu (close pode ser usado para descartar um único Bucket).
Além disso, desde 1.0.15 O RxJava tem um método para encerrar seus Threads internos: Agendadores.shutdown. No entanto, isso deve ser invocado somente quando não houver uso subsequente do RxJava no aplicativo, portanto, pode ser uma ideia melhor fazer isso no desligamento do Vert.x...
Mais uma vez, interromperemos o Verticle de forma assíncrona, usando um Futuro para notificar a estrutura sobre a conclusão da parada:
|
1 2 3 4 5 6 7 8 9 |
@Substituir público vazio parar(Futuro stopFuture) lançamentos Exceção { agrupamento.desconectar() .doOnNext(isDisconnectedCleanly -> LOGGER.informações("Cluster desconectado (linhas limpas): " + isDisconnectedCleanly + ")")) .assinar( isDisconnectedCleanly -> stopFuture.completo(), stopFuture::falhar, Agendadores::desligamento); } |
(optamos por encerrar o RxJava após a conclusão da desconexão do SDK)
Observação Você pode ajustar o SDK passando um
CouchbaseEnvironmentapós a criação doAglomerado. Nesse caso, cabe a você também ligar paradesligamentono ambiente ao desativar todo o SDK (ou seja, quando todos os clusters em que você usou o ambiente, geralmente apenas um, forem desativados).Se você não tiver criado um ambiente específico, o SDK criará internamente um ambiente e o encerrará adequadamente, cujo resultado é visto acima no
isDisconnectedCleanlyvariável.
Vendo-o em ação
Vamos criar um principal que incorpora o Vert.x, implementa o Verticle e depois para. Observe que essa é uma implementação bastante ingênua com CountDownLatches, em que você normalmente prefere usar a linha de comando ou o Lançador como uma classe principal.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
público estático vazio principal(Cordas[] argumentos) lançamentos InterruptedException { Vertx vertx = Vertx.vertx(); final CountDownLatch startLatch = novo CountDownLatch(1); vertx.implantarVerículo(novo CouchbaseVerticle(), evento -> { se (evento.bem-sucedido()) LOGGER.informações("Verticle Deployed - " + evento.resultado()); mais LOGGER.erro("Erro de implantação de partícula", evento.causa()); startLatch.countDown(); }); startLatch.aguardar(); final CountDownLatch stopLatch = novo CountDownLatch(1); vertx.próximo(evento -> { se (evento.bem-sucedido()) LOGGER.informações("Vert.x Parado - " + evento.resultado()); mais LOGGER.erro("Erro de parada do Vert.x", evento.causa()); stopLatch.countDown(); }); stopLatch.aguardar(); } |
Se você executar isso, verá o seguinte (notou a diferença no formato do registro de data e hora? 2015-12-11 são do SDK, enquanto os 11 de dezembro de 2015 são do Vert.x):
|
1 2 3 4 5 6 7 8 9 10 11 12 |
2015-12-11 16:21:20 INFORMAÇÕES Nó:135 - Conectado para Nó localhost 2015-12-11 16:21:20 INFORMAÇÕES Provedor de configuração:263 - Aberto balde padrão Dez 11, 2015 4:21:20 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Balde aberto padrão Dez 11, 2015 4:21:20 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Vertical Implementado - caf06dd3-c8d1-4b89-8de0-58f09467b805 2015-12-11 16:21:20 INFORMAÇÕES Provedor de configuração:284 - Fechado balde padrão 2015-12-11 16:21:20 INFORMAÇÕES Nó:145 - Desconectado de Nó localhost Dez 11, 2015 4:21:22 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Desconectado Aglomerado (limpo fios: verdadeiro) Dez 11, 2015 4:21:22 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Vert.x Parado - nulo |
Como verificar o comportamento do erro? Poderíamos simplesmente alterar a senha para uma que esteja errada, apenas para verificar os registros, que seriam semelhantes:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2015-12-11 16:25:45 AVISO Ponto final:283 - [nulo][KeyValueEndpoint]: Autenticação Falha. 2015-12-11 16:25:45 AVISO Conversor de status de resposta:129 - Desconhecido Status da resposta com Protocolo HTTP: 401 2015-12-11 16:25:45 AVISO Conversor de status de resposta:129 - Desconhecido Status da resposta com Protocolo HTTP: 401 Dez 11, 2015 4:25:45 PM com.couchbase.demonstração.vertx.CouchbaseVerticle SEVERO: Vertical implantação erro com.couchbase.cliente.java.erro.InvalidPasswordException: Senhas para balde "default" fazer não partida. em com.couchbase.cliente.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:210) em com.couchbase.cliente.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:200) em rx.interno.operadores.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:99) em rx.interno.operadores.OperatorMap$1.onError(OperatorMap.java:48) em rx.observadores.Assinantes$5.onError(Assinantes.java:229) em rx.interno.operadores.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:197) em rx.interno.operadores.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170) em rx.interno.programadores.ScheduledAction.executar(ScheduledAction.java:55) em java.util.concomitante.Executores$Adaptador executável.chamada(Executores.java:511) em java.util.concomitante.FutureTask.executar(FutureTask.java:266) em java.util.concomitante.ScheduledThreadPoolExecutor$ScheduledFutureTask.acesso$201(ScheduledThreadPoolExecutor.java:180) em java.util.concomitante.ScheduledThreadPoolExecutor$ScheduledFutureTask.executar(ScheduledThreadPoolExecutor.java:293) em java.util.concomitante.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) em java.util.concomitante.ThreadPoolExecutor$Trabalhador.executar(ThreadPoolExecutor.java:617) em java.lang.Tópico.executar(Tópico.java:745) Dez 11, 2015 4:25:45 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Vert.x Parado - nulo |
Assim, implantamos (e paramos) com sucesso nosso primeiro Couchbase Verticle!
Parabéns!
/! Não se esqueça de alterar a senha de volta para a correta
Indo além
Vamos tentar fazer um pouco mais com esse Verticle. Que tal tentarmos preparar dados de amostra no Couchbase e servi-los em um endpoint REST gerenciado pelo Vert.x?
Criação de dados de amostra no Couchbase na inicialização
Criaremos dois documentos de amostra no Couchbase durante a inicialização do Verticle, com os usuários Alice e Bob.
É possível armazenar JSON no Couchbase usando dois Documento implementações:
JsonDocumenté o padrão. Ele se baseia em uma representação JSON simples fornecida pelo SDK, oJsonObject.RawJsonDocumenté útil quando você já tem JSON marshalling/unmarshalling em seu aplicativo (ou outra forma de representar JSON, como o próprioJsonObject). Nessa implementação, o que você passa é a representação bruta da cadeia de caracteres JSON.
Aqui estão Alice e Bob, criados usando ambas as alternativas:
|
1 2 |
JsonDocument.criar("user1", com.couchbase.cliente.java.documento.json.JsonObject.criar() .colocar("name" (nome), "Alice").colocar("idade", 26)) |
e
|
1 |
RawJsonDocument.criar("user2", novo JsonObject().colocar("name" (nome), "Bob").colocar("idade", 31).codificar()) |
Agora, o método start precisa de um pequeno ajuste. Em vez de salvar a referência ao bucket na assinatura, moveremos isso anteriormente em um doOnNext. Depois disso, criaremos os documentos e faremos um Observable com eles usando Observável.just. Isso pode ser encaminhado ao SDK para inserção usando flatMap:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Substituir público vazio iniciar(Futuro startFuture) lançamentos Exceção { agrupamento.openBucket(configuração().getString("couchbase.bucketName", "default"), configuração().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.informações("Balde aberto" + openedBucket.nome())) .doOnNext(openedBucket -> balde = openedBucket) .flatMap(nowIgnoreBucket -> Observável.apenas( JsonDocument.criar("user1", com.couchbase.cliente.java.documento.json.JsonObject.criar() .colocar("name" (nome), "Alice").colocar("idade", 26)), RawJsonDocument.criar("user2", novo JsonObject().colocar("name" (nome), "Bob").colocar("idade", 31).codificar()) )) .flatMap(doc -> balde.upsert(doc)) .assinar(Ações.vazio(), startFuture::falhar, startFuture::completo); } |
O uso de upsert aqui garante que os documentos serão criados, quer a chave já exista no banco de dados ou não.
Fornecimento de dados JSON do Couchbase
Vamos modificar nossa partícula para que ela não pare imediatamente e, em vez disso, gire um servidor HTTP que tentará recuperar um documento json do banco de dados e enviá-lo a um cliente quando a rota usuário/{id} é usado:
Aqui está uma maneira rápida e simples de usar o Vert.x Lançador para iniciar o programa (que não interromperá o núcleo do Vert.x imediatamente). Substitua o conteúdo de nosso principal com o seguinte:
|
1 2 |
Cordas[] vertxArgs = novo Cordas[] { "executar", "com.couchbase.demo.vertx.CouchbaseVerticle" }; Lançador.principal(vertxArgs); |
Observação: Em um aplicativo real,
Lançadornormalmente se tornaria a classe principal do jar e você passaria os argumentos da linha de comando diretamente.
Agora vamos ativar um servidor HTTP na inicialização do Verticle. Insira o seguinte código na seção iniciar logo após o método flatMap(doc -> bucket.upsert(doc)) ligar:
|
1 2 3 4 |
.último() .flatMap(ignorar -> vertx.createHttpServer() .manipulador de pedidos(este::manuseio) .listenObservable(8080)) |
Precisamos criar o manuseio para configurar essa rota:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
privado vazio manuseio(HttpServerRequest r) { Cordas[] caminho = r.caminho().dividir("/"); se (caminho.comprimento == 3 &lificador;&lificador; "usuários".iguais(caminho[1])) { balde.obter(caminho[2], RawJsonDocument.classe) .switchIfEmpty(Observável.erro(novo DocumentDoesNotExistException(caminho[2]))) .assinar(doc -> r.resposta() .putHeader("content-type", "application/json") .final(doc.conteúdo()), erro -> { r.resposta() .putHeader("content-type", "application/json") .setStatusCode(500).setStatusMessage(erro.toString()) .final("{"erro": "" + error.toString() + ""}"); }); } } |
Vamos testá-lo: execute o aplicativo e acesse este URL: http://localhost:8080/users/user1. Você verá o JSON da Alice, servido diretamente do Couchbase!
|
1 2 3 4 |
{ "name" (nome): "Alice", "idade": 26 } |
Para outra chavevocê deverá ver a exceção no formato JSON:
|
1 2 3 |
{ "error" (erro): "com.couchbase.client.java.error.DocumentDoesNotExistException: user3" } |
Interromper o Verticle por meio de um ponto de extremidade HTTP
Vamos adicionar rapidamente uma rota que pare a Vert.x, para diversão e lucro :)
|
1 2 3 4 |
//...substituindo a última linha em `handle` } mais se (r.caminho().iguais("/stop")) { r.resposta() .final(" |
Fechamento do Couchbase e do Vertx
Observe que, ao executar a partir de uma vertx Starter, isso não eliminará a thread principal
"); vertx.close(); }
Abertura http://localhost:8080/stop acionará a parada de todo o aplicativo Vert.x, destruindo os Verticles implantados.
Observação: Conforme indicado na mensagem, isso não elimina o processo quando executado a partir do IDE.
Conclusão
Nesta postagem do blog, descobrimos como Vert.x e o SDK Java do Couchbase podem trabalhar juntos para criar um aplicativo totalmente assíncrono.
Feliz codificação assíncrona!
Você tem o link do repositório do github onde a implementação acima está disponível?