Last time, we talked about the Couchbase Kafka connector’s support for compression and IPv6. Since then we’ve enhanced the sink connector with support for setting time-to-live values for incoming documents, and added the ability to assign document IDs composed of multiple fields from the document.

Recently we’ve been focused on ensuring the Kafka connector is as fault-tolerant as possible. Today I’d like to introduce beta version 3.4 of the Kafka connector and talk about a major improvement to how rollbacks are handled after a failover. Our goal in releasing a beta is to get this feature into your hands as soon as possible so we can incorporate your feedback into the final version.

A pessimist is a realist who maintains a distributed system

Hardware fails all the time. Right now, somewhere on the globe, a burned-out I/O controller is sending up smoke signals. Couchbase nodes can and do fail, but one of the best reasons to build your app on Couchbase is that when the inevitable hardware failure occurs, recovery is a snap.

Each Couchbase bucket is partitioned into many “virtual buckets” (1,024 on most platforms), and each partition is replicated across nodes in the cluster. At any given time, one of the copies is “active”, meaning it’s the one that clients will talk to. (A client may read from a replica too, but you have to go out of your way to do so.) You can think of a replica as a real-time backup of an active instance.

So what happens when that smoke signal comes from a node in a Couchbase cluster? At some point, you’ll want to fail over the toasty node (removing it from the cluster) and rebalance (reassigning the failed node’s duties to the remaining nodes). For each partition that was active on the failed node, one of the replicas is promoted to “active” status. The remaining nodes also pick up the slack and start hosting additional replicas to replace the ones that were on the failed node.

Depending on the needs of your deployment, the whole failover and rebalance process can be initiated by a human pushing a button, or by an automatic failure detection algorithm. Pretty cool, right?

Now that the stage is set, let’s consider an important question: what happens when a replica is not completely up-to-date when the active instance fails?

The answer (which I hope will surprise no one) is that any changes that were not yet saved to the replica are lost. In science fiction terms, those changes were on an alternate timeline that has blinked out of existence. They have been “rolled back.”
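To make the alternate-timeline idea concrete, here is a toy model of a single partition with one active copy and one replica. It is only a sketch: the `PartitionCopy` class and `fail_over` function are made up for illustration and are not part of Couchbase, and it assumes the replica's change log is always a prefix of the active copy's log.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionCopy:
    """One copy (active or replica) of a single partition. Hypothetical type."""
    changes: list = field(default_factory=list)  # ordered change log


def fail_over(active: PartitionCopy, replica: PartitionCopy):
    """Promote the replica and report which changes were rolled back.

    Assumes the replica's log is a prefix of the active copy's log,
    so anything past the replica's length never reached the replica.
    """
    rolled_back = active.changes[len(replica.changes):]
    return replica, rolled_back


active = PartitionCopy(changes=["set a=1", "set b=2", "set c=3"])
replica = PartitionCopy(changes=["set a=1", "set b=2"])  # one change behind

new_active, rolled_back = fail_over(active, replica)
print(new_active.changes)  # ['set a=1', 'set b=2']
print(rolled_back)         # ['set c=3']
```

The change that never reached the replica is the one that blinks out of existence when the replica takes over.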

I’m not trying to cause a big sensation / Just talkin’ ’bout rollback mitigation

What does all of this have to do with the Kafka connector? The connector uses the Database Change Protocol (DCP) to receive change notifications. The DCP implementation is built for speed, and announces changes before they are even written to disk. To compensate for this optimism, the protocol also includes a rollback mechanism that effectively says, “Sorry, I spoke too soon. You know those last few changes I told you about? Well, those didn’t really happen.” This works very well inside Couchbase Server, where we are aware of these partitioning details and can adapt to the new realities as they happen. It’s exactly what we do with replication, full text updates, index updates, etc.

Previous versions of the Kafka connector would publish events to the Kafka topic immediately. As soon as the DCP server announced a change, that change was published to the topic. When a rollback occurred, there wasn’t much to do about it; Kafka messages can’t be retracted once published, since topics are append-only. Consumers would receive unreal messages from that alternate timeline we talked about.

Depending on your application, the dimension-hopping messages might be harmless, or they might cause a panic. If you’d rather your Kafka topics contain only real, persisted, replicated database changes, get ready for some good news.

Details for the disoriented

Enough buildup. Let’s talk about what’s in the beta, and how rollback mitigation works.

When rollback mitigation is enabled (which it is by default), the connector automatically checks with Couchbase Server to find out which changes have actually been persisted to disk. The connector buffers each change until it observes that the change has been persisted on the active copy and all of its replicas. At that point it’s very unlikely the change will be rolled back (though it’s still possible in some scenarios involving multiple failures). Then, and only then, does the connector publish the message to the Kafka topic.
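The buffering scheme described above could be sketched roughly as follows. This is not the connector's actual code: the `RollbackMitigationBuffer` class and its method names are invented for illustration, it tracks a single partition only, and it assumes each change carries an increasing sequence number and that every copy reports the highest sequence number it has persisted.

```python
from collections import deque


class RollbackMitigationBuffer:
    """Hold changes for one partition until every copy has persisted them.

    Hypothetical sketch; names and structure are assumptions.
    """

    def __init__(self, publish):
        self._publish = publish   # callback that sends an event to Kafka
        self._pending = deque()   # (seqno, event), ascending by seqno

    def on_change(self, seqno, event):
        """A change was announced by the server; buffer it for now."""
        self._pending.append((seqno, event))

    def on_persisted(self, persisted_seqnos):
        """persisted_seqnos: highest persisted seqno reported by each copy."""
        if not persisted_seqnos:
            return
        safe = min(persisted_seqnos)  # only as safe as the slowest copy
        while self._pending and self._pending[0][0] <= safe:
            _, event = self._pending.popleft()
            self._publish(event)

    def on_rollback(self, to_seqno):
        """Discard buffered changes newer than the rollback point."""
        while self._pending and self._pending[-1][0] > to_seqno:
            self._pending.pop()


published = []
buf = RollbackMitigationBuffer(published.append)
for seqno in (1, 2, 3):
    buf.on_change(seqno, f"change-{seqno}")

buf.on_persisted([3, 2])  # active copy has persisted 3, replica only 2
print(published)          # ['change-1', 'change-2']

buf.on_rollback(2)        # change-3 never happened; drop it unpublished
buf.on_persisted([2, 2])
print(published)          # ['change-1', 'change-2']
```

Because the rolled-back change was still sitting in the buffer, it never reaches the topic, which is the whole point of the exercise.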

Naturally, there is some latency and a little bit of overhead with this scheme. If you’d like to revert to the old behavior of publishing changes immediately, rollback mitigation can easily be disabled; see the docs for details.

Go with the flow

Flow control is a technique for preventing producers from outpacing consumers. In our case, the Kafka connector sends acknowledgement messages to the DCP server to report how much data has been processed. The server pauses the stream when the amount of unacknowledged data exceeds a threshold referred to as the “flow control buffer size.”
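Here is a minimal model of that handshake, seen from the server's side. The `FlowControlledStream` class is hypothetical and ignores everything about the real DCP wire format; it also assumes, for simplicity, that the stream pauses once unacknowledged bytes reach the buffer size.

```python
class FlowControlledStream:
    """Toy server-side view of a flow-controlled stream (hypothetical)."""

    def __init__(self, buffer_size):
        self.buffer_size = buffer_size  # the "flow control buffer size"
        self.unacked = 0                # bytes sent but not yet acknowledged

    @property
    def paused(self):
        # Simplifying assumption: pause once the threshold is reached.
        return self.unacked >= self.buffer_size

    def send(self, n_bytes):
        """Try to send n_bytes; returns False if the stream is paused."""
        if self.paused:
            return False
        self.unacked += n_bytes
        return True

    def ack(self, n_bytes):
        """The consumer reports that it has processed n_bytes."""
        self.unacked = max(0, self.unacked - n_bytes)


stream = FlowControlledStream(buffer_size=100)
print(stream.send(60))  # True  (60 bytes unacknowledged)
print(stream.send(60))  # True  (120 bytes unacknowledged; now paused)
print(stream.send(10))  # False (stream is paused)
stream.ack(60)          # consumer catches up; 60 bytes unacknowledged
print(stream.send(10))  # True
```

A consumer that buffers many messages before acknowledging them, as rollback mitigation does, needs a larger buffer size to keep the stream from stalling.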

Because rollback mitigation involves buffering a potentially large number of messages, it’s important for the flow control buffer size to be tunable.

Previous versions of the connector used a hard-coded buffer size of 20 megabytes. The new default is 128 megabytes, which allows for more aggressive buffering. Feel free to tweak this value to see what works best for your workload. If you want to crank it really high, you can allocate more memory to the connector using the KAFKA_HEAP_OPTS environment variable.

All good streams must come to an EOF

We hope you’ll give the 3.4 beta with rollback mitigation a try. Grab it from the link at the top of the Quickstart Guide. Please post your impressions on the Couchbase Forum and we’ll do our best to incorporate your feedback into the final release. Thanks, and see you next time!


Posted by David Nault

David Nault writes code at Couchbase, where he works on the SDK & Connectors team.
