Trond Norbye's Weblog


http://blogs.sun.com/trond/date/20080524 Saturday May 24, 2008

Memcached and customized storage engines

Most forks of memcached exist because people want a storage engine other than the default memory-based engine in Memcached. For the last two weeks I have been working with Toru Maesaka on designing an API that allows users to plug their own storage engine into Memcached. The current version of the API consists of just 14 functions, so it should be an easy task to implement your own storage engine. The storage engine API is defined in engine.h.

With a draft of the engine specification in place, I started to refactor the source code so that the memory-based backend uses this API. I finished a prototype a couple of days ago, and pushed it to my git repository at http://github.com/trondn/memcached/commits/binary. As you can see there are still some loose ends, and I would like to clean up the implementation of this engine a bit. In this prototype I have just moved the code out of the internal server without trying to be smart ;-)

When I talked about the work at lunch the other day, the guy sitting in the office next to me got excited and wanted a demo. He is working in the PostgreSQL team, so he asked me how difficult it would be to create a small backend that stored the items in a database. I guess others may be interested in how to create their own backend, so I decided to "document" it as a blog post.

To create your own storage engine, you must create a shared library that exports the following function:

ENGINE_HANDLE* create_instance(int version, ENGINE_ERROR_CODE* error);
        

The memcached server will call this function to ask your library to create an instance of the storage engine. Whenever the core server calls a function in the API, it passes the handle returned here as the first argument. That makes the handle a perfect place to store extra engine-specific data. The little example engine we created uses the following struct:

struct pg_engine {
   /**
    * The handle defined in the API
    */
   ENGINE_HANDLE engine;

   /**
    * Is the engine initialized or not
    */
   bool initialized;

   /**
    * A single lock is used for accessing the cache ;-)
    */
   pthread_mutex_t cache_lock;

   /**
    * Connection to the postgres database
    */
   PGconn *psql;
};
        

The implementation of create_instance looks like:

ENGINE_HANDLE* create_instance(int version, ENGINE_ERROR_CODE* error) {
   struct pg_engine *handle;
   if (version != 1) {
      if (error != NULL) {
         *error = ENGINE_ENOTSUP;
      }
      return 0;
   }

   if ((handle = calloc(1, sizeof(*handle))) == NULL) {
      if (error != NULL) {
         *error = ENGINE_ENOMEM;
      }
      return 0;
   }

   handle->engine.interface_level = version;
   handle->initialized = false;
   handle->engine.get_info = pg_get_info;
   handle->engine.initialize = pg_initialize;
   handle->engine.destroy = pg_destroy;
   handle->engine.item_size_ok = pg_item_size_ok;
   handle->engine.item_allocate = pg_item_allocate;
   handle->engine.item_delete = pg_item_delete;
   handle->engine.item_release = pg_item_release;
   handle->engine.get = pg_get;
   handle->engine.get_not_deleted = pg_get_not_deleted;
   handle->engine.get_stats = pg_get_stats;
   handle->engine.store = pg_store;
   handle->engine.arithmetic = pg_arithmetic;
   handle->engine.flush = pg_flush;
   handle->engine.update_lru_time = pg_update_lru_time;

   return &handle->engine;
}            
        

If we look at the structure and the create_instance function, there are some small interesting details. First of all you see that the pg_engine structure contains all of the variables needed, so that we don't use global variables. Secondly we verify that we support the version that the memcached server requests.
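To illustrate why keeping all state inside the struct matters, here is a minimal, self-contained sketch. The mock_* names are hypothetical stand-ins for the definitions in engine.h, which are not reproduced in this post, so treat them as assumptions rather than the real API.

```c
#include <stdbool.h>
#include <stdlib.h>

/* Mock stand-in for ENGINE_HANDLE (an assumption, not the real engine.h). */
typedef struct mock_handle {
   int interface_level;
} mock_handle;

/* Engine-private state lives next to the handle, never in globals. */
struct mock_engine {
   mock_handle engine;   /* must stay the first member */
   bool initialized;
};

/* Returns NULL for an unsupported version or on allocation failure. */
mock_handle* mock_create_instance(int version) {
   struct mock_engine* e;
   if (version != 1) {
      return NULL;
   }
   if ((e = calloc(1, sizeof(*e))) == NULL) {
      return NULL;
   }
   e->engine.interface_level = version;
   return &e->engine;
}

/* Recover the private state from the handle the server hands back. */
struct mock_engine* mock_get_engine(mock_handle* handle) {
   return (struct mock_engine*)handle;
}
```

Two calls to mock_create_instance give two independent engines: changing the initialized flag on one leaves the other untouched, which would not hold if the state were kept in global variables.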

When the memcached server has created the instance, it will try to initialize the engine by calling initialize. This is where the engine should initialize its internal data structures. In our engine, we initialize the mutex and create the database connection:

static inline struct pg_engine* get_handle(struct engine_handle* handle) {
   /*
    * We can cast the pointer to a pg_engine because the engine_handle
    * is the first member in the pg_engine struct.
    */
   return (struct pg_engine*)handle;
}

static ENGINE_ERROR_CODE pg_initialize(struct engine_handle* handle,
                                       const char* config_str) {
   struct pg_engine* engine = get_handle(handle);

   if (engine->initialized) {
      return ENGINE_EINVAL;
   }

   if (pthread_mutex_init(&engine->cache_lock, NULL) != 0) {
      return ENGINE_EINVAL;
   }

   engine->psql = PQconnectdb("hostaddr='127.0.0.1' port='' dbname='postgres' user='postgres' password='' connect_timeout='10'");
   if (engine->psql == NULL || PQstatus(engine->psql) != CONNECTION_OK) {
      pthread_mutex_destroy(&engine->cache_lock);
      return ENGINE_EINVAL;
   }
   
   engine->initialized = true;
   return ENGINE_SUCCESS;
}            
        

In our little engine we hardcoded the database connection information, but as you can see from the prototype, memcached provides a configuration string that you could use instead.
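One way to use that configuration string could look like the sketch below. The format of config_str is not specified in this post, so the sketch simply assumes it is a ready-made libpq connection string and falls back to a default when it is missing. The helper name pg_build_conninfo and the default string are hypothetical.

```c
#include <stdio.h>
#include <string.h>

/* Fallback used when the server passes no configuration (an assumption). */
static const char* default_conninfo =
   "hostaddr='127.0.0.1' dbname='postgres' user='postgres' connect_timeout='10'";

/*
 * Hypothetical helper: pick the connection string to hand to PQconnectdb.
 * Returns 0 on success, -1 if the result does not fit in buf.
 */
int pg_build_conninfo(const char* config_str, char* buf, size_t size) {
   const char* src = default_conninfo;
   if (config_str != NULL && config_str[0] != '\0') {
      src = config_str;
   }
   int n = snprintf(buf, size, "%s", src);
   return (n < 0 || (size_t)n >= size) ? -1 : 0;
}
```

pg_initialize could then call this helper before connecting and return ENGINE_EINVAL if it fails.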

Ok. Now the server is ready to accept clients. The first functions we would like to implement are probably those needed to support insertion of data into the server. The first of these is item_allocate. The server calls this function to allocate memory where it can spool the data from the client. Since we are not trying to be smart and implement a fast server, we just use calloc to allocate a memory chunk. Our implementation looks like:

static item* pg_item_allocate(struct engine_handle* handle, const void* key,
                              const size_t nkey, const int flags,
                              const rel_time_t exptime,
                              const int nbytes) {
   item *it;
   char suffix[40];
   size_t nsuffix = snprintf(suffix, sizeof(suffix),
                             " %d %d\r\n", flags, nbytes - 2);
   size_t ntotal = nsuffix + sizeof(item) + nkey + 1 + nbytes;
      
   if ((it = calloc(1, ntotal)) == NULL) {
      return NULL;
   }
   
   it->it_flags = 0;
   it->nkey = nkey;
   it->nbytes = nbytes;
   memcpy(ITEM_key(it), key, nkey);
   it->exptime = exptime;
   memcpy(ITEM_suffix(it), suffix, nsuffix);
   it->nsuffix = nsuffix;

   return it;
}            
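The size calculation above implies a particular memory layout: a fixed header followed by the key, one spare byte, the suffix and finally the data. The sketch below spells that layout out with a mock struct. The real item struct and the ITEM_key / ITEM_suffix macros live in the memcached headers, so the field names, types and accessors here are assumptions made for illustration.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Mock item header (assumed fields; not the real memcached struct). */
typedef struct mock_item {
   uint8_t nkey;      /* key length */
   uint8_t nsuffix;   /* length of " flags length\r\n" */
   int     nbytes;    /* data length, including the trailing "\r\n" */
   char    end[];     /* key, spare byte, suffix and data follow */
} mock_item;

/* Accessors in the spirit of ITEM_key / ITEM_suffix. */
static inline char* mock_item_key(mock_item* it) { return it->end; }
static inline char* mock_item_suffix(mock_item* it) {
   return it->end + it->nkey + 1;
}
static inline char* mock_item_data(mock_item* it) {
   return mock_item_suffix(it) + it->nsuffix;
}

/* Allocate one zeroed chunk that holds header, key, suffix and data. */
mock_item* mock_item_alloc(const void* key, size_t nkey, int flags, int nbytes) {
   char suffix[40];
   int nsuffix = snprintf(suffix, sizeof(suffix), " %d %d\r\n", flags, nbytes - 2);
   if (nsuffix < 0 || (size_t)nsuffix >= sizeof(suffix)) {
      return NULL;
   }
   mock_item* it = calloc(1, sizeof(*it) + nkey + 1 + (size_t)nsuffix + (size_t)nbytes);
   if (it == NULL) {
      return NULL;
   }
   it->nkey = (uint8_t)nkey;
   it->nsuffix = (uint8_t)nsuffix;
   it->nbytes = nbytes;
   memcpy(mock_item_key(it), key, nkey);
   memcpy(mock_item_suffix(it), suffix, (size_t)nsuffix);
   return it;
}
```

Because the suffix and the data sit back to back, a backend can store both with a single pointer and a length of nsuffix + nbytes, which is what the database code further down relies on.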
        

When the server is done with an item, it will call item_release to notify the backend that it may release the allocated resources. In our little example this function just releases the memory:

static void pg_item_release(struct engine_handle* handle, item* item) {
    free(item);
}            
        

When we have received all of the data from the client, the memcached server will try to store the item in the engine by calling store. This function is used for all types of store access (add, set, replace, append and prepend). In our little server, we only support add, set and replace. Add is implemented with an INSERT SQL statement, while replace is implemented with an UPDATE. Unfortunately we don't have an "insert-or-update" in SQL, so our set command is implemented by first trying an add and, if that fails, trying an update. The source looks like:

const char* add_query = "INSERT INTO memcached (key, header_size, data) VALUES ( $1, $2, $3)";

const char* update_query = "UPDATE memcached SET header_size = $2, data = $3 where key = $1";

static ENGINE_ERROR_CODE do_db_store_item(const char *sql,
                                          struct pg_engine* engine, item *it) {
   uint32_t hl = htonl(it->nsuffix);
   
   const char *params[3];
   params[0] = (char*)ITEM_key(it);
   params[1] = (char*)&hl;
   params[2] = (char*)ITEM_suffix(it);
   
   int sizes[3] = { it->nkey , 4, it->nbytes + it->nsuffix };
   int formats[3] = {1, 1, 1};

   PGresult *result;
   result = PQexecParams(engine->psql, sql, 3, NULL,
                         params, sizes, formats, 1);

   ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
   
   if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK) {
      ret = ENGINE_EINVAL;
   }

   PQclear(result);
   return ret;
}

static ENGINE_ERROR_CODE do_store_item(struct pg_engine* engine,
                                       item *it, enum operation comm) {
   switch (comm) {
   case NREAD_ADD:
      return do_db_store_item(add_query, engine, it);
   case NREAD_REPLACE:
      return do_db_store_item(update_query, engine, it);
   case NREAD_SET:
      if (do_db_store_item(add_query, engine, it) != ENGINE_SUCCESS) {
         return do_db_store_item(update_query, engine, it);
      }
      return ENGINE_SUCCESS;
      
   default:
      return ENGINE_ENOTSUP;
   }
}

static ENGINE_ERROR_CODE pg_store(struct engine_handle* handle,
                                  item* item, enum operation operation) {
   int ret;
   struct pg_engine* engine = get_handle(handle);

   pthread_mutex_lock(&engine->cache_lock);
   if (do_store_item(engine, item, operation) == ENGINE_SUCCESS) {
      ret = 1;
   } else {
      ret = 0;
   }
   pthread_mutex_unlock(&engine->cache_lock);
   return ret;
}            
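A note for readers on newer databases: PostgreSQL releases from 9.5 onward, which appeared well after this engine was written, support INSERT ... ON CONFLICT, so set could be expressed as a single statement. The query below is only a sketch. It assumes the memcached table has a unique constraint or primary key on the key column, which this post does not show.

```c
/*
 * Hypothetical single-statement "set".
 * Assumes PostgreSQL 9.5+ and a unique constraint on memcached.key.
 */
const char* set_query =
   "INSERT INTO memcached (key, header_size, data) VALUES ($1, $2, $3) "
   "ON CONFLICT (key) DO UPDATE "
   "SET header_size = EXCLUDED.header_size, data = EXCLUDED.data";
```

With such a query the NREAD_SET case could call do_db_store_item once, and an INSERT that fails for reasons other than a duplicate key would no longer be followed by an UPDATE attempt.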
        

If you look at the assignments of ret in pg_store you will most likely spot a bug: we don't use the correct return values! I noticed that when I wrote this plugin, and I haven't had the time to fix it yet.
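A possible fix is to hand the engine error code straight back to the caller instead of collapsing it to 1 or 0. The sketch below shows the idea with mock types and a stubbed backend so that it compiles on its own. The enum names and values are assumptions, not the ones from engine.h.

```c
#include <pthread.h>

/* Mock stand-ins for engine.h (names and values are assumptions). */
typedef enum {
   MOCK_ENGINE_SUCCESS = 0,
   MOCK_ENGINE_EINVAL,
   MOCK_ENGINE_ENOTSUP
} mock_engine_error;

struct mock_pg_engine {
   pthread_mutex_t cache_lock;
   mock_engine_error next_result;   /* what the stubbed backend reports */
};

/* Stub standing in for do_store_item: returns the configured result. */
static mock_engine_error mock_do_store_item(struct mock_pg_engine* engine) {
   return engine->next_result;
}

/* Propagate the backend's error code unchanged while holding the lock. */
mock_engine_error mock_pg_store(struct mock_pg_engine* engine) {
   mock_engine_error ret;
   pthread_mutex_lock(&engine->cache_lock);
   ret = mock_do_store_item(engine);
   pthread_mutex_unlock(&engine->cache_lock);
   return ret;
}
```

Applied to the real pg_store, this would mean declaring ret as ENGINE_ERROR_CODE and assigning it the result of do_store_item directly.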

Now that we have data in the cache, we can try to get the data back to the client. When the client tries to get an element from the cache, the memcached server will call get. In our little server this function will try to get the item from the database. Because the server may call this function from multiple threads, we need to synchronize the access to our PostgreSQL handle:

static item* do_pg_get(struct pg_engine* engine, const void* key,
                       const int nkey) {
   const char* query = "SELECT header_size, data FROM memcached WHERE key=$1";
   const char *params[1];
   params[0] = (char*)key;
   int paramLengths[1] = { nkey };
   
   PGresult *result;
   result = PQexecParams(engine->psql, query, 1, NULL, params,
                         paramLengths, NULL, 1);
   if (result == NULL) {
      return NULL;
   }
   
   int tup = PQntuples(result);
   if (tup == 0) {
      PQclear(result);
      return NULL;
   }

   char *sptr = PQgetvalue(result, 0, 0);
   uint32_t size = ntohl(*((uint32_t*)sptr));
   int datalen = PQgetlength(result, 0, 1);
   size_t ntotal = sizeof(item) + nkey + 1 + size + datalen;
   
   item *it;
   if ((it = calloc(1, ntotal)) == NULL) {
      PQclear(result);
      return NULL;
   }
   
   it->nkey = nkey;
   it->nsuffix = size;
   it->nbytes = datalen - size;
   memcpy(ITEM_key(it), key, nkey);
   memcpy(ITEM_suffix(it), PQgetvalue(result, 0, 1), datalen);
   
   PQclear(result);
   return it;
}

static item* pg_get(struct engine_handle* handle, const void* key,
                    const int nkey) {
   item *it;
   struct pg_engine* engine = get_handle(handle);
   pthread_mutex_lock(&engine->cache_lock);
   it = do_pg_get(engine, key, nkey);
   pthread_mutex_unlock(&engine->cache_lock);
   return it;
}            
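The header_size value travels to and from the database as four bytes in network byte order, which is why the code uses htonl when storing and ntohl when reading. The sketch below isolates that round trip. The helper names are hypothetical, and the sketch copies the bytes with memcpy rather than casting the pointer, since it does not assume the result buffer is 4-byte aligned.

```c
#include <arpa/inet.h>
#include <stdint.h>
#include <string.h>

/* Encode a suffix length as the 4-byte big-endian value bound to $2. */
void encode_header_size(uint32_t nsuffix, char out[4]) {
   uint32_t be = htonl(nsuffix);
   memcpy(out, &be, sizeof(be));
}

/* Decode it again; memcpy works even if the buffer is not 4-byte aligned. */
uint32_t decode_header_size(const char* in) {
   uint32_t be;
   memcpy(&be, in, sizeof(be));
   return ntohl(be);
}
```

In do_pg_get, decode_header_size(PQgetvalue(result, 0, 0)) could stand in for the cast-and-dereference, provided the column really comes back as a 4-byte binary value.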
        

Well, that covers the basic functions you need to implement in order to create a minimalistic memcached storage engine. So please go ahead and play with the API and let us know if the API is usable or not.

Comments:

[Trackback] Last week, Trond and I played with PostgreSQL as a storage engine for Memcached . Great stuff if you think that the world is moving too fast, and want to slow down a bit He forgot to post the table we used in the database. Nothing revolutionary, ...

Posted by Jorgen Austvik's Weblog on May 26, 2008 at 10:44 AM CEST #


This is a personal weblog, I do not speak for my employer.