
2.7. Prepend and Append Operations

We have implemented a client application that tracks how many clients are currently connected. Let's add functionality to track which user numbers are connected.

We'll nominate a key to keep a list of all of the user numbers that are currently connected, and then when a user disconnects, remove their entry from the key. Right away, though, this presents a challenge. How can we guarantee that two users starting at exactly the same time don't mess up the value by overwriting each other's edits? Let's find out.

The API contains a method called append. It takes a special value called a CAS, which stands for Check and Set, but where do we get this number? There is another method called gets, which returns an object that can be asked for the CAS value we need to perform an append operation. Another interesting thing about the append method is that it returns a Future<Boolean>, which means it is an asynchronous method. You can use the value it returns to wait for an answer indicating whether the operation succeeded or failed. Asynchronous methods also let the code do other work instead of waiting for the result. At a later point in the code, the result of the operation can be obtained from the future variable.
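The wait-later pattern described above can be tried without a server. The following is a minimal sketch, not the client library's implementation: pretendAppend is a hypothetical stand-in that completes on another thread, and the five-second timeout is an arbitrary choice for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureSketch {

    // Hypothetical stand-in for an asynchronous client call such as append.
    // It runs on another thread and "succeeds" for any non-empty value.
    static Future<Boolean> pretendAppend(String key, String value) {
        return CompletableFuture.supplyAsync(() -> !value.isEmpty());
    }

    // Starts the operation, leaves room for other work, then collects the result.
    static boolean appendAndWait(String key, String value) {
        Future<Boolean> done = pretendAppend(key, value);
        // ... unrelated work could happen here while the operation is in flight ...
        try {
            // Block for at most five seconds waiting for the answer.
            return done.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(appendAndWait("CurrentUsers", "<User-1>"));
    }
}
```

Using get with a timeout, rather than the plain get() used in the tutorial code, is a design choice that keeps a slow server from blocking the caller indefinitely.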

You will be using the append method in this tutorial. The prepend method works in exactly the same way, except that append adds a string to the end of a value in the cache and prepend adds it to the front.

Both the append and prepend methods operate atomically: the server completes each operation on a value before starting the next, so concurrent appends and prepends are never interleaved within a value. The order in which operations from different clients are applied, however, is not guaranteed.
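To make the semantics concrete, here is a small in-memory sketch. It is an illustration only: the AppendPrependSketch class is hypothetical and uses a ConcurrentHashMap in place of the server. It also assumes the memcached-protocol behavior that appending or prepending to a key that does not exist fails, which is consistent with the register method below using set for the first user.

```java
import java.util.concurrent.ConcurrentHashMap;

class AppendPrependSketch {

    // In-memory stand-in for the cache; not the real client.
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    void set(String key, String value) {
        store.put(key, value);
    }

    String get(String key) {
        return store.get(key);
    }

    // computeIfPresent runs atomically per key, so two concurrent appends
    // cannot interleave. It returns null (here: false) if the key is absent.
    boolean append(String key, String suffix) {
        return store.computeIfPresent(key, (k, old) -> old + suffix) != null;
    }

    boolean prepend(String key, String prefix) {
        return store.computeIfPresent(key, (k, old) -> prefix + old) != null;
    }

    public static void main(String[] args) {
        AppendPrependSketch cache = new AppendPrependSketch();
        cache.set("CurrentUsers", "<User-1>");
        cache.append("CurrentUsers", "<User-2>");
        cache.prepend("CurrentUsers", "<User-0>");
        System.out.println(cache.get("CurrentUsers"));
    }
}
```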

Add the getUserNameToken helper method and update the register method so that they match the following listing.

private static String getUserNameToken() {
    return String.format("<User-%d>", userId);
}

private static boolean register() {

    userId = client.incr("UserId", 1, 1);
    System.out.println("You are user " + userId + ".");

    CASValue<Object> casValue = client.gets("CurrentUsers");

    if (casValue == null) {

        System.out.println("First user ever!");

        try {
            client.set("CurrentUsers", Integer.MAX_VALUE,
                    getUserNameToken()).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    } else {

        Future<Boolean> appendDone = client.append(casValue.getCas(),
                "CurrentUsers", getUserNameToken());

        try {

            if (appendDone.get()) {
                System.out.println("Registration succeeded.");
            } else {
                System.out.println("Sorry registration failed.");
                return false;
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        } catch (ExecutionException e) {
            e.printStackTrace();
            return false;
        }

    }

    userCount = client.incr("UserCount", 1, 1);
    System.out.println("There are currently " + userCount
            + " connected.");

    return true;
}

First, the client.gets("CurrentUsers") method is called to get a casValue. If that value is null, no one has ever connected, and this user is the first. In that case we simply set the value of CurrentUsers using the new getUserNameToken() method.

Otherwise, we append our user token to the list of users. To do this, we call the append method with the CAS held in casValue, which we obtain by calling its getCas() method. The append method is also asynchronous and returns a Future<Boolean>. Calling the get() method on that future returns its value once the operation has been performed. The append operation can fail, for instance if the list of user names exceeds the maximum size of a value in the cache, so we handle both cases. If it returns true, we tell the user that the registration succeeded. Otherwise the registration failed, so we tell the user and return false to let the main program know that something went wrong.

You need to modify the main method as well, to handle the possibility of the register method returning false.

try {
    connect(serverAddress);

    if (register()) {
        unregister();
    }

    client.shutdown(1, TimeUnit.MINUTES);

} catch (IOException e) {
    e.printStackTrace();
}

Now we need to clean up the user list when a user leaves. We will modify the unregister method so that it removes the current userId from the CurrentUsers list before finishing. This is a potentially dangerous operation for a distributed cache: two or more users may exit the application at the same time, and each may replace the user list, overwriting the other's changes. We will use a trick that effectively creates a distributed critical section.

private static void unregister() {

    try {
        // Wait for add to succeed. It will only
        // succeed if the value is not in the cache.
        while (!client.add("lock:CurrentUsers", 10, "1").get()) {
            System.out.println("Waiting for the lock...");
            Thread.sleep(500);
        }

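        try {
            String oldValue = (String) client.get("CurrentUsers");
            if (oldValue != null) {
                // replace() matches the token literally; replaceAll()
                // would interpret it as a regular expression.
                String newValue = oldValue.replace(getUserNameToken(), "");
                // Wait for the write to finish before releasing the lock.
                client.set("CurrentUsers", Integer.MAX_VALUE, newValue).get();
            }
        } finally {
            client.delete("lock:CurrentUsers");
        }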

    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    client.decr("UserCount", 1);
    System.out.println("Unregistered.");

}

Here we use the fact that the client.add() method succeeds if and only if no value exists for the given key. This gives us a way to let only one application edit CurrentUsers at a time. We call the lock key lock:CurrentUsers and give it an expiry of ten seconds, so a client that fails while holding the lock cannot block the others forever. If the add does not succeed, the code sleeps for 500 milliseconds and tries again.
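The locking idea can be sketched in memory. The following is an illustration under stated assumptions, not the client library's behavior: AddLockSketch is a hypothetical class that mimics add with a map of expiry times, and the current time is passed in explicitly so the example is deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;

class AddLockSketch {

    // Maps each key to the instant (in milliseconds) at which it expires.
    private final ConcurrentHashMap<String, Long> expiries = new ConcurrentHashMap<>();

    // Mimics add: succeeds only if the key is absent or has already expired.
    boolean add(String key, int ttlSeconds, long nowMillis) {
        long newExpiry = nowMillis + ttlSeconds * 1000L;
        boolean[] added = {false};
        // compute runs atomically per key, so only one caller can win.
        expiries.compute(key, (k, oldExpiry) -> {
            if (oldExpiry == null || oldExpiry <= nowMillis) {
                added[0] = true;
                return newExpiry;
            }
            return oldExpiry; // still held by someone else
        });
        return added[0];
    }

    // Mimics delete: releases the lock for the next waiter.
    void delete(String key) {
        expiries.remove(key);
    }

    public static void main(String[] args) {
        AddLockSketch cache = new AddLockSketch();
        System.out.println(cache.add("lock:CurrentUsers", 10, 0L));     // first caller
        System.out.println(cache.add("lock:CurrentUsers", 10, 5_000L)); // second caller
    }
}
```

The expiry is what keeps the lock from being held forever if its owner never reaches the delete call.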

The expiry time as defined in the protocol is documented as follows in the JavaDocs for the API:

Note

The actual value sent may either be Unix time (number of seconds since January 1, 1970, as a 32-bit value), or a number of seconds starting from current time. In the latter case, this number of seconds may not exceed 60*60*24*30 (number of seconds in 30 days); if the number sent by a client is larger than that, the server will consider it to be real Unix time value rather than an offset from current time.
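The rule in the note can be expressed as a short function. This is a sketch of the rule as quoted, not server code; the ExpirySketch class and expiresAt method are hypothetical names, and special values outside the quoted rule are not modeled. Under this rule, the Integer.MAX_VALUE passed for CurrentUsers exceeds thirty days and would therefore be read as an absolute Unix time (in January 2038) rather than as an offset.

```java
class ExpirySketch {

    // Number of seconds in 30 days, the threshold named in the note.
    static final int THIRTY_DAYS = 60 * 60 * 24 * 30;

    // Returns the absolute Unix time (in seconds) at which an item expires,
    // given the expiry value sent by the client and the current Unix time.
    static long expiresAt(int exp, long nowSeconds) {
        if (exp > THIRTY_DAYS) {
            return exp;          // interpreted as an absolute Unix time
        }
        return nowSeconds + exp; // interpreted as an offset from now
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() / 1000L;
        System.out.println("10 -> " + expiresAt(10, now));
        System.out.println("3600 -> " + expiresAt(3600, now));
        System.out.println("Integer.MAX_VALUE -> " + expiresAt(Integer.MAX_VALUE, now));
    }
}
```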

Once the add succeeds, a try/finally block is entered that actually gets the value of CurrentUsers and edits it, replacing the current user token with the empty string. Then it sets it back. In the finally block, you can see that the lock is deleted using the client.delete() method. This will remove the key from Couchbase and allow any other clients that are waiting to unregister to continue into the critical section one at a time.
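The edit step inside the critical section can be tried on its own. This is a minimal sketch with a hypothetical removeToken helper; it matches the token literally and treats a missing list as empty, which is an assumption about how a missing CurrentUsers value should be handled.

```java
class TokenRemovalSketch {

    // Removes every occurrence of token from the user list.
    // A null list (key missing from the cache) is treated as empty.
    static String removeToken(String users, String token) {
        if (users == null) {
            return "";
        }
        // String.replace matches literally, with no regular-expression rules.
        return users.replace(token, "");
    }

    public static void main(String[] args) {
        System.out.println(removeToken("<User-1><User-2><User-3>", "<User-2>"));
    }
}
```

Because each token ends with a closing angle bracket, removing <User-1> leaves <User-11> untouched.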

It is now time to complete the functionality of the tutorial application by writing a thread that will output the messages that users type as well as a method of getting input from the users.

First add the following member variable to the class:

private static Thread messageThread;

Next, modify the main method again so that the body of the if statement starts the message thread, processes input, and interrupts the thread after unregistering:

if (register()) {
    startMessageThread();
    processInput();
    unregister();
    messageThread.interrupt();
}

Now we need to write a few helper methods. The first is:

private static void printMessages(long startId, long endId) {

    for (long i = startId; i <= endId; i++) {
        String message = (String)client.get("Message:" + i);
        if (message != null)
            System.out.println(message);
    }

}

This method iterates through a range of message numbers and prints each message to the screen. Couchbase does not allow iteration over keys, but that is not a problem here: we know exactly what pattern the key names follow, so we can construct each key directly.

The second method finds the oldest message that hasn't expired in the cache. It starts at the latest message and walks back toward the first, stopping at the first message number that is missing from the cache. Because the loop stops one past the oldest surviving message, the method adds one to return the correct number.

private static long findFirstMessage(long currentId) {
    CASValue<Object> cas = null;
    long firstId = currentId;
    do {
        firstId -= 1;
        cas = client.gets("Message:" + firstId);
    } while (cas != null);

    return firstId + 1;
}
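The backward walk can be checked against an in-memory map. This sketch is an illustration only: it replaces client.gets with a lookup in a java.util.Map, and the FirstMessageSketch class is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

class FirstMessageSketch {

    // Same loop as findFirstMessage, with a map standing in for the cache.
    static long findFirstMessage(Map<String, String> cache, long currentId) {
        long firstId = currentId;
        do {
            firstId -= 1;
        } while (cache.containsKey("Message:" + firstId));
        // The loop stopped on a missing message, one past the oldest survivor.
        return firstId + 1;
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        // Messages 1 and 2 have expired; 3, 4 and 5 are still cached.
        cache.put("Message:3", "<User-1>: hello");
        cache.put("Message:4", "<User-2>: hi");
        cache.put("Message:5", "<User-1>: bye");
        System.out.println(findFirstMessage(cache, 5));
    }
}
```

Note that the walk stops at the first gap, so a message older than an expired one is never reached.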

Finally, we come to the method that prints out all of the messages as they come in. It is somewhat complicated, so we'll describe it in detail afterward.

private static void startMessageThread() {

    messageThread = new Thread(new Runnable() {
        public void run() {

            long messageId = -1;

            try {
                while (!Thread.interrupted()) {

                    CASValue<Object> msgCountCas = client.gets("Messages");

                    if (msgCountCas == null) {
                        Thread.sleep(250);
                        continue;
                    }

                    long current = Long.parseLong((String)msgCountCas.getValue());

                    if (messageId == -1) {
                        printMessages(findFirstMessage(current),
                                current);
                    } else if (current > messageId) {
                        printMessages(messageId + 1, current);
                    } else {
                        Thread.sleep(250);
                        continue;
                    }

                    messageId = current;

                }

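            } catch (InterruptedException ex) {
                // Interrupted while sleeping: fall through and stop.
            } catch (RuntimeException ex) {
                // Suppressed: any client error simply ends the thread.
            }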

            System.out.println("Stopped message thread.");
        }
    });

    messageThread.start();

}

This code creates a new thread of execution and assigns it to the messageThread variable. It creates an anonymous Runnable class that implements a run() method inline.

The messageId variable is set to a sentinel value so that we know when it is the first time through the while loop. The while loop will iterate until the thread has been interrupted.

First, in the while loop, we call client.gets("Messages"). It returns null if the value does not exist, in which case the loop sleeps for a little while and continues back to the top. Otherwise it returns a CASValue<Object> instance that we can use to obtain the current message id.

If this is the first time through the loop (messageId == -1), we need to print out all of the messages from the first to the current.

Otherwise, if the current message id is bigger than what we've previously seen, some new messages have come in since we last checked, so we print them out.

Otherwise, nothing has changed since we last checked, so the thread just sleeps some more.

At the end of the loop, we just make sure that the current message id is remembered for the next iteration of the loop. Exceptions are handled by suppressing them, and if the while loop exits we'll print a message saying the message thread stopped.

At the end of the method, the thread is actually started.
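How the loop ends deserves a closer look: an interrupt either sets the flag that Thread.interrupted() checks, or, if the thread is sleeping, makes Thread.sleep throw InterruptedException. Both paths leave the loop. The sketch below shows this with a hypothetical polling thread that only counts its iterations; the daemon setting and the timings are choices made for the example, not part of the tutorial code.

```java
import java.util.concurrent.atomic.AtomicLong;

class PollingThreadSketch {

    // Starts a thread that polls until it is interrupted.
    static Thread startPoller(AtomicLong polls) {
        Thread poller = new Thread(() -> {
            try {
                while (!Thread.interrupted()) {
                    polls.incrementAndGet(); // stand-in for checking the cache
                    Thread.sleep(10);
                }
            } catch (InterruptedException ex) {
                // The interrupt arrived during sleep; just stop.
            }
        });
        poller.setDaemon(true); // the sketch must never keep the JVM alive
        poller.start();
        return poller;
    }

    // Runs the poller for a while, interrupts it, and reports whether it stopped.
    static boolean runAndStop(long runMillis) {
        AtomicLong polls = new AtomicLong();
        Thread poller = startPoller(polls);
        try {
            Thread.sleep(runMillis);
            poller.interrupt();
            poller.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !poller.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Poller stopped: " + runAndStop(100));
    }
}
```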

Great, so, now if messages come in, we'll display them. Also, when we first start the application, all of the messages stored in the cache will be displayed. We need to implement the actual method that allows the user to interact with the cache.

private static void processInput() {
    boolean quit = false;

    System.out.println("Enter text, or /who to see user list, or /quit to exit.");

    do {
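        // System.console() is null when no console is attached, and
        // readLine() returns null at end of input; treat both as /quit.
        java.io.Console console = System.console();
        String input = (console == null) ? null : console.readLine();
        if (input == null || input.startsWith("/quit")) {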
            quit = true;
        } else if (input.startsWith("/who")) {
            System.out.println("Users connected: "
                    + client.get("CurrentUsers"));
        } else {
            // Send a new message to the chat
            long messageId = client.incr("Messages", 1, 1);
            client.set("Message:" + messageId, 3600,
                    getUserNameToken() + ": " + input);
        }

    } while (!quit);

}

The method keeps track of a quit variable to know when to exit the do/while loop, then prints out some simple instructions for the user.

The console is read one line at a time, and each is checked to see if it starts with a command. If the user has typed '/quit' the quit flag will be set, and the loop will exit.

If the user has typed '/who' the CurrentUsers cached value will be output to the screen, so that at any time a user can check who is currently online.

Otherwise, the line is treated as a message. Here we increment the Messages key and use the returned value as the message id. Then the client.set() method is called with a key of the form Message:<messageId>, an expiry of one hour (3600 seconds), and a value made up of the user's name token and the text that they entered.
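The dispatch rules and the key and value formats can be isolated from the cache calls. This is a sketch with hypothetical names (InputDispatchSketch, classify, messageKey, messageValue); treating null input as a request to quit is an assumption made for the example.

```java
class InputDispatchSketch {

    enum Action { QUIT, WHO, MESSAGE }

    // Classifies one line of input using the same prefix checks as processInput.
    static Action classify(String input) {
        if (input == null || input.startsWith("/quit")) {
            return Action.QUIT;
        }
        if (input.startsWith("/who")) {
            return Action.WHO;
        }
        return Action.MESSAGE;
    }

    // Key under which a message with the given id is stored.
    static String messageKey(long messageId) {
        return "Message:" + messageId;
    }

    // Stored value: the user's name token followed by the text entered.
    static String messageValue(long userId, String text) {
        return String.format("<User-%d>", userId) + ": " + text;
    }

    public static void main(String[] args) {
        System.out.println(classify("hello"));
        System.out.println(messageKey(7) + " = " + messageValue(3, "hello"));
    }
}
```

Because the checks use startsWith, any line beginning with /who or /quit is treated as that command, including longer words such as /whoami.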

These changes to the cache will be noticed by the message thread, and output to the screen. Of course this means that each user will see his or her messages repeated back to them.

If you compile and run the program in multiple terminal windows, you can talk to yourself. This is about as fun as things can get, isn't it? Notice how intelligent you can be.