libcouchbase event loop

Hey guys,

I've been struggling to get my simple C++ wrapper around libcouchbase to work consistently. I'm using libcouchbase-2.3.1_x86_vc11 against a "2.5.1 enterprise edition (build-1083)" server. At the bottom of this post is my example code built with VC11. It just wraps libcouchbase using the default IOPS and calls run_event_loop from a std::thread. Get's and Store's can then be scheduled with callbacks to get the results.

However, when running the example either I get an access violation somewhere inside libcouchbase (usually at different places) or after about 50 loops the callbacks stop getting called altogether but the application keeps running and continually eats up more and more memory. (I assume the read/write buffers are going as more Get's and Store's are scheduled)

Because of the access violations I assume that some sort of race condition is occurring. Am I supposed to be using some kind of locking\sync mechanism that I'm unaware of?

Is it safe for me to start the event loop this way (see bottom of CouchbaseCache constructor)?

Any help will be greatly appreciated.

The below code listing can be copied pasted directly and should work provided you have the libcouchase include and lib paths set correctly.

#include <chrono>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
 
#include "libcouchbase\couchbase.h"
 
class CouchbaseCache
{
public:
  /// Creates a connection to a couchbase cluster.
  /// \param host  The ip and port of a host in the cluster. eg "w.x.y.z:8080".
  /// \param bucket  The bucket which will be used to store and retrieve documents.
  /// \param username  The username required to login to the couchbase cluster.
  /// \param password  The password required to login to the couchbase cluster.
  /// \param errorHandler  A handler that will be called for errors not related
  ///                                         to get or store events.
  CouchbaseCache(const std::string& host, const std::string& bucket,
                                  const std::string& username, const std::string& password,
                                  const std::function<void(const std::string&)>& errorHandler);
  ~CouchbaseCache();
 
  enum class StoreMode
  {
    Add = 1, /// Add the item to the cache, but fail if the object already exists.
    Replace = 2, /// Replace the existing object in the cache.
    Set = 3, /// Unconditionally set the object in the cache.
  };
  /// Stores the specified data in couchbase at the specified key.
  /// Use the StoreMode to to set the storage conditions.
  void Store(const std::string& key, const std::string& value, StoreMode,
                     const std::function<void()>& successHandler,
                     const std::function<void(const std::string& errorMessage)>& failureHandler);
 
  /// Returns the value for the specified key if one exists.
  void Get(const std::string& key,
                   const std::function<void(const std::string& value)>& successHandler,
                   const std::function<void(const std::string& errorMessage)>& failureHandler);
 
private:
  std::function<void(const std::string&)> m_errorHandler;
  std::unique_ptr<lcb_io_opt_st, std::function<void(lcb_io_opt_st*)>> m_io;
  std::unique_ptr<lcb_create_st> m_connectionOptions;
  std::unique_ptr<lcb_st, std::function<void(lcb_st*)>> m_connection;
  std::thread m_eventThread;
};
 
struct StoreHandlers
{
  std::function<void()> successHandler;
  std::function<void(const std::string& errorMessage)> failureHandler;
};
 
struct GetHandlers
{
  std::function<void(const std::string& value)> successHandler;
  std::function<void(const std::string& errorMessage)> failureHandler;
};
 
std::string CreateErrorMessage(lcb_st* connection, const char* errorPrefix, lcb_error_t error)
{
  std::ostringstream message;
  message << errorPrefix << ": (" << error << ") " << lcb_strerror(connection, error);
  return message.str();
}
 
 
void ErrorCallback(lcb_t connection, lcb_error_t error, const char* errorInfo)
{
  auto cookie = lcb_get_cookie(connection);
  if (!cookie)
  {
    return;
  }
  auto errorHandler = static_cast<const std::function<void(const std::string&)>*>(cookie);
  if (!errorHandler)
  {
    return;
  }
 
  std::ostringstream errorMessage;
  errorMessage << "General failure (" << error << "):";
  if (errorInfo)
  {
    errorMessage << " " << errorInfo;
  }
  (*errorHandler)(errorMessage.str());
}
 
void StoreCallback(lcb_t connection, const void *cookie,
  lcb_storage_t operation, lcb_error_t error, const lcb_store_resp_t *resp)
{
  std::unique_ptr<StoreHandlers> storeHandlers(static_cast<StoreHandlers*>(const_cast<void*>(cookie)));
  if (error == LCB_SUCCESS && storeHandlers->successHandler)
  {
    storeHandlers->successHandler();
  }
  else if (storeHandlers->failureHandler)
  {
    storeHandlers->failureHandler(CreateErrorMessage(connection, "Store failure", error));
  }
}
 
void GetCallback(lcb_t connection, const void *cookie, lcb_error_t error, const lcb_get_resp_t *item)
{
  std::unique_ptr<GetHandlers> getHandlers(static_cast<GetHandlers*>(const_cast<void*>(cookie)));
  if (error == LCB_SUCCESS && getHandlers->successHandler)
  {
    std::string returnVal(static_cast<const char*>(item->v.v0.bytes), item->v.v0.nbytes);
    getHandlers->successHandler(returnVal);
  }
  else if (getHandlers->failureHandler)
  {
    getHandlers->failureHandler(CreateErrorMessage(connection, "Get failure", error));
  }
}
 
lcb_io_opt_st* CreateIo()
{
  lcb_io_opt_st* io;
  auto error = lcb_create_io_ops(&io, nullptr);
  if (error != LCB_SUCCESS)
  {
    throw std::runtime_error(CreateErrorMessage(nullptr, "Failed to create couchbase IO backend", error));
  }
 
  return io;
}
 
lcb_st* CreateConnection(lcb_create_st* connectionOptions)
{
  lcb_st* connection;
  auto error = lcb_create(&connection, connectionOptions);
  if (error != LCB_SUCCESS)
  {
    throw std::runtime_error(CreateErrorMessage(nullptr, "Failed to create couchbase connection object", error));
  }
 
  return connection;
}
 
CouchbaseCache::CouchbaseCache(const std::string& ipPort, const std::string& bucket,
                               const std::string& username, const std::string& password,
                               const std::function<void(const std::string&)>& errorHandler)
: m_errorHandler(errorHandler),
  m_io(CreateIo(), [](lcb_io_opt_st* io) { if (io) lcb_destroy_io_ops(io); }),
  m_connectionOptions(new lcb_create_st),
  m_connection(nullptr, [](lcb_st* connection) { if (connection) lcb_destroy(connection); })
{
  m_connectionOptions->version = 0;
  m_connectionOptions->v.v0.host = ipPort.c_str();
  m_connectionOptions->v.v0.bucket = bucket.c_str();
  m_connectionOptions->v.v0.user = username.c_str();
  m_connectionOptions->v.v0.passwd = password.c_str();
  m_connectionOptions->v.v0.io = m_io.get(); // Safe because m_io always gets deleted last.
 
  m_connection.reset(CreateConnection(m_connectionOptions.get()));
 
  lcb_set_cookie(m_connection.get(), &m_errorHandler);
  lcb_set_error_callback(m_connection.get(), ErrorCallback);
  lcb_set_store_callback(m_connection.get(), StoreCallback);
  lcb_set_get_callback(m_connection.get(), GetCallback);
 
  auto error = lcb_connect(m_connection.get());
  if (error != LCB_SUCCESS)
  {
    throw std::runtime_error(CreateErrorMessage(m_connection.get(), "Failed to schedule connection to couchbase", error));
  }
 
  error = lcb_wait(m_connection.get()); // Wait until connection is complete before leaving the constructor.
  if (error != LCB_SUCCESS)
  {
    throw std::runtime_error(CreateErrorMessage(m_connection.get(), "Failed to connect to couchbase", error));
  }
 
  m_eventThread = std::thread([=]()
  {
    m_io->v.v0.run_event_loop(m_io.get());
  });
}
 
CouchbaseCache::~CouchbaseCache()
{
  m_io->v.v0.stop_event_loop(m_io.get());
  m_eventThread.join();
  lcb_wait(m_connection.get()); // Process any events the loop didn't get to.
}
 
void CouchbaseCache::Store(const std::string& key, const std::string& value, CouchbaseCache::StoreMode mode,
                                                     const std::function<void()>& successHandler,
                                                     const std::function<void(const std::string& errorMessage)>& failureHandler)
{
  lcb_store_cmd_t cmd;
  const lcb_store_cmd_t *cmds[] = {&cmd};
  memset(&cmd, 0, sizeof(cmd));
  cmd.version = 0;
  cmd.v.v0.key = key.c_str();
  cmd.v.v0.nkey = key.size();
  cmd.v.v0.bytes = value.c_str();
  cmd.v.v0.nbytes = value.size();
  cmd.v.v0.operation = static_cast<lcb_storage_t>(mode);
 
  std::unique_ptr<StoreHandlers> storeHandlers(new StoreHandlers);
  storeHandlers->successHandler = successHandler;
  storeHandlers->failureHandler = failureHandler;
 
  auto error = lcb_store(m_connection.get(), storeHandlers.release(), 1, cmds);
  if (error != LCB_SUCCESS)
  {
    std::ostringstream errorMessage;
    errorMessage << "Failed to schedule store operation for key " << key;
    failureHandler(CreateErrorMessage(m_connection.get(), errorMessage.str().c_str(), error));
  }
}
 
void CouchbaseCache::Get(const std::string& key,
                                                  const std::function<void(const std::string& value)>& successHandler,
                                                  const std::function<void(const std::string& errorMessage)>& failureHandler)
{
  lcb_get_cmd_t cmd;
  const lcb_get_cmd_t *cmds[] = {&cmd};
  cmd.version = 0;
  cmd.v.v0.key = key.c_str();
  cmd.v.v0.nkey = key.size();
 
  std::unique_ptr<GetHandlers> getHandlers(new GetHandlers);
  getHandlers->successHandler = successHandler;
  getHandlers->failureHandler = failureHandler;
 
  auto error = lcb_get(m_connection.get(), getHandlers.release(), 1, cmds);
  if (error != LCB_SUCCESS)
  {
    std::ostringstream errorMessage;
    errorMessage << "Failed to schedule get operation for key " << key;
    failureHandler(CreateErrorMessage(m_connection.get(), errorMessage.str().c_str(), error));
  }
}
 
 
int main()
{
  try
  {
    CouchbaseCache cache("http://dercachetier01:8091", "default", "default", "",
      [](const std::string& errorMessage)
    {
      auto x = errorMessage;
    });
 
    int loopCount = 0;
    std::string key = "Test1";
    std::string value = "SomeData";
    while (true)
    {
      cache.Store(key, value, CouchbaseCache::StoreMode::Set,
        []()
      {
        std::cout << "Store success" << std::endl;
      },
        [](const std::string& errorMessage)
      {
        std::cout << "Store failure " << errorMessage << std::endl;
      });
 
      loopCount++;
      std::cout << loopCount << std::endl;
      std::this_thread::sleep_for(std::chrono::milliseconds(200));
 
      cache.Get(key,
        [](const std::string& val)
      {
        std::cout << "Get success" << std::endl;
      },
        [](const std::string& errorMessage)
      {
        std::cout << "Get failure " << errorMessage << std::endl;
      });
    }
  }
  catch (const std::exception& e)
  {
    std::cout << e.what() << std::endl;
  }
}

1 Answer

« Back to question.

Thank you for your question.

Unfortunately running the event loop from a different thread without actually locking all the functions needed to access it are unsupported. As the event loop processes events it will modify their state - however since the actual lcb_wait/lcb_get (etc.) calls are performed in a different thread you are likely to run into various conditions where the "operations thread" and the "event thread" attempt to modify the same structures at the same time.

To do this properly you would need to add a lock and provide synchronization around _each_ event loop entry point.

Additionally, the library itself will call stop_event_loop, so if you want to have an 'asynchronous infinite' loop you would also want to wrap these so that these calls become a noop.

Here is a little sample project I was working on some time ago which attempts to run an event loop in its own thread, and expose the operations as 'Future' objects. This project _mostly_ works, but I would not use this in production nor would I recommend you deploy such a solution in production. The I/O interfaces are going to change with the next release and we hope to eventually support such use cases (i.e. the threaded event loop) out of the box. The link is here: https://github.com/couchbaselabs/lcbmt

Hi,

Thanks for the response. I will eventually need to use this code in production so I look forward to your new release. For now I have a moved all calls to libcouchbase into a separate thread and am sending commands to it via a thread safe queue.

Thanks again for taking the time to have a look.