Esta entrada de blog explica nuestro razonamiento y motivación para elegir RxJava como uno de los componentes integrales de nuestro nuevo Java SDK.

Motivación

Hay muchas formas de diseñar una API y cada una tiene sus ventajas (y sus inconvenientes). En el proceso de diseño de nuestras flamantes API, una de las principales cuestiones era cómo exponerlas al usuario. 

Una pregunta que no tuvimos que hacernos fue: ¿debería ser síncrona o asíncrona? Creemos firmemente que las API asíncronas son la única forma sensata de obtener el rendimiento y la escalabilidad que a menudo se necesitan, y además es mucho más fácil pasar de asíncrono a síncrono que al revés. El SDK estable actual (1.4.3 en el momento de escribir esto) ya hace un uso intensivo de Futures de varias formas para proporcionar respuestas asíncronas, y esto se remonta a 2006/7, cuando spymemcached introdujo originalmente el concepto en su API.

Es bien sabido que la interfaz Java Future es muy limitada en comparación con otras soluciones (como los futuros de Scala). Además, también es un poco más complicado de programar si necesitas construir flujos de datos asíncronos en los que un cálculo depende del otro y quieres que todo sea asíncrono. En versiones recientes hemos añadido soporte para listeners, que mejoran bastante la situación pero siguen sin ser una solución ideal.

En los últimos años han surgido otras bibliotecas y patrones que hemos seguido de cerca. Uno de los conceptos maduros es el conocido como Extensiones Reactivas, originado en Microsoft y .NET. Se basa en la idea de que las aplicaciones deben estar orientadas a eventos y reaccionar a ellos de forma asíncrona. Define un conjunto muy rico de operadores sobre lo que se puede hacer con los datos (modificarlos, combinarlos, filtrarlos, etc.). Recientemente, Netflix lo ha portado a Java y lo ha bautizado como RxJava (hay que tener en cuenta que, aunque el proyecto se encuentra actualmente en el espacio de nombres de Netflix, se trasladará a "io.reactivex" más pronto que tarde). Es muy estable y también proporciona adaptadores para otros lenguajes JVM como Scala, Groovy y JRuby que juega bien con nuestros planes para ampliar el apoyo también.

El concepto

La idea principal de Rx gira en torno a los Observables y sus observadores. Si no te has topado con este concepto, puedes pensar en el Observable como el primo asíncrono y basado en push (o más formalmente llamado dual) de un Iterable. Más específicamente, esta es su relación:

Evento Iterable (tirar) Observable (push)
recuperar datos T siguiente() onNext(T)
descubrir error lanza una excepción onError(Excepción)
completa devuelve onCompleted()

Cada vez que se introducen datos en un observable, todos los observables suscritos a él reciben los datos en su método onNext(). Si el observable se completa eventualmente (que no tiene por qué ser siempre el caso). se llama al método onCompleted. Ahora bien, en cualquier punto del proceso, si se produce un error se llama al método onError y el observable también se considera completado.

Si te gusta la gramática, el contrato tiene este aspecto: 

OnNext* (OnCompleted | OnError)?

Específicamente note que no hay distinción si solo 1 o N datos son retornados, esto puede ser normalmente inferido de los métodos que usted llama y como está documentado. De todas formas no cambia tu flujo de programación. Como esto es un poco abstracto, veamos un ejemplo concreto. En la clase CouchbaseCluster, hay un método llamado openBucket que inicializa todos los recursos necesarios y luego devuelve una instancia de Bucket para que trabajes con ella. Ahora puedes imaginar que abrir sockets, coger una configuración y demás lleva algo de tiempo, así que este es un candidato perfecto. La API de bloqueo se vería así:

interfaz Cluster {
        Bucket openBucket(String name, String password);
}

¿Cómo podemos hacerlo asíncrono? Tenemos que envolverlo en un Observable:

interfaz Cluster {
        Observable openBucket(String nombre, String contraseña);
}

Así que ahora devolvemos un observable que eventualmente retornará con una instancia de cubo que podemos usar. Vamos a añadir un observador:

racimo.openBucket().suscríbase a(nuevo Observador<Cubo>() {
    @Override
    público void onCompleted() {
        Sistema.fuera.println("¡Observable hecho!");
    }

    @Override
    público void onError(Throwable e) {
        Sistema.err.println("Algo pasó");
        e.printStackTrace();
    }

    @Override
    público void onNext(Cubo cubo) {
        Sistema.fuera.println("Cubo recibido: " + cubo);
    }
});

Ten en cuenta que estos métodos son llamados en un hilo diferente, así que si dejas el código así y sales de tu hilo principal después, probablemente no verás nada. Aunque ahora podrías escribir todo el resto de tu código en el método onNext, probablemente no sea la mejor forma de hacerlo. Dado que el cubo es algo 
que quieras abrir por adelantado, podrías bloquearlo y luego proceder con el resto de tu código. Cada Observable se puede convertir en un observable de bloqueo, que se siente como un Iterable:

BlockingObservable blockingObservable = cluster.openBucket().toBlocking();

Encontrarás muchos métodos para iterar sobre los thata recibidos de forma bloqueante, pero también hay métodos abreviados si sólo esperas un único valor (que sabemos que es nuestro caso):

Bucket bucket = cluster.openBucket().toBlocking().single();

Lo que ocurre aquí internamente es que el valor llamado en onNext se almacena para nosotros y se devuelve una vez que se llama a onComplete. si se llama a onError, el throwable se lanza directamente y se puede atrapar.

API unificadoras

Lo que has visto apenas roza la superficie. La apertura del cubo también podría realizarse con un Future solo. Donde los Observables brillan es cuando se necesita trabajar con más de un resultado devuelto. En este caso, un Future ya no encaja y Future

De nuevo, veamos un ejemplo concreto. El SDK expone un método get que devuelve un documento. Su aspecto es el siguiente:

interfaz Cubo {
        Observable get(String id);
}

Pero también soportamos consultas (Views, N1QL) que potencialmente devuelven más de un resultado (o incluso ninguno). Gracias al contrato Observable, podemos construir una API como esta:

interfaz Cubo {
        Observable query(ViewQuery query);
}

¿Lo ves? El contrato dice implícitamente "si pasas una consulta, obtendrás N ViewResults de vuelta", ya que sabes cómo tiene que comportarse un Observable. Y para tener una visión más amplia, aquí hay aún más métodos que intuitivamente se comportan de la manera que esperas que lo hagan.

interfaz Cubo {
    <D extiende Documento> Observable<D> insert(D document);
<D extends Document> Observable<D> upsert(Documento D);
    <D extiende Documento> Observable<D> sustituir(Documento D);

    Observable<VerResultado> consulta(Consulta ViewQuery);
    Observable<Resultado de la consulta> consulta(Consulta);
    Observable<Resultado de la consulta> consulta(Cadena consulta);

    Observable<Booleano> descarga();
}

¡Async mi flujo de datos!

Hasta ahora hemos visto lo que los Observables pueden hacer por nosotros y cómo nos ayudan a proporcionar APIs cohesivas, simples y a la vez asíncronas. Pero los Observables realmente brillan por sus aspectos de componibilidad. Se pueden hacer muchas cosas con Observables, y no podemos cubrirlas todas en este post. RxJava tiene una muy buena documentación de referencia que se puede encontrar aquí, así que échale un vistazo. Utiliza diagramas de mármol para mostrar cómo funcionan los flujos de datos asíncronos, algo que también queremos incluir en nuestra documentación en el futuro.

Consideremos un ejemplo práctico: Quieres cargar un documento desde couchbase (que es un objeto JSON completo con detalles del usuario), pero sólo quieres hacer algo con el firstname más abajo en tu código. Podemos utilizar la función map para mapear desde el JsonDocument a la cadena firstname:

cubo
    .consiga("usuario::1")
    .mapa(nuevo Func1<JsonDocument, Cadena>() {
        @Override
        público Cadena llame a(JsonDocument jsonDocument) {
            devolver jsonDocument.contenido().getString("nombre");
        }
    })
    .suscríbase a(nuevo Acción1<Cadena>() {
        @Override
        público void llame a(Cadena nombre) {
            Sistema.fuera.println(nombre);
        }
    });

Hay dos aspectos importantes aquí: Cada método que se encadena aquí también se ejecuta de forma asíncrona, por lo que no está bloqueando el hilo de origen. Una vez que la llamada get contra couchbase retorna, mapeamos el firstname desde el documento JSON y finalmente lo imprimimos. No necesitas proporcionar un Observer completo, si sólo estás interesado en el valor onNext puedes simplemente implementarlo (como se muestra aquí). Ver los métodos sobrecargados para más ejemplos.

También tenga en cuenta que estoy mostrando deliberadamente Java 6/7 estilo clases anónimas aquí. También soportamos Java 8, pero hablaremos de ello más adelante. Ahora, ¿cómo podríamos extender esta cadena si sólo queremos imprimir el nombre si empieza por "a"?

cubo
    .consiga("usuario::1")
    .mapa(nuevo Func1<JsonDocument, Cadena>() {
        @Override
        público Cadena llame a(JsonDocument jsonDocument) {
            devolver jsonDocument.contenido().getString("nombre");
        }
    })
    .filtro(nuevo Func1<CadenaBooleano>() {
        @Override
        público Booleano llame a(Cadena s) {
            devolver s.empiezaCon("a");
        }
    })
    .suscríbase a(nuevo Acción1<Cadena>() {
        @Override
        público void llame a(Cadena nombre) {
            Sistema.fuera.println(nombre);
        }
    });

Por supuesto, una simple sentencia if sería suficiente, pero puedes imaginar que tu código para filtrar podría ser mucho más complejo (y probablemente llamando a algo más también). Como ejemplo final sobre la transformación de observables, vamos a hacer algo que ocurre muy a menudo: cargas un documento, modificas su contenido y luego lo guardas de nuevo en couchbase:

cubo
    .consiga("usuario::1")
    .mapa(nuevo Func1<JsonDocument, JsonDocument>() {
        @Override
        público Llamada a JsonDocument(JsonDocument original) {
            original.contenido().poner("nombre", "Otra cosa");
            devolver original;
        }
    })
    .flatMap(nuevo Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        público Observable<JsonDocument> llame a(JsonDocument modificado) {
            devolver cubo.sustituir(modificado);
        }
    }).suscríbase a();

FlatMap se comporta de forma muy parecida a map, la diferencia es que devuelve un observable, por lo que es perfectamente adecuado para map sobre operaciones asíncronas.

Otro aspecto es que con Observables, el manejo sofisticado de errores está al alcance de tu mano. Implementemos un ejemplo que aplique un tiempo de espera de 2 segundos y si la llamada no retorna devuelva otra cosa en su lugar:

cubo
    .consiga("usuario::1")
    .tiempo de espera(2, TimeUnit.SEGUNDOS)
    .onErrorReturn(nuevo Func1<ThrowableJsonDocument>() {
        @Override
        público Llamada a JsonDocument(Throwable arrojadizo) {
            devolver JsonDocument.crear("usuario::anónimo"JsonObject.vacío().poner("nombre", "john-doe"));
        }
    });

Aquí se devuelve un documento ficticio (pretendiendo algunos valores predeterminados razonables para nuestro ejemplo) si la llamada get no vuelve en 2 segundos. Esto es sólo un ejemplo simple, pero se pueden hacer muchas cosas con las excepciones, como reintentar, ramificarse a otros observables y así sucesivamente. Por favor, consulte la documentación oficial (y la documentación de Rx) para saber cómo utilizarlas correctamente.

Espera, hay más

Hay muchas más funciones disponibles, como combinar (merging, zipping, concat) diferentes observables, agrupar los resultados en intervalos de tiempo, hacer efectos secundarios y otras. Una vez superado el (pequeño) obstáculo inicial de entender el concepto, se siente muy natural y le prometemos que no querrá volver atrás (si nos equivocamos, sin embargo, siempre se puede bloquear en un Observable o convertirlo en un futuro).

RxJava también tiene soporte decente para Java 8, así que si eres un afortunado que ya puede usarlo en sus proyectos puedes simplificar un ejemplo de arriba a esto:

cubo
    .get("usuario::1")
    .map(jsonDocument -> jsonDocument.content().getString("firstname"))
    .filter(s -> s.startsWith("a"))
    .subscribe(System.out::println);

Genial, ¿verdad? RxJava también proporciona diferentes adaptadores de lenguaje en la parte superior, en el momento de escribir Scala, Clojure, Groovy, JRuby y Kotlin. Se pueden utilizar para proporcionar una integración aún más específica del lenguaje y también estamos planeando utilizar algunos de ellos para mejorar el soporte de couchbase para cada uno de esos lenguajes a medida que veamos la demanda. Nuestra prioridad más importante, aparte del SDK de Java, es sin duda Scala, ¡así que estate atento a algunos anuncios más pronto que tarde!

Esperamos que ahora esté tan entusiasmado como nosotros y esperamos sus comentarios y preguntas a través de los canales habituales.

Autor

Publicado por Michael Nitschinger

Michael Nitschinger trabaja como Ingeniero de Software Principal en Couchbase. Es el arquitecto y mantenedor del SDK Java de Couchbase, uno de los primeros controladores de bases de datos completamente reactivos en la JVM. También es autor y mantiene el conector Spark de Couchbase. Michael participa activamente en la comunidad de código abierto, contribuyendo a otros proyectos como RxJava y Netty.

2 Comentarios

  1. Espero con impaciencia un anuncio relacionado con Scala. Acabo de echar un vistazo a http://reactivecouchbase.org/ pero actualmente depende del Java SDK 1.4. Merece la pena esperar a su anuncio antes de empezar a portar una aplicación que actualmente utiliza mongodb y ReactiveMongo?

  2. [...] de código asíncrono. Algunos editores de bases de datos lo han incluido: el controlador de CouchBase ya utiliza Observables en su controlador asíncrono. MongoDB, por su parte, ha publicado [...]

Dejar una respuesta