Nesta postagem, vamos abordar o tema reativo até o fim!
Alguns clientes do Couchbase usam Vert.xuma estrutura para escrever aplicativos totalmente assíncronos, sabendo que o SDK Java do Couchbase se encaixa bem nesse quadro, sendo assíncrono desde o início e expondo um RxJava-API assíncrona com base em dados.
Portanto, veremos como começar a trabalhar rapidamente com um Couchbase Vertical que inicia uma conexão com um Couchbase Aglomerado e Balde e, em seguida, serve documentos JSON do banco de dados, usando o Java 8.
Esta postagem do blog pressupõe que você esteja familiarizado com a noções básicas do Vert.x. Aqui está uma breve tabela de conteúdo:
- Início de um novo projeto Vert.x
- Obtenção assíncrona de um bucket
- Desmontando o SDK de forma graciosa
- Vendo-o em ação
- Indo além
- Conclusão
Início de um novo projeto Vert.x
Vamos começar criando um novo projeto baseado em Maven: crie uma pasta raiz para o seu projeto e inicialize uma estrutura de diretório maven (ou use seu arquétipo Maven favorito). Por exemplo, você pode usar o seguinte comando: "mkdir -p cbvertx/src/main/java/com/couchbase/demo/vertx/“.
Agora vamos iniciar o pom.xml na raiz do projeto:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
<!--?xml version="1.0" encoding="UTF-8"?--> 4.0.0 com.couchbase.demo cbvertx 1.0-SNAPSHOT io.vertx vertx-core 3.1.0 io.vertx vertx-rx-java 3.1.0 com.couchbase.client java-client 2.2.2 <!-- this is actually already a transitive dependency of the Java SDK--> io.reactivex rxjava 1.0.15 log4j log4j 1.2.17 maven-compiler-plugin 3.3 1.8 1.8 |
Como você pode ver, usaremos Vert.x versão 3.1.0 e sua extensão para associações no RxJava, SDK Java do Couchbase versão 2.2.2 e RxJava versão 1.0.15…
Esqueleto da partícula
Basearemos nossa CouchbaseVerticle no AbstractVerticle em io.vertx.rxjava.core (da extensão vertx-rx-java). Vamos criar seu esqueleto no projeto:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package com.couchbase.demo.vertx; import java.util.ArrayList; import java.util.List; import com.couchbase.client.java.CouchbaseAsyncCluster; import io.vertx.core.Context; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.rxjava.core.AbstractVerticle; public class CouchbaseVerticle extends AbstractVerticle { private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseVerticle.class); private CouchbaseAsyncCluster cluster; } |
A fase de inicialização que escreveremos logo após isso mostrará como usar a configuração do Vert.x para determinar, no tempo de execução, de quais nós do cluster do Couchbase faremos o bootstrap. A instanciação do CouchbaseCluster ainda é leve o suficiente para que possa ser feito dessa forma durante o init.
Adicione o seguinte método init ao CouchbaseVerticle:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Override public void init(Vertx vertx, Context context) { super.init(vertx, context); //getting the configuration JSON JsonObject config = context.config(); //getting the bootstrap node, as a JSON array (default to localhost) JsonArray seedNodeArray = config.getJsonArray("couchbase.seedNodes", new JsonArray().add("localhost")); //convert to a List List seedNodes = new ArrayList<>(seedNodeArray.size()); for (Object seedNode : seedNodeArray) { seedNodes.add((String) seedNode); } //use that to bootstrap the Cluster this.cluster = CouchbaseAsyncCluster.create(seedNodes); } |
Obtenção assíncrona de um bucket
O principal ponto de entrada para a API do Couchbase é o Balde para a API de sincronização, ou AsyncBucket para a API assíncrona. Estabelecer a conexão com o bucket ("abri-lo") é muito mais pesado, portanto, deve ser feito de forma assíncrona.
Vamos ver como podemos iniciar nossa partícula abrindo primeiro o bucket que usaremos durante toda a vida útil da partícula. Queremos manter uma referência a ele e usar a função start(Future startFuture) para notificar de forma assíncrona o Vert.x de que o Verticle está pronto:
|
1 2 3 4 5 6 7 8 9 10 11 |
private volatile AsyncBucket bucket; @Override public void start(Future startFuture) throws Exception { cluster.openBucket(config().getString("couchbase.bucketName", "default"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.info("Bucket opened " + openedBucket.name())) .subscribe( openedBucket -> bucket = openedBucket, startFuture::fail, startFuture::complete); } |
Observe que primeiro obtemos o nome do bucket (e a senha associada, se relevante) dinamicamente, a partir da configuração do Vert.x. Abrimos o bucket de forma assíncrona, estabelecendo as conexões e os recursos internos do SDK.
O doOnNext é usado para registrar a abertura do balde.
Em seguida, assinamos nosso Observável e descrever como queremos "consumir" os dados finais:
- Ao receber a referência do bucket, nós a armazenamos em um campo para uso posterior
- Se houver um erro no caminho, falharemos na inicialização do Verticle usando o
Future#failmétodo. - Caso contrário, notificamos o Vert.x de que o Verticle foi iniciado com sucesso usando o comando
Future#completemétodo.
Esse é um bom começo!
Desmontando o SDK de forma graciosa
Quando o Verticle parar, os recursos criados pelo SDK deverão ser descartados adequadamente. O Aglomerado tem um objeto desconectar que faz isso, chamando recursivamente o método próximo em cada Balde que ele abriu (close pode ser usado para descartar um único Bucket).
Além disso, desde 1.0.15 O RxJava tem um método para encerrar seus Threads internos: Agendadores.shutdown. No entanto, isso deve ser invocado somente quando não houver uso subsequente do RxJava no aplicativo, portanto, pode ser uma ideia melhor fazer isso no desligamento do Vert.x...
Mais uma vez, interromperemos o Verticle de forma assíncrona, usando um Futuro para notificar a estrutura sobre a conclusão da parada:
|
1 2 3 4 5 6 7 8 9 |
@Override public void stop(Future stopFuture) throws Exception { cluster.disconnect() .doOnNext(isDisconnectedCleanly -> LOGGER.info("Disconnected Cluster (cleaned threads: " + isDisconnectedCleanly + ")")) .subscribe( isDisconnectedCleanly -> stopFuture.complete(), stopFuture::fail, Schedulers::shutdown); } |
(optamos por encerrar o RxJava após a conclusão da desconexão do SDK)
Observação Você pode ajustar o SDK passando um
CouchbaseEnvironmentapós a criação doAglomerado. Nesse caso, cabe a você também ligar paradesligamentono ambiente ao desativar todo o SDK (ou seja, quando todos os clusters em que você usou o ambiente, geralmente apenas um, forem desativados).Se você não tiver criado um ambiente específico, o SDK criará internamente um ambiente e o encerrará adequadamente, cujo resultado é visto acima no
isDisconnectedCleanlyvariável.
Vendo-o em ação
Vamos criar um principal que incorpora o Vert.x, implementa o Verticle e depois para. Observe que essa é uma implementação bastante ingênua com CountDownLatches, em que você normalmente prefere usar a linha de comando ou o Lançador como uma classe principal.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public static void main(String[] args) throws InterruptedException { Vertx vertx = Vertx.vertx(); final CountDownLatch startLatch = new CountDownLatch(1); vertx.deployVerticle(new CouchbaseVerticle(), event -> { if (event.succeeded()) LOGGER.info("Verticle Deployed - " + event.result()); else LOGGER.error("Verticle deployment error", event.cause()); startLatch.countDown(); }); startLatch.await(); final CountDownLatch stopLatch = new CountDownLatch(1); vertx.close(event -> { if (event.succeeded()) LOGGER.info("Vert.x Stopped - " + event.result()); else LOGGER.error("Vert.x stopping error", event.cause()); stopLatch.countDown(); }); stopLatch.await(); } |
Se você executar isso, verá o seguinte (notou a diferença no formato do registro de data e hora? 2015-12-11 são do SDK, enquanto os 11 de dezembro de 2015 são do Vert.x):
|
1 2 3 4 5 6 7 8 9 10 11 12 |
2015-12-11 16:21:20 INFO Node:135 - Connected to Node localhost 2015-12-11 16:21:20 INFO ConfigurationProvider:263 - Opened bucket default Dec 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Bucket opened default Dec 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Verticle Deployed - caf06dd3-c8d1-4b89-8de0-58f09467b805 2015-12-11 16:21:20 INFO ConfigurationProvider:284 - Closed bucket default 2015-12-11 16:21:20 INFO Node:145 - Disconnected from Node localhost Dec 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Disconnected Cluster (cleaned threads: true) Dec 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Stopped - null |
Como verificar o comportamento do erro? Poderíamos simplesmente alterar a senha para uma que esteja errada, apenas para verificar os registros, que seriam semelhantes:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2015-12-11 16:25:45 WARN Endpoint:283 - [null][KeyValueEndpoint]: Authentication Failure. 2015-12-11 16:25:45 WARN ResponseStatusConverter:129 - Unknown ResponseStatus with Protocol HTTP: 401 2015-12-11 16:25:45 WARN ResponseStatusConverter:129 - Unknown ResponseStatus with Protocol HTTP: 401 Dec 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle SEVERE: Verticle deployment error com.couchbase.client.java.error.InvalidPasswordException: Passwords for bucket "default" do not match. at com.couchbase.client.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:210) at com.couchbase.client.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:200) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:99) at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48) at rx.observers.Subscribers$5.onError(Subscribers.java:229) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:197) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170) at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Dec 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Stopped - null |
Assim, implantamos (e paramos) com sucesso nosso primeiro Couchbase Verticle!
Parabéns!
/! Não se esqueça de alterar a senha de volta para a correta
Indo além
Vamos tentar fazer um pouco mais com esse Verticle. Que tal tentarmos preparar dados de amostra no Couchbase e servi-los em um endpoint REST gerenciado pelo Vert.x?
Criação de dados de amostra no Couchbase na inicialização
Criaremos dois documentos de amostra no Couchbase durante a inicialização do Verticle, com os usuários Alice e Bob.
É possível armazenar JSON no Couchbase usando dois Documento implementações:
JsonDocumenté o padrão. Ele se baseia em uma representação JSON simples fornecida pelo SDK, oJsonObject.RawJsonDocumenté útil quando você já tem JSON marshalling/unmarshalling em seu aplicativo (ou outra forma de representar JSON, como o próprioJsonObject). Nessa implementação, o que você passa é a representação bruta da cadeia de caracteres JSON.
Aqui estão Alice e Bob, criados usando ambas as alternativas:
|
1 2 |
JsonDocument.create("user1", com.couchbase.client.java.document.json.JsonObject.create() .put("name", "Alice").put("age", 26)) |
e
|
1 |
RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) |
Agora, o método start precisa de um pequeno ajuste. Em vez de salvar a referência ao bucket na assinatura, moveremos isso anteriormente em um doOnNext. Depois disso, criaremos os documentos e faremos um Observable com eles usando Observável.just. Isso pode ser encaminhado ao SDK para inserção usando flatMap:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Override public void start(Future startFuture) throws Exception { cluster.openBucket(config().getString("couchbase.bucketName", "default"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.info("Bucket opened " + openedBucket.name())) .doOnNext(openedBucket -> bucket = openedBucket) .flatMap(nowIgnoreBucket -> Observable.just( JsonDocument.create("user1", com.couchbase.client.java.document.json.JsonObject.create() .put("name", "Alice").put("age", 26)), RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) )) .flatMap(doc -> bucket.upsert(doc)) .subscribe(Actions.empty(), startFuture::fail, startFuture::complete); } |
O uso de upsert aqui garante que os documentos serão criados, quer a chave já exista no banco de dados ou não.
Fornecimento de dados JSON do Couchbase
Vamos modificar nossa partícula para que ela não pare imediatamente e, em vez disso, gire um servidor HTTP que tentará recuperar um documento json do banco de dados e enviá-lo a um cliente quando a rota usuário/{id} é usado:
Aqui está uma maneira rápida e simples de usar o Vert.x Lançador para iniciar o programa (que não interromperá o núcleo do Vert.x imediatamente). Substitua o conteúdo de nosso principal com o seguinte:
|
1 2 |
String[] vertxArgs = new String[] { "run", "com.couchbase.demo.vertx.CouchbaseVerticle" }; Launcher.main(vertxArgs); |
Observação: Em um aplicativo real,
Lançadornormalmente se tornaria a classe principal do jar e você passaria os argumentos da linha de comando diretamente.
Agora vamos ativar um servidor HTTP na inicialização do Verticle. Insira o seguinte código na seção iniciar logo após o método flatMap(doc -> bucket.upsert(doc)) ligar:
|
1 2 3 4 |
.last() .flatMap(ignore -> vertx.createHttpServer() .requestHandler(this::handle) .listenObservable(8080)) |
Precisamos criar o manuseio para configurar essa rota:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private void handle(HttpServerRequest r) { String[] path = r.path().split("/"); if (path.length == 3 && "users".equals(path[1])) { bucket.get(path[2], RawJsonDocument.class) .switchIfEmpty(Observable.error(new DocumentDoesNotExistException(path[2]))) .subscribe(doc -> r.response() .putHeader("content-type", "application/json") .end(doc.content()), error -> { r.response() .putHeader("content-type", "application/json") .setStatusCode(500).setStatusMessage(error.toString()) .end("{"error": "" + error.toString() + ""}"); }); } } |
Vamos testá-lo: execute o aplicativo e acesse este URL: https://localhost:8080/users/user1. Você verá o JSON da Alice, servido diretamente do Couchbase!
|
1 2 3 4 |
{ "name": "Alice", "age": 26 } |
Para outra chavevocê deverá ver a exceção no formato JSON:
|
1 2 3 |
{ "error": "com.couchbase.client.java.error.DocumentDoesNotExistException: user3" } |
Interromper o Verticle por meio de um ponto de extremidade HTTP
Vamos adicionar rapidamente uma rota que pare a Vert.x, para diversão e lucro :)
|
1 2 3 4 |
//...replacing from the last line in `handle` } else if (r.path().equals("/stop")) { r.response() .end(" |
Fechamento do Couchbase e do Vertx
Observe que, ao executar a partir de uma vertx Starter, isso não eliminará a thread principal
"); vertx.close(); }
Abertura https://localhost:8080/stop acionará a parada de todo o aplicativo Vert.x, destruindo os Verticles implantados.
Observação: Conforme indicado na mensagem, isso não elimina o processo quando executado a partir do IDE.
Conclusão
Nesta postagem do blog, descobrimos como Vert.x e o SDK Java do Couchbase podem trabalhar juntos para criar um aplicativo totalmente assíncrono.
Feliz codificação assíncrona!
Você tem o link do repositório do github onde a implementação acima está disponível?