Nos aplicativos modernos orientados por dados, a retenção de documentos históricos é essencial para a conformidade, a auditoria e a otimização de custos. No entanto, manter todos os dados indefinidamente em seu banco de dados operacional principal geralmente é insustentável e caro.
Nesta postagem do blog, vou orientá-lo na criação de um Pipeline de arquivamento totalmente sem servidor que move automaticamente documentos de Couchbase para Amazon S3, usando Eventos do Couchbase, Gateway de API da Amazon, SNSe AWS Lambda. A arquitetura demonstra como aproveitar desacoplamento assíncrono para melhorar a resiliência, a escalabilidade e o desempenho.
Ao final deste tutorial, você terá uma solução robusta e completa que reage a mutações de documentos ou expirações baseadas em TTL no Couchbase e as arquiva com eficiência no S3, sem nenhuma intervenção manual.
Visão geral da arquitetura
Esta é a aparência da arquitetura:

Fluxo:
-
- O Couchbase detecta uma condição de documento (como expiração de TTL ou um
arquivo: true
bandeira). - Uma função Eventing do Couchbase é acionada e envia o documento para um API Gateway.
- O API Gateway encaminha o documento para um tópico do SNS.
- O SNS invoca uma função Lambda inscrita no tópico.
- O Lambda grava o documento JSON completo em um bucket S3 usando uma estrutura de pastas baseada em data.
- O Couchbase detecta uma condição de documento (como expiração de TTL ou um
Essa configuração é desacoplada, dimensionável e não requer polling.
Por que o Couchbase Eventing para arquivamento?
Eventos do Couchbase fornece uma maneira nativa de acionar a lógica comercial em resposta a mutações de documentos (criações, atualizações, exclusões) ou expirações.
Com o Eventing, podemos:
-
- Monitorar tipos ou campos de documentos específicos (como
archive === true
e/outipo === registros
) - Reagir em tempo real às expirações de TTL
- Envie dados para serviços externos (como o AWS) por meio de chamadas HTTP
- Monitorar tipos ou campos de documentos específicos (como
Escrevendo a função Eventing do Couchbase
Aqui está um exemplo simplificado da função Eventing do Couchbase que usamos para arquivar documentos. A função implementa a lógica para lidar com dois cenários principais:
-
- Arquivamento baseado em TTL: Quando um documento tem um
expiração
registramos um timer que dispara 60 segundos antes do TTL. Quando o cronômetro expira, oDocTimerCallback
é invocada, que então chama a funçãopublish(doc, meta)
para arquivar o documento. - Arquivamento baseado em bandeira: Como alternativa, se um documento incluir o campo
arquivo: true
a função chama imediatamentepublish(doc, meta)
para arquivar o documento.
- Arquivamento baseado em TTL: Quando um documento tem um
Em ambos os casos, o documento é enviado a um API Gateway externo para arquivamento. Se o status da resposta da API for 200
ou 302
o documento é explicitamente excluído do bucket de origem, concluindo o fluxo de trabalho de arquivamento. Isso oferece um mecanismo flexível para arquivar documentos sob demanda ou por meio de automação baseada em TTL.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
função Sobre a atualização(doc, meta) { se (doc.arquivo && doc.arquivo === verdadeiro){ registro('Arquivamento de documento com ID:', meta.id); var status = publicar(doc, meta); se (status === verdadeiro) { excluir src[meta.id]; } mais { registro('Falha na publicação, o documento não será excluído:', meta.id); } } mais se (meta.expiração > 0){ var nMinsPrior = novo Data((meta.expiração - 60) * 1000); var currentTime = novo Data().getTime(); registro('Diferença de tempo (ms): ', currentTime - nMinsPrior); se (currentTime > nMinsPrior) { registro('Dentro de 1 minuto após a expiração do TTL, arquivamento:', meta.id); var status de publicação = publicar(doc, meta); } mais { registro('Temporizador definido para arquivamento futuro:', meta.id); createTimer(DocTimerCallback, nMinsPrior, meta.id, meta.id); } } mais { registro('Nenhuma condição de arquivamento foi atendida para:', meta.id); retorno; } } função DocTimerCallback(contexto) { var doc = src[contexto]; se (doc) { var meta = { id: contexto }; var status de publicação = publicar(doc, meta); } mais { registro('Falha na chamada de retorno do cronômetro: documento não encontrado para:', contexto); } } função publicar(doc, meta) { tentar { var solicitação = { caminho: 'arquivo', cabeçalhos: { 'Content-Type': 'application/json' }, corpo: { ...doc, id: meta.id } }; registro("Sending request:" (Enviando solicitação:), solicitação); var resposta = enrolar('POST', arquivo2S3, solicitação); se (resposta.status === 200 || resposta.status === 302) { registro("Sucesso de publicação para:", meta.id, " Corpo da resposta:", resposta.corpo); retorno verdadeiro; } mais { registro("A publicação falhou com o status:", resposta.status, " Corpo da solicitação:", solicitação); retorno falso; } } captura (e) { registro("Exceção durante a publicação:", e); retorno falso; } } |
Observação: Por motivos de desempenho, recomendamos comentar todos os
log()
mostradas acima. Esses registros foram incluídos principalmente para fins de depuração e desenvolvimento. O excesso de registros em ambientes de produção pode afetar o desempenho e aumentar os custos de armazenamento de registros.
Veja como definimos as configurações e os vínculos ao criar a função de eventos.
Acertar Próximo
para criar associações. É aqui que vincularemos o endpoint do nosso API Gateway a um alias arquivo2S3
e também usa o bucket de origem como alias src
. Observe que usamos a permissão de leitura/gravação para o nosso bucket de origem, pois gostaríamos que os dados fossem removidos de lá.
Acertar Próximo
novamente e copie/cole a função JS acima na janela e Salvar
. Nesse ponto, sua função está salva, mas não implantada. Pressione três pontos e selecione Implementar
para executar a função de eventos. Esta é a aparência quando a função estiver em execução.
Criação da função Lambda para arquivar no S3
A função Lambda consome a mensagem do SNS e arquiva o JSON completo em um bucket do S3, organizado por data.
Exemplo de código Lambda
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
importação boto3 importação json de data e hora importação data e hora s3 = boto3.cliente('s3') nome_do_balde = 'your-s3-archive-bucket' def manipulador lambda(evento, contexto): para registro em evento['Registros']: mensagem = json.cargas(registro['Sns']['Mensagem']) doc_id = mensagem['id'] conteúdo = mensagem agora = data e hora.utcnow() pasta = f"{now.year}/{now.month}/{now.day}" chave = f"{folder}/{doc_id}.json" s3.put_object( Balde=nome_do_balde, Chave=chave, Corpo=json.lixeiras(conteúdo), ContentType='application/json' ) retorno { 'statusCode': 200, 'corpo': Arquivado com sucesso. } |
Isso resulta em um objeto S3 como:
1 |
s3://your-s3-archive-bucket/2025/6/29/log123.json |
Uso do SNS para desacoplar o acionador de arquivamento
SNS (Serviço de Notificação Simples) permite que vários serviços recebam a mesma mensagem. Aqui, ele passa a solicitação de arquivamento para uma função Lambda.
Passos:
-
- Crie um tópico, por exemplo,
ArchiveTriggerTopic
- Permitir que o API Gateway publique nele por meio do IAM
- Assinar a função Lambda
- Crie um tópico, por exemplo,

Correção de permissões
Certifique-se de que o API Gateway tenha permissão para publicar no tópico do SNS por meio de uma política de confiança e acesso:
Política de confiança para snsAccess
Função:
1 2 3 4 5 6 7 8 |
{ "Efeito": "Permitir", "Principal": { "Serviço": "apigateway.amazonaws.com" }, "Ação": "sts:AssumeRole" } |
Política de acesso para a função:
1 2 3 4 5 6 |
{ "Efeito": "Permitir", "Ação": "sns:Publish", "Recurso": "arn:aws:sns:us-east-1:account-id:ArchiveTriggerTopic" } |
Configuração do API Gateway para aceitar solicitações de arquivamento
O API Gateway atua como nosso ponto de extremidade público que recebe as solicitações de arquivo e as encaminha para o SNS.
Principais etapas:
-
- Criar um API REST no API Gateway.
Entre as opções fornecidas, selecione API REST
pois oferece integração com o SNS
serviço.
Dê um Nome da API
e selecione Tipo de ponto de extremidade da API
como Regional
. Acertar Criar API
botão.
-
- Configurar um
POST /archive
rota.
- Configurar um
Na próxima Recursos
crie um recurso pressionando Criar recurso
no painel esquerdo.

Nome do recurso
. Estou chamando meu nome de recurso para ser arquivo
.
Acertar Criar recurso
botão. Na página seguinte, na seção Métodos
painel atingido Criar método
button. Isso nos permitirá mapear várias configurações para nosso método POST e detalhes sobre nosso SNS serviço como o que Região AWS
que está sendo executado, ARN do Função de IAM
que tenha a permissão necessária para publicar no tópico do SNS.

TopicArn
. Além disso, mapearemos Mensagem
para method.request.body
que conterá a carga útil completa do nosso documento JSON.

Salvar
botão.
Parabéns, você acabou de implantar um Gateway de API que pode fazer o POST de seu documento JSON para o Tópico do SNSque, por fim, aciona o Lambda para escrevê-lo em S3.
Teste do fluxo de ponta a ponta
Você pode testar seu pipeline de duas maneiras:
Primeiro, teste o método POST da API
-
- Pressione o botão
Teste
e enviar um JSON simples, comid
como obrigatório.
- Pressione o botão
Quando você atinge o Teste
certifique-se de que o rastreamento do registro não mostre nenhum erro e que o status da resposta seja 200. Neste ponto, nosso endpoint de API está funcionando bem. Em seguida, testaremos esse serviço com o curl.
De curl ou Postman
1 2 3 4 5 6 7 8 9 |
enrolar -X POST https://your-api-gateway-url/archive \ -H "Content-Type: application/json" \ -d '{ "id": "hotel::10025", "type": "Hotel", "message": "Archiving via curl", "archive": true }' |
Após o acionamento, verifique se um objeto correspondente foi criado em seu bucket S3.
Do Capella usando o Query Workbench
Para testar a configuração do Capelainserir um documento com um Valor TTL de 120 segundos.
1 2 3 |
UPSERT PARA volume.dados.fonte (CHAVE, VALOR) VALORES ("test::001", {"tipo": "teste", "campo": "valor"}, {"expiração": 2*60}); |
Execute o comando SQL acima a partir do query workbench e aguarde até que o documento apareça na janela configurada Balde S3 aproximadamente 60 segundos antes de sua expiraçãoA função Eventing define um cronômetro para acionar um minuto antes do TTL.

Solução de problemas
Aqui estão alguns problemas comuns e como corrigi-los:
ValidationError: a mensagem não deve ser nula
-
- Isso geralmente significa que o
Mensagem
enviado ao SNS está vazio. - Garanta que seu Modelo de mapeamento do API Gateway está extraindo corretamente o corpo.
- Isso geralmente significa que o
O API Gateway não tem permissão para assumir a função
-
- Confirme se a sua função de IAM tem o código correto política de confiança.
- A função deve permitir que o
apigateway.amazonaws.com
serviço para assumi-lo.
Content-Type incorreto na solicitação de API
-
- O API Gateway só aplica modelos de mapeamento quando o tipo de conteúdo é
aplicativo/json
. - Certifique-se de que a função Eventing do Couchbase (ou Postman) defina esse cabeçalho.
- O API Gateway só aplica modelos de mapeamento quando o tipo de conteúdo é
O SNS recebe JSON com escape ou malformado
-
- Verifique novamente o uso de
$util.escapeJavaScript($input.body)
no modelo de mapeamento. - O escape incorreto pode causar problemas na análise do Lambda downstream.
- Verifique novamente o uso de
Registros do CloudWatch para inspecionar o Lambda
-
- Monitorar o rastreamento da execução da função Lambda para confirmar que tudo foi executado conforme o esperado
Aprimoramentos e práticas recomendadas
-
- Uso variáveis de ambiente no Lambda para o nome e a região do bucket S3.
- Ativar Criptografia no lado do servidor S3 (SSE-S3 ou SSE-KMS) para fins de conformidade.
- Ligar Controle de versão S3 para preservar cópias históricas.
- Adicionar Alarmes do CloudWatch para erros do Lambda ou API Gateway 5XXs.
- Uso Expansão do SNS para notificar consumidores adicionais (por exemplo, Kinesis, outros Lambdas).
- Considere a possibilidade de substituir o SNS por integração direta com o Lambda se você tiver apenas um consumidor e quiser permissões simplificadas.
Conclusão
Nesta postagem do blog, criamos um pipeline de arquivamento de documentos robusto e em tempo real usando:
-
- Couchbase Eventing para detectar documentos arquiváveis
- API Gateway para expor um ponto de extremidade público
- SNS para separar produtores de consumidores
- Lambda para processar e salvar documentos no S3
Essa arquitetura é totalmente sem servidor, é dimensionada sem esforço e é uma maneira econômica de descarregar dados históricos para retenção, conformidade ou análise.
Recursos
Para ajudá-lo a se aprofundar e expandir seu conhecimento sobre as tecnologias usadas nesse pipeline, aqui estão alguns recursos valiosos: