We have implemented a client application that tracks how many clients are connected at the moment. Let's add the functionality to track the user numbers that are currently connected.
We'll nominate a key to keep a list of all of the user numbers that are currently connected, and then when a user disconnects, remove their entry from the key. Right away, though, this presents a challenge. How can we guarantee that two users starting at exactly the same time don't mess up the value by overwriting each other's edits? Let's find out.
The API contains a method called append. It
takes a special value called a CAS, which stands for Check and
Set, but where do we get this number? There is another method
called gets which returns an object that can be asked for the CAS
value we need to perform an append operation. Another interesting
thing about the append method is that returns a
Future<Boolean> which means it is an
asynchronous method. You can use the value it returns to wait for
an answer indicating whether the operation succeeded or failed.
Asynchronous methods also allow the code to do other things
without having to wait for the result. At a later point in the
code, the result of the operation can be obtained by using the
future variable.
You will be using the append method in this
tutorial, but the prepend method functions in
exactly the same way except that append adds a string to the end
of a value in the cache, and prepend puts a
string at the front of a value in the cache.
Both the append and prepend
methods operate atomically, meaning they will perform the
operation on a value in the cache and finish each operation before
moving on to the next. You will not get interleaved appends and
prepends. Of course, the absolute order of these operations is not
guaranteed.
The lines that are in bold-face should be changed in the register method of the code.
private static String getUserNameToken() { return String.format("<User-%d>", userId); } private static boolean register() { userId = client.incr("UserId", 1, 1); System.out.println("You are user " + userId + "."); CASValue<Object> casValue = client.gets("CurrentUsers"); if (casValue == null) { System.out.println("First user ever!"); try { client.set("CurrentUsers", Integer.MAX_VALUE, getUserNameToken()).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } else { Future<Boolean> appendDone = client.append(casValue.getCas(), "CurrentUsers", getUserNameToken()); try { if (appendDone.get()) { System.out.println("Registration succeeded."); } else { System.out.println("Sorry registration failed."); return false; } } catch (InterruptedException e) { e.printStackTrace(); return false; } catch (ExecutionException e) { e.printStackTrace(); return false; } } userCount = client.incr("UserCount", 1, 1); System.out.println("There are currently " + userCount + " connected."); return true; }
First you can see that the
client.gets("CurrentUsers") method is called to
get a casValue. If that value is null, it means
that no one has ever connected, so that user is the first user. So
we will simply set the value of CurrentUsers
using the new getUserNameToken() method.
Otherwise, we will append our userid to the list of users. To do
this, we need to call the append method with the CAS that is in
the casValue by calling its
getCas() method. The append method is also
asynchronous and returns a
Future<Boolean>. Calling the
get() method on that future will return its
value when its operation has been performed. The append operation
can possibly fail, if for instance the size of the list of
usernames exceeds the maximum size of a value in the cache. So we
handle both cases. If it returns true, we tell the user that the
registration was a success. Otherwise the registration failed, so
we'll tell the user this and return false to tell the main program
that something went wrong.
You need to modify the main method as well, to
handle the possibility of the register method
returning false.
try { connect(serverAddress); if (register()) { unregister(); } client.shutdown(1, TimeUnit.MINUTES); } catch (IOException e) { e.printStackTrace(); }
Now, we need to implement the cleanup of the user list when a user
leaves. We will be modifying the unregister
method to be very careful to remove the current userId from the
CurrentUsers list before finishing. This is a potentially
dangerous operation for a distributed cache since two or more
users may try to exit the application at the same time and may try
to replace the user list overwriting the previous changes. We will
use a trick that effectively forces a distributed critical
section.
private static void unregister() { try { // Wait for add to succeed. It will only // succeed if the value is not in the cache. while (!client.add("lock:CurrentUsers", 10, "1").get()) { System.out.println("Waiting for the lock..."); Thread.sleep(500); } try { String oldValue = (String)client.get("CurrentUsers"); String userNameToken = getUserNameToken(); String newValue = oldValue.replaceAll(userNameToken, ""); client.set("CurrentUsers", Integer.MAX_VALUE, newValue); } finally { client.delete("lock:CurrentUsers"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } client.decr("UserCount", 1); System.out.println("Unregistered."); }
Here we use the fact that the client.add()
method will succeed if and only if a value does not exist for the
given key to provide a way for only one application to be able to
edit the CurrentUsers at a time. We will call this
lock:CurrentUsers and it will expire in ten
seconds. If we are not able to add, the code will sleep for 500
milliseconds and try again.
The expiry time as defined in the protocol is documented as follows in the JavaDocs for the API:
The actual value sent may either be Unix time (number of seconds since January 1, 1970, as a 32-bit value), or a number of seconds starting from current time. In the latter case, this number of seconds may not exceed 60*60*24*30 (number of seconds in 30 days); if the number sent by a client is larger than that, the server will consider it to be real Unix time value rather than an offset from current time.
Once the add succeeds, a try/finally block is entered that
actually gets the value of CurrentUsers and edits it, replacing
the current user token with the empty string. Then it sets it
back. In the finally block, you can see that the lock is deleted
using the client.delete() method. This will
remove the key from Couchbase and allow any other clients that are
waiting to unregister to continue into the critical section one at
a time.
It is now time to complete the functionality of the tutorial application by writing a thread that will output the messages that users type as well as a method of getting input from the users.
First add the following member variable to the class:
private static Thread messageThread;
Next, modify the main method again to add the
following lines in bold:
if (register()) { startMessageThread(); processInput(); unregister(); messageThread.interrupt(); }
Now we will need to write a few helper methods, the first is:
private static void printMessages(long startId, long endId) { for (long i = startId; i <= endId; i++) { String message = (String)client.get("Message:" + i); if (message != null) System.out.println(message); } }
This method just iterates through a set of message numbers and prints the message to the screen. Couchbase does not allow iteration of keys, but that's alright, we know exactly what pattern the key names follow, so we can do this.
The second method helps to find the oldest message that hasn't expired in the cache, starting at the last message and running back toward the first message. Eventually it will find the first message and will return its number, considering that it will have run one past the end, it needs to do a little fix-up to return the correct number.
private static long findFirstMessage(long currentId) { CASValue<Object> cas = null; long firstId = currentId; do { firstId -= 1; cas = client.gets("Message:" + firstId); } while (cas != null); return firstId + 1; }
Finally we come to the method that prints out all of the messages as they come in. It's somewhat complicated so I'll describe it in detail afterward.
private static void startMessageThread() { messageThread = new Thread(new Runnable() { public void run() { long messageId = -1; try { while (!Thread.interrupted()) { CASValue<Object> msgCountCas = client.gets("Messages"); if (msgCountCas == null) { Thread.sleep(250); continue; } long current = Long.parseLong((String)msgCountCas.getValue()); if (messageId == -1) { printMessages(findFirstMessage(current), current); } else if (current > messageId) { printMessages(messageId + 1, current); } else { Thread.sleep(250); continue; } messageId = current; } } catch (InterruptedException ex) { } catch (RuntimeException ex) { } System.out.println("Stopped message thread."); } }); messageThread.start(); }
This code creates a new thread of execution and assigns it to the
messageThread variable. It creates an anonymous
Runnable class that implements a
run() method inline.
The messageId variable is set to a sentinel
value so that we know when it is the first time through the while
loop. The while loop will iterate until the thread has been
interrupted.
First, in the while loop, we write
client.gets("Messages") which will return null
if the value does not exist (in which case the loop sleeps for a
little while and continues back to the top), or the method will
return a CASValue<Object> instance that
we can use to obtain the current message id.
If this is the first time through the loop (messageId ==
-1), we need to print out all of the messages since the
first to the current.
Otherwise if the current messageId is bigger than what we've previously seen, it means that some new messages have come in since we last checked, so we will print them out.
Finally, nothing has changed since we last checked so just sleep some more.
At the end of the loop, we just make sure that the current message id is remembered for the next iteration of the loop. Exceptions are handled by suppressing them, and if the while loop exits we'll print a message saying the message thread stopped.
At the end of the method, the thread is actually started.
Great, so, now if messages come in, we'll display them. Also, when we first start the application, all of the messages stored in the cache will be displayed. We need to implement the actual method that allows the user to interact with the cache.
private static void processInput() { boolean quit = false; System.out.println("Enter text, or /who to see user list, or /quit to exit."); do { String input = System.console().readLine(); if (input.startsWith("/quit")) { quit = true; } else if (input.startsWith("/who")) { System.out.println("Users connected: " + client.get("CurrentUsers")); } else { // Send a new message to the chat long messageId = client.incr("Messages", 1, 1); client.set("Message:" + messageId, 3600, getUserNameToken() + ": " + input); } } while (!quit); }
The method keeps track of a quit variable to know when to exit the do/while loop, then prints out some simple instructions for the user.
The console is read one line at a time, and each is checked to see if it starts with a command. If the user has typed '/quit' the quit flag will be set, and the loop will exit.
If the user has typed '/who' the CurrentUsers cached value will be output to the screen, so that at any time a user can check who is currently online.
Otherwise, the line is treated as a message. Here we increment the
Messages key and use that value as a message id. Then the
client.set() method is called with a key of
Message:MessageId with a timeout of one hour,
followed by the user's name and the text that they entered.
These changes to the cache will be noticed by the message thread, and output to the screen. Of course this means that each user will see his or her messages repeated back to them.
If you compile and run the program in multiple terminal windows, you can talk to yourself. This is about as fun as things can get, isn't it? Notice how intelligent you can be.