Nesta postagem, vamos abordar o tema reativo até o fim!
Alguns clientes do Couchbase usam Vert.x
uma estrutura para escrever aplicativos totalmente assíncronos, sabendo que o SDK Java do Couchbase
se encaixa bem nesse quadro, sendo assíncrono desde o início e expondo um RxJava
-API assíncrona com base em dados.
Portanto, veremos como começar a trabalhar rapidamente com um Couchbase Vertical
que inicia uma conexão com um Couchbase Aglomerado
e Balde
e, em seguida, serve documentos JSON do banco de dados, usando o Java 8.
Esta postagem do blog pressupõe que você esteja familiarizado com a noções básicas do Vert.x. Aqui está uma breve tabela de conteúdo:
- Início de um novo projeto Vert.x
- Obtenção assíncrona de um bucket
- Desmontando o SDK de forma graciosa
- Vendo-o em ação
- Indo além
- Conclusão
Início de um novo projeto Vert.x
Vamos começar criando um novo projeto baseado em Maven: crie uma pasta raiz para o seu projeto e inicialize uma estrutura de diretório maven (ou use seu arquétipo Maven favorito). Por exemplo, você pode usar o seguinte comando: "mkdir -p cbvertx/src/main/java/com/couchbase/demo/vertx/
“.
Agora vamos iniciar o pom.xml
na raiz do projeto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
<!--?xml versão="1.0" codificação="UTF-8"?--> 4.0.0 com.couchbase.demonstração cbvertx 1.0-FOTOGRAFIA io.vertx vertx-núcleo 3.1.0 io.vertx vertx-rx-java 3.1.0 com.couchbase.cliente java-cliente 2.2.2 <!-- este é de fato já a transitivo dependência de o Java SDK--> io.reativox rxjava 1.0.15 log4j log4j 1.2.17 mentor-compilador-plug-in 3.3 1.8 1.8 |
Como você pode ver, usaremos Vert.x
versão 3.1.0
e sua extensão para associações no RxJava, SDK Java do Couchbase
versão 2.2.2
e RxJava
versão 1.0.15
…
Esqueleto da partícula
Basearemos nossa CouchbaseVerticle
no AbstractVerticle
em io.vertx.rxjava.core
(da extensão vertx-rx-java). Vamos criar seu esqueleto no projeto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
pacote com.couchbase.demonstração.vertx; importação java.util.ArrayList; importação java.util.Lista; importação com.couchbase.cliente.java.CouchbaseAsyncCluster; importação io.vertx.núcleo.Contexto; importação io.vertx.núcleo.Vertx; importação io.vertx.núcleo.json.JsonArray; importação io.vertx.núcleo.json.JsonObject; importação io.vertx.núcleo.registro.Registrador; importação io.vertx.núcleo.registro.Fábrica de registradores; importação io.vertx.rxjava.núcleo.AbstractVerticle; público classe CouchbaseVerticle se estende AbstractVerticle { privado estático final Registrador LOGGER = Fábrica de registradores.getLogger(CouchbaseVerticle.classe); privado CouchbaseAsyncCluster agrupamento; } |
A fase de inicialização que escreveremos logo após isso mostrará como usar a configuração do Vert.x para determinar, no tempo de execução, de quais nós do cluster do Couchbase faremos o bootstrap. A instanciação do CouchbaseCluster
ainda é leve o suficiente para que possa ser feito dessa forma durante o init.
Adicione o seguinte método init ao CouchbaseVerticle
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Substituir público vazio inicial(Vertx vertx, Contexto contexto) { super.inicial(vertx, contexto); //obtenção do JSON de configuração JsonObject configuração = contexto.configuração(); //obtenção do nó bootstrap, como uma matriz JSON (padrão para localhost) JsonArray seedNodeArray = configuração.getJsonArray("couchbase.seedNodes", novo JsonArray().adicionar("localhost")); //converter em uma lista Lista seedNodes = novo ArrayList<>(seedNodeArray.tamanho()); para (Objeto seedNode : seedNodeArray) { seedNodes.adicionar((Cordas) seedNode); } //use isso para inicializar o Cluster este.agrupamento = CouchbaseAsyncCluster.criar(seedNodes); } |
Obtenção assíncrona de um bucket
O principal ponto de entrada para a API do Couchbase é o Balde
para a API de sincronização, ou AsyncBucket
para a API assíncrona. Estabelecer a conexão com o bucket ("abri-lo") é muito mais pesado, portanto, deve ser feito de forma assíncrona.
Vamos ver como podemos iniciar nossa partícula abrindo primeiro o bucket que usaremos durante toda a vida útil da partícula. Queremos manter uma referência a ele e usar a função start(Future startFuture)
para notificar de forma assíncrona o Vert.x de que o Verticle está pronto:
1 2 3 4 5 6 7 8 9 10 11 |
privado volátil AsyncBucket balde; @Substituir público vazio iniciar(Futuro startFuture) lançamentos Exceção { agrupamento.openBucket(configuração().getString("couchbase.bucketName", "default"), configuração().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.informações("Balde aberto" + openedBucket.nome())) .assinar( openedBucket -> balde = openedBucket, startFuture::falhar, startFuture::completo); } |
Observe que primeiro obtemos o nome do bucket (e a senha associada, se relevante) dinamicamente, a partir da configuração do Vert.x. Abrimos o bucket de forma assíncrona, estabelecendo as conexões e os recursos internos do SDK.
O doOnNext
é usado para registrar a abertura do balde.
Em seguida, assinamos nosso Observável
e descrever como queremos "consumir" os dados finais:
- Ao receber a referência do bucket, nós a armazenamos em um campo para uso posterior
- Se houver um erro no caminho, falharemos na inicialização do Verticle usando o
Future#fail
método. - Caso contrário, notificamos o Vert.x de que o Verticle foi iniciado com sucesso usando o comando
Future#complete
método.
Esse é um bom começo!
Desmontando o SDK de forma graciosa
Quando o Verticle parar, os recursos criados pelo SDK deverão ser descartados adequadamente. O Aglomerado
tem um objeto desconectar
que faz isso, chamando recursivamente o método próximo
em cada Balde
que ele abriu (close pode ser usado para descartar um único Bucket).
Além disso, desde 1.0.15
O RxJava tem um método para encerrar seus Threads internos: Agendadores.shutdown
. No entanto, isso deve ser invocado somente quando não houver uso subsequente do RxJava no aplicativo, portanto, pode ser uma ideia melhor fazer isso no desligamento do Vert.x...
Mais uma vez, interromperemos o Verticle de forma assíncrona, usando um Futuro
para notificar a estrutura sobre a conclusão da parada:
1 2 3 4 5 6 7 8 9 |
@Substituir público vazio parar(Futuro stopFuture) lançamentos Exceção { agrupamento.desconectar() .doOnNext(isDisconnectedCleanly -> LOGGER.informações("Cluster desconectado (linhas limpas): " + isDisconnectedCleanly + ")")) .assinar( isDisconnectedCleanly -> stopFuture.completo(), stopFuture::falhar, Agendadores::desligamento); } |
(optamos por encerrar o RxJava após a conclusão da desconexão do SDK)
Observação Você pode ajustar o SDK passando um
CouchbaseEnvironment
após a criação doAglomerado
. Nesse caso, cabe a você também ligar paradesligamento
no ambiente ao desativar todo o SDK (ou seja, quando todos os clusters em que você usou o ambiente, geralmente apenas um, forem desativados).Se você não tiver criado um ambiente específico, o SDK criará internamente um ambiente e o encerrará adequadamente, cujo resultado é visto acima no
isDisconnectedCleanly
variável.
Vendo-o em ação
Vamos criar um principal
que incorpora o Vert.x, implementa o Verticle e depois para. Observe que essa é uma implementação bastante ingênua com CountDownLatches, em que você normalmente prefere usar a linha de comando ou o Lançador
como uma classe principal.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
público estático vazio principal(Cordas[] argumentos) lançamentos InterruptedException { Vertx vertx = Vertx.vertx(); final CountDownLatch startLatch = novo CountDownLatch(1); vertx.implantarVerículo(novo CouchbaseVerticle(), evento -> { se (evento.bem-sucedido()) LOGGER.informações("Verticle Deployed - " + evento.resultado()); mais LOGGER.erro("Erro de implantação de partícula", evento.causa()); startLatch.countDown(); }); startLatch.aguardar(); final CountDownLatch stopLatch = novo CountDownLatch(1); vertx.próximo(evento -> { se (evento.bem-sucedido()) LOGGER.informações("Vert.x Parado - " + evento.resultado()); mais LOGGER.erro("Erro de parada do Vert.x", evento.causa()); stopLatch.countDown(); }); stopLatch.aguardar(); } |
Se você executar isso, verá o seguinte (notou a diferença no formato do registro de data e hora? 2015-12-11
são do SDK, enquanto os 11 de dezembro de 2015
são do Vert.x):
1 2 3 4 5 6 7 8 9 10 11 12 |
2015-12-11 16:21:20 INFORMAÇÕES Nó:135 - Conectado para Nó localhost 2015-12-11 16:21:20 INFORMAÇÕES Provedor de configuração:263 - Aberto balde padrão Dez 11, 2015 4:21:20 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Balde aberto padrão Dez 11, 2015 4:21:20 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Vertical Implementado - caf06dd3-c8d1-4b89-8de0-58f09467b805 2015-12-11 16:21:20 INFORMAÇÕES Provedor de configuração:284 - Fechado balde padrão 2015-12-11 16:21:20 INFORMAÇÕES Nó:145 - Desconectado de Nó localhost Dez 11, 2015 4:21:22 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Desconectado Aglomerado (limpo fios: verdadeiro) Dez 11, 2015 4:21:22 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Vert.x Parado - nulo |
Como verificar o comportamento do erro? Poderíamos simplesmente alterar a senha para uma que esteja errada, apenas para verificar os registros, que seriam semelhantes:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2015-12-11 16:25:45 AVISO Ponto final:283 - [nulo][KeyValueEndpoint]: Autenticação Falha. 2015-12-11 16:25:45 AVISO Conversor de status de resposta:129 - Desconhecido Status da resposta com Protocolo HTTP: 401 2015-12-11 16:25:45 AVISO Conversor de status de resposta:129 - Desconhecido Status da resposta com Protocolo HTTP: 401 Dez 11, 2015 4:25:45 PM com.couchbase.demonstração.vertx.CouchbaseVerticle SEVERO: Vertical implantação erro com.couchbase.cliente.java.erro.InvalidPasswordException: Senhas para balde "default" fazer não partida. em com.couchbase.cliente.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:210) em com.couchbase.cliente.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:200) em rx.interno.operadores.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:99) em rx.interno.operadores.OperatorMap$1.onError(OperatorMap.java:48) em rx.observadores.Assinantes$5.onError(Assinantes.java:229) em rx.interno.operadores.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:197) em rx.interno.operadores.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170) em rx.interno.programadores.ScheduledAction.executar(ScheduledAction.java:55) em java.util.concomitante.Executores$Adaptador executável.chamada(Executores.java:511) em java.util.concomitante.FutureTask.executar(FutureTask.java:266) em java.util.concomitante.ScheduledThreadPoolExecutor$ScheduledFutureTask.acesso$201(ScheduledThreadPoolExecutor.java:180) em java.util.concomitante.ScheduledThreadPoolExecutor$ScheduledFutureTask.executar(ScheduledThreadPoolExecutor.java:293) em java.util.concomitante.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) em java.util.concomitante.ThreadPoolExecutor$Trabalhador.executar(ThreadPoolExecutor.java:617) em java.lang.Tópico.executar(Tópico.java:745) Dez 11, 2015 4:25:45 PM com.couchbase.demonstração.vertx.CouchbaseVerticle INFORMAÇÕES: Vert.x Parado - nulo |
Assim, implantamos (e paramos) com sucesso nosso primeiro Couchbase Verticle!
Parabéns!
/! Não se esqueça de alterar a senha de volta para a correta
Indo além
Vamos tentar fazer um pouco mais com esse Verticle. Que tal tentarmos preparar dados de amostra no Couchbase e servi-los em um endpoint REST gerenciado pelo Vert.x?
Criação de dados de amostra no Couchbase na inicialização
Criaremos dois documentos de amostra no Couchbase durante a inicialização do Verticle, com os usuários Alice e Bob.
É possível armazenar JSON no Couchbase usando dois Documento
implementações:
JsonDocument
é o padrão. Ele se baseia em uma representação JSON simples fornecida pelo SDK, oJsonObject
.RawJsonDocument
é útil quando você já tem JSON marshalling/unmarshalling em seu aplicativo (ou outra forma de representar JSON, como o próprioJsonObject
). Nessa implementação, o que você passa é a representação bruta da cadeia de caracteres JSON.
Aqui estão Alice e Bob, criados usando ambas as alternativas:
1 2 |
JsonDocument.criar("user1", com.couchbase.cliente.java.documento.json.JsonObject.criar() .colocar("name" (nome), "Alice").colocar("idade", 26)) |
e
1 |
RawJsonDocument.criar("user2", novo JsonObject().colocar("name" (nome), "Bob").colocar("idade", 31).codificar()) |
Agora, o método start precisa de um pequeno ajuste. Em vez de salvar a referência ao bucket na assinatura, moveremos isso anteriormente em um doOnNext
. Depois disso, criaremos os documentos e faremos um Observable com eles usando Observável.just
. Isso pode ser encaminhado ao SDK para inserção usando flatMap
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Substituir público vazio iniciar(Futuro startFuture) lançamentos Exceção { agrupamento.openBucket(configuração().getString("couchbase.bucketName", "default"), configuração().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.informações("Balde aberto" + openedBucket.nome())) .doOnNext(openedBucket -> balde = openedBucket) .flatMap(nowIgnoreBucket -> Observável.apenas( JsonDocument.criar("user1", com.couchbase.cliente.java.documento.json.JsonObject.criar() .colocar("name" (nome), "Alice").colocar("idade", 26)), RawJsonDocument.criar("user2", novo JsonObject().colocar("name" (nome), "Bob").colocar("idade", 31).codificar()) )) .flatMap(doc -> balde.upsert(doc)) .assinar(Ações.vazio(), startFuture::falhar, startFuture::completo); } |
O uso de upsert
aqui garante que os documentos serão criados, quer a chave já exista no banco de dados ou não.
Fornecimento de dados JSON do Couchbase
Vamos modificar nossa partícula para que ela não pare imediatamente e, em vez disso, gire um servidor HTTP que tentará recuperar um documento json do banco de dados e enviá-lo a um cliente quando a rota usuário/{id}
é usado:
Aqui está uma maneira rápida e simples de usar o Vert.x Lançador
para iniciar o programa (que não interromperá o núcleo do Vert.x imediatamente). Substitua o conteúdo de nosso principal
com o seguinte:
1 2 |
Cordas[] vertxArgs = novo Cordas[] { "executar", "com.couchbase.demo.vertx.CouchbaseVerticle" }; Lançador.principal(vertxArgs); |
Observação: Em um aplicativo real,
Lançador
normalmente se tornaria a classe principal do jar e você passaria os argumentos da linha de comando diretamente.
Agora vamos ativar um servidor HTTP na inicialização do Verticle. Insira o seguinte código na seção iniciar
logo após o método flatMap(doc -> bucket.upsert(doc))
ligar:
1 2 3 4 |
.último() .flatMap(ignorar -> vertx.createHttpServer() .manipulador de pedidos(este::manuseio) .listenObservable(8080)) |
Precisamos criar o manuseio
para configurar essa rota:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
privado vazio manuseio(HttpServerRequest r) { Cordas[] caminho = r.caminho().dividir("/"); se (caminho.comprimento == 3 &lificador;&lificador; "usuários".iguais(caminho[1])) { balde.obter(caminho[2], RawJsonDocument.classe) .switchIfEmpty(Observável.erro(novo DocumentDoesNotExistException(caminho[2]))) .assinar(doc -> r.resposta() .putHeader("content-type", "application/json") .final(doc.conteúdo()), erro -> { r.resposta() .putHeader("content-type", "application/json") .setStatusCode(500).setStatusMessage(erro.toString()) .final("{"erro": "" + error.toString() + ""}"); }); } } |
Vamos testá-lo: execute o aplicativo e acesse este URL: http://localhost:8080/users/user1. Você verá o JSON da Alice, servido diretamente do Couchbase!
1 2 3 4 |
{ "name" (nome): "Alice", "idade": 26 } |
Para outra chavevocê deverá ver a exceção no formato JSON:
1 2 3 |
{ "error" (erro): "com.couchbase.client.java.error.DocumentDoesNotExistException: user3" } |
Interromper o Verticle por meio de um ponto de extremidade HTTP
Vamos adicionar rapidamente uma rota que pare a Vert.x, para diversão e lucro :)
1 2 3 4 |
//...substituindo a última linha em `handle` } mais se (r.caminho().iguais("/stop")) { r.resposta() .final(" |
Fechamento do Couchbase e do Vertx
Observe que, ao executar a partir de uma vertx Starter, isso não eliminará a thread principal
"); vertx.close(); }
Abertura http://localhost:8080/stop acionará a parada de todo o aplicativo Vert.x, destruindo os Verticles implantados.
Observação: Conforme indicado na mensagem, isso não elimina o processo quando executado a partir do IDE.
Conclusão
Nesta postagem do blog, descobrimos como Vert.x
e o SDK Java do Couchbase
podem trabalhar juntos para criar um aplicativo totalmente assíncrono.
Feliz codificação assíncrona!
Você tem o link do repositório do github onde a implementação acima está disponível?