diff --git a/btpd/Makefile.am b/btpd/Makefile.am index a77def4..69a47f1 100644 --- a/btpd/Makefile.am +++ b/btpd/Makefile.am @@ -10,7 +10,7 @@ btpd_SOURCES=\ net_buf.c net_buf.h\ opts.c opts.h\ peer.c peer.h\ - tlib.c tlib.h torrent.c torrent.h\ + td.c tlib.c tlib.h torrent.c torrent.h\ tracker_req.c tracker_req.h\ upload.c upload.h\ util.c diff --git a/btpd/btpd.c b/btpd/btpd.c index e3218a8..4bfb3e6 100644 --- a/btpd/btpd.c +++ b/btpd/btpd.c @@ -1,30 +1,5 @@ -#include - -#include -#include -#include -#include - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include -#include #include -#include -#include #include "btpd.h" #include "http.h" @@ -104,95 +79,17 @@ heartbeat_cb(int fd, short type, void *arg) net_on_tick(); } -struct td_cb { - void (*cb)(void *); - void *arg; - BTPDQ_ENTRY(td_cb) entry; -}; - -BTPDQ_HEAD(td_cb_tq, td_cb); - -static int m_td_rd, m_td_wr; -static struct event m_td_ev; -static struct td_cb_tq m_td_cbs = BTPDQ_HEAD_INITIALIZER(m_td_cbs); -static pthread_mutex_t m_td_lock; - -void -td_acquire_lock(void) -{ - pthread_mutex_lock(&m_td_lock); -} - -void -td_release_lock(void) -{ - pthread_mutex_unlock(&m_td_lock); -} - -void -td_post(void (*fun)(void *), void *arg) -{ - struct td_cb *cb = btpd_calloc(1, sizeof(*cb)); - cb->cb = fun; - cb->arg = arg; - BTPDQ_INSERT_TAIL(&m_td_cbs, cb, entry); -} - -void -td_post_end(void) -{ - char c = '1'; - td_release_lock(); - write(m_td_wr, &c, sizeof(c)); -} - -static void -td_cb(int fd, short type, void *arg) -{ - char buf[1024]; - struct td_cb_tq tmpq = BTPDQ_HEAD_INITIALIZER(tmpq); - struct td_cb *cb, *next; - - read(fd, buf, sizeof(buf)); - td_acquire_lock(); - BTPDQ_FOREACH_MUTABLE(cb, &m_td_cbs, entry, next) - BTPDQ_INSERT_TAIL(&tmpq, cb, entry); - BTPDQ_INIT(&m_td_cbs); - td_release_lock(); - - BTPDQ_FOREACH_MUTABLE(cb, &tmpq, entry, next) { - cb->cb(cb->arg); - free(cb); - } -} - -static void -td_init(void) -{ - int err; - int fds[2]; - if (pipe(fds) == -1) { - btpd_err("Couldn't create thread callback pipe (%s).\n", - strerror(errno)); - } - m_td_rd = fds[0]; - m_td_wr = fds[1]; - if ((err = pthread_mutex_init(&m_td_lock, NULL)) != 0) - btpd_err("Couldn't create mutex (%s).\n", strerror(err)); - - event_set(&m_td_ev, m_td_rd, EV_READ|EV_PERSIST, td_cb, NULL); - btpd_ev_add(&m_td_ev, NULL); -} - +void td_init(void); void tr_init(void); void ipc_init(void); void btpd_init(void) { + srandom(time(NULL)); + bcopy(BTPD_VERSION, m_peer_id, sizeof(BTPD_VERSION) - 1); m_peer_id[sizeof(BTPD_VERSION) - 1] = '|'; - srandom(time(NULL)); for (int i = sizeof(BTPD_VERSION); i < 20; i++) m_peer_id[i] = rand_between(0, 255); diff --git a/btpd/td.c b/btpd/td.c new file mode 100644 index 0000000..f9d10f5 --- /dev/null +++ b/btpd/td.c @@ -0,0 +1,87 @@ +#include + +#include +#include +#include + +#include "btpd.h" + +struct td_cb { + void (*cb)(void *); + void *arg; + BTPDQ_ENTRY(td_cb) entry; +}; + +BTPDQ_HEAD(td_cb_tq, td_cb); + +static int m_td_rd, m_td_wr; +static struct event m_td_ev; +static struct td_cb_tq m_td_cbs = BTPDQ_HEAD_INITIALIZER(m_td_cbs); +static pthread_mutex_t m_td_lock; + +void +td_acquire_lock(void) +{ + pthread_mutex_lock(&m_td_lock); +} + +void +td_release_lock(void) +{ + pthread_mutex_unlock(&m_td_lock); +} + +void +td_post(void (*fun)(void *), void *arg) +{ + struct td_cb *cb = btpd_calloc(1, sizeof(*cb)); + cb->cb = fun; + cb->arg = arg; + BTPDQ_INSERT_TAIL(&m_td_cbs, cb, entry); +} + +void +td_post_end(void) +{ + char c = '1'; + td_release_lock(); + write(m_td_wr, &c, sizeof(c)); +} + +static void +td_cb(int fd, short type, void *arg) +{ + char buf[1024]; + struct td_cb_tq tmpq = BTPDQ_HEAD_INITIALIZER(tmpq); + struct td_cb *cb, *next; + + read(fd, buf, sizeof(buf)); + td_acquire_lock(); + BTPDQ_FOREACH_MUTABLE(cb, &m_td_cbs, entry, next) + BTPDQ_INSERT_TAIL(&tmpq, cb, entry); + BTPDQ_INIT(&m_td_cbs); + td_release_lock(); + + BTPDQ_FOREACH_MUTABLE(cb, &tmpq, entry, next) { + cb->cb(cb->arg); + free(cb); + } +} + +void +td_init(void) +{ + int err; + int fds[2]; + if (pipe(fds) == -1) { + btpd_err("Couldn't create thread callback pipe (%s).\n", + strerror(errno)); + } + m_td_rd = fds[0]; + m_td_wr = fds[1]; + if ((err = pthread_mutex_init(&m_td_lock, NULL)) != 0) + btpd_err("Couldn't create mutex (%s).\n", strerror(err)); + + event_set(&m_td_ev, m_td_rd, EV_READ|EV_PERSIST, td_cb, NULL); + btpd_ev_add(&m_td_ev, NULL); +}