Selaa lähdekoodia

Split the tracker code into a generic part and a http specific part. This

allows me to add code for other types of trackers.

Remove the curl interface glue, since I use my own http client now. The curl
code was my main reason for using threads, so I'm a large step closer to make
btpd unthreaded again.
master
Richard Nyberg 18 vuotta sitten
vanhempi
commit
03c3b7ec49
5 muutettua tiedostoa jossa 284 lisäystä ja 433 poistoa
  1. +0
    -251
      btpd/http.c
  2. +0
    -32
      btpd/http.h
  3. +174
    -0
      btpd/http_tr_if.c
  4. +88
    -150
      btpd/tracker_req.c
  5. +22
    -0
      btpd/tracker_req.h

+ 0
- 251
btpd/http.c Näytä tiedosto

@@ -1,251 +0,0 @@
#include <pthread.h>
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
#include <curl/curl.h>

#include "btpd.h"
#include "http.h"

#define MAX_DOWNLOAD (1 << 18) // 256kB
#define CURL_SELECT_TIME (& (struct timeval) { 1, 0 })

enum http_state {
HS_ADD,
HS_ACTIVE,
HS_DONE,
HS_NOADD,
HS_CANCEL
};

struct http {
long t_created;
enum http_state state;
char *url;
CURL *curlh;
struct http_res res;
BTPDQ_ENTRY(http) entry;
void (*cb)(struct http *, struct http_res *, void *);
void *cb_arg;
};

BTPDQ_HEAD(http_tq, http);

static struct http_tq m_httpq = BTPDQ_HEAD_INITIALIZER(m_httpq);
static pthread_mutex_t m_httpq_lock;
static pthread_cond_t m_httpq_cond;
static CURLM *m_curlh;

static size_t
http_write_cb(void *ptr, size_t size, size_t nmemb, void *arg)
{
char *mem;
struct http_res *res = arg;
size_t nbytes = size * nmemb;
size_t nlength = res->length + nbytes;
if (nlength > MAX_DOWNLOAD)
return 0;
if ((mem = realloc(res->content, nlength)) == NULL)
return 0;
res->content = mem;
bcopy(ptr, res->content + res->length, nbytes);
res->length = nlength;
return nbytes;
}

int
http_get(struct http **ret,
void (*cb)(struct http *, struct http_res *, void *), void *arg,
const char *fmt, ...)
{
struct http *h = btpd_calloc(1, sizeof(*h));

h->state = HS_ADD;
h->cb = cb;
h->cb_arg = arg;
h->t_created = btpd_seconds;
if ((h->curlh = curl_easy_init()) == NULL)
btpd_err("Fatal error in curl.\n");

va_list ap;
va_start(ap, fmt);
if (vasprintf(&h->url, fmt, ap) == -1)
btpd_err("Out of memory.\n");
va_end(ap);

curl_easy_setopt(h->curlh, CURLOPT_URL, h->url);
curl_easy_setopt(h->curlh, CURLOPT_USERAGENT, BTPD_VERSION);
curl_easy_setopt(h->curlh, CURLOPT_WRITEFUNCTION, http_write_cb);
curl_easy_setopt(h->curlh, CURLOPT_WRITEDATA, &h->res);
curl_easy_setopt(h->curlh, CURLOPT_FOLLOWLOCATION, 1);

pthread_mutex_lock(&m_httpq_lock);
BTPDQ_INSERT_TAIL(&m_httpq, h, entry);
pthread_mutex_unlock(&m_httpq_lock);
pthread_cond_signal(&m_httpq_cond);

if (ret != NULL)
*ret = h;

return 0;
}

long
http_server_busy_time(const char *url, long s)
{
struct http *h;
size_t len = strlen(url);

pthread_mutex_lock(&m_httpq_lock);
h = BTPDQ_LAST(&m_httpq, http_tq);
while (h != NULL &&
!((h->state == HS_ACTIVE || h->state == HS_ADD) &&
strncmp(url, h->url, len) == 0))
h = BTPDQ_PREV(h, http_tq, entry);
pthread_mutex_unlock(&m_httpq_lock);

if (h == NULL || btpd_seconds - h->t_created >= s)
return 0;
else
return s - (btpd_seconds - h->t_created);
}

void
http_cancel(struct http *http)
{
pthread_mutex_lock(&m_httpq_lock);
if (http->state == HS_ADD)
http->state = HS_NOADD;
else
http->state = HS_CANCEL;
pthread_mutex_unlock(&m_httpq_lock);
}

int
http_succeeded(struct http_res *res)
{
return res->res == HRES_OK && res->code >= 200 && res->code < 300;
}

static void
http_td_cb(void *arg)
{
struct http *h = arg;
if (h->res.res == HRES_OK)
curl_easy_getinfo(h->curlh, CURLINFO_RESPONSE_CODE, &h->res.code);
if (h->res.res == HRES_FAIL)
h->res.errmsg = curl_easy_strerror(h->res.code);
if (h->state != HS_CANCEL)
h->cb(h, &h->res, h->cb_arg);
curl_easy_cleanup(h->curlh);
if (h->res.content != NULL)
free(h->res.content);
free(h->url);
free(h);
}

static void
http_td_actions(void)
{
int nmsgs;
struct http *http, *next;
struct http_tq postq;
CURLMsg *cmsg;

pthread_mutex_lock(&m_httpq_lock);
do {
while (BTPDQ_EMPTY(&m_httpq))
pthread_cond_wait(&m_httpq_cond, &m_httpq_lock);

BTPDQ_INIT(&postq);

BTPDQ_FOREACH_MUTABLE(http, &m_httpq, entry, next) {
switch (http->state) {
case HS_ADD:
curl_multi_add_handle(m_curlh, http->curlh);
http->state = HS_ACTIVE;
break;
case HS_CANCEL:
curl_multi_remove_handle(m_curlh, http->curlh);
case HS_NOADD:
BTPDQ_REMOVE(&m_httpq, http, entry);
BTPDQ_INSERT_TAIL(&postq, http, entry);
http->state = HS_CANCEL;
http->res.res = HRES_CANCEL;
break;
case HS_DONE:
abort();
default:
break;
}
}

while ((cmsg = curl_multi_info_read(m_curlh, &nmsgs)) != NULL) {
BTPDQ_FOREACH(http, &m_httpq, entry) {
if (http->curlh == cmsg->easy_handle)
break;
}
assert(http != NULL);
BTPDQ_REMOVE(&m_httpq, http, entry);
BTPDQ_INSERT_TAIL(&postq, http, entry);
http->state = HS_DONE;
if (cmsg->data.result == 0)
http->res.res = HRES_OK;
else {
http->res.res = HRES_FAIL;
http->res.code = cmsg->data.result;
}
curl_multi_remove_handle(m_curlh, http->curlh);
}

if (!BTPDQ_EMPTY(&postq)) {
pthread_mutex_unlock(&m_httpq_lock);
td_post_begin();
BTPDQ_FOREACH(http, &postq, entry)
td_post(http_td_cb, http);
td_post_end();
pthread_mutex_lock(&m_httpq_lock);
}
} while (BTPDQ_EMPTY(&m_httpq));
pthread_mutex_unlock(&m_httpq_lock);
}

static void
http_td(void *arg)
{
fd_set rset, wset, eset;
int maxfd, nbusy;

for (;;) {
http_td_actions();

while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(m_curlh, &nbusy))
;

if (nbusy > 0) {
FD_ZERO(&rset);
FD_ZERO(&wset);
FD_ZERO(&eset);
curl_multi_fdset(m_curlh, &rset, &wset, &eset, &maxfd);
select(maxfd + 1, &rset, &wset, &eset, CURL_SELECT_TIME);
}
}
}

static void
errdie(int err)
{
if (err != 0)
btpd_err("Fatal error in http_init.\n");
}

void
http_init(void)
{
pthread_t ret;
errdie(curl_global_init(0));
errdie((m_curlh = curl_multi_init()) == NULL);
errdie(pthread_mutex_init(&m_httpq_lock, NULL));
errdie(pthread_cond_init(&m_httpq_cond, NULL));
errdie(pthread_create(&ret, NULL, (void *(*)(void *))http_td, NULL));
}

+ 0
- 32
btpd/http.h Näytä tiedosto

@@ -1,32 +0,0 @@
#ifndef BTPD_HTTP_H
#define BTPD_HTTP_H

struct http;

enum http_status {
HRES_OK,
HRES_FAIL,
HRES_CANCEL
};

struct http_res {
enum http_status res;
long code;
const char *errmsg;
char *content;
size_t length;
};

__attribute__((format (printf, 4, 5)))
int http_get(struct http **ret,
void (*cb)(struct http *, struct http_res *, void *),
void *arg,
const char *fmt, ...);
void http_cancel(struct http *http);
int http_succeeded(struct http_res *res);

long http_server_busy_time(const char *url, long s);

void http_init(void);

#endif

+ 174
- 0
btpd/http_tr_if.c Näytä tiedosto

@@ -0,0 +1,174 @@
#include <string.h>

#include "btpd.h"
#include "tracker_req.h"
#include "http_client.h"

#define MAX_DOWNLOAD (1 << 18) // 256kB

static const char *m_tr_events[] = { "started", "stopped", "completed", "" };

struct http_tr_req {
struct torrent *tp;
struct http_req *req;
struct evbuffer *buf;
enum tr_event event;
};

static void
http_tr_free(struct http_tr_req *treq)
{
evbuffer_free(treq->buf);
free(treq);
}

static void
maybe_connect_to(struct torrent *tp, const char *pinfo)
{
const char *pid;
char *ip;
int port;
size_t len;

if ((pid = benc_dget_mem(pinfo, "peer id", &len)) == NULL || len != 20)
return;

if (bcmp(btpd_get_peer_id(), pid, 20) == 0)
return;

if (net_torrent_has_peer(tp->net, pid))
return;

if ((ip = benc_dget_str(pinfo, "ip", NULL)) == NULL)
return;

port = benc_dget_int(pinfo, "port");
peer_create_out(tp->net, pid, ip, port);

if (ip != NULL)
free(ip);
}

static int
parse_reply(struct torrent *tp, const char *content, size_t size, int parse,
int *interval)
{
const char *buf;
size_t len;
const char *peers;

if (benc_validate(content, size) != 0)
goto bad_data;

if ((buf = benc_dget_mem(content, "failure reason", &len)) != NULL) {
btpd_log(BTPD_L_ERROR, "Tracker failure: '%.*s' for '%s'.\n",
(int)len, buf, torrent_name(tp));
return 1;
}

if (!parse)
return 0;

if (!benc_dct_chk(content, 2, BE_INT, 1, "interval", BE_ANY, 1, "peers"))
goto bad_data;

*interval = benc_dget_int(content, "interval");
if (*interval < 1)
goto bad_data;

peers = benc_dget_any(content, "peers");

if (benc_islst(peers)) {
for (peers = benc_first(peers);
peers != NULL && net_npeers < net_max_peers;
peers = benc_next(peers))
maybe_connect_to(tp, peers);
} else if (benc_isstr(peers)) {
peers = benc_dget_mem(content, "peers", &len);
for (size_t i = 0; i < len && net_npeers < net_max_peers; i += 6)
peer_create_out_compact(tp->net, peers + i);
} else
goto bad_data;

return 0;

bad_data:
btpd_log(BTPD_L_ERROR, "Bad data from tracker for '%s'.\n",
torrent_name(tp));
return 1;
}

static void
http_cb(struct http_req *req, struct http_response *res, void *arg)
{
int interval;
struct http_tr_req *treq = arg;
switch (res->type) {
case HTTP_T_ERR:
btpd_log(BTPD_L_ERROR, "http request failed for '%s'.\n",
torrent_name(treq->tp));
tr_result(treq->tp, TR_RES_FAIL, -1);
http_tr_free(treq);
break;
case HTTP_T_DATA:
if (treq->buf->off + res->v.data.l > MAX_DOWNLOAD) {
tr_result(treq->tp, TR_RES_FAIL, -1);
http_tr_cancel(treq);
break;
}
if (evbuffer_add(treq->buf, res->v.data.p, res->v.data.l) != 0)
btpd_err("Out of memory.\n");
break;
case HTTP_T_DONE:
if (parse_reply(treq->tp, treq->buf->buffer, treq->buf->off,
treq->event != TR_EV_STOPPED, &interval) == 0)
tr_result(treq->tp, TR_RES_OK, interval);
else
tr_result(treq->tp, TR_RES_FAIL, -1);
http_tr_free(treq);
break;
default:
break;
}
}

struct http_tr_req *
http_tr_req(struct torrent *tp, enum tr_event event, const char *aurl)
{
char e_hash[61], e_id[61], url[512], qc;;
const uint8_t *peer_id = btpd_get_peer_id();

qc = (strchr(aurl, '?') == NULL) ? '?' : '&';

for (int i = 0; i < 20; i++)
snprintf(e_hash + i * 3, 4, "%%%.2x", tp->tl->hash[i]);
for (int i = 0; i < 20; i++)
snprintf(e_id + i * 3, 4, "%%%.2x", peer_id[i]);

snprintf(url, sizeof(url),
"%s%cinfo_hash=%s&peer_id=%s&key=%ld&port=%d&uploaded=%llu"
"&downloaded=%llu&left=%llu&compact=1%s%s",
aurl, qc, e_hash, e_id, tr_key, net_port,
tp->net->uploaded, tp->net->downloaded,
(long long)tp->total_length - cm_content(tp),
event == TR_EV_EMPTY ? "" : "&event=", m_tr_events[event]);

struct http_tr_req *treq = btpd_calloc(1, sizeof(*treq));
if (!http_get(&treq->req, url, "User-Agent: " BTPD_VERSION "\r\n",
http_cb, treq)) {
free(treq);
return NULL;
}
if ((treq->buf = evbuffer_new()) == NULL)
btpd_err("Out of memory.\n");
treq->tp = tp;
treq->event = event;
return treq;
}

void
http_tr_cancel(struct http_tr_req *treq)
{
http_cancel(treq->req);
http_tr_free(treq);
}

+ 88
- 150
btpd/tracker_req.c Näytä tiedosto

@@ -1,24 +1,16 @@
#include <stdio.h>
#include <string.h>

#include "btpd.h"
#include "benc.h"
#include "subr.h"
#include "http.h"
#include "tracker_req.h"

#define REQ_DELAY 1
#define REQ_TIMEOUT (& (struct timeval) { 120, 0 })
#define RETRY_WAIT (& (struct timeval) { rand_between(35, 70), 0 })

enum tr_event {
TR_EV_STARTED,
TR_EV_STOPPED,
TR_EV_COMPLETED,
TR_EV_EMPTY
};
long tr_key;

static long m_key;
static const char *m_events[] = { "started", "stopped", "completed", "" };
static long m_tlast_req, m_tnext_req;

enum timer_type {
TIMER_NONE,
@@ -34,102 +26,28 @@ struct tracker {
unsigned nerrors;
int tier, url;
struct mi_announce *ann;
struct http *req;
void *req;
struct event timer;
};

static void tr_send(struct torrent *tp, enum tr_event event);

static void
maybe_connect_to(struct torrent *tp, const char *pinfo)
{
const char *pid;
char *ip;
int port;
size_t len;

if ((pid = benc_dget_mem(pinfo, "peer id", &len)) == NULL || len != 20)
return;

if (bcmp(btpd_get_peer_id(), pid, 20) == 0)
return;

if (net_torrent_has_peer(tp->net, pid))
return;

if ((ip = benc_dget_str(pinfo, "ip", NULL)) == NULL)
return;

port = benc_dget_int(pinfo, "port");
peer_create_out(tp->net, pid, ip, port);

if (ip != NULL)
free(ip);
}


static int
parse_reply(struct torrent *tp, const char *content, size_t size, int parse)
{
const char *buf;
size_t len;
const char *peers;
int interval;

if (benc_validate(content, size) != 0)
goto bad_data;

if ((buf = benc_dget_mem(content, "failure reason", &len)) != NULL) {
btpd_log(BTPD_L_ERROR, "Tracker failure: '%.*s' for '%s'.\n",
(int)len, buf, torrent_name(tp));
return 1;
}

if (!parse)
return 0;

if (!benc_dct_chk(content, 2, BE_INT, 1, "interval", BE_ANY, 1, "peers"))
goto bad_data;
typedef struct _dummy *(*request_fun_t)(struct torrent *, enum tr_event,
const char *);
typedef void (*cancel_fun_t)(struct _dummy *);

interval = benc_dget_int(content, "interval");
if (interval < 1)
goto bad_data;

tp->tr->interval = interval;
peers = benc_dget_any(content, "peers");

if (benc_islst(peers)) {
for (peers = benc_first(peers);
peers != NULL && net_npeers < net_max_peers;
peers = benc_next(peers))
maybe_connect_to(tp, peers);
} else if (benc_isstr(peers)) {
peers = benc_dget_mem(content, "peers", &len);
for (size_t i = 0; i < len && net_npeers < net_max_peers; i += 6)
peer_create_out_compact(tp->net, peers + i);
} else
goto bad_data;

return 0;
struct tr_op {
int len;
const char *scheme;
request_fun_t request;
cancel_fun_t cancel;
};

bad_data:
btpd_log(BTPD_L_ERROR, "Bad data from tracker for '%s'.\n",
torrent_name(tp));
return 1;
}
static struct tr_op m_http_op = {
7, "http://", (request_fun_t)http_tr_req, (cancel_fun_t)http_tr_cancel
};

static void
tr_set_stopped(struct torrent *tp)
{
struct tracker *tr = tp->tr;
btpd_ev_del(&tr->timer);
tr->ttype = TIMER_NONE;
if (tr->req != NULL) {
http_cancel(tr->req);
tr->req = NULL;
}
torrent_on_tr_stopped(tp);
}
static struct tr_op *m_tr_ops[] = {
&m_http_op, NULL
};

static char *
get_url(struct tracker *tr)
@@ -158,26 +76,82 @@ next_url(struct tracker *tr)
tr->tier = (tr->tier + 1) % tr->ann->ntiers;
}

struct tr_op *
get_op(struct tracker *tr)
{
struct tr_op *op;
char *url = get_url(tr);
for (op = m_tr_ops[0]; op != NULL; op++)
if (strncasecmp(op->scheme, url, op->len) == 0)
return op;
return NULL;
}

static void
http_cb(struct http *req, struct http_res *res, void *arg)
tr_cancel(struct tracker *tr)
{
struct tr_op *op = get_op(tr);
assert(op != NULL);
op->cancel(tr->req);
tr->req = NULL;
}

static void
tr_send(struct torrent *tp, enum tr_event event)
{
struct tracker *tr = tp->tr;
struct tr_op *op = get_op(tr);

tr->event = event;
if (tr->req != NULL)
tr_cancel(tr);

if (m_tlast_req > btpd_seconds - REQ_DELAY) {
m_tnext_req = max(m_tnext_req, m_tlast_req) + REQ_DELAY;
tr->ttype = TIMER_RETRY;
btpd_ev_add(&tr->timer,
(& (struct timeval) { m_tnext_req - btpd_seconds, 0 }));
return;
}

if ((op == NULL ||
(tr->req = op->request(tp, event, get_url(tr))) == NULL)) {
tr->ttype = TIMER_RETRY;
btpd_ev_add(&tr->timer, (& (struct timeval) { 20, 0 }));
} else {
m_tlast_req = btpd_seconds;
tr->ttype = TIMER_TIMEOUT;
btpd_ev_add(&tr->timer, REQ_TIMEOUT);
}
}

static void
tr_set_stopped(struct torrent *tp)
{
struct tracker *tr = tp->tr;
btpd_ev_del(&tr->timer);
tr->ttype = TIMER_NONE;
if (tr->req != NULL)
tr_cancel(tr);
torrent_on_tr_stopped(tp);
}

void
tr_result(struct torrent *tp, enum tr_res res, int interval)
{
struct torrent *tp = arg;
struct tracker *tr = tp->tr;
assert(tr->ttype == TIMER_TIMEOUT);
tr->req = NULL;
if (res->res == HRES_OK && parse_reply(tp, res->content, res->length,
tr->event != TR_EV_STOPPED) == 0) {
if (res == TR_RES_OK) {
good_url(tr);
tr->interval = interval;
tr->nerrors = 0;
tr->ttype = TIMER_INTERVAL;
btpd_ev_add(&tr->timer, (& (struct timeval) { tr->interval, 0 }));
btpd_ev_add(&tr->timer, (& (struct timeval) { tr->interval, 0}));
} else {
if (res->res == HRES_FAIL)
btpd_log(BTPD_L_BTPD, "Tracker request for '%s' failed (%s).\n",
torrent_name(tp), res->errmsg);
tr->nerrors++;
tr->ttype = TIMER_RETRY;
btpd_ev_add(&tr->timer, RETRY_WAIT);
next_url(tr);
}
if (tr->event == TR_EV_STOPPED && (tr->nerrors == 0 || tr->nerrors >= 5))
tr_set_stopped(tp);
@@ -197,8 +171,9 @@ timer_cb(int fd, short type, void *arg)
tr_set_stopped(tp);
break;
}
case TIMER_RETRY:
tr_cancel(tr);
next_url(tr);
case TIMER_RETRY:
tr_send(tp, tr->event);
break;
case TIMER_INTERVAL:
@@ -209,43 +184,6 @@ timer_cb(int fd, short type, void *arg)
}
}

static void
tr_send(struct torrent *tp, enum tr_event event)
{
long busy_secs;
char e_hash[61], e_id[61], qc;;
const uint8_t *peer_id = btpd_get_peer_id();

struct tracker *tr = tp->tr;
tr->event = event;
if (tr->ttype == TIMER_TIMEOUT)
http_cancel(tr->req);

if ((busy_secs = http_server_busy_time(get_url(tr), 3)) > 0) {
tr->ttype = TIMER_RETRY;
btpd_ev_add(&tr->timer, (& (struct timeval) { busy_secs, 0 }));
return;
}

tr->ttype = TIMER_TIMEOUT;
btpd_ev_add(&tr->timer, REQ_TIMEOUT);

qc = (strchr(get_url(tr), '?') == NULL) ? '?' : '&';

for (int i = 0; i < 20; i++)
snprintf(e_hash + i * 3, 4, "%%%.2x", tp->tl->hash[i]);
for (int i = 0; i < 20; i++)
snprintf(e_id + i * 3, 4, "%%%.2x", peer_id[i]);

http_get(&tr->req, http_cb, tp,
"%s%cinfo_hash=%s&peer_id=%s&key=%ld&port=%d&uploaded=%llu"
"&downloaded=%llu&left=%llu&compact=1%s%s",
get_url(tr), qc, e_hash, e_id, m_key, net_port,
tp->net->uploaded, tp->net->downloaded,
(long long)tp->total_length - cm_content(tp),
event == TR_EV_EMPTY ? "" : "&event=", m_events[event]);
}

int
tr_create(struct torrent *tp, const char *mi)
{
@@ -263,7 +201,7 @@ tr_kill(struct torrent *tp)
tp->tr = NULL;
btpd_ev_del(&tr->timer);
if (tr->req != NULL)
http_cancel(tr->req);
tr_cancel(tr);
mi_free_announce(tr->ann);
free(tr);
}
@@ -310,5 +248,5 @@ tr_errors(struct torrent *tp)
void
tr_init(void)
{
m_key = random();
tr_key = random();
}

+ 22
- 0
btpd/tracker_req.h Näytä tiedosto

@@ -1,6 +1,20 @@
#ifndef TRACKER_REQ_H
#define TRACKER_REQ_H

enum tr_event {
TR_EV_STARTED,
TR_EV_STOPPED,
TR_EV_COMPLETED,
TR_EV_EMPTY
};

enum tr_res {
TR_RES_OK,
TR_RES_FAIL
};

extern long tr_key;

int tr_create(struct torrent *tp, const char *mi);
void tr_kill(struct torrent *tp);
void tr_start(struct torrent *tp);
@@ -10,4 +24,12 @@ void tr_complete(struct torrent *tp);
unsigned tr_errors(struct torrent *tp);
int tr_active(struct torrent *tp);

void tr_result(struct torrent *tp, enum tr_res res, int interval);

struct http_tr_req;

struct http_tr_req *http_tr_req(struct torrent *tp, enum tr_event event,
const char *aurl);
void http_tr_cancel(struct http_tr_req *treq);

#endif

Loading…
Peruuta
Tallenna