Browse Source

Nicer code for the http thread.

master
Richard Nyberg 19 years ago
parent
commit
de29f2ea84
1 changed files with 78 additions and 94 deletions
  1. +78
    -94
      btpd/http.c

+ 78
- 94
btpd/http.c View File

@@ -7,7 +7,8 @@
#include "btpd.h"
#include "http.h"

#define MAX_DOWNLOAD (256 << 10)
#define MAX_DOWNLOAD (1 << 18) // 256kB
#define CURL_SELECT_TIME (& (struct timeval) { 1, 0 })

enum http_state {
HS_ADD,
@@ -29,8 +30,7 @@ struct http {

BTPDQ_HEAD(http_tq, http);

static pthread_t m_httptd;
static struct http_tq m_httpq = BTPDQ_HEAD_INITIALIZER(m_httpq);
static struct http_tq m_httpq = BTPDQ_HEAD_INITIALIZER(m_httpq);
static pthread_mutex_t m_httpq_lock;
static pthread_cond_t m_httpq_cond;
static CURLM *m_curlh;
@@ -54,8 +54,7 @@ http_write_cb(void *ptr, size_t size, size_t nmemb, void *arg)

int
http_get(struct http **ret,
void (*cb)(struct http *, struct http_res *, void *),
void *arg,
void (*cb)(struct http *, struct http_res *, void *), void *arg,
const char *fmt, ...)
{
struct http *h = btpd_calloc(1, sizeof(*h));
@@ -113,7 +112,7 @@ http_td_cb(void *arg)
if (h->res.res == HRES_OK)
curl_easy_getinfo(h->curlh, CURLINFO_RESPONSE_CODE, &h->res.code);
if (h->res.res == HRES_FAIL) {
btpd_log(BTPD_L_BTPD, "Http error for url '%s' (%s).\n", h->url,
btpd_log(BTPD_L_ERROR, "Http error for url '%s' (%s).\n", h->url,
curl_easy_strerror(h->res.code));
}
h->cb(h, &h->res, h->cb_arg);
@@ -127,119 +126,104 @@ http_td_cb(void *arg)
static void
http_td_actions(void)
{
int has_posted = 0;
int nmsgs, has_posted;
struct http *http, *next;
int nmsgs;
CURLMsg *cmsg;

BTPDQ_FOREACH_MUTABLE(http, &m_httpq, entry, next) {
switch (http->state) {
case HS_ADD:
curl_multi_add_handle(m_curlh, http->curlh);
http->state = HS_ACTIVE;
break;
case HS_CANCEL:
curl_multi_remove_handle(m_curlh, http->curlh);
case HS_NOADD:
pthread_mutex_lock(&m_httpq_lock);
do {
has_posted = 0;
while (BTPDQ_EMPTY(&m_httpq))
pthread_cond_wait(&m_httpq_cond, &m_httpq_lock);

BTPDQ_FOREACH_MUTABLE(http, &m_httpq, entry, next) {
switch (http->state) {
case HS_ADD:
curl_multi_add_handle(m_curlh, http->curlh);
http->state = HS_ACTIVE;
break;
case HS_CANCEL:
curl_multi_remove_handle(m_curlh, http->curlh);
case HS_NOADD:
BTPDQ_REMOVE(&m_httpq, http, entry);
http->state = HS_CANCEL;
http->res.res = HRES_CANCEL;
if (!has_posted) {
has_posted = 1;
td_post_begin();
}
td_post(http_td_cb, http);
break;
case HS_DONE:
abort();
default:
break;
}
}

while ((cmsg = curl_multi_info_read(m_curlh, &nmsgs)) != NULL) {
BTPDQ_FOREACH(http, &m_httpq, entry) {
if (http->curlh == cmsg->easy_handle)
break;
}
assert(http != NULL);
BTPDQ_REMOVE(&m_httpq, http, entry);
http->state = HS_CANCEL;
http->res.res = HRES_CANCEL;
http->state = HS_DONE;
if (cmsg->data.result == 0)
http->res.res = HRES_OK;
else {
http->res.res = HRES_FAIL;
http->res.code = cmsg->data.result;
}
curl_multi_remove_handle(m_curlh, http->curlh);
if (!has_posted) {
has_posted = 1;
td_post_begin();
has_posted = 1;
}
td_post(http_td_cb, http);
break;
case HS_DONE:
abort();
default:
break;
}
}

while ((cmsg = curl_multi_info_read(m_curlh, &nmsgs)) != NULL) {
BTPDQ_FOREACH(http, &m_httpq, entry) {
if (http->curlh == cmsg->easy_handle)
break;
}
assert(http != NULL);
BTPDQ_REMOVE(&m_httpq, http, entry);
http->state = HS_DONE;
if (cmsg->data.result == 0)
http->res.res = HRES_OK;
else {
http->res.res = HRES_FAIL;
http->res.code = cmsg->data.result;
}
curl_multi_remove_handle(m_curlh, http->curlh);
if (!has_posted) {
td_post_begin();
has_posted = 1;
}
td_post(http_td_cb, http);
}

if (has_posted)
td_post_end();
if (has_posted)
td_post_end();
} while (BTPDQ_EMPTY(&m_httpq));
pthread_mutex_unlock(&m_httpq_lock);
}

static void
http_td_curl(void)
static void *
http_td(void *arg)
{
fd_set rset, wset, eset;
int maxfd, nbusy;

pthread_mutex_unlock(&m_httpq_lock);
for (;;) {
http_td_actions();

do {
while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(m_curlh, &nbusy))
;

FD_ZERO(&rset);
FD_ZERO(&wset);
FD_ZERO(&eset);

curl_multi_fdset(m_curlh, &rset, &wset, &eset, &maxfd);
select(maxfd + 1, &rset, &wset, &eset, (& (struct timeval) { 2, 0}));

pthread_mutex_lock(&m_httpq_lock);
http_td_actions();
pthread_mutex_unlock(&m_httpq_lock);
} while (nbusy > 0);

pthread_mutex_lock(&m_httpq_lock);
if (nbusy > 0) {
FD_ZERO(&rset);
FD_ZERO(&wset);
FD_ZERO(&eset);
curl_multi_fdset(m_curlh, &rset, &wset, &eset, &maxfd);
select(maxfd + 1, &rset, &wset, &eset, CURL_SELECT_TIME);
}
}
}

static void *
http_td(void *arg)
static void
errdie(int err)
{
pthread_mutex_lock(&m_httpq_lock);
for (;;) {
while (BTPDQ_EMPTY(&m_httpq))
pthread_cond_wait(&m_httpq_cond, &m_httpq_lock);
http_td_actions();
if (!BTPDQ_EMPTY(&m_httpq))
http_td_curl();
}
if (err != 0)
btpd_err("Fatal error in http_init.\n");
}

void
http_init(void)
{
int err;
if (curl_global_init(0))
goto curl_err;
if ((m_curlh = curl_multi_init()) == NULL)
goto curl_err;

err = pthread_mutex_init(&m_httpq_lock, NULL);
if (err == 0)
err = pthread_cond_init(&m_httpq_cond, NULL);
if (err == 0)
err = pthread_create(&m_httptd, NULL, http_td, NULL);
if (err != 0)
btpd_err("pthread failure (%s)\n", strerror(err));
return;
curl_err:
btpd_err("Fatal error in curl.\n");
pthread_t ret;
errdie(curl_global_init(0));
errdie((m_curlh = curl_multi_init()) == NULL);
errdie(pthread_mutex_init(&m_httpq_lock, NULL));
errdie(pthread_cond_init(&m_httpq_cond, NULL));
errdie(pthread_create(&ret, NULL, http_td, NULL));
}

Loading…
Cancel
Save