Esta publicação do blog explica o raciocínio e a motivação que nos levaram a escolher o RxJava como um dos componentes integrais do nosso novo Java SDK.
Motivação
Há muitas maneiras de projetar uma API e cada uma delas tem seu próprio conjunto de benefícios (e desvantagens). No processo de criação de nossas novas APIs, uma das principais questões era como expô-las ao usuário.
Uma pergunta que não tivemos que nos fazer foi: deveria ser síncrona ou assíncrona? Acreditamos firmemente que as APIs assíncronas são a única maneira sensata de obter o desempenho e o dimensionamento de que você frequentemente precisa, e também é muito mais fácil passar de assíncrono para síncrono do que o contrário. O SDK estável atual (1.4.3 no momento em que este artigo foi escrito) já faz uso intenso de Futuros de várias maneiras para fornecer respostas assíncronas e isso remonta a 2006/7, quando o spymemcached originalmente introduziu o conceito em sua API.
É sabido que a interface do Java Future é muito limitada em comparação com outras soluções (como os futuros do Scala). Além disso, ela também é um pouco mais complicada de codificar se você precisar criar fluxos de dados assíncronos em que um cálculo depende do outro e você deseja que tudo seja assíncrono. Em versões recentes, adicionamos suporte a ouvintes, o que melhora bastante a situação, mas ainda não é a solução ideal.
Nos últimos anos, surgiram outras bibliotecas e padrões que acompanhamos de perto. Um dos conceitos maduros é conhecido como Reactive Extensions (Extensões reativas), originado da Microsoft e do .NET. Ele se baseia na ideia de que os aplicativos devem ser orientados a eventos e reagir a esses eventos de forma assíncrona. Ele define um conjunto muito rico de operadores sobre o que você pode fazer com os dados (modificar, combinar, filtrar e assim por diante). Recentemente, a Netflix o portou para Java e o apelidou de RxJava (observe que, embora o projeto esteja atualmente no namespace da Netflix, ele será movido para "io.reactivex" mais cedo ou mais tarde). Ele é muito estável e também fornece adaptadores para outras linguagens JVM, como Scala, Groovy e JRuby, o que se encaixa bem em nossos planos de ampliar o suporte também.
O conceito
A ideia principal do Rx gira em torno de Observáveis e seus observadores. Se você ainda não conhece esse conceito, pode pensar no Observable como o primo assíncrono e baseado em push (ou mais formalmente chamado de dual) de um Iterable. Mais especificamente, esta é a relação entre eles:
| Evento | Iterável (pull) | Observável (push) |
|---|---|---|
| recuperar dados | T next() | onNext(T) |
| descobrir erro | lança Exceção | onError(Exception) |
| completo | retornos | onCompleted() |
Toda vez que os dados são enviados para um observável, todos os observáveis que estão inscritos nele recebem os dados em seu método onNext(). Se o observável for concluído eventualmente (o que não precisa ser sempre o caso), o método onCompleted será chamado. Agora, em qualquer parte do processo, se ocorrer um erro, o método onError será chamado e o Observable também será considerado concluído.
Se você gosta de gramática, o contrato tem a seguinte aparência:
Observe especificamente que não há distinção se apenas 1 ou N dados são retornados; isso pode ser normalmente inferido a partir dos métodos que você chama e como eles são documentados. De qualquer forma, isso não altera seu fluxo de programação. Como isso é um pouco abstrato, vamos dar uma olhada em um exemplo concreto. Na classe CouchbaseCluster, há um método chamado openBucket que inicializa todos os recursos necessários e retorna uma instância do Bucket para que você possa trabalhar com ela. Agora você pode imaginar que abrir soquetes, obter uma configuração e assim por diante leva algum tempo, portanto, esse é um candidato perfeito. A API de bloqueio seria semelhante a:
Bucket openBucket(String name, String password);
}
Como podemos torná-lo assíncrono? Precisamos envolvê-lo em um Observable:
Observável
}
Portanto, agora retornamos um observável que, por fim, retornará com uma instância de balde que podemos usar. Vamos adicionar um observador:
@Override
público vazio onCompleted() {
Sistema.fora.println("Observável feito!");
}
@Override
público vazio onError(Lançável e) {
Sistema.erro.println("Algo aconteceu");
e.printStackTrace();
}
@Override
público vazio onNext(Balde) {
Sistema.fora.println("Balde recebido: " + balde);
}
});
Observe que esses métodos são chamados em um thread diferente, portanto, se você deixar o código assim e encerrar o thread principal depois, provavelmente não verá nada. Embora agora você possa escrever todo o restante do seu código no método onNext, essa provavelmente não é a melhor maneira de fazer isso. Como o bucket é algo
quiser abrir antecipadamente, você pode bloqueá-lo e depois prosseguir com o restante do código. Todo observável pode ser convertido em um observável de bloqueio, que se assemelha a um iterável:
Você encontrará muitos métodos para iterar sobre os dados recebidos de forma bloqueada, mas também há métodos abreviados se você esperar apenas um único valor (o que sabemos que é o nosso caso):
O que acontece aqui internamente é que o valor chamado em onNext é armazenado para nós e retornado quando onComplete é chamado. Se onError for chamado, o throwable será lançado diretamente e você poderá capturá-lo.
Unificação de APIs
Agora, o que você já viu mal toca a superfície. A abertura da caçamba poderia muito bem ser feita também com um Futuro
Novamente, vamos examinar um exemplo concreto. O SDK expõe um método get que retorna um documento. Ele tem a seguinte aparência:
Observável
}
Mas também oferecemos suporte a consultas (visualizações, N1QL) que podem retornar mais de um resultado (ou até mesmo nenhum). Graças ao contrato Observable, podemos criar uma API como essa:
Observável
}
Está vendo? O contrato diz implicitamente que "se você passar uma consulta, receberá N ViewResults de volta", já que você sabe como um Observable precisa se comportar. E para ter uma visão mais ampla, aqui estão ainda mais métodos que intuitivamente se comportam da maneira que você espera que eles se comportem.
<D se estende Documento> Observable<D insert(D document);
<D extends Document> Observável<D> upsert(Documento D);
<D se estende Documento>> Observável<D> substituir(Documento D);
Observável<ViewResult> consulta(Consulta ViewQuery);
Observável<Resultado da consulta> consulta(Consulta de consulta);
Observável<Resultado da consulta> consulta(Cordas consulta);
Observável<Booleano> descarga();
}
Async meu fluxo de dados!
Até agora, vimos o que os Observables podem fazer por nós e como eles nos ajudam a fornecer APIs coesas, simples e assíncronas. Mas os observáveis realmente brilham com seus aspectos de composição. Você pode fazer muitas coisas com os observáveis, e não podemos abordar todas elas neste post. O RxJava tem uma documentação de referência muito boa que pode ser encontrada aqui, portanto, dê uma olhada. Ela usa diagramas de mármore para mostrar como os fluxos de dados assíncronos funcionam, algo que também queremos fornecer como parte de nossa documentação no futuro.
Vamos considerar um exemplo prático: Você quer carregar um documento do couchbase (que é um objeto JSON completo com detalhes do usuário), mas só quer fazer algo com o primeiro nome mais adiante no código. Podemos usar a função map para mapear do JsonDocument para a string firstname:
.obter("user::1")
.mapa(novo Func1<JsonDocument, String>() {
@Override
público Cordas chamada(JsonDocument jsonDocument) {
retorno jsonDocument.conteúdo().getString("firstname");
}
})
.assinar(novo Ação1<Cordas>() {
@Override
público vazio chamada(Cordas primeiro nome) {
Sistema.fora.println(primeiro nome);
}
});
Há dois aspectos importantes aqui: Todo método encadeado aqui também é executado de forma assíncrona, portanto, não está bloqueando o thread de origem. Quando a chamada get contra o couchbase retorna, mapeamos o primeiro nome do documento JSON e, por fim, o imprimimos. Você não precisa fornecer um Observer completo; se estiver interessado apenas no valor onNext, basta implementá-lo (como mostrado aqui). Consulte os métodos sobrecarregados para obter mais exemplos.
Observe também que estou deliberadamente mostrando classes anônimas no estilo do Java 6/7 aqui. Também oferecemos suporte ao Java 8, mas falaremos mais sobre isso depois. Agora, como poderíamos estender essa cadeia se quisermos imprimir o nome apenas se ele começar com um "a"?
.obter("user::1")
.mapa(novo Func1<JsonDocument, String>() {
@Override
público Cordas chamada(JsonDocument jsonDocument) {
retorno jsonDocument.conteúdo().getString("firstname");
}
})
.filtro(novo Func1<Cordas, booleano>() {
@Override
público Booleano chamada(Cordas s) {
retorno s.startsWith("a");
}
})
.assinar(novo Ação1<Cordas>() {
@Override
público vazio chamada(Cordas primeiro nome) {
Sistema.fora.println(primeiro nome);
}
});
É claro que uma simples instrução if seria suficiente, mas você pode imaginar que seu código para filtrar poderia ser muito mais complexo (e provavelmente chamando outra coisa também). Como exemplo final de transformação de observáveis, vamos fazer algo que ocorre com muita frequência: você carrega um documento, modifica seu conteúdo e o salva de volta no couchbase:
.obter("user::1")
.mapa(novo Func1<JsonDocument, JsonDocument>() {
@Override
público Chamada de JsonDocument(JsonDocument original) {
original.conteúdo().colocar("firstname", "SomethingElse");
retorno original;
}
})
.flatMap(novo Func1<JsonDocument, Observável<JsonDocument>>() {
@Override
público Observável<JsonDocument> chamada(JsonDocument modificado) {
retorno balde.substituir(modificado);
}
}).assinar();
O FlatMap se comporta de forma muito parecida com o map, a diferença é que ele retorna um observável, portanto, é perfeitamente adequado para mapear operações assíncronas.
Um outro aspecto é que, com os Observables, o tratamento sofisticado de erros está bem na ponta dos dedos. Vamos implementar um exemplo que aplica um tempo limite de 2 segundos e, se a chamada não retornar, devolve outra coisa:
Aqui, um documento fictício é retornado (fingindo alguns padrões razoáveis para o nosso exemplo) se a chamada get não retornar em 2 segundos. Esse é apenas um exemplo simples, mas você pode fazer muitas coisas com exceções, como tentar novamente, ramificar para outros observáveis e assim por diante. Consulte a documentação oficial (e a documentação do Rx) para saber como usá-las corretamente.
Espere, há mais
Há muito mais recursos disponíveis, como combinar (mesclar, compactar, concatenar) diferentes observáveis, agrupar os resultados em intervalos de tempo, fazer efeitos colaterais e outros. Depois que você supera o (pequeno) obstáculo inicial de entender o conceito, ele se torna muito natural e prometemos que você não vai querer voltar atrás (se estivermos errados, no entanto, você sempre poderá bloquear um Observable ou convertê-lo em um future).
O RxJava também tem um suporte decente ao Java 8, portanto, se você tiver a sorte de já poder usá-lo em seus projetos, poderá simplificar um exemplo acima para isso:
.get("user::1")
.map(jsonDocument -> jsonDocument.content().getString("firstname"))
.filter(s -> s.startsWith("a"))
.subscribe(System.out::println);
Legal, não é? O RxJava também fornece diferentes adaptadores de linguagem sobre ele, no momento em que escrevo Scala, Clojure, Groovy, JRuby e Kotlin. Eles podem ser usados para fornecer uma integração ainda mais específica da linguagem e também estamos planejando usar alguns deles para aprimorar o suporte ao couchbase para cada uma dessas linguagens, conforme a demanda. Nossa maior prioridade, além do SDK Java, é definitivamente o Scala, portanto, fique atento a alguns anúncios em breve!
Esperamos que você esteja tão animado quanto nós e aguardamos seus comentários e perguntas pelos canais habituais!
Estou realmente ansioso por um anúncio relacionado ao Scala. Acabei de dar uma olhada no https://reactivecouchbase.org/ mas atualmente ele depende do Java SDK 1.4. Vale a pena aguardar seu anúncio antes de iniciar a portabilidade de um aplicativo que atualmente usa o mongodb e o ReactiveMongo?
[...] de código assíncrono. Alguns editores de bases de dados já perceberam isso: o driver do CouchBase já utiliza Observable em seu driver assíncrono. O MongoDB, por sua vez, publicou [...]