Saturday May 24, 2008
Free webinars on MySQL and Memcached
This week Sun will host the first two free webinars on MySQL and Memcached. Ivan Zoratti will kick off on Wednesday with "Highly scalable solutions with MySQL and Memcached".
On Thursday Monty Taylor, Jimmy Guerrero and Frank Mash will talk about "Designing and Implementing Scalable Applications with Memcached and MySQL".
You should do as I did and register for the event as soon as possible :-)
Posted at 09:33PM May 24, 2008 by trond in Memcached | Comments[1]
Memcached and customized storage engines
Most forks of memcached exist because people want a storage engine
other than the default memory-based engine in Memcached.
For the last two weeks I have been working with Toru Maesaka on
designing an API that allows users to plug their own storage engine
into Memcached. The current version of the API consists of just 14
functions, so it should be an easy task to implement your own
storage engine. The storage engine API is defined in
engine.h.
With a draft of the engine specification in place, I started to refactor the source code so that the memory-based backend uses this API. I finished a prototype a couple of days ago and pushed it to my git repository at http://github.com/trondn/memcached/commits/binary. As you can see, there are still some loose ends, and I would like to clean up the implementation of this engine a bit. In this prototype I have just moved the code out of the internal server without trying to be smart ;-)
When I talked about the work at lunch the other day, the guy sitting in the office next to me got excited and wanted a demo. He works in the PostgreSQL team, so he asked me how difficult it would be to create a small backend that stores the items in a database. I guess others may be interested in how to create their own backend, so I decided to "document" it as a blog post.
To create your own storage engine, you must create a shared library that exports the following function:
ENGINE_HANDLE* create_instance(int version, ENGINE_ERROR_CODE* error);
The memcached server will call this function to ask your library to create an instance of the storage engine. Whenever the core server calls a function in the API, the handle returned here is passed as the first argument. That makes the handle a perfect place to store extra engine-specific data. Our little example engine uses the following struct:
struct pg_engine {
    /**
     * The handle defined in the API
     */
    ENGINE_HANDLE engine;
    /**
     * Is the engine initialized or not
     */
    bool initialized;
    /**
     * A single lock is used for accessing the cache ;-)
     */
    pthread_mutex_t cache_lock;
    /**
     * Connection to the postgres database
     */
    PGconn *psql;
};
The implementation of create_instance looks like:
ENGINE_HANDLE* create_instance(int version, ENGINE_ERROR_CODE* error) {
    struct pg_engine *handle;

    if (version != 1) {
        if (error != NULL) {
            *error = ENGINE_ENOTSUP;
        }
        return NULL;
    }

    if ((handle = calloc(1, sizeof(*handle))) == NULL) {
        if (error != NULL) {
            *error = ENGINE_ENOMEM;
        }
        return NULL;
    }

    handle->engine.interface_level = version;
    handle->initialized = false;
    handle->engine.get_info = pg_get_info;
    handle->engine.initialize = pg_initialize;
    handle->engine.destroy = pg_destroy;
    handle->engine.item_size_ok = pg_item_size_ok;
    handle->engine.item_allocate = pg_item_allocate;
    handle->engine.item_delete = pg_item_delete;
    handle->engine.item_release = pg_item_release;
    handle->engine.get = pg_get;
    handle->engine.get_not_deleted = pg_get_not_deleted;
    handle->engine.get_stats = pg_get_stats;
    handle->engine.store = pg_store;
    handle->engine.arithmetic = pg_arithmetic;
    handle->engine.flush = pg_flush;
    handle->engine.update_lru_time = pg_update_lru_time;
    return &handle->engine;
}
If we look at the structure and the create_instance
function, there are some small interesting details. First of all
you see that the pg_engine structure contains all of
the variables needed, so that we don't use global variables.
Secondly we verify that we support the version that the memcached
server requests.
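Both details can be tried out without memcached at all. The following is only a minimal sketch, not the real engine API: toy_handle, toy_engine, toy_create_instance, toy_destroy and the TOY_* codes are hypothetical names standing in for ENGINE_HANDLE, pg_engine, create_instance and ENGINE_ERROR_CODE. It shows the version check, and that each instance carries its own state because the handle is the first member of the larger struct.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the real API types; not taken from engine.h. */
enum toy_error { TOY_SUCCESS = 0, TOY_ENOMEM, TOY_ENOTSUP };

struct toy_handle {
    int interface_level;
    /* The handle itself is passed back as the first argument. */
    int (*bump)(struct toy_handle *handle);
};

struct toy_engine {
    struct toy_handle handle;  /* must stay the first member */
    int counter;               /* per-instance state instead of a global */
};

static int toy_bump(struct toy_handle *handle) {
    assert(handle != NULL);
    /* Valid because handle is the first member of struct toy_engine. */
    struct toy_engine *engine = (struct toy_engine *)handle;
    return ++engine->counter;
}

struct toy_handle *toy_create_instance(int version, enum toy_error *error) {
    struct toy_engine *engine;

    /* Refuse any interface version this sketch does not know. */
    if (version != 1) {
        if (error != NULL) {
            *error = TOY_ENOTSUP;
        }
        return NULL;
    }

    engine = calloc(1, sizeof(*engine));
    if (engine == NULL) {
        if (error != NULL) {
            *error = TOY_ENOMEM;
        }
        return NULL;
    }

    engine->handle.interface_level = version;
    engine->handle.bump = toy_bump;
    return &engine->handle;
}

void toy_destroy(struct toy_handle *handle) {
    /* The handle address is also the address of the allocated toy_engine. */
    free(handle);
}
```

Two handles created this way count independently, which is the practical benefit of keeping the state in the struct rather than in globals.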
When the memcached server has created the instance, it will try
to initialize the engine by calling initialize. This
is where the engine should initialize its internal data structures.
For our engine, we initialize the mutex and create the database
connection:
static inline struct pg_engine* get_handle(struct engine_handle* handle) {
    /*
     * We can cast the pointer to a pg_engine because the engine_handle
     * is the first member in the pg_engine struct.
     */
    return (struct pg_engine*)handle;
}

static ENGINE_ERROR_CODE pg_initialize(struct engine_handle* handle,
                                       const char* config_str) {
    struct pg_engine* engine = get_handle(handle);

    if (engine->initialized) {
        return ENGINE_EINVAL;
    }

    if (pthread_mutex_init(&engine->cache_lock, NULL) != 0) {
        return ENGINE_EINVAL;
    }

    engine->psql = PQconnectdb("hostaddr='127.0.0.1' port='' dbname='postgres' user='postgres' password='' connect_timeout='10'");
    if (engine->psql == NULL || PQstatus(engine->psql) != CONNECTION_OK) {
        /* A failed connection attempt still returns an object we must free. */
        if (engine->psql != NULL) {
            PQfinish(engine->psql);
            engine->psql = NULL;
        }
        pthread_mutex_destroy(&engine->cache_lock);
        return ENGINE_EINVAL;
    }

    engine->initialized = true;
    return ENGINE_SUCCESS;
}
In our little engine we hardcoded the database connection information, but as you can see from the prototype of pg_initialize, memcached provides a configuration string (config_str) that you could use instead.
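How the configuration string is laid out is not covered here, so the following is only a sketch under an assumption: that the engine receives something like "host=127.0.0.1;dbname=postgres". The helper toy_config_get is a hypothetical name, not part of the API. It looks up one key and copies its value into a caller-supplied buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Look up "key" in a string of the form "k1=v1;k2=v2" and copy its value
 * into out. The format is an assumption made for this sketch.
 * Returns 1 if the key was found and the value fit in out, otherwise 0.
 */
int toy_config_get(const char *config, const char *key,
                   char *out, size_t outlen) {
    assert(key != NULL && out != NULL);

    size_t keylen = strlen(key);
    const char *p = config;

    while (p != NULL && *p != '\0') {
        const char *end = strchr(p, ';');
        size_t len = (end != NULL) ? (size_t)(end - p) : strlen(p);

        /* A match needs "key=" at the very start of this field. */
        if (len > keylen && strncmp(p, key, keylen) == 0 &&
            p[keylen] == '=') {
            size_t vlen = len - keylen - 1;
            if (vlen >= outlen) {
                return 0;  /* value would not fit */
            }
            memcpy(out, p + keylen + 1, vlen);
            out[vlen] = '\0';
            return 1;
        }

        /* Move on to the next field, if there is one. */
        p = (end != NULL) ? end + 1 : NULL;
    }
    return 0;
}
```

The values found this way could then be used to build the connection string for PQconnectdb instead of the hardcoded one.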
Ok. Now the server is ready to accept clients. The first functions
we would like to implement are probably those needed to
support insertion of data into the server. The first one we
must implement is item_allocate. The server will call
this function to allocate memory where it can spool the data from
the client. Since we don't try to be smart and implement a fast
server, we just use calloc to allocate a memory chunk. Our
implementation looks like:
static item* pg_item_allocate(struct engine_handle* handle, const void* key,
                              const size_t nkey, const int flags,
                              const rel_time_t exptime,
                              const int nbytes) {
    item *it;
    char suffix[40];
    size_t nsuffix = snprintf(suffix, sizeof(suffix),
                              " %d %d\r\n", flags, nbytes - 2);
    size_t ntotal = nsuffix + sizeof(item) + nkey + 1 + nbytes;

    if ((it = calloc(1, ntotal)) == NULL) {
        return NULL;
    }

    it->it_flags = 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    memcpy(ITEM_suffix(it), suffix, nsuffix);
    it->nsuffix = nsuffix;
    return it;
}
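The size calculation in pg_item_allocate is easy to get wrong, so here it is in isolation. This is a sketch with hypothetical helper names (toy_make_suffix, toy_item_total). It assumes that nbytes counts the trailing "\r\n" of the value, which is what the "nbytes - 2" above suggests, and header_size stands in for sizeof(item).

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/*
 * Format the suffix " <flags> <length>\r\n" the same way pg_item_allocate
 * does. Returns the suffix length, or 0 if the buffer is too small.
 */
size_t toy_make_suffix(char *buf, size_t buflen, int flags, int nbytes) {
    assert(buf != NULL);

    /* Assumption: nbytes includes the value's trailing "\r\n". */
    int n = snprintf(buf, buflen, " %d %d\r\n", flags, nbytes - 2);
    if (n < 0 || (size_t)n >= buflen) {
        return 0;
    }
    return (size_t)n;
}

/*
 * Total allocation: header + key + 1 spare byte + suffix + data,
 * the same sum used for ntotal in pg_item_allocate.
 */
size_t toy_item_total(size_t header_size, size_t nkey,
                      size_t nsuffix, size_t nbytes) {
    return header_size + nkey + 1 + nsuffix + nbytes;
}
```

For a five byte value stored with flags 0, nbytes would be 7 under this assumption and the suffix would be the six characters " 0 5\r\n".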
When the server is done with an item, it will call
item_release to notify the backend that it may release
the allocated resources. In our little example this function just
releases the memory:
static void pg_item_release(struct engine_handle* handle, item* item) {
    free(item);
}
When we have received all of the data from the client, the memcached
server will try to store the item in the engine by calling store.
This function is used for all types of store access (add, set, replace,
append and prepend). In our little server, we only support add, set
and replace. Add is implemented with an INSERT SQL statement
while replace is implemented with an UPDATE. Unfortunately
we don't have an "insert-or-update" in SQL, so our set command
is implemented by first trying an add, and if that fails, we try an
update. The source looks like:
const char* add_query = "INSERT INTO memcached (key, header_size, data) VALUES ( $1, $2, $3)";
const char* update_query = "UPDATE memcached SET header_size = $2, data = $3 where key = $1";

static ENGINE_ERROR_CODE do_db_store_item(const char *sql,
                                          struct pg_engine* engine, item *it) {
    uint32_t hl = htonl(it->nsuffix);
    const char *params[3];
    params[0] = (char*)ITEM_key(it);
    params[1] = (char*)&hl;
    params[2] = (char*)ITEM_suffix(it);
    int sizes[3] = { it->nkey , 4, it->nbytes + it->nsuffix };
    int formats[3] = {1, 1, 1};
    PGresult *result;

    result = PQexecParams(engine->psql, sql, 3, NULL,
                          params, sizes, formats, 1);
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK) {
        ret = ENGINE_EINVAL;
    }
    PQclear(result);
    return ret;
}

static ENGINE_ERROR_CODE do_store_item(struct pg_engine* engine,
                                       item *it, enum operation comm) {
    switch (comm) {
    case NREAD_ADD:
        return do_db_store_item(add_query, engine, it);
    case NREAD_REPLACE:
        return do_db_store_item(update_query, engine, it);
    case NREAD_SET:
        if (do_db_store_item(add_query, engine, it) != ENGINE_SUCCESS) {
            return do_db_store_item(update_query, engine, it);
        }
        return ENGINE_SUCCESS;
    default:
        return ENGINE_ENOTSUP;
    }
}

static ENGINE_ERROR_CODE pg_store(struct engine_handle* handle,
                                  item* item, enum operation operation) {
    int ret;
    struct pg_engine* engine = get_handle(handle);

    pthread_mutex_lock(&engine->cache_lock);
    if (do_store_item(engine, item, operation) == ENGINE_SUCCESS) {
        ret = 1;
    } else {
        ret = 0;
    }
    pthread_mutex_unlock(&engine->cache_lock);
    return ret;
}
If you look at the assignments of ret in
pg_store you will most likely spot a bug: ret is set to 1 or 0, so we
don't return proper ENGINE_ERROR_CODE values! I noticed that when I
wrote this plugin, and I haven't had the time to fix it yet.
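One way to avoid that kind of bug is to pass the status through unchanged. The sketch below is not the PostgreSQL engine: it uses a tiny in-memory table, leaves out the locking, and every name in it (kv_table, kv_store, the KV_* values) is hypothetical. It shows the add, replace and set semantics described above, with set falling back from add to replace.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-ins; the real codes and operations live in the API. */
enum kv_status { KV_SUCCESS = 0, KV_NOT_STORED, KV_ENOTSUP };
enum kv_op { KV_ADD, KV_REPLACE, KV_SET, KV_APPEND };

#define KV_SLOTS 8
#define KV_KEYLEN 32

struct kv_table {
    char keys[KV_SLOTS][KV_KEYLEN];
    int values[KV_SLOTS];
    int used[KV_SLOTS];
};

static int kv_find(const struct kv_table *t, const char *key) {
    for (int i = 0; i < KV_SLOTS; i++) {
        if (t->used[i] && strcmp(t->keys[i], key) == 0) {
            return i;
        }
    }
    return -1;
}

/* Add semantics: refuse to overwrite an existing key. */
static enum kv_status kv_add(struct kv_table *t, const char *key, int value) {
    if (strlen(key) >= KV_KEYLEN || kv_find(t, key) >= 0) {
        return KV_NOT_STORED;
    }
    for (int i = 0; i < KV_SLOTS; i++) {
        if (!t->used[i]) {
            strcpy(t->keys[i], key);
            t->values[i] = value;
            t->used[i] = 1;
            return KV_SUCCESS;
        }
    }
    return KV_NOT_STORED;  /* table is full */
}

/* Replace semantics: only touch a key that already exists. */
static enum kv_status kv_replace(struct kv_table *t, const char *key,
                                 int value) {
    int i = kv_find(t, key);
    if (i < 0) {
        return KV_NOT_STORED;
    }
    t->values[i] = value;
    return KV_SUCCESS;
}

/* The status is returned as is instead of being collapsed to 1 or 0. */
enum kv_status kv_store(struct kv_table *t, const char *key, int value,
                        enum kv_op op) {
    assert(t != NULL && key != NULL);

    switch (op) {
    case KV_ADD:
        return kv_add(t, key, value);
    case KV_REPLACE:
        return kv_replace(t, key, value);
    case KV_SET:
        /* Set: try an add first, fall back to a replace. */
        if (kv_add(t, key, value) == KV_SUCCESS) {
            return KV_SUCCESS;
        }
        return kv_replace(t, key, value);
    default:
        return KV_ENOTSUP;
    }
}
```

In a real engine the same switch would sit between the lock and unlock calls, with its result stored in a variable of the error code type and returned after the unlock.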
Now that we have data in the cache, we can try to get it back to the client. When the client tries to get an element from the cache, the memcached server will call get. In our little server this function will try to get the item from the database. Because the server may call this function from multiple threads, we need to synchronize the access to our PostgreSQL handle:
static item* do_pg_get(struct pg_engine* engine, const void* key,
                       const int nkey) {
    const char* query = "SELECT header_size, data FROM memcached WHERE key=$1";
    const char *params[1];
    params[0] = (char*)key;
    int paramLengths[1] = { nkey };
    /*
     * Send the key in binary format, as do_db_store_item does; a text
     * parameter would have to be NUL-terminated.
     */
    int paramFormats[1] = { 1 };
    PGresult *result;

    result = PQexecParams(engine->psql, query, 1, NULL, params,
                          paramLengths, paramFormats, 1);
    /* Treat a failed query the same way as a miss. */
    if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK) {
        PQclear(result);
        return NULL;
    }

    int tup = PQntuples(result);
    if (tup == 0) {
        PQclear(result);
        return NULL;
    }

    char *sptr = PQgetvalue(result, 0, 0);
    uint32_t size = ntohl(*((uint32_t*)sptr));
    int datalen = PQgetlength(result, 0, 1);
    size_t ntotal = sizeof(item) + nkey + 1 + size + datalen;
    item *it;

    if ((it = calloc(1, ntotal)) == NULL) {
        PQclear(result);
        return NULL;
    }

    it->nkey = nkey;
    it->nsuffix = size;
    it->nbytes = datalen - size;
    memcpy(ITEM_key(it), key, nkey);
    memcpy(ITEM_suffix(it), PQgetvalue(result, 0, 1), datalen);
    PQclear(result);
    return it;
}

static item* pg_get(struct engine_handle* handle, const void* key,
                    const int nkey) {
    item *it;
    struct pg_engine* engine = get_handle(handle);

    pthread_mutex_lock(&engine->cache_lock);
    it = do_pg_get(engine, key, nkey);
    pthread_mutex_unlock(&engine->cache_lock);
    return it;
}
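The header_size column travels in network byte order: do_db_store_item converts it with htonl and do_pg_get converts it back with ntohl. The following sketch isolates that round trip. The helper names toy_put_len and toy_get_len are hypothetical, and using memcpy instead of a pointer cast is my own choice here, since it sidesteps any question about the alignment of the buffer.

```c
#include <arpa/inet.h>  /* htonl, ntohl */
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Write a 32-bit length into buf (4 bytes) in network byte order. */
void toy_put_len(unsigned char *buf, uint32_t len) {
    assert(buf != NULL);
    uint32_t be = htonl(len);
    memcpy(buf, &be, sizeof(be));
}

/* Read the length back without dereferencing a cast pointer. */
uint32_t toy_get_len(const unsigned char *buf) {
    assert(buf != NULL);
    uint32_t be;
    memcpy(&be, buf, sizeof(be));
    return ntohl(be);
}
```

With the length recovered, the data column can be split the same way do_pg_get does it: the first header_size bytes are the suffix and the rest is the value.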
Well, that covers the basic functions you need to implement in order to create a minimalistic memcached storage engine. So please go ahead and play with the API and let us know if the API is usable or not.
Posted at 05:46PM May 24, 2008 by trond in Memcached | Comments[1]