Alberto Marchetti is a full-stack developer, and author of “RenderScript: parallel computing on Android, the easy way.” He is always living on the edge, constantly jumping into the discovery of modern languages and technologies.

 

Timed tasks using Couchbase and Go

In this post I’m going to show how you can exploit the Couchbase indexing system to create a distributed timed-tasks system. The example code for the project, together with its run instructions, can be found at https://github.com/cmaster11/cb-blog-timed-tasks.
Disclaimer: Because of the complexity of the topic, only relevant code extracts are posted on this page.

The concept

Let’s try to define the requirements of such a system:

  • The main feature of a timed-tasks system is the ability to specify when a certain task will be executed. This can be achieved with an ExecuteAt field, which contains the desired execution time (Unix time, in milliseconds).
  • A modern software requirement is support for a multi-node environment, which means the system needs to be distributed. We must therefore ensure that multiple workers do NOT process the same task! We can use a nice Couchbase feature here, pessimistic locking, which lets a worker fetch a document and lock it down, so that no other worker can process it.

The following is a possible structure to represent our task:
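Here is a minimal sketch of such a structure in Go. It assumes JSON field names that match the Id and ExecuteAt fields used by the query later on. The Payload field and the encodeTask helper are hypothetical additions, included only to show the shape of the stored document.

```go
package main

import "encoding/json"

// TimedTask is a sketch of a timed-task document. Id and ExecuteAt match the
// fields queried later; Payload is a hypothetical placeholder for task data.
type TimedTask struct {
	// Id doubles as the document key in the bucket.
	Id string `json:"Id"`
	// ExecuteAt is the desired execution time, as Unix time in milliseconds.
	ExecuteAt int64 `json:"ExecuteAt"`
	// Payload is whatever data the task needs (hypothetical).
	Payload string `json:"Payload"`
}

// encodeTask shows the JSON document a task turns into when stored.
func encodeTask(t TimedTask) (string, error) {
	b, err := json.Marshal(t)
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```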

 

Couchbase features

First, here’s an overview of the Couchbase features we’ll be using:

META()

Every document in a Couchbase bucket has associated metadata, accessible through META(), which contains document-specific information such as:

  • id – the document key inside the bucket.
  • cas – an int64 number, used by Couchbase to prevent race conditions while editing documents.
  • expiration – when a document is meant to expire, or 0 if it will never expire.

Hint: These fields (e.g., META().cas) can be indexed (starting from Couchbase 5.0).

CAS (Check and Set)

When fetching a document, its CAS value is returned too, and subsequent calls to alter the document can specify this value to make sure they’re going to edit the desired version of the document.

Example:

  • Client A fetches a document and its current CAS value is 1234.
  • Client B edits the document, which alters the CAS value to 5678.
  1. If A tries to edit the document without providing the CAS value, the edit will be successful, but changes made by B will be lost.
  2. If A tries to edit the document providing the CAS value (1234), an error will be returned because the current one (5678) is different. Client A will then need to fetch the document again and re-execute the process.

The CAS value is an extremely useful tool to ensure we don’t overwrite a newer version of a document and lose its changes.
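To make the example above concrete, here is a toy, single-goroutine model of CAS semantics in Go. CasStore and its methods are hypothetical and are not the Couchbase SDK. Real CAS values are opaque, whereas this sketch just uses a counter. As in Couchbase, passing a CAS of 0 means "no check".

```go
package main

import "errors"

var (
	ErrNotFound    = errors.New("key not found")
	ErrCasMismatch = errors.New("cas mismatch")
)

// casDoc is one document: a value plus its current CAS.
type casDoc struct {
	value string
	cas   uint64
}

// CasStore is a hypothetical in-memory model of check-and-set behaviour.
type CasStore struct {
	docs    map[string]*casDoc
	lastCas uint64
}

func NewCasStore() *CasStore {
	return &CasStore{docs: map[string]*casDoc{}}
}

// bump returns a fresh CAS value (a simple counter in this toy).
func (s *CasStore) bump() uint64 {
	s.lastCas++
	return s.lastCas
}

// Upsert creates or overwrites a document unconditionally.
func (s *CasStore) Upsert(key, value string) uint64 {
	cas := s.bump()
	s.docs[key] = &casDoc{value: value, cas: cas}
	return cas
}

// Get returns the value together with its current CAS.
func (s *CasStore) Get(key string) (string, uint64, bool) {
	d, ok := s.docs[key]
	if !ok {
		return "", 0, false
	}
	return d.value, d.cas, true
}

// Replace overwrites an existing document. A cas of 0 skips the check (last
// writer wins); any other value must equal the document's current CAS.
func (s *CasStore) Replace(key, value string, cas uint64) (uint64, error) {
	d, ok := s.docs[key]
	if !ok {
		return 0, ErrNotFound
	}
	if cas != 0 && cas != d.cas {
		return 0, ErrCasMismatch
	}
	d.value, d.cas = value, s.bump()
	return d.cas, nil
}
```

Replacing with client A's stale CAS after client B's edit reproduces case 2 of the list above. Passing 0 reproduces case 1, where B’s changes are silently lost.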

Pessimistic locking

Couchbase lets us “lock” a document so that only one client at a time can modify it, using the GetAndLock function of the gocb Go SDK.

 

When a document is locked, every request to mutate or unlock it fails with an error unless it provides the CAS value returned by the lock. Further lock attempts fail until the document is unlocked. It’s still possible to simply get the document.

Note: The maximum lock time of a document is 15 seconds, and using a lockTime value of 0 sets the maximum time. This limits how long a task can run before its lock times out and the task automatically becomes available to other workers again.

Hint: While a document is locked, its returned CAS value is -1.

Indexing and querying

Put together, the two hints above tell us that we can index a field, META().cas, which turns to -1 when a document is locked. This means that we can query documents based on whether they are locked!

The query

Let’s try to define a query to match the requirements:

  • We want to get a task id, which can be used later to get-and-lock the document: SELECT Id.
  • The task should not be already locked: WHERE META().cas <> -1.
  • The task needs to be executed now: WHERE ExecuteAt <= NOW_MILLIS() (NOW_MILLIS returns the current Unix time in milliseconds).
  • We need to fetch the closest task in time, so we want to sort tasks by their execution time: ORDER BY ExecuteAt ASC.
  • Let’s say for now (!!!) that a worker will want to get only one task to process at a time: LIMIT 1.

Combining these conditions, the query should be similar to this:

Its execution will return an array similar to:
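As a sketch, assuming a bucket named timed_tasks, the conditions listed above combine into the following statement. Each element of the result array is an object with an Id field, which can be decoded in Go as shown. The decodeTaskIds helper is hypothetical.

```go
package main

import "encoding/json"

// nextTaskQuery combines the conditions listed above; the bucket name
// `timed_tasks` is an assumption.
const nextTaskQuery = "SELECT Id FROM `timed_tasks` " +
	"WHERE META().cas <> -1 AND ExecuteAt <= NOW_MILLIS() " +
	"ORDER BY ExecuteAt ASC LIMIT 1"

// taskIdRow matches one element of the result array, e.g. {"Id": "..."}.
type taskIdRow struct {
	Id string `json:"Id"`
}

// decodeTaskIds extracts the ids from a raw N1QL result array.
func decodeTaskIds(raw []byte) ([]string, error) {
	var rows []taskIdRow
	if err := json.Unmarshal(raw, &rows); err != nil {
		return nil, err
	}
	ids := make([]string, 0, len(rows))
	for _, r := range rows {
		ids = append(ids, r.Id)
	}
	return ids, nil
}
```

For an illustrative result such as [{"Id":"task-42"}], decodeTaskIds returns []string{"task-42"}.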

 

The index

We can now design an index tailored to the query we just defined. Query-specific indexes are essential for good N1QL query performance, because they let the query engine scan only the entries it needs.

  • The query checks that a document is not currently locked (META().cas <> -1), so this condition can be made part of the index itself.
  • It also directly asks for the execution time to be in the past, so we need to index the ExecuteAt field.

The index definition could then be the following:
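Here is one plausible definition, sketched as a Go helper that builds the DDL statement. The index name idx_timed_task matches the hint used in the next section, and the bucket name is a parameter. Adding Id as a second index key, so that the index can cover SELECT Id, is our own choice. Support for a partial-index condition on META().cas should be checked against your Couchbase Server version.

```go
package main

import "fmt"

// indexStatement builds a hypothetical CREATE INDEX statement for the query:
// ExecuteAt leads (range filter and ORDER BY), Id lets the index cover
// SELECT Id, and the WHERE clause keeps only unlocked documents.
func indexStatement(bucket string) string {
	return fmt.Sprintf("CREATE INDEX `idx_timed_task` ON `%s`(ExecuteAt, Id) "+
		"WHERE META().cas <> -1 USING GSI", bucket)
}
```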

 

Optimizing the query

We can now further optimize the query:

  • We can tell the query to use our index by providing a hint to it: USE INDEX (idx_timed_task USING GSI).
  • Since indexing is usually an asynchronous process, we can ask Couchbase to wait for the index to be up to date before executing the query, so that the results reflect each task’s lock state at the time the query is issued. We do this by providing a consistency requirement at SDK level: query.Consistency(gocb.RequestPlus).
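Putting both optimizations together, a hypothetical helper can build the hinted statement with a configurable LIMIT, which the multi-node flow described later needs. The request-plus consistency is not part of the statement. It is set on the SDK query object, which in gocb v1 looks like gocb.NewN1qlQuery(stmt).Consistency(gocb.RequestPlus). That call appears only as a comment here, because the sketch does not depend on the SDK.

```go
package main

import "fmt"

// optimizedQuery returns the task query with an index hint and a
// configurable limit. bucket and limit are parameters of this sketch.
// The consistency requirement is set separately on the SDK query object,
// e.g. (gocb v1): gocb.NewN1qlQuery(stmt).Consistency(gocb.RequestPlus)
func optimizedQuery(bucket string, limit int) string {
	return fmt.Sprintf("SELECT Id FROM `%s` USE INDEX (idx_timed_task USING GSI) "+
		"WHERE META().cas <> -1 AND ExecuteAt <= NOW_MILLIS() "+
		"ORDER BY ExecuteAt ASC LIMIT %d", bucket, limit)
}
```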

The flow

A possible flow for the timed task consumer worker loop is:

  1. Query for an available task id.
  2. Get and lock the task.
  3. Process the task.
  4. Delete the task.

Multiple nodes

Let’s think for a second about how a multi-node setup can alter this flow.

If multiple workers query for available tasks concurrently, they will probably find the same task. Only one of them will be able to lock and process it, while the others will have to repeat the loop (execute a new query) to get new tasks.

We can then implement another approach:

  1. Query for available task ids, limiting the number of ids to the number of workers.
  2. For each task id, try to lock the task. On the first successful lock, go to step 4.
  3. If no task has been successfully locked, repeat the loop.
  4. Process the task.
  5. Delete the task.

In the best case, every worker successfully locks a task on the first try. In the worst case, workers fail to lock several documents in a row. On average, workers will lock a task, perhaps after failing to lock a few others.

We have to trade off how frequently we query the database against how many failed lock attempts we can tolerate. Generally speaking, trying to lock a document (a single key-value operation) is much faster than executing a N1QL query.

The code

Let’s take a look at some relevant code examples:

The producer

The generation of the task can be summed up in this function:

 

Once we generate a valid task object, we can simply insert it into our bucket with:
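Here is a sketch of that insert, written against a hypothetical taskInserter interface. Its Insert(key, value, expiry) parameters mirror gocb v1’s Bucket.Insert, but this version returns only an error. memBucket is a toy in-memory stand-in, not part of any SDK.

```go
package main

import "errors"

type TimedTask struct {
	Id        string `json:"Id"`
	ExecuteAt int64  `json:"ExecuteAt"`
}

// taskInserter is a hypothetical abstraction of the bucket's insert call.
type taskInserter interface {
	Insert(key string, value interface{}, expiry uint32) error
}

// scheduleTask stores the task under its own Id. Insert (rather than upsert)
// fails on a duplicate id instead of silently overwriting a task.
func scheduleTask(b taskInserter, t TimedTask) error {
	return b.Insert(t.Id, t, 0) // expiry 0: the document never expires
}

var errKeyExists = errors.New("key already exists")

// memBucket is a toy stand-in used only to exercise the sketch.
type memBucket map[string]interface{}

func (m memBucket) Insert(key string, value interface{}, expiry uint32) error {
	if _, ok := m[key]; ok {
		return errKeyExists
	}
	m[key] = value
	return nil
}
```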

 

The consumer

We can get and lock a document by id, using this code:

A task can be removed using this code:

The main consumer code can be summed up with the following snippet:

 

Conclusion

In this post we’ve seen a way to create a reliable, distributed timed-tasks system using Couchbase and Go.

This system could be further developed by:

  • Supporting processing errors.
  • Implementing a retry feature (if processing fails, reschedule the task in the future).
  • Improving the locking logic by:
    • Tuning the maximum number of returned task ids (instead of the default workers count).
    • Supporting a task processing duration of more than 15 seconds (the maximum lock time of a document in Couchbase).

Thank you for your time, and happy developing!

This post is part of the Community Writing program

Author

Posted by Laura Czajkowski, Developer Community Manager, Couchbase

Laura Czajkowski is the Snr. Developer Community Manager at Couchbase overseeing the community. She’s responsible for our monthly developer newsletter.
