[PATCH] Make sure all the pollers get fd updates

Olivier Houchard
[PATCH] Make sure all the pollers get fd updates
May 04, 2018 05:40PM
Hi,

When the code was changed to use one poller per thread, we overlooked the
fact that some fds can be shared between multiple threads. When an event
occurred that required such an fd to be added to or removed from the poller,
the information would be sent only to the current thread; the other threads
could miss it entirely, and thus either miss events or report spurious ones.
The attached patches are an attempt at fixing this, by adding a new global
update list in addition to the local per-thread update lists.
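
To make the idea concrete, here is a rough sketch of the two paths, with
simplified, hypothetical types rather than the actual HAProxy structures
(the linking of shared fds into the global list is omitted):

/* sketch.c - illustration only, not HAProxy code */
#include <stdatomic.h>
#include <stdio.h>

#define MAX_FDS 16

struct fd_entry {
	unsigned long thread_mask;          /* threads allowed to handle this fd */
	_Atomic unsigned long update_mask;  /* threads that still owe a poller update */
};

static struct fd_entry fdtab_sketch[MAX_FDS];
static _Thread_local int local_updates[MAX_FDS]; /* per-thread update array */
static _Thread_local int local_nb_updates;

static void mark_fd_updated(int fd, unsigned long tid_bit)
{
	struct fd_entry *e = &fdtab_sketch[fd];

	if (e->thread_mask == tid_bit) {
		/* fd used by this thread only: cheap, thread-local path */
		if (!(atomic_fetch_or(&e->update_mask, tid_bit) & tid_bit))
			local_updates[local_nb_updates++] = fd;
	} else {
		/* shared fd: flag every owning thread, then link the fd
		 * into the global update list (linking omitted here) */
		atomic_store(&e->update_mask, e->thread_mask);
	}
}

int main(void)
{
	fdtab_sketch[3].thread_mask = 0x1; /* fd 3 owned by thread 0 only */
	fdtab_sketch[4].thread_mask = 0x3; /* fd 4 shared by threads 0 and 1 */
	mark_fd_updated(3, 0x1);
	mark_fd_updated(4, 0x1);
	printf("fd4 update_mask=%#lx\n",
	       (unsigned long)atomic_load(&fdtab_sketch[4].update_mask));
	return 0;
}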

This can't be applied to 1.8, as it uses code that was not, and probably
won't be, backported; a different patch, similar in spirit, will be developed.

Regards,

Olivier
From 7ae6ae7215984deb4487391201e3b0f99a072c4b Mon Sep 17 00:00:00 2001
From: Olivier Houchard <[email protected]>
Date: Wed, 25 Apr 2018 15:10:30 +0200
Subject: [PATCH 1/4] MINOR: fd: Make the lockless fd list work with multiple
lists.

Modify fd_add_to_fd_list() and fd_rm_from_fd_list() so that they take an
offset within struct fdtab to the list entry, instead of hardcoding the fd
cache, so we can use them with other lists.
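
The technique is just base pointer plus byte offset; a minimal standalone
illustration of the idea, using hypothetical struct names rather than the
actual fdtab:

#include <stddef.h>
#include <stdio.h>

struct entry { int next, prev; };

/* A structure embedding two independent intrusive list entries. */
struct item {
	int value;
	struct entry cache;   /* membership in a first list */
	struct entry update;  /* membership in a second list */
};

/* Reach whichever entry the caller selected via offsetof(). */
static struct entry *entry_at(struct item *it, size_t off)
{
	return (struct entry *)((char *)it + off);
}

int main(void)
{
	struct item it = { .value = 42 };

	entry_at(&it, offsetof(struct item, cache))->next = 7;
	entry_at(&it, offsetof(struct item, update))->next = 9;
	printf("cache.next=%d update.next=%d\n", it.cache.next, it.update.next);
	return 0;
}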
---
include/proto/fd.h | 12 ++++++------
src/fd.c | 55 +++++++++++++++++++++++++++++-------------------------
2 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/include/proto/fd.h b/include/proto/fd.h
index 6c9cfe701..543a42007 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -93,8 +93,8 @@ void run_poller();
*/
void fd_process_cached_events();

-void fd_add_to_fd_list(volatile struct fdlist *list, int fd);
-void fd_rm_from_fd_list(volatile struct fdlist *list, int fd);
+void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
+void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);

/* Mark fd <fd> as updated for polling and allocate an entry in the update list
* for this if it was not already there. This can be done at any time.
@@ -119,9 +119,9 @@ static inline void fd_alloc_cache_entry(const int fd)
{
HA_ATOMIC_OR(&fd_cache_mask, fdtab[fd].thread_mask);
if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
- fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+ fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd, offsetof(struct fdtab, cache));
else
- fd_add_to_fd_list(&fd_cache, fd);
+ fd_add_to_fd_list(&fd_cache, fd, offsetof(struct fdtab, cache));
}

/* Removes entry used by fd <fd> from the FD cache and replaces it with the
@@ -131,9 +131,9 @@ static inline void fd_alloc_cache_entry(const int fd)
static inline void fd_release_cache_entry(const int fd)
{
if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
- fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+ fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd, offsetof(struct fdtab, cache));
else
- fd_rm_from_fd_list(&fd_cache, fd);
+ fd_rm_from_fd_list(&fd_cache, fd, offsetof(struct fdtab, cache));
}

/* This function automatically enables/disables caching for an entry depending
diff --git a/src/fd.c b/src/fd.c
index 1af64e543..cbf22bd22 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -175,8 +175,10 @@ unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
THREAD_LOCAL int *fd_updt = NULL; // FD updates list
THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list

+#define _FD_NEXT(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->next
+#define _FD_PREV(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->prev
/* adds fd <fd> to fd list <list> if it was not yet in it */
-void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
+void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off)
{
int next;
int new;
@@ -184,11 +186,11 @@ void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
int last;

redo_next:
- next = fdtab[fd].cache.next;
+ next = _FD_NEXT(fd, off);
/* Check that we're not already in the cache, and if not, lock us. */
if (next >= -2)
goto done;
- if (!HA_ATOMIC_CAS(&fdtab[fd].cache.next, &next, -2))
+ if (!HA_ATOMIC_CAS(&_FD_NEXT(fd, off), &next, -2))
goto redo_next;
__ha_barrier_store();

@@ -198,7 +200,7 @@ redo_last:
last = list->last;
old = -1;

- fdtab[fd].cache.prev = -2;
+ _FD_PREV(fd, off) = -2;
/* Make sure the "prev" store is visible before we update the last entry */
__ha_barrier_store();

@@ -214,7 +216,7 @@ redo_last:
* The CAS will only succeed if its next is -1,
* which means it's in the cache, and the last element.
*/
- if (unlikely(!HA_ATOMIC_CAS(&fdtab[last].cache.next, &old, new)))
+ if (unlikely(!HA_ATOMIC_CAS(&_FD_NEXT(last, off), &old, new)))
goto redo_last;

/* Then, update the last entry */
@@ -224,15 +226,15 @@ redo_last:
/* since we're alone at the end of the list and still locked(-2),
* we know noone tried to add past us. Mark the end of list.
*/
- fdtab[fd].cache.prev = last;
- fdtab[fd].cache.next = -1;
+ _FD_PREV(fd, off) = last;
+ _FD_NEXT(fd, off) = -1;
__ha_barrier_store();
done:
return;
}

/* removes fd <fd> from fd list <list> */
-void fd_rm_from_fd_list(volatile struct fdlist *list, int fd)
+void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off)
{
#if defined(HA_HAVE_CAS_DW) || defined(HA_CAS_IS_8B)
volatile struct fdlist_entry cur_list, next_list;
@@ -246,7 +248,7 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd)
lock_self:
#if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
next_list.next = next_list.prev = -2;
- cur_list = fdtab[fd].cache;
+ cur_list = *(volatile struct fdlist_entry *)(((char *)&fdtab[fd]) + off);
/* First, attempt to lock our own entries */
do {
/* The FD is not in the FD cache, give up */
@@ -256,9 +258,9 @@ lock_self:
goto lock_self;
} while (
#ifdef HA_CAS_IS_8B
- unlikely(!HA_ATOMIC_CAS(((void **)(void *)&fdtab[fd].cache.next), ((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
+ unlikely(!HA_ATOMIC_CAS(((void **)(void *)&_FD_NEXT(fd, off)), ((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
#else
- unlikely(!__ha_cas_dw((void *)&fdtab[fd].cache.next, (void *)&cur_list, (void *)&next_list)))
+ unlikely(!__ha_cas_dw((void *)&_FD_NEXT(fd, off), (void *)&cur_list, (void *)&next_list)))
#endif
;
next = cur_list.next;
@@ -266,18 +268,18 @@ lock_self:

#else
lock_self_next:
- next = ({ volatile int *next = &fdtab[fd].cache.next; *next; });
+ next = ({ volatile int *next = &_FD_NEXT(fd, off); *next; });
if (next == -2)
goto lock_self_next;
if (next <= -3)
goto done;
- if (unlikely(!HA_ATOMIC_CAS(&fdtab[fd].cache.next, &next, -2)))
+ if (unlikely(!HA_ATOMIC_CAS(&_FD_NEXT(fd, off), &next, -2)))
goto lock_self_next;
lock_self_prev:
- prev = ({ volatile int *prev = &fdtab[fd].cache.prev; *prev; });
+ prev = ({ volatile int *prev = &_FD_PREV(fd, off); *prev; });
if (prev == -2)
goto lock_self_prev;
- if (unlikely(!HA_ATOMIC_CAS(&fdtab[fd].cache.prev, &prev, -2)))
+ if (unlikely(!HA_ATOMIC_CAS(&_FD_PREV(fd, off), &prev, -2)))
goto lock_self_prev;
#endif
__ha_barrier_store();
@@ -287,14 +289,14 @@ lock_self_prev:
redo_prev:
old = fd;

- if (unlikely(!HA_ATOMIC_CAS(&fdtab[prev].cache.next, &old, new))) {
+ if (unlikely(!HA_ATOMIC_CAS(&_FD_NEXT(prev, off), &old, new))) {
if (unlikely(old == -2)) {
/* Neighbour already locked, give up and
* retry again once he's done
*/
- fdtab[fd].cache.prev = prev;
+ _FD_PREV(fd, off) = prev;
__ha_barrier_store();
- fdtab[fd].cache.next = next;
+ _FD_NEXT(fd, off) = next;
__ha_barrier_store();
goto lock_self;
}
@@ -304,18 +306,18 @@ redo_prev:
if (likely(next != -1)) {
redo_next:
old = fd;
- if (unlikely(!HA_ATOMIC_CAS(&fdtab[next].cache.prev, &old, new))) {
+ if (unlikely(!HA_ATOMIC_CAS(&_FD_PREV(next, off), &old, new))) {
if (unlikely(old == -2)) {
/* Neighbour already locked, give up and
* retry again once he's done
*/
if (prev != -1) {
- fdtab[prev].cache.next = fd;
+ _FD_NEXT(prev, off) = fd;
__ha_barrier_store();
}
- fdtab[fd].cache.prev = prev;
+ _FD_PREV(fd, off) = prev;
__ha_barrier_store();
- fdtab[fd].cache.next = next;
+ _FD_NEXT(fd, off) = next;
__ha_barrier_store();
goto lock_self;
}
@@ -333,18 +335,21 @@ redo_next:
*/
__ha_barrier_store();
if (likely(prev != -1))
- fdtab[prev].cache.next = next;
+ _FD_NEXT(prev, off) = next;
__ha_barrier_store();
if (likely(next != -1))
- fdtab[next].cache.prev = prev;
+ _FD_PREV(next, off) = prev;
__ha_barrier_store();
/* Ok, now we're out of the fd cache */
- fdtab[fd].cache.next = -(next + 4);
+ _FD_NEXT(fd, off) = -(next + 4);
__ha_barrier_store();
done:
return;
}

+#undef _FD_NEXT
+#undef _FD_PREV
+
/* Deletes an FD from the fdsets.
* The file descriptor is also closed.
*/
--
2.14.3
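
As an aside for readers following the traversal above: the lockless list
encodes an entry's status in the next/prev fields themselves. As far as can
be inferred from the patch, -1 marks the last element, -2 marks an entry
locked by a concurrent operation, and a value v <= -3 means "not in the
list", with the pre-removal next pointer recoverable as -v - 4. A small
decoding sketch (illustration only, not HAProxy code):

#include <stdio.h>

static void describe(int v)
{
	if (v >= 0)
		printf("%d: in list, next element is fd %d\n", v, v);
	else if (v == -1)
		printf("%d: in list, last element\n", v);
	else if (v == -2)
		printf("%d: entry locked by another thread\n", v);
	else /* v <= -3: unlinked, old next pointer still recoverable */
		printf("%d: not in list (next was %d when removed)\n", v, -v - 4);
}

int main(void)
{
	describe(5);   /* linked, points to fd 5 */
	describe(-1);  /* linked, tail of the list */
	describe(-2);  /* locked */
	describe(-3);  /* unlinked, was the tail (next was -1) */
	describe(-9);  /* unlinked, next was fd 5 */
	return 0;
}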

From bc11330fd8ef0fb6edccc58810da02c5fac16ee2 Mon Sep 17 00:00:00 2001
From: Olivier Houchard <[email protected]>
Date: Wed, 25 Apr 2018 16:58:25 +0200
Subject: [PATCH 2/4] BUG/MEDIUM: pollers: Use a global list for fd shared
between threads.

With the old model, any fd shared by multiple threads, such as listeners
or dns sockets, would only be updated on one thread, which could lead
to missed events or spurious wakeups.
To avoid this, add a global list for fds that are shared, using the same
implementation as the fd cache, and only remove entries from this list
once every thread has updated its poller.
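
The removal side of this scheme is the interesting part: each thread clears
its bit from update_mask once it has applied the change, and the thread that
clears the last bit unlinks the fd from the global list. A simplified model
of that acknowledgement step (illustration only, two hypothetical threads):

#include <stdatomic.h>
#include <stdio.h>

static _Atomic unsigned long update_mask;
static unsigned long all_threads_mask = 0x3;

static void ack_update(int fd, unsigned long tid_bit)
{
	unsigned long m = atomic_fetch_and(&update_mask, ~tid_bit) & ~tid_bit;

	if ((m & all_threads_mask) == 0)
		printf("thread %#lx was last: unlink fd %d from the list\n",
		       tid_bit, fd);
}

int main(void)
{
	atomic_store(&update_mask, all_threads_mask);
	ack_update(7, 0x1); /* first thread: bit 0x2 still set, no unlink */
	ack_update(7, 0x2); /* last thread: performs the unlink */
	return 0;
}

The real done_update_polling() below additionally re-reads the mask after
unlinking and re-adds the entry if a concurrent updt_fd_polling() raced with
the removal; the sketch leaves that recovery out.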
---
include/common/hathreads.h | 2 +
include/proto/fd.h | 54 +++++++++++++++++++++++---
include/types/fd.h | 1 +
src/ev_epoll.c | 92 +++++++++++++++++++++++++++----------------
src/ev_kqueue.c | 84 ++++++++++++++++++++++++++-------------
src/ev_poll.c | 95 ++++++++++++++++++++++++++++++---------------
src/ev_select.c | 97 ++++++++++++++++++++++++++++++----------------
src/fd.c | 5 ++-
src/hathreads.c | 2 +-
9 files changed, 297 insertions(+), 135 deletions(-)

diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 0f10b48ca..e27ecc63f 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -256,6 +256,8 @@ void thread_exit_sync(void);
int thread_no_sync(void);
int thread_need_sync(void);

+extern unsigned long all_threads_mask;
+
#if defined(DEBUG_THREAD) || defined(DEBUG_FULL)

/* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 543a42007..a88f7ae64 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -36,6 +36,8 @@
extern volatile struct fdlist fd_cache;
extern volatile struct fdlist fd_cache_local[MAX_THREADS];

+extern volatile struct fdlist update_list;
+
extern unsigned long fd_cache_mask; // Mask of threads with events in the cache

extern THREAD_LOCAL int *fd_updt; // FD updates list
@@ -101,15 +103,55 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
*/
static inline void updt_fd_polling(const int fd)
{
- unsigned int oldupdt;
+ if (fdtab[fd].thread_mask == tid_bit) {
+ unsigned int oldupdt;
+
+ /* note: we don't have a test-and-set yet in hathreads */
+
+ if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ return;
+
+ oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
+ fd_updt[oldupdt] = fd;
+ } else {
+ unsigned long update_mask = fdtab[fd].update_mask;
+ do {
+ if (update_mask == fdtab[fd].thread_mask)
+ return;
+ } while (!HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask,
+ fdtab[fd].thread_mask));
+ fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+ }

- /* note: we don't have a test-and-set yet in hathreads */
+}

- if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
- return;
+/* Called from the poller to acknowledge we read an entry from the global
+ * update list, to remove our bit from the update_mask, and remove it from
+ * the list if we were the last one.
+ */
+static inline void done_update_polling(int fd)
+{
+ unsigned long update_mask;
+
+ update_mask = HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+ while ((update_mask & all_threads_mask) == 0) {
+ /* If we were the last one that had to update that entry, remove it from the list */
+ fd_rm_from_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+ update_mask = (volatile unsigned long)fdtab[fd].update_mask;
+ if ((update_mask & all_threads_mask) != 0) {
+ /* Maybe it's been re-updated in the meanwhile, and we
+ * wrongly removed it from the list, if so, re-add it
+ */
+ fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+ update_mask = (volatile unsigned long)(fdtab[fd].update_mask);
+ /* And then check again, just in case after all it
+ * should be removed, even if it's very unlikely, given
+ * the current thread wouldn't have been able to take
+ * care of it yet */
+ } else
+ break;

- oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
- fd_updt[oldupdt] = fd;
+ }
}

/* Allocates a cache entry for a file descriptor if it does not yet have one.
diff --git a/include/types/fd.h b/include/types/fd.h
index 0902e7fc4..aa18ebefc 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -117,6 +117,7 @@ struct fdtab {
unsigned long polled_mask; /* mask of thread IDs currently polling this fd */
unsigned long update_mask; /* mask of thread IDs having an update for fd */
struct fdlist_entry cache; /* Entry in the fdcache */
+ struct fdlist_entry update; /* Entry in the global update list */
void (*iocb)(int fd); /* I/O handler */
void *owner; /* the connection or listener associated with this fd, NULL if closed */
unsigned char state; /* FD state for read and write directions (2*3 bits) */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index a8e57973f..2fec9070d 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -59,16 +59,55 @@ REGPRM1 static void __fd_clo(int fd)
}
}

+static inline void _update_fd(int fd)
+{
+ int en, opcode;
+
+ en = fdtab[fd].state;
+
+ if (fdtab[fd].polled_mask & tid_bit) {
+ if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+ /* fd removed from poll list */
+ opcode = EPOLL_CTL_DEL;
+ HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+ }
+ else {
+ /* fd status changed */
+ opcode = EPOLL_CTL_MOD;
+ }
+ }
+ else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
+ /* new fd in the poll list */
+ opcode = EPOLL_CTL_ADD;
+ HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ }
+ else {
+ return;
+ }
+
+ /* construct the epoll events based on new state */
+ ev.events = 0;
+ if (en & FD_EV_POLLED_R)
+ ev.events |= EPOLLIN | EPOLLRDHUP;
+
+ if (en & FD_EV_POLLED_W)
+ ev.events |= EPOLLOUT;
+
+ ev.data.fd = fd;
+ epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+}
+
/*
* Linux epoll() poller
*/
REGPRM2 static void _do_poll(struct poller *p, int exp)
{
- int status, en;
- int fd, opcode;
+ int status;
+ int fd;
int count;
int updt_idx;
int wait_time;
+ int old_fd;

/* first, scan the update list to find polling changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
@@ -80,40 +119,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
continue;
}

- en = fdtab[fd].state;
-
- if (fdtab[fd].polled_mask & tid_bit) {
- if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
- /* fd removed from poll list */
- opcode = EPOLL_CTL_DEL;
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
- }
- else {
- /* fd status changed */
- opcode = EPOLL_CTL_MOD;
- }
- }
- else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
- /* new fd in the poll list */
- opcode = EPOLL_CTL_ADD;
- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
- }
- else {
+ _update_fd(fd);
+ }
+ fd_nbupdt = 0;
+ /* Scan the global update list */
+ for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+ if (fd == -2) {
+ fd = old_fd;
continue;
}
-
- /* construct the epoll events based on new state */
- ev.events = 0;
- if (en & FD_EV_POLLED_R)
- ev.events |= EPOLLIN | EPOLLRDHUP;
-
- if (en & FD_EV_POLLED_W)
- ev.events |= EPOLLOUT;
-
- ev.data.fd = fd;
- epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+ else if (fd <= -3)
+ fd = -fd -4;
+ if (fd == -1)
+ break;
+ if (fdtab[fd].update_mask & tid_bit)
+ done_update_polling(fd);
+ else
+ continue;
+ if (!fdtab[fd].owner)
+ continue;
+ _update_fd(fd);
}
- fd_nbupdt = 0;

/* compute the epoll_wait() timeout */
if (!exp)
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index ebfd5d210..6392dbfdd 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -33,6 +33,41 @@ static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
static THREAD_LOCAL struct kevent *kev = NULL;
static struct kevent *kev_out = NULL; // Trash buffer for kevent() to write the eventlist in

+static inline int _update_fd(int fd)
+{
+ int en;
+ int changes = 0;
+
+ en = fdtab[fd].state;
+
+ if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+ if (!(fdtab[fd].polled_mask & tid_bit)) {
+ /* fd was not watched, it's still not */
+ return 0;
+ }
+ /* fd totally removed from poll list */
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+ HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+ }
+ else {
+ /* OK fd has to be monitored, it was either added or changed */
+
+ if (en & FD_EV_POLLED_R)
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+ else if (fdtab[fd].polled_mask & tid_bit)
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+ if (en & FD_EV_POLLED_W)
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+ else if (fdtab[fd].polled_mask & tid_bit)
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+ HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ }
+ return changes;
+}
+
/*
* kqueue() poller
*/
@@ -41,8 +76,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
int status;
int count, fd, delta_ms;
struct timespec timeout;
- int updt_idx, en;
+ int updt_idx;
int changes = 0;
+ int old_fd;

timeout.tv_sec = 0;
timeout.tv_nsec = 0;
@@ -55,35 +91,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
activity[tid].poll_drop++;
continue;
}
-
- en = fdtab[fd].state;
-
- if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
- if (!(fdtab[fd].polled_mask & tid_bit)) {
- /* fd was not watched, it's still not */
- continue;
- }
- /* fd totally removed from poll list */
- EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
- }
- else {
- /* OK fd has to be monitored, it was either added or changed */
-
- if (en & FD_EV_POLLED_R)
- EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
- else if (fdtab[fd].polled_mask & tid_bit)
- EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-
- if (en & FD_EV_POLLED_W)
- EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
- else if (fdtab[fd].polled_mask & tid_bit)
- EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-
- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ changes += _update_fd(fd);
+ }
+ /* Scan the global update list */
+ for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+ if (fd == -2) {
+ fd = old_fd;
+ continue;
}
+ else if (fd <= -3)
+ fd = -fd -4;
+ if (fd == -1)
+ break;
+ if (fdtab[fd].update_mask & tid_bit)
+ done_update_polling(fd);
+ else
+ continue;
+ if (!fdtab[fd].owner)
+ continue;
+ changes += _update_fd(fd);
}
+
if (changes) {
#ifdef EV_RECEIPT
kev[0].flags |= EV_RECEIPT;
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 6093b652b..2f10fe9ae 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -45,6 +45,44 @@ REGPRM1 static void __fd_clo(int fd)
hap_fd_clr(fd, fd_evts[DIR_WR]);
}

+static inline void _update_fd(int fd, int *max_add_fd)
+{
+ int en;
+
+ en = fdtab[fd].state;
+
+ /* we have a single state for all threads, which is why we
+ * don't check the tid_bit. First thread to see the update
+ * takes it for every other one.
+ */
+ if (!(en & FD_EV_POLLED_RW)) {
+ if (!fdtab[fd].polled_mask) {
+ /* fd was not watched, it's still not */
+ return;
+ }
+ /* fd totally removed from poll list */
+ hap_fd_clr(fd, fd_evts[DIR_RD]);
+ hap_fd_clr(fd, fd_evts[DIR_WR]);
+ HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+ }
+ else {
+ /* OK fd has to be monitored, it was either added or changed */
+ if (!(en & FD_EV_POLLED_R))
+ hap_fd_clr(fd, fd_evts[DIR_RD]);
+ else
+ hap_fd_set(fd, fd_evts[DIR_RD]);
+
+ if (!(en & FD_EV_POLLED_W))
+ hap_fd_clr(fd, fd_evts[DIR_WR]);
+ else
+ hap_fd_set(fd, fd_evts[DIR_WR]);
+
+ HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ if (fd > *max_add_fd)
+ *max_add_fd = fd;
+ }
+}
+
/*
* Poll() poller
*/
@@ -53,11 +91,12 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
int status;
int fd;
int wait_time;
- int updt_idx, en;
+ int updt_idx;
int fds, count;
int sr, sw;
int old_maxfd, new_maxfd, max_add_fd;
unsigned rn, wn; /* read new, write new */
+ int old_fd;

max_add_fd = -1;

@@ -70,39 +109,31 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
activity[tid].poll_drop++;
continue;
}
+ _update_fd(fd, &max_add_fd);
+ }

- en = fdtab[fd].state;
-
- /* we have a single state for all threads, which is why we
- * don't check the tid_bit. First thread to see the update
- * takes it for every other one.
- */
- if (!(en & FD_EV_POLLED_RW)) {
- if (!fdtab[fd].polled_mask) {
- /* fd was not watched, it's still not */
- continue;
- }
- /* fd totally removed from poll list */
- hap_fd_clr(fd, fd_evts[DIR_RD]);
- hap_fd_clr(fd, fd_evts[DIR_WR]);
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
- }
- else {
- /* OK fd has to be monitored, it was either added or changed */
- if (!(en & FD_EV_POLLED_R))
- hap_fd_clr(fd, fd_evts[DIR_RD]);
- else
- hap_fd_set(fd, fd_evts[DIR_RD]);
-
- if (!(en & FD_EV_POLLED_W))
- hap_fd_clr(fd, fd_evts[DIR_WR]);
- else
- hap_fd_set(fd, fd_evts[DIR_WR]);
-
- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
- if (fd > max_add_fd)
- max_add_fd = fd;
+ /* Now scan the global update list */
+ for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+ if (fd == -2) {
+ fd = old_fd;
+ continue;
}
+ else if (fd <= -3)
+ fd = -fd -4;
+ if (fd == -1)
+ break;
+ if (fdtab[fd].update_mask & tid_bit) {
+ /* Cheat a bit, as the state is global to all pollers
+ * we don't need every thread to take care of the
+ * update.
+ */
+ HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+ done_update_polling(fd);
+ } else
+ continue;
+ if (!fdtab[fd].owner)
+ continue;
+ _update_fd(fd, &max_add_fd);
}

/* maybe we added at least one fd larger than maxfd */
diff --git a/src/ev_select.c b/src/ev_select.c
index 163a45839..67775ca7e 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -36,6 +36,44 @@ REGPRM1 static void __fd_clo(int fd)
hap_fd_clr(fd, fd_evts[DIR_WR]);
}

+static inline void _update_fd(int fd, int *max_add_fd)
+{
+ int en;
+
+ en = fdtab[fd].state;
+
+ /* we have a single state for all threads, which is why we
+ * don't check the tid_bit. First thread to see the update
+ * takes it for every other one.
+ */
+ if (!(en & FD_EV_POLLED_RW)) {
+ if (!fdtab[fd].polled_mask) {
+ /* fd was not watched, it's still not */
+ return;
+ }
+ /* fd totally removed from poll list */
+ hap_fd_clr(fd, fd_evts[DIR_RD]);
+ hap_fd_clr(fd, fd_evts[DIR_WR]);
+ HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+ }
+ else {
+ /* OK fd has to be monitored, it was either added or changed */
+ if (!(en & FD_EV_POLLED_R))
+ hap_fd_clr(fd, fd_evts[DIR_RD]);
+ else
+ hap_fd_set(fd, fd_evts[DIR_RD]);
+
+ if (!(en & FD_EV_POLLED_W))
+ hap_fd_clr(fd, fd_evts[DIR_WR]);
+ else
+ hap_fd_set(fd, fd_evts[DIR_WR]);
+
+ HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ if (fd > *max_add_fd)
+ *max_add_fd = fd;
+ }
+}
+
/*
* Select() poller
*/
@@ -46,10 +84,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
struct timeval delta;
int delta_ms;
int fds;
- int updt_idx, en;
+ int updt_idx;
char count;
int readnotnull, writenotnull;
int old_maxfd, new_maxfd, max_add_fd;
+ int old_fd;

max_add_fd = -1;

@@ -62,41 +101,33 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
activity[tid].poll_drop++;
continue;
}
-
- en = fdtab[fd].state;
-
- /* we have a single state for all threads, which is why we
- * don't check the tid_bit. First thread to see the update
- * takes it for every other one.
- */
- if (!(en & FD_EV_POLLED_RW)) {
- if (!fdtab[fd].polled_mask) {
- /* fd was not watched, it's still not */
- continue;
- }
- /* fd totally removed from poll list */
- hap_fd_clr(fd, fd_evts[DIR_RD]);
- hap_fd_clr(fd, fd_evts[DIR_WR]);
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
- }
- else {
- /* OK fd has to be monitored, it was either added or changed */
- if (!(en & FD_EV_POLLED_R))
- hap_fd_clr(fd, fd_evts[DIR_RD]);
- else
- hap_fd_set(fd, fd_evts[DIR_RD]);
-
- if (!(en & FD_EV_POLLED_W))
- hap_fd_clr(fd, fd_evts[DIR_WR]);
- else
- hap_fd_set(fd, fd_evts[DIR_WR]);
-
- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
- if (fd > max_add_fd)
- max_add_fd = fd;
+ _update_fd(fd, &max_add_fd);
+ }
+ /* Now scan the global update list */
+ for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+ if (fd == -2) {
+ fd = old_fd;
+ continue;
}
+ else if (fd <= -3)
+ fd = -fd -4;
+ if (fd == -1)
+ break;
+ if (fdtab[fd].update_mask & tid_bit) {
+ /* Cheat a bit, as the state is global to all pollers
+ * we don't need every thread to take care of the
+ * update.
+ */
+ HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+ done_update_polling(fd);
+ } else
+ continue;
+ if (!fdtab[fd].owner)
+ continue;
+ _update_fd(fd, &max_add_fd);
}

+
/* maybe we added at least one fd larger than maxfd */
for (old_maxfd = maxfd; old_maxfd <= max_add_fd; ) {
if (HA_ATOMIC_CAS(&maxfd, &old_maxfd, max_add_fd + 1))
diff --git a/src/fd.c b/src/fd.c
index cbf22bd22..508ee098f 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -169,6 +169,7 @@ int nbpollers = 0;

volatile struct fdlist fd_cache ; // FD events cache
volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
+volatile struct fdlist update_list; // Global update list

unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache

@@ -244,7 +245,6 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off)
int prev;
int next;
int last;
-
lock_self:
#if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
next_list.next = next_list.prev = -2;
@@ -492,6 +492,7 @@ int init_pollers()
goto fail_info;

fd_cache.first = fd_cache.last = -1;
+ update_list.first = update_list.last = -1;
hap_register_per_thread_init(init_pollers_per_thread);
hap_register_per_thread_deinit(deinit_pollers_per_thread);

@@ -499,7 +500,7 @@ int init_pollers()
HA_SPIN_INIT(&fdtab[p].lock);
/* Mark the fd as out of the fd cache */
fdtab[p].cache.next = -3;
- fdtab[p].cache.next = -3;
+ fdtab[p].update.next = -3;
}
for (p = 0; p < global.nbthread; p++)
fd_cache_local[p].first = fd_cache_local[p].last = -1;
diff --git a/src/hathreads.c b/src/hathreads.c
index 0d690f383..5db3c2197 100644
--- a/src/hathreads.c
+++ b/src/hathreads.c
@@ -31,7 +31,7 @@ void thread_sync_io_handler(int fd)
static HA_SPINLOCK_T sync_lock;
static int threads_sync_pipe[2];
static unsigned long threads_want_sync = 0;
-static unsigned long all_threads_mask = 0;
+unsigned long all_threads_mask = 0;

#if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
struct lock_stat lock_stats[LOCK_LABELS];
--
2.14.3

From 036d5b1c96e70b38764cc69e58741011f5786159 Mon Sep 17 00:00:00 2001
From: Olivier Houchard <[email protected]>
Date: Thu, 26 Apr 2018 14:23:07 +0200
Subject: [PATCH 3/4] MINOR: pollers: move polled_mask outside of struct fdtab.

The polled_mask is only used in the pollers, and removing it from
struct fdtab makes the struct fit in one 64-byte cacheline again on a
64-bit machine, so make it a separate array.
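
This is essentially a structure-of-arrays split: the bits only the pollers
touch move into a parallel array indexed by fd, so the hot per-fd structure
shrinks again. A minimal sketch of the allocation pattern, with a
hypothetical slimmed-down struct standing in for fdtab:

#include <stdio.h>
#include <stdlib.h>

struct hot_fd {                    /* stand-in for the slimmed fdtab slot */
	unsigned long thread_mask;
	unsigned long update_mask;
	void *owner;
	/* ... remaining hot fields ... */
};

int main(void)
{
	int maxsock = 1024;
	struct hot_fd *tab = calloc(maxsock, sizeof(*tab));
	/* polled_mask lives in its own array, touched only by pollers */
	unsigned long *polled_mask = calloc(maxsock, sizeof(*polled_mask));

	if (!tab || !polled_mask)
		return 1;
	printf("per-fd hot size: %zu bytes\n", sizeof(*tab));
	free(polled_mask);
	free(tab);
	return 0;
}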
---
include/proto/fd.h | 2 ++
include/types/fd.h | 1 -
src/ev_epoll.c | 10 +++++-----
src/ev_kqueue.c | 10 +++++-----
src/ev_poll.c | 6 +++---
src/ev_select.c | 6 +++---
src/fd.c | 8 +++++++-
7 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/include/proto/fd.h b/include/proto/fd.h
index a88f7ae64..98a58f55d 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -38,6 +38,8 @@ extern volatile struct fdlist fd_cache_local[MAX_THREADS];

extern volatile struct fdlist update_list;

+extern unsigned long *polled_mask;
+
extern unsigned long fd_cache_mask; // Mask of threads with events in the cache

extern THREAD_LOCAL int *fd_updt; // FD updates list
diff --git a/include/types/fd.h b/include/types/fd.h
index aa18ebefc..5947bafc3 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -114,7 +114,6 @@ struct fdlist {
struct fdtab {
__decl_hathreads(HA_SPINLOCK_T lock);
unsigned long thread_mask; /* mask of thread IDs authorized to process the task */
- unsigned long polled_mask; /* mask of thread IDs currently polling this fd */
unsigned long update_mask; /* mask of thread IDs having an update for fd */
struct fdlist_entry cache; /* Entry in the fdcache */
struct fdlist_entry update; /* Entry in the global update list */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 2fec9070d..05f1b5ec8 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -50,7 +50,7 @@ static THREAD_LOCAL struct epoll_event ev;
REGPRM1 static void __fd_clo(int fd)
{
if (unlikely(fdtab[fd].cloned)) {
- unsigned long m = fdtab[fd].polled_mask;
+ unsigned long m = polled_mask[fd];
int i;

for (i = global.nbthread - 1; i >= 0; i--)
@@ -65,11 +65,11 @@ static inline void _update_fd(int fd)

en = fdtab[fd].state;

- if (fdtab[fd].polled_mask & tid_bit) {
+ if (polled_mask[fd] & tid_bit) {
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
/* fd removed from poll list */
opcode = EPOLL_CTL_DEL;
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+ HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
}
else {
/* fd status changed */
@@ -79,7 +79,7 @@ static inline void _update_fd(int fd)
else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
/* new fd in the poll list */
opcode = EPOLL_CTL_ADD;
- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
}
else {
return;
@@ -177,7 +177,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
/* FD has been migrated */
activity[tid].poll_skip++;
epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+ HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
continue;
}

diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 6392dbfdd..0fa4e1651 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -41,29 +41,29 @@ static inline int _update_fd(int fd)
en = fdtab[fd].state;

if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
- if (!(fdtab[fd].polled_mask & tid_bit)) {
+ if (!(polled_mask[fd] & tid_bit)) {
/* fd was not watched, it's still not */
return 0;
}
/* fd totally removed from poll list */
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+ HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */

if (en & FD_EV_POLLED_R)
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
- else if (fdtab[fd].polled_mask & tid_bit)
+ else if (polled_mask[fd] & tid_bit)
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

if (en & FD_EV_POLLED_W)
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
- else if (fdtab[fd].polled_mask & tid_bit)
+ else if (polled_mask[fd] & tid_bit)
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);

- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
}
return changes;
}
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 2f10fe9ae..65c92d992 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -56,14 +56,14 @@ static inline void _update_fd(int fd, int *max_add_fd)
* takes it for every other one.
*/
if (!(en & FD_EV_POLLED_RW)) {
- if (!fdtab[fd].polled_mask) {
+ if (!polled_mask[fd]) {
/* fd was not watched, it's still not */
return;
}
/* fd totally removed from poll list */
hap_fd_clr(fd, fd_evts[DIR_RD]);
hap_fd_clr(fd, fd_evts[DIR_WR]);
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+ HA_ATOMIC_AND(&polled_mask[fd], 0);
}
else {
/* OK fd has to be monitored, it was either added or changed */
@@ -77,7 +77,7 @@ static inline void _update_fd(int fd, int *max_add_fd)
else
hap_fd_set(fd, fd_evts[DIR_WR]);

- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
if (fd > *max_add_fd)
*max_add_fd = fd;
}
diff --git a/src/ev_select.c b/src/ev_select.c
index 67775ca7e..d83db6c74 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -47,14 +47,14 @@ static inline void _update_fd(int fd, int *max_add_fd)
* takes it for every other one.
*/
if (!(en & FD_EV_POLLED_RW)) {
- if (!fdtab[fd].polled_mask) {
+ if (!polled_mask[fd]) {
/* fd was not watched, it's still not */
return;
}
/* fd totally removed from poll list */
hap_fd_clr(fd, fd_evts[DIR_RD]);
hap_fd_clr(fd, fd_evts[DIR_WR]);
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+ HA_ATOMIC_AND(&polled_mask[fd], 0);
}
else {
/* OK fd has to be monitored, it was either added or changed */
@@ -68,7 +68,7 @@ static inline void _update_fd(int fd, int *max_add_fd)
else
hap_fd_set(fd, fd_evts[DIR_WR]);

- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
if (fd > *max_add_fd)
*max_add_fd = fd;
}
diff --git a/src/fd.c b/src/fd.c
index 508ee098f..bb6fc4283 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -159,6 +159,7 @@
#include <proto/port_range.h>

struct fdtab *fdtab = NULL; /* array of all the file descriptors */
+unsigned long *polled_mask = NULL; /* Array for the polled_mask of each fd */
struct fdinfo *fdinfo = NULL; /* less-often used infos for file descriptors */
int totalconn; /* total # of terminated sessions */
int actconn; /* # of active sessions */
@@ -373,7 +374,7 @@ static void fd_dodelete(int fd, int do_close)
fdtab[fd].update_mask &= ~tid_bit;
fdtab[fd].thread_mask = 0;
if (do_close) {
- fdtab[fd].polled_mask = 0;
+ polled_mask[fd] = 0;
close(fd);
}
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
@@ -488,6 +489,8 @@ int init_pollers()
if ((fdtab = calloc(global.maxsock, sizeof(struct fdtab))) == NULL)
goto fail_tab;

+ if ((polled_mask = calloc(global.maxsock, sizeof(unsigned long))) == NULL)
+ goto fail_polledmask;
if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
goto fail_info;

@@ -526,6 +529,8 @@ int init_pollers()
fail_info:
free(fdtab);
fail_tab:
+ free(polled_mask);
+ fail_polledmask:
return 0;
}

@@ -549,6 +554,7 @@ void deinit_pollers() {

free(fdinfo); fdinfo = NULL;
free(fdtab); fdtab = NULL;
+ free(polled_mask); polled_mask = NULL;
}

/*
--
2.14.3
Willy Tarreau
Re: [PATCH] Make sure all the pollers get fd updates
May 06, 2018 06:40AM
Hi Olivier,

On Fri, May 04, 2018 at 05:32:24PM +0200, Olivier Houchard wrote:
> This can't be applied to 1.8, as it uses code that was not, and probably
> won't be, backported; a different patch, similar in spirit, will be developed.

Thanks for these patches. Now applied. I mentioned the point above in the
second patch's commit message to help stable maintainers know what to do
with it. Now you'll have fun figuring out a solution suitable for 1.8 :-)

Willy
Olivier Houchard
Re: [PATCH] Make sure all the pollers get fd updates
May 17, 2018 07:50PM
Hi,

On Fri, May 04, 2018 at 05:32:24PM +0200, Olivier Houchard wrote:
> Hi,
>
> When the code was changed to use one poller per thread, we overlooked the
> fact that some fds can be shared between multiple threads. When an event
> occurred that required such an fd to be added to or removed from the poller,
> the information would be sent only to the current thread; the other threads
> could miss it entirely, and thus either miss events or report spurious ones.
> The attached patches are an attempt at fixing this, by adding a new global
> update list in addition to the local per-thread update lists.
>
> This can't be applied to 1.8, as it uses code that was not, and probably
> won't be, backported; a different patch, similar in spirit, will be developed.
>

Here is a patch that should do the same for 1.8.

Regards,

Olivier
From 3f8fe65b4433f4f8c543ff9b11c48554fe862f45 Mon Sep 17 00:00:00 2001
From: Olivier Houchard <[email protected]>
Date: Thu, 17 May 2018 18:34:02 +0200
Subject: [PATCH] BUG/MEDIUM: pollers: Use a global list for fd shared between
threads.

With the old model, any fd shared by multiple threads, such as listeners
or dns sockets, would only be updated on one thread, which could lead
to missed events or spurious wakeups.
To avoid this, add a global list for fds that are shared, and only remove
entries from this list once every thread has updated its poller.
This subtly changes the semantics of updt_fd_polling(), as it now unlocks
the FD_LOCK on exit.

This is similar in spirit to commit 6b96f7289c2f401deef4bdc6e20792360807dde4
(with the bugfix from c55b88ece616afe0b28dc81eb39bad37b5f9c33f applied),
but had to be rewritten because of the differences between 1.8 and master.

This should only be applied to 1.8.
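
Because updt_fd_polling() may now have to take the global update-list lock,
fd_update_cache() releases the FD lock itself, and every caller only unlocks
on the path where nothing changed. A condensed illustration of that hand-off
convention, using a plain pthread mutex and hypothetical helper names:

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t fd_lock = PTHREAD_MUTEX_INITIALIZER;
static int recv_active = 1;

/* Stand-in for fd_update_cache(): per the patch, it returns with the
 * FD lock already released. */
static void fd_update_cache_sketch(void)
{
	/* ... queue the polling update ... */
	pthread_mutex_unlock(&fd_lock);
}

/* Caller pattern after the patch: unlock only on the untouched path. */
static void fd_stop_recv_sketch(void)
{
	pthread_mutex_lock(&fd_lock);
	if (recv_active) {
		recv_active = 0;
		fd_update_cache_sketch(); /* hands the lock off, no unlock here */
	} else {
		pthread_mutex_unlock(&fd_lock);
	}
}

int main(void)
{
	fd_stop_recv_sketch();
	printf("recv_active=%d\n", recv_active);
	return 0;
}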
---
include/common/hathreads.h | 4 ++
include/proto/fd.h | 130 ++++++++++++++++++++++++++++++++++-----------
include/types/fd.h | 13 +++++
src/ev_epoll.c | 90 +++++++++++++++++++++----------
src/ev_kqueue.c | 83 +++++++++++++++++++++--------
src/ev_poll.c | 45 ++++++++++++++++
src/ev_select.c | 37 +++++++++++++
src/fd.c | 6 +++
src/hathreads.c | 2 +-
9 files changed, 327 insertions(+), 83 deletions(-)

diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 325a869a..86db4d5c 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -201,6 +201,8 @@ void thread_exit_sync(void);
int thread_no_sync(void);
int thread_need_sync(void);

+extern unsigned long all_threads_mask;
+
#if defined(DEBUG_THREAD) || defined(DEBUG_FULL)

/* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
@@ -209,6 +211,7 @@ enum lock_label {
FDTAB_LOCK,
FDCACHE_LOCK,
FD_LOCK,
+ FD_UPDATE_LOCK,
POLL_LOCK,
TASK_RQ_LOCK,
TASK_WQ_LOCK,
@@ -330,6 +333,7 @@ static inline const char *lock_label(enum lock_label label)
case FDCACHE_LOCK: return "FDCACHE";
case FD_LOCK: return "FD";
case FDTAB_LOCK: return "FDTAB";
+ case FD_UPDATE_LOCK: return "FD_UPDATE";
case POLL_LOCK: return "POLL";
case TASK_RQ_LOCK: return "TASK_RQ";
case TASK_WQ_LOCK: return "TASK_WQ";
diff --git a/include/proto/fd.h b/include/proto/fd.h
index bb91bb2c..b6199ccf 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -43,6 +43,9 @@ extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
__decl_hathreads(extern HA_SPINLOCK_T __attribute__((aligned(64))) fdtab_lock); /* global lock to protect fdtab array */
__decl_hathreads(extern HA_RWLOCK_T __attribute__((aligned(64))) fdcache_lock); /* global lock to protect fd_cache array */
__decl_hathreads(extern HA_SPINLOCK_T __attribute__((aligned(64))) poll_lock); /* global lock to protect poll info */
+__decl_hathreads(extern HA_SPINLOCK_T __attribute__((aligned(64))) fd_updt_lock); /* global lock to protect the update list */
+
+extern struct fdlist update_list; // Global update list

/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
* The file descriptor is also closed.
@@ -96,14 +99,70 @@ void fd_process_cached_events();

/* Mark fd <fd> as updated for polling and allocate an entry in the update list
* for this if it was not already there. This can be done at any time.
+ * This function expects the FD lock to be locked, and returns with the
+ * FD lock unlocked.
*/
static inline void updt_fd_polling(const int fd)
{
- if (fdtab[fd].update_mask & tid_bit)
+ if ((fdtab[fd].update_mask & fdtab[fd].thread_mask) ==
+ fdtab[fd].thread_mask) {
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
/* already scheduled for update */
return;
- fdtab[fd].update_mask |= tid_bit;
- fd_updt[fd_nbupdt++] = fd;
+ }
+ if (fdtab[fd].thread_mask == tid_bit) {
+ fdtab[fd].update_mask |= tid_bit;
+ fd_updt[fd_nbupdt++] = fd;
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ } else {
+ /* This is ugly, but we can afford to unlock the FD lock
+ * before we acquire the fd_updt_lock, to prevent a
+ * lock order reversal, because this function is only called
+ * from fd_update_cache(), and all users of fd_update_cache()
+ * used to just unlock the fd lock just after, anyway.
+ */
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+ /* If update_mask is non-null, then it's already in the list
+ * so we don't have to add it.
+ */
+ if (fdtab[fd].update_mask == 0) {
+ if (update_list.first == -1) {
+ update_list.first = update_list.last = fd;
+ fdtab[fd].update.next = fdtab[fd].update.prev = -1;
+ } else {
+ fdtab[update_list.last].update.next = fd;
+ fdtab[fd].update.prev = update_list.last;
+ fdtab[fd].update.next = -1;
+ update_list.last = fd;
+ }
+ }
+ fdtab[fd].update_mask |= fdtab[fd].thread_mask;
+ HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+
+ }
+}
+
+/* Called from the poller to acknowledge we read an entry from the global
+ * update list, to remove our bit from the update_mask, and remove it from
+ * the list if we were the last one.
+ */
+/* Expects to be called with the FD lock and the FD update lock held */
+static inline void done_update_polling(int fd)
+{
+ fdtab[fd].update_mask &= ~tid_bit;
+ if ((fdtab[fd].update_mask & all_threads_mask) == 0) {
+ if (fdtab[fd].update.prev != -1)
+ fdtab[fdtab[fd].update.prev].update.next =
+ fdtab[fd].update.next;
+ else
+ update_list.first = fdtab[fd].update.next;
+ if (fdtab[fd].update.next != -1)
+ fdtab[fdtab[fd].update.next].update.prev =
+ fdtab[fd].update.prev;
+ else
+ update_list.last = fdtab[fd].update.prev;
+ }
}


@@ -175,13 +234,6 @@ static inline int fd_compute_new_polled_status(int state)
*/
static inline void fd_update_cache(int fd)
{
- /* 3 states for each direction require a polling update */
- if ((fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_ACTIVE_R)) == FD_EV_POLLED_R ||
- (fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_READY_R | FD_EV_ACTIVE_R)) == FD_EV_ACTIVE_R ||
- (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_ACTIVE_W)) == FD_EV_POLLED_W ||
- (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_READY_W | FD_EV_ACTIVE_W)) == FD_EV_ACTIVE_W)
- updt_fd_polling(fd);
-
/* only READY and ACTIVE states (the two with both flags set) require a cache entry */
if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
@@ -190,6 +242,14 @@ static inline void fd_update_cache(int fd)
else {
fd_release_cache_entry(fd);
}
+ /* 3 states for each direction require a polling update */
+ if ((fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_ACTIVE_R)) == FD_EV_POLLED_R ||
+ (fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_READY_R | FD_EV_ACTIVE_R)) == FD_EV_ACTIVE_R ||
+ (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_ACTIVE_W)) == FD_EV_POLLED_W ||
+ (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_READY_W | FD_EV_ACTIVE_W)) == FD_EV_ACTIVE_W)
+ updt_fd_polling(fd);
+ else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/*
@@ -271,8 +331,9 @@ static inline void fd_stop_recv(int fd)
if (fd_recv_active(fd)) {
fdtab[fd].state &= ~FD_EV_ACTIVE_R;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Disable processing send events on fd <fd> */
@@ -282,8 +343,9 @@ static inline void fd_stop_send(int fd)
if (fd_send_active(fd)) {
fdtab[fd].state &= ~FD_EV_ACTIVE_W;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Disable processing of events on fd <fd> for both directions. */
@@ -293,8 +355,9 @@ static inline void fd_stop_both(int fd)
if (fd_active(fd)) {
fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
@@ -304,8 +367,9 @@ static inline void fd_cant_recv(const int fd)
if (fd_recv_ready(fd)) {
fdtab[fd].state &= ~FD_EV_READY_R;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Report that FD <fd> can receive anymore without polling. */
@@ -315,8 +379,9 @@ static inline void fd_may_recv(const int fd)
if (!fd_recv_ready(fd)) {
fdtab[fd].state |= FD_EV_READY_R;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Disable readiness when polled. This is useful to interrupt reading when it
@@ -330,8 +395,9 @@ static inline void fd_done_recv(const int fd)
if (fd_recv_polled(fd) && fd_recv_ready(fd)) {
fdtab[fd].state &= ~FD_EV_READY_R;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */
@@ -341,8 +407,9 @@ static inline void fd_cant_send(const int fd)
if (fd_send_ready(fd)) {
fdtab[fd].state &= ~FD_EV_READY_W;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
@@ -352,8 +419,9 @@ static inline void fd_may_send(const int fd)
if (!fd_send_ready(fd)) {
fdtab[fd].state |= FD_EV_READY_W;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Prepare FD <fd> to try to receive */
@@ -363,8 +431,9 @@ static inline void fd_want_recv(int fd)
if (!fd_recv_active(fd)) {
fdtab[fd].state |= FD_EV_ACTIVE_R;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Prepare FD <fd> to try to send */
@@ -374,8 +443,9 @@ static inline void fd_want_send(int fd)
if (!fd_send_active(fd)) {
fdtab[fd].state |= FD_EV_ACTIVE_W;
fd_update_cache(fd); /* need an update entry to change the state */
- }
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ /* the FD lock is unlocked by fd_update_cache() */
+ } else
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}

/* Update events seen for FD <fd> and its state if needed. This should be called
diff --git a/include/types/fd.h b/include/types/fd.h
index 9f2c5fee..8e34c624 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -90,11 +90,24 @@ enum fd_states {
*/
#define DEAD_FD_MAGIC 0xFDDEADFD

+struct fdlist_entry {
+ int next;
+ int prev;
+} __attribute__ ((aligned(8)));
+
+/* head of the fd list */
+struct fdlist {
+ int first;
+ int last;
+} __attribute__ ((aligned(8)));
+
+
/* info about one given fd */
struct fdtab {
__decl_hathreads(HA_SPINLOCK_T lock);
unsigned long thread_mask; /* mask of thread IDs authorized to process the task */
unsigned long polled_mask; /* mask of thread IDs currently polling this fd */
+ struct fdlist_entry update; /* Entry in the global update list */
unsigned long update_mask; /* mask of thread IDs having an update for fd */
void (*iocb)(int fd); /* I/O handler */
void *owner; /* the connection or listener associated with this fd, NULL if closed */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 124b8163..adc15acd 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -59,13 +59,51 @@ REGPRM1 static void __fd_clo(int fd)
}
}

+static void _update_fd(int fd)
+{
+ int en, opcode;
+
+ en = fdtab[fd].state;
+
+ if (fdtab[fd].polled_mask & tid_bit) {
+ if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+ /* fd removed from poll list */
+ opcode = EPOLL_CTL_DEL;
+ HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+ }
+ else {
+ /* fd status changed */
+ opcode = EPOLL_CTL_MOD;
+ }
+ }
+ else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
+ /* new fd in the poll list */
+ opcode = EPOLL_CTL_ADD;
+ HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ }
+ else {
+ return;
+ }
+
+ /* construct the epoll events based on new state */
+ ev.events = 0;
+ if (en & FD_EV_POLLED_R)
+ ev.events |= EPOLLIN | EPOLLRDHUP;
+
+ if (en & FD_EV_POLLED_W)
+ ev.events |= EPOLLOUT;
+
+ ev.data.fd = fd;
+ epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+}
+
/*
* Linux epoll() poller
*/
REGPRM2 static void _do_poll(struct poller *p, int exp)
{
int status, eo, en;
- int fd, opcode;
+ int fd;
int count;
int updt_idx;
int wait_time;
@@ -89,39 +127,31 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
en = fd_compute_new_polled_status(eo);
fdtab[fd].state = en;
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
-
- if (fdtab[fd].polled_mask & tid_bit) {
- if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
- /* fd removed from poll list */
- opcode = EPOLL_CTL_DEL;
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
- }
- else {
- /* fd status changed */
- opcode = EPOLL_CTL_MOD;
- }
- }
- else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
- /* new fd in the poll list */
- opcode = EPOLL_CTL_ADD;
- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
- }
+ _update_fd(fd);
+ }
+ fd_nbupdt = 0;
+ /* Scan the global update list */
+ HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+ for (fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+ HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ if (fdtab[fd].update_mask & tid_bit)
+ done_update_polling(fd);
else {
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
continue;
}
+ fdtab[fd].new = 0;

- /* construct the epoll events based on new state */
- ev.events = 0;
- if (en & FD_EV_POLLED_R)
- ev.events |= EPOLLIN | EPOLLRDHUP;
-
- if (en & FD_EV_POLLED_W)
- ev.events |= EPOLLOUT;
+ eo = fdtab[fd].state;
+ en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;

- ev.data.fd = fd;
- epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ if (!fdtab[fd].owner)
+ continue;
+ _update_fd(fd);
}
- fd_nbupdt = 0;
+ HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);

/* compute the epoll_wait() timeout */
if (!exp)
@@ -208,8 +238,10 @@ static int init_epoll_per_thread()
* fd for this thread. Let's just mark them as updated, the poller will
* do the rest.
*/
- for (fd = 0; fd < maxfd; fd++)
+ for (fd = 0; fd < maxfd; fd++) {
+ HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
updt_fd_polling(fd);
+ }

return 1;
fail_fd:
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 8cd6dd84..642de8b3 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -33,6 +33,41 @@ static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
static THREAD_LOCAL struct kevent *kev = NULL;
static struct kevent *kev_out = NULL; // Trash buffer for kevent() to write the eventlist in

+static int _update_fd(int fd, int start)
+{
+ int en;
+ int changes = start;
+
+ en = fdtab[fd].state;
+
+ if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+ if (!(fdtab[fd].polled_mask & tid_bit)) {
+ /* fd was not watched, it's still not */
+ return 0;
+ }
+ /* fd totally removed from poll list */
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+ HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+ }
+ else {
+ /* OK fd has to be monitored, it was either added or changed */
+
+ if (en & FD_EV_POLLED_R)
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+ else if (fdtab[fd].polled_mask & tid_bit)
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+ if (en & FD_EV_POLLED_W)
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+ else if (fdtab[fd].polled_mask & tid_bit)
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+ HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+ }
+ return changes;
+}
+
/*
* kqueue() poller
*/
@@ -66,32 +101,32 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
fdtab[fd].state = en;
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);

- if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
- if (!(fdtab[fd].polled_mask & tid_bit)) {
- /* fd was not watched, it's still not */
- continue;
- }
- /* fd totally removed from poll list */
- EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
- }
- else {
- /* OK fd has to be monitored, it was either added or changed */
+ changes = _update_fd(fd, changes);
+ }

- if (en & FD_EV_POLLED_R)
- EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
- else if (fdtab[fd].polled_mask & tid_bit)
- EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ /* Scan the global update list */
+ HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+ for (fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+ HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ if (fdtab[fd].update_mask & tid_bit)
+ done_update_polling(fd);
+ else {
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ continue;
+ }
+ fdtab[fd].new = 0;

- if (en & FD_EV_POLLED_W)
- EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
- else if (fdtab[fd].polled_mask & tid_bit)
- EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+ eo = fdtab[fd].state;
+ en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;

- HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
- }
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ if (!fdtab[fd].owner)
+ continue;
+ changes = _update_fd(fd, changes);
}
+ HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+
if (changes) {
#ifdef EV_RECEIPT
kev[0].flags |= EV_RECEIPT;
@@ -189,8 +224,10 @@ static int init_kqueue_per_thread()
* fd for this thread. Let's just mark them as updated, the poller will
* do the rest.
*/
- for (fd = 0; fd < maxfd; fd++)
+ for (fd = 0; fd < maxfd; fd++) {
+ HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
updt_fd_polling(fd);
+ }

return 1;
fail_fd:
diff --git a/src/ev_poll.c b/src/ev_poll.c
index b7cc0bb3..c913ced2 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -104,6 +104,51 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
HA_SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
}
+ HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+ for (fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+ HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ if (fdtab[fd].update_mask & tid_bit) {
+ /* Cheat a bit, as the state is global to all pollers
+ * we don't need every thread to take care of the
+ * update.
+ */
+ fdtab[fd].update_mask &= ~all_threads_mask;
+ done_update_polling(fd);
+ } else {
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ continue;
+ }
+
+ if (!fdtab[fd].owner) {
+ activity[tid].poll_drop++;
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ continue;
+ }
+
+ fdtab[fd].new = 0;
+
+ eo = fdtab[fd].state;
+ en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+
+ if ((eo ^ en) & FD_EV_POLLED_RW) {
+ /* poll status changed, update the lists */
+ HA_SPIN_LOCK(POLL_LOCK, &poll_lock);
+ if ((eo & ~en) & FD_EV_POLLED_R)
+ hap_fd_clr(fd, fd_evts[DIR_RD]);
+ else if ((en & ~eo) & FD_EV_POLLED_R)
+ hap_fd_set(fd, fd_evts[DIR_RD]);
+
+ if ((eo & ~en) & FD_EV_POLLED_W)
+ hap_fd_clr(fd, fd_evts[DIR_WR]);
+ else if ((en & ~eo) & FD_EV_POLLED_W)
+ hap_fd_set(fd, fd_evts[DIR_WR]);
+ HA_SPIN_UNLOCK(POLL_LOCK, &poll_lock);
+ }
+
+ }
+ HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
fd_nbupdt = 0;

nbfd = 0;
diff --git a/src/ev_select.c b/src/ev_select.c
index 5f3486ed..bde923ea 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -70,7 +70,42 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
en = fd_compute_new_polled_status(eo);
fdtab[fd].state = en;
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ if ((eo ^ en) & FD_EV_POLLED_RW) {
+ /* poll status changed, update the lists */
+ HA_SPIN_LOCK(POLL_LOCK, &poll_lock);
+ if ((eo & ~en) & FD_EV_POLLED_R)
+ FD_CLR(fd, fd_evts[DIR_RD]);
+ else if ((en & ~eo) & FD_EV_POLLED_R)
+ FD_SET(fd, fd_evts[DIR_RD]);
+
+ if ((eo & ~en) & FD_EV_POLLED_W)
+ FD_CLR(fd, fd_evts[DIR_WR]);
+ else if ((en & ~eo) & FD_EV_POLLED_W)
+ FD_SET(fd, fd_evts[DIR_WR]);
+ HA_SPIN_UNLOCK(POLL_LOCK, &poll_lock);
+ }
+ }
+ HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+ for (fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+ HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ if (fdtab[fd].update_mask & tid_bit) {
+ /* Cheat a bit, as the state is global to all pollers
+ * we don't need every thread to take care of the
+ * update.
+ */
+ fdtab[fd].update_mask &= ~all_threads_mask;
+ done_update_polling(fd);
+ } else {
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ continue;
+ }

+ fdtab[fd].new = 0;
+
+ eo = fdtab[fd].state;
+ en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed, update the lists */
HA_SPIN_LOCK(POLL_LOCK, &poll_lock);
@@ -85,7 +120,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
FD_SET(fd, fd_evts[DIR_WR]);
HA_SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
+
}
+ HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
fd_nbupdt = 0;

/* let's restore fdset state */
diff --git a/src/fd.c b/src/fd.c
index b64130ed..a134e93e 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -175,9 +175,12 @@ unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
THREAD_LOCAL int *fd_updt = NULL; // FD updates list
THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list

+struct fdlist update_list; // Global update list
__decl_hathreads(HA_SPINLOCK_T fdtab_lock); /* global lock to protect fdtab array */
__decl_hathreads(HA_RWLOCK_T fdcache_lock); /* global lock to protect fd_cache array */
__decl_hathreads(HA_SPINLOCK_T poll_lock); /* global lock to protect poll info */
+__decl_hathreads(HA_SPINLOCK_T) fd_updt_lock; /* global lock to protect the update list */
+

/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
* The file descriptor is also closed.
@@ -341,6 +344,9 @@ int init_pollers()
HA_SPIN_INIT(&fdtab_lock);
HA_RWLOCK_INIT(&fdcache_lock);
HA_SPIN_INIT(&poll_lock);
+ HA_SPIN_INIT(&fd_updt_lock);
+ update_list.first = update_list.last = -1;
+
do {
bp = NULL;
for (p = 0; p < nbpollers; p++)
diff --git a/src/hathreads.c b/src/hathreads.c
index daf226ce..5d90402b 100644
--- a/src/hathreads.c
+++ b/src/hathreads.c
@@ -31,7 +31,7 @@ void thread_sync_io_handler(int fd)
static HA_SPINLOCK_T sync_lock;
static int threads_sync_pipe[2];
static unsigned long threads_want_sync = 0;
-static unsigned long all_threads_mask = 0;
+unsigned long all_threads_mask = 0;

#if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
struct lock_stat lock_stats[LOCK_LABELS];
--
2.14.3
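
A side note on the "ugly" unlock-then-lock dance in updt_fd_polling() above:
the pollers take FD_UPDATE_LOCK first and then each fd's FD_LOCK, so taking
FD_UPDATE_LOCK while still holding an FD_LOCK would be a lock order reversal
and could deadlock. The patch releases the fd lock first; a hedged
illustration of that discipline with plain pthread mutexes:

#include <pthread.h>

static pthread_mutex_t fd_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t updt_lock = PTHREAD_MUTEX_INITIALIZER;

/* If one path takes fd_lock then updt_lock while another takes
 * updt_lock then fd_lock, the two can deadlock. Releasing fd_lock
 * before taking updt_lock keeps a single global acquisition order. */
static void safe_path(void)
{
	pthread_mutex_lock(&fd_lock);
	/* ... per-fd state changes ... */
	pthread_mutex_unlock(&fd_lock);

	pthread_mutex_lock(&updt_lock);
	/* ... global update-list manipulation ... */
	pthread_mutex_unlock(&updt_lock);
}

int main(void)
{
	safe_path();
	return 0;
}
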
Willy Tarreau
Re: [PATCH] Make sure all the pollers get fd updates
May 17, 2018 08:30PM
On Thu, May 17, 2018 at 07:37:10PM +0200, Olivier Houchard wrote:
> Here is a patch that should do the same for 1.8.

Pretty cool, thank you. Now I think we have about everything we wanted
to emit a new 1.8.

Willy