Sem categoria

Por que o Couchbase escolheu o RxJava para o novo Java SDK

Esta publicação do blog explica o raciocínio e a motivação que nos levaram a escolher o RxJava como um dos componentes integrais do nosso novo Java SDK.

Motivação

Há muitas maneiras de projetar uma API e cada uma delas tem seu próprio conjunto de benefícios (e desvantagens). No processo de criação de nossas novas APIs, uma das principais questões era como expô-las ao usuário. 

Uma pergunta que não tivemos que nos fazer foi: deveria ser síncrona ou assíncrona? Acreditamos firmemente que as APIs assíncronas são a única maneira sensata de obter o desempenho e o dimensionamento de que você frequentemente precisa, e também é muito mais fácil passar de assíncrono para síncrono do que o contrário. O SDK estável atual (1.4.3 no momento em que este artigo foi escrito) já faz uso intenso de Futuros de várias maneiras para fornecer respostas assíncronas e isso remonta a 2006/7, quando o spymemcached originalmente introduziu o conceito em sua API.

É sabido que a interface do Java Future é muito limitada em comparação com outras soluções (como os futuros do Scala). Além disso, ela também é um pouco mais complicada de codificar se você precisar criar fluxos de dados assíncronos em que um cálculo depende do outro e você deseja que tudo seja assíncrono. Em versões recentes, adicionamos suporte a ouvintes, o que melhora bastante a situação, mas ainda não é a solução ideal.

Nos últimos anos, surgiram outras bibliotecas e padrões que acompanhamos de perto. Um dos conceitos maduros é conhecido como Reactive Extensions (Extensões reativas), originado da Microsoft e do .NET. Ele se baseia na ideia de que os aplicativos devem ser orientados a eventos e reagir a esses eventos de forma assíncrona. Ele define um conjunto muito rico de operadores sobre o que você pode fazer com os dados (modificar, combinar, filtrar e assim por diante). Recentemente, a Netflix o portou para Java e o apelidou de RxJava (observe que, embora o projeto esteja atualmente no namespace da Netflix, ele será movido para "io.reactivex" mais cedo ou mais tarde). Ele é muito estável e também fornece adaptadores para outras linguagens JVM, como Scala, Groovy e JRuby, o que se encaixa bem em nossos planos de ampliar o suporte também.

O conceito

A ideia principal do Rx gira em torno de Observáveis e seus observadores. Se você ainda não conhece esse conceito, pode pensar no Observable como o primo assíncrono e baseado em push (ou mais formalmente chamado de dual) de um Iterable. Mais especificamente, esta é a relação entre eles:

Evento Iterável (pull) Observável (push)
recuperar dados T next() onNext(T)
descobrir erro lança Exceção onError(Exception)
completo retornos onCompleted()

Toda vez que os dados são enviados para um observável, todos os observáveis que estão inscritos nele recebem os dados em seu método onNext(). Se o observável for concluído eventualmente (o que não precisa ser sempre o caso), o método onCompleted será chamado. Agora, em qualquer parte do processo, se ocorrer um erro, o método onError será chamado e o Observable também será considerado concluído.

Se você gosta de gramática, o contrato tem a seguinte aparência: 

OnNext* (OnCompleted | OnError)?

Observe especificamente que não há distinção se apenas 1 ou N dados são retornados; isso pode ser normalmente inferido a partir dos métodos que você chama e como eles são documentados. De qualquer forma, isso não altera seu fluxo de programação. Como isso é um pouco abstrato, vamos dar uma olhada em um exemplo concreto. Na classe CouchbaseCluster, há um método chamado openBucket que inicializa todos os recursos necessários e retorna uma instância do Bucket para que você possa trabalhar com ela. Agora você pode imaginar que abrir soquetes, obter uma configuração e assim por diante leva algum tempo, portanto, esse é um candidato perfeito. A API de bloqueio seria semelhante a:

interface Cluster {
        Bucket openBucket(String name, String password);
}

Como podemos torná-lo assíncrono? Precisamos envolvê-lo em um Observable:

interface Cluster {
        Observável openBucket(String name, String password);
}

Portanto, agora retornamos um observável que, por fim, retornará com uma instância de balde que podemos usar. Vamos adicionar um observador:

agrupamento.openBucket().assinar(novo Observador<Balde>() {
    @Override
    público vazio onCompleted() {
        Sistema.fora.println("Observável feito!");
    }

    @Override
    público vazio onError(Lançável e) {
        Sistema.erro.println("Algo aconteceu");
        e.printStackTrace();
    }

    @Override
    público vazio onNext(Balde) {
        Sistema.fora.println("Balde recebido: " + balde);
    }
});

Observe que esses métodos são chamados em um thread diferente, portanto, se você deixar o código assim e encerrar o thread principal depois, provavelmente não verá nada. Embora agora você possa escrever todo o restante do seu código no método onNext, essa provavelmente não é a melhor maneira de fazer isso. Como o bucket é algo 
quiser abrir antecipadamente, você pode bloqueá-lo e depois prosseguir com o restante do código. Todo observável pode ser convertido em um observável de bloqueio, que se assemelha a um iterável:

BloqueioObservável blockingObservable = cluster.openBucket().toBlocking();

Você encontrará muitos métodos para iterar sobre os dados recebidos de forma bloqueada, mas também há métodos abreviados se você esperar apenas um único valor (o que sabemos que é o nosso caso):

Bucket bucket = cluster.openBucket().toBlocking().single();

O que acontece aqui internamente é que o valor chamado em onNext é armazenado para nós e retornado quando onComplete é chamado. Se onError for chamado, o throwable será lançado diretamente e você poderá capturá-lo.

Unificação de APIs

Agora, o que você já viu mal toca a superfície. A abertura da caçamba poderia muito bem ser feita também com um Futuro sozinho. Os observáveis se destacam quando você precisa trabalhar com mais de um resultado retornado. Nesse caso, um Future não se encaixa mais no perfil e o Future<Coleção<T>> ou algo semelhante não tem o mesmo contrato. Como os observáveis implicam que mais de um T pode ser retornado, as APIs podem ter a mesma aparência, mesmo que às vezes um e às vezes mais de um Ts seja retornado.

Novamente, vamos examinar um exemplo concreto. O SDK expõe um método get que retorna um documento. Ele tem a seguinte aparência:

interface Bucket {
        Observável get(String id);
}

Mas também oferecemos suporte a consultas (visualizações, N1QL) que podem retornar mais de um resultado (ou até mesmo nenhum). Graças ao contrato Observable, podemos criar uma API como essa:

interface Bucket {
        Observável query(consulta ViewQuery);
}

Está vendo? O contrato diz implicitamente que "se você passar uma consulta, receberá N ViewResults de volta", já que você sabe como um Observable precisa se comportar. E para ter uma visão mais ampla, aqui estão ainda mais métodos que intuitivamente se comportam da maneira que você espera que eles se comportem.

interface Balde {
    <D se estende Documento> Observable<D insert(D document);
<D extends Document> Observável<D> upsert(Documento D);
    <D se estende Documento> Observável<D> substituir(Documento D);

    Observável<ViewResult> consulta(Consulta ViewQuery);
    Observável<Resultado da consulta> consulta(Consulta de consulta);
    Observável<Resultado da consulta> consulta(Cordas consulta);

    Observável<Booleano> descarga();
}

Async meu fluxo de dados!

Até agora, vimos o que os Observables podem fazer por nós e como eles nos ajudam a fornecer APIs coesas, simples e assíncronas. Mas os observáveis realmente brilham com seus aspectos de composição. Você pode fazer muitas coisas com os observáveis, e não podemos abordar todas elas neste post. O RxJava tem uma documentação de referência muito boa que pode ser encontrada aqui, portanto, dê uma olhada. Ela usa diagramas de mármore para mostrar como os fluxos de dados assíncronos funcionam, algo que também queremos fornecer como parte de nossa documentação no futuro.

Vamos considerar um exemplo prático: Você quer carregar um documento do couchbase (que é um objeto JSON completo com detalhes do usuário), mas só quer fazer algo com o primeiro nome mais adiante no código. Podemos usar a função map para mapear do JsonDocument para a string firstname:

balde
    .obter("user::1")
    .mapa(novo Func1<JsonDocument, String>() {
        @Override
        público Cordas chamada(JsonDocument jsonDocument) {
            retorno jsonDocument.conteúdo().getString("firstname");
        }
    })
    .assinar(novo Ação1<Cordas>() {
        @Override
        público vazio chamada(Cordas primeiro nome) {
            Sistema.fora.println(primeiro nome);
        }
    });

Há dois aspectos importantes aqui: Todo método encadeado aqui também é executado de forma assíncrona, portanto, não está bloqueando o thread de origem. Quando a chamada get contra o couchbase retorna, mapeamos o primeiro nome do documento JSON e, por fim, o imprimimos. Você não precisa fornecer um Observer completo; se estiver interessado apenas no valor onNext, basta implementá-lo (como mostrado aqui). Consulte os métodos sobrecarregados para obter mais exemplos.

Observe também que estou deliberadamente mostrando classes anônimas no estilo do Java 6/7 aqui. Também oferecemos suporte ao Java 8, mas falaremos mais sobre isso depois. Agora, como poderíamos estender essa cadeia se quisermos imprimir o nome apenas se ele começar com um "a"?

balde
    .obter("user::1")
    .mapa(novo Func1<JsonDocument, String>() {
        @Override
        público Cordas chamada(JsonDocument jsonDocument) {
            retorno jsonDocument.conteúdo().getString("firstname");
        }
    })
    .filtro(novo Func1<Cordas, booleano>() {
        @Override
        público Booleano chamada(Cordas s) {
            retorno s.startsWith("a");
        }
    })
    .assinar(novo Ação1<Cordas>() {
        @Override
        público vazio chamada(Cordas primeiro nome) {
            Sistema.fora.println(primeiro nome);
        }
    });

É claro que uma simples instrução if seria suficiente, mas você pode imaginar que seu código para filtrar poderia ser muito mais complexo (e provavelmente chamando outra coisa também). Como exemplo final de transformação de observáveis, vamos fazer algo que ocorre com muita frequência: você carrega um documento, modifica seu conteúdo e o salva de volta no couchbase:

balde
    .obter("user::1")
    .mapa(novo Func1<JsonDocument, JsonDocument>() {
        @Override
        público Chamada de JsonDocument(JsonDocument original) {
            original.conteúdo().colocar("firstname", "SomethingElse");
            retorno original;
        }
    })
    .flatMap(novo Func1<JsonDocument, Observável<JsonDocument>>() {
        @Override
        público Observável<JsonDocument> chamada(JsonDocument modificado) {
            retorno balde.substituir(modificado);
        }
    }).assinar();

O FlatMap se comporta de forma muito parecida com o map, a diferença é que ele retorna um observável, portanto, é perfeitamente adequado para mapear operações assíncronas.

Um outro aspecto é que, com os Observables, o tratamento sofisticado de erros está bem na ponta dos dedos. Vamos implementar um exemplo que aplica um tempo limite de 2 segundos e, se a chamada não retornar, devolve outra coisa:

balde
    .obter("user::1")
    .tempo limite(2, TimeUnit.SEGUNDOS)
    .onErrorReturn(novo Func1<Lançável, JsonDocument>() {
        @Override
        público Chamada de JsonDocument(Lançável descartável) {
            retorno JsonDocument.criar("user::anonymous", JsonObject.vazio().colocar("firstname", "john-doe"));
        }
    });

Aqui, um documento fictício é retornado (fingindo alguns padrões razoáveis para o nosso exemplo) se a chamada get não retornar em 2 segundos. Esse é apenas um exemplo simples, mas você pode fazer muitas coisas com exceções, como tentar novamente, ramificar para outros observáveis e assim por diante. Consulte a documentação oficial (e a documentação do Rx) para saber como usá-las corretamente.

Espere, há mais

Há muito mais recursos disponíveis, como combinar (mesclar, compactar, concatenar) diferentes observáveis, agrupar os resultados em intervalos de tempo, fazer efeitos colaterais e outros. Depois que você supera o (pequeno) obstáculo inicial de entender o conceito, ele se torna muito natural e prometemos que você não vai querer voltar atrás (se estivermos errados, no entanto, você sempre poderá bloquear um Observable ou convertê-lo em um future).

O RxJava também tem um suporte decente ao Java 8, portanto, se você tiver a sorte de já poder usá-lo em seus projetos, poderá simplificar um exemplo acima para isso:

balde
    .get("user::1")
    .map(jsonDocument -> jsonDocument.content().getString("firstname"))
    .filter(s -> s.startsWith("a"))
    .subscribe(System.out::println);

Legal, não é? O RxJava também fornece diferentes adaptadores de linguagem sobre ele, no momento em que escrevo Scala, Clojure, Groovy, JRuby e Kotlin. Eles podem ser usados para fornecer uma integração ainda mais específica da linguagem e também estamos planejando usar alguns deles para aprimorar o suporte ao couchbase para cada uma dessas linguagens, conforme a demanda. Nossa maior prioridade, além do SDK Java, é definitivamente o Scala, portanto, fique atento a alguns anúncios em breve!

Esperamos que você esteja tão animado quanto nós e aguardamos seus comentários e perguntas pelos canais habituais!

Compartilhe este artigo
Receba atualizações do blog do Couchbase em sua caixa de entrada
Esse campo é obrigatório.

Autor

Postado por Michael Nitschinger, engenheiro de software principal, Couchbase

Michael Nitschinger trabalha como engenheiro de software principal na Couchbase. Ele é o arquiteto e mantenedor do Couchbase Java SDK, um dos primeiros drivers de banco de dados totalmente reativos na JVM. Ele também é o autor e mantenedor do Couchbase Spark Connector. Michael participa ativamente da comunidade de código aberto e contribui para vários outros projetos, como RxJava e Netty.

2 Comentários

  1. Alexander Jarvis setembro 6, 2014 em 10:04 am

    Estou realmente ansioso por um anúncio relacionado ao Scala. Acabei de dar uma olhada no https://reactivecouchbase.org/ mas atualmente ele depende do Java SDK 1.4. Vale a pena aguardar seu anúncio antes de iniciar a portabilidade de um aplicativo que atualmente usa o mongodb e o ReactiveMongo?

  2. [...] de código assíncrono. Alguns editores de bases de dados já perceberam isso: o driver do CouchBase já utiliza Observable em seu driver assíncrono. O MongoDB, por sua vez, publicou [...]

Deixe um comentário

Pronto para começar a usar o Couchbase Capella?

Iniciar a construção

Confira nosso portal do desenvolvedor para explorar o NoSQL, procurar recursos e começar a usar os tutoriais.

Use o Capella gratuitamente

Comece a trabalhar com o Couchbase em apenas alguns cliques. O Capella DBaaS é a maneira mais fácil e rápida de começar.

Entre em contato

Deseja saber mais sobre as ofertas do Couchbase? Deixe-nos ajudar.