The recommendations here are under development and may change before implementation.

Overview

The intent of the OBSERVE operation, as implemented by Couchbase client libraries, is to give developers the ability to observe the status of a key with respect to a specific change.  For example, a developer may want to mutate the document behind the key identified by the string "foo".  Because a Couchbase cluster is distributed and concurrent, and acknowledges the request before the given document has been replicated or persisted, there are situations where the application developer may want to observe whether the specific mutation they made has been:

  1. safely replicated to one or more nodes within the cluster
  2. persisted to the master, or persisted to one or more nodes within the cluster
  3. taken into consideration in any indexes

Client API

Java

The first approach is synchronous, and possibly the simplest from a developer's viewpoint. It allows specifying requirements across the cluster (for example, 2 persists and 2 replicas).

public enum PersistTo {
    ZERO, ONE, TWO, THREE, FOUR
}


public enum ReplicateTo {
    ZERO, ONE, TWO, THREE
}

try {
  client.set("a", 0, "foo", PersistTo.FOUR);  // this blocks until 4 copies persisted to disk
  client.set("b", 0, "foo", ReplicateTo.TWO); // this blocks until 2 replicas are complete
  client.add("c", 0, "foo", PersistTo.TWO, ReplicateTo.TWO); // this blocks until 2 copies persisted
                                                             // to disk and 2 replicas complete on cluster
  client.add("d", 0, "foo", PersistTo.TWO, ReplicateTo.ONE); // throws InvalidArgumentException
} catch (ObservedException ex) {
// Warn if observe requirements are more stringent than bucket settings
} catch (ObservedTimeoutException ex) {
// do something
} catch (ObservedModifiedException ex) {
// do something
}

Note that we will likely implement the shorthand seen in the .NET client, where one may write something like ReplicateTo.MASTER_SLAVE.

It's possible to take an asynchronous approach as well, but the API usage becomes a bit more involved.

The Java client library, and its dependencies, will require changes to return the CAS value through the OperationStatus object.

The basic idea is that given the OperationStatus object (or optionally, multiple OperationStatus objects), an Observer can report on whether or not the operation has been replicated, persisted, or processed for indexing.

OperationFuture<Boolean> setResult = cbc.set("foo", 0, "bar");

Observer watchit;
if (setResult.get()) {
  watchit = new Observer(setResult.getStatus());
} else {
  // do something because the set failed
  return; // nothing to observe
}

// now that the observer knows what to observe...
ObserveBoolean observeSuccess = watchit.blockForReplication(1); // this will block
if (!observeSuccess.isSuccess()) { // isSuccess() is a hypothetical accessor; Java cannot negate an object directly
  // handle whatever we want to do for this change
  switch (observeSuccess.getStatus()) {
    case OBS_TIMEDOUT:
      // do something for the timeout case
      break;
    case OBS_MODIFIED:
      // key was either modified by someone else or failover occurred
      break;
  }
}

This approach would mean not having to overload any/all of the mutation operations.

An alternative approach would be:

ObserveSet os = new ObserveSet(4, 3); // max paranoia i.e. 4 persists and 3 replicas

os.addOp(c.set("a", 0, "foo"));
os.addOp(c.set("b", 0, "foo"));
os.addOp(c.add("c", 0, "foo"));

try {
    os.blockTillComplete();
} catch (ObservedTimeoutException ex) {
// do something
} catch (ObservedModifiedException ex) {
// do something
}

With this more complex approach, it's possible to do multiple operations asynchronously. However, the downside of this approach is that it would not be possible to use multiple connections (on different buckets) with different replication factors, which makes it confusing.

It should be possible to work around that as follows:

ObserveSet os1 = new ObserveSet(c1, 4, 3); // max paranoia i.e. 4 persists and 3 replicas
ObserveSet os2 = new ObserveSet(c2, MAX_FOR_BUCKET); // would depend on replicas for bucket

os1.addOp(c1.set("a", 0, "foo"));
os1.addOp(c1.set("b", 0, "foo"));
os2.addOp(c2.add("c", 0, "foo"));
try {
    os1.blockTillComplete();
    os2.blockTillComplete(); // wait on the second set as well
} catch (ObservedTimeoutException ex) {
    // do something
} catch (ObservedModifiedException ex) {
    // do something
}

.NET

The existing .NET Client Library mutate methods will be overloaded to include two new options. The first option specifies the number of replicas to which a key must be written before the operation is considered a success. The second option specifies whether a write must be persisted to be considered successful.

//result.Success would be true only if the key successfully replicates to at least 2 nodes.
var result = 
client.ExecuteStore(StoreMode.Set, "foo", "{ \"message\" : \"bar\" }", ReplicateTo.2);
Assert.That(result.Success, Is.True);

//The key has been modified
var result = client.ExecuteStore(StoreMode.Set, "foo", "{ \"message\" : \"bar\" }", ReplicateTo.2);
client.ExecuteStore(StoreMode.Set, "foo", "{ \"message\" : \"not bar\" }"); //someone changes it
Assert.That(result.Success, Is.False);
Assert.That(result.Message, Is.StringMatching("Modified"));

//result.Success would be true only if the key is successfully persisted to the master and at least 1 slave.
var result = client.ExecuteStore(StoreMode.Set, "foo", "{ \"message\" : \"bar\" }", PersistTo.MasterSlave);
Assert.That(result.Success, Is.True);

//combining the two durability checks
var result = client.ExecuteStore(StoreMode.Set, "foo", "{ \"message\" : \"bar\" }", PersistTo.Master, ReplicateTo.2);

In the examples above, ReplicateTo and PersistTo are enums with values from 1 to 4.

The original mutate operations defined in the MemcachedClient returned only Boolean values. Because of the need to report a 'modified' result, only the ExecuteXXX mutate methods will be modified.

PHP

Ruby

The observe method will return a Result object describing the state of the key for each node in the cluster that contains it. This is a low-level operation, and the user is free to use it to query the state of arbitrary keys.

conn = Couchbase.bucket
stats = conn.observe("foo")
# for cluster with 1-replica the stats contents will look like:
#  [
#     #<Couchbase::Result:0x0000000182d718 error=0x0 key="foo" status=:persisted cas=4640963567427715072 from_master=false time_to_persist=0 time_to_replicate=0>,
#     #<Couchbase::Result:0x0000000182d588 error=0x0 key="foo" status=:persisted cas=4640963567427715072 from_master=true time_to_persist=0 time_to_replicate=0>
#  ]

The operation supports multiple keys (and will try to distribute them in packets efficiently).

conn = Couchbase.bucket
stats = conn.observe(["foo", "bar"]) # it splats the arguments, so brackets could be omitted
# again the command executed against 1-replica cluster
#  {
#    "foo"=>
#      [
#        #<Couchbase::Result:0x0000000184de50 error=0x0 key="foo" status=:persisted cas=4640963567427715072 from_master=true time_to_persist=57 time_to_replicate=0>,
#        #<Couchbase::Result:0x0000000184dc98 error=0x0 key="foo" status=:persisted cas=4640963567427715072 from_master=false time_to_persist=99 time_to_replicate=0>
#      ],
#    "bar"=>
#      [
#        #<Couchbase::Result:0x0000000184dd60 error=0x0 key="bar" status=:persisted cas=13536425208630804480 from_master=false time_to_persist=57 time_to_replicate=0>,
#        #<Couchbase::Result:0x0000000184dbd0 error=0x0 key="bar" status=:persisted cas=13536425208630804480 from_master=true time_to_persist=99 time_to_replicate=0>
#      ]
#  }

A higher-level API is available for all mutators (i.e. set/add/replace/append/prepend). The user can specify the :observe option with a combination of the following keys:

  • :persisted — the number of nodes desired to persist the key
  • :replicated — the number of nodes desired to replicate the key (optionally persisted)
  • :timeout — the timeout for observe.

conn.set("foo", "bar", :observe => {:persisted => 2, :timeout => 5})

To minimize network IO, there is an observe_and_wait operation, which blocks execution until the condition for the given set of keys is satisfied or the timeout occurs.

conn.set("foo", "bar")
conn.append("baz", "bar")
conn.observe_and_wait("foo", "baz", :persisted => 2, :timeout => 5)

The same kind of API is valid for asynchronous mode too.

C

For the C client (libcouchbase) it will be better to implement only the low-level operation because of its asynchronous nature. This means there should be a libcouchbase_observe() function that accepts the key and returns its status on all replicas to the corresponding callback.

Recommended Implementation

These recommendations are preliminary, and have not been reviewed.

Given that the item being mutated may be changed at any time by another actor in a deployment, the key idea here is for the client to use the CAS value returned from the mutation operation.  Since the client library knows at all times (though, asynchronously) which node is responsible for the vbucket for a given key, and knows which nodes are slaves for that key, the client library can use the OBSERVE command as a method of determining what has happened with a key on this given node.

Implementation Approach

If, for example, application code wanted to check for "foo" with CAS value 12345, it would use the OBSERVE command against whichever nodes it needs to in order to report the status. This would be done in a loop, with some reasonable backoff (possibly guided by recommended polling times from server stats), until a reasonable or user-specified timeout.

Looping steps would consist of...

First the client would check to ensure that "foo" 12345 is still the current value on the master for this vbucket via OBSERVE. If it is not the current value, it simply returns saying the item is modified.

If it is still the current value, the next step for the client depends on what the application code is checking. If it is simply checking for persistence or index processing on the master, this can be evaluated from the OBSERVE response. If the application code is checking for replication, or whether the modification has been persisted on multiple nodes, the client can then proceed to check any slaves, as identified by the cluster configuration, for the status of persistence or replication of "foo" with CAS 12345. If successful, it returns that response to the application code. If it is not successful, it waits an interval and loops again, blocking the application code until the status is determined or a timeout value has been reached.
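
One round of the check described above might be evaluated as in the following Java sketch. ObserveRound, NodeState, and Verdict are hypothetical names, not part of any client library, and the per-node states are assumed to have been collected already via OBSERVE. The sketch also assumes that persistTo counts the master plus replicas, while replicateTo counts replicas only.

```java
import java.util.List;

final class ObserveRound {
    /** Result of one polling round (hypothetical names). */
    enum Verdict { MODIFIED, SATISFIED, PENDING }

    /** One node's answer for the key: the CAS it holds and whether it is persisted there. */
    static final class NodeState {
        final long cas;
        final boolean persisted;

        NodeState(long cas, boolean persisted) {
            this.cas = cas;
            this.persisted = persisted;
        }
    }

    static Verdict evaluate(long expectedCas, NodeState master, List<NodeState> replicas,
                            int persistTo, int replicateTo) {
        // Step 1: the master must still hold the CAS returned by our mutation.
        if (master.cas != expectedCas) {
            return Verdict.MODIFIED;
        }
        // Step 2: count how many nodes have persisted / replicated that exact CAS.
        int persisted = master.persisted ? 1 : 0;
        int replicated = 0;
        for (NodeState replica : replicas) {
            if (replica.cas != expectedCas) {
                continue; // this replica has not caught up yet
            }
            replicated++;
            if (replica.persisted) {
                persisted++;
            }
        }
        return (persisted >= persistTo && replicated >= replicateTo)
                ? Verdict.SATISFIED
                : Verdict.PENDING;
    }
}
```

A PENDING verdict is what would trigger the wait-and-retry step; MODIFIED and SATISFIED end the loop.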

Thus the possible set of return values would be either OBS_SUCCESS, OBS_MODIFIED, or OBS_TIMEDOUT.

Note that the status OBS_MODIFIED does not indicate monotonic forward mutation. For example, in one scenario a failover may have occurred and the item key "foo" being observed may have been reverted to a previous state. This state may even be some value prior to the initial fetch before the application code mutated the value of the document.
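
The overall loop, with backoff and a timeout, could look roughly like the sketch below. This is only an illustration: ObservePoller, ObserveOutcome, and RoundResult are hypothetical names, and the supplier passed in is assumed to perform one round of OBSERVE requests and CAS comparison. Treating thread interruption as a timeout is likewise an assumption made to keep the example short.

```java
import java.util.function.Supplier;

/** Hypothetical outcome names, mirroring the OBS_* values in the text. */
enum ObserveOutcome { OBS_SUCCESS, OBS_MODIFIED, OBS_TIMEDOUT }

/** What a single polling round learned (stands in for real OBSERVE calls). */
enum RoundResult { MODIFIED, SATISFIED, PENDING }

final class ObservePoller {
    static ObserveOutcome poll(Supplier<RoundResult> round, long timeoutMillis,
                               long initialDelayMillis, long maxDelayMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        long delay = Math.max(1, initialDelayMillis);
        while (true) {
            RoundResult result = round.get();
            if (result == RoundResult.MODIFIED) {
                return ObserveOutcome.OBS_MODIFIED;
            }
            if (result == RoundResult.SATISFIED) {
                return ObserveOutcome.OBS_SUCCESS;
            }
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                return ObserveOutcome.OBS_TIMEDOUT;
            }
            try {
                Thread.sleep(Math.min(delay, remainingMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return ObserveOutcome.OBS_TIMEDOUT; // assumption: treat interruption like a timeout
            }
            // Exponential backoff, capped at maxDelayMillis (and never below 1 ms).
            delay = Math.max(1, Math.min(delay * 2, maxDelayMillis));
        }
    }
}
```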

Implementation Constraints

OBSERVE is a binary-protocol-only operation. It could be implemented in ASCII, but that would currently be complicated by the fact that mutation operations do not return the new CAS value in the ASCII protocol. Since Couchbase Server uses the binary protocol exclusively, we do not implement that currently.

Expected Use Cases

Ensure Indexable

The current implementation of Couchbase Server does not consider items in stale=false view queries until items are persisted. The combination of an application making changes, ensuring they're persisted and then issuing a stale=false query will give the app an ability to consistently access the view with respect to any changes that application made.

Further information on this use case is to be documented elsewhere.

Ensure Durability

Through client-library-specific APIs, applications may wish to verify that a recently made change is durable. For example, in some cases a user application may wish to verify that a change to a data item has been applied on the master and at least one replica. As another example, a user application may be extremely paranoid that a particular data update not be lost. In that case, it would want to ensure that the item has been replicated to at least 3 servers and persisted to at least 4 servers (the maximum durability supported by Couchbase Server).
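
As a rough illustration of how a client might reject durability requirements that a bucket can never satisfy, consider the following sketch. DurabilityCheck is a hypothetical helper, and it assumes that the persist count includes the master (so at most replicas + 1) while the replicate count covers replicas only.

```java
final class DurabilityCheck {
    /**
     * Returns true if the requested durability can ever be met by a bucket
     * configured with the given number of replicas (0 to 3).
     */
    static boolean satisfiable(int persistTo, int replicateTo, int bucketReplicas) {
        if (persistTo < 0 || replicateTo < 0) {
            return false;
        }
        // Replication targets replicas only; persistence can also count the master.
        return replicateTo <= bucketReplicas && persistTo <= bucketReplicas + 1;
    }
}
```

With 3 replicas configured, the "maximum paranoia" request of 4 persists and 3 replicas passes this check; with only 1 replica it does not.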

Protocol Level Details

Request

  Byte/     0       |       1       |       2       |       3       |
     /              |               |               |               |
    |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
    +---------------+---------------+---------------+---------------+
   0| 0x80          | 0x92          | 0x00          | 0x00          |
    +---------------+---------------+---------------+---------------+
   4| 0x00          | 0x00          | 0x00          | 0x00          |
    +---------------+---------------+---------------+---------------+
   8| 0x00          | 0x00          | 0x00          | 0x12          |
    +---------------+---------------+---------------+---------------+
  12| 0xde          | 0xad          | 0xbe          | 0xef          |
    +---------------+---------------+---------------+---------------+
  16| 0x00          | 0x00          | 0x00          | 0x00          |
    +---------------+---------------+---------------+---------------+
  20| 0x00          | 0x00          | 0x00          | 0x00          |
    +---------------+---------------+---------------+---------------+
  24| 0x00          | 0x04          | 0x00          | 0x05          |
    +---------------+---------------+---------------+---------------+
  28| 0x68 ('h')    | 0x65 ('e')    | 0x6c ('l')    | 0x6c ('l')    |
    +---------------+---------------+---------------+---------------+
  32| 0x6f ('o')    | 0x00          | 0x05          | 0x00          |
    +---------------+---------------+---------------+---------------+
  36| 0x05          | 0x77 ('w')    | 0x6f ('o')    | 0x72 ('r')    |
    +---------------+---------------+---------------+---------------+
  40| 0x6c ('l')    | 0x64 ('d')    |
    +---------------+---------------+
observe command
Field        (offset) (value)
Magic        (0)    : 0x80
Opcode       (1)    : 0x92
Key length   (2,3)  : 0x0000
Extra length (4)    : 0x00
Data type    (5)    : 0x00
Vbucket      (6,7)  : 0x0000
Total body   (8-11) : 0x00000012
Opaque       (12-15): 0xdeadbeef
CAS          (16-23): 0x0000000000000000
Key #0
    vbucket  (24-25): 0x0004
    keylen   (26-27): 0x0005
             (28-32): "hello"
Key #1
    vbucket  (33-34): 0x0005
    keylen   (35-36): 0x0005
             (37-41): "world"
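
The request layout above can be produced with a few lines of Java. This is a minimal sketch rather than a client implementation: ObserveRequestEncoder is a hypothetical name, keys are assumed to be UTF-8 strings, and the total body length is computed from the (vbucket, keylen, key) entries actually written.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ObserveRequestEncoder {
    /** Builds the full packet: a 24-byte header followed by one entry per key. */
    static byte[] encode(int opaque, short[] vbuckets, String[] keys) {
        byte[][] rawKeys = new byte[keys.length][];
        int bodyLen = 0;
        for (int i = 0; i < keys.length; i++) {
            rawKeys[i] = keys[i].getBytes(StandardCharsets.UTF_8);
            bodyLen += 2 + 2 + rawKeys[i].length; // vbucket + keylen + key bytes
        }
        ByteBuffer buf = ByteBuffer.allocate(24 + bodyLen); // big-endian by default
        buf.put((byte) 0x80);    // magic: request
        buf.put((byte) 0x92);    // opcode: OBSERVE
        buf.putShort((short) 0); // key length (unused; keys travel in the body)
        buf.put((byte) 0);       // extras length
        buf.put((byte) 0);       // data type
        buf.putShort((short) 0); // vbucket (unused; each key carries its own)
        buf.putInt(bodyLen);     // total body length
        buf.putInt(opaque);      // opaque, echoed back by the server
        buf.putLong(0L);         // CAS (unused in the request)
        for (int i = 0; i < keys.length; i++) {
            buf.putShort(vbuckets[i]);
            buf.putShort((short) rawKeys[i].length);
            buf.put(rawKeys[i]);
        }
        return buf.array();
    }
}
```

Calling encode(0xdeadbeef, new short[] {4, 5}, new String[] {"hello", "world"}) yields a packet whose key entries start at offset 24, as in the diagram.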

Response

  Byte/     0       |       1       |       2       |       3       |
     /              |               |               |               |
    |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
    +---------------+---------------+---------------+---------------+
   0| 0x81          | 0x92          | 0x00          | 0x00          |
    +---------------+---------------+---------------+---------------+
   4| 0x00          | 0x00          | 0x00          | 0x00          |
    +---------------+---------------+---------------+---------------+
   8| 0x00          | 0x00          | 0x00          | 0x24          |
    +---------------+---------------+---------------+---------------+
  12| 0xde          | 0xad          | 0xbe          | 0xef          |
    +---------------+---------------+---------------+---------------+
  16| 0x00          | 0x00          | 0x03          | 0xe8          |
    +---------------+---------------+---------------+---------------+
  20| 0x00          | 0x00          | 0x00          | 0x64          |
    +---------------+---------------+---------------+---------------+
  24| 0x00          | 0x04          | 0x00          | 0x05          |
    +---------------+---------------+---------------+---------------+
  28| 0x68 ('h')    | 0x65 ('e')    | 0x6c ('l')    | 0x6c ('l')    |
    +---------------+---------------+---------------+---------------+
  32| 0x6f ('o')    | 0x01          | 0x00          | 0x00          |
    +---------------+---------------+---------------+---------------+
  36| 0x00          | 0x00          | 0x00          | 0x00          |
    +---------------+---------------+---------------+---------------+
  40| 0x00          | 0x0a          | 0x00          | 0x05          |
    +---------------+---------------+---------------+---------------+
  44| 0x00          | 0x05          | 0x77 ('w')    | 0x6f ('o')    |
    +---------------+---------------+---------------+---------------+
  48| 0x72 ('r')    | 0x6c ('l')    | 0x64 ('d')    | 0x00          |
    +---------------+---------------+---------------+---------------+
  52| 0xde          | 0xad          | 0xbe          | 0xef          |
    +---------------+---------------+---------------+---------------+
  56| 0xde          | 0xad          | 0xca          | 0xfe          |
    +---------------+---------------+---------------+---------------+
observe response
Field        (offset) (value)
Magic        (0)    : 0x81
Opcode       (1)    : 0x92
Key length   (2,3)  : 0x0000
Extra length (4)    : 0x00
Data type    (5)    : 0x00
Status       (6,7)  : 0x0000
Total body   (8-11) : 0x00000024
Opaque       (12-15): 0xdeadbeef
Persist Stat (16-19): 0x000003e8 (msec time)
Repl Stat    (20-23): 0x00000064 (msec time)
Key #0
    vbucket  (24-25): 0x0004
    keylen   (26-27): 0x0005
             (28-32): "hello"
    keystate (33)   : 0x01 (persisted)
    cas      (34-41): 000000000000000a
Key #1
    vbucket  (42-43): 0x0005
    keylen   (44-45): 0x0005
             (46-50): "world"
    keystate (51)   : 0x00 (not persisted)
    cas      (52-59): deadbeefdeadcafe
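
Parsing the response is the mirror image of building the request. The sketch below is illustrative only: ObserveResponseDecoder and its Entry type are hypothetical, keys are assumed to be UTF-8, and the byte array is assumed to hold exactly one complete response packet, so entries are read until the buffer is exhausted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ObserveResponseDecoder {
    /** One per-key entry from the response body. */
    static final class Entry {
        final int vbucket;
        final String key;
        final int keyState;
        final long cas;

        Entry(int vbucket, String key, int keyState, long cas) {
            this.vbucket = vbucket;
            this.key = key;
            this.keyState = keyState;
            this.cas = cas;
        }
    }

    /** Parses the (vbucket, keylen, key, keystate, cas) entries after the 24-byte header. */
    static List<Entry> decodeBody(byte[] packet) {
        ByteBuffer buf = ByteBuffer.wrap(packet); // big-endian by default
        buf.position(24);
        List<Entry> entries = new ArrayList<>();
        while (buf.remaining() > 0) {
            int vbucket = buf.getShort() & 0xffff;
            int keyLen = buf.getShort() & 0xffff;
            byte[] rawKey = new byte[keyLen];
            buf.get(rawKey);
            int keyState = buf.get() & 0xff;
            long cas = buf.getLong();
            entries.add(new Entry(vbucket, new String(rawKey, StandardCharsets.UTF_8), keyState, cas));
        }
        return entries;
    }

    /** Average persist time in msec, taken from header bytes 16-19. */
    static long persistMillis(byte[] packet) {
        return ByteBuffer.wrap(packet).getInt(16) & 0xffffffffL;
    }

    /** Average replication time in msec, taken from header bytes 20-23. */
    static long replicateMillis(byte[] packet) {
        return ByteBuffer.wrap(packet).getInt(20) & 0xffffffffL;
    }
}
```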

In the response, the keystate field is intended to be a bitmask for various states. For example:
0x80 = not found
0x00 = found, not persisted
0x01 = found, persisted
0x81 = logically deleted

This is primarily because replication can potentially be significantly behind the initial create and we do not want to receive a NOT_FOUND error for the entire set of items being observed in that state. The intent of this being a bitmask is that future extension could allow for this field to indicate if the item is considered in any indexes, etc. Error masks start from the top of the field.

It is important for client developers to understand the difference between not found and logically deleted. If a client does a set, then not found and logically deleted mean the same thing from the client's perspective. For a set, the first thing the client needs to know is whether or not an item has made it to the appropriate servers, and in this case both not found and logically deleted mean that this hasn't happened yet. In the case of a delete, not found and logically deleted mean two different things. If you are returned not found for a delete, then you can assume that the delete has been persisted on that node, but if you are returned logically deleted, then it means that the delete has made it to the server but has not yet been persisted.
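
The set-versus-delete distinction can be captured in a small helper. This is a sketch under the keystate values listed above; KeyState and its method names are hypothetical, and a real check would also compare the CAS returned for each key.

```java
final class KeyState {
    static final int NOT_PERSISTED = 0x00;     // found, not persisted
    static final int PERSISTED = 0x01;         // found, persisted
    static final int NOT_FOUND = 0x80;         // not found
    static final int LOGICALLY_DELETED = 0x81; // logically deleted

    /** After a set: has the item reached this node at all? */
    static boolean setArrived(int state) {
        return state == NOT_PERSISTED || state == PERSISTED;
    }

    /** After a set: has the item been persisted on this node? */
    static boolean setPersisted(int state) {
        return state == PERSISTED;
    }

    /** After a delete: has the delete reached this node? */
    static boolean deleteArrived(int state) {
        return state == NOT_FOUND || state == LOGICALLY_DELETED;
    }

    /** After a delete: has the delete been persisted on this node? */
    static boolean deletePersisted(int state) {
        return state == NOT_FOUND;
    }
}
```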

Also, in the response above, the field normally used for CAS has been split into two fields that provide statistics on the average amount of time certain events take to propagate from a given server. The first is the time, in milliseconds, that it is taking on average to persist items. The second is the time, in milliseconds, that it is taking on average to replicate items. If not implemented by a given server version, each of these fields would be padded with 0. These fields allow a client library to optionally be a bit more intelligent, and thus efficient, about its polling intervals. The examples above show 1000 milliseconds to persistence and 100 milliseconds to replication. Note that this is a server's best guess at how long things take on average for any given operation; it is not related to the specific key being observed.
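
One way a client might turn these statistics into a polling interval is sketched below. PollHint and its parameters are hypothetical; the only behavior taken from the text is that a value of 0 means the server did not supply a hint.

```java
final class PollHint {
    /**
     * Picks the first polling delay from the server's average time, falling back to
     * a client default when the server reports 0, and clamping to [minMillis, maxMillis].
     */
    static long initialDelayMillis(long serverAvgMillis, long fallbackMillis,
                                   long minMillis, long maxMillis) {
        long base = serverAvgMillis > 0 ? serverAvgMillis : fallbackMillis;
        return Math.max(minMillis, Math.min(base, maxMillis));
    }
}
```

With a cap of 500 ms, the 1000 ms persistence hint from the example would be clamped to 500 ms, while the 100 ms replication hint would be used as is.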

Notes

The client builds a request packet once, containing all the keys it is interested in. It then sends the packet to all servers owning the relevant vbuckets (master and replicas). Each server should send back a response containing only the keys it knows about.
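
Working out which servers should receive that single packet might look like the sketch below. ObserveFanout is a hypothetical helper, and the vbucket-to-servers map (master first, then replicas) is an assumed stand-in for the cluster configuration held by the client.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ObserveFanout {
    /** Returns every server that owns at least one of the given vbuckets, without duplicates. */
    static Set<String> serversToContact(Collection<Integer> vbuckets,
                                        Map<Integer, List<String>> vbucketToServers) {
        Set<String> servers = new LinkedHashSet<>();
        for (Integer vbucket : vbuckets) {
            // Unknown vbuckets contribute no servers.
            servers.addAll(vbucketToServers.getOrDefault(vbucket, Collections.<String>emptyList()));
        }
        return servers;
    }
}
```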

Implementation Questions

q: Is it better to return the values OBS_SUCCESS, OBS_MODIFIED, and OBS_TIMEDOUT instead of true/false and treating the timeout as an exceptional condition? Since OBS_MODIFIED and OBS_TIMEDOUT may effectively require the same error handling, it may be easier to switch on these or return some extended boolean with status.

a: ?

q: Is a C implementation of OBSERVE needed?
