Kaynağa Gözat

* Added a method for other threads to execute callbacks in the main thread.

* Added a thread and api for http requests. The thread uses the curl multi
  interface and will be used by the tracker code. The tracker code currently
  use forked processes to do its bidding.
master
Richard Nyberg 19 yıl önce
ebeveyn
işleme
7f8f5dd03b
5 değiştirilmiş dosya ile 367 ekleme ve 1 silme
  1. +1
    -0
      btpd/Makefile.am
  2. +85
    -1
      btpd/btpd.c
  3. +7
    -0
      btpd/btpd.h
  4. +245
    -0
      btpd/http.c
  5. +29
    -0
      btpd/http.h

+ 1
- 0
btpd/Makefile.am Dosyayı Görüntüle

@@ -3,6 +3,7 @@ btpd_SOURCES=\
btpd.c btpd.h\
content.c content.h\
download.c download_subr.c download.h\
http.c http.h\
main.c\
net.c net.h\
net_buf.c net_buf.h\


+ 85
- 1
btpd/btpd.c Dosyayı Görüntüle

@@ -17,6 +17,7 @@
#include <getopt.h>
#include <math.h>
#include <locale.h>
#include <pthread.h>
#include <pwd.h>
#include <signal.h>
#include <stdio.h>
@@ -26,6 +27,7 @@
#include <unistd.h>

#include "btpd.h"
#include "http.h"

struct child {
pid_t pid;
@@ -160,7 +162,87 @@ load_library(void)
free(entries);
}

extern void ipc_init(void);
struct td_cb {
void (*cb)(void *);
void *arg;
BTPDQ_ENTRY(td_cb) entry;
};

BTPDQ_HEAD(td_cb_tq, td_cb);

static int m_td_rd, m_td_wr;
static struct event m_td_ev;
static struct td_cb_tq m_td_cbs = BTPDQ_HEAD_INITIALIZER(m_td_cbs);
static pthread_mutex_t m_td_lock;

void
td_acquire_lock(void)
{
pthread_mutex_lock(&m_td_lock);
}

void
td_release_lock(void)
{
pthread_mutex_unlock(&m_td_lock);
}

void
td_post(void (*fun)(void *), void *arg)
{
struct td_cb *cb = btpd_calloc(1, sizeof(*cb));
cb->cb = fun;
cb->arg = arg;
BTPDQ_INSERT_TAIL(&m_td_cbs, cb, entry);
}

void
td_post_end(void)
{
char c = '1';
td_release_lock();
write(m_td_wr, &c, sizeof(c));
}

static void
td_cb(int fd, short type, void *arg)
{
char buf[1024];
struct td_cb_tq tmpq = BTPDQ_HEAD_INITIALIZER(tmpq);
struct td_cb *cb, *next;

read(fd, buf, sizeof(buf));
td_acquire_lock();
BTPDQ_FOREACH_MUTABLE(cb, &m_td_cbs, entry, next)
BTPDQ_INSERT_TAIL(&tmpq, cb, entry);
BTPDQ_INIT(&m_td_cbs);
td_release_lock();

BTPDQ_FOREACH_MUTABLE(cb, &tmpq, entry, next) {
cb->cb(cb->arg);
free(cb);
}
}

static void
td_init(void)
{
int err;
int fds[2];
if (pipe(fds) == -1) {
btpd_err("Couldn't create thread callback pipe (%s).\n",
strerror(errno));
}
m_td_rd = fds[0];
m_td_wr = fds[1];
if ((err = pthread_mutex_init(&m_td_lock, NULL)) != 0)
btpd_err("Couldn't create mutex (%s).\n", strerror(err));

event_set(&m_td_ev, m_td_rd, EV_READ|EV_PERSIST, td_cb, NULL);
event_add(&m_td_ev, NULL);
}

void ipc_init(void);

void
btpd_init(void)
@@ -171,6 +253,8 @@ btpd_init(void)
for (int i = sizeof(BTPD_VERSION); i < 20; i++)
m_peer_id[i] = rand_between(0, 255);

td_init();
http_init();
net_init();
//ipc_init();
ul_init();


+ 7
- 0
btpd/btpd.h Dosyayı Görüntüle

@@ -58,4 +58,11 @@ void btpd_del_torrent(struct torrent *tp);
unsigned btpd_get_ntorrents(void);
const uint8_t *btpd_get_peer_id(void);

void td_acquire_lock(void);
void td_release_lock(void);

#define td_post_begin td_acquire_lock
void td_post(void (*fun)(void *), void *arg);
void td_post_end(void);

#endif

+ 245
- 0
btpd/http.c Dosyayı Görüntüle

@@ -0,0 +1,245 @@
#include <pthread.h>
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
#include <curl/curl.h>

#include "btpd.h"
#include "http.h"

#define MAX_DOWNLOAD (256 << 10)

enum http_state {
HS_ADD,
HS_ACTIVE,
HS_DONE,
HS_NOADD,
HS_CANCEL
};

struct http {
enum http_state state;
char *url;
CURL *curlh;
struct http_res res;
BTPDQ_ENTRY(http) entry;
void (*cb)(struct http *, struct http_res *, void *);
void *cb_arg;
};

BTPDQ_HEAD(http_tq, http);

static pthread_t m_httptd;
static struct http_tq m_httpq = BTPDQ_HEAD_INITIALIZER(m_httpq);
static pthread_mutex_t m_httpq_lock;
static pthread_cond_t m_httpq_cond;
static CURLM *m_curlh;

static size_t
http_write_cb(void *ptr, size_t size, size_t nmemb, void *arg)
{
char *mem;
struct http_res *res = arg;
size_t nbytes = size * nmemb;
size_t nlength = res->length + nbytes;
if (nlength > MAX_DOWNLOAD)
return 0;
if ((mem = realloc(res->content, nlength)) == NULL)
return 0;
res->content = mem;
bcopy(ptr, res->content + res->length, nbytes);
res->length = nlength;
return nbytes;
}

int
http_get(struct http **ret,
void (*cb)(struct http *, struct http_res *, void *),
void *arg,
const char *fmt, ...)
{
struct http *h = btpd_calloc(1, sizeof(*h));

h->state = HS_ADD;
h->cb = cb;
h->cb_arg = arg;
if ((h->curlh = curl_easy_init()) == NULL)
btpd_err("Fatal error in curl.\n");

va_list ap;
va_start(ap, fmt);
if (vasprintf(&h->url, fmt, ap) == -1)
btpd_err("Out of memory.\n");
va_end(ap);

curl_easy_setopt(h->curlh, CURLOPT_URL, h->url);
curl_easy_setopt(h->curlh, CURLOPT_USERAGENT, BTPD_VERSION);
curl_easy_setopt(h->curlh, CURLOPT_WRITEFUNCTION, http_write_cb);
curl_easy_setopt(h->curlh, CURLOPT_WRITEDATA, &h->res);
curl_easy_setopt(h->curlh, CURLOPT_FOLLOWLOCATION, 1);

pthread_mutex_lock(&m_httpq_lock);
BTPDQ_INSERT_TAIL(&m_httpq, h, entry);
pthread_mutex_unlock(&m_httpq_lock);
pthread_cond_signal(&m_httpq_cond);

if (ret != NULL)
*ret = h;

return 0;
}

void
http_cancel(struct http *http)
{
pthread_mutex_lock(&m_httpq_lock);
if (http->state == HS_ADD)
http->state = HS_NOADD;
else if (http->state == HS_ACTIVE)
http->state = HS_CANCEL;
pthread_mutex_unlock(&m_httpq_lock);
}

int
http_succeeded(struct http_res *res)
{
return res->res == HRES_OK && res->code >= 200 && res->code < 300;
}

static void
http_td_cb(void *arg)
{
struct http *h = arg;
if (h->res.res == HRES_OK)
curl_easy_getinfo(h->curlh, CURLINFO_RESPONSE_CODE, &h->res.code);
if (h->res.res == HRES_FAIL) {
btpd_log(BTPD_L_BTPD, "Http error for url '%s' (%s).\n", h->url,
curl_easy_strerror(h->res.code));
}
h->cb(h, &h->res, h->cb_arg);
curl_easy_cleanup(h->curlh);
if (h->res.content != NULL)
free(h->res.content);
free(h->url);
free(h);
}

static void
http_td_actions(void)
{
int has_posted = 0;
struct http *http, *next;
int nmsgs;
CURLMsg *cmsg;

BTPDQ_FOREACH_MUTABLE(http, &m_httpq, entry, next) {
switch (http->state) {
case HS_ADD:
curl_multi_add_handle(m_curlh, http->curlh);
http->state = HS_ACTIVE;
break;
case HS_CANCEL:
curl_multi_remove_handle(m_curlh, http->curlh);
case HS_NOADD:
BTPDQ_REMOVE(&m_httpq, http, entry);
http->state = HS_CANCEL;
http->res.res = HRES_CANCEL;
if (!has_posted) {
has_posted = 1;
td_post_begin();
}
td_post(http_td_cb, http);
break;
case HS_DONE:
abort();
default:
break;
}
}

while ((cmsg = curl_multi_info_read(m_curlh, &nmsgs)) != NULL) {
BTPDQ_FOREACH(http, &m_httpq, entry) {
if (http->curlh == cmsg->easy_handle)
break;
}
assert(http != NULL);
BTPDQ_REMOVE(&m_httpq, http, entry);
http->state = HS_DONE;
if (cmsg->data.result == 0)
http->res.res = HRES_OK;
else {
http->res.res = HRES_FAIL;
http->res.code = cmsg->data.result;
}
curl_multi_remove_handle(m_curlh, http->curlh);
if (!has_posted) {
td_post_begin();
has_posted = 1;
}
td_post(http_td_cb, http);
}

if (has_posted)
td_post_end();
}

static void
http_td_curl(void)
{
fd_set rset, wset, eset;
int maxfd, nbusy;

pthread_mutex_unlock(&m_httpq_lock);

do {
while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(m_curlh, &nbusy))
;

FD_ZERO(&rset);
FD_ZERO(&wset);
FD_ZERO(&eset);

curl_multi_fdset(m_curlh, &rset, &wset, &eset, &maxfd);
select(maxfd + 1, &rset, &wset, &eset, (& (struct timeval) { 2, 0}));

pthread_mutex_lock(&m_httpq_lock);
http_td_actions();
pthread_mutex_unlock(&m_httpq_lock);
} while (nbusy > 0);

pthread_mutex_lock(&m_httpq_lock);
}

static void *
http_td(void *arg)
{
pthread_mutex_lock(&m_httpq_lock);
for (;;) {
while (BTPDQ_EMPTY(&m_httpq))
pthread_cond_wait(&m_httpq_cond, &m_httpq_lock);
http_td_actions();
if (!BTPDQ_EMPTY(&m_httpq))
http_td_curl();
}
}

void
http_init(void)
{
int err;
if (curl_global_init(0))
goto curl_err;
if ((m_curlh = curl_multi_init()) == NULL)
goto curl_err;

err = pthread_mutex_init(&m_httpq_lock, NULL);
if (err == 0)
err = pthread_cond_init(&m_httpq_cond, NULL);
if (err == 0)
err = pthread_create(&m_httptd, NULL, http_td, NULL);
if (err != 0)
btpd_err("pthread failure (%s)\n", strerror(err));
return;
curl_err:
btpd_err("Fatal error in curl.\n");
}

+ 29
- 0
btpd/http.h Dosyayı Görüntüle

@@ -0,0 +1,29 @@
#ifndef BTPD_HTTP_H
#define BTPD_HTTP_H

struct http;

enum http_status {
HRES_OK,
HRES_FAIL,
HRES_CANCEL
};

struct http_res {
enum http_status res;
long code;
char *content;
size_t length;
};

int http_get(struct http **ret,
void (*cb)(struct http *, struct http_res *, void *),
void *arg,
const char *fmt, ...);
void http_cancel(struct http *http);

int http_succeeded(struct http_res *res);

void http_init(void);

#endif

Yükleniyor…
İptal
Kaydet