In modern data-driven applications, retaining historical documents is essential for compliance, auditing, and cost optimization. However, keeping all data indefinitely in your primary operational database is often unsustainable and expensive.
In this blog post, I’ll walk you through building a fully serverless archival pipeline that automatically moves documents from Couchbase to Amazon S3, using Couchbase Eventing, Amazon API Gateway, SNS, and AWS Lambda. The architecture demonstrates how to leverage asynchronous decoupling to improve resilience, scalability, and performance.
By the end of this tutorial, you’ll have a robust, end-to-end solution that reacts to document mutations or TTL-based expirations in Couchbase and archives them efficiently in S3—without any manual intervention.
Architecture overview
Here’s what the architecture looks like:
Flow:

- Couchbase detects a document condition (like TTL expiry or a custom archive: true flag).
- A Couchbase Eventing function triggers and sends the document to an API Gateway.
- API Gateway forwards the document to an SNS topic.
- SNS invokes a Lambda function subscribed to the topic.
- Lambda writes the full JSON document into an S3 bucket using a date-based folder structure.

This setup is decoupled, scalable, and requires zero polling.
Why Couchbase Eventing for archival?
Couchbase Eventing provides a native way to trigger business logic in response to document mutations (creates, updates, deletes) or expirations.
With Eventing, we can:

- Monitor specific document types or fields (like archive === true and/or type === logs)
- React in real time to TTL expirations
- Push data out to external services (like AWS) via HTTP calls
Writing the Couchbase Eventing function
Here’s a simplified example of the Couchbase Eventing function we use for archiving documents. The function implements logic to handle two primary scenarios:
- TTL-based Archiving: When a document has an expiration set, we register a timer that fires 60 seconds before the TTL. Once the timer fires, the DocTimerCallback function is invoked, which then calls the publish(doc, meta) function to archive the document.
- Flag-based Archiving: Alternatively, if a document includes the field archive: true, the function immediately calls publish(doc, meta) to archive the document.
In both cases, the document is sent to an external API Gateway for archiving. If the API response status is either 200 or 302, the document is explicitly deleted from the source bucket, completing the archive workflow. This provides a flexible mechanism to archive documents either on demand or via TTL-based automation.
```javascript
function OnUpdate(doc, meta) {
    // Flag-based archiving: publish immediately and delete the source document on success
    if (doc.archive && doc.archive === true) {
        log('Archiving document with ID:', meta.id);
        var status = publish(doc, meta);
        if (status === true) {
            delete src[meta.id];
        } else {
            log('Publish failed, document will not be deleted:', meta.id);
        }
    // TTL-based archiving: archive now if within 60 seconds of expiry, otherwise set a timer
    } else if (meta.expiration > 0) {
        var nMinsPrior = new Date((meta.expiration - 60) * 1000);
        var currentTime = new Date().getTime();
        log('Time difference (ms): ', currentTime - nMinsPrior);
        if (currentTime > nMinsPrior) {
            log('Within 1 minute of TTL expiry, archiving:', meta.id);
            var publishStatus = publish(doc, meta);
        } else {
            log('Timer set for future archiving:', meta.id);
            createTimer(DocTimerCallback, nMinsPrior, meta.id, meta.id);
        }
    } else {
        log('No archiving conditions met for:', meta.id);
        return;
    }
}

// Fired by the timer 60 seconds before TTL expiry: fetch the document and archive it
function DocTimerCallback(context) {
    var doc = src[context];
    if (doc) {
        var meta = { id: context };
        var publishStatus = publish(doc, meta);
    } else {
        log('Timer callback failed: document not found for:', context);
    }
}

// POST the document to the API Gateway endpoint bound to the archive2S3 URL alias
function publish(doc, meta) {
    try {
        var request = {
            path: 'archive',
            headers: { 'Content-Type': 'application/json' },
            body: { ...doc, id: meta.id }
        };
        log("Sending request:", request);
        var response = curl('POST', archive2S3, request);
        if (response.status === 200 || response.status === 302) {
            log("Publish success for:", meta.id, " Response body:", response.body);
            return true;
        } else {
            log("Publish failed with status:", response.status, " Request body:", request);
            return false;
        }
    } catch (e) {
        log("Exception during publish:", e);
        return false;
    }
}
```
Note: For performance reasons, we recommend commenting out all the log() statements shown above. They were included primarily for debugging and development purposes; excessive logging in production environments can impact performance and increase log storage costs.
Here is how we defined the settings and bindings while creating the Eventing function.
Hit Next to create the bindings. This is where we bind the endpoint of our API Gateway to the alias archive2S3 and the source bucket to the alias src. Note that we used Read/Write permission for the source bucket, since we want archived documents to be purged from it.
Hit Next again, copy/paste the JS function from above into the editor window, and Save. At this point your function is saved but not deployed. Click the three dots and select the Deploy option to run the Eventing function. This is how it looks once the function is running.
Building the Lambda Function to Archive to S3
The Lambda function consumes the SNS message and archives the full JSON to an S3 bucket, organized by date.
Example Lambda Code
```python
import boto3
import json
from datetime import datetime

s3 = boto3.client('s3')
bucket_name = 'your-s3-archive-bucket'

def lambda_handler(event, context):
    # Each SNS delivery carries one or more records with the published message
    for record in event['Records']:
        msg = json.loads(record['Sns']['Message'])
        doc_id = msg['id']
        content = msg

        # Build a date-based key: year/month/day/<doc_id>.json
        now = datetime.utcnow()
        folder = f"{now.year}/{now.month}/{now.day}"
        key = f"{folder}/{doc_id}.json"

        s3.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=json.dumps(content),
            ContentType='application/json'
        )

    return {
        'statusCode': 200,
        'body': 'Archived successfully.'
    }
```
This results in an S3 object like:
```
s3://your-s3-archive-bucket/2025/6/29/log123.json
```
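To sanity-check the handler locally before wiring up SNS, you can feed it a hand-built SNS-shaped event. A quick sketch (the document content is arbitrary, and note this writes to your real bucket):

```python
import json

# Minimal SNS-shaped event, matching what Lambda receives from a topic subscription
fake_event = {
    'Records': [
        {'Sns': {'Message': json.dumps({'id': 'hotel::10025', 'type': 'Hotel', 'archive': True})}}
    ]
}

print(lambda_handler(fake_event, None))
```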
Using SNS to decouple the archival trigger
SNS (Simple Notification Service) allows multiple services to receive the same message. Here, it passes the archive request to a Lambda function.
Steps:
- Create a topic, e.g., ArchiveTriggerTopic
- Allow API Gateway to publish to it via IAM
- Subscribe the Lambda function to the topic (a scripted sketch of these steps follows below)
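If you prefer scripting to the console, a minimal boto3 sketch of creating the topic and wiring up the Lambda subscription might look like the following (the Lambda function name and ARN are placeholders; the IAM permission that lets API Gateway publish is covered in the next section):

```python
import boto3

sns = boto3.client('sns', region_name='us-east-1')
lambda_client = boto3.client('lambda', region_name='us-east-1')

# Placeholder ARN for the archival Lambda built earlier
lambda_arn = 'arn:aws:lambda:us-east-1:123456789012:function:ArchiveToS3'

# 1. Create the topic
topic_arn = sns.create_topic(Name='ArchiveTriggerTopic')['TopicArn']

# 2. Allow SNS to invoke the Lambda function
lambda_client.add_permission(
    FunctionName='ArchiveToS3',
    StatementId='AllowSNSInvoke',
    Action='lambda:InvokeFunction',
    Principal='sns.amazonaws.com',
    SourceArn=topic_arn,
)

# 3. Subscribe the Lambda function to the topic
sns.subscribe(TopicArn=topic_arn, Protocol='lambda', Endpoint=lambda_arn)
```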
Fixing permissions
Ensure API Gateway is allowed to publish to the SNS topic via a trust and access policy:
Trust Policy for snsAccess Role:
```json
{
  "Effect": "Allow",
  "Principal": {
    "Service": "apigateway.amazonaws.com"
  },
  "Action": "sts:AssumeRole"
}
```
Access Policy for the Role:
```json
{
  "Effect": "Allow",
  "Action": "sns:Publish",
  "Resource": "arn:aws:sns:us-east-1:account-id:ArchiveTriggerTopic"
}
```
Setting up API Gateway to accept archive requests
API Gateway acts as our public endpoint that receives the archive requests and forwards them to SNS.
Key steps:
- Create a REST API in API Gateway.

From the options provided, select REST API, as it provides integration with the SNS service. Give the API a name, select Regional as the API endpoint type, and hit the Create API button.

- Set up a POST /archive route.
On the next Resources page, create a resource by hitting the Create resource button in the left pane. For the resource name, I am using archive.
Hit the Create resource button. On the next page, under the Methods pane, hit the Create method button. This lets us map a set of settings onto our POST method and point it at our SNS service: the AWS Region it runs in, the ARN of the IAM role that has the required permission to publish to the SNS topic, and the topic's ARN as TopicArn. Additionally, we'll map Message to method.request.body, which will contain the full payload of our JSON document. Hit the Save button. (A sketch of an alternative mapping-template approach follows below.)
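If you opt for a request body mapping template on the integration instead of query string parameter mapping, one common shape is sketched below; this assumes the integration request's Content-Type is overridden to application/x-www-form-urlencoded, and the topic ARN is a placeholder:

```
Action=Publish&TopicArn=$util.urlEncode('arn:aws:sns:us-east-1:account-id:ArchiveTriggerTopic')&Message=$util.urlEncode($input.body)
```

If you build a JSON-style template instead, $util.escapeJavaScript($input.body) is the usual way to escape the incoming body; the Troubleshooting section below refers to exactly that helper.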
Congratulations, you have just deployed an API Gateway that can POST your JSON document to the SNS Topic, which ultimately triggers the Lambda to write it to S3.
Testing the end-to-end flow
You can test your pipeline in two ways:
First test the API POST method
- Hit the Test tab and submit a simple JSON document; the id field is required.

When you hit the Test button, make sure the log trace shows no errors and the response status is 200. At this point our API endpoint is working. Next, we will test the service with curl.
From curl or Postman
```bash
curl -X POST https://your-api-gateway-url/archive \
  -H "Content-Type: application/json" \
  -d '{
    "id": "hotel::10025",
    "type": "Hotel",
    "message": "Archiving via curl",
    "archive": true
  }'
```
After triggering, check that a corresponding object is created in your S3 bucket.
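If you want to verify from a script rather than the S3 console, a small boto3 check along these lines (the bucket name and document ID are placeholders taken from the curl example above) can confirm the object landed under today's date-based prefix:

```python
from datetime import datetime
import boto3

s3 = boto3.client('s3')
now = datetime.utcnow()

# The Lambda writes keys as year/month/day/<doc_id>.json (no zero padding)
key = f"{now.year}/{now.month}/{now.day}/hotel::10025.json"

obj = s3.get_object(Bucket='your-s3-archive-bucket', Key=key)
print(obj['Body'].read().decode('utf-8'))
```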
From Capella using Query Workbench
To test the setup from Capella, insert a document with a TTL value of 120 seconds.

```sql
UPSERT INTO bulk.data.source (KEY, VALUE, OPTIONS)
VALUES ("test::001", {"type": "test", "field": "value"}, {"expiration": 2*60});
```

Execute the above SQL command from the Query Workbench, then wait for the document to appear in the configured S3 bucket approximately 60 seconds before its expiration, as the Eventing function sets a timer to trigger one minute prior to the TTL.
Troubleshooting
Here are some common issues and how to fix them:
ValidationError: message must not be null
- This usually means the Message field sent to SNS is empty.
- Ensure your API Gateway mapping template is correctly extracting the body.
API Gateway does not have permission to assume the role
- Confirm that your IAM role has the correct trust policy.
- The role should allow the apigateway.amazonaws.com service to assume it.
Wrong Content-Type in API request
- API Gateway only applies mapping templates when the content type is application/json.
- Ensure the Couchbase Eventing function (or Postman) sets this header.
SNS receives escaped or malformed JSON
- Double-check your use of $util.escapeJavaScript($input.body) in the mapping template.
- Incorrect escaping can cause issues in downstream Lambda parsing.
CloudWatch logs to inspect Lambda
- Monitor the execution trace of the Lambda function to confirm everything ran as expected (a scripted way to pull recent log events is sketched below).
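If you prefer the SDK to the console, a minimal boto3 sketch to pull the latest log events might look like this; it assumes the Lambda function is named ArchiveToS3, so adjust the log group to your function's name:

```python
import boto3

logs = boto3.client('logs')

# Lambda log groups follow the /aws/lambda/<function-name> convention
response = logs.filter_log_events(
    logGroupName='/aws/lambda/ArchiveToS3',
    limit=50,
)

for event in response['events']:
    print(event['message'], end='')
```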
Enhancements & best practices
- Use environment variables in Lambda for the S3 bucket name and region (see the sketch after this list).
- Enable S3 server-side encryption (SSE-S3 or SSE-KMS) for compliance.
- Turn on S3 versioning to preserve historical copies.
- Add CloudWatch alarms for Lambda errors or API Gateway 5XXs.
- Use SNS fan-out to notify additional consumers (e.g., Kinesis, other Lambdas).
- Consider replacing SNS with a direct Lambda integration if you only have one consumer and want simplified permissions.
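As an example of the first point, the hard-coded bucket name in the Lambda above could come from configuration instead; a small sketch, with illustrative environment variable names:

```python
import os
import boto3

# Read configuration from Lambda environment variables instead of hard-coding values
BUCKET_NAME = os.environ['ARCHIVE_BUCKET']
AWS_REGION = os.environ.get('ARCHIVE_REGION', 'us-east-1')

s3 = boto3.client('s3', region_name=AWS_REGION)
```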
Conclusion
In this blog post, we built a robust, real-time document archival pipeline using:
- Couchbase Eventing to detect archivable documents
- API Gateway to expose a public endpoint
- SNS to decouple producers from consumers
- Lambda to process and save documents into S3
This architecture is fully serverless, scales effortlessly, and is a cost-effective way to offload historical data for retention, compliance, or analysis.
Resources
To help you dive deeper and expand your knowledge on the technologies used in this pipeline, here are some valuable resources: