diff --git a/btpd/btpd.c b/btpd/btpd.c index 76303d2..01f34c9 100644 --- a/btpd/btpd.c +++ b/btpd/btpd.c @@ -40,8 +40,9 @@ btpd_shutdown(int grace_seconds) if (tp->state != T_STOPPING) torrent_stop(tp); if (grace_seconds >= 0) { - event_once(-1, EV_TIMEOUT, grace_cb, NULL, - (& (struct timeval) { grace_seconds, 0 })); + if (event_once(-1, EV_TIMEOUT, grace_cb, NULL, + (& (struct timeval) { grace_seconds, 0 })) != 0) + btpd_err("failed to add event (%s).\n", strerror(errno)); } } } @@ -57,13 +58,6 @@ btpd_get_peer_id(void) return m_peer_id; } -void -btpd_on_no_torrents(void) -{ - if (m_shutdown) - btpd_exit(0); -} - static void signal_cb(int signal, short type, void *arg) { @@ -77,9 +71,11 @@ heartbeat_cb(int fd, short type, void *arg) btpd_ev_add(&m_heartbeat, (& (struct timeval) { 1, 0 })); btpd_seconds++; net_on_tick(); + torrent_on_tick_all(); + if (m_shutdown && torrent_count() == 0) + btpd_exit(0); } -void td_init(void); void tr_init(void); void ipc_init(void); @@ -93,7 +89,6 @@ btpd_init(void) for (int i = sizeof(BTPD_VERSION); i < 20; i++) m_peer_id[i] = rand_between(0, 255); - td_init(); net_init(); ipc_init(); ul_init(); diff --git a/btpd/btpd.h b/btpd/btpd.h index cc8d94d..6651b49 100644 --- a/btpd/btpd.h +++ b/btpd/btpd.h @@ -67,13 +67,4 @@ int btpd_is_stopping(void); const uint8_t *btpd_get_peer_id(void); -void td_acquire_lock(void); -void td_release_lock(void); - -#define td_post_begin td_acquire_lock -void td_post(void (*fun)(void *), void *arg); -void td_post_end(void); - -void btpd_on_no_torrents(void); - #endif diff --git a/btpd/content.c b/btpd/content.c index bfe6779..5fef38f 100644 --- a/btpd/content.c +++ b/btpd/content.c @@ -3,8 +3,6 @@ #include <fcntl.h> #include <math.h> -#include <pthread.h> -#include <signal.h> #include <stdio.h> #include <string.h> #include <unistd.h> @@ -14,56 +12,13 @@ #include "btpd.h" #include "stream.h" -struct cm_write_data { - uint32_t begin; - uint8_t *buf; - size_t len; - BTPDQ_ENTRY(cm_write_data) entry; -}; - -BTPDQ_HEAD(cm_write_data_tq, cm_write_data); - -enum cm_op_type { - CM_ALLOC, - CM_SAVE, - CM_START, - CM_TEST, - CM_WRITE -}; - -struct cm_op { - struct torrent *tp; - int error; - int received; - enum cm_op_type type; - union { - struct { - uint32_t piece; - uint32_t pos; - } alloc; - struct { - volatile sig_atomic_t cancel; - } start; - struct { - uint32_t piece; - uint32_t pos; - int ok; - } test; - struct { - uint32_t piece; - uint32_t pos; - struct cm_write_data_tq q; - } write; - } u; - - BTPDQ_ENTRY(cm_op) cm_entry; - BTPDQ_ENTRY(cm_op) td_entry; +struct rstat { + time_t mtime; + off_t size; }; -BTPDQ_HEAD(cm_op_tq, cm_op); - struct content { - int active; + enum { CM_INACTIVE, CM_STARTING, CM_ACTIVE } state; uint32_t npieces_got; @@ -73,11 +28,8 @@ struct content { uint8_t *piece_field; uint8_t *block_field; - uint8_t *hold_field; uint8_t *pos_field; - struct cm_op_tq todoq; - struct bt_stream *rds; struct bt_stream *wrs; @@ -86,15 +38,11 @@ struct content { #define ZEROBUFLEN (1 << 14) -struct cm_comm { - struct cm_op_tq q; - pthread_mutex_t lock; - pthread_cond_t cond; -}; - -static struct cm_comm m_long_comm, m_short_comm; static const uint8_t m_zerobuf[ZEROBUFLEN]; +int stat_and_adjust(struct torrent *tp, struct rstat ret[]); +static int save_resume(struct torrent *tp, struct rstat sbs[]); + static int fd_cb_rd(const char *path, int *fd, void *arg) { @@ -109,53 +57,153 @@ fd_cb_wr(const char *path, int *fd, void *arg) return vopen(fd, O_RDWR, "%s/%s", tp->tl->dir, path); } -static void -cm_td_post_common(struct cm_comm *comm, struct cm_op *op) +struct pct_data { + off_t off, remain; + struct torrent *tp; + SHA_CTX sha; + BTPDQ_ENTRY(pct_data) entry; + uint32_t piece; + void (*cb)(struct torrent *, uint32_t, int); +}; + +BTPDQ_HEAD(pct_tq, pct_data); + +static struct pct_tq m_pctq = BTPDQ_HEAD_INITIALIZER(m_pctq); +static void cm_write_done(struct torrent *tp); + +struct start_test_data { + struct torrent *tp; + struct rstat *rstat; + uint32_t start; + BTPDQ_ENTRY(start_test_data) entry; +}; + +BTPDQ_HEAD(std_tq, start_test_data); + +static struct std_tq m_startq = BTPDQ_HEAD_INITIALIZER(m_startq); + +static struct event m_workev; + +#define READBUFLEN (1 << 14) + +static int +test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece) { - pthread_mutex_lock(&comm->lock); - BTPDQ_INSERT_TAIL(&comm->q, op, td_entry); - pthread_mutex_unlock(&comm->lock); - pthread_cond_signal(&comm->cond); + char piece_hash[SHA_DIGEST_LENGTH]; + int fd; + int err; + + err = vopen(&fd, O_RDONLY, "torrents/%s/torrent", tp->relpath); + if (err != 0) + btpd_err("failed to open 'torrents/%s/torrent' (%s).\n", + tp->relpath, strerror(err)); + + lseek(fd, tp->pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET); + read(fd, piece_hash, SHA_DIGEST_LENGTH); + close(fd); + + return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH); } -static void -cm_td_post_long(struct cm_op *op) +void +pct_create(struct torrent *tp, uint32_t piece, + void (*cb)(struct torrent *, uint32_t, int)) { - cm_td_post_common(&m_long_comm, op); + struct pct_data *p = btpd_calloc(1, sizeof(*p)); + p->piece = piece; + p->tp = tp; + p->off = piece * tp->piece_length; + p->remain = torrent_piece_size(tp, piece); + SHA1_Init(&p->sha); + p->cb = cb; + BTPDQ_INSERT_TAIL(&m_pctq, p, entry); + btpd_ev_add(&m_workev, (& (struct timeval) { 0, 0 })); } -static void -cm_td_post_short(struct cm_op *op) +void +pct_kill(struct pct_data *p) { - cm_td_post_common(&m_short_comm, op); + BTPDQ_REMOVE(&m_pctq, p, entry); + free(p); } -static void -run_todo(struct content *cm) -{ - struct cm_op *op = BTPDQ_FIRST(&cm->todoq); +void +pct_run(struct pct_data *p) +{ + char buf[READBUFLEN]; + size_t unit = (10 << 14); + + while (p->remain > 0 && unit > 0) { + size_t wantread = min(p->remain, sizeof(buf)); + if (wantread > unit) + wantread = unit; + if ((errno = bts_get(p->tp->cm->rds, p->off, buf, wantread)) != 0) + btpd_err("IO error on '%s' (%s).\n", bts_filename(p->tp->cm->rds), + strerror(errno)); + p->remain -= wantread; + unit -= wantread; + p->off += wantread; + SHA1_Update(&p->sha, buf, wantread); + } + if (p->remain == 0) { + uint8_t hash[SHA_DIGEST_LENGTH]; + SHA1_Final(hash, &p->sha); + p->cb(p->tp, p->piece, test_hash(p->tp, hash, p->piece) == 0); + pct_kill(p); + } +} - if (op->type == CM_WRITE && BTPDQ_EMPTY(&op->u.write.q)) { - BTPDQ_REMOVE(&cm->todoq, op, cm_entry); - free(op); - if (!BTPDQ_EMPTY(&cm->todoq)) - run_todo(cm); - return; +void +pct_cb(struct torrent *tp, uint32_t piece, int ok) +{ + struct content *cm = tp->cm; + if (ok) { + assert(cm->npieces_got < tp->npieces); + cm->npieces_got++; + set_bit(cm->piece_field, piece); + if (net_active(tp)) + dl_on_ok_piece(tp->net, piece); + if (cm_full(tp)) + cm_write_done(tp); + } else { + cm->ncontent_bytes -= torrent_piece_size(tp, piece); + bzero(cm->block_field + piece * cm->bppbf, cm->bppbf); + if (net_active(tp)) + dl_on_bad_piece(tp->net, piece); } +} - if (op->type != CM_START) - cm_td_post_short(op); - else - cm_td_post_long(op); +void +work_stop(struct torrent *tp) +{ + struct content *cm = tp->cm; + struct pct_data *pct, *next; + if (cm->state == CM_STARTING) { + struct start_test_data *std; + BTPDQ_FOREACH(std, &m_startq, entry) + if (std->tp == tp) { + BTPDQ_REMOVE(&m_startq, std, entry); + free(std->rstat); + free(std); + break; + } + } + BTPDQ_FOREACH_MUTABLE(pct, &m_pctq, entry, next) + if (pct->tp == tp) + pct_kill(pct); } -static void -add_todo(struct content *cm, struct cm_op *op) +static int test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece); + +void +worker_cb(int fd, short type, void *arg) { - int was_empty = BTPDQ_EMPTY(&cm->todoq); - BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry); - if (was_empty) - run_todo(cm); + struct pct_data *p = BTPDQ_FIRST(&m_pctq); + if (p == NULL) + return; + pct_run(p); + if (!BTPDQ_EMPTY(&m_pctq)) + event_add(&m_workev, (& (struct timeval) { 0, 0 })); } void @@ -165,7 +213,6 @@ cm_kill(struct torrent *tp) bts_close(cm->rds); free(cm->piece_field); free(cm->block_field); - free(cm->hold_field); free(cm->pos_field); free(cm); tp->cm = NULL; @@ -174,11 +221,9 @@ cm_kill(struct torrent *tp) void cm_save(struct torrent *tp) { - struct content *cm = tp->cm; - struct cm_op *op = btpd_calloc(1, sizeof(*op)); - op->tp = tp; - op->type = CM_SAVE; - add_todo(cm, op); + struct rstat sbs[tp->nfiles]; + if (stat_and_adjust(tp, sbs) == 0) + save_resume(tp, sbs); } static void @@ -187,7 +232,7 @@ cm_write_done(struct torrent *tp) struct content *cm = tp->cm; if ((errno = bts_close(cm->wrs)) != 0) - btpd_err("Error closing write stream for '%s' (%s).\n", + btpd_err("error closing write stream for '%s' (%s).\n", torrent_name(tp), strerror(errno)); cm->wrs = NULL; btpd_ev_del(&cm->save_timer); @@ -198,30 +243,27 @@ void cm_stop(struct torrent *tp) { struct content *cm = tp->cm; - cm->active = 0; - struct cm_op *op = BTPDQ_FIRST(&cm->todoq); - if (op != NULL && op->type == CM_START) { - pthread_mutex_lock(&m_long_comm.lock); - if (op->received) - op->u.start.cancel = 1; - else { - BTPDQ_REMOVE(&m_long_comm.q, op, td_entry); - BTPDQ_REMOVE(&cm->todoq, op, cm_entry); - free(op); - } - pthread_mutex_unlock(&m_long_comm.lock); - } else if (!cm_full(tp)) + + if (cm->state == CM_ACTIVE && !cm_full(tp)) cm_write_done(tp); - if (BTPDQ_EMPTY(&cm->todoq)) - torrent_on_cm_stopped(tp); + work_stop(tp); + + cm->state = CM_INACTIVE; } int cm_active(struct torrent *tp) { struct content *cm = tp->cm; - return cm->active || !BTPDQ_EMPTY(&cm->todoq); + return cm->state != CM_INACTIVE; +} + +int +cm_started(struct torrent *tp) +{ + struct content *cm = tp->cm; + return cm->state == CM_ACTIVE; } #define SAVE_INTERVAL (& (struct timeval) { 15, 0 }) @@ -234,95 +276,21 @@ save_timer_cb(int fd, short type, void *arg) cm_save(tp); } -static void -cm_td_cb(void *arg) -{ - int err; - struct cm_op *op = arg; - struct torrent *tp = op->tp; - struct content *cm = tp->cm; - - if (op->error) - btpd_err("IO error for '%s'.\n", torrent_name(tp)); - - switch (op->type) { - case CM_ALLOC: - set_bit(cm->pos_field, op->u.alloc.pos); - clear_bit(cm->hold_field, op->u.alloc.piece); - break; - case CM_START: - if (cm->active) { - assert(!op->u.start.cancel); - if (!cm_full(tp)) { - if ((err = bts_open(&cm->wrs, tp->nfiles, tp->files, - fd_cb_wr, tp)) != 0) - btpd_err("Couldn't open write stream for '%s' (%s).\n", - torrent_name(tp), strerror(err)); - btpd_ev_add(&cm->save_timer, SAVE_INTERVAL); - } - torrent_on_cm_started(tp); - } - break; - case CM_TEST: - if (op->u.test.ok) { - assert(cm->npieces_got < tp->npieces); - cm->npieces_got++; - set_bit(cm->piece_field, op->u.test.piece); - if (net_active(tp)) - dl_on_ok_piece(op->tp->net, op->u.test.piece); - if (cm_full(tp)) - cm_write_done(tp); - } else { - cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece); - bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf); - if (net_active(tp)) - dl_on_bad_piece(tp->net, op->u.test.piece); - } - break; - case CM_SAVE: - case CM_WRITE: - break; - } - BTPDQ_REMOVE(&cm->todoq, op, cm_entry); - free(op); - if (!BTPDQ_EMPTY(&cm->todoq)) - run_todo(cm); - else if (!cm->active) - torrent_on_cm_stopped(tp); -} - void -cm_create(struct torrent *tp) +cm_create(struct torrent *tp, const char *mi) { size_t pfield_size = ceil(tp->npieces / 8.0); struct content *cm = btpd_calloc(1, sizeof(*cm)); cm->bppbf = ceil((double)tp->piece_length / (1 << 17)); cm->piece_field = btpd_calloc(pfield_size, 1); - cm->hold_field = btpd_calloc(pfield_size, 1); cm->pos_field = btpd_calloc(pfield_size, 1); cm->block_field = btpd_calloc(tp->npieces * cm->bppbf, 1); - BTPDQ_INIT(&cm->todoq); evtimer_set(&cm->save_timer, save_timer_cb, tp); tp->cm = cm; } -void -cm_start(struct torrent *tp) -{ - struct content *cm = tp->cm; - - if ((errno = bts_open(&cm->rds, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0) - btpd_err("Error opening stream (%s).\n", strerror(errno)); - - cm->active = 1; - struct cm_op *op = btpd_calloc(1, sizeof(*op)); - op->tp = tp; - op->type = CM_START; - add_todo(cm, op); -} - int cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len, uint8_t **buf) @@ -331,32 +299,11 @@ cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len, int err = bts_get(tp->cm->rds, piece * tp->piece_length + begin, *buf, len); if (err != 0) - btpd_err("Io error (%s)\n", strerror(err)); + btpd_err("IO error on '%s' (%s).\n", bts_filename(tp->cm->rds), + strerror(err)); return 0; } -static void -cm_post_alloc(struct torrent *tp, uint32_t piece) -{ - struct content *cm = tp->cm; - set_bit(cm->hold_field, piece); - - struct cm_op *op = btpd_calloc(1, sizeof(*op)); - op->tp = tp; - op->type = CM_ALLOC; - op->u.alloc.piece = piece; - op->u.alloc.pos = piece; - add_todo(cm, op); - - op = btpd_calloc(1, sizeof(*op)); - op->tp = tp; - op->type = CM_WRITE; - op->u.write.piece = piece; - op->u.write.pos = piece; - BTPDQ_INIT(&op->u.write.q); - add_todo(cm, op); -} - void cm_prealloc(struct torrent *tp, uint32_t piece) { @@ -364,30 +311,12 @@ cm_prealloc(struct torrent *tp, uint32_t piece) if (cm_alloc_size <= 0) set_bit(cm->pos_field, piece); - else { - unsigned npieces = ceil((double)cm_alloc_size / tp->piece_length); - uint32_t start = piece - piece % npieces; - uint32_t end = min(start + npieces, tp->npieces); - - while (start < end) { - if ((!has_bit(cm->pos_field, start) - && !has_bit(cm->hold_field, start))) - cm_post_alloc(tp, start); - start++; - } - } } void cm_test_piece(struct torrent *tp, uint32_t piece) { - struct content *cm = tp->cm; - struct cm_op *op = btpd_calloc(1, sizeof(*op)); - op->tp = tp; - op->type = CM_TEST; - op->u.test.piece = piece; - op->u.test.pos = piece; - add_todo(cm, op); + pct_create(tp, piece, pct_cb); } int @@ -397,30 +326,33 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, int err; struct content *cm = tp->cm; - if (has_bit(cm->hold_field, piece)) { - struct cm_write_data *d = btpd_calloc(1, sizeof(*d) + len); - d->begin = begin; - d->len = len; - d->buf = (uint8_t *)(d + 1); - bcopy(buf, d->buf, len); - struct cm_op *op; - BTPDQ_FOREACH(op, &cm->todoq, cm_entry) - if (op->type == CM_WRITE && op->u.write.piece == piece) - break; - struct cm_write_data *it; - BTPDQ_FOREACH(it, &op->u.write.q, entry) - if (it->begin > begin) { - BTPDQ_INSERT_BEFORE(it, d, entry); - break; + if (!has_bit(cm->pos_field, piece)) { + unsigned npieces = ceil((double)cm_alloc_size / tp->piece_length); + uint32_t start = piece - piece % npieces; + uint32_t end = min(start + npieces, tp->npieces); + + while (start < end) { + if (!has_bit(cm->pos_field, start)) { + off_t len = torrent_piece_size(tp, start); + off_t off = tp->piece_length * start; + while (len > 0) { + size_t wlen = min(ZEROBUFLEN, len); + if ((err = bts_put(cm->wrs, off, m_zerobuf, wlen)) != 0) + btpd_err("IO error on '%s' (%s).\n", + bts_filename(cm->wrs), strerror(errno)); + + len -= wlen; + off += wlen; + } + set_bit(cm->pos_field, start); } - if (it == NULL) - BTPDQ_INSERT_TAIL(&op->u.write.q, d, entry); - } else { - err = bts_put(cm->wrs, piece * tp->piece_length + begin, buf, - len); - if (err != 0) - btpd_err("Io error (%s)\n", strerror(err)); + start++; + } } + err = bts_put(cm->wrs, piece * tp->piece_length + begin, buf, len); + if (err != 0) + btpd_err("IO error on '%s' (%s)\n", bts_filename(cm->wrs), + strerror(err)); cm->ncontent_bytes += len; uint8_t *bf = cm->block_field + piece * cm->bppbf; @@ -465,114 +397,6 @@ cm_has_piece(struct torrent *tp, uint32_t piece) return has_bit(tp->cm->piece_field, piece); } -static int -test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece) -{ - char piece_hash[SHA_DIGEST_LENGTH]; - int fd; - int err; - - err = vopen(&fd, O_RDONLY, "torrents/%s/torrent", tp->relpath); - if (err != 0) - btpd_err("test_hash: %s\n", strerror(err)); - - lseek(fd, tp->pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET); - read(fd, piece_hash, SHA_DIGEST_LENGTH); - close(fd); - - return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH); -} - -static int -test_piece(struct torrent *tp, uint32_t pos, uint32_t piece, int *ok) -{ - int err; - uint8_t hash[SHA_DIGEST_LENGTH]; - struct bt_stream *bts; - if ((err = bts_open(&bts, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0) - return err; - if ((err = bts_sha(bts, pos * tp->piece_length, - torrent_piece_size(tp, piece), hash)) != 0) - return err;; - bts_close(bts); - *ok = test_hash(tp, hash, piece) == 0; - return 0; -} - -static void -cm_td_alloc(struct cm_op *op) -{ - struct torrent *tp = op->tp; - struct content *cm = tp->cm; - uint32_t pos = op->u.alloc.pos; - struct bt_stream *bts; - int err; - - assert(!has_bit(cm->pos_field, pos)); - - if ((err = bts_open(&bts, tp->nfiles, tp->files, fd_cb_wr, tp)) != 0) - goto out; - - off_t len = torrent_piece_size(tp, pos); - off_t off = tp->piece_length * pos; - while (len > 0) { - size_t wlen = min(ZEROBUFLEN, len); - if ((err = bts_put(bts, off, m_zerobuf, wlen)) != 0) { - bts_close(bts); - goto out; - } - len -= wlen; - off += wlen; - } - err = bts_close(bts); -out: - if (err != 0) - op->error = 1; -} - -static int -test_torrent(struct torrent *tp, volatile sig_atomic_t *cancel) -{ - int err; - FILE *fp; - uint8_t (*hashes)[SHA_DIGEST_LENGTH]; - uint8_t hash[SHA_DIGEST_LENGTH]; - - if ((err = vfopen(&fp, "r", "torrents/%s/torrent", tp->relpath)) != 0) - return err; - - hashes = btpd_malloc(tp->npieces * SHA_DIGEST_LENGTH); - fseek(fp, tp->pieces_off, SEEK_SET); - fread(hashes, SHA_DIGEST_LENGTH, tp->npieces, fp); - fclose(fp); - - struct content *cm = tp->cm; - for (uint32_t piece = 0; piece < tp->npieces; piece++) { - if (!has_bit(cm->pos_field, piece)) - continue; - err = bts_sha(cm->rds, piece * tp->piece_length, - torrent_piece_size(tp, piece), hash); - if (err != 0) - break; - if (bcmp(hashes[piece], hash, SHA_DIGEST_LENGTH) == 0) - set_bit(tp->cm->piece_field, piece); - else - clear_bit(tp->cm->piece_field, piece); - if (*cancel) { - err = EINTR; - break; - } - } - - free(hashes); - return err; -} - -struct rstat { - time_t mtime; - off_t size; -}; - int stat_and_adjust(struct torrent *tp, struct rstat ret[]) { @@ -655,52 +479,13 @@ save_resume(struct torrent *tp, struct rstat sbs[]) return err; } -static void -cm_td_save(struct cm_op *op) -{ - struct torrent *tp = op->tp; - struct rstat sbs[tp->nfiles]; - if (stat_and_adjust(tp, sbs) == 0) - save_resume(tp, sbs); -} +void start_test_cb(struct torrent *tp, uint32_t piece, int ok); -static void -cm_td_start(struct cm_op *op) +void +start_test_end(struct torrent *tp, int unclean) { - int err, resume_clean = 0, tested_torrent = 0; - struct rstat sbs[op->tp->nfiles]; - struct torrent *tp = op->tp; struct content *cm = tp->cm; - if ((err = stat_and_adjust(op->tp, sbs)) != 0) - goto out; - - resume_clean = load_resume(tp, sbs) == 0; - if (!resume_clean) { - memset(cm->pos_field, 0xff, ceil(tp->npieces / 8.0)); - off_t off = 0; - for (int i = 0; i < tp->nfiles; i++) { - if (sbs[i].size != tp->files[i].length) { - uint32_t start, end; - end = (off + tp->files[i].length - 1) - / tp->piece_length; - start = (off + sbs[i].size) / tp->piece_length; - while (start <= end) { - clear_bit(cm->pos_field, start); - clear_bit(cm->piece_field, start); - bzero(cm->block_field + start * cm->bppbf, cm->bppbf); - start++; - } - } - off += tp->files[i].length; - } - if (op->u.start.cancel) - goto out; - if ((err = test_torrent(tp, &op->u.start.cancel)) != 0) - goto out; - tested_torrent = 1; - } - bzero(cm->pos_field, ceil(tp->npieces / 8.0)); for (uint32_t piece = 0; piece < tp->npieces; piece++) { if (cm_has_piece(tp, piece)) { @@ -720,113 +505,128 @@ cm_td_start(struct cm_op *op) } } if (nblocks_got == nblocks) { - resume_clean = 0; - int ok = 0; - if (!tested_torrent) { - if (((err = test_piece(tp, piece, piece, &ok)) != 0 - || op->u.start.cancel)) - goto out; - } - if (ok) { - set_bit(cm->pos_field, piece); - set_bit(cm->piece_field, piece); - } else { - bzero(bf, cm->bppbf); - cm->ncontent_bytes -= torrent_piece_size(tp, piece); - } + bzero(bf, cm->bppbf); + cm->ncontent_bytes -= torrent_piece_size(tp, piece); } else if (nblocks_got > 0) set_bit(cm->pos_field, piece); } + if (!cm_full(tp)) { + int err; + if ((err = bts_open(&cm->wrs, tp->nfiles, tp->files, + fd_cb_wr, tp)) != 0) + btpd_err("failed to open write stream for '%s' (%s).\n", + torrent_name(tp), strerror(err)); + btpd_ev_add(&cm->save_timer, SAVE_INTERVAL); + } + if (unclean) { + struct start_test_data *std = BTPDQ_FIRST(&m_startq); - if (!resume_clean) - save_resume(tp, sbs); + assert(std->tp == tp); + BTPDQ_REMOVE(&m_startq, std, entry); + save_resume(tp, std->rstat); + free(std->rstat); + free(std); -out: - if (!op->u.start.cancel && err != 0) - op->error = 1; + if ((std = BTPDQ_FIRST(&m_startq)) != NULL) + pct_create(std->tp, std->start, start_test_cb); + } + cm->state = CM_ACTIVE; } -static void -cm_td_test(struct cm_op *op) +void +start_test_cb(struct torrent *tp, uint32_t piece, int ok) { - if (test_piece(op->tp, op->u.test.pos, op->u.test.piece, - &op->u.test.ok) != 0) - op->error = 1; + struct content *cm = tp->cm; + if (ok) + set_bit(cm->piece_field, piece); + else + clear_bit(cm->piece_field, piece); + piece++; + while (piece < tp->npieces && !has_bit(cm->pos_field, piece)) + piece++; + if (piece < tp->npieces) + pct_create(tp, piece, start_test_cb); + else + start_test_end(tp, 1); } -static void -cm_td_write(struct cm_op *op) +void +start_test(struct torrent *tp, struct rstat *sbs) { - int err; - struct cm_write_data *d, *next; - off_t base = op->tp->piece_length * op->u.write.pos; - struct bt_stream *bts; - if ((err = bts_open(&bts, op->tp->nfiles, op->tp->files, - fd_cb_wr, op->tp)) != 0) - goto out; - BTPDQ_FOREACH(d, &op->u.write.q, entry) - if ((err = bts_put(bts, base + d->begin, d->buf, d->len)) != 0) { - bts_close(bts); - goto out; - } - err = bts_close(bts); -out: - BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next) - free(d); - if (err) - op->error = 1; + uint32_t piece = 0; + struct content *cm = tp->cm; + while (piece < tp->npieces && !has_bit(cm->pos_field, piece)) + piece++; + if (piece < tp->npieces) { + struct start_test_data *std = btpd_calloc(1, sizeof(*std)); + std->tp = tp; + std->start = piece; + std->rstat = sbs; + BTPDQ_INSERT_TAIL(&m_startq, std, entry); + if (std == BTPDQ_FIRST(&m_startq)) + pct_create(tp, piece, start_test_cb); + } else { + free(sbs); + start_test_end(tp, 0); + } } -static void -cm_td(void *arg) -{ - struct cm_comm *comm = arg; - struct cm_op *op; - for (;;) { - pthread_mutex_lock(&comm->lock); - while (BTPDQ_EMPTY(&comm->q)) - pthread_cond_wait(&comm->cond, &comm->lock); - - op = BTPDQ_FIRST(&comm->q); - BTPDQ_REMOVE(&comm->q, op, td_entry); - op->received = 1; - pthread_mutex_unlock(&comm->lock); - - switch (op->type) { - case CM_ALLOC: - cm_td_alloc(op); - break; - case CM_SAVE: - cm_td_save(op); - break; - case CM_START: - cm_td_start(op); - break; - case CM_TEST: - cm_td_test(op); - break; - case CM_WRITE: - cm_td_write(op); - break; - default: - abort(); +void +cm_start(struct torrent *tp) +{ + int err, resume_clean = 0; + struct rstat *sbs; + struct content *cm = tp->cm; + + if ((errno = bts_open(&cm->rds, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0) + btpd_err("failed to open stream for '%s' (%s).\n", + torrent_name(tp), strerror(errno)); + + cm->state = CM_STARTING; + + sbs = btpd_calloc(tp->nfiles, sizeof(*sbs)); + + if ((err = stat_and_adjust(tp, sbs)) != 0) + btpd_err("failed stat_and_adjust for '%s' (%s).\n", + torrent_name(tp), strerror(err)); + + resume_clean = load_resume(tp, sbs) == 0; + if (!resume_clean) { + memset(cm->pos_field, 0xff, ceil(tp->npieces / 8.0)); + off_t off = 0; + for (int i = 0; i < tp->nfiles; i++) { + if (sbs[i].size != tp->files[i].length) { + uint32_t start, end; + end = (off + tp->files[i].length - 1) + / tp->piece_length; + start = (off + sbs[i].size) / tp->piece_length; + while (start <= end) { + clear_bit(cm->pos_field, start); + clear_bit(cm->piece_field, start); + bzero(cm->block_field + start * cm->bppbf, cm->bppbf); + start++; + } + } + off += tp->files[i].length; } - td_post_begin(); - td_post(cm_td_cb, op); - td_post_end(); } + for (uint32_t piece = 0; piece < tp->npieces; piece++) { + if (has_bit(cm->piece_field, piece)) + continue; + uint8_t *bf = cm->block_field + cm->bppbf * piece; + uint32_t nblocks = torrent_piece_blocks(tp, piece); + uint32_t block = 0; + while (block < nblocks && has_bit(bf, block)) + block++; + if (block == nblocks) + set_bit(cm->pos_field, piece); + } + + start_test(tp, sbs); } void cm_init(void) { - pthread_t td; - BTPDQ_INIT(&m_long_comm.q); - pthread_mutex_init(&m_long_comm.lock, NULL); - pthread_cond_init(&m_long_comm.cond, NULL); - pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_long_comm); - BTPDQ_INIT(&m_short_comm.q); - pthread_mutex_init(&m_short_comm.lock, NULL); - pthread_cond_init(&m_short_comm.cond, NULL); - pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_short_comm); + evtimer_set(&m_workev, worker_cb, NULL); } diff --git a/btpd/content.h b/btpd/content.h index 9c92dfe..610b353 100644 --- a/btpd/content.h +++ b/btpd/content.h @@ -3,13 +3,14 @@ void cm_init(void); -void cm_create(struct torrent *tp); +void cm_create(struct torrent *tp, const char *mi); void cm_kill(struct torrent *tp); void cm_start(struct torrent *tp); void cm_stop(struct torrent * tp); int cm_active(struct torrent *tp); +int cm_started(struct torrent *tp); int cm_full(struct torrent *tp); off_t cm_content(struct torrent *tp); diff --git a/btpd/td.c b/btpd/td.c deleted file mode 100644 index f9d10f5..0000000 --- a/btpd/td.c +++ /dev/null @@ -1,87 +0,0 @@ -#include <sys/types.h> - -#include <pthread.h> -#include <string.h> -#include <unistd.h> - -#include "btpd.h" - -struct td_cb { - void (*cb)(void *); - void *arg; - BTPDQ_ENTRY(td_cb) entry; -}; - -BTPDQ_HEAD(td_cb_tq, td_cb); - -static int m_td_rd, m_td_wr; -static struct event m_td_ev; -static struct td_cb_tq m_td_cbs = BTPDQ_HEAD_INITIALIZER(m_td_cbs); -static pthread_mutex_t m_td_lock; - -void -td_acquire_lock(void) -{ - pthread_mutex_lock(&m_td_lock); -} - -void -td_release_lock(void) -{ - pthread_mutex_unlock(&m_td_lock); -} - -void -td_post(void (*fun)(void *), void *arg) -{ - struct td_cb *cb = btpd_calloc(1, sizeof(*cb)); - cb->cb = fun; - cb->arg = arg; - BTPDQ_INSERT_TAIL(&m_td_cbs, cb, entry); -} - -void -td_post_end(void) -{ - char c = '1'; - td_release_lock(); - write(m_td_wr, &c, sizeof(c)); -} - -static void -td_cb(int fd, short type, void *arg) -{ - char buf[1024]; - struct td_cb_tq tmpq = BTPDQ_HEAD_INITIALIZER(tmpq); - struct td_cb *cb, *next; - - read(fd, buf, sizeof(buf)); - td_acquire_lock(); - BTPDQ_FOREACH_MUTABLE(cb, &m_td_cbs, entry, next) - BTPDQ_INSERT_TAIL(&tmpq, cb, entry); - BTPDQ_INIT(&m_td_cbs); - td_release_lock(); - - BTPDQ_FOREACH_MUTABLE(cb, &tmpq, entry, next) { - cb->cb(cb->arg); - free(cb); - } -} - -void -td_init(void) -{ - int err; - int fds[2]; - if (pipe(fds) == -1) { - btpd_err("Couldn't create thread callback pipe (%s).\n", - strerror(errno)); - } - m_td_rd = fds[0]; - m_td_wr = fds[1]; - if ((err = pthread_mutex_init(&m_td_lock, NULL)) != 0) - btpd_err("Couldn't create mutex (%s).\n", strerror(err)); - - event_set(&m_td_ev, m_td_rd, EV_READ|EV_PERSIST, td_cb, NULL); - btpd_ev_add(&m_td_ev, NULL); -} diff --git a/btpd/torrent.h b/btpd/torrent.h index 9fa3792..f35cdff 100644 --- a/btpd/torrent.h +++ b/btpd/torrent.h @@ -47,8 +47,6 @@ uint32_t torrent_block_size(struct torrent *tp, uint32_t piece, uint32_t nblocks, uint32_t block); const char *torrent_name(struct torrent *tp); -void torrent_on_cm_stopped(struct torrent *tp); -void torrent_on_cm_started(struct torrent *tp); -void torrent_on_tr_stopped(struct torrent *tp); +void torrent_on_tick_all(void); #endif diff --git a/btpd/tracker_req.c b/btpd/tracker_req.c index add1462..72f1ebd 100644 --- a/btpd/tracker_req.c +++ b/btpd/tracker_req.c @@ -133,7 +133,6 @@ tr_set_stopped(struct torrent *tp) tr->ttype = TIMER_NONE; if (tr->req != NULL) tr_cancel(tr); - torrent_on_tr_stopped(tp); } void