En este post, voy a mostrar cómo convertir una API basada en callback estilo listener a una reactiva con RxJava 2.
Introducción
La programación reactiva y RxJava se han convertido en temas bastante candentes, especialmente en los últimos dos años. Hace un tiempo probé RxJava en un proyecto Android. Tenía un simple problema de hilos, que podría haber resuelto fácilmente de otras maneras. Sin embargo, como había estado leyendo sobre RxJava, decidí probarlo. Inmediatamente me impresionó lo mucho más sencillo y comprensible que era el código.
A pesar de ese éxito, RxJava tiene fama de ser difícil de aprender. En un proyecto más reciente, quería manejar actualizaciones en vivo de una base de datos. La base de datos (Couchbase Lite), tiene un sistema basado en callbacks para monitorizar los cambios. Yo quería envolver ese callback en una estructura reactiva. (Esto podría haber sido un Observable o un Fluido. En un próximo artículo hablaremos de cómo elegir entre ellos).
Lo primero que descubrí fue que no podía encontrar un buen ejemplo de una versión general de lo que quería. Hay un ejemplo sencillo en el Documentación de RxJavapero tiene algunos inconvenientes que quería evitar. Por ejemplo, en el ejemplo, se asume que el objeto Evento tiene un método para determinar si un evento dado es el último en el flujo. Muchos callbacks en Android no tienen tal método.
Aunque más tarde descubrí una Stack Overflow post que cubre lo básico bastante bien, quería entender más.
No es de extrañar que un análisis completo de su funcionamiento interno pudiera llenar un libro. En este post me limitaré a cubrir el núcleo. Hay código con varios experimentos para ayudar a entender los detalles. Eso es demasiado para un artículo, así que lo dejaremos para otro momento.
Objetivo
Para ser más explícitos, veremos cómo tomar una interfaz de devolución de llamada de oyente, común en la programación basada en eventos, y envolverla en un archivo Observable
.
Es decir, ¿cómo pasamos de
1 2 3 |
público interfaz OnItemListener<T> { void onItem(T artículo); } |
a
1 |
Observable<T> |
Android OnClickListener
es un ejemplo de este tipo de API. OnClickListener
es una interfaz con un método, onClick
. Dispone de un Android Ver
como parámetro. El sistema Android utiliza esto para entregar flujos de eventos como pulsaciones de botones, etc.
Primeros pasos
La fuente de este trabajo se encuentra en GitHub aquí.
En este post sólo veremos una pequeña parte de ese código. Otras partes del código están diseñadas para probar varios experimentos. Esos pueden ser algo para futuros artículos. Por ahora, sólo nos centraremos en el tema central.
Para seguirlo, clona el repositorio. El código está configurado para ser construido por gradle, por lo que puede ejecutarlo desde la línea de comandos o importarlo en su entorno favorito.
Creación de una fuente
El objetivo es convertir una API para escuchar algún tipo de fuente de eventos. Lo primero que necesitamos es un flujo de eventos real con el que probar. Hay algunas fuentes construidas para experimentar, así que vamos a empezar con una clase base.
Listado: BasicSource.java
1 2 3 4 5 6 7 8 9 10 |
// Archivo: src/main/java/com/couchbase/rx/BasicSource.java paquete com.couchbase.rx; público clase BasicSource<T> { protegido volátil OnItemListener<? super T> oyente; público estático interfaz OnItemListener<T> { void onItem(T artículo); } público void setOnItemListener(OnItemListener<? super T> oyente) { este.oyente = oyente; } } |
Define la interfaz y tiene el campo común y el método setter que necesitan todas las implementaciones.
A continuación, deduzca una fuente que imite un flujo ilimitado de eventos de la siguiente manera.
Listado: UnboundSource.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
// Archivo: src/main/java/com/couchbase/rx/UnboundSource.java paquete com.couchbase.rx; importar java.util.función.Proveedor; importar estático com.couchbase.Util.dormir; público clase Fuente no vinculante<T> extiende BasicSource<T> { privado Proveedor<T> proveedor; privado OnItemListener<? super T> actual; público Fuente no vinculante(Proveedor<T> proveedor) { este.proveedor = proveedor; } público void iniciar() { Sistema.fuera.println("Fuente que emite en el hilo" + Hilo.currentThread().getName()); para (;;) { Sistema.fuera.println("Elemento emisor en el hilo" + Hilo.currentThread().getName()); actual = oyente; si (null != actual) actual.onItem(proveedor.consiga()); dormir(100); } } } |
Esta versión genera nuevos elementos utilizando un Proveedor
pasada al constructor. Esto sólo muestra que no hay nada especial acerca de los objetos reales alimentados, ya que el proveedor podría crear cualquier cosa.
Tenemos un método para empezar a crear elementos explícitamente. Aquí usamos un bucle infinito para generarlos indefinidamente.
La asignación de oyente
a actual
soluciona una condición de carrera en la que la eliminación de la suscripción puede producirse entre la comprobación de null y la invocación real del método onItem
devolución de llamada.
Conversión a observable
Genial, ahora tenemos una fuente que imita, digamos, una serie abierta de pulsaciones de botón. A continuación, vamos a crear un Observable
.
Utilizaremos el método recomendado en la documentación de RxJava. Éste utiliza el método Observable.create
en lugar de subclasificar Observable
directamente. (El proyecto incluye también código para hacer esto último, a modo de comparación).
Echa un vistazo al listado.
Listado: Observables.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
// Archivo: src/main/java/com/couchbase/rx/Observables.java paquete com.couchbase.rx; importar io.reactivex.Observable; importar io.reactivex.ObservableOnSubscribe; importar com.couchbase.Util.ComputeFunction; público clase Observables { público estático void principal(Cadena[] args) { Fuente no vinculante<Object> fuente = nuevo Fuente no vinculante<>(Objeto::nuevo); ObservableOnSubscribe<Object> manipulador = emisor -> { Sistema.fuera.println("Crear en hilo - " + Hilo.currentThread().getName()); fuente.setOnItemListener(artículo -> { Sistema.fuera.println("Listen on thread -" + Hilo.currentThread().getName()); si (emisor.isDisposed()) devolver; emisor.onNext(artículo); }); emisor.setCancellable(() -> fuente.setOnItemListener(null)); fuente.iniciar(); }; Observable.crear(manipulador) .suscríbase a(ComputeFunction::calcula, Throwable::printStackTrace, () -> Sistema.fuera.println("Hecho"), t -> Sistema.fuera.println(" onSubscribe en el hilo " + Hilo.currentThread().getName())); } } |
En primer lugar, creamos una instancia de nuestro Fuente no vinculante
.
A continuación, creamos una instancia de ObservableOnSubscribe
utilizando una expresión lambda. El método que estamos sobrescribiendo, suscríbase a
tiene un parámetro, un Emisor
objeto.
Esto proporciona la conexión entre la llamada de retorno del oyente de nuestra fuente y un suscriptor, a través de una segunda expresión lambda. Esta segunda expresión sólo comprueba emisor.isDisposed
para asegurarse de que la suscripción sigue activa y, a continuación, empuja un elemento hacia abajo llamando a emisor.onSiguiente
. Esa es la línea clave que esto ha estado construyendo.
Después de haber conectado nuestra llamada de retorno original, queremos proporcionar una forma de detener el flujo. Para ello utilizamos un Cancelable
aquí por simplicidad. La expresión lambda rompe el flujo anulando la llamada de retorno del oyente.
A Desechable
también funcionaría. Este Respuesta de Stack Overflow da una buena idea de la diferencia entre ellos y de cómo elegir cuál utilizar.
Los desechables son la solución de RxJava 2 para darse de baja de un flujo. Este puesto de Kaushik Gopal explica algunos de los razonamientos en torno al uso de productos desechables en general.
Con todo interconectado, encendemos la fuente para empezar a generar eventos.
Instanciación y suscripción
Con nuestro ObservableOnSubscribe
ya podemos crear nuestro Observable
con la llamada a crear
.
En Observable
proporciona una interfaz fluida con bastantes métodos disponibles. Nos suscribimos al observable resultante utilizando un método que descompone las funciones 'onXXX' en piezas individuales.
Salida
Si ejecuta el ejemplo como se muestra, debería obtener algo parecido a la siguiente salida.
1 2 3 4 5 6 7 8 9 10 |
onSubscribe en hilo principal Cree en hilo - principal Fuente que emite en hilo principal Emisión artículo en hilo principal Escuche en hilo - principal Compute: objeto java.lang.Objeto@d8355a8 en hilo principal Emisión artículo en hilo principal Escuche en hilo - principal Compute: objeto java.lang.Objeto@28d25987 en hilo principal ... |
Todo está sucediendo en serie en el hilo principal. No es super interesante, por todo ese esfuerzo. Aún no hemos aprovechado toda la potencia de RxJava. Para aquellos familiarizados con RxJava, sabrán lo fácil que es hacer que el código se ejecute de forma asíncrona. Eso es algo interesante con peculiaridades que no esperaba. De nuevo, algo a explorar en otro post.
Comentarios: Aprender RxJava
Cuando originalmente decidí escribir este post, quería hablar de las cosas que aprendí mientras estudiaba algunos de los aspectos internos de RxJava. Al final, se convirtió en demasiado para un artículo.
Parte de lo que hace que RxJava sea un reto es el gran número de API, pero incluso las más básicas implican una red de interfaces entrelazadas.
Por ejemplo, sólo hay una versión de Observable.create
. Se necesita un ObservableOnSubscribe
instancia como argumento, como hemos visto. Bastante simple.
Pero luego profundizamos un poco. ObservableOnSubscribe
es una interfaz con un solo método, suscríbase a
. De nuevo, simple, pero esto empieza a revelar parte de lo que hace que entender RxJava sea complicado.
Yendo más allá, vemos ObservableOnSubscribe.subscribe
suministra un ObservableEmitter
como argumento. Un ObservableEmitter
amplía un Emisor
y añade algunos métodos para gestionar un Desechable
. Un Emisor
resulta que tiene casi la misma interfaz que un Observable
. Sólo le falta el onSubscribe
método.
La mayoría son definiciones de interfaces. No hemos tocado las implementaciones, y sólo estoy arañando la superficie aquí.
El código es fascinante. Si exploras más, descubrirás cómo los operadores pueden modificar lo que ocurre en toda una cadena de llamadas, cómo RxJava almacena en búfer, cómo algunas operaciones indican cuántos elementos deben pasarse y mucho más.
Estoy deseando seguir explorando y transmitir lo que aprenda.
Posdata
Couchbase es de código abierto y probar gratis.
Empezar con código de ejemplo, consultas de ejemplo, tutoriales y mucho más.
Más recursos en nuestra portal para desarrolladores.
Síguenos en Twitter @CouchbaseDev.
Puede enviar preguntas a nuestro foros.
Participamos activamente en Stack Overflow.
Envíame tus preguntas, comentarios, temas que te gustaría ver, etc. a Twitter. @HodGreeley
Logotipo de ReactiveX utilizado por cortesía del Proyectos ReactiveX en virtud del Licencia Creative Commons Atribución 3.0.