Esta entrada de blog explica nuestro razonamiento y motivación para elegir RxJava como uno de los componentes integrales de nuestro nuevo Java SDK.
Motivación
Hay muchas formas de diseñar una API y cada una tiene sus ventajas (y sus inconvenientes). En el proceso de diseño de nuestras flamantes API, una de las principales cuestiones era cómo exponerlas al usuario.
Una pregunta que no tuvimos que hacernos fue: ¿debería ser síncrona o asíncrona? Creemos firmemente que las API asíncronas son la única forma sensata de obtener el rendimiento y la escalabilidad que a menudo se necesitan, y además es mucho más fácil pasar de asíncrono a síncrono que al revés. El SDK estable actual (1.4.3 en el momento de escribir esto) ya hace un uso intensivo de Futures de varias formas para proporcionar respuestas asíncronas, y esto se remonta a 2006/7, cuando spymemcached introdujo originalmente el concepto en su API.
Es bien sabido que la interfaz Java Future es muy limitada en comparación con otras soluciones (como los futuros de Scala). Además, también es un poco más complicado de programar si necesitas construir flujos de datos asíncronos en los que un cálculo depende del otro y quieres que todo sea asíncrono. En versiones recientes hemos añadido soporte para listeners, que mejoran bastante la situación pero siguen sin ser una solución ideal.
En los últimos años han surgido otras bibliotecas y patrones que hemos seguido de cerca. Uno de los conceptos maduros es el conocido como Extensiones Reactivas, originado en Microsoft y .NET. Se basa en la idea de que las aplicaciones deben estar orientadas a eventos y reaccionar a ellos de forma asíncrona. Define un conjunto muy rico de operadores sobre lo que se puede hacer con los datos (modificarlos, combinarlos, filtrarlos, etc.). Recientemente, Netflix lo ha portado a Java y lo ha bautizado como RxJava (hay que tener en cuenta que, aunque el proyecto se encuentra actualmente en el espacio de nombres de Netflix, se trasladará a "io.reactivex" más pronto que tarde). Es muy estable y también proporciona adaptadores para otros lenguajes JVM como Scala, Groovy y JRuby que juega bien con nuestros planes para ampliar el apoyo también.
El concepto
La idea principal de Rx gira en torno a los Observables y sus observadores. Si no te has topado con este concepto, puedes pensar en el Observable como el primo asíncrono y basado en push (o más formalmente llamado dual) de un Iterable. Más específicamente, esta es su relación:
Evento | Iterable (tirar) | Observable (push) |
---|---|---|
recuperar datos | T siguiente() | onNext(T) |
descubrir error | lanza una excepción | onError(Excepción) |
completa | devuelve | onCompleted() |
Cada vez que se introducen datos en un observable, todos los observables suscritos a él reciben los datos en su método onNext(). Si el observable se completa eventualmente (que no tiene por qué ser siempre el caso). se llama al método onCompleted. Ahora bien, en cualquier punto del proceso, si se produce un error se llama al método onError y el observable también se considera completado.
Si te gusta la gramática, el contrato tiene este aspecto:
Específicamente note que no hay distinción si solo 1 o N datos son retornados, esto puede ser normalmente inferido de los métodos que usted llama y como está documentado. De todas formas no cambia tu flujo de programación. Como esto es un poco abstracto, veamos un ejemplo concreto. En la clase CouchbaseCluster, hay un método llamado openBucket que inicializa todos los recursos necesarios y luego devuelve una instancia de Bucket para que trabajes con ella. Ahora puedes imaginar que abrir sockets, coger una configuración y demás lleva algo de tiempo, así que este es un candidato perfecto. La API de bloqueo se vería así:
Bucket openBucket(String name, String password);
}
¿Cómo podemos hacerlo asíncrono? Tenemos que envolverlo en un Observable:
Observable
}
Así que ahora devolvemos un observable que eventualmente retornará con una instancia de cubo que podemos usar. Vamos a añadir un observador:
@Override
público void onCompleted() {
Sistema.fuera.println("¡Observable hecho!");
}
@Override
público void onError(Throwable e) {
Sistema.err.println("Algo pasó");
e.printStackTrace();
}
@Override
público void onNext(Cubo cubo) {
Sistema.fuera.println("Cubo recibido: " + cubo);
}
});
Ten en cuenta que estos métodos son llamados en un hilo diferente, así que si dejas el código así y sales de tu hilo principal después, probablemente no verás nada. Aunque ahora podrías escribir todo el resto de tu código en el método onNext, probablemente no sea la mejor forma de hacerlo. Dado que el cubo es algo
que quieras abrir por adelantado, podrías bloquearlo y luego proceder con el resto de tu código. Cada Observable se puede convertir en un observable de bloqueo, que se siente como un Iterable:
Encontrarás muchos métodos para iterar sobre los thata recibidos de forma bloqueante, pero también hay métodos abreviados si sólo esperas un único valor (que sabemos que es nuestro caso):
Lo que ocurre aquí internamente es que el valor llamado en onNext se almacena para nosotros y se devuelve una vez que se llama a onComplete. si se llama a onError, el throwable se lanza directamente y se puede atrapar.
API unificadoras
Lo que has visto apenas roza la superficie. La apertura del cubo también podría realizarse con un Future De nuevo, veamos un ejemplo concreto. El SDK expone un método get que devuelve un documento. Su aspecto es el siguiente: Pero también soportamos consultas (Views, N1QL) que potencialmente devuelven más de un resultado (o incluso ninguno). Gracias al contrato Observable, podemos construir una API como esta: ¿Lo ves? El contrato dice implícitamente "si pasas una consulta, obtendrás N ViewResults de vuelta", ya que sabes cómo tiene que comportarse un Observable. Y para tener una visión más amplia, aquí hay aún más métodos que intuitivamente se comportan de la manera que esperas que lo hagan. Observable<VerResultado> consulta(Consulta ViewQuery); Observable<Booleano> descarga(); Hasta ahora hemos visto lo que los Observables pueden hacer por nosotros y cómo nos ayudan a proporcionar APIs cohesivas, simples y a la vez asíncronas. Pero los Observables realmente brillan por sus aspectos de componibilidad. Se pueden hacer muchas cosas con Observables, y no podemos cubrirlas todas en este post. RxJava tiene una muy buena documentación de referencia que se puede encontrar aquí, así que échale un vistazo. Utiliza diagramas de mármol para mostrar cómo funcionan los flujos de datos asíncronos, algo que también queremos incluir en nuestra documentación en el futuro. Consideremos un ejemplo práctico: Quieres cargar un documento desde couchbase (que es un objeto JSON completo con detalles del usuario), pero sólo quieres hacer algo con el firstname más abajo en tu código. Podemos utilizar la función map para mapear desde el JsonDocument a la cadena firstname: Hay dos aspectos importantes aquí: Cada método que se encadena aquí también se ejecuta de forma asíncrona, por lo que no está bloqueando el hilo de origen. Una vez que la llamada get contra couchbase retorna, mapeamos el firstname desde el documento JSON y finalmente lo imprimimos. No necesitas proporcionar un Observer completo, si sólo estás interesado en el valor onNext puedes simplemente implementarlo (como se muestra aquí). Ver los métodos sobrecargados para más ejemplos. También tenga en cuenta que estoy mostrando deliberadamente Java 6/7 estilo clases anónimas aquí. También soportamos Java 8, pero hablaremos de ello más adelante. Ahora, ¿cómo podríamos extender esta cadena si sólo queremos imprimir el nombre si empieza por "a"? Por supuesto, una simple sentencia if sería suficiente, pero puedes imaginar que tu código para filtrar podría ser mucho más complejo (y probablemente llamando a algo más también). Como ejemplo final sobre la transformación de observables, vamos a hacer algo que ocurre muy a menudo: cargas un documento, modificas su contenido y luego lo guardas de nuevo en couchbase: FlatMap se comporta de forma muy parecida a map, la diferencia es que devuelve un observable, por lo que es perfectamente adecuado para map sobre operaciones asíncronas. Otro aspecto es que con Observables, el manejo sofisticado de errores está al alcance de tu mano. Implementemos un ejemplo que aplique un tiempo de espera de 2 segundos y si la llamada no retorna devuelva otra cosa en su lugar: Aquí se devuelve un documento ficticio (pretendiendo algunos valores predeterminados razonables para nuestro ejemplo) si la llamada get no vuelve en 2 segundos. Esto es sólo un ejemplo simple, pero se pueden hacer muchas cosas con las excepciones, como reintentar, ramificarse a otros observables y así sucesivamente. Por favor, consulte la documentación oficial (y la documentación de Rx) para saber cómo utilizarlas correctamente. Hay muchas más funciones disponibles, como combinar (merging, zipping, concat) diferentes observables, agrupar los resultados en intervalos de tiempo, hacer efectos secundarios y otras. Una vez superado el (pequeño) obstáculo inicial de entender el concepto, se siente muy natural y le prometemos que no querrá volver atrás (si nos equivocamos, sin embargo, siempre se puede bloquear en un Observable o convertirlo en un futuro). RxJava también tiene soporte decente para Java 8, así que si eres un afortunado que ya puede usarlo en sus proyectos puedes simplificar un ejemplo de arriba a esto: Genial, ¿verdad? RxJava también proporciona diferentes adaptadores de lenguaje en la parte superior, en el momento de escribir Scala, Clojure, Groovy, JRuby y Kotlin. Se pueden utilizar para proporcionar una integración aún más específica del lenguaje y también estamos planeando utilizar algunos de ellos para mejorar el soporte de couchbase para cada uno de esos lenguajes a medida que veamos la demanda. Nuestra prioridad más importante, aparte del SDK de Java, es sin duda Scala, ¡así que estate atento a algunos anuncios más pronto que tarde! Esperamos que ahora esté tan entusiasmado como nosotros y esperamos sus comentarios y preguntas a través de los canales habituales.
Observable
}
Observable
}
<D extiende Documento> Observable<D> insert(D document);
<D extends Document> Observable<D> upsert(Documento D);
<D extiende Documento>> Observable<D> sustituir(Documento D);
Observable<Resultado de la consulta> consulta(Consulta);
Observable<Resultado de la consulta> consulta(Cadena consulta);
}¡Async mi flujo de datos!
.consiga("usuario::1")
.mapa(nuevo Func1<JsonDocument, Cadena>() {
@Override
público Cadena llame a(JsonDocument jsonDocument) {
devolver jsonDocument.contenido().getString("nombre");
}
})
.suscríbase a(nuevo Acción1<Cadena>() {
@Override
público void llame a(Cadena nombre) {
Sistema.fuera.println(nombre);
}
});
.consiga("usuario::1")
.mapa(nuevo Func1<JsonDocument, Cadena>() {
@Override
público Cadena llame a(JsonDocument jsonDocument) {
devolver jsonDocument.contenido().getString("nombre");
}
})
.filtro(nuevo Func1<CadenaBooleano>() {
@Override
público Booleano llame a(Cadena s) {
devolver s.empiezaCon("a");
}
})
.suscríbase a(nuevo Acción1<Cadena>() {
@Override
público void llame a(Cadena nombre) {
Sistema.fuera.println(nombre);
}
});
.consiga("usuario::1")
.mapa(nuevo Func1<JsonDocument, JsonDocument>() {
@Override
público Llamada a JsonDocument(JsonDocument original) {
original.contenido().poner("nombre", "Otra cosa");
devolver original;
}
})
.flatMap(nuevo Func1<JsonDocument, Observable<JsonDocument>>() {
@Override
público Observable<JsonDocument> llame a(JsonDocument modificado) {
devolver cubo.sustituir(modificado);
}
}).suscríbase a();Espera, hay más
.get("usuario::1")
.map(jsonDocument -> jsonDocument.content().getString("firstname"))
.filter(s -> s.startsWith("a"))
.subscribe(System.out::println);
Espero con impaciencia un anuncio relacionado con Scala. Acabo de echar un vistazo a http://reactivecouchbase.org/ pero actualmente depende del Java SDK 1.4. Merece la pena esperar a su anuncio antes de empezar a portar una aplicación que actualmente utiliza mongodb y ReactiveMongo?
[...] de código asíncrono. Algunos editores de bases de datos lo han incluido: el controlador de CouchBase ya utiliza Observables en su controlador asíncrono. MongoDB, por su parte, ha publicado [...]