In modern data-driven applications, retaining historical documents is essential for compliance, auditing, and cost optimization. However, keeping all data indefinitely in your primary operational database is often unsustainable and expensive.
In this blog post, I’ll walk you through building a fully serverless archival pipeline that automatically moves documents from Couchbase to Amazon S3, using Couchbase Eventing, Amazon API Gateway, SNS, and AWS Lambda. The architecture demonstrates how to leverage asynchronous decoupling to improve resilience, scalability, and performance.
By the end of this tutorial, you’ll have a robust, end-to-end solution that reacts to document mutations or TTL-based expirations in Couchbase and archives them efficiently in S3—without any manual intervention.
Architecture overview
Here’s what the architecture looks like:

Flow:
- Couchbase detects a document condition (like TTL expiry or a custom `archive: true` flag).
- A Couchbase Eventing function triggers and sends the document to an API Gateway.
- API Gateway forwards the document to an SNS topic.
- SNS invokes a Lambda function subscribed to the topic.
- Lambda writes the full JSON document into an S3 bucket using a date-based folder structure.
This setup is decoupled, scalable, and requires zero polling.
Why Couchbase Eventing for archival?
Couchbase Eventing provides a native way to trigger business logic in response to document mutations (creates, updates, deletes) or expirations.
With Eventing, we can:
- Monitor specific document types or fields (like `archive === true` and/or `type === logs`)
- React in real time to TTL expirations
- Push data out to external services (like AWS) via HTTP calls
Writing the Couchbase Eventing function
Here’s a simplified example of the Couchbase Eventing function we use for archiving documents. The function implements logic to handle two primary scenarios:
- TTL-based Archiving: When a document has an `expiration` set, we register a timer that fires 60 seconds before the TTL. Once the timer fires, the `DocTimerCallback` function is invoked, which then calls the `publish(doc, meta)` function to archive the document.
- Flag-based Archiving: Alternatively, if a document includes the field `archive: true`, the function immediately calls `publish(doc, meta)` to archive the document.
In both cases, the document is sent to an external API Gateway for archiving. If the API response status is either `200` or `302`, the document is explicitly deleted from the source bucket, completing the archive workflow. This provides a flexible mechanism to archive documents either on demand or via TTL-based automation.
```javascript
// OnUpdate fires on every document mutation. It archives documents flagged
// with archive: true immediately, or schedules a timer for documents with a TTL.
function OnUpdate(doc, meta) {
    if (doc.archive && doc.archive === true) {
        log('Archiving document with ID:', meta.id);
        var status = publish(doc, meta);
        if (status === true) {
            delete src[meta.id];
        } else {
            log('Publish failed, document will not be deleted:', meta.id);
        }
    } else if (meta.expiration > 0) {
        // Aim to fire 60 seconds before the document's TTL expiry.
        var nMinsPrior = new Date((meta.expiration - 60) * 1000);
        var currentTime = new Date().getTime();
        log('Time difference (ms): ', currentTime - nMinsPrior);
        if (currentTime > nMinsPrior) {
            log('Within 1 minute of TTL expiry, archiving:', meta.id);
            var publishStatus = publish(doc, meta);
        } else {
            log('Timer set for future archiving:', meta.id);
            createTimer(DocTimerCallback, nMinsPrior, meta.id, meta.id);
        }
    } else {
        log('No archiving conditions met for:', meta.id);
        return;
    }
}

// Timer callback: re-reads the document from the source bucket alias (src)
// and publishes it before the TTL removes it.
function DocTimerCallback(context) {
    var doc = src[context];
    if (doc) {
        var meta = { id: context };
        var publishStatus = publish(doc, meta);
    } else {
        log('Timer callback failed: document not found for:', context);
    }
}

// Sends the document to the API Gateway endpoint bound to the alias archive2S3.
function publish(doc, meta) {
    try {
        var request = {
            path: 'archive',
            headers: {
                'Content-Type': 'application/json'
            },
            body: { ...doc, id: meta.id }
        };
        log("Sending request:", request);
        var response = curl('POST', archive2S3, request);
        if (response.status === 200 || response.status === 302) {
            log("Publish success for:", meta.id, " Response body:", response.body);
            return true;
        } else {
            log("Publish failed with status:", response.status, " Request body:", request);
            return false;
        }
    } catch (e) {
        log("Exception during publish:", e);
        return false;
    }
}
```
Note: For performance reasons, we recommend commenting out all the `log()` statements shown above. These logs were included primarily for debugging and development purposes; excessive logging in production environments can impact performance and increase log storage costs.
Here is how we defined the settings and bindings while creating the Eventing function.
Hit Next to create bindings. This is where we bind the endpoint of our API Gateway to the alias `archive2S3`, and the source bucket to the alias `src`. Note that we used Read/Write permission for the source bucket, since we want archived data to be purged from it.

Hit Next again, copy/paste the JS function from above into the editor window, and hit Save. At this point your function is saved but not deployed. Hit the three dots and select the Deploy option to run the Eventing function. This is how it looks once the function is running.
Building the Lambda Function to Archive to S3
The Lambda function consumes the SNS message and archives the full JSON to an S3 bucket, organized by date.
Example Lambda Code
```python
import boto3
import json
from datetime import datetime

s3 = boto3.client('s3')
bucket_name = 'your-s3-archive-bucket'

def lambda_handler(event, context):
    for record in event['Records']:
        msg = json.loads(record['Sns']['Message'])
        doc_id = msg['id']
        content = msg

        now = datetime.utcnow()
        folder = f"{now.year}/{now.month}/{now.day}"
        key = f"{folder}/{doc_id}.json"

        s3.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=json.dumps(content),
            ContentType='application/json'
        )

    return {
        'statusCode': 200,
        'body': 'Archived successfully.'
    }
```
This results in an S3 object like:
```
s3://your-s3-archive-bucket/2025/6/29/log123.json
```
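Before wiring up SNS, you can sanity-check the handler locally by feeding it a hand-built SNS-style event. This is just an illustrative sketch: the module name `lambda_function.py` and the sample document are assumptions, and it writes to your real bucket, so valid AWS credentials are required.

```python
# Hypothetical local smoke test for the handler above.
# Assumes the handler code is saved as lambda_function.py and that your
# AWS credentials can write to the archive bucket.
import json
from lambda_function import lambda_handler

fake_event = {
    "Records": [
        {"Sns": {"Message": json.dumps({"id": "log123", "type": "logs", "archive": True})}}
    ]
}

print(lambda_handler(fake_event, None))  # expect: {'statusCode': 200, 'body': 'Archived successfully.'}
```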
Using SNS to decouple the archival trigger
SNS (Simple Notification Service) allows multiple services to receive the same message. Here, it passes the archive request to a Lambda function.
Steps:
- Create a topic, e.g., `ArchiveTriggerTopic`
- Allow API Gateway to publish to it via IAM
- Subscribe the Lambda function (a scripted sketch of these steps follows below)
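If you prefer scripting these steps over the console, here is a minimal boto3 sketch of the same wiring. The topic name matches the one above; the Lambda function name, region, and account ID are placeholders you would replace.

```python
# Minimal sketch of the SNS wiring described above. The Lambda ARN and
# function name ('archive-to-s3') are placeholders for your own values.
import boto3

sns = boto3.client('sns', region_name='us-east-1')
lam = boto3.client('lambda', region_name='us-east-1')

# 1. Create the topic
topic_arn = sns.create_topic(Name='ArchiveTriggerTopic')['TopicArn']

# 2. Subscribe the archival Lambda to the topic
sns.subscribe(
    TopicArn=topic_arn,
    Protocol='lambda',
    Endpoint='arn:aws:lambda:us-east-1:account-id:function:archive-to-s3'
)

# 3. Allow SNS to invoke the Lambda
lam.add_permission(
    FunctionName='archive-to-s3',
    StatementId='AllowSNSInvoke',
    Action='lambda:InvokeFunction',
    Principal='sns.amazonaws.com',
    SourceArn=topic_arn
)
```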

Fixing permissions
Ensure API Gateway is allowed to publish to the SNS topic via a trust and access policy:
Trust Policy for the snsAccess Role:
```json
{
  "Effect": "Allow",
  "Principal": {
    "Service": "apigateway.amazonaws.com"
  },
  "Action": "sts:AssumeRole"
}
```
Access Policy for the Role:
```json
{
  "Effect": "Allow",
  "Action": "sns:Publish",
  "Resource": "arn:aws:sns:us-east-1:account-id:ArchiveTriggerTopic"
}
```
Setting up API Gateway to accept archive requests
API Gateway acts as our public endpoint that receives the archive requests and forwards them to SNS.
Key steps:
- Create a REST API in API Gateway. From the options provided, select REST API as it provides integration with the SNS service. Give the API a name, select the API endpoint type as Regional, and hit the Create API button.
- Set up a POST /archive route.

On the next Resources page, create a resource by hitting the Create resource button in the left pane.

Give the resource a Resource name; I am calling my resource archive. Hit the Create resource button. On the next page, under the Methods pane, hit the Create method button. This lets us map a bunch of settings to our POST method, along with details about our SNS service: which AWS Region it is running in, and the ARN of the IAM role that has the required permission to publish to the SNS topic.

Here we map our SNS topic's ARN to TopicArn. Additionally, we'll map Message to method.request.body, which will contain the full payload of our JSON document.

Finally, hit the Save button.
Congratulations, you have just deployed an API Gateway that can POST your JSON document to the SNS Topic, which ultimately triggers the Lambda to write it to S3.
Testing the end-to-end flow
You can test your pipeline in two ways:
First test the API POST method
- Hit the Test tab and submit a simple JSON document; an id field is required.

When you hit the Test button, make sure the log trace shows no errors and the response status is 200. At this point our API endpoint is working fine. Next, we will test the service from curl.
From curl or Postman
```bash
curl -X POST https://your-api-gateway-url/archive \
  -H "Content-Type: application/json" \
  -d '{
        "id": "hotel::10025",
        "type": "Hotel",
        "message": "Archiving via curl",
        "archive": true
      }'
```
After triggering, check that a corresponding object is created in your S3 bucket.
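If you want to check programmatically instead of through the console, a small boto3 listing against the date-based prefix used by the Lambda works; the bucket name below is the same placeholder used earlier.

```python
# List today's archived objects under the date-based prefix (year/month/day)
# produced by the Lambda above.
import boto3
from datetime import datetime

s3 = boto3.client('s3')
now = datetime.utcnow()
prefix = f"{now.year}/{now.month}/{now.day}/"

resp = s3.list_objects_v2(Bucket='your-s3-archive-bucket', Prefix=prefix)
for obj in resp.get('Contents', []):
    print(obj['Key'], obj['Size'])
```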
From Capella using Query Workbench
To test the setup from Capella, insert a document with a TTL value of 120 seconds.
```sql
UPSERT INTO bulk.data.source (KEY, VALUE)
VALUES ("test::001", {"type": "test", "field": "value"}, {"expiration": 2*60});
```
Execute the above query from the Query Workbench, then wait for the document to appear in the configured S3 bucket. It should show up approximately 60 seconds before its expiration, since the Eventing function sets a timer to trigger one minute prior to the TTL.

Troubleshooting
Here are some common issues and how to fix them:
ValidationError: message must not be null
- This usually means the Message field sent to SNS is empty.
- Ensure your API Gateway mapping template is correctly extracting the body.
API Gateway does not have permission to assume the role
- Confirm that your IAM role has the correct trust policy.
- The role should allow the apigateway.amazonaws.com service to assume it.
Wrong Content-Type in API request
- API Gateway only applies mapping templates when the content type is application/json.
- Ensure the Couchbase Eventing function (or Postman) sets this header.
SNS receives escaped or malformed JSON
- Double-check your use of $util.escapeJavaScript($input.body) in the mapping template.
- Incorrect escaping can cause issues in downstream Lambda parsing.
CloudWatch logs to inspect Lambda
- Monitor the execution trace of the Lambda function to confirm everything ran as expected (a small boto3 sketch for pulling recent log events follows below).
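A minimal sketch, assuming the default /aws/lambda/&lt;function-name&gt; log group convention and a hypothetical function name, for pulling recent log events with boto3:

```python
# Fetch recent log events for the archival Lambda. The function name
# 'archive-to-s3' is a placeholder for whatever you named yours.
import boto3

logs = boto3.client('logs')
resp = logs.filter_log_events(logGroupName='/aws/lambda/archive-to-s3', limit=50)
for event in resp['events']:
    print(event['message'].rstrip())
```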
Enhancements & best practices
- Use environment variables in Lambda for the S3 bucket name and region (see the snippet after this list).
- Enable S3 server-side encryption (SSE-S3 or SSE-KMS) for compliance.
- Turn on S3 versioning to preserve historical copies.
- Add CloudWatch alarms for Lambda errors or API Gateway 5XXs.
- Use SNS fan-out to notify additional consumers (e.g., Kinesis, other Lambdas).
- Consider replacing SNS with direct Lambda integration if you only have one consumer and want simplified permissions.
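As an example of the first suggestion, reading configuration from environment variables might look like this; ARCHIVE_BUCKET is a hypothetical variable name you would define yourself, while AWS_REGION is provided by the Lambda runtime.

```python
# Read the bucket name and region from the Lambda environment instead of
# hard-coding them in the handler.
import os
import boto3

bucket_name = os.environ.get('ARCHIVE_BUCKET', 'your-s3-archive-bucket')
region = os.environ.get('AWS_REGION', 'us-east-1')

s3 = boto3.client('s3', region_name=region)
```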
Conclusion
In this blog post, we built a robust, real-time document archival pipeline using:
- Couchbase Eventing to detect archivable documents
- API Gateway to expose a public endpoint
- SNS to decouple producers from consumers
- Lambda to process and save documents into S3
This architecture is fully serverless, scales effortlessly, and is a cost-effective way to offload historical data for retention, compliance, or analysis.
Resources
To help you dive deeper and expand your knowledge on the technologies used in this pipeline, here are some valuable resources: