From 42c2605dcbbeececc7bef7800cfd61d26c9694a2 Mon Sep 17 00:00:00 2001 From: Richard Nyberg Date: Tue, 23 Dec 2008 19:20:14 +0100 Subject: [PATCH] Bring back the old thread callback code. --- btpd/Makefile.am | 2 +- btpd/btpd.c | 2 ++ btpd/btpd.h | 7 ++++ btpd/thread_cb.c | 87 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 btpd/thread_cb.c diff --git a/btpd/Makefile.am b/btpd/Makefile.am index 2656659..2568763 100644 --- a/btpd/Makefile.am +++ b/btpd/Makefile.am @@ -10,7 +10,7 @@ btpd_SOURCES=\ net_buf.c net_buf.h\ opts.c opts.h\ peer.c peer.h\ - tlib.c tlib.h torrent.c torrent.h\ + thread_cb.c tlib.c tlib.h torrent.c torrent.h\ tracker_req.c tracker_req.h\ upload.c upload.h\ util.c diff --git a/btpd/btpd.c b/btpd/btpd.c index b68439c..c8bd5d2 100644 --- a/btpd/btpd.c +++ b/btpd/btpd.c @@ -76,6 +76,7 @@ heartbeat_cb(int fd, short type, void *arg) void tr_init(void); void ipc_init(void); +void td_init(void); void btpd_init(void) @@ -100,6 +101,7 @@ btpd_init(void) srandom(seed); + td_init(); net_init(); ipc_init(); ul_init(); diff --git a/btpd/btpd.h b/btpd/btpd.h index 848914c..fe6cabf 100644 --- a/btpd/btpd.h +++ b/btpd/btpd.h @@ -77,4 +77,11 @@ int btpd_is_stopping(void); const uint8_t *btpd_get_peer_id(void); +void td_acquire_lock(void); +void td_release_lock(void); + +void td_post(void (*cb)(void *), void *arg); +void td_post_end(); +#define td_post_begin td_acquire_lock + #endif diff --git a/btpd/thread_cb.c b/btpd/thread_cb.c new file mode 100644 index 0000000..f9d10f5 --- /dev/null +++ b/btpd/thread_cb.c @@ -0,0 +1,87 @@ +#include + +#include +#include +#include + +#include "btpd.h" + +struct td_cb { + void (*cb)(void *); + void *arg; + BTPDQ_ENTRY(td_cb) entry; +}; + +BTPDQ_HEAD(td_cb_tq, td_cb); + +static int m_td_rd, m_td_wr; +static struct event m_td_ev; +static struct td_cb_tq m_td_cbs = BTPDQ_HEAD_INITIALIZER(m_td_cbs); +static pthread_mutex_t m_td_lock; + +void +td_acquire_lock(void) +{ + pthread_mutex_lock(&m_td_lock); +} + +void +td_release_lock(void) +{ + pthread_mutex_unlock(&m_td_lock); +} + +void +td_post(void (*fun)(void *), void *arg) +{ + struct td_cb *cb = btpd_calloc(1, sizeof(*cb)); + cb->cb = fun; + cb->arg = arg; + BTPDQ_INSERT_TAIL(&m_td_cbs, cb, entry); +} + +void +td_post_end(void) +{ + char c = '1'; + td_release_lock(); + write(m_td_wr, &c, sizeof(c)); +} + +static void +td_cb(int fd, short type, void *arg) +{ + char buf[1024]; + struct td_cb_tq tmpq = BTPDQ_HEAD_INITIALIZER(tmpq); + struct td_cb *cb, *next; + + read(fd, buf, sizeof(buf)); + td_acquire_lock(); + BTPDQ_FOREACH_MUTABLE(cb, &m_td_cbs, entry, next) + BTPDQ_INSERT_TAIL(&tmpq, cb, entry); + BTPDQ_INIT(&m_td_cbs); + td_release_lock(); + + BTPDQ_FOREACH_MUTABLE(cb, &tmpq, entry, next) { + cb->cb(cb->arg); + free(cb); + } +} + +void +td_init(void) +{ + int err; + int fds[2]; + if (pipe(fds) == -1) { + btpd_err("Couldn't create thread callback pipe (%s).\n", + strerror(errno)); + } + m_td_rd = fds[0]; + m_td_wr = fds[1]; + if ((err = pthread_mutex_init(&m_td_lock, NULL)) != 0) + btpd_err("Couldn't create mutex (%s).\n", strerror(err)); + + event_set(&m_td_ev, m_td_rd, EV_READ|EV_PERSIST, td_cb, NULL); + btpd_ev_add(&m_td_ev, NULL); +}