O Serviço de eventos do Couchbase permite que você aja prontamente em relação a mutações (ou alterações) em seus dados. Todas as ações em Eventos são realizados com a execução de um lambda, uma pequena parte da lógica comercial escrita em JavaScript.
Os casos de uso comuns incluem enriquecimento de dados, arquivamento de documentos e integração com serviços REST externos. Veja mais detalhes aqui.
No blog a seguir, descreveremos como os erros durante a execução do ouvinte de eventos podem ser tratados. Ao usar um mecanismo de repetição, garantimos que a ação prevista seja executada, mesmo que o ouvinte de eventos falhe durante a execução.
Aplicativo de amostra
Como exemplo, implementamos parte de um aplicativo de comércio eletrônico que armazena os pedidos dos clientes em uma coleção do Couchbase. Assim que o status de um pedido é alterado para pagoSe o cliente não tiver uma confirmação de pedido, queremos enviar uma confirmação de pedido para o cliente.
Para isso, integramos um ouvinte de eventos do Couchbase com um serviço de e-mail. O ouvinte de eventos do Couchbase captará todas as alterações no documento do pedido, verificará se o pedido foi pago e, em seguida, chamará o serviço de e-mail para acionar a mensagem de confirmação.
O serviço de e-mail é um microsserviço autônomo que fornece um ponto de extremidade REST. Usamos o suporte cURL incorporado diretamente no Couchbase Eventing para chamar o microsserviço a partir do ouvinte de eventos.
Fluxo conceitual: À medida que os pedidos são atualizados no Couchbase, os eventos são acionados e captados por um ouvinte de eventos. Em seguida, o ouvinte de eventos chama o serviço de e-mail externo.
Serviço de e-mail que retorna um erro
O cenário descrito funciona muito bem se o serviço de e-mail está operacional. No entanto, o que acontece se o serviço de e-mail está retornando um erro? As solicitações do ouvinte de eventos para o serviço de e-mail falharão e, portanto, nenhuma mensagem de confirmação será enviada ao cliente. Como no momento da falha o evento de alteração de documento do Couchbase já foi processado, nenhum novo evento para o mesmo documento é acionado, a menos que haja outra alteração nele. Para garantir que a confirmação seja enviada, precisamos tratar o erro e implementar um mecanismo de nova tentativa. Ao fazer isso, podemos contornar qualquer problema temporário do serviço externo e, ao mesmo tempo, garantir que a confirmação seja enviada.
Há diferentes maneiras de abordar isso. No meu exemplo abaixo, optei por criar uma nova coleção chamada "retry", que armazenará referências aos documentos para os quais a execução do ouvinte de eventos falhou.
O ouvinte de eventos captará as alterações nos documentos do pedido (etapa #1) e, em seguida, chamará o serviço de e-mail (etapa #2). Se a chamada para o serviço de e-mail for bem-sucedida, o ouvinte de eventos atualizará o status da mensagem de confirmação no documento do pedido (etapa #3). No entanto, em caso de falha, um documento de nova tentativa é criado e colocado na coleção 'retry' (etapa #3*).
Manter uma referência aos documentos nos permite identificar todas as atualizações que falharam e nos permite executá-las novamente mais tarde. Isso pode ser feito por meio da intervenção manual de um operador ou por uma nova tentativa automática usando os cronômetros de eventos do Couchbase.
- Iniciamos o processo de repetição adicionando um documento com um ID de documento especificado à coleção de repetição. Um cronômetro recorrente é criado com um intervalo de cronômetro fornecido.
- Na execução do cronômetro, todos os documentos mais antigos do que um pequeno quanta de tempo no tentar novamente são atualizados. Ao adicionar um atributo como fireRetry = true para os documentos de nova tentativa, acionamos outro evento de atualização que é captado pelo ouvinte de eventos para executar o mecanismo de nova tentativa. Isso nos dá uma mutação recursiva que ilumina todos os documentos na coleção de tentativas em paralelo. A função de nova tentativa agora é executada usando todos os threads de trabalho disponíveis em paralelo.
- Um evento de atualização de documento é acionado para cada documento de nova tentativa individualmente.
- O documento de pedido correspondente é recuperado da coleção de entrada
- Agora o serviço de e-mail é chamado.
- Se a chamada para o serviço de e-mail for bem-sucedida, o ouvinte do evento atualizará o status da mensagem de confirmação no documento do pedido e removerá o documento de nova tentativa
- Em caso de falha, o documento de nova tentativa é atualizado e colocado na coleção "retry".
Revisão do código
Agora que já estabelecemos o projeto conceitual, vamos dar uma olhada no exemplo de implementação:
Pré-requisitos:
- Couchbase 7 Enterprise Edition. Executo o Couchbase como um cluster de nó único no Docker em minha máquina local. (https://docs.couchbase.com/server/current/install/getting-started-docker.html)
- Para fins de desenvolvimento, criamos um cluster do Couchbase de nó único que executa os seguintes serviços:
- Índice, consulta, eventos e serviço de dados
Observe que a instalação de um único nó não é recomendada para uso em produção.
Preparação
- Crie um bucket chamado pedidos
- Crie duas coleções no arquivo pedidos buckets _default scope:
- de entrada (isso conterá todos os pedidos recebidos)
- tentar novamente (isso conterá os documentos de nova tentativa referentes às ordens que falharam)
- Criar balde 'metadata'. Usaremos o escopo _default e a coleção _default. O bucket de metadados é usado para os metadados do Eventing.
- Criar um índice no tentar novamente coleção. O ouvinte de repetição consultará todos os documentos contidos na coleção usando N1QL, portanto, um índice precisa estar em vigor para que a consulta seja executada.
1 |
CRIAR PRIMÁRIO ÍNDICE idx_default_primary ON pedidos.Padrão.tentar novamente USO GSI; |
Documento de pedido de modelo de dados
Para fins deste aplicativo de amostra, usamos um modelo de dados leve para o documento de pedido, contendo apenas os campos relevantes. Muitos outros campos que normalmente seriam esperados em um documento de pedido são omitidos.
1 2 3 4 5 6 7 8 9 10 11 12 |
{ "email": "customer_email", "paymentStatus": "iniciado", "confirmationEmailSent": falso, "itens": [ { "name" (nome): "Almôndegas suecas 500g", "amount" (valor): 2, "unitPrice": 9.95 } ] } |
Documento de repetição do modelo de dados
O documento de nova tentativa contém alguns atributos básicos, como o ID do documento do pedido, um contador de tentativas e um registro de data e hora. O documento tipo não é necessário em nosso aplicativo, mas pode ser útil para determinar o tipo de notificação por e-mail, caso o aplicativo seja ampliado para enviar também atualizações de remessa e entrega.
1 2 3 4 5 6 |
{ "tipo": "confirmação", "docId": "order_140", "tentativa": 1, "ts": 1632775908319 } |
Serviço de e-mail MOCK
Vamos simular o serviço de e-mail usando um script Python simples que executa um servidor da Web local. O script responderá aleatoriamente com HTTP 200 OK ou com HTTP 406 para indicar uma falha.
- Atualize o endereço IP para o endereço IP de seu computador local na linha 31server = ThreadedHTTPServer(('replace with your IP', 9080), Handler)
- Inicie o script executando: python http.py
Ouvintes de eventos
Agora, com todos os preparativos prontos, podemos adicionar os dois ouvintes de eventos:
- evt_send_confirmation_email - fornece a integração com o serviço de e-mail
- evt_send_confirmation_email_retry - contém a lógica de nova tentativa
- Os ouvintes estão disponíveis aqui: https://github.com/puhhma/cb_eventing_retry_sample
- Importar os ouvintes (json ) no serviço Couchbase Eventing.
Observe que, para que os ouvintes funcionem, você precisa seguir as convenções de nomenclatura usadas neste artigo.
Revisão evt_send_confirmation_email ouvinte
Configuração do ouvinte de eventos:
- O ouvinte de eventos está ouvindo o de entrada na coleção pedidos balde.
- O metadados O bucket é usado para armazenar os metadados dos ouvintes
- Os aliases de balde bkt_order_inbound e bkt_order_retry referenciam o correspondente de entrada e tentar novamente na coleção ordem balde
- O curlEmailServiceHost especifica o alias de URL para o EmailService simulado. Certifique-se de atualizar com seu endereço IP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
// O OnUpdate é chamado para todos os documentos criados/atualizados no bucket "inbound". função Sobre a atualização(doc, meta) { // determinar se o status do documento é "pago" e se o e-mail de confirmação não foi enviado anteriormente se( doc.status do pagamento === "pago" && !doc.confirmationEmailSent ) { SendConfirmationMail(doc, meta.id); } mais { se (nível de depuração > 1) registro("Nada para fazer: " + meta.id); } } função SendConfirmationMail(doc, docId) { tentar { // criar a solicitação para o EmailService var solicitação = { caminho: 'sendConfirmation', corpo: doc }; // executar a solicitação cURL usando o alias de URL 'curlEmailServiceHost' das configurações var resposta = enrolar('POST', curlEmailServiceHost, solicitação); se (resposta.status != 200) { // isso não funcionou como esperado se (nível de depuração > 1) { registro("docId", docId, "Falha no POST do cURL response.status:",resposta.status); } // criar um documento de nova tentativa referenciando o documentId e armazenar no bucket 'retry' bkt_order_retry[docId] = { "docId": docId, "tentativa": 1, "ts": Data.agora() } } mais { se (nível de depuração > 5) { registro("Sucesso do cURL POST, enviado",docId,"response.body:",resposta.corpo); } // atualizar o status confirmationEmailSent doc.confirmationEmailSent = verdadeiro; bkt_order_inbound[docId] = doc; } } captura (e) { registro("ERRO A solicitação cURL teve uma exceção:",e) } } |
- Consulte os comentários inline para obter detalhes
- O Sobre a atualização é acionada quando um documento de pedido é atualizado ou criado
- A solicitação é construída e a solicitação HTTP POST é enviada para o EmailService usando cURL.
- O resultado é avaliado. Caso a resposta HTTP não seja bem-sucedida, um documento de nova tentativa é construído e adicionado ao arquivo tentar novamente coleção.
- O ouvinte de eventos está ouvindo o tentar novamente na coleção pedidos balde.
- O metadados O bucket é usado para armazenar os metadados dos ouvintes
- Os aliases de balde bkt_order_inbound e bkt_order_retry referenciam o correspondente de entrada e tentar novamente no balde de pedidos
- O curlEmailServiceHost especifica o alias de URL para o EmailService simulado
- O retryTimerIntervall especifica o intervalo do cronômetro em segundos.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
função Sobre a atualização(doc, meta) { se (meta.id === "allow_retrys") { // o cronômetro é inicializado com a criação de um documento com id = 'allow_retrys' CreateRetryTimer({"id": meta.id, "mode" (modo): "inicial"}); } mais se (doc.fireRetry) { // processar documentos de repetição SendConfirmationMail(doc, meta.id); } } função CreateRetryTimer(contexto) { se (nível de depuração > 2) { registro('De CreateRetryTimer: criando um cronômetro', contexto.modo, contexto.id); } // Criar um registro de data e hora 'retryTimerInterval' segundos (das configurações) a partir de agora var timerStartTime = novo Data(); // Obtenha a hora atual e adicione a ela o segundo 'retryTimerInterval'. timerStartTime.setSeconds(timerStartTime.getSeconds() + retryTimerInterval); // Criar um documento para usar como saída para nosso contexto createTimer(RetryTimerCallback, timerStartTime, contexto.id, contexto); } função RetryTimerCallback(contexto) { se (nível de depuração > 2) { registro('From RetryTimerCallback: timer fired', contexto); } // rearmar o cronômetro o mais rápido possível, para garantir que o cronômetro continue funcionando no evento // de erros posteriores ou tempos limite de script em "trabalhos recorrentes" posteriores. CreateRetryTimer({ "id": contexto.id, "mode" (modo): "via_callback" }); // Atualizar todos os documentos de retry no bucket 'retry'. Excluir o documento "allow_retys // e todos os documentos que foram criados há mais de 15 segundos, para evitar tentativas antecipadas. N1QL("UPDATE orders._default.retry SET fireRetry = true WHERE meta().id ! = 'allow_retrys' AND ts < DATE_ADD_MILLIS(NOW_MILLIS(), -15, 'second')"); } função SendConfirmationMail(retryDoc, docId) { tentar { // resolver o documento de pedido por id var doc = bkt_order_inbound[docId]; // criar a solicitação var solicitação = { caminho: 'sendConfirmation', corpo: doc }; // executar a solicitação cURL usando o alias de URL das configurações var resposta = enrolar('POST', curlEmailServiceHost, solicitação); se (resposta.status != 200) { // isso não funcionou como esperado se (nível de depuração > 1) { registro("docId", docId, "Falha no POST do cURL response.status:",resposta.status); } // incrementar a contagem de tentativas no documento de nova tentativa retryDoc.tentativa = ++retryDoc.tentativa; // Defina fireRetry = false, para evitar a execução de novas tentativas com essa alteração de documento retryDoc.fireRetry = falso; retryDoc.ts = Data.agora(); // atualizar documento de nova tentativa bkt_order_retry[docId] = retryDoc; } mais { se (nível de depuração > 5) { registro("Sucesso do cURL POST, enviado",docId,"response.body:",resposta.corpo); } doc.confirmationEmailSent = verdadeiro; bkt_order_inbound[docId] = doc; // excluir o documento de nova tentativa excluir bkt_order_retry[docId]; } } captura (e) { registro("ERRO A solicitação cURL teve uma exceção:",e) } } |
- O cronômetro é iniciado com a adição de um documento com o ID allow_retrys para o tentar novamente coleção
- O cronômetro é então inicializado e a função RetryTimerCallback associada ao cronômetro
- Quando o cronômetro é executado, a função RetryTimerCallback é chamada
- Antes de prosseguir com o mecanismo de repetição, um novo timer é criado como a primeira etapa para garantir que ele continue em execução no caso de erros posteriores
- Uma consulta N1QL é usada para atualizar todos os tentar novamente documentos no tentar novamente adicionando uma coleção fireRetry ao documento
- Cada alteração de documento resulta em um evento de atualização de documento e o mecanismo de nova tentativa é executado
- O documento do pedido é resolvido a partir do de entrada e o EmailService é chamado via cURL
- Em caso de falha, o tentar novamente é atualizado e o documento tentativa contador aumentado
Teste o aplicativo de amostra
Agora é hora de finalmente testar o aplicativo de amostra:
- Certifique-se de que o EmailService simulado esteja funcionando
- Iniciar o evt_send_confirmation_email mas mantenha o ouvinte de eventos evt_send_confirmation_email_retry O ouvinte parou por enquanto.
- Crie um documento de pedido de amostra (consulte o modelo de dados acima) no console do Couchbase
- No caso de uma resposta bem-sucedida, o confirmationEmailSent é atualizado para true no atributo ordem documento.
- Em caso de falha, um documento de nova tentativa é criado no tentar novamente coleta. Como o EmailService responderá aleatoriamente com um erro, repita a etapa #3 até que ocorra um erro.
- Agora que capturamos um erro, vamos iniciar o ouvinte do evento de nova tentativa evt_send_confirmation_email_retry
- Crie um documento com o ID 'allow_retrys'. Isso inicializará o mecanismo de repetição.
- Após um breve período, o ouvinte se tornará ativo e começará a processar os documentos no tentar novamente coleção.
- Observe que o atributo "attempt" (tentativa) é aumentado a cada atualização com falha no serviço de e-mail. Em caso de sucesso, o documento do pedido é atualizado e o atributo tentar novamente documento removido do tentar novamente coleção.
Como a resposta do Email Service Mock é aleatória, talvez seja necessário repetir as etapas acima para poder observar o comportamento previsto.
Conclusão
Neste artigo, descrevo um mecanismo de repetição para lidar com condições de erro ao integrar o Couchbase Eventing a um serviço REST externo. Essa ou outras soluções semelhantes podem ser usadas para garantir que as ações previstas sejam executadas mesmo que o serviço externo esteja temporariamente com mau funcionamento.
Ao considerar um mecanismo de nova tentativa, vários fatores precisam ser levados em conta, como o volume de novas tentativas, os threads de trabalho disponíveis para o Couchbase Eventing e as solicitações que o serviço externo pode tratar.
Você pode encontrar mais informações sobre os aspectos internos do Couchbase Eventing aqui: https://docs.couchbase.com/server/current/eventing/eventing-overview.html
Muito obrigado a Jon Strabala (gerente de produto principal da Couchbase) pela visão técnica e pelo suporte a este artigo.