Alberto Marchetti es desarrollador full-stack y autor de "RenderScript: computación paralela en Android, de forma sencilla." Vive siempre al límite, lanzándose constantemente al descubrimiento de lenguas y tecnologías modernas.
Tareas cronometradas utilizando Couchbase y Go
En este post voy a mostrar cómo puedes explotar el sistema de indexación de Couchbase para crear un sistema distribuido de tareas cronometradas. El código de ejemplo para el proyecto, junto con sus instrucciones de ejecución, se puede encontrar en https://github.com/cmaster11/cb-blog-timed-tasks.
Descargo de responsabilidad: Debido a la complejidad del tema, en esta página sólo se publican extractos de código relevantes.
El concepto
Intentemos definir los requisitos de un sistema de este tipo:
- La principal característica de un sistema de tareas temporizadas es poder especificar cuándo se ejecutará una determinada tarea en el tiempo. Esto se puede conseguir utilizando un campo ExecuteAt, que contendrá el tiempo de ejecución deseado (Hora Unixen milisegundos).
- Un requisito del software moderno es que debe soportar un entorno multinodo, lo que significa que debe ser un sistema distribuido. Debemos asegurarnos de que varios trabajadores NO procesen las mismas tareas. Aquí podemos usar una buena característica de Couchbase, cierre pesimistaque permitirá a un trabajador obtener un documento y bloquearlo para que ningún otro trabajador pueda procesarlo.
La siguiente es una posible estructura para representar nuestra tarea:
1 2 3 4 5 6 7 8 9 |
tipo Tarea struct { Id cadena // El tiempo de ejecución de la tarea deseado EjecutarEn int64 // Contenido específico de la tarea Contenido cadena } |
Características de Couchbase
En primer lugar, he aquí una visión general de las características de Couchbase que vamos a utilizar:
META()
Cada documento de un bucket de Couchbase tiene un documento META()asociado, que contiene información específica de la entidad del documento, como:
- id - la clave del documento dentro del cubo.
- cas - un número int64, usado por Couchbase para prevenir condiciones de carrera durante la edición de documentos.
- expiración - cuando un documento está destinado a expirar, o 0 si nunca expirará.
Sugerencia: Estos campos (por ejemplo, META().cas) puede indexarse (a partir de Couchbase 5.0).
CAS (Comprobar y fijar)
Cuando se obtiene un documento, también se devuelve su valor CAS, y las llamadas posteriores para modificar el documento pueden especificar este valor para asegurarse de que van a editar la versión deseada del documento.
Por ejemplo:
- El cliente A obtiene un documento y su valor CAS actual es 1234.
- El cliente B edita el documento, lo que altera el valor CAS a 5678.
- Si A intenta editar el documento sin proporcionar el valor CAS, la edición se realizará correctamente, pero los cambios realizados por B se perderán.
- Si A intenta editar el documento proporcionando el valor CAS (1234), se devolverá un error porque el actual (5678) es diferente. El cliente A tendrá que recuperar el documento de nuevo y volver a ejecutar el proceso.
El valor CAS es una herramienta extremadamente útil para asegurarnos de que no estamos reemplazando o alterando una versión incorrecta/nueva de un documento, perdiendo sus cambios.
Cierre pesimista
Couchbase nos permite "bloquear" un documento, para que sólo pueda ser leído y escrito por un cliente a la vez, utilizando gocb.GetAndLock Función Go SDK.
1 2 3 4 5 6 |
// Bloquear el documento lockTime := 10 // segundos bloqueadoCAS, err := cubo.GetAndLock(documentKey, lockTime, &outStruct) // Desbloquearlo _, err = cubo.Desbloquear(documentKey, bloqueadoCAS) |
Cuando un documento está bloqueado, cualquier otra solicitud para bloquearlo/mutarlo/desbloquearlo arrojará un error (sigue siendo posible simplemente obtener el documento), a menos que se utilice el valor CAS correcto.
Nota: El tiempo máximo de bloqueo de un documento es de 15 segundos, y el uso de un valor lockTime de 0 hará que se establezca el tiempo máximo. Esto crea una limitación sobre cuánto tiempo puede ejecutarse una tarea antes de ser marcada automáticamente como disponible (por tiempo de espera de bloqueo).
Pista: Mientras un documento está bloqueado, su valor CAS devuelto es -1.
Indexación y consulta
Cabe destacar que las dos sugerencias juntas nos indican que podemos indexar un campo (META().cas), que se convierte en -1 cuando un documento está bloqueado. También significa que podemos consultar documentos basándonos en esta condición.
La consulta
Intentemos definir una consulta que se ajuste a los requisitos:
- Queremos obtener un id de tarea, que puede ser utilizado más tarde para obtener y bloquear el documento: SELECT Id.
- La tarea no debe estar ya bloqueada: WHERE META().cas -1.
- La tarea debe ejecutarse ahora: WHERE ExecuteAt <= NOW_MILLIS() (AHORA_MILLIS devuelve el tiempo Unix actual en milisegundos).
- Necesitamos obtener la tarea más cercana en el tiempo, por lo que queremos ordenar las tareas por su tiempo de ejecución: ORDER BY ExecuteAt ASC.
- Digamos por ahora (!!!) que un trabajador querrá que sólo se procese una tarea a la vez: LIMIT 1.
El resultado debería ser similar a esta consulta:
1 2 3 4 5 6 |
SELECCIONE `Id` DESDE `tareas_temporizadas` // Nuestro cubo DONDE META().`cas` <> -1 Y `EjecutarEn` <= AHORA_MILLIS() PEDIR POR `EjecutarEn` ASC LÍMITE 1 |
Su ejecución devolverá un array similar a:
1 2 3 |
[{ "Id": "task_id_goes_here" }] |
El índice
Ahora podemos planificar un índice específico para la consulta, optimizado para la ejecución de la consulta en la que acabamos de pensar. Los índices específicos de consulta son imprescindibles para mejorar el rendimiento de las consultas en bases de datos NoSQL.
- La consulta está comprobando que un documento no está bloqueado actualmente:
1DONDE META().cas <> -1. - Además, está pidiendo directamente que el tiempo de ejecución esté en el pasado. Entonces necesitamos indexar el campo ExecuteAt.
La consulta del índice podría ser entonces la siguiente:
1 2 3 4 |
CREAR ÍNDICE `idx_timed_task` EN `tareas_temporizadas` (`EjecutarEn` ASC) DONDE META().`cas` <> -1 |
Optimización de la consulta
Ahora podemos optimizar aún más la consulta:
- Podemos indicar a la consulta que utilice nuestro índice proporcionando una variable pista a la misma: USE INDEX (idx_timed_task USING GSI).
- Podemos pedir a Couchbase que espere a que el índice esté actualizado (normalmente la indexación es un proceso asíncrono) antes de ejecutar la consulta, de forma que nuestros resultados contengan con seguridad tareas desbloqueadas, proporcionando una directiva coherencia requisito a nivel del SDK: query.Consistency(gocb.RequestPlus).
El flujo
Un posible flujo para el bucle de trabajador consumidor de tarea temporizada es:
- Consulta de un identificador de tarea disponible.
- Obtener y bloquear la tarea.
- Procesar la tarea.
- Eliminar la tarea.
Múltiples nodos
Pensemos por un momento en cómo una configuración multinodo puede alterar este flujo.
Si varios trabajadores van a consultar tareas disponibles de forma concurrente, probablemente encontrarán la misma tarea, y sólo uno de ellos podrá procesarla con éxito, mientras que los demás trabajadores tendrán que repetir el bucle (ejecutar una nueva consulta) para obtener nuevas tareas.
Podemos aplicar entonces otro enfoque:
- Consulta de ids de tareas disponibles, limitando la cantidad de ids al número de trabajadores.
- Para cada id de tarea, intente bloquear la tarea. Al primer bloqueo con éxito, pasa al 4.
- Si no se ha bloqueado ninguna tarea, repite el bucle.
- Procesar la tarea.
- Eliminar la tarea.
En el mejor de los casos, cada trabajador podrá bloquear con éxito una tarea al primer intento. En el peor de los casos, los trabajadores tendrán que intentar bloquear varios documentos sin éxito. En una ejecución media, los trabajadores bloquearán las tareas con éxito, quizá después de haber intentado bloquear algunas otras.
Tenemos que llegar a un compromiso entre la frecuencia con la que queremos consultar la base de datos y el número de intentos de bloqueo fallidos que podemos soportar. En general, intentar bloquear documentos será mucho más rápido que ejecutar consultas N1QL.
El código
Veamos algunos ejemplos de código relevantes:
El productor
La generación de la tarea puede resumirse en esta función:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func NuevaTarea(executeAt tiempo.Tiempo, contenido cadena) (*Tarea, error) { si executeAt.IsZero() { devolver nil, errores.Nuevo("executeAt no debe ser un tiempo cero") } taskUUID, err := uuid.NuevoV1() // github.com/satori/go.uuid si err != nil { devolver nil, err } // Convertir time.Time en int64 milisegundos executeAtMillis := executeAt.UnixNano() / int64(tiempo.Milisegundo) tarea := Tarea{ Id: taskUUID.Cadena(), EjecutarEn: executeAtMillis, Contenido: contenido, } devolver &tarea, nil } |
Una vez que generamos un objeto de tarea válido, podemos simplemente insertarlo en nuestro cubo con:
1 |
_, err := controlador.cubo.Inserte(tarea.Id, tarea, 0) |
El consumidor
Podemos obtener y bloquear un documento por id, usando este código:
1 2 3 |
// El uso de valores cero para el tiempo de bloqueo fijará el tiempo máximo disponible. tarea := nuevo(Tarea) bloqueadoCAS, err := controlador.cubo.GetAndLock(taskId, 0, &tarea) |
Se puede eliminar una tarea utilizando este código:
1 |
_, err := controlador.cubo.Eliminar(taskId, bloqueadoCAS) |
El código del consumidor principal puede resumirse con el siguiente fragmento:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
taskIds, err := couchbaseController.QueryNextTaskIds(consumersCount) ... si len(taskIds) == 0 { ... // No se han encontrado tareas, reinicia el bucle } var taskId cadena var tarea *interno.Tarea var bloqueadoCAS gocb.Cas para _, taskId = gama taskIds { // Bloquear y obtener la tarea, para que sólo este consumidor la procese tarea, bloqueadoCAS, err = couchbaseController.GetAndLockTask(taskId) si err != nil { ... // Error al obtener la tarea, pasar a la siguiente de la lista continuar } // ¡Tarea bloqueada con éxito! // Salir para procesarlo romper } si tarea == nil { ... // No se ha podido bloquear ninguna tarea, reiniciar bucle } // Procesamiento real de la tarea // Mejora: también podría devolver un error, lo que permitiría que la tarea se // procesado por otro trabajador más tarde. processTask(tarea) /* Eliminar la tarea de Couchbase. La tarea estará bloqueada en ese momento, lo que significa que tenemos que proporcionar el valor valor CAS actual, para que el productor esté autorizado a retirarlo. */ err = couchbaseController.EliminarTarea(taskId, bloqueadoCAS) ... |
Conclusión
En este post hemos visto una forma de crear un sistema fiable de tareas cronometradas distribuidas usando Couchbase y Go.
Este sistema podría seguir desarrollándose:
- Soporte para el tratamiento de errores.
- Implementar una función de reintento (si el procesamiento falla, reprogramar la tarea en el futuro).
- Mejora de la lógica de bloqueo mediante:
- Ajuste del número máximo de identificadores de tarea devueltos (en lugar del recuento de trabajadores por defecto).
- Soportar una duración de procesamiento de tareas superior a 15 segundos (el tiempo máximo de bloqueo de un documento en Couchbase).
Gracias por su tiempo y ¡feliz desarrollo!
Este post forma parte del Programa de escritura comunitaria