Alberto Marchetti es desarrollador full-stack y autor de "RenderScript: computación paralela en Android, de forma sencilla." Vive siempre al límite, lanzándose constantemente al descubrimiento de lenguas y tecnologías modernas.

Tareas cronometradas utilizando Couchbase y Go
En este post voy a mostrar cómo puedes explotar el sistema de indexación de Couchbase para crear un sistema distribuido de tareas cronometradas. El código de ejemplo para el proyecto, junto con sus instrucciones de ejecución, se puede encontrar en https://github.com/cmaster11/cb-blog-timed-tasks.
Descargo de responsabilidad: Debido a la complejidad del tema, en esta página sólo se publican extractos de código relevantes.
El concepto
Intentemos definir los requisitos de un sistema de este tipo:
- La principal característica de un sistema de tareas temporizadas es poder especificar cuándo se ejecutará una determinada tarea en el tiempo. Esto se puede conseguir utilizando un campo ExecuteAt, que contendrá el tiempo de ejecución deseado (Hora Unixen milisegundos).
- Un requisito del software moderno es que debe soportar un entorno multinodo, lo que significa que debe ser un sistema distribuido. Debemos asegurarnos de que varios trabajadores NO procesen las mismas tareas. Aquí podemos usar una buena característica de Couchbase, cierre pesimistaque permitirá a un trabajador obtener un documento y bloquearlo para que ningún otro trabajador pueda procesarlo.
La siguiente es una posible estructura para representar nuestra tarea:
|
1 2 3 4 5 6 7 8 9 |
type Task struct { Id string // The desired task execution time ExecuteAt int64 // Task-specific content Content string } |
Características de Couchbase
En primer lugar, he aquí una visión general de las características de Couchbase que vamos a utilizar:
META()
Cada documento de un bucket de Couchbase tiene un documento META()asociado, que contiene información específica de la entidad del documento, como:
- id - la clave del documento dentro del cubo.
- cas - un número int64, usado por Couchbase para prevenir condiciones de carrera durante la edición de documentos.
- expiración - cuando un documento está destinado a expirar, o 0 si nunca expirará.
Sugerencia: Estos campos (por ejemplo, META().cas) puede indexarse (a partir de Couchbase 5.0).
CAS (Comprobar y fijar)
Cuando se obtiene un documento, también se devuelve su valor CAS, y las llamadas posteriores para modificar el documento pueden especificar este valor para asegurarse de que van a editar la versión deseada del documento.
Por ejemplo:
- El cliente A obtiene un documento y su valor CAS actual es 1234.
- El cliente B edita el documento, lo que altera el valor CAS a 5678.
- Si A intenta editar el documento sin proporcionar el valor CAS, la edición se realizará correctamente, pero los cambios realizados por B se perderán.
- Si A intenta editar el documento proporcionando el valor CAS (1234), se devolverá un error porque el actual (5678) es diferente. El cliente A tendrá que recuperar el documento de nuevo y volver a ejecutar el proceso.
El valor CAS es una herramienta extremadamente útil para asegurarnos de que no estamos reemplazando o alterando una versión incorrecta/nueva de un documento, perdiendo sus cambios.
Cierre pesimista
Couchbase nos permite "bloquear" un documento, para que sólo pueda ser leído y escrito por un cliente a la vez, utilizando gocb.GetAndLock Función Go SDK.
|
1 2 3 4 5 6 |
// Lock the document lockTime := 10 // seconds lockedCAS, err := bucket.GetAndLock(documentKey, lockTime, &outStruct) // Unlock it _, err = bucket.Unlock(documentKey, lockedCAS) |
Cuando un documento está bloqueado, cualquier otra solicitud para bloquearlo/mutarlo/desbloquearlo arrojará un error (sigue siendo posible simplemente obtener el documento), a menos que se utilice el valor CAS correcto.
Nota: El tiempo máximo de bloqueo de un documento es de 15 segundos, y el uso de un valor lockTime de 0 hará que se establezca el tiempo máximo. Esto crea una limitación sobre cuánto tiempo puede ejecutarse una tarea antes de ser marcada automáticamente como disponible (por tiempo de espera de bloqueo).
Pista: Mientras un documento está bloqueado, su valor CAS devuelto es -1.
Indexación y consulta
Cabe destacar que las dos sugerencias juntas nos indican que podemos indexar un campo (META().cas), que se convierte en -1 cuando un documento está bloqueado. También significa que podemos consultar documentos basándonos en esta condición.
La consulta
Intentemos definir una consulta que se ajuste a los requisitos:
- Queremos obtener un id de tarea, que puede ser utilizado más tarde para obtener y bloquear el documento: SELECT Id.
- La tarea no debe estar ya bloqueada: WHERE META().cas -1.
- La tarea debe ejecutarse ahora: WHERE ExecuteAt <= NOW_MILLIS() (AHORA_MILLIS devuelve el tiempo Unix actual en milisegundos).
- Necesitamos obtener la tarea más cercana en el tiempo, por lo que queremos ordenar las tareas por su tiempo de ejecución: ORDER BY ExecuteAt ASC.
- Digamos por ahora (!!!) que un trabajador querrá que sólo se procese una tarea a la vez: LIMIT 1.
El resultado debería ser similar a esta consulta:
|
1 2 3 4 5 6 |
SELECT `Id` FROM `timed_tasks` // Our bucket WHERE META().`cas` <> -1 AND `ExecuteAt` <= NOW_MILLIS() ORDER BY `ExecuteAt` ASC LIMIT 1 |
Su ejecución devolverá un array similar a:
|
1 2 3 |
[{ "Id": "task_id_goes_here" }] |
El índice
Ahora podemos planificar un índice específico para la consulta, optimizado para la ejecución de la consulta en la que acabamos de pensar. Los índices específicos de consulta son imprescindibles para mejorar el rendimiento de las consultas en bases de datos NoSQL.
- La consulta está comprobando que un documento no está bloqueado actualmente:
1WHERE META().cas <> -1. - Además, está pidiendo directamente que el tiempo de ejecución esté en el pasado. Entonces necesitamos indexar el campo ExecuteAt.
La consulta del índice podría ser entonces la siguiente:
|
1 2 3 4 |
CREATE INDEX `idx_timed_task` ON `timed_tasks` (`ExecuteAt` ASC) WHERE META().`cas` <> -1 |
Optimización de la consulta
Ahora podemos optimizar aún más la consulta:
- Podemos indicar a la consulta que utilice nuestro índice proporcionando una variable pista a la misma: USE INDEX (idx_timed_task USING GSI).
- Podemos pedir a Couchbase que espere a que el índice esté actualizado (normalmente la indexación es un proceso asíncrono) antes de ejecutar la consulta, de forma que nuestros resultados contengan con seguridad tareas desbloqueadas, proporcionando una directiva coherencia requisito a nivel del SDK: query.Consistency(gocb.RequestPlus).
El flujo
Un posible flujo para el bucle de trabajador consumidor de tarea temporizada es:
- Consulta de un identificador de tarea disponible.
- Obtener y bloquear la tarea.
- Procesar la tarea.
- Eliminar la tarea.
Múltiples nodos
Pensemos por un momento en cómo una configuración multinodo puede alterar este flujo.
Si varios trabajadores van a consultar tareas disponibles de forma concurrente, probablemente encontrarán la misma tarea, y sólo uno de ellos podrá procesarla con éxito, mientras que los demás trabajadores tendrán que repetir el bucle (ejecutar una nueva consulta) para obtener nuevas tareas.
Podemos aplicar entonces otro enfoque:
- Consulta de ids de tareas disponibles, limitando la cantidad de ids al número de trabajadores.
- Para cada id de tarea, intente bloquear la tarea. Al primer bloqueo con éxito, pasa al 4.
- Si no se ha bloqueado ninguna tarea, repite el bucle.
- Procesar la tarea.
- Eliminar la tarea.
En el mejor de los casos, cada trabajador podrá bloquear con éxito una tarea al primer intento. En el peor de los casos, los trabajadores tendrán que intentar bloquear varios documentos sin éxito. En una ejecución media, los trabajadores bloquearán las tareas con éxito, quizá después de haber intentado bloquear algunas otras.
Tenemos que llegar a un compromiso entre la frecuencia con la que queremos consultar la base de datos y el número de intentos de bloqueo fallidos que podemos soportar. En general, intentar bloquear documentos será mucho más rápido que ejecutar consultas N1QL.
El código
Veamos algunos ejemplos de código relevantes:
El productor
La generación de la tarea puede resumirse en esta función:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func NewTask(executeAt time.Time, content string) (*Task, error) { if executeAt.IsZero() { return nil, errors.New("executeAt must not be a zero time") } taskUUID, err := uuid.NewV1() // github.com/satori/go.uuid if err != nil { return nil, err } // Convert time.Time to int64 milliseconds executeAtMillis := executeAt.UnixNano() / int64(time.Millisecond) task := Task{ Id: taskUUID.String(), ExecuteAt: executeAtMillis, Content: content, } return &task, nil } |
Una vez que generamos un objeto de tarea válido, podemos simplemente insertarlo en nuestro cubo con:
|
1 |
_, err := controller.bucket.Insert(task.Id, task, 0) |
El consumidor
Podemos obtener y bloquear un documento por id, usando este código:
|
1 2 3 |
// Using zero values for lock time will set the maximum time available. task := new(Task) lockedCAS, err := controller.bucket.GetAndLock(taskId, 0, &task) |
Se puede eliminar una tarea utilizando este código:
|
1 |
_, err := controller.bucket.Remove(taskId, lockedCAS) |
El código del consumidor principal puede resumirse con el siguiente fragmento:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
taskIds, err := couchbaseController.QueryNextTaskIds(consumersCount) ... if len(taskIds) == 0 { ... // No tasks have been found, restart the loop } var taskId string var task *internal.Task var lockedCAS gocb.Cas for _, taskId = range taskIds { // Lock and get the task, so that only this consumer will process it task, lockedCAS, err = couchbaseController.GetAndLockTask(taskId) if err != nil { ... // Error getting the task, proceed to next one in list continue } // Successfully locked task! // Move out to process it break } if task == nil { ... // No tasks could be locked, restart loop } // Actual processing of the task // Improvement: could also return an error, which would let the task be // processed by another worker later. processTask(task) /* Remove the task from Couchbase. The task will be currently locked, which means we need to provide the current CAS value, so that the producer is authorized to remove it. */ err = couchbaseController.RemoveTask(taskId, lockedCAS) ... |
Conclusión
En este post hemos visto una forma de crear un sistema fiable de tareas cronometradas distribuidas usando Couchbase y Go.
Este sistema podría seguir desarrollándose:
- Soporte para el tratamiento de errores.
- Implementar una función de reintento (si el procesamiento falla, reprogramar la tarea en el futuro).
- Mejora de la lógica de bloqueo mediante:
- Ajuste del número máximo de identificadores de tarea devueltos (en lugar del recuento de trabajadores por defecto).
- Soportar una duración de procesamiento de tareas superior a 15 segundos (el tiempo máximo de bloqueo de un documento en Couchbase).
Gracias por su tiempo y ¡feliz desarrollo!
Este post forma parte del Programa de escritura comunitaria