From 59905999ce145a81e0003766d468945c2444a90e Mon Sep 17 00:00:00 2001 From: Richard Nyberg Date: Fri, 9 Jan 2009 18:26:23 +0100 Subject: [PATCH] Add evloop, btpd's new event loop. This will replace libevent. --- evloop/Makefile.am | 8 +++ evloop/epoll.c | 118 +++++++++++++++++++++++++++++++++++ evloop/evloop.h | 57 +++++++++++++++++ evloop/kqueue.c | 122 ++++++++++++++++++++++++++++++++++++ evloop/poll.c | 142 ++++++++++++++++++++++++++++++++++++++++++ evloop/timeheap.c | 152 +++++++++++++++++++++++++++++++++++++++++++++ evloop/timeheap.h | 19 ++++++ evloop/timer.c | 104 +++++++++++++++++++++++++++++++ 8 files changed, 722 insertions(+) create mode 100644 evloop/Makefile.am create mode 100644 evloop/epoll.c create mode 100644 evloop/evloop.h create mode 100644 evloop/kqueue.c create mode 100644 evloop/poll.c create mode 100644 evloop/timeheap.c create mode 100644 evloop/timeheap.h create mode 100644 evloop/timer.c diff --git a/evloop/Makefile.am b/evloop/Makefile.am new file mode 100644 index 0000000..9826afa --- /dev/null +++ b/evloop/Makefile.am @@ -0,0 +1,8 @@ +noinst_LIBRARIES=libevloop.a +EXTRA_libevloop_a_SOURCES=epoll.c poll.c +libevloop_a_SOURCES=\ + evloop.h\ + timeheap.c timeheap.h timer.c +CFLAGS=@CFLAGS@ -D@EVLOOP_METHOD@ -I$(top_srcdir)/misc +libevloop_a_LIBADD=@EVLOOP_IMPL@ +libevloop_a_DEPENDENCIES=@EVLOOP_IMPL@ diff --git a/evloop/epoll.c b/evloop/epoll.c new file mode 100644 index 0000000..5152194 --- /dev/null +++ b/evloop/epoll.c @@ -0,0 +1,118 @@ +#include +#include +#include +#include + +#include "evloop.h" + +static int m_epfd; + +static struct epoll_event m_evs[100]; +static uint8_t m_valid[100]; + +int +evloop_init(void) +{ + if (timeheap_init() != 0) + return -1; + m_epfd = epoll_create(getdtablesize()); + return m_epfd >= 0 ? 0 : -1; +} + +int +fdev_new(struct fdev *ev, int fd, uint16_t flags, evloop_cb_t cb, void *arg) +{ + ev->fd = fd; + ev->cb = cb; + ev->arg = arg; + ev->flags = 0; + ev->index = -1; + return fdev_enable(ev, flags); +} + +int +fdev_enable(struct fdev *ev, uint16_t flags) +{ + struct epoll_event epev; + int err = 0; + uint16_t sf = ev->flags; + ev->flags |= flags; + if (sf != ev->flags) { + epev.data.ptr = ev; + epev.events = + ((ev->flags & EV_READ) ? EPOLLIN : 0) | + ((ev->flags & EV_WRITE) ? EPOLLOUT : 0); + if (sf == 0) + err = epoll_ctl(m_epfd, EPOLL_CTL_ADD, ev->fd, &epev); + else + err = epoll_ctl(m_epfd, EPOLL_CTL_MOD, ev->fd, &epev); + } + return err; +} + +int +fdev_disable(struct fdev *ev, uint16_t flags) +{ + struct epoll_event epev; + int err = 0; + uint16_t sf = ev->flags; + ev->flags &= ~flags; + if (sf != ev->flags) { + epev.data.ptr = ev; + epev.events = + ((ev->flags & EV_READ) ? EPOLLIN : 0) | + ((ev->flags & EV_WRITE) ? EPOLLOUT : 0); + if (ev->flags == 0) + err = epoll_ctl(m_epfd, EPOLL_CTL_DEL, ev->fd, &epev); + else + err = epoll_ctl(m_epfd, EPOLL_CTL_MOD, ev->fd, &epev); + } + return err; +} + +int +fdev_del(struct fdev *ev) +{ + if (ev->index >= 0) + m_valid[ev->index] = 0; + return fdev_disable(ev, EV_READ|EV_WRITE); +} + +int +evloop(void) +{ + int nev, i, millisecs; + struct timespec delay; + while (1) { + timers_run(); + delay = timer_delay(); + if (delay.tv_sec >= 0) + millisecs = delay.tv_sec * 1000 + delay.tv_nsec / 1000000; + else + millisecs = -1; + + if ((nev = epoll_wait(m_epfd, m_evs, 100, millisecs)) < 0) { + if (errno == EINTR) + continue; + else + return -1; + } + memset(m_valid, 1, nev); + for (i = 0; i < nev; i++) { + struct fdev *ev = m_evs[i].data.ptr; + ev->index = i; + } + for (i = 0; i < nev; i++) { + struct fdev *ev = m_evs[i].data.ptr; + if ((m_valid[i] && + ev->flags & EV_READ && + m_evs[i].events & (EPOLLIN|EPOLLERR|EPOLLHUP))) + ev->cb(ev->fd, EV_READ, ev->arg); + if ((m_valid[i] && ev->flags & EV_WRITE && + m_evs[i].events & (EPOLLOUT|EPOLLERR|EPOLLHUP))) + ev->cb(ev->fd, EV_WRITE, ev->arg); + if (m_valid[i]) + ev->index = -1; + } + } +} diff --git a/evloop/evloop.h b/evloop/evloop.h new file mode 100644 index 0000000..97f7665 --- /dev/null +++ b/evloop/evloop.h @@ -0,0 +1,57 @@ +#ifndef BTPD_EVLOOP_H +#define BTPD_EVLOOP_H + +#include +#include + +#include "timeheap.h" + +#define EV_READ 1 +#define EV_WRITE 2 +#define EV_TIMEOUT 3 + +typedef void (*evloop_cb_t)(int fd, short type, void *arg); + +#if defined(EVLOOP_EPOLL) || defined(EVLOOP_KQUEUE) + +struct fdev { + evloop_cb_t cb; + void *arg; + int fd; + uint16_t flags; + int16_t index; +}; + +#elif defined(EVLOOP_POLL) + +struct fdev { + int i; +}; + +#else +#error No evloop method defined. +#endif + +struct timeout { + evloop_cb_t cb; + void *arg; + struct th_handle th; +}; + +int evloop_init(void); +int evloop(void); + +int fdev_new(struct fdev *ev, int fd, uint16_t flags, evloop_cb_t cb, + void *arg); +int fdev_del(struct fdev *ev); +int fdev_enable(struct fdev *ev, uint16_t flags); +int fdev_disable(struct fdev *ev, uint16_t flags); + +void timer_init(struct timeout *, evloop_cb_t, void *); +int timer_add(struct timeout *, struct timespec *); +void timer_del(struct timeout *); + +void timers_run(void); +struct timespec timer_delay(void); + +#endif diff --git a/evloop/kqueue.c b/evloop/kqueue.c new file mode 100644 index 0000000..7f323db --- /dev/null +++ b/evloop/kqueue.c @@ -0,0 +1,122 @@ +#include +#include +#include + +#include +#include +#include + +#include "evloop.h" + +static int m_kq; + +static struct kevent m_evs[100]; +static uint8_t m_valid[100]; + +int +evloop_init(void) +{ + if (timeheap_init() != 0) + return -1; + m_kq = kqueue(); + return m_kq >= 0 ? 0 : -1; +} + +int +fdev_new(struct fdev *ev, int fd, uint16_t flags, evloop_cb_t cb, void *arg) +{ + ev->fd = fd; + ev->cb = cb; + ev->arg = arg; + ev->flags = 0; + ev->index = -1; + return fdev_enable(ev, flags); +} + +int +fdev_enable(struct fdev *ev, uint16_t flags) +{ + struct kevent kev[2], *kp = NULL; + int count = 0; + uint16_t sf = ev->flags; + ev->flags |= flags; + if ((sf & EV_READ) == 0 && (flags & EV_READ) != 0) { + EV_SET(&kev[0], ev->fd, EVFILT_READ, EV_ADD, 0, 0, ev); + kp = kev; + count = 1; + } + if ((sf & EV_WRITE) == 0 && (flags & EV_WRITE) != 0) { + EV_SET(&kev[1], ev->fd, EVFILT_WRITE, EV_ADD, 0, 0, ev); + if (count == 0) + kp = &kev[1]; + count++; + } + return count > 0 ? kevent(m_kq, kp, count, NULL, 0, NULL) : 0; +} + +int +fdev_disable(struct fdev *ev, uint16_t flags) +{ + struct kevent kev[2], *kp = NULL; + int count = 0; + uint16_t sf = ev->flags; + ev->flags &= ~flags; + if ((sf & EV_READ) != 0 && (flags & EV_READ) != 0) { + EV_SET(&kev[0], ev->fd, EVFILT_READ, EV_DELETE, 0, 0, ev); + kp = kev; + count = 1; + } + if ((sf & EV_WRITE) != 0 && (flags & EV_WRITE) != 0) { + EV_SET(&kev[1], ev->fd, EVFILT_WRITE, EV_DELETE, 0, 0, ev); + if (count == 0) + kp = &kev[1]; + count++; + } + return count > 0 ? kevent(m_kq, kp, count, NULL, 0, NULL) : 0; +} + +int +fdev_del(struct fdev *ev) +{ + if (ev->index >= 0) + m_valid[ev->index] = 0; + return fdev_disable(ev, EV_READ|EV_WRITE); +} + +int +evloop(void) +{ + int nev, i; + struct timespec delay; + while (1) { + timers_run(); + delay = timer_delay(); + + if ((nev = kevent(m_kq, NULL, 0, m_evs, 100, &delay)) < 0) { + if (errno == EINTR) + continue; + else + return -1; + } + memset(m_valid, 1, nev); + for (i = 0; i < nev; i++) { + struct fdev *ev = (struct fdev *)m_evs[i].udata; + ev->index = i; + } + for (i = 0; i < nev; i++) { + if (m_evs[i].flags & EV_ERROR) { + errno = m_evs[i].data; + return -1; + } + struct fdev *ev = (struct fdev *)m_evs[i].udata; + if (m_valid[i] && ev->flags & EV_READ && + m_evs[i].filter == EVFILT_READ) + ev->cb(ev->fd, EV_READ, ev->arg); + if (m_valid[i] && ev->flags & EV_WRITE && + m_evs[i].filter == EVFILT_WRITE) + ev->cb(ev->fd, EV_WRITE, ev->arg); + if (m_valid[i]) + ev->index = -1; + } + } +} diff --git a/evloop/poll.c b/evloop/poll.c new file mode 100644 index 0000000..0996374 --- /dev/null +++ b/evloop/poll.c @@ -0,0 +1,142 @@ +#include +#include +#include +#include +#include + +#include "evloop.h" + +#define POLL_INIT_SIZE 64 + +struct poll_ev { + struct fdev *ev; + evloop_cb_t cb; + void *arg; +}; + +static struct pollfd *m_pfds; +static struct poll_ev *m_pevs; + +static int m_cap, m_size; +static int m_cur = -1, m_curdel; + +static int +poll_grow(void) +{ + int ncap = m_cap * 2; + struct pollfd *nm_pfds = realloc(m_pfds, ncap * sizeof(*m_pfds)); + struct poll_ev *nm_pevs = realloc(m_pevs, ncap * sizeof(*m_pevs)); + if (nm_pfds != NULL) + m_pfds = nm_pfds; + if (nm_pevs != NULL) + m_pevs = nm_pevs; + if (nm_pfds == NULL || nm_pevs == NULL) + return errno; + m_cap = ncap; + return 0; +} + +int +evloop_init(void) +{ + if (timeheap_init() != 0) + return -1; + m_cap = POLL_INIT_SIZE; + m_size = 0; + if ((m_pfds = calloc(m_cap, sizeof(*m_pfds))) == NULL) + return -1; + if ((m_pevs = calloc(m_cap, sizeof(*m_pevs))) == NULL) { + free(m_pfds); + return -1; + } + return 0; +} + +int +fdev_new(struct fdev *ev, int fd, uint16_t flags, evloop_cb_t cb, void *arg) +{ + if (m_size == m_cap && poll_grow() != 0) + return errno; + ev->i = m_size; + m_size++; + m_pfds[ev->i].fd = fd; + m_pfds[ev->i].events = + ((flags & EV_READ) ? POLLIN : 0) | + ((flags & EV_WRITE) ? POLLOUT : 0); + m_pevs[ev->i].ev = ev; + m_pevs[ev->i].cb = cb; + m_pevs[ev->i].arg = arg; + return 0; +} + +int +fdev_enable(struct fdev *ev, uint16_t flags) +{ + m_pfds[ev->i].events |= + ((flags & EV_READ) ? POLLIN : 0) | + ((flags & EV_WRITE) ? POLLOUT : 0); + return 0; +} + +int +fdev_disable(struct fdev *ev, uint16_t flags) +{ + short pflags = + ((flags & EV_READ) ? POLLIN : 0) | + ((flags & EV_WRITE) ? POLLOUT : 0); + m_pfds[ev->i].events &= ~pflags; + return 0; +} + +int +fdev_del(struct fdev *ev) +{ + assert(ev->i < m_size); + m_size--; + m_pfds[ev->i] = m_pfds[m_size]; + m_pevs[ev->i] = m_pevs[m_size]; + m_pevs[ev->i].ev->i = ev->i; + if (ev->i == m_cur) + m_curdel = 1; + return 0; +} + +int +evloop(void) +{ + int millisecs; + struct timespec delay; + while (1) { + timers_run(); + + delay = timer_delay(); + if (delay.tv_sec >= 0) + millisecs = delay.tv_sec * 1000 + delay.tv_nsec / 1000000; + else + millisecs = -1; + + if (poll(m_pfds, m_size, millisecs) < 0) { + if (errno == EINTR) + continue; + else + return -1; + } + + m_cur = 0; + while (m_cur < m_size) { + struct pollfd *pfd = &m_pfds[m_cur]; + struct poll_ev *pev = &m_pevs[m_cur]; + if ((pfd->events & POLLIN && + pfd->revents & (POLLIN|POLLERR|POLLHUP))) + pev->cb(pfd->fd, EV_READ, pev->arg); + if ((!m_curdel && pfd->events & POLLOUT && + pfd->revents & (POLLOUT|POLLERR|POLLHUP))) + pev->cb(pfd->fd, EV_WRITE, pev->arg); + if (!m_curdel) + m_cur++; + else + m_curdel = 0; + } + m_cur = -1; + } +} diff --git a/evloop/timeheap.c b/evloop/timeheap.c new file mode 100644 index 0000000..470feaf --- /dev/null +++ b/evloop/timeheap.c @@ -0,0 +1,152 @@ +#include +#include +#include + +#include "timeheap.h" + +struct th_entry { + struct timespec t; + struct th_handle *h; +}; + +static struct th_entry *heap; +static int heap_cap; +static int heap_use; + +static int +cmptime_lt(struct timespec a, struct timespec b) +{ + if (a.tv_sec == b.tv_sec) + return a.tv_nsec < b.tv_nsec; + else + return a.tv_sec < b.tv_sec; +} + +static int +cmpentry_lt(int a, int b) +{ + return cmptime_lt(heap[a].t, heap[b].t); +} + +static void +swap(int i, int j) +{ + struct th_entry tmp = heap[i]; + heap[i] = heap[j]; + heap[i].h->i = i; + heap[j] = tmp; + heap[j].h->i = j; +} + +static void +bubble_up(int i) +{ + while (i != 0) { + int p = (i-1)/2; + if (cmpentry_lt(i, p)) { + swap(i, p); + i = p; + } else + return; + } +} + +static void +bubble_down(int i) +{ + int li, ri, ci; +loop: + li = 2*i+1; + ri = 2*i+2; + if (ri < heap_use) + ci = cmpentry_lt(li, ri) ? li : ri; + else if (li < heap_use) + ci = li; + else + return; + if (cmpentry_lt(ci, i)) { + swap(i, ci); + i = ci; + goto loop; + } +} + +int +timeheap_init(void) +{ + heap_cap = 10; + heap_use = 0; + if ((heap = malloc(sizeof(struct th_entry) * heap_cap)) == NULL) + return -1; + else + return 0; +} + +int +timeheap_size(void) +{ + return heap_use; +} + +int +timeheap_insert(struct th_handle *h, struct timespec *t) +{ + if (heap_use == heap_cap) { + int ncap = heap_cap * 2; + struct th_entry *nheap = realloc(heap, ncap * sizeof(struct th_entry)); + if (nheap == NULL) + return -1; + heap_cap = ncap; + heap = nheap; + } + heap[heap_use].t = *t; + heap[heap_use].h = h; + h->i = heap_use; + heap_use++; + bubble_up(h->i); + return 0; +} + +void +timeheap_remove(struct th_handle *h) +{ + assert(h->i >= 0 && h->i < heap_use); + heap_use--; + if (heap_use > 0) { + int i = h->i; + int earlier = cmpentry_lt(heap_use, i); + heap[i] = heap[heap_use]; + heap[i].h->i = i; + if (earlier) + bubble_up(i); + else + bubble_down(i); + } +} + +void +timeheap_change(struct th_handle *h, struct timespec *t) +{ + assert(h->i >= 0 && h->i < heap_use); + int earlier = cmptime_lt(*t, heap[h->i].t); + heap[h->i].t = *t; + if (earlier) + bubble_up(h->i); + else + bubble_down(h->i); +} + +struct timespec +timeheap_top(void) +{ + return heap[0].t; +} + +void * +timeheap_remove_top(void) +{ + void *ret = heap[0].h->data; + struct th_handle h = { 0, NULL }; + timeheap_remove(&h); + return ret; +} diff --git a/evloop/timeheap.h b/evloop/timeheap.h new file mode 100644 index 0000000..8a712c5 --- /dev/null +++ b/evloop/timeheap.h @@ -0,0 +1,19 @@ +#ifndef BTPD_TIMEHEAP_H +#define BTPD_TIMEHEAP_H + +struct th_handle { + int i; + void *data; +}; + +int timeheap_init(void); +int timeheap_size(void); + +int timeheap_insert(struct th_handle *h, struct timespec *t); +void timeheap_remove(struct th_handle *h); +void timeheap_change(struct th_handle *h, struct timespec *t); + +void *timeheap_remove_top(void); +struct timespec timeheap_top(void); + +#endif diff --git a/evloop/timer.c b/evloop/timer.c new file mode 100644 index 0000000..3ef3091 --- /dev/null +++ b/evloop/timer.c @@ -0,0 +1,104 @@ +#include + +#include "evloop.h" +#include "timeheap.h" + +#if defined(CLOCK_MONOTONIC_FAST) +#define TIMER_CLOCK CLOCK_MONOTONIC_FAST +#elif defined(CLOCK_MONOTONIC) +#define TIMER_CLOCK CLOCK_MONOTONIC +#else +#error CLOCK_MONOTONIC needed! +#endif + +static struct timespec +addtime(struct timespec a, struct timespec b) +{ + struct timespec ret; + ret.tv_sec = a.tv_sec + b.tv_sec; + ret.tv_nsec = a.tv_nsec + b.tv_nsec; + if (ret.tv_nsec >= 1000000000) { + ret.tv_sec += 1; + ret.tv_nsec -= 1000000000; + } + return ret; +} + +static struct timespec +subtime(struct timespec a, struct timespec b) +{ + struct timespec ret; + ret.tv_sec = a.tv_sec - b.tv_sec; + ret.tv_nsec = a.tv_nsec - b.tv_nsec; + if (ret.tv_nsec < 0) { + ret.tv_sec -= 1; + ret.tv_nsec += 1000000000; + } + return ret; +} + +void +timer_init(struct timeout *h, evloop_cb_t cb, void *arg) +{ + h->cb = cb; + h->arg = arg; + h->th.i = -1; + h->th.data = h; +} + +int +timer_add(struct timeout *h, struct timespec *t) +{ + struct timespec now, sum; + clock_gettime(TIMER_CLOCK, &now); + sum = addtime(now, *t); + if (h->th.i == -1) + return timeheap_insert(&h->th, &sum); + else { + timeheap_change(&h->th, &sum); + return 0; + } +} + +void +timer_del(struct timeout *h) +{ + if (h->th.i >= 0) { + timeheap_remove(&h->th); + h->th.i = -1; + } +} + +void +timers_run(void) +{ + struct timespec now; + clock_gettime(TIMER_CLOCK, &now); + while (timeheap_size() > 0) { + struct timespec diff = subtime(timeheap_top(), now); + if (diff.tv_sec < 0) { + struct timeout *t = timeheap_remove_top(); + t->th.i = -1; + t->cb(-1, EV_TIMEOUT, t->arg); + } else + break; + } +} + +struct timespec +timer_delay(void) +{ + struct timespec now, diff; + if (timeheap_size() == 0) { + diff.tv_sec = -1; + diff.tv_nsec = 0; + } else { + clock_gettime(TIMER_CLOCK, &now); + diff = subtime(timeheap_top(), now); + if (diff.tv_sec < 0) { + diff.tv_sec = 0; + diff.tv_nsec = 0; + } + } + return diff; +}