
7.4.2. Performing Two-Phase Commits

In a traditional relational database, we can store the information for an object in one or more tables, which keeps us from duplicating information within a table. In a document database such as Couchbase Server, we can store the high-level information in one JSON document and store related information in separate JSON documents. For example, a blog post can live in one document and its comments in others.

This leads to the challenge of transactions in document-based databases. In a relational database, you can change both the blog post and the comments in a single transaction. You can undo all the changes from the transaction via rollback, ensure you have a consistent version of the data during the transaction, and, if the system fails during the transaction, leave the data in a state that is easier to repair.

The Ruby and PHP examples we describe here, plus two slightly more complex versions, are available on GitHub.

With Couchbase Server, you can generally provide something functionally analogous to an atomic transaction by performing a two-phase commit. You follow this approach:

Figure 7.3. Couchbase SDK Two-Phase Commit
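
The states that the transfer document moves through can be summarized as a small transition table. The following is a minimal sketch and not part of the Couchbase SDK: the TRANSITIONS table and the advance helper are hypothetical names introduced here for illustration, and the allowed moves are inferred from the state names used in the examples in this section.

```ruby
# Hypothetical helper: the state names come from the examples in this
# section; the transition table itself is an illustrative assumption.
TRANSITIONS = {
  "initial"    => ["pending"],
  "pending"    => ["committed", "cancelling"],
  "committed"  => ["done"],
  "cancelling" => ["cancelled"],
  "done"       => [],
  "cancelled"  => []
}.freeze

# Returns a copy of the transfer document moved into new_state, or raises
# if the move is not listed in TRANSITIONS.
def advance(trans, new_state)
  allowed = TRANSITIONS.fetch(trans["state"])
  unless allowed.include?(new_state)
    raise ArgumentError, "cannot go from #{trans["state"]} to #{new_state}"
  end
  trans.merge("state" => new_state)
end

trans = {"source" => "karen", "destination" => "dipti",
         "amount" => 100, "state" => "initial"}
trans = advance(trans, "pending")
trans = advance(trans, "committed")
trans = advance(trans, "done")
puts trans["state"]   # done
```

Keeping the allowed moves in one table makes it easier to see which states a failed transfer can be left in, and therefore which states a rollback has to handle.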


Here is the same approach demonstrated in code using the Couchbase Ruby SDK. To view the complete code, as well as a slightly more complex version, see sample two-phase transaction and transfer(). We start by storing the documents that we want to update. The example below creates a new Couchbase client and then stores two players and their points:

require 'rubygems'
require 'couchbase'

cb = Couchbase.bucket

karen = {"name" => "karen", "points" => 500, "transactions" => []}
dipti = {"name" => "dipti", "points" => 700, "transactions" => []}

# preload initial documents

cb.set("karen", karen)
cb.set("dipti", dipti)

We then create a third record that represents the transaction between the two objects:

# STEP 1: prepare transaction document

trans = {"source" => "karen", "destination" => "dipti", "amount" => 100, "state" => "initial"}
cb.set("trans:1", trans)

Then we set the transfer state to pending, which indicates that the transfer between karen and dipti is in progress. We do this in a begin..rescue block so that we can perform a rollback in the rescue clause in case of server or system failure.

Next in our begin..rescue block we add a reference to the transfer document to each of the two documents we want to update. We then update the amounts in the documents and change the transfer state to committed:

begin

    # STEP 2: Switch transfer into pending state
    
    cb.cas("trans:1") do |val|
        val.update("state" => "pending")
    end

    # STEP 3 + 4: Apply transfer to both documents
    
    cb.cas("karen") do |val|
        val.update("points" => val["points"] - 100,
        "transactions" => val["transactions"] + ["trans:1"])
    end
  
    cb.cas("dipti") do |val|
        val.update("points" => val["points"] + 100,
        "transactions" => val["transactions"] + ["trans:1"])
    end
    
    # STEP 4: Switch transfer document into committed state
    
    cb.cas("trans:1") do |val|
        val.update("state" => "committed")
    end

In this case we have combined steps 3 and 4 into a single cas operation per player document. In other words, each operation both adds the reference to the transfer and updates the points. Depending on your programming language, it may be easier to combine these two changes or to keep them as separate updates.
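
The cas operations above rely on check-and-set semantics: a write succeeds only if the document has not changed since it was read. The sketch below illustrates that idea with a purely in-memory store, so it runs without a server. VersionedStore, CasMismatch, get_with_cas and check_and_set are hypothetical names for this illustration and are not the Couchbase Ruby SDK API.

```ruby
# Hypothetical in-memory stand-in for a bucket; VersionedStore and
# CasMismatch are illustrative names, not part of the Couchbase SDK.
class CasMismatch < StandardError; end

class VersionedStore
  def initialize
    @docs = {}      # key => [value, cas token]
    @next_cas = 0
  end

  # Unconditional store; every write issues a fresh cas token.
  def set(key, value)
    @next_cas += 1
    @docs[key] = [Marshal.load(Marshal.dump(value)), @next_cas]
    @next_cas
  end

  # Returns a copy of the value together with its current cas token.
  def get_with_cas(key)
    value, cas = @docs.fetch(key)
    [Marshal.load(Marshal.dump(value)), cas]
  end

  # Stores value only if the document still carries expected_cas.
  def check_and_set(key, value, expected_cas)
    _, current = @docs.fetch(key)
    unless current == expected_cas
      raise CasMismatch, "#{key} changed since it was read"
    end
    set(key, value)
  end
end

store = VersionedStore.new
store.set("karen", {"name" => "karen", "points" => 500})

doc, cas = store.get_with_cas("karen")

# Another writer updates the document after our read.
store.set("karen", {"name" => "karen", "points" => 450})

begin
  store.check_and_set("karen", doc.merge("points" => doc["points"] - 100), cas)
rescue CasMismatch => e
  puts "stale write rejected: #{e.message}"
end

# Re-read and retry with the fresh token.
doc, cas = store.get_with_cas("karen")
store.check_and_set("karen", doc.merge("points" => doc["points"] - 100), cas)
puts store.get_with_cas("karen").first["points"]   # 350
```

Because the stale write is rejected instead of silently overwriting the newer value, two clients updating the same player cannot lose each other's changes.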

For the last step in the begin..rescue block, we remove the two references from the player documents and update the transfer to be done.

    # STEP 5: Remove transfer from the documents
    
    cb.cas("karen") do |val|
        val.update("transactions" => val["transactions"] - ["trans:1"])
    end
  
    cb.cas("dipti") do |val|
        val.update("transactions" => val["transactions"] - ["trans:1"])
    end

    # STEP 5: Switch transfer into done state
  
    cb.cas("trans:1") do |val|
        val.update("state" => "done")
    end

To make the rollback possible, we placed all of our update operations in a begin..rescue..end block. If any operation fails in the begin block, the rescue clause executes. To undo the transfer from whatever state it was left in, we use a case statement that tests whether the transfer failed in the pending or committed state:

rescue Couchbase::Error::Base => ex

    # Rollback transaction
    
    trans = cb.get("trans:1")
    
    case trans["state"]
        
        when "committed"
        
            # Create new transaction and swap the targets or amount sign.
            # The code block above could be wrapped in a method, something like
            #
            #     def transfer(source, destination, amount)
            #       ...
            #     end
            #
            # so that this handler could just reuse it.
        
        when "pending"
            # STEP 1: Switch transaction into cancelling state
            
            cb.cas("trans:1") do |val|
                val.update("state" => "cancelling")
            end

            # STEP 2: Revert changes if they were applied
            
            cb.cas("karen") do |val|
                break unless val["transactions"].include?("trans:1")
                val.update("points" => val["points"] + 100,
                "transactions" => val["transactions"] - ["trans:1"])
            end
             
            cb.cas("dipti") do |val|
                break unless val["transactions"].include?("trans:1")
                val.update("points" => val["points"] - 100,
                 "transactions" => val["transactions"] - ["trans:1"])
            end

            # STEP 3: Switch transaction into cancelled state
            
            cb.cas("trans:1") do |val|
                val.update("state" => "cancelled")
            end
            
      end

    # Re-raise original exception
    raise ex
    
end
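
The "break unless" guards in the rollback make the revert safe to repeat: a document is only adjusted if it still references the transfer. As a rough illustration on plain hashes, assuming a hypothetical revert helper that is not an SDK call:

```ruby
# Hypothetical helper; it mirrors the "revert changes if they were applied"
# guard from the rescue block, but works on plain hashes.
def revert(doc, trans_key, delta)
  # Nothing to undo if the transfer was never applied to this document.
  return doc unless doc["transactions"].include?(trans_key)
  doc.merge("points" => doc["points"] + delta,
            "transactions" => doc["transactions"] - [trans_key])
end

karen = {"name" => "karen", "points" => 400, "transactions" => ["trans:1"]}

once  = revert(karen, "trans:1", 100)
twice = revert(once, "trans:1", 100)

puts once["points"]    # 500
puts twice["points"]   # 500, the second call is a no-op
```

The reference in the transactions list acts as the marker that the change was applied, so removing it in the same update as the points keeps the two in step.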

As the comments in the code note, it may be most useful to put the entire transfer, including the rollback, into a new transfer method. As a method, it could include a counter and take parameters that represent the documents updated in a transfer. This variation also uses a cas value with update to roll back the transfer, which avoids the risk of unintentionally rewriting the entire transfer document. To see the complete sample code provided above, as well as a Ruby variation which includes the code as a transfer() method, see sample two-phase transaction and transfer().
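
As one possible shape for such a method, the sketch below wraps the steps and the rollback of a pending transfer into transfer(), and uses a counter to build the transfer key. It runs against a hypothetical in-memory MemoryBucket rather than a real server, so MemoryBucket, incr, apply_delta, rollback and the fail_after_debit switch are assumptions made for this illustration. It also leaves out the committed case, which would issue a reverse transfer.

```ruby
# Hypothetical in-memory stand-in so the sketch runs without a server.
# MemoryBucket is an illustrative name, not part of the Couchbase SDK.
class MemoryBucket
  def initialize
    @docs = {}
  end

  def set(key, value)
    @docs[key] = Marshal.load(Marshal.dump(value))
  end

  def get(key)
    Marshal.load(Marshal.dump(@docs.fetch(key)))
  end

  # Yields a copy of the document and stores whatever the block returns.
  # A real client would also compare cas tokens before writing.
  def cas(key)
    set(key, yield(get(key)))
  end

  # Counter that yields 1 the first time a key is incremented.
  def incr(key)
    @docs[key] = @docs.fetch(key, 0) + 1
  end
end

# Adds delta to the document's points and records the transfer key.
def apply_delta(cb, name, key, delta)
  cb.cas(name) do |doc|
    doc.update("points" => doc["points"] + delta,
               "transactions" => doc["transactions"] + [key])
  end
end

# Undoes a transfer that failed while it was still pending.
def rollback(cb, key)
  trans = cb.get(key)
  return unless trans["state"] == "pending"
  cb.cas(key) { |t| t.update("state" => "cancelling") }
  reverts = [[trans["source"], trans["amount"]],
             [trans["destination"], -trans["amount"]]]
  reverts.each do |name, delta|
    cb.cas(name) do |doc|
      next doc unless doc["transactions"].include?(key)
      doc.update("points" => doc["points"] + delta,
                 "transactions" => doc["transactions"] - [key])
    end
  end
  cb.cas(key) { |t| t.update("state" => "cancelled") }
end

# fail_after_debit is a test switch that simulates a failure mid-transfer.
def transfer(cb, source, destination, amount, fail_after_debit: false)
  key = "trans:#{cb.incr("trans:counter")}"
  cb.set(key, {"source" => source, "destination" => destination,
               "amount" => amount, "state" => "initial"})
  begin
    cb.cas(key) { |t| t.update("state" => "pending") }
    apply_delta(cb, source, key, -amount)
    raise "simulated failure" if fail_after_debit
    apply_delta(cb, destination, key, amount)
    cb.cas(key) { |t| t.update("state" => "committed") }
    [source, destination].each do |name|
      cb.cas(name) { |doc| doc.update("transactions" => doc["transactions"] - [key]) }
    end
    cb.cas(key) { |t| t.update("state" => "done") }
  rescue StandardError
    rollback(cb, key)
    raise
  end
  key
end

cb = MemoryBucket.new
cb.set("karen", {"name" => "karen", "points" => 500, "transactions" => []})
cb.set("dipti", {"name" => "dipti", "points" => 700, "transactions" => []})

transfer(cb, "karen", "dipti", 100)
puts cb.get("karen")["points"]      # 400
puts cb.get("dipti")["points"]      # 800

begin
  transfer(cb, "karen", "dipti", 50, fail_after_debit: true)
rescue RuntimeError
  puts cb.get("karen")["points"]    # 400, the debit was reverted
  puts cb.get("trans:2")["state"]   # cancelled
end
```

Taking the document keys and the amount as parameters means the same method can serve any pair of documents, and the counter gives each transfer its own key so that references left in the player documents are unambiguous.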

The next illustration repeats the diagram introduced at the start of this section, updated to show when system failures may occur and the rollback scenarios you may want to provide. Depending on the programming language that you use, how you implement the rollbacks may vary slightly:

Figure 7.4. Couchbase SDK Rollback for Transactions


The next example demonstrates a transaction using the PHP SDK. As in the Ruby example above, we follow the same process of creating a separate transfer document to track the state of our changes. To see this example, as well as the alternate class, see Two-Phase PHP Couchbase Commit and Advanced Two-Phase PHP Couchbase Commit.

In this case we provide the functionality within a single function, paired with an exception class, which manages the commits as well as the possible rollback cases based on errors. First we establish some base elements before we actually set any documents.

Here we create our TransactionException class, which is thrown if any issue arises as we try to perform our transaction. We then provide a function, transfer(), which retrieves the documents and decodes the JSON. Its parameters specify the document from which we remove points (the source document), the document to which we add points (the destination document), the amount of the transaction, and the client instance, which we use as our connection to the server. Within the transfer() function we first create and store the new document which represents the actual transfer:

<?php

class TransactionException extends RuntimeException {}

function transfer($source, $destination, $amount, &$cb) {
    // Helper: fetch a document with its CAS value, or only the CAS value
    $get = function($key, $casOnly = false) use (&$cb) {
        $return = null;
        $cb->getDelayed(array($key), true, function($cb, $data) use (&$return, $casOnly) {
            $return = $casOnly ? $data['cas'] : array(json_decode($data['value'], true), $data['cas']);
        });
        return $return;
    };

    if($cb->get('transaction:counter') === null) {
        $cb->set('transaction:counter', 0);
    }

    $id = $cb->increment('transaction:counter', 1);

    $state = 'initial';
    $transKey = "transaction:$id";

    $transDoc = compact('source', 'destination', 'amount', 'state');
    $cb->set($transKey, json_encode($transDoc));
    $transactionCas = $get($transKey, true);

    if(!$transactionCas) {
        throw new TransactionException("Could not insert transaction document");
    }

The first thing we do is try to retrieve any existing document named transaction:counter and, if it does not exist, create a new one with the default counter of 0. We then increment the id for our transfer and set the state and key. Finally we perform the SDK store operation set() to save the document as JSON to Couchbase Server. In the transfer() function, we use a try..catch block to update the transfer to a pending state, and throw an exception if we cannot update the state.

In the try block we retrieve the stored documents and apply the amount to their points. We also add a reference to the new transfer document in the source and destination documents, as we described in our illustration.

We perform check-and-set operations to update the source and destination documents in the try block; if either attempt fails and returns false, we raise an exception. We then update the transfer document in Couchbase Server to indicate that the state is now committed:

    try {
        $transDoc['state'] = 'pending';
        if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
            throw new TransactionException("Could not switch to pending state");
        }

        list($sourceDoc, $sourceCas) = $get($source);
        list($destDoc, $destCas) = $get($destination);

        // Append the transfer key with []; the array union operator (+=)
        // would skip the new entry whenever index 0 is already taken
        $sourceDoc['points'] -= $amount;
        $sourceDoc['transactions'][] = $transKey;
        $destDoc['points'] += $amount;
        $destDoc['transactions'][] = $transKey;

        if(!$cb->cas($sourceCas, $source, json_encode($sourceDoc))) {
            throw new TransactionException("Could not update source document");
        }

        if(!$cb->cas($destCas, $destination, json_encode($destDoc))) {
            throw new TransactionException("Could not update destination document");
        }

        $transDoc['state'] = 'committed';
        $transactionCas = $get($transKey, true);

        if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
            throw new TransactionException("Could not switch to committed state");
        }

Again in the try block we throw an exception if we fail to update the transfer state. We then remove the reference to the transfer from the source and destination documents. At the end of our try block we update the transfer document so that it is marked as 'done':

        list($sourceDoc, $sourceCas) = $get($source);
        list($destDoc, $destCas) = $get($destination);

        // array_values() reindexes the result so it still encodes as a JSON array
        $sourceDoc['transactions'] = array_values(array_diff($sourceDoc['transactions'], array($transKey)));
        $destDoc['transactions'] = array_values(array_diff($destDoc['transactions'], array($transKey)));

        if(!$cb->cas($sourceCas, $source, json_encode($sourceDoc))) {
            throw new TransactionException("Could not remove transaction from source document");
        }

        if(!$cb->cas($destCas, $destination, json_encode($destDoc))) {
            throw new TransactionException("Could not remove transaction from destination document");
        }

        $transDoc['state'] = 'done';
        $transactionCas = $get($transKey, true);
        if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
            throw new TransactionException("Could not switch to done state");
        }

We can now handle any system failures in our transfer() function with exception handling code which looks at the state of our two-phase commit:

    } catch(Exception $e) {

        // Rollback transaction

        list($transDoc, $transCas) = $get($transKey);

        switch($transDoc['state']) {

            case 'committed':
                // create new transaction and swap the targets
                transfer($destination, $source, $amount, $cb);
                break;

            case 'pending':

                // STEP 1: switch transaction into cancelling state

                $transDoc['state'] = 'cancelling';
                $transactionCas = $get($transKey, true);

                if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
                    throw new TransactionException("Could not switch into cancelling state");
                }

                // STEP 2: revert changes if applied

                list($sourceDoc, $sourceCas) = $get($source);
                list($destDoc, $destCas) = $get($destination);

                if(in_array($transKey, $sourceDoc['transactions'])) {
                    $sourceDoc['points'] += $amount;
                    // array_values() keeps the list encoded as a JSON array
                    $sourceDoc['transactions'] = array_values(array_diff($sourceDoc['transactions'], array($transKey)));
                    if(!$cb->cas($sourceCas, $source, json_encode($sourceDoc))) {
                        throw new TransactionException("Could not revert source document");
                    }
                }

                if(in_array($transKey, $destDoc['transactions'])) {
                    $destDoc['points'] -= $amount;
                    $destDoc['transactions'] = array_values(array_diff($destDoc['transactions'], array($transKey)));
                    if(!$cb->cas($destCas, $destination, json_encode($destDoc))) {
                        throw new TransactionException("Could not revert destination document");
                    }
                }

                // STEP 3: switch transaction into cancelled state

                $transDoc['state'] = 'cancelled';
                $transactionCas = $get($transKey, true);
                if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
                    throw new TransactionException("Could not switch into cancelled state");
                }

                break;
        }

        // Throw a new exception that keeps the original one as its previous exception
        throw new Exception("Transaction failed, rollback executed", 0, $e);

    }

}

If the transfer is left in an indeterminate state, that is, 'pending' or 'committed' but not 'done', we undo it. For a 'pending' transfer, we flag the transfer document as being in the process of cancellation and then revert the stored documents to their original values. For a 'committed' transfer, we use the transfer() function again, but this time we swap the parameters and provide the destination as the source of points and the source as the destination. This takes the amount away from the destination and returns it to the source. This final sample code illustrates our new class and transfer() function in action:

$cb = new Couchbase('localhost:8091');

$cb->set('karen', json_encode(array(
    'name' => 'karen',
    'points' => 500,
    'transactions' => array()
)));

$cb->set('dipti', json_encode(array(
    'name' => 'dipti',
    'points' => 700,
    'transactions' => array()
)));

transfer('karen', 'dipti', 100, $cb);

?>

There is also another variation for handling transactions with the Couchbase PHP SDK that relies on helper functions to create the document objects and provides the additional option to create a document if it does not exist in Couchbase Server. The sample is slightly more complex, but it handles cases where the documents do not already exist in Couchbase Server, and cases where the documents provided as parameters are only partial values to be added to the stored documents. To see the example above, as well as this alternate version, see Two-Phase PHP Couchbase Commit and Advanced Two-Phase PHP Couchbase Commit.