Algunos de ustedes habrán notado que publicamos Couchbase 1.8 a principios de hoyy un nuevo conjunto de clientes inteligentes para varios idiomas. Para mí personalmente esto es un hito, porque libcouchbase es ahora un cliente compatible con el lenguaje C.
Entonces, ¿por qué me preocupo por eso? Bueno, libcouchbase comenzó a partir de mis necesidades de probar fácilmente varios componentes del servidor. Dado que hice la mayor parte de mi desarrollo en los componentes del servidor implementado en C, tenía sentido para mí usar C para mis pruebas.
He recibido algunas preguntas sobre cómo funciona libcouchbase en un contexto multihilo, así que probablemente debería empezar aclarando que: libcouchbase no utiliza cualquier para proteger sus estructuras de datos internas, pero esto no significa que no se pueda usar libcouchbase en un programa multihilo. Todo lo que significa es que usted como usuario cliente debe usar bloqueo para protegerse de acceder a la instancia de libcouchbase desde múltiples hilos al mismo tiempo, o simplemente dejar que cada hilo opere en su propia instancia de libcouchbase. Una forma fácil de resolver esto es tener un "pool" de instancias de libcouchbase a las que cada hilo pop y push su instancia cada vez que necesiten acceder a un servidor Couchbase. El acceso a este pool debería estar protegido con un candado (pero supongo que ya te lo habrás imaginado ;-)
En esta entrada del blog crearé un programa de demostración que puedes usar para subir documentos JSON a un servidor Couchbase. Encontrarás el código fuente completo en https://github.com/trondn/vacuum si desea probar el ejemplo.
La idea de este programa es que "monitorizará" un directorio y subirá todos los archivos que aparezcan allí a un cluster de Couchbase. Estoy bastante seguro de que la mayoría de ustedes empiezan a pensar: "¿cómo hacemos eso de forma portable?". No es una tarea fácil de hacer, así que ni siquiera voy a intentar hacerlo. Intentaré escribirlo en un semiportátil para que no sea tan difícil de implementar en otras plataformas. Eso significa que estoy usando las siguientes limitaciones:
- Estoy usando abrir y readdir para recorrer el directorio. Esto puede reimplementarse fácilmente con FindFirst y BuscarSiguiente en Microsoft Windows.
- Monitorizar el directorio significa que voy a escanear el directorio, luego dormir un número determinado de segundos antes de ejecutar otro escaneo. Sé que algunas plataformas soportan la suscripción de cambios en el sistema de archivos, pero no voy a gastar tiempo en eso (al menos no ahora ;-)).
- Para evitar el bloqueo de archivos o el acceso al archivo mientras otros lo están escribiendo, los clientes deben escribir el archivo en el directorio con un "punto" inicial en el nombre del archivo y, a continuación, renombrarlo cuando hayan terminado. El programa ignora todos los archivos que empiezan por un punto.
Así que pasemos al código. La primera pieza de código que podría ser interesante mirar sería donde creamos la instancia libcouchbase en main():
if (instance == NULL) {
fprintf(stderr, "Fallo al crear instancen couchbase");
exit(EXIT_FAILURE);
}
El fragmento de código anterior crea la instancia libcouchbase. No hay forma de que puedas usar una estructura estática para esto, porque hacerlo haría increíblemente difícil mantener la compatibilidad binaria. Me gusta poder arreglar bugs dentro de la librería y liberar nuevas versiones que puedas usar sin tener que recompilar tu programa, y al esconder las datastructures internas de los clientes hace más fácil asegurar que el cliente no dependa de su tamaño. El primer parámetro de libcouchbase_create es el nombre (y puerto) del puerto REST para el servidor couchbase (por defecto: localhost:8091). El segundo y tercer parámetro son las credenciales que quieres usar para conectarte al puerto REST y obtener la información del pool (por defecto no se autentica). El cuarto parámetro es el bucket al que te gustaría conectarte, y si no especificas un bucket terminarás en el "bucket por defecto". El quinto argumento es un objeto especial que puede querer usar si va a utilizar características "avanzadas" en libcouchbase. La mayoría de los usuarios probablemente usarán los valores por defecto y pasarán NULL aquí.
Lo siguiente que tenemos que hacer es configurar algunos manejadores de callback para poder averiguar lo que ocurre. En el ejemplo sólo vamos a utilizar una operación (para cargar datos en la caché) por lo que necesitaremos configurar un manejador para capturar el resultado de las operaciones de almacenamiento. Desafortunadamente también podemos encontrarnos con problemas, por lo que necesitaremos configurar un manejador de errores (volveremos al trabajo en un rato).
libcouchbase_set_error_callback(instancia, error_callback);
Ahora que hemos creado e inicializado la instancia, necesitamos intentar conectarnos al cluster de Couchbase:
if (ret != LIBCOUCHBASE_SUCCESS) {
fprintf(stderr, "Fallo al conectar: %sn",
libcouchbase_strerror(instancia, ret));
exit(EXIT_FAILURE);
}
Debido al hecho de que libcouchbase es totalmente asíncrona, todo lo que sucedió anteriormente fue que iniciamos la conexión. Eso significa que necesitamos espere para que el servidor se conecte al cluster de Couchbase y se conecte al bucket correcto. Si nuestro programa tuviera que hacer otras cosas ahora sería el momento de hacerlo, pero como no tenemos ninguna otra inicialización que hacer podemos esperar a que se complete:
Una de las características "cool" que tenemos en libcouchbase es que proporciona una interfaz de estadísticas internas, por lo que podemos decirle que recoja información de tiempo de las operaciones con el siguiente snippet:
if ((ret = libcouchbase_enable_timings(instance) != LIBCOUCHBASE_SUCCESS)) {
fprintf(stderr, "Fallo al activar la temporización: %sn",
libcouchbase_strerror(instancia, ret));
}
Nuestro programa está ahora totalmente inicializado, y podemos entrar en el bucle principal que se parece bastante a:
{
process_files();
sleep(nsec);
}
Entonces, ¿cómo procesar_archivos() ¿Qué aspecto tiene? No voy a hacer el ejemplo demasiado grande pegándolo todo, pero el primer trozo de ahí parece:
if (strcmp(de->d_name, ".dump_stats") == 0) {
fprintf(stdout, "Volcando estadísticas:n");
libcouchbase_get_timings(instancia, stdout, timings_callback);
fprintf(stdout, "--n");
remove(de->d_name);<
}
continuar;
}
Como puede ver en el fragmento de código anterior, ignoraremos todos los archivos que empiecen por '.excepto el archivo llamado ".dump_stats". Cada vez que veamos ese fichero volcaremos los tiempos de las estadísticas internas utilizando el comando timings_callback (Volveré sobre ello más adelante).
Lo siguiente que hacemos es intentar leer el archivo en memoria y decodificar su JSON antes de intentar obtener el "_id"para utilizarlo como clave. Si todo esto tiene éxito, tratamos de almacenar los datos en Coucbase con:
ret = libcouchbase_store(instancia, &error, LIBCOUCHBASE_SET,
id->valuestring, strlen(id->valuestring),
ptr, tamaño, 0, 0, 0);
if (ret == LIBCOUCHBASE_SUCCESS) {
libcouchbase_wait(instancia);
} else {
error = 1;
}
La pieza &error aquí es bastante interesante. Es una "cookie" que se pasa a la llamada de retorno, para que pueda saber si he encontrado un problema o no. Verás cómo lo uso cuando hable de la función devolución_almacenamiento abajo.
Esta es básicamente toda la lógica importante del ejemplo. Prometí que volvería a los diferentes callbacks, así que vamos a empezar por ver el callback de error:
libcouchbase_error_t error,
const char *errinfo)
{
/* Ignora los tiempos de espera... */
if (error != LIBCOUCHBASE_ETIMEDOUT) {
fprintf(stderr, "rERROR FATAL: %sn",
libcouchbase_strerror(instancia, error));
if (errinfo && strlen(errinfo) != 0) {
fprintf(stderr, "t "%s "n", errinfo);
}
exit(EXIT_FAILURE);
}
}
Como puede ver en el fragmento anterior, libcouchbase llamará a la función error_callback siempre que se produzca un timeout, pero sólo queremos reintentar la operación. Si nos encontramos con un error real imprimimos un mensaje de error y terminamos el programa.
El siguiente callback que utilizamos es el devolución_almacenamiento. Se llama cuando la operación de almacenamiento se ha completado, por lo que es el lugar adecuado para averiguar si se ha producido un error al almacenar los datos. Nuestro callback se parece a:
const void *cookie,
operación libcouchbase_storage_t,
libcouchbase_error_t err,
const void *key, size_t nkey,
uint64_t cas)
{
int *error = (void*)cookie;
if (err == LIBCOUCHBASE_SUCCESS) {
*error = 0;
} else {
*error = 1;
fprintf(stderr, "Error al almacenar "");
fwrite(clave, 1, nclave, stderr);
fprintf(stderr, "": %sn",
libcouchbase_strerror(instancia, err));
fflush(stderr);
}
}
Como ves, estamos almacenando el resultado de la operación en el entero pasado como cookie. El lector observador puede ver que también podríamos desvincular el archivo y eliminar la memoria desde dentro de la llamada de retorno (si proporcionáramos esa información como la cookie en su lugar ;))
El último callback a cubrir es el callback de temporización que estamos utilizando para volcar las estadísticas de temporización.
static void timings_callback(libcouchbase_t instance, const void *cookie,
libcouchbase_timeunit_t timeunit,
uint32_t mín, uint32_t máx,
uint32_t total, uint32_t maxtotal)
{
char buffer[1024];
int offset = sprintf(buffer, "[%3u - %3u]", min, max);
switch (timeunit) {
case LIBCOUCHBASE_TIMEUNIT_NSEC:
offset += sprintf(buffer + offset, "ns");
romper;
case LIBCOUCHBASE_TIMEUNIT_USEC:
offset += sprintf(buffer + offset, "us");
romper;
case LIBCOUCHBASE_TIMEUNIT_MSEC:
offset += sprintf(buffer + offset, "ms");
romper;
case LIBCOUCHBASE_TIMEUNIT_SEC:
offset += sprintf(buffer + offset, "s");
romper;
por defecto:
;
}
int num = (float)40.0 * (float)total / (float)maxtotal;
offset += sprintf(buffer + offset, " |");
for (int ii = 0; ii < num; ++ii) {
offset += sprintf(buffer + offset, "#");
}
offset += sprintf(buffer + offset, " - %un", total);
fputs(buffer, (FILE*)cookie);
}
Cuando solicitas los tiempos de libcouchbase, informa de todas las métricas de tiempo recogidas llamando a la llamada de retorno de tiempos. Como puedes ver en la API, obtendrás el valor mínimo y máximo del rango, y el número de operaciones realizadas dentro de ese rango. Estas métricas no deben ser consideradas como números exactos, porque dependen de lo que hagas en tu código cliente desde que llamas a la operación hasta que llamas a libcouchbase_wait para que la operación se complete.
Así que vamos a seguir adelante y ejecutar el programa. He rellenado /var/spool/vacuum con una serie de archivos JSON, para que el programa haga algo.
durmiendo 3 segundos antes de reintentar..
Desde otro withdow ejecuto el comando:
Y cuando el temporizador expira en la primera ventana, se imprime:
[ 60 - 69]us |######################################## - 18
[ 70 - 79]us |## - 1
[240 - 249]us |## - 1
—-
durmiendo 3 segundos antes de reintentar..
Espero que este blog haya revelado lo fácil que es usar libcouchbase para comunicarse con un cluster de Couchbase. Tenemos varios clientes para otros lenguajes de programación como PHP y Ruby construidos sobre libcouchbase, ¡así que puedo prometerte que verás más funcionalidades añadidas!