Welcome! Log In Create A New Profile

Advanced

eliminating the taking lock for cqi_freelist

Posted by ilnarb 
ilnarb
eliminating the taking lock for cqi_freelist
August 17, 2010 12:20PM
I suggest eliminating the lock on cqi_freelist.
To do that, all work on cqi_freelist should be done by a single thread
-- the dispatcher thread in memcached.
I made some changes to the queue code, including cache-line
improvements.
It will work on any platform.
Here is the patch file (diff -C3 memcached-1.4.5/thread.c <new
thread.c>):

*** thread.c.orig Sat Apr 3 11:07:16 2010
--- thread.c Tue Aug 17 14:09:28 2010
***************
*** 12,16 ****
#include <pthread.h>

! #define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
--- 12,17 ----
#include <pthread.h>

! #define CACHE_LINE_SIZE 64
! #define ITEMS_PER_ALLOC 256

/* An item in the connection queue. */
***************
*** 29,35 ****
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
! pthread_cond_t cond;
};

--- 30,40 ----
struct conn_queue {
CQ_ITEM *head;
+ char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
+ CQ_ITEM *divider;
+ char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
CQ_ITEM *tail;
+ char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
pthread_mutex_t lock;
! char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
};

***************
*** 45,49 ****
/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
- static pthread_mutex_t cqi_freelist_lock;

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
--- 50,53 ----
***************
*** 65,68 ****
--- 69,75 ----
static void thread_libevent_process(int fd, short which, void *arg);

+ static CQ_ITEM *cqi_new(void);
+ static void cqi_free(CQ_ITEM *item);
+
/*
* Initializes a connection queue.
***************
*** 70,76 ****
static void cq_init(CQ *cq) {
pthread_mutex_init(&cq->lock, NULL);
! pthread_cond_init(&cq->cond, NULL);
! cq->head = NULL;
! cq->tail = NULL;
}

--- 77,81 ----
static void cq_init(CQ *cq) {
pthread_mutex_init(&cq->lock, NULL);
! cq->head = cq->divider = cq->tail = cqi_new();
}

***************
*** 78,96 ****
* Looks for an item on a connection queue, but doesn't block if
there isn't
* one.
! * Returns the item, or NULL if no item is available
*/
! static CQ_ITEM *cq_pop(CQ *cq) {
! CQ_ITEM *item;

pthread_mutex_lock(&cq->lock);
! item = cq->head;
! if (NULL != item) {
! cq->head = item->next;
! if (NULL == cq->head)
! cq->tail = NULL;
}
pthread_mutex_unlock(&cq->lock);

! return item;
}

--- 83,105 ----
* Looks for an item on a connection queue, but doesn't block if
there isn't
* one.
! * Returns 1 if there are new item, or 0 if no item is available
*/
! static int cq_pop(CQ *cq, CQ_ITEM *item) {
! int res = 0;
!
! if (NULL == cq->divider->next)
! return 0;

pthread_mutex_lock(&cq->lock);
! if (NULL != cq->divider->next) {
! *item = *cq->divider->next;
! res = 1;
! cq->divider = cq->divider->next;
}
pthread_mutex_unlock(&cq->lock);

! item->next = NULL;
!
! return res;
}

***************
*** 102,112 ****

pthread_mutex_lock(&cq->lock);
! if (NULL == cq->tail)
! cq->head = item;
! else
! cq->tail->next = item;
! cq->tail = item;
! pthread_cond_signal(&cq->cond);
pthread_mutex_unlock(&cq->lock);
}

--- 111,124 ----

pthread_mutex_lock(&cq->lock);
! cq->tail->next = item;
! cq->tail = cq->tail->next;
pthread_mutex_unlock(&cq->lock);
+
+ while(cq->head != cq->divider)
+ {
+ CQ_ITEM *tmp = cq->head;
+ cq->head = cq->head->next;
+ cqi_free(tmp);
+ }
}

***************
*** 116,125 ****
static CQ_ITEM *cqi_new(void) {
CQ_ITEM *item = NULL;
- pthread_mutex_lock(&cqi_freelist_lock);
if (cqi_freelist) {
item = cqi_freelist;
cqi_freelist = item->next;
}
- pthread_mutex_unlock(&cqi_freelist_lock);

if (NULL == item) {
--- 128,135 ----
***************
*** 139,148 ****
item[i - 1].next = &item;

- pthread_mutex_lock(&cqi_freelist_lock);
item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
cqi_freelist = &item[1];
- pthread_mutex_unlock(&cqi_freelist_lock);
}

return item;
}
--- 149,158 ----
item[i - 1].next = &item;

item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
cqi_freelist = &item[1];
}

+ item->next = NULL;
+
return item;
}
***************
*** 153,160 ****
*/
static void cqi_free(CQ_ITEM *item) {
- pthread_mutex_lock(&cqi_freelist_lock);
item->next = cqi_freelist;
cqi_freelist = item;
- pthread_mutex_unlock(&cqi_freelist_lock);
}

--- 163,168 ----
***************
*** 254,258 ****
static void thread_libevent_process(int fd, short which, void *arg)
{
LIBEVENT_THREAD *me = arg;
! CQ_ITEM *item;
char buf[1];

--- 262,267 ----
static void thread_libevent_process(int fd, short which, void *arg)
{
LIBEVENT_THREAD *me = arg;
! CQ_ITEM _item;
! CQ_ITEM *item = &_item;
char buf[1];

***************
*** 261,267 ****
fprintf(stderr, "Can't read from libevent pipe\n");

! item = cq_pop(me->new_conn_queue);
!
! if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item-
>event_flags,
item->read_buffer_size, item->transport,
me->base);
--- 270,274 ----
fprintf(stderr, "Can't read from libevent pipe\n");

! if (cq_pop(me->new_conn_queue, item)) {
conn *c = conn_new(item->sfd, item->init_state, item-
>event_flags,
item->read_buffer_size, item->transport,
me->base);
***************
*** 280,284 ****
c->thread = me;
}
- cqi_free(item);
}
}
--- 287,290 ----
***************
*** 595,599 ****
pthread_cond_init(&init_cond, NULL);

- pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;

--- 601,604 ----



Also, I tested a lock-free and wait-free single-producer single-consumer
conn_queue using volatile variables. It works on my SMP server (2*Xeon
E55**), CentOS 5.5, gcc 4.1. But I'm not sure that it will work on
other platforms without memory fence instructions/calls, for example,
on Itanium. I can send the patch if anybody is interested in it.
dormando
Re: eliminating the taking lock for cqi_freelist
August 17, 2010 07:10PM
Can you resubmit as a unified diff?

On Tue, 17 Aug 2010, ilnarb wrote:

> I suggest you to eliminate taking of the lock on cqi_freelist.
> In order to it we should done all work on cqi_freelist by one thread
> -- dispatcher thread in memcached.
> I made some changes in queue code, including improvements for cache
> line.
> It will work on any platform.
> There are patch file (diff -C3 memcached-1.4.5/thread.c <new
> thread.c>)
>
> *** thread.c.orig Sat Apr 3 11:07:16 2010
> --- thread.c Tue Aug 17 14:09:28 2010
> ***************
> *** 12,16 ****
> #include <pthread.h>
>
> ! #define ITEMS_PER_ALLOC 64
>
> /* An item in the connection queue. */
> --- 12,17 ----
> #include <pthread.h>
>
> ! #define CACHE_LINE_SIZE 64
> ! #define ITEMS_PER_ALLOC 256
>
> /* An item in the connection queue. */
> ***************
> *** 29,35 ****
> struct conn_queue {
> CQ_ITEM *head;
> CQ_ITEM *tail;
> pthread_mutex_t lock;
> ! pthread_cond_t cond;
> };
>
> --- 30,40 ----
> struct conn_queue {
> CQ_ITEM *head;
> + char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
> + CQ_ITEM *divider;
> + char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
> CQ_ITEM *tail;
> + char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
> pthread_mutex_t lock;
> ! char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
> };
>
> ***************
> *** 45,49 ****
> /* Free list of CQ_ITEM structs */
> static CQ_ITEM *cqi_freelist;
> - static pthread_mutex_t cqi_freelist_lock;
>
> static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
> --- 50,53 ----
> ***************
> *** 65,68 ****
> --- 69,75 ----
> static void thread_libevent_process(int fd, short which, void *arg);
>
> + static CQ_ITEM *cqi_new(void);
> + static void cqi_free(CQ_ITEM *item);
> +
> /*
> * Initializes a connection queue.
> ***************
> *** 70,76 ****
> static void cq_init(CQ *cq) {
> pthread_mutex_init(&cq->lock, NULL);
> ! pthread_cond_init(&cq->cond, NULL);
> ! cq->head = NULL;
> ! cq->tail = NULL;
> }
>
> --- 77,81 ----
> static void cq_init(CQ *cq) {
> pthread_mutex_init(&cq->lock, NULL);
> ! cq->head = cq->divider = cq->tail = cqi_new();
> }
>
> ***************
> *** 78,96 ****
> * Looks for an item on a connection queue, but doesn't block if
> there isn't
> * one.
> ! * Returns the item, or NULL if no item is available
> */
> ! static CQ_ITEM *cq_pop(CQ *cq) {
> ! CQ_ITEM *item;
>
> pthread_mutex_lock(&cq->lock);
> ! item = cq->head;
> ! if (NULL != item) {
> ! cq->head = item->next;
> ! if (NULL == cq->head)
> ! cq->tail = NULL;
> }
> pthread_mutex_unlock(&cq->lock);
>
> ! return item;
> }
>
> --- 83,105 ----
> * Looks for an item on a connection queue, but doesn't block if
> there isn't
> * one.
> ! * Returns 1 if there are new item, or 0 if no item is available
> */
> ! static int cq_pop(CQ *cq, CQ_ITEM *item) {
> ! int res = 0;
> !
> ! if (NULL == cq->divider->next)
> ! return 0;
>
> pthread_mutex_lock(&cq->lock);
> ! if (NULL != cq->divider->next) {
> ! *item = *cq->divider->next;
> ! res = 1;
> ! cq->divider = cq->divider->next;
> }
> pthread_mutex_unlock(&cq->lock);
>
> ! item->next = NULL;
> !
> ! return res;
> }
>
> ***************
> *** 102,112 ****
>
> pthread_mutex_lock(&cq->lock);
> ! if (NULL == cq->tail)
> ! cq->head = item;
> ! else
> ! cq->tail->next = item;
> ! cq->tail = item;
> ! pthread_cond_signal(&cq->cond);
> pthread_mutex_unlock(&cq->lock);
> }
>
> --- 111,124 ----
>
> pthread_mutex_lock(&cq->lock);
> ! cq->tail->next = item;
> ! cq->tail = cq->tail->next;
> pthread_mutex_unlock(&cq->lock);
> +
> + while(cq->head != cq->divider)
> + {
> + CQ_ITEM *tmp = cq->head;
> + cq->head = cq->head->next;
> + cqi_free(tmp);
> + }
> }
>
> ***************
> *** 116,125 ****
> static CQ_ITEM *cqi_new(void) {
> CQ_ITEM *item = NULL;
> - pthread_mutex_lock(&cqi_freelist_lock);
> if (cqi_freelist) {
> item = cqi_freelist;
> cqi_freelist = item->next;
> }
> - pthread_mutex_unlock(&cqi_freelist_lock);
>
> if (NULL == item) {
> --- 128,135 ----
> ***************
> *** 139,148 ****
> item[i - 1].next = &item;
>
> - pthread_mutex_lock(&cqi_freelist_lock);
> item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
> cqi_freelist = &item[1];
> - pthread_mutex_unlock(&cqi_freelist_lock);
> }
>
> return item;
> }
> --- 149,158 ----
> item[i - 1].next = &item;
>
> item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
> cqi_freelist = &item[1];
> }
>
> + item->next = NULL;
> +
> return item;
> }
> ***************
> *** 153,160 ****
> */
> static void cqi_free(CQ_ITEM *item) {
> - pthread_mutex_lock(&cqi_freelist_lock);
> item->next = cqi_freelist;
> cqi_freelist = item;
> - pthread_mutex_unlock(&cqi_freelist_lock);
> }
>
> --- 163,168 ----
> ***************
> *** 254,258 ****
> static void thread_libevent_process(int fd, short which, void *arg)
> {
> LIBEVENT_THREAD *me = arg;
> ! CQ_ITEM *item;
> char buf[1];
>
> --- 262,267 ----
> static void thread_libevent_process(int fd, short which, void *arg)
> {
> LIBEVENT_THREAD *me = arg;
> ! CQ_ITEM _item;
> ! CQ_ITEM *item = &_item;
> char buf[1];
>
> ***************
> *** 261,267 ****
> fprintf(stderr, "Can't read from libevent pipe\n");
>
> ! item = cq_pop(me->new_conn_queue);
> !
> ! if (NULL != item) {
> conn *c = conn_new(item->sfd, item->init_state, item-
> >event_flags,
> item->read_buffer_size, item->transport,
> me->base);
> --- 270,274 ----
> fprintf(stderr, "Can't read from libevent pipe\n");
>
> ! if (cq_pop(me->new_conn_queue, item)) {
> conn *c = conn_new(item->sfd, item->init_state, item-
> >event_flags,
> item->read_buffer_size, item->transport,
> me->base);
> ***************
> *** 280,284 ****
> c->thread = me;
> }
> - cqi_free(item);
> }
> }
> --- 287,290 ----
> ***************
> *** 595,599 ****
> pthread_cond_init(&init_cond, NULL);
>
> - pthread_mutex_init(&cqi_freelist_lock, NULL);
> cqi_freelist = NULL;
>
> --- 601,604 ----
>
>
>
> Also I tested lock- & wait- free single-producer single-consumer
> conn_queue using volatile variables. It works on my SMP server (2*Xeon
> E55**), centos 5.5, gcc 4.1. But I'm not sure that it will work on
> other platforms without memory fence instructions/calls, for example,
> on Itanium. I can send patch if anybody interested in it.
>
ilnarb
Re: eliminating the taking lock for cqi_freelist
August 18, 2010 11:10AM
--- thread.c.orig 2010-04-03 11:07:16.000000000 +0400
+++ thread.c 2010-08-18 13:03:50.000000000 +0400
@@ -11,7 +11,8 @@
#include <string.h>
#include <pthread.h>

-#define ITEMS_PER_ALLOC 64
+#define CACHE_LINE_SIZE 64
+#define ITEMS_PER_ALLOC 256

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
@@ -28,9 +29,13 @@
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
+ char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
+ CQ_ITEM *divider;
+ char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
CQ_ITEM *tail;
+ char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
pthread_mutex_t lock;
- pthread_cond_t cond;
+ char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
};

/* Lock for cache operations (item_*, assoc_*) */
@@ -44,7 +49,6 @@

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
-static pthread_mutex_t cqi_freelist_lock;

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

@@ -64,34 +68,39 @@

static void thread_libevent_process(int fd, short which, void *arg);

+static CQ_ITEM *cqi_new(void);
+static void cqi_free(CQ_ITEM *item);
+
/*
* Initializes a connection queue.
*/
static void cq_init(CQ *cq) {
pthread_mutex_init(&cq->lock, NULL);
- pthread_cond_init(&cq->cond, NULL);
- cq->head = NULL;
- cq->tail = NULL;
+ cq->head = cq->divider = cq->tail = cqi_new();
}

/*
* Looks for an item on a connection queue, but doesn't block if
there isn't
* one.
- * Returns the item, or NULL if no item is available
+ * Returns 1 if there are new item, or 0 if no item is available
*/
-static CQ_ITEM *cq_pop(CQ *cq) {
- CQ_ITEM *item;
+static int cq_pop(CQ *cq, CQ_ITEM *item) {
+ int res = 0;
+
+ if (NULL == cq->divider->next)
+ return 0;

pthread_mutex_lock(&cq->lock);
- item = cq->head;
- if (NULL != item) {
- cq->head = item->next;
- if (NULL == cq->head)
- cq->tail = NULL;
+ if (NULL != cq->divider->next) {
+ *item = *cq->divider->next;
+ res = 1;
+ cq->divider = cq->divider->next;
}
pthread_mutex_unlock(&cq->lock);

- return item;
+ item->next = NULL;
+
+ return res;
}

/*
@@ -101,13 +110,16 @@
item->next = NULL;

pthread_mutex_lock(&cq->lock);
- if (NULL == cq->tail)
- cq->head = item;
- else
- cq->tail->next = item;
- cq->tail = item;
- pthread_cond_signal(&cq->cond);
+ cq->tail->next = item;
+ cq->tail = cq->tail->next;
pthread_mutex_unlock(&cq->lock);
+
+ while(cq->head != cq->divider)
+ {
+ CQ_ITEM *tmp = cq->head;
+ cq->head = cq->head->next;
+ cqi_free(tmp);
+ }
}

/*
@@ -115,12 +127,10 @@
*/
static CQ_ITEM *cqi_new(void) {
CQ_ITEM *item = NULL;
- pthread_mutex_lock(&cqi_freelist_lock);
if (cqi_freelist) {
item = cqi_freelist;
cqi_freelist = item->next;
}
- pthread_mutex_unlock(&cqi_freelist_lock);

if (NULL == item) {
int i;
@@ -138,12 +148,12 @@
for (i = 2; i < ITEMS_PER_ALLOC; i++)
item[i - 1].next = &item;

- pthread_mutex_lock(&cqi_freelist_lock);
item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
cqi_freelist = &item[1];
- pthread_mutex_unlock(&cqi_freelist_lock);
}

+ item->next = NULL;
+
return item;
}

@@ -152,10 +162,8 @@
* Frees a connection queue item (adds it to the freelist.)
*/
static void cqi_free(CQ_ITEM *item) {
- pthread_mutex_lock(&cqi_freelist_lock);
item->next = cqi_freelist;
cqi_freelist = item;
- pthread_mutex_unlock(&cqi_freelist_lock);
}


@@ -253,16 +261,15 @@
*/
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
- CQ_ITEM *item;
+ CQ_ITEM _item;
+ CQ_ITEM *item = &_item;
char buf[1];

if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");

- item = cq_pop(me->new_conn_queue);
-
- if (NULL != item) {
+ if (cq_pop(me->new_conn_queue, item)) {
conn *c = conn_new(item->sfd, item->init_state, item-
>event_flags,
item->read_buffer_size, item->transport,
me->base);
if (c == NULL) {
@@ -279,7 +286,6 @@
} else {
c->thread = me;
}
- cqi_free(item);
}
}

@@ -594,7 +600,6 @@
pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);

- pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;

threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
ilnarb
Re: eliminating the taking lock for cqi_freelist
August 18, 2010 11:50AM
A few small changes:
Since the dispatcher keeps the load balanced between threads when it
dispatches new connections, we can move the freelist into conn_queue
without any problem of unbalanced numbers of free items on each
conn_queue. I also eliminated the trailing "while .. free" code in cq_push
from my previous implementation -- new items are now taken from the head
of the queue while there are free items (from head to divider).
This makes it possible to have more than one dispatcher thread without
synchronization problems on an unguarded global freelist.
Here is the unified diff of thread.c/1.4.5:


--- thread.c.orig 2010-04-03 11:07:16.000000000 +0400
+++ thread.c 2010-08-18 13:28:18.000000000 +0400
@@ -11,6 +11,7 @@
#include <string.h>
#include <pthread.h>

+#define CACHE_LINE_SIZE 64
#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
@@ -28,9 +29,15 @@
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
+ char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
+ CQ_ITEM *divider;
+ char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
CQ_ITEM *tail;
+ char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
pthread_mutex_t lock;
- pthread_cond_t cond;
+ char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
+ CQ_ITEM *freelist;
+ char pad4[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
};

/* Lock for cache operations (item_*, assoc_*) */
@@ -42,10 +49,6 @@
/* Lock for global stats */
static pthread_mutex_t stats_lock;

-/* Free list of CQ_ITEM structs */
-static CQ_ITEM *cqi_freelist;
-static pthread_mutex_t cqi_freelist_lock;
-
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

/*
@@ -65,62 +68,18 @@
static void thread_libevent_process(int fd, short which, void *arg);

/*
- * Initializes a connection queue.
- */
-static void cq_init(CQ *cq) {
- pthread_mutex_init(&cq->lock, NULL);
- pthread_cond_init(&cq->cond, NULL);
- cq->head = NULL;
- cq->tail = NULL;
-}
-
-/*
- * Looks for an item on a connection queue, but doesn't block if
there isn't
- * one.
- * Returns the item, or NULL if no item is available
- */
-static CQ_ITEM *cq_pop(CQ *cq) {
- CQ_ITEM *item;
-
- pthread_mutex_lock(&cq->lock);
- item = cq->head;
- if (NULL != item) {
- cq->head = item->next;
- if (NULL == cq->head)
- cq->tail = NULL;
- }
- pthread_mutex_unlock(&cq->lock);
-
- return item;
-}
-
-/*
- * Adds an item to a connection queue.
- */
-static void cq_push(CQ *cq, CQ_ITEM *item) {
- item->next = NULL;
-
- pthread_mutex_lock(&cq->lock);
- if (NULL == cq->tail)
- cq->head = item;
- else
- cq->tail->next = item;
- cq->tail = item;
- pthread_cond_signal(&cq->cond);
- pthread_mutex_unlock(&cq->lock);
-}
-
-/*
* Returns a fresh connection queue item.
*/
-static CQ_ITEM *cqi_new(void) {
+static CQ_ITEM *cq_new_item(CQ *cq) {
CQ_ITEM *item = NULL;
- pthread_mutex_lock(&cqi_freelist_lock);
- if (cqi_freelist) {
- item = cqi_freelist;
- cqi_freelist = item->next;
+ if (cq->head && cq->head != cq->divider) {
+ item = cq->head;
+ cq->head = cq->head->next;
+ }
+ else if (cq->freelist) {
+ item = cq->freelist;
+ cq->freelist = item->next;
}
- pthread_mutex_unlock(&cqi_freelist_lock);

if (NULL == item) {
int i;
@@ -138,26 +97,60 @@
for (i = 2; i < ITEMS_PER_ALLOC; i++)
item[i - 1].next = &item;

- pthread_mutex_lock(&cqi_freelist_lock);
- item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
- cqi_freelist = &item[1];
- pthread_mutex_unlock(&cqi_freelist_lock);
+ item[ITEMS_PER_ALLOC - 1].next = cq->freelist;
+ cq->freelist = &item[1];
}

+ item->next = NULL;
+
return item;
}

+/*
+ * Initializes a connection queue.
+ */
+static void cq_init(CQ *cq) {
+ pthread_mutex_init(&cq->lock, NULL);
+ cq->freelist = NULL;
+ cq->head = NULL;
+ cq->head = cq->divider = cq->tail = cq_new_item(cq);
+}

/*
- * Frees a connection queue item (adds it to the freelist.)
+ * Looks for an item on a connection queue, but doesn't block if
there isn't
+ * one.
+ * Returns 1 if there are new item, or 0 if no item is available
*/
-static void cqi_free(CQ_ITEM *item) {
- pthread_mutex_lock(&cqi_freelist_lock);
- item->next = cqi_freelist;
- cqi_freelist = item;
- pthread_mutex_unlock(&cqi_freelist_lock);
+static int cq_pop(CQ *cq, CQ_ITEM *item) {
+ int res = 0;
+
+ if (NULL == cq->divider->next)
+ return 0;
+
+ pthread_mutex_lock(&cq->lock);
+ if (NULL != cq->divider->next) {
+ *item = *cq->divider->next;
+ res = 1;
+ cq->divider = cq->divider->next;
+ }
+ pthread_mutex_unlock(&cq->lock);
+
+ item->next = NULL;
+
+ return res;
}

+/*
+ * Adds an item to a connection queue.
+ */
+static void cq_push(CQ *cq, CQ_ITEM *item) {
+ item->next = NULL;
+
+ pthread_mutex_lock(&cq->lock);
+ cq->tail->next = item;
+ cq->tail = cq->tail->next;
+ pthread_mutex_unlock(&cq->lock);
+}

/*
* Creates a worker thread.
@@ -253,16 +246,15 @@
*/
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
- CQ_ITEM *item;
+ CQ_ITEM _item;
+ CQ_ITEM *item = &_item;
char buf[1];

if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");

- item = cq_pop(me->new_conn_queue);
-
- if (NULL != item) {
+ if (cq_pop(me->new_conn_queue, item)) {
conn *c = conn_new(item->sfd, item->init_state, item-
>event_flags,
item->read_buffer_size, item->transport,
me->base);
if (c == NULL) {
@@ -279,7 +271,6 @@
} else {
c->thread = me;
}
- cqi_free(item);
}
}

@@ -293,13 +284,15 @@
*/
void dispatch_conn_new(int sfd, enum conn_states init_state, int
event_flags,
int read_buffer_size, enum network_transport
transport) {
- CQ_ITEM *item = cqi_new();
+ CQ_ITEM *item = NULL;
int tid = (last_thread + 1) % settings.num_threads;

LIBEVENT_THREAD *thread = threads + tid;

last_thread = tid;

+ item = cq_new_item(thread->new_conn_queue);
+
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
@@ -594,9 +587,6 @@
pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);

- pthread_mutex_init(&cqi_freelist_lock, NULL);
- cqi_freelist = NULL;
-
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
perror("Can't allocate thread descriptors");
Sorry, only registered users may post in this forum.

Click here to login