From de29f2ea84fc8995b97a4cbe67c8a4368bdd47df Mon Sep 17 00:00:00 2001 From: Richard Nyberg Date: Sat, 7 Jan 2006 14:55:34 +0000 Subject: [PATCH] Nicer code for the http thread. --- btpd/http.c | 172 ++++++++++++++++++++++++---------------------------- 1 file changed, 78 insertions(+), 94 deletions(-) diff --git a/btpd/http.c b/btpd/http.c index d01ec6b..82a0750 100644 --- a/btpd/http.c +++ b/btpd/http.c @@ -7,7 +7,8 @@ #include "btpd.h" #include "http.h" -#define MAX_DOWNLOAD (256 << 10) +#define MAX_DOWNLOAD (1 << 18) // 256kB +#define CURL_SELECT_TIME (& (struct timeval) { 1, 0 }) enum http_state { HS_ADD, @@ -29,8 +30,7 @@ struct http { BTPDQ_HEAD(http_tq, http); -static pthread_t m_httptd; -static struct http_tq m_httpq = BTPDQ_HEAD_INITIALIZER(m_httpq); +static struct http_tq m_httpq = BTPDQ_HEAD_INITIALIZER(m_httpq); static pthread_mutex_t m_httpq_lock; static pthread_cond_t m_httpq_cond; static CURLM *m_curlh; @@ -54,8 +54,7 @@ http_write_cb(void *ptr, size_t size, size_t nmemb, void *arg) int http_get(struct http **ret, - void (*cb)(struct http *, struct http_res *, void *), - void *arg, + void (*cb)(struct http *, struct http_res *, void *), void *arg, const char *fmt, ...) { struct http *h = btpd_calloc(1, sizeof(*h)); @@ -113,7 +112,7 @@ http_td_cb(void *arg) if (h->res.res == HRES_OK) curl_easy_getinfo(h->curlh, CURLINFO_RESPONSE_CODE, &h->res.code); if (h->res.res == HRES_FAIL) { - btpd_log(BTPD_L_BTPD, "Http error for url '%s' (%s).\n", h->url, + btpd_log(BTPD_L_ERROR, "Http error for url '%s' (%s).\n", h->url, curl_easy_strerror(h->res.code)); } h->cb(h, &h->res, h->cb_arg); @@ -127,119 +126,104 @@ http_td_cb(void *arg) static void http_td_actions(void) { - int has_posted = 0; + int nmsgs, has_posted; struct http *http, *next; - int nmsgs; CURLMsg *cmsg; - BTPDQ_FOREACH_MUTABLE(http, &m_httpq, entry, next) { - switch (http->state) { - case HS_ADD: - curl_multi_add_handle(m_curlh, http->curlh); - http->state = HS_ACTIVE; - break; - case HS_CANCEL: - curl_multi_remove_handle(m_curlh, http->curlh); - case HS_NOADD: + pthread_mutex_lock(&m_httpq_lock); + do { + has_posted = 0; + while (BTPDQ_EMPTY(&m_httpq)) + pthread_cond_wait(&m_httpq_cond, &m_httpq_lock); + + BTPDQ_FOREACH_MUTABLE(http, &m_httpq, entry, next) { + switch (http->state) { + case HS_ADD: + curl_multi_add_handle(m_curlh, http->curlh); + http->state = HS_ACTIVE; + break; + case HS_CANCEL: + curl_multi_remove_handle(m_curlh, http->curlh); + case HS_NOADD: + BTPDQ_REMOVE(&m_httpq, http, entry); + http->state = HS_CANCEL; + http->res.res = HRES_CANCEL; + if (!has_posted) { + has_posted = 1; + td_post_begin(); + } + td_post(http_td_cb, http); + break; + case HS_DONE: + abort(); + default: + break; + } + } + + while ((cmsg = curl_multi_info_read(m_curlh, &nmsgs)) != NULL) { + BTPDQ_FOREACH(http, &m_httpq, entry) { + if (http->curlh == cmsg->easy_handle) + break; + } + assert(http != NULL); BTPDQ_REMOVE(&m_httpq, http, entry); - http->state = HS_CANCEL; - http->res.res = HRES_CANCEL; + http->state = HS_DONE; + if (cmsg->data.result == 0) + http->res.res = HRES_OK; + else { + http->res.res = HRES_FAIL; + http->res.code = cmsg->data.result; + } + curl_multi_remove_handle(m_curlh, http->curlh); if (!has_posted) { - has_posted = 1; td_post_begin(); + has_posted = 1; } td_post(http_td_cb, http); - break; - case HS_DONE: - abort(); - default: - break; } - } - - while ((cmsg = curl_multi_info_read(m_curlh, &nmsgs)) != NULL) { - BTPDQ_FOREACH(http, &m_httpq, entry) { - if (http->curlh == cmsg->easy_handle) - break; - } - assert(http != NULL); - BTPDQ_REMOVE(&m_httpq, http, entry); - http->state = HS_DONE; - if (cmsg->data.result == 0) - http->res.res = HRES_OK; - else { - http->res.res = HRES_FAIL; - http->res.code = cmsg->data.result; - } - curl_multi_remove_handle(m_curlh, http->curlh); - if (!has_posted) { - td_post_begin(); - has_posted = 1; - } - td_post(http_td_cb, http); - } - - if (has_posted) - td_post_end(); + if (has_posted) + td_post_end(); + } while (BTPDQ_EMPTY(&m_httpq)); + pthread_mutex_unlock(&m_httpq_lock); } -static void -http_td_curl(void) +static void * +http_td(void *arg) { fd_set rset, wset, eset; int maxfd, nbusy; - pthread_mutex_unlock(&m_httpq_lock); + for (;;) { + http_td_actions(); - do { while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(m_curlh, &nbusy)) ; - FD_ZERO(&rset); - FD_ZERO(&wset); - FD_ZERO(&eset); - - curl_multi_fdset(m_curlh, &rset, &wset, &eset, &maxfd); - select(maxfd + 1, &rset, &wset, &eset, (& (struct timeval) { 2, 0})); - - pthread_mutex_lock(&m_httpq_lock); - http_td_actions(); - pthread_mutex_unlock(&m_httpq_lock); - } while (nbusy > 0); - - pthread_mutex_lock(&m_httpq_lock); + if (nbusy > 0) { + FD_ZERO(&rset); + FD_ZERO(&wset); + FD_ZERO(&eset); + curl_multi_fdset(m_curlh, &rset, &wset, &eset, &maxfd); + select(maxfd + 1, &rset, &wset, &eset, CURL_SELECT_TIME); + } + } } -static void * -http_td(void *arg) +static void +errdie(int err) { - pthread_mutex_lock(&m_httpq_lock); - for (;;) { - while (BTPDQ_EMPTY(&m_httpq)) - pthread_cond_wait(&m_httpq_cond, &m_httpq_lock); - http_td_actions(); - if (!BTPDQ_EMPTY(&m_httpq)) - http_td_curl(); - } + if (err != 0) + btpd_err("Fatal error in http_init.\n"); } void http_init(void) { - int err; - if (curl_global_init(0)) - goto curl_err; - if ((m_curlh = curl_multi_init()) == NULL) - goto curl_err; - - err = pthread_mutex_init(&m_httpq_lock, NULL); - if (err == 0) - err = pthread_cond_init(&m_httpq_cond, NULL); - if (err == 0) - err = pthread_create(&m_httptd, NULL, http_td, NULL); - if (err != 0) - btpd_err("pthread failure (%s)\n", strerror(err)); - return; -curl_err: - btpd_err("Fatal error in curl.\n"); + pthread_t ret; + errdie(curl_global_init(0)); + errdie((m_curlh = curl_multi_init()) == NULL); + errdie(pthread_mutex_init(&m_httpq_lock, NULL)); + errdie(pthread_cond_init(&m_httpq_cond, NULL)); + errdie(pthread_create(&ret, NULL, http_td, NULL)); }