We have implemented a client application that tracks how many clients are connected at the moment. Let's add the functionality to track the actual users that are currently connected, by storing a user name token for each connected user.
We'll nominate a key to hold a list of all of the users that are currently connected, and when a user disconnects, remove their entry from that list. Right away, though, this presents a challenge. How can we guarantee that two users starting at exactly the same time don't mess up the value by overwriting each other's edits? Let's find out.
The API contains a method called append. It
takes a special value called a CAS, which stands for Compare And
Swap, but where do we get this number? There is another method
called gets, which returns an object that can be asked for the CAS
value we need to perform an append operation. Another interesting
thing about the append method is that it returns a
Future<Boolean>, which means it is an
asynchronous method. You can use the value it returns to wait for
an answer indicating whether the operation succeeded or failed.
Asynchronous methods also allow the code to do other things
without having to wait for the result. At a later point in the
code, the result of the operation can be obtained by using the
future variable.
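To make the asynchronous pattern concrete, here is a minimal sketch that uses only java.util.concurrent. The slowAppend and appendAndWait methods are hypothetical stand-ins written for this illustration; they are not part of the client library, and the StringBuilder merely plays the role of a value in the database.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureSketch {
    // Hypothetical stand-in for an asynchronous client call: the work
    // finishes on another thread and reports success as a Boolean.
    static Future<Boolean> slowAppend(ExecutorService pool, StringBuilder value, String suffix) {
        return pool.submit(() -> {
            Thread.sleep(100);              // pretend to talk to a server
            synchronized (value) {
                value.append(suffix);
            }
            return Boolean.TRUE;
        });
    }

    // Starts the operation, does unrelated work, then collects the result.
    static boolean appendAndWait(StringBuilder value, String suffix) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> done = slowAppend(pool, value, suffix);
            // The calling thread is free to do other things here.
            System.out.println("Request sent, doing other work...");
            // Block for at most five seconds until the answer arrives.
            return done.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        StringBuilder value = new StringBuilder("<User-1>");
        System.out.println("Succeeded: " + appendAndWait(value, "<User-2>"));
        System.out.println("Value is now: " + value);
    }
}
```

Passing a timeout to get() is a design choice in this sketch: it keeps the caller from waiting forever if the answer never comes.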
This tutorial uses the append method. The prepend method works in exactly the same way, except that append adds a string to the end of a value in the database, while prepend puts a string at the front.
Both the append and prepend methods operate atomically, meaning they will perform the operation on a value in the database and finish each operation before moving on to the next. You will not get interleaved appends and prepends. Of course, the absolute order of these operations is not guaranteed.
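The following sketch illustrates what "atomic but unordered" means, using an in-memory map as a stand-in for the database. The class and method names are hypothetical, and the map only models the behavior described above; it says nothing about how the server implements it. As a modelling assumption, append and prepend here fail when the key does not exist yet.

```java
import java.util.concurrent.ConcurrentHashMap;

class AtomicAppendSketch {
    // In-memory stand-in for the database (an assumption for illustration).
    static final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // computeIfPresent runs the whole update atomically for the key, so
    // two concurrent appends can never interleave their characters.
    static boolean append(String key, String suffix) {
        return store.computeIfPresent(key, (k, old) -> old + suffix) != null;
    }

    static boolean prepend(String key, String prefix) {
        return store.computeIfPresent(key, (k, old) -> prefix + old) != null;
    }

    // Two writers race to append; both tokens arrive whole, in either order.
    static String runTwoWriters() throws InterruptedException {
        store.put("CurrentUsers", "<User-1>");
        Thread a = new Thread(() -> append("CurrentUsers", "<User-2>"));
        Thread b = new Thread(() -> append("CurrentUsers", "<User-3>"));
        a.start();
        b.start();
        a.join();
        b.join();
        return store.get("CurrentUsers");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTwoWriters());
    }
}
```

Running it several times may print the second and third tokens in either order, but never a token split in two.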
Change the register method so that it reads as follows, adding the getUserNameToken helper method just before it:
    private String getUserNameToken() {
        return String.format("<User-%d>", userId);
    }

    private boolean register() throws Exception {
        userId = client.incr("UserId", 1, 1);
        System.out.println("You are user " + userId + ".");

        CASValue<Object> casValue = client.gets("CurrentUsers");
        if (casValue == null) {
            System.out.println("First user ever!");
            client.set("CurrentUsers", Integer.MAX_VALUE, getUserNameToken()).get();
        } else {
            Future<Boolean> appendDone = client.append(casValue.getCas(),
                    "CurrentUsers", getUserNameToken());
            if (appendDone.get()) {
                System.out.println("Registration succeeded.");
            } else {
                System.out.println("Sorry registration failed.");
                return false;
            }
        }

        userCount = client.incr("UserCount", 1, 1);
        System.out.println("There are currently " + userCount + " connected.");
        return true;
    }
First you can see that the
client.gets("CurrentUsers") method is called to
get a casValue. If that value is null, it means
that no one has ever connected, so that user is the first user. So
we will simply set the value of CurrentUsers
using the new getUserNameToken() method.
Otherwise, we will append our userid to the list of users. To do
this, we need to call the append method with the CAS that is in
the casValue by calling its
getCas() method. The append method is also
asynchronous and returns a
Future<Boolean>. Calling the
get() method on that future will return its
value when its operation has been performed. The append operation
can fail if, for instance, the size of the list of
usernames exceeds the maximum size of a value in the Membase
bucket. So we handle both cases. If it returns true, we tell the
user that the registration was a success. Otherwise the
registration failed, so we'll tell the user this and return false
to tell the main program that something went wrong.
You need to modify the run() method as well, to
handle the possibility of the register method returning false:
    connect(serverAddress);
    if (register()) {
        unregister();
    }
    client.shutdown(1, TimeUnit.MINUTES);
Now, we need to clean up the user list when a user leaves. We will
be modifying the unregister method to be very careful to remove
the current userId from the CurrentUsers list before finishing.
The Java client library has a very elegant API that provides a way
to ensure that the operation is retried if another client has beaten
us to making their change to the same database value. This API
uses a CASMutation<T> and
CASMutator<T> to perform the modification
safely. These classes require a
Transcoder<T> so that everything can be
nice and type safe. Add the following code just before the
unregister() method:
    class StringTranscoder implements Transcoder<String> {
        final SerializingTranscoder delegate = new SerializingTranscoder();

        public boolean asyncDecode(CachedData d) {
            return delegate.asyncDecode(d);
        }

        public String decode(CachedData d) {
            return (String)delegate.decode(d);
        }

        public CachedData encode(String o) {
            return delegate.encode(o);
        }

        public int getMaxSize() {
            return delegate.getMaxSize();
        }
    }
Then, change the unregister() method so that it reads
as follows:
    private void unregister() throws Exception {
        CASMutation<String> mutation = new CASMutation<String>() {
            public String getNewValue(String current) {
                return current.replaceAll(getUserNameToken(), "");
            }
        };

        Transcoder<String> transcoder = new StringTranscoder();
        CASMutator<String> mutator = new CASMutator<String>(client, transcoder);
        mutator.cas("CurrentUsers", "", 0, mutation);

        client.decr("UserCount", 1);
        System.out.println("Unregistered.");
    }
The mutation anonymous class instance defines what to change the
string to, once it is obtained from the database. The mutator
instance starts the cas() method that uses
the mutation. What happens is that the operation will be retried
until it succeeds. If two clients are writing to the same database
value at the same time, one of the clients will need to retry
because its CAS value will not match the database after the
operation. We will discuss the CAS error handling more later on.
All you need to know at the moment is that the mutator will
eventually succeed in storing its value to the database, and that
value will be consistent with what was written by other clients,
which is very convenient.
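The retry idea can be sketched without a server. In the example below an AtomicReference stands in for a single database value, and its compareAndSet call plays the role of the CAS check. The mutate and removeUser helpers are hypothetical names invented for this illustration; this is not how the client library's mutator is implemented, only the same read, compute, and retry pattern in miniature.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

class CasRetrySketch {
    // In-memory stand-in for one database value guarded by compare-and-swap.
    static final AtomicReference<String> currentUsers = new AtomicReference<>("");

    // Read the value, compute a replacement, and store it only if nobody
    // else changed the value in between; otherwise start over.
    static String mutate(UnaryOperator<String> mutation) {
        while (true) {
            String seen = currentUsers.get();
            String updated = mutation.apply(seen);
            if (currentUsers.compareAndSet(seen, updated)) {
                return updated;
            }
            // Another writer got in first: loop and retry with fresh data.
        }
    }

    // Removes one user's token from the list.
    static String removeUser(long userId) {
        String token = String.format("<User-%d>", userId);
        // String.replace matches the token literally rather than as a
        // regular expression.
        return mutate(current -> current.replace(token, ""));
    }

    public static void main(String[] args) {
        currentUsers.set("<User-1><User-2><User-3>");
        System.out.println(removeUser(2));
    }
}
```

Because the mutation is re-applied to whatever value is current on each attempt, a retry never discards another writer's change.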
It is now time to complete the functionality of the tutorial application by writing a thread that will output the messages that users type as well as a method of getting input from the users.
First add the following member variable to the class:
private Thread messageThread;
Next, modify the run() method again so that the body of the
if statement reads as follows:
    if (register()) {
        startMessageThread();
        processInput();
        unregister();
        messageThread.interrupt();
    }
Now we will need to write a few helper methods. The first is:
    private void printMessages(long startId, long endId) {
        for (long i = startId; i <= endId; i++) {
            String message = (String)client.get("Message:" + i);
            if (message != null)
                System.out.println(message);
        }
    }
This method just iterates through a range of message numbers and prints each message to the screen. Membase does not allow iteration over keys, but that's okay: we know exactly what pattern the key names follow, so we can do this.
The second method helps to find the oldest message that hasn't expired in the database, starting at the last message and running back toward the first. When a lookup comes back empty, the loop has run one past the oldest message, so the method adds one to return the correct number:
    private long findFirstMessage(long currentId) {
        CASValue<Object> cas = null;
        long firstId = currentId;
        do {
            firstId -= 1;
            cas = client.gets("Message:" + firstId);
        } while (cas != null);

        return firstId + 1;
    }
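To see why the final + 1 is needed, the same backward walk can be traced against an in-memory map. The map is a hypothetical stand-in for the database, and the class name is made up for this sketch; only the loop mirrors the method above.

```java
import java.util.HashMap;
import java.util.Map;

class FirstMessageSketch {
    // In-memory stand-in for the database (an assumption for illustration).
    static final Map<String, String> store = new HashMap<>();

    // Walks backward from currentId until a key is missing, then steps
    // forward once to land on the oldest message that still exists.
    static long findFirstMessage(long currentId) {
        long firstId = currentId;
        do {
            firstId -= 1;
        } while (store.containsKey("Message:" + firstId));
        return firstId + 1;
    }

    public static void main(String[] args) {
        // Messages 1 and 2 have "expired"; 3, 4 and 5 are still present.
        store.put("Message:3", "<User-1>: hi");
        store.put("Message:4", "<User-2>: hello");
        store.put("Message:5", "<User-1>: bye");
        // The loop checks 4 (found), 3 (found), 2 (missing), then returns 3.
        System.out.println(findFirstMessage(5));
    }
}
```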
Finally we come to the method that prints out all of the messages as they come in. It's somewhat complicated so we'll describe it in detail afterward.
    private void startMessageThread() {
        messageThread = new Thread(new Runnable() {
            public void run() {
                long messageId = -1;   // sentinel: nothing printed yet
                try {
                    while (!Thread.interrupted()) {
                        CASValue<Object> msgCountCas = client.gets("Messages");
                        if (msgCountCas == null) {
                            // No messages have ever been posted.
                            Thread.sleep(250);
                            continue;
                        }
                        long current = Long.parseLong((String)msgCountCas.getValue());
                        if (messageId == -1) {
                            // First pass: print everything still stored.
                            printMessages(findFirstMessage(current), current);
                        } else if (current > messageId) {
                            // New messages have arrived since the last pass.
                            printMessages(messageId + 1, current);
                        } else {
                            // Nothing new.
                            Thread.sleep(250);
                            continue;
                        }
                        messageId = current;
                    }
                } catch (InterruptedException ex) {
                    // Interrupted while sleeping: fall through and stop.
                } catch (RuntimeException ex) {
                    // Suppressed deliberately: fall through and stop.
                }
                System.out.println("Stopped message thread.");
            }
        });
        messageThread.start();
    }
This code creates a new thread of execution and assigns it to the
messageThread variable. It creates an anonymous
Runnable class that implements a
run() method inline.
The messageId variable is set to a sentinel
value so that we know when it is the first time through the while
loop. The while loop will iterate until the thread has been
interrupted.
First, in the while loop, we write client.gets("Messages"), which
will return null if the value does not exist (in which case the
loop sleeps for a little while and continues back to the top). Or
the method will return a CASValue<Object>
instance that we can use to obtain the current message id.
If this is the first time through the loop (messageId ==
-1), we need to print out all of the messages from the
first to the current message.
Otherwise, if the current message id is bigger
than what we've previously seen, it means that some new messages
have come in since we last checked, so we will print them out.
Finally, if nothing has changed since we last checked, the thread just sleeps some more.
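The three cases just described can be isolated as a small pure function, which makes them easy to check by hand. This is only a sketch: rangeToPrint and its oldestAvailable parameter are hypothetical names, with oldestAvailable standing in for the result of findFirstMessage.

```java
class PollDecisionSketch {
    // Returns {first, last} message ids to print, or null when there is
    // nothing new and the caller should sleep instead.
    static long[] rangeToPrint(long lastSeen, long current, long oldestAvailable) {
        if (lastSeen == -1) {
            // First pass: everything from the oldest surviving message.
            return new long[] { oldestAvailable, current };
        }
        if (current > lastSeen) {
            // Only the messages that arrived since the previous pass.
            return new long[] { lastSeen + 1, current };
        }
        return null;
    }

    public static void main(String[] args) {
        long[] first = rangeToPrint(-1, 5, 3);
        System.out.println(first[0] + ".." + first[1]);
        long[] next = rangeToPrint(5, 8, 3);
        System.out.println(next[0] + ".." + next[1]);
        System.out.println(rangeToPrint(8, 8, 3) == null);
    }
}
```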
At the end of the loop, we just make sure that the current message id is remembered for the next iteration of the loop. Exceptions are handled by suppressing them, and if the while loop exits we'll print a message saying the message thread stopped.
At the end of the method, the thread is actually started.
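The way interruption ends such a loop can be tried in isolation. The sketch below uses hypothetical helper names (startPoller and stop) and no database at all: the thread simply sleeps in a loop until it is interrupted, either while sleeping or at the check at the top of the loop.

```java
class InterruptSketch {
    // Starts a thread that sleeps in a loop until it is interrupted.
    static Thread startPoller(long pauseMillis) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.interrupted()) {
                    Thread.sleep(pauseMillis);
                }
            } catch (InterruptedException ex) {
                // Interrupted during sleep: fall through and finish.
            }
            System.out.println("Stopped poller.");
        });
        t.start();
        return t;
    }

    // Interrupts the thread and waits up to two seconds for it to end.
    static boolean stop(Thread t) throws InterruptedException {
        t.interrupt();
        t.join(2000);
        return !t.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread poller = startPoller(250);
        System.out.println("Stopped cleanly: " + stop(poller));
    }
}
```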
Great, so, now if messages come in, we'll display them. Also, when we first start the application, all of the messages stored in the database will be displayed. We need to implement the actual method that allows the user to interact with the database:
    // Not static: it calls the instance method getUserNameToken().
    private void processInput() {
        boolean quit = false;
        System.out.println("Enter text, or /who to see user list, or /quit to exit.");
        do {
            String input = System.console().readLine();
            // readLine() returns null at end of input; treat that as /quit.
            if (input == null || input.startsWith("/quit")) {
                quit = true;
            } else if (input.startsWith("/who")) {
                System.out.println("Users connected: " + client.get("CurrentUsers"));
            } else {
                // Send a new message to the chat
                long messageId = client.incr("Messages", 1, 1);
                client.set("Message:" + messageId, 3600, getUserNameToken() + ": " + input);
            }
        } while (!quit);
    }
The method keeps track of a quit variable to know when to exit the do/while loop, then prints out some simple instructions for the user.
The console is read one line at a time, and each is checked to see if it starts with a command. If the user has typed '/quit' the quit flag will be set, and the loop will exit.
If the user has typed '/who' the value of the
CurrentUsers variable will be output to the
screen, so that at any time a user can check who is currently
online.
Otherwise, the line is treated as a message. Here we increment the
Messages key and use that value as a message id. Then the
client.set() method is called with a key of
Message: followed by the message id, an expiration time of one hour
(3600 seconds), and a value made up of the user's name token and
the text that they entered.
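The key scheme can be sketched with an in-memory counter and map in place of the database. All names below are hypothetical, and the one-hour expiration is not modelled; the point is only to show which keys and values a sequence of posts produces.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class MessageKeySketch {
    // Stand-ins for the Messages counter and the stored messages
    // (assumptions for illustration; expiration is not modelled).
    static final AtomicLong messages = new AtomicLong(0);
    static final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // Takes the next message id and stores the text under Message:<id>.
    static String post(long userId, String text) {
        long messageId = messages.incrementAndGet();
        String key = "Message:" + messageId;
        store.put(key, String.format("<User-%d>: %s", userId, text));
        return key;
    }

    public static void main(String[] args) {
        String key = post(7, "hello");
        System.out.println(key + " -> " + store.get(key));
    }
}
```

Because every writer obtains its id from a single atomic counter, two users posting at the same moment get different keys and cannot overwrite each other's messages.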
These changes to the database will be noticed by the message thread, and output to the screen. Of course this means that each user will see his or her messages repeated back to them.
If you compile and run the program in multiple terminal windows, you can talk to yourself. This is about as fun as things can get, isn't it? Notice how intelligent you can be.