Nesta publicação, mostrarei como converter uma API de estilo de ouvinte baseada em callback em uma API reativa com o RxJava 2.
Introdução
A programação reativa e o RxJava se tornaram tópicos muito importantes, especialmente nos últimos dois anos. Tive uma amostra do RxJava ao experimentá-lo em um projeto Android há pouco tempo. Eu tinha um problema simples de threading, que poderia ter resolvido facilmente de outras maneiras. No entanto, como já havia lido sobre o RxJava, decidi experimentá-lo. Fiquei imediatamente impressionado com o fato de o código ser muito mais simples e compreensível.
Apesar desse sucesso, o RxJava tem uma certa reputação de ser difícil de aprender. Em um projeto mais recente, eu queria lidar com atualizações ao vivo de um banco de dados. O banco de dados (Couchbase Lite), tem um sistema baseado em retorno de chamada para monitorar alterações. Eu queria envolver esse retorno de chamada em uma estrutura reativa. (Isso poderia ter sido um Observável ou um Fluxível. Aguarde um artigo posterior que fale sobre a escolha entre eles).
A primeira coisa que descobri foi que não consegui encontrar um bom exemplo de uma versão geral do que eu queria. Há um exemplo simples na seção Documentação do RxJavamas ele tem algumas desvantagens que eu queria evitar. Por exemplo, no exemplo, presume-se que o objeto Event tenha um método para determinar se um determinado evento é o último no fluxo. Muitos retornos de chamada no Android não têm esse método.
Embora eu tenha descoberto mais tarde um Postagem no Stack Overflow que abrange muito bem os conceitos básicos, eu queria entender mais.
Uma discussão completa sobre os aspectos internos poderia, não surpreendentemente, preencher um livro. Nesta postagem, abordarei apenas o núcleo. Há um código com vários experimentos para ajudar a entender os detalhes. Isso é demais para um único artigo, portanto, deixaremos isso para outra ocasião.
Objetivo
Para sermos mais explícitos, veremos como usar uma interface de retorno de chamada de ouvinte, comum na programação orientada por eventos, e transformá-la em um Observável
.
Ou seja, como podemos passar de
1 2 3 |
público interface OnItemListener<T> { vazio onItem(T item); } |
para
1 |
Observável<T> |
Android OnClickListener
é um exemplo desse tipo de API. OnClickListener
é uma interface com um método, onClick
. Ele tem um sistema operacional Android Ver
como parâmetro. O sistema Android usa isso para fornecer fluxos de eventos, como pressionamento de botões e assim por diante.
Primeiros passos
A fonte para este trabalho pode ser encontrada no GitHub aqui.
Nesta postagem, examinaremos apenas uma pequena parte desse código. Outras partes do código foram criadas para testar vários experimentos. Isso pode ser assunto para artigos futuros. Para isso, vamos nos concentrar apenas no tópico central.
Para acompanhar o processo, clone o repositório. O código está configurado para ser criado pelo gradle, portanto, você pode executá-lo a partir da linha de comando ou importá-lo para seu ambiente favorito.
Criando uma fonte
O objetivo é converter uma API para ouvir algum tipo de fonte de eventos. A primeira coisa que precisamos é de um fluxo de eventos real para testar. Há algumas fontes criadas para experimentar, portanto, começaremos com uma classe base.
Listagem: BasicSource.java
1 2 3 4 5 6 7 8 9 10 |
// Arquivo: src/main/java/com/couchbase/rx/BasicSource.java pacote com.couchbase.rx; público classe Fonte básica<T> { protegida volátil OnItemListener<? super T> ouvinte; público estático interface OnItemListener<T> { vazio onItem(T item); } público vazio setOnItemListener(OnItemListener<? super T> ouvinte) { este.ouvinte = ouvinte; } } |
Isso define a interface e tem o campo comum e o método setter necessários para todas as implementações.
Em seguida, obtenha uma fonte que imite um fluxo ilimitado de eventos da seguinte forma.
Listagem: UnboundSource.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
// Arquivo: src/main/java/com/couchbase/rx/UnboundSource.java pacote com.couchbase.rx; importação java.util.função.Fornecedor; importação estático com.couchbase.Util.dormir; público classe UnboundSource<T> se estende Fonte básica<T> { privado Fornecedor<T> fornecedor; privado OnItemListener<? super T> atual; público UnboundSource(Fornecedor<T> fornecedor) { este.fornecedor = fornecedor; } público vazio iniciar() { Sistema.fora.println("Fonte emitindo na linha" + Tópico.currentThread().getName()); para (;;) { Sistema.fora.println("Emitindo item na linha" + Tópico.currentThread().getName()); atual = ouvinte; se (nulo != atual) atual.onItem(fornecedor.obter()); dormir(100); } } } |
Essa versão gera novos itens usando um Fornecedor
passada para o construtor. Isso apenas mostra que não há nada de especial sobre os objetos reais fornecidos, já que o fornecedor pode criar qualquer coisa.
Temos um método para iniciar explicitamente a criação de itens. Aqui, usamos um loop infinito para gerá-los indefinidamente.
A atribuição de ouvinte
para atual
contorna uma condição de corrida em que o descarte da assinatura pode ocorrer entre a verificação de nulo e a invocação real do onItem
retorno de chamada.
Conversão para um observável
Ótimo, agora temos uma fonte que imita, por exemplo, uma série aberta de cliques em botões. Em seguida, vamos criar um Observável
.
Usaremos o método recomendado na documentação do RxJava. Ele usa a variável Observável.create
em vez de subclasse Observável
diretamente. (O projeto inclui código para fazer isso também, para fins de comparação).
Dê uma olhada na listagem.
Listagem: Observables.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
// Arquivo: src/main/java/com/couchbase/rx/Observables.java pacote com.couchbase.rx; importação io.reativox.Observável; importação io.reativox.ObservableOnSubscribe; importação com.couchbase.Util.ComputeFunction; público classe Observáveis { público estático vazio principal(Cordas[] argumentos) { UnboundSource<Object> fonte = novo UnboundSource<>(Objeto::novo); ObservableOnSubscribe<Object> manipulador = emissor -> { Sistema.fora.println("Criar na linha - " + Tópico.currentThread().getName()); fonte.setOnItemListener(item -> { Sistema.fora.println("Ouça na linha - " + Tópico.currentThread().getName()); se (emissor.isDisposed()) retorno; emissor.onNext(item); }); emissor.setCancellable(() -> fonte.setOnItemListener(nulo)); fonte.iniciar(); }; Observável.criar(manipulador) .assinar(ComputeFunction::computar, Lançável::printStackTrace, () -> Sistema.fora.println("Feito"), t -> Sistema.fora.println("onSubscribe na thread" + Tópico.currentThread().getName())); } } |
Primeiro, criamos uma instância do nosso UnboundSource
.
Em seguida, criamos uma instância de ObservableOnSubscribe
usando uma expressão lambda. O método que estamos substituindo, assinar
tem um parâmetro, um Emissor
objeto.
Isso fornece a conexão entre o retorno de chamada do ouvinte da nossa fonte e um assinante, por meio de uma segunda expressão lambda. Essa segunda expressão apenas verifica emissor.isDisposed
para se certificar de que a assinatura ainda está ativa e, em seguida, empurra um item para baixo chamando emissor.onNext
. Essa é a linha principal para a qual tudo está sendo construído.
Depois de conectar nosso retorno de chamada original, queremos oferecer uma maneira de interromper o fluxo. Usamos um Cancelável
aqui para simplificar. A expressão lambda interrompe o fluxo anulando a chamada de retorno do ouvinte.
A Descartável
também funcionaria. Isso Resposta do Stack Overflow dá uma boa ideia da diferença entre eles e de como escolher qual usar.
Os descartáveis são a solução do RxJava 2 para cancelar a assinatura de um fluxo. Esta postagem por Kaushik Gopal explica alguns dos raciocínios sobre o uso de descartáveis em geral.
Com tudo interconectado, acionamos a fonte para começar a gerar eventos.
Instanciação e assinatura
Com nosso ObservableOnSubscribe
em mãos, agora podemos criar nossa instância Observável
com a chamada para criar
.
O Observável
fornece uma interface fluente com vários métodos disponíveis. Assinamos o observável resultante usando um método que divide as funções "onXXX" em partes individuais.
Saída
Se você executar o exemplo conforme mostrado, deverá obter algo parecido com o seguinte resultado.
1 2 3 4 5 6 7 8 9 10 |
onSubscribe em linha principal Criar em linha - principal Fonte emissão em linha principal Emissão item em linha principal Ouça em linha - principal Calcular: objeto java.lang.Objeto@d8355a8 em linha principal Emissão item em linha principal Ouça em linha - principal Calcular: objeto java.lang.Objeto@28d25987 em linha principal ... |
Tudo está acontecendo em série no tópico principal. Não é muito interessante, apesar de todo esse esforço. Ainda não aproveitamos todo o potencial do RxJava. Para quem conhece o RxJava, sabe como é fácil fazer com que o código seja executado de forma assíncrona. Isso é algo interessante, com peculiaridades que eu não esperava. Mais uma vez, algo a ser explorado em outro post.
Comentários: Aprendendo RxJava
Quando decidi escrever esta postagem, eu queria falar sobre as coisas que aprendi ao estudar alguns aspectos internos do RxJava. No final, ficou muito grande para um artigo.
Parte do que torna o RxJava desafiador é o grande número de APIs, mas mesmo as básicas envolvem uma rede de interfaces entrelaçadas.
Por exemplo, há apenas uma versão do Observável.create
. É necessário um ObservableOnSubscribe
como um argumento, como vimos. Bastante simples.
Mas depois nos aprofundamos um pouco. ObservableOnSubscribe
é uma interface com apenas um método, assinar
. Mais uma vez, simples, mas isso começa a revelar parte do que torna complicado entender o RxJava.
Além disso, vemos ObservableOnSubscribe.subscribe
fornece um ObservávelEmissor
como um argumento. Um ObservávelEmissor
estende um Emissor
e adiciona alguns métodos para gerenciar um Descartável
. E Emissor
tem quase a mesma interface que um Observável
. Só lhe falta a onSubscribe
método.
Há principalmente definições de interface. Ainda não tocamos nas implementações, e estou apenas arranhando a superfície aqui.
O código é fascinante. Explore mais e você verá como os operadores podem modificar o que está acontecendo em toda uma cadeia de chamadas, como o RxJava faz o buffering, como algumas operações sinalizam quantos itens devem ser transmitidos e muito mais.
Estou ansioso para explorar mais e passar adiante o que aprender.
Pós-escrito
O Couchbase é de código aberto e grátis para experimentar.
Comece a usar com código de amostra, consultas de exemplo, tutoriais e muito mais.
Encontre mais recursos em nosso portal do desenvolvedor.
Siga-nos no Twitter @CouchbaseDev.
Você pode postar perguntas em nosso fóruns.
Participamos ativamente de Estouro de pilha.
Entre em contato comigo pelo Twitter com perguntas, comentários, tópicos que você gostaria de ver etc. @HodGreeley
Logotipo do ReactiveX usado como cortesia da Projetos ReactiveX nos termos do Licença Creative Commons Attribution 3.0.