Переглянути джерело

Make the content code unthreaded and remove the no longer needed inter

thread messaging code. Also simplify some interfaces by polling for
state changes when appropriate instead of being called directly at any
time.
master
Richard Nyberg 18 роки тому
джерело
коміт
95e83bb247
7 змінених файлів з 301 додано та 604 видалено
  1. +6
    -11
      btpd/btpd.c
  2. +0
    -9
      btpd/btpd.h
  3. +292
    -492
      btpd/content.c
  4. +2
    -1
      btpd/content.h
  5. +0
    -87
      btpd/td.c
  6. +1
    -3
      btpd/torrent.h
  7. +0
    -1
      btpd/tracker_req.c

+ 6
- 11
btpd/btpd.c Переглянути файл

@@ -40,8 +40,9 @@ btpd_shutdown(int grace_seconds)
if (tp->state != T_STOPPING)
torrent_stop(tp);
if (grace_seconds >= 0) {
event_once(-1, EV_TIMEOUT, grace_cb, NULL,
(& (struct timeval) { grace_seconds, 0 }));
if (event_once(-1, EV_TIMEOUT, grace_cb, NULL,
(& (struct timeval) { grace_seconds, 0 })) != 0)
btpd_err("failed to add event (%s).\n", strerror(errno));
}
}
}
@@ -57,13 +58,6 @@ btpd_get_peer_id(void)
return m_peer_id;
}

void
btpd_on_no_torrents(void)
{
if (m_shutdown)
btpd_exit(0);
}

static void
signal_cb(int signal, short type, void *arg)
{
@@ -77,9 +71,11 @@ heartbeat_cb(int fd, short type, void *arg)
btpd_ev_add(&m_heartbeat, (& (struct timeval) { 1, 0 }));
btpd_seconds++;
net_on_tick();
torrent_on_tick_all();
if (m_shutdown && torrent_count() == 0)
btpd_exit(0);
}

void td_init(void);
void tr_init(void);
void ipc_init(void);

@@ -93,7 +89,6 @@ btpd_init(void)
for (int i = sizeof(BTPD_VERSION); i < 20; i++)
m_peer_id[i] = rand_between(0, 255);

td_init();
net_init();
ipc_init();
ul_init();


+ 0
- 9
btpd/btpd.h Переглянути файл

@@ -67,13 +67,4 @@ int btpd_is_stopping(void);

const uint8_t *btpd_get_peer_id(void);

void td_acquire_lock(void);
void td_release_lock(void);

#define td_post_begin td_acquire_lock
void td_post(void (*fun)(void *), void *arg);
void td_post_end(void);

void btpd_on_no_torrents(void);

#endif

+ 292
- 492
btpd/content.c Переглянути файл

@@ -3,8 +3,6 @@

#include <fcntl.h>
#include <math.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
@@ -14,56 +12,13 @@
#include "btpd.h"
#include "stream.h"

struct cm_write_data {
uint32_t begin;
uint8_t *buf;
size_t len;
BTPDQ_ENTRY(cm_write_data) entry;
};

BTPDQ_HEAD(cm_write_data_tq, cm_write_data);

enum cm_op_type {
CM_ALLOC,
CM_SAVE,
CM_START,
CM_TEST,
CM_WRITE
};

struct cm_op {
struct torrent *tp;
int error;
int received;
enum cm_op_type type;
union {
struct {
uint32_t piece;
uint32_t pos;
} alloc;
struct {
volatile sig_atomic_t cancel;
} start;
struct {
uint32_t piece;
uint32_t pos;
int ok;
} test;
struct {
uint32_t piece;
uint32_t pos;
struct cm_write_data_tq q;
} write;
} u;

BTPDQ_ENTRY(cm_op) cm_entry;
BTPDQ_ENTRY(cm_op) td_entry;
struct rstat {
time_t mtime;
off_t size;
};

BTPDQ_HEAD(cm_op_tq, cm_op);

struct content {
int active;
enum { CM_INACTIVE, CM_STARTING, CM_ACTIVE } state;

uint32_t npieces_got;

@@ -73,11 +28,8 @@ struct content {

uint8_t *piece_field;
uint8_t *block_field;
uint8_t *hold_field;
uint8_t *pos_field;

struct cm_op_tq todoq;

struct bt_stream *rds;
struct bt_stream *wrs;

@@ -86,15 +38,11 @@ struct content {

#define ZEROBUFLEN (1 << 14)

struct cm_comm {
struct cm_op_tq q;
pthread_mutex_t lock;
pthread_cond_t cond;
};

static struct cm_comm m_long_comm, m_short_comm;
static const uint8_t m_zerobuf[ZEROBUFLEN];

int stat_and_adjust(struct torrent *tp, struct rstat ret[]);
static int save_resume(struct torrent *tp, struct rstat sbs[]);

static int
fd_cb_rd(const char *path, int *fd, void *arg)
{
@@ -109,53 +57,153 @@ fd_cb_wr(const char *path, int *fd, void *arg)
return vopen(fd, O_RDWR, "%s/%s", tp->tl->dir, path);
}

static void
cm_td_post_common(struct cm_comm *comm, struct cm_op *op)
struct pct_data {
off_t off, remain;
struct torrent *tp;
SHA_CTX sha;
BTPDQ_ENTRY(pct_data) entry;
uint32_t piece;
void (*cb)(struct torrent *, uint32_t, int);
};

BTPDQ_HEAD(pct_tq, pct_data);

static struct pct_tq m_pctq = BTPDQ_HEAD_INITIALIZER(m_pctq);
static void cm_write_done(struct torrent *tp);

struct start_test_data {
struct torrent *tp;
struct rstat *rstat;
uint32_t start;
BTPDQ_ENTRY(start_test_data) entry;
};

BTPDQ_HEAD(std_tq, start_test_data);

static struct std_tq m_startq = BTPDQ_HEAD_INITIALIZER(m_startq);

static struct event m_workev;

#define READBUFLEN (1 << 14)

static int
test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
{
pthread_mutex_lock(&comm->lock);
BTPDQ_INSERT_TAIL(&comm->q, op, td_entry);
pthread_mutex_unlock(&comm->lock);
pthread_cond_signal(&comm->cond);
char piece_hash[SHA_DIGEST_LENGTH];
int fd;
int err;

err = vopen(&fd, O_RDONLY, "torrents/%s/torrent", tp->relpath);
if (err != 0)
btpd_err("failed to open 'torrents/%s/torrent' (%s).\n",
tp->relpath, strerror(err));

lseek(fd, tp->pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET);
read(fd, piece_hash, SHA_DIGEST_LENGTH);
close(fd);

return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
}

static void
cm_td_post_long(struct cm_op *op)
void
pct_create(struct torrent *tp, uint32_t piece,
void (*cb)(struct torrent *, uint32_t, int))
{
cm_td_post_common(&m_long_comm, op);
struct pct_data *p = btpd_calloc(1, sizeof(*p));
p->piece = piece;
p->tp = tp;
p->off = piece * tp->piece_length;
p->remain = torrent_piece_size(tp, piece);
SHA1_Init(&p->sha);
p->cb = cb;
BTPDQ_INSERT_TAIL(&m_pctq, p, entry);
btpd_ev_add(&m_workev, (& (struct timeval) { 0, 0 }));
}

static void
cm_td_post_short(struct cm_op *op)
void
pct_kill(struct pct_data *p)
{
cm_td_post_common(&m_short_comm, op);
BTPDQ_REMOVE(&m_pctq, p, entry);
free(p);
}

static void
run_todo(struct content *cm)
{
struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
void
pct_run(struct pct_data *p)
{
char buf[READBUFLEN];
size_t unit = (10 << 14);

while (p->remain > 0 && unit > 0) {
size_t wantread = min(p->remain, sizeof(buf));
if (wantread > unit)
wantread = unit;
if ((errno = bts_get(p->tp->cm->rds, p->off, buf, wantread)) != 0)
btpd_err("IO error on '%s' (%s).\n", bts_filename(p->tp->cm->rds),
strerror(errno));
p->remain -= wantread;
unit -= wantread;
p->off += wantread;
SHA1_Update(&p->sha, buf, wantread);
}
if (p->remain == 0) {
uint8_t hash[SHA_DIGEST_LENGTH];
SHA1_Final(hash, &p->sha);
p->cb(p->tp, p->piece, test_hash(p->tp, hash, p->piece) == 0);
pct_kill(p);
}
}

if (op->type == CM_WRITE && BTPDQ_EMPTY(&op->u.write.q)) {
BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
free(op);
if (!BTPDQ_EMPTY(&cm->todoq))
run_todo(cm);
return;
void
pct_cb(struct torrent *tp, uint32_t piece, int ok)
{
struct content *cm = tp->cm;
if (ok) {
assert(cm->npieces_got < tp->npieces);
cm->npieces_got++;
set_bit(cm->piece_field, piece);
if (net_active(tp))
dl_on_ok_piece(tp->net, piece);
if (cm_full(tp))
cm_write_done(tp);
} else {
cm->ncontent_bytes -= torrent_piece_size(tp, piece);
bzero(cm->block_field + piece * cm->bppbf, cm->bppbf);
if (net_active(tp))
dl_on_bad_piece(tp->net, piece);
}
}

if (op->type != CM_START)
cm_td_post_short(op);
else
cm_td_post_long(op);
void
work_stop(struct torrent *tp)
{
struct content *cm = tp->cm;
struct pct_data *pct, *next;
if (cm->state == CM_STARTING) {
struct start_test_data *std;
BTPDQ_FOREACH(std, &m_startq, entry)
if (std->tp == tp) {
BTPDQ_REMOVE(&m_startq, std, entry);
free(std->rstat);
free(std);
break;
}
}
BTPDQ_FOREACH_MUTABLE(pct, &m_pctq, entry, next)
if (pct->tp == tp)
pct_kill(pct);
}

static void
add_todo(struct content *cm, struct cm_op *op)
static int test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece);

void
worker_cb(int fd, short type, void *arg)
{
int was_empty = BTPDQ_EMPTY(&cm->todoq);
BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
if (was_empty)
run_todo(cm);
struct pct_data *p = BTPDQ_FIRST(&m_pctq);
if (p == NULL)
return;
pct_run(p);
if (!BTPDQ_EMPTY(&m_pctq))
event_add(&m_workev, (& (struct timeval) { 0, 0 }));
}

void
@@ -165,7 +213,6 @@ cm_kill(struct torrent *tp)
bts_close(cm->rds);
free(cm->piece_field);
free(cm->block_field);
free(cm->hold_field);
free(cm->pos_field);
free(cm);
tp->cm = NULL;
@@ -174,11 +221,9 @@ cm_kill(struct torrent *tp)
void
cm_save(struct torrent *tp)
{
struct content *cm = tp->cm;
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_SAVE;
add_todo(cm, op);
struct rstat sbs[tp->nfiles];
if (stat_and_adjust(tp, sbs) == 0)
save_resume(tp, sbs);
}

static void
@@ -187,7 +232,7 @@ cm_write_done(struct torrent *tp)
struct content *cm = tp->cm;

if ((errno = bts_close(cm->wrs)) != 0)
btpd_err("Error closing write stream for '%s' (%s).\n",
btpd_err("error closing write stream for '%s' (%s).\n",
torrent_name(tp), strerror(errno));
cm->wrs = NULL;
btpd_ev_del(&cm->save_timer);
@@ -198,30 +243,27 @@ void
cm_stop(struct torrent *tp)
{
struct content *cm = tp->cm;
cm->active = 0;
struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
if (op != NULL && op->type == CM_START) {
pthread_mutex_lock(&m_long_comm.lock);
if (op->received)
op->u.start.cancel = 1;
else {
BTPDQ_REMOVE(&m_long_comm.q, op, td_entry);
BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
free(op);
}
pthread_mutex_unlock(&m_long_comm.lock);
} else if (!cm_full(tp))

if (cm->state == CM_ACTIVE && !cm_full(tp))
cm_write_done(tp);

if (BTPDQ_EMPTY(&cm->todoq))
torrent_on_cm_stopped(tp);
work_stop(tp);

cm->state = CM_INACTIVE;
}

int
cm_active(struct torrent *tp)
{
struct content *cm = tp->cm;
return cm->active || !BTPDQ_EMPTY(&cm->todoq);
return cm->state != CM_INACTIVE;
}

int
cm_started(struct torrent *tp)
{
struct content *cm = tp->cm;
return cm->state == CM_ACTIVE;
}

#define SAVE_INTERVAL (& (struct timeval) { 15, 0 })
@@ -234,95 +276,21 @@ save_timer_cb(int fd, short type, void *arg)
cm_save(tp);
}

static void
cm_td_cb(void *arg)
{
int err;
struct cm_op *op = arg;
struct torrent *tp = op->tp;
struct content *cm = tp->cm;

if (op->error)
btpd_err("IO error for '%s'.\n", torrent_name(tp));

switch (op->type) {
case CM_ALLOC:
set_bit(cm->pos_field, op->u.alloc.pos);
clear_bit(cm->hold_field, op->u.alloc.piece);
break;
case CM_START:
if (cm->active) {
assert(!op->u.start.cancel);
if (!cm_full(tp)) {
if ((err = bts_open(&cm->wrs, tp->nfiles, tp->files,
fd_cb_wr, tp)) != 0)
btpd_err("Couldn't open write stream for '%s' (%s).\n",
torrent_name(tp), strerror(err));
btpd_ev_add(&cm->save_timer, SAVE_INTERVAL);
}
torrent_on_cm_started(tp);
}
break;
case CM_TEST:
if (op->u.test.ok) {
assert(cm->npieces_got < tp->npieces);
cm->npieces_got++;
set_bit(cm->piece_field, op->u.test.piece);
if (net_active(tp))
dl_on_ok_piece(op->tp->net, op->u.test.piece);
if (cm_full(tp))
cm_write_done(tp);
} else {
cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece);
bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf);
if (net_active(tp))
dl_on_bad_piece(tp->net, op->u.test.piece);
}
break;
case CM_SAVE:
case CM_WRITE:
break;
}
BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
free(op);
if (!BTPDQ_EMPTY(&cm->todoq))
run_todo(cm);
else if (!cm->active)
torrent_on_cm_stopped(tp);
}

void
cm_create(struct torrent *tp)
cm_create(struct torrent *tp, const char *mi)
{
size_t pfield_size = ceil(tp->npieces / 8.0);
struct content *cm = btpd_calloc(1, sizeof(*cm));
cm->bppbf = ceil((double)tp->piece_length / (1 << 17));
cm->piece_field = btpd_calloc(pfield_size, 1);
cm->hold_field = btpd_calloc(pfield_size, 1);
cm->pos_field = btpd_calloc(pfield_size, 1);
cm->block_field = btpd_calloc(tp->npieces * cm->bppbf, 1);

BTPDQ_INIT(&cm->todoq);
evtimer_set(&cm->save_timer, save_timer_cb, tp);

tp->cm = cm;
}

void
cm_start(struct torrent *tp)
{
struct content *cm = tp->cm;

if ((errno = bts_open(&cm->rds, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0)
btpd_err("Error opening stream (%s).\n", strerror(errno));

cm->active = 1;
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_START;
add_todo(cm, op);
}

int
cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
uint8_t **buf)
@@ -331,32 +299,11 @@ cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
int err =
bts_get(tp->cm->rds, piece * tp->piece_length + begin, *buf, len);
if (err != 0)
btpd_err("Io error (%s)\n", strerror(err));
btpd_err("IO error on '%s' (%s).\n", bts_filename(tp->cm->rds),
strerror(err));
return 0;
}

static void
cm_post_alloc(struct torrent *tp, uint32_t piece)
{
struct content *cm = tp->cm;
set_bit(cm->hold_field, piece);

struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_ALLOC;
op->u.alloc.piece = piece;
op->u.alloc.pos = piece;
add_todo(cm, op);

op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_WRITE;
op->u.write.piece = piece;
op->u.write.pos = piece;
BTPDQ_INIT(&op->u.write.q);
add_todo(cm, op);
}

void
cm_prealloc(struct torrent *tp, uint32_t piece)
{
@@ -364,30 +311,12 @@ cm_prealloc(struct torrent *tp, uint32_t piece)

if (cm_alloc_size <= 0)
set_bit(cm->pos_field, piece);
else {
unsigned npieces = ceil((double)cm_alloc_size / tp->piece_length);
uint32_t start = piece - piece % npieces;
uint32_t end = min(start + npieces, tp->npieces);

while (start < end) {
if ((!has_bit(cm->pos_field, start)
&& !has_bit(cm->hold_field, start)))
cm_post_alloc(tp, start);
start++;
}
}
}

void
cm_test_piece(struct torrent *tp, uint32_t piece)
{
struct content *cm = tp->cm;
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_TEST;
op->u.test.piece = piece;
op->u.test.pos = piece;
add_todo(cm, op);
pct_create(tp, piece, pct_cb);
}

int
@@ -397,30 +326,33 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
int err;
struct content *cm = tp->cm;

if (has_bit(cm->hold_field, piece)) {
struct cm_write_data *d = btpd_calloc(1, sizeof(*d) + len);
d->begin = begin;
d->len = len;
d->buf = (uint8_t *)(d + 1);
bcopy(buf, d->buf, len);
struct cm_op *op;
BTPDQ_FOREACH(op, &cm->todoq, cm_entry)
if (op->type == CM_WRITE && op->u.write.piece == piece)
break;
struct cm_write_data *it;
BTPDQ_FOREACH(it, &op->u.write.q, entry)
if (it->begin > begin) {
BTPDQ_INSERT_BEFORE(it, d, entry);
break;
if (!has_bit(cm->pos_field, piece)) {
unsigned npieces = ceil((double)cm_alloc_size / tp->piece_length);
uint32_t start = piece - piece % npieces;
uint32_t end = min(start + npieces, tp->npieces);

while (start < end) {
if (!has_bit(cm->pos_field, start)) {
off_t len = torrent_piece_size(tp, start);
off_t off = tp->piece_length * start;
while (len > 0) {
size_t wlen = min(ZEROBUFLEN, len);
if ((err = bts_put(cm->wrs, off, m_zerobuf, wlen)) != 0)
btpd_err("IO error on '%s' (%s).\n",
bts_filename(cm->wrs), strerror(errno));

len -= wlen;
off += wlen;
}
set_bit(cm->pos_field, start);
}
if (it == NULL)
BTPDQ_INSERT_TAIL(&op->u.write.q, d, entry);
} else {
err = bts_put(cm->wrs, piece * tp->piece_length + begin, buf,
len);
if (err != 0)
btpd_err("Io error (%s)\n", strerror(err));
start++;
}
}
err = bts_put(cm->wrs, piece * tp->piece_length + begin, buf, len);
if (err != 0)
btpd_err("IO error on '%s' (%s)\n", bts_filename(cm->wrs),
strerror(err));

cm->ncontent_bytes += len;
uint8_t *bf = cm->block_field + piece * cm->bppbf;
@@ -465,114 +397,6 @@ cm_has_piece(struct torrent *tp, uint32_t piece)
return has_bit(tp->cm->piece_field, piece);
}

static int
test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
{
char piece_hash[SHA_DIGEST_LENGTH];
int fd;
int err;

err = vopen(&fd, O_RDONLY, "torrents/%s/torrent", tp->relpath);
if (err != 0)
btpd_err("test_hash: %s\n", strerror(err));

lseek(fd, tp->pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET);
read(fd, piece_hash, SHA_DIGEST_LENGTH);
close(fd);

return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
}

static int
test_piece(struct torrent *tp, uint32_t pos, uint32_t piece, int *ok)
{
int err;
uint8_t hash[SHA_DIGEST_LENGTH];
struct bt_stream *bts;
if ((err = bts_open(&bts, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0)
return err;
if ((err = bts_sha(bts, pos * tp->piece_length,
torrent_piece_size(tp, piece), hash)) != 0)
return err;;
bts_close(bts);
*ok = test_hash(tp, hash, piece) == 0;
return 0;
}

static void
cm_td_alloc(struct cm_op *op)
{
struct torrent *tp = op->tp;
struct content *cm = tp->cm;
uint32_t pos = op->u.alloc.pos;
struct bt_stream *bts;
int err;

assert(!has_bit(cm->pos_field, pos));

if ((err = bts_open(&bts, tp->nfiles, tp->files, fd_cb_wr, tp)) != 0)
goto out;

off_t len = torrent_piece_size(tp, pos);
off_t off = tp->piece_length * pos;
while (len > 0) {
size_t wlen = min(ZEROBUFLEN, len);
if ((err = bts_put(bts, off, m_zerobuf, wlen)) != 0) {
bts_close(bts);
goto out;
}
len -= wlen;
off += wlen;
}
err = bts_close(bts);
out:
if (err != 0)
op->error = 1;
}

static int
test_torrent(struct torrent *tp, volatile sig_atomic_t *cancel)
{
int err;
FILE *fp;
uint8_t (*hashes)[SHA_DIGEST_LENGTH];
uint8_t hash[SHA_DIGEST_LENGTH];

if ((err = vfopen(&fp, "r", "torrents/%s/torrent", tp->relpath)) != 0)
return err;

hashes = btpd_malloc(tp->npieces * SHA_DIGEST_LENGTH);
fseek(fp, tp->pieces_off, SEEK_SET);
fread(hashes, SHA_DIGEST_LENGTH, tp->npieces, fp);
fclose(fp);

struct content *cm = tp->cm;
for (uint32_t piece = 0; piece < tp->npieces; piece++) {
if (!has_bit(cm->pos_field, piece))
continue;
err = bts_sha(cm->rds, piece * tp->piece_length,
torrent_piece_size(tp, piece), hash);
if (err != 0)
break;
if (bcmp(hashes[piece], hash, SHA_DIGEST_LENGTH) == 0)
set_bit(tp->cm->piece_field, piece);
else
clear_bit(tp->cm->piece_field, piece);
if (*cancel) {
err = EINTR;
break;
}
}

free(hashes);
return err;
}

struct rstat {
time_t mtime;
off_t size;
};

int
stat_and_adjust(struct torrent *tp, struct rstat ret[])
{
@@ -655,52 +479,13 @@ save_resume(struct torrent *tp, struct rstat sbs[])
return err;
}

static void
cm_td_save(struct cm_op *op)
{
struct torrent *tp = op->tp;
struct rstat sbs[tp->nfiles];
if (stat_and_adjust(tp, sbs) == 0)
save_resume(tp, sbs);
}
void start_test_cb(struct torrent *tp, uint32_t piece, int ok);

static void
cm_td_start(struct cm_op *op)
void
start_test_end(struct torrent *tp, int unclean)
{
int err, resume_clean = 0, tested_torrent = 0;
struct rstat sbs[op->tp->nfiles];
struct torrent *tp = op->tp;
struct content *cm = tp->cm;

if ((err = stat_and_adjust(op->tp, sbs)) != 0)
goto out;

resume_clean = load_resume(tp, sbs) == 0;
if (!resume_clean) {
memset(cm->pos_field, 0xff, ceil(tp->npieces / 8.0));
off_t off = 0;
for (int i = 0; i < tp->nfiles; i++) {
if (sbs[i].size != tp->files[i].length) {
uint32_t start, end;
end = (off + tp->files[i].length - 1)
/ tp->piece_length;
start = (off + sbs[i].size) / tp->piece_length;
while (start <= end) {
clear_bit(cm->pos_field, start);
clear_bit(cm->piece_field, start);
bzero(cm->block_field + start * cm->bppbf, cm->bppbf);
start++;
}
}
off += tp->files[i].length;
}
if (op->u.start.cancel)
goto out;
if ((err = test_torrent(tp, &op->u.start.cancel)) != 0)
goto out;
tested_torrent = 1;
}

bzero(cm->pos_field, ceil(tp->npieces / 8.0));
for (uint32_t piece = 0; piece < tp->npieces; piece++) {
if (cm_has_piece(tp, piece)) {
@@ -720,113 +505,128 @@ cm_td_start(struct cm_op *op)
}
}
if (nblocks_got == nblocks) {
resume_clean = 0;
int ok = 0;
if (!tested_torrent) {
if (((err = test_piece(tp, piece, piece, &ok)) != 0
|| op->u.start.cancel))
goto out;
}
if (ok) {
set_bit(cm->pos_field, piece);
set_bit(cm->piece_field, piece);
} else {
bzero(bf, cm->bppbf);
cm->ncontent_bytes -= torrent_piece_size(tp, piece);
}
bzero(bf, cm->bppbf);
cm->ncontent_bytes -= torrent_piece_size(tp, piece);
} else if (nblocks_got > 0)
set_bit(cm->pos_field, piece);
}
if (!cm_full(tp)) {
int err;
if ((err = bts_open(&cm->wrs, tp->nfiles, tp->files,
fd_cb_wr, tp)) != 0)
btpd_err("failed to open write stream for '%s' (%s).\n",
torrent_name(tp), strerror(err));
btpd_ev_add(&cm->save_timer, SAVE_INTERVAL);
}
if (unclean) {
struct start_test_data *std = BTPDQ_FIRST(&m_startq);

if (!resume_clean)
save_resume(tp, sbs);
assert(std->tp == tp);
BTPDQ_REMOVE(&m_startq, std, entry);
save_resume(tp, std->rstat);
free(std->rstat);
free(std);

out:
if (!op->u.start.cancel && err != 0)
op->error = 1;
if ((std = BTPDQ_FIRST(&m_startq)) != NULL)
pct_create(std->tp, std->start, start_test_cb);
}
cm->state = CM_ACTIVE;
}

static void
cm_td_test(struct cm_op *op)
void
start_test_cb(struct torrent *tp, uint32_t piece, int ok)
{
if (test_piece(op->tp, op->u.test.pos, op->u.test.piece,
&op->u.test.ok) != 0)
op->error = 1;
struct content *cm = tp->cm;
if (ok)
set_bit(cm->piece_field, piece);
else
clear_bit(cm->piece_field, piece);
piece++;
while (piece < tp->npieces && !has_bit(cm->pos_field, piece))
piece++;
if (piece < tp->npieces)
pct_create(tp, piece, start_test_cb);
else
start_test_end(tp, 1);
}

static void
cm_td_write(struct cm_op *op)
void
start_test(struct torrent *tp, struct rstat *sbs)
{
int err;
struct cm_write_data *d, *next;
off_t base = op->tp->piece_length * op->u.write.pos;
struct bt_stream *bts;
if ((err = bts_open(&bts, op->tp->nfiles, op->tp->files,
fd_cb_wr, op->tp)) != 0)
goto out;
BTPDQ_FOREACH(d, &op->u.write.q, entry)
if ((err = bts_put(bts, base + d->begin, d->buf, d->len)) != 0) {
bts_close(bts);
goto out;
}
err = bts_close(bts);
out:
BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
free(d);
if (err)
op->error = 1;
uint32_t piece = 0;
struct content *cm = tp->cm;
while (piece < tp->npieces && !has_bit(cm->pos_field, piece))
piece++;
if (piece < tp->npieces) {
struct start_test_data *std = btpd_calloc(1, sizeof(*std));
std->tp = tp;
std->start = piece;
std->rstat = sbs;
BTPDQ_INSERT_TAIL(&m_startq, std, entry);
if (std == BTPDQ_FIRST(&m_startq))
pct_create(tp, piece, start_test_cb);
} else {
free(sbs);
start_test_end(tp, 0);
}
}

static void
cm_td(void *arg)
{
struct cm_comm *comm = arg;
struct cm_op *op;
for (;;) {
pthread_mutex_lock(&comm->lock);
while (BTPDQ_EMPTY(&comm->q))
pthread_cond_wait(&comm->cond, &comm->lock);

op = BTPDQ_FIRST(&comm->q);
BTPDQ_REMOVE(&comm->q, op, td_entry);
op->received = 1;
pthread_mutex_unlock(&comm->lock);

switch (op->type) {
case CM_ALLOC:
cm_td_alloc(op);
break;
case CM_SAVE:
cm_td_save(op);
break;
case CM_START:
cm_td_start(op);
break;
case CM_TEST:
cm_td_test(op);
break;
case CM_WRITE:
cm_td_write(op);
break;
default:
abort();
void
cm_start(struct torrent *tp)
{
int err, resume_clean = 0;
struct rstat *sbs;
struct content *cm = tp->cm;

if ((errno = bts_open(&cm->rds, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0)
btpd_err("failed to open stream for '%s' (%s).\n",
torrent_name(tp), strerror(errno));

cm->state = CM_STARTING;

sbs = btpd_calloc(tp->nfiles, sizeof(*sbs));

if ((err = stat_and_adjust(tp, sbs)) != 0)
btpd_err("failed stat_and_adjust for '%s' (%s).\n",
torrent_name(tp), strerror(err));

resume_clean = load_resume(tp, sbs) == 0;
if (!resume_clean) {
memset(cm->pos_field, 0xff, ceil(tp->npieces / 8.0));
off_t off = 0;
for (int i = 0; i < tp->nfiles; i++) {
if (sbs[i].size != tp->files[i].length) {
uint32_t start, end;
end = (off + tp->files[i].length - 1)
/ tp->piece_length;
start = (off + sbs[i].size) / tp->piece_length;
while (start <= end) {
clear_bit(cm->pos_field, start);
clear_bit(cm->piece_field, start);
bzero(cm->block_field + start * cm->bppbf, cm->bppbf);
start++;
}
}
off += tp->files[i].length;
}
td_post_begin();
td_post(cm_td_cb, op);
td_post_end();
}
for (uint32_t piece = 0; piece < tp->npieces; piece++) {
if (has_bit(cm->piece_field, piece))
continue;
uint8_t *bf = cm->block_field + cm->bppbf * piece;
uint32_t nblocks = torrent_piece_blocks(tp, piece);
uint32_t block = 0;
while (block < nblocks && has_bit(bf, block))
block++;
if (block == nblocks)
set_bit(cm->pos_field, piece);
}

start_test(tp, sbs);
}

void
cm_init(void)
{
pthread_t td;
BTPDQ_INIT(&m_long_comm.q);
pthread_mutex_init(&m_long_comm.lock, NULL);
pthread_cond_init(&m_long_comm.cond, NULL);
pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_long_comm);
BTPDQ_INIT(&m_short_comm.q);
pthread_mutex_init(&m_short_comm.lock, NULL);
pthread_cond_init(&m_short_comm.cond, NULL);
pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_short_comm);
evtimer_set(&m_workev, worker_cb, NULL);
}

+ 2
- 1
btpd/content.h Переглянути файл

@@ -3,13 +3,14 @@

void cm_init(void);

void cm_create(struct torrent *tp);
void cm_create(struct torrent *tp, const char *mi);
void cm_kill(struct torrent *tp);

void cm_start(struct torrent *tp);
void cm_stop(struct torrent * tp);

int cm_active(struct torrent *tp);
int cm_started(struct torrent *tp);
int cm_full(struct torrent *tp);

off_t cm_content(struct torrent *tp);


+ 0
- 87
btpd/td.c Переглянути файл

@@ -1,87 +0,0 @@
#include <sys/types.h>

#include <pthread.h>
#include <string.h>
#include <unistd.h>

#include "btpd.h"

struct td_cb {
void (*cb)(void *);
void *arg;
BTPDQ_ENTRY(td_cb) entry;
};

BTPDQ_HEAD(td_cb_tq, td_cb);

static int m_td_rd, m_td_wr;
static struct event m_td_ev;
static struct td_cb_tq m_td_cbs = BTPDQ_HEAD_INITIALIZER(m_td_cbs);
static pthread_mutex_t m_td_lock;

void
td_acquire_lock(void)
{
pthread_mutex_lock(&m_td_lock);
}

void
td_release_lock(void)
{
pthread_mutex_unlock(&m_td_lock);
}

void
td_post(void (*fun)(void *), void *arg)
{
struct td_cb *cb = btpd_calloc(1, sizeof(*cb));
cb->cb = fun;
cb->arg = arg;
BTPDQ_INSERT_TAIL(&m_td_cbs, cb, entry);
}

void
td_post_end(void)
{
char c = '1';
td_release_lock();
write(m_td_wr, &c, sizeof(c));
}

static void
td_cb(int fd, short type, void *arg)
{
char buf[1024];
struct td_cb_tq tmpq = BTPDQ_HEAD_INITIALIZER(tmpq);
struct td_cb *cb, *next;

read(fd, buf, sizeof(buf));
td_acquire_lock();
BTPDQ_FOREACH_MUTABLE(cb, &m_td_cbs, entry, next)
BTPDQ_INSERT_TAIL(&tmpq, cb, entry);
BTPDQ_INIT(&m_td_cbs);
td_release_lock();

BTPDQ_FOREACH_MUTABLE(cb, &tmpq, entry, next) {
cb->cb(cb->arg);
free(cb);
}
}

void
td_init(void)
{
int err;
int fds[2];
if (pipe(fds) == -1) {
btpd_err("Couldn't create thread callback pipe (%s).\n",
strerror(errno));
}
m_td_rd = fds[0];
m_td_wr = fds[1];
if ((err = pthread_mutex_init(&m_td_lock, NULL)) != 0)
btpd_err("Couldn't create mutex (%s).\n", strerror(err));

event_set(&m_td_ev, m_td_rd, EV_READ|EV_PERSIST, td_cb, NULL);
btpd_ev_add(&m_td_ev, NULL);
}

+ 1
- 3
btpd/torrent.h Переглянути файл

@@ -47,8 +47,6 @@ uint32_t torrent_block_size(struct torrent *tp, uint32_t piece,
uint32_t nblocks, uint32_t block);
const char *torrent_name(struct torrent *tp);

void torrent_on_cm_stopped(struct torrent *tp);
void torrent_on_cm_started(struct torrent *tp);
void torrent_on_tr_stopped(struct torrent *tp);
void torrent_on_tick_all(void);

#endif

+ 0
- 1
btpd/tracker_req.c Переглянути файл

@@ -133,7 +133,6 @@ tr_set_stopped(struct torrent *tp)
tr->ttype = TIMER_NONE;
if (tr->req != NULL)
tr_cancel(tr);
torrent_on_tr_stopped(tp);
}

void


||||||
x
 
000:0
Завантаження…
Відмінити
Зберегти