Alberto Marchetti é um desenvolvedor full-stack e autor de "RenderScript: computação paralela no Android, de maneira fácil." Ele está sempre vivendo no limite, mergulhando constantemente na descoberta de idiomas e tecnologias modernas.
Tarefas cronometradas usando Couchbase e Go
Nesta postagem, mostrarei como você pode explorar o sistema de indexação do Couchbase para criar um sistema distribuído de tarefas cronometradas. O código de exemplo do projeto, juntamente com suas instruções de execução, pode ser encontrado em https://github.com/cmaster11/cb-blog-timed-tasks.
Isenção de responsabilidade: devido à complexidade do tópico, apenas os trechos de código relevantes são publicados nesta página.
O conceito
Vamos tentar definir os requisitos desse sistema:
- O principal recurso de um sistema de tarefas cronometradas é poder especificar quando uma determinada tarefa será executada no tempo. Isso pode ser feito com o uso de um campo ExecuteAt, que conterá o tempo de execução desejado (Hora do Unixcom base em milissegundos).
- Um requisito moderno de software é que ele deve suportar um ambiente com vários nós, o que significa que precisa ser um sistema distribuído. Devemos então garantir que vários workers NÃO processem as mesmas tarefas! Podemos usar um recurso interessante do Couchbase aqui, bloqueio pessimistaque permitirá que um funcionário busque um documento e o bloqueie, de modo que nenhum outro funcionário possa processá-lo.
A seguir, uma possível estrutura para representar nossa tarefa:
1 2 3 4 5 6 7 8 9 |
tipo Tarefa estrutura { Id string // O tempo de execução da tarefa desejado ExecuteAt int64 // Conteúdo específico da tarefa Conteúdo string } |
Recursos do Couchbase
Primeiro, aqui está uma visão geral dos recursos do Couchbase que usaremos:
META()
Cada documento em um bucket do Couchbase tem um documento META()associado, que contém informações específicas da entidade do documento, como:
- id - a chave do documento dentro do bucket.
- cas - um número int64, usado pelo Couchbase para evitar condições da corrida durante a edição de documentos.
- expiração - quando um documento deve expirar, ou 0 se ele nunca expirar.
Dica: Esses campos (por exemplo, META().cas) pode ser indexado (a partir do Couchbase 5.0).
CAS (Check and Set)
Ao buscar um documento, seu valor CAS também é retornado, e as chamadas subsequentes para alterar o documento podem especificar esse valor para garantir que editarão a versão desejada do documento.
Exemplo:
- O cliente A busca um documento e seu valor CAS atual é 1234.
- O cliente B edita o documento, o que altera o valor CAS para 5678.
- Se A tentar editar o documento sem fornecer o valor CAS, a edição será bem-sucedida, mas as alterações feitas por B serão perdidas.
- Se A tentar editar o documento fornecendo o valor CAS (1234), será retornado um erro porque o valor atual (5678) é diferente. O cliente A precisará, então, buscar o documento novamente e reexecutar o processo.
O valor CAS é uma ferramenta extremamente útil para garantir que não estamos substituindo ou alterando uma versão errada/nova de um documento, perdendo suas alterações.
Bloqueio pessimista
O Couchbase nos permite "bloquear" um documento, de modo que ele só possa ser lido e gravado por um cliente de cada vez, usando gocb.GetAndLock Função do Go SDK.
1 2 3 4 5 6 |
// Bloquear o documento lockTime := 10 // segundos bloqueadoCAS, erro := balde.GetAndLock(documentKey, lockTime, &outStruct) // Desbloqueie-o _, erro = balde.Desbloqueio(documentKey, bloqueadoCAS) |
Quando um documento é bloqueado, todas as outras solicitações de bloqueio/mutação/desbloqueio geram um erro (ainda é possível simplesmente obter o documento), a menos que o valor CAS correto seja usado.
Observação: O tempo máximo de bloqueio de um documento é de 15 segundos, e o uso de um valor de lockTime igual a 0 fará com que o tempo máximo seja definido. Isso cria uma limitação de quanto tempo uma tarefa pode ser executada antes de ser automaticamente marcada como disponível (por tempo limite de bloqueio).
Dica: Enquanto um documento estiver bloqueado, seu valor CAS retornado será -1.
Indexação e consulta
É importante observar que as duas dicas juntas nos informam que podemos indexar um campo (META().cas), que passa a ser -1 quando um documento está bloqueado. Isso também significa que podemos consultar documentos com base nessa condição!
A consulta
Vamos tentar definir uma consulta para atender aos requisitos:
- Queremos obter um ID de tarefa, que pode ser usado posteriormente para obter e bloquear o documento: SELECT Id.
- A tarefa não deve estar bloqueada: WHERE META().cas -1.
- A tarefa precisa ser executada agora: WHERE ExecuteAt <= NOW_MILLIS() (NOW_MILLIS retorna a hora atual do Unix em milissegundos).
- Precisamos buscar a tarefa mais próxima no tempo, portanto, queremos classificar as tarefas por seu tempo de execução: ORDER BY ExecuteAt ASC.
- Digamos, por enquanto (!!!), que um funcionário queira processar apenas uma tarefa por vez: LIMITE 1.
O resultado deve ser semelhante a esta consulta:
1 2 3 4 5 6 |
SELECIONAR `Id` DE `tarefas_temporizadas` // Nosso balde ONDE META().`cas` <> -1 E `ExecuteAt` <= NOW_MILLIS() ORDEM BY `ExecuteAt` ASC LIMITE 1 |
Sua execução retornará uma matriz semelhante a:
1 2 3 |
[{ "Id": "task_id_goes_here" }] |
O índice
Agora podemos planejar um índice específico da consulta, otimizado para a execução da consulta que acabamos de pensar. Os índices específicos da consulta são essenciais para melhorar o desempenho da consulta no banco de dados NoSQL.
- A consulta está verificando se um documento não está bloqueado no momento:
1ONDE META().cas <> -1. - Além disso, ele está solicitando diretamente que o tempo de execução esteja no passado. Em seguida, precisamos indexar o campo ExecuteAt.
A consulta de índice poderia ser a seguinte:
1 2 3 4 |
CRIAR ÍNDICE `idx_timed_task` ON `tarefas_temporizadas` (`ExecuteAt` ASC) ONDE META().`cas` <> -1 |
Otimização da consulta
Agora podemos otimizar ainda mais a consulta:
- Podemos dizer à consulta para usar nosso índice fornecendo um dica a ele: USE INDEX (idx_timed_task USING GSI).
- Podemos pedir ao Couchbase que aguarde a atualização do índice (geralmente a indexação é um processo assíncrono) antes de executar a consulta, para que nossos resultados contenham, com certeza, tarefas desbloqueadas, fornecendo um consistência requisito no nível do SDK: query.Consistency(gocb.RequestPlus).
O fluxo
Um fluxo possível para o loop de trabalho do consumidor da tarefa cronometrada é:
- Consulta de um ID de tarefa disponível.
- Obtenha e bloqueie a tarefa.
- Processar a tarefa.
- Excluir a tarefa.
Vários nós
Vamos pensar um pouco sobre como uma configuração de vários nós pode alterar esse fluxo.
Se vários funcionários forem consultar as tarefas disponíveis simultaneamente, eles provavelmente encontrarão a mesma tarefa e apenas um deles poderá processá-la com êxito, enquanto os outros funcionários terão que repetir o loop (executar uma nova consulta) para obter novas tarefas.
Então, podemos implementar outra abordagem:
- Consulta de ids de tarefas disponíveis, limitando a quantidade de ids ao número de trabalhadores.
- Para cada ID de tarefa, tente bloquear a tarefa. No primeiro bloqueio bem-sucedido, vá para 4.
- Se nenhuma tarefa tiver sido bloqueada com êxito, repita o loop.
- Processar a tarefa.
- Excluir a tarefa.
Na melhor das hipóteses, cada trabalhador conseguirá bloquear uma tarefa com sucesso na primeira tentativa. Na pior das hipóteses, os trabalhadores precisarão tentar bloquear vários documentos sem sucesso. Em uma execução média, os funcionários bloquearão as tarefas com êxito, talvez depois de tentar bloquear algumas outras.
Temos que estabelecer um compromisso entre a frequência com que queremos consultar o banco de dados e o número de tentativas de bloqueio com falha que podemos suportar. De modo geral, tentar bloquear documentos será muito mais rápido do que executar consultas N1QL.
O código
Vamos dar uma olhada em alguns exemplos de código relevantes:
O produtor
A geração da tarefa pode ser resumida nessa função:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func NovaTarefa(executeAt tempo.Tempo, conteúdo string) (*Tarefa, erro) { se executeAt.IsZero() { retorno nulo, erros.Novo("executeAt não deve ser um tempo zero") } taskUUID, erro := uuid.NovoV1() // github.com/satori/go.uuid se erro != nulo { retorno nulo, erro } // Converter time.Time em int64 milissegundos executeAtMillis := executeAt.UnixNano() / int64(tempo.Milissegundos) tarefa := Tarefa{ Id: taskUUID.Cordas(), ExecuteAt: executeAtMillis, Conteúdo: conteúdo, } retorno &tarefa, nulo } |
Depois de gerarmos um objeto de tarefa válido, podemos simplesmente inseri-lo em nosso bucket com:
1 |
_, erro := controlador.balde.Inserir(tarefa.Id, tarefa, 0) |
O consumidor
Podemos obter e bloquear um documento por id, usando este código:
1 2 3 |
// O uso de valores zero para o tempo de bloqueio definirá o tempo máximo disponível. tarefa := novo(Tarefa) bloqueadoCAS, erro := controlador.balde.GetAndLock(taskId, 0, &tarefa) |
Uma tarefa pode ser removida usando esse código:
1 |
_, erro := controlador.balde.Remover(taskId, bloqueadoCAS) |
O código principal do consumidor pode ser resumido com o seguinte trecho:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
taskIds, erro := couchbaseController.QueryNextTaskIds(consumersCount) ... se len(taskIds) == 0 { ... // Nenhuma tarefa foi encontrada, reinicie o loop } var taskId string var tarefa *interno.Tarefa var bloqueadoCAS gocb.Cas para _, taskId = alcance taskIds { // Bloquear e obter a tarefa, de modo que somente esse consumidor a processe tarefa, bloqueadoCAS, erro = couchbaseController.GetAndLockTask(taskId) se erro != nulo { ... // Erro ao obter a tarefa, prossiga para a próxima na lista continuar } // Tarefa bloqueada com sucesso! // Sair para processá-lo quebra } se tarefa == nulo { ... // Nenhuma tarefa pôde ser bloqueada, reinicie o loop } // Processamento real da tarefa // Aprimoramento: também poderia retornar um erro, o que permitiria que a tarefa fosse // processado por outro trabalhador posteriormente. processTask(tarefa) /* Remover a tarefa do Couchbase. A tarefa estará bloqueada no momento, o que significa que precisamos fornecer o valor CAS atual, para que o produtor seja autorizado a removê-lo. */ erro = couchbaseController.RemoveTask(taskId, bloqueadoCAS) ... |
Conclusão
Nesta postagem, vimos uma maneira de criar um sistema confiável de tarefas cronometradas distribuídas usando Couchbase e Go.
Esse sistema pode ser desenvolvido ainda mais:
- Suporte a erros de processamento.
- Implementação de um recurso de repetição (se o processamento falhar, reprogramar a tarefa no futuro).
- Aprimorar a lógica de bloqueio:
- Ajuste do número máximo de IDs de tarefas retornados (em vez da contagem padrão de trabalhadores).
- Suporte a uma duração de processamento de tarefa de mais de 15 segundos (o tempo máximo de bloqueio de um documento no Couchbase).
Obrigado por seu tempo e feliz desenvolvimento!
Esta postagem faz parte do Programa de redação comunitária