diff --git a/btpd/btpd.c b/btpd/btpd.c index 1e10415..1ecbae8 100644 --- a/btpd/btpd.c +++ b/btpd/btpd.c @@ -115,7 +115,7 @@ load_library(void) for (int i = 0; i < ne; i++) { struct torrent *tp; struct dirent *e = entries[i]; - if (torrent_create(&tp, e->d_name) == 0) + if (torrent_load(&tp, e->d_name) == 0) btpd_add_torrent(tp); free(e); } diff --git a/btpd/content.c b/btpd/content.c index 8dbc994..7a3bd96 100644 --- a/btpd/content.c +++ b/btpd/content.c @@ -1,6 +1,10 @@ +#include +#include + #include #include #include +#include #include #include @@ -9,30 +13,36 @@ #include "btpd.h" #include "stream.h" -struct data { +struct cm_write_data { uint32_t begin; uint8_t *buf; size_t len; - BTPDQ_ENTRY(data) entry; + BTPDQ_ENTRY(cm_write_data) entry; }; -BTPDQ_HEAD(data_tq, data); +BTPDQ_HEAD(cm_write_data_tq, cm_write_data); enum cm_op_type { CM_ALLOC, + CM_SAVE, + CM_START, CM_TEST, CM_WRITE }; struct cm_op { struct torrent *tp; - + int error; + int received; enum cm_op_type type; union { struct { uint32_t piece; uint32_t pos; } alloc; + struct { + volatile sig_atomic_t cancel; + } start; struct { uint32_t piece; uint32_t pos; @@ -41,13 +51,10 @@ struct cm_op { struct { uint32_t piece; uint32_t pos; - struct data_tq q; + struct cm_write_data_tq q; } write; } u; - int error; - char *errmsg; - BTPDQ_ENTRY(cm_op) cm_entry; BTPDQ_ENTRY(cm_op) td_entry; }; @@ -55,6 +62,8 @@ struct cm_op { BTPDQ_HEAD(cm_op_tq, cm_op); struct content { + int active; + uint32_t npieces_got; off_t ncontent_bytes; @@ -74,10 +83,14 @@ struct content { #define ZEROBUFLEN (1 << 14) +struct cm_comm { + struct cm_op_tq q; + pthread_mutex_t lock; + pthread_cond_t cond; +}; + +static struct cm_comm m_long_comm, m_short_comm; static const uint8_t m_zerobuf[ZEROBUFLEN]; -static struct cm_op_tq m_tdq = BTPDQ_HEAD_INITIALIZER(m_tdq); -static pthread_mutex_t m_tdq_lock; -static pthread_cond_t m_tdq_cond; static int fd_cb_rd(const char *path, int *fd, void *arg) @@ -94,6 +107,27 @@ fd_cb_wr(const char *path, int *fd, void *arg) path); } +static void +cm_td_post_common(struct cm_comm *comm, struct cm_op *op) +{ + pthread_mutex_lock(&comm->lock); + BTPDQ_INSERT_TAIL(&comm->q, op, td_entry); + pthread_mutex_unlock(&comm->lock); + pthread_cond_signal(&comm->cond); +} + +static void +cm_td_post_long(struct cm_op *op) +{ + cm_td_post_common(&m_long_comm, op); +} + +static void +cm_td_post_short(struct cm_op *op) +{ + cm_td_post_common(&m_short_comm, op); +} + static void run_todo(struct content *cm) { @@ -107,10 +141,65 @@ run_todo(struct content *cm) return; } - pthread_mutex_lock(&m_tdq_lock); - BTPDQ_INSERT_TAIL(&m_tdq, op, td_entry); - pthread_mutex_unlock(&m_tdq_lock); - pthread_cond_signal(&m_tdq_cond); + if (op->type != CM_START) + cm_td_post_short(op); + else + cm_td_post_long(op); +} + +static void +add_todo(struct content *cm, struct cm_op *op) +{ + int was_empty = BTPDQ_EMPTY(&cm->todoq); + BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry); + if (was_empty) + run_todo(cm); +} + +void +cm_destroy(struct torrent *tp) +{ + struct content *cm = tp->cm; + bts_close(cm->rds); + free(cm->piece_field); + free(cm->block_field); + free(cm->hold_field); + free(cm->pos_field); + tp->cm = NULL; + torrent_on_cm_stopped(tp); +} + +void +cm_save(struct torrent *tp) +{ + struct content *cm = tp->cm; + struct cm_op *op = btpd_calloc(1, sizeof(*op)); + op->tp = tp; + op->type = CM_SAVE; + add_todo(cm, op); +} + +void +cm_stop(struct torrent *tp) +{ + struct content *cm = tp->cm; + + struct cm_op *op = BTPDQ_FIRST(&cm->todoq); + if (op != NULL && op->type == CM_START) { + pthread_mutex_lock(&m_long_comm.lock); + if (op->received) + op->u.start.cancel = 1; + else { + BTPDQ_REMOVE(&m_long_comm.q, op, td_entry); + BTPDQ_REMOVE(&cm->todoq, op, cm_entry); + free(op); + } + pthread_mutex_unlock(&m_long_comm.lock); + } else if (cm->npieces_got < tp->meta.npieces) + cm_save(tp); + + if (BTPDQ_EMPTY(&cm->todoq)) + cm_destroy(tp); } static void @@ -121,19 +210,28 @@ cm_td_cb(void *arg) struct content *cm = tp->cm; if (op->error) - btpd_err("%s", op->errmsg); + btpd_err("IO error for %s.\n", tp->relpath); switch (op->type) { case CM_ALLOC: set_bit(cm->pos_field, op->u.alloc.pos); clear_bit(cm->hold_field, op->u.alloc.piece); break; + case CM_START: + if (cm->active) { + assert(!op->u.start.cancel); + torrent_on_cm_started(tp); + } + break; case CM_TEST: if (op->u.test.ok) { + assert(cm->npieces_got < tp->meta.npieces); cm->npieces_got++; set_bit(cm->piece_field, op->u.test.piece); if (tp->net != NULL) dl_on_ok_piece(op->tp->net, op->u.test.piece); + if (cm_full(tp)) + cm_save(tp); } else { cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece); bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf); @@ -141,116 +239,16 @@ cm_td_cb(void *arg) dl_on_bad_piece(tp->net, op->u.test.piece); } break; - default: + case CM_SAVE: + case CM_WRITE: break; } BTPDQ_REMOVE(&cm->todoq, op, cm_entry); free(op); if (!BTPDQ_EMPTY(&cm->todoq)) run_todo(cm); -} - -static int -test_hash(struct torrent *tp, uint8_t *hash, uint32_t index) -{ - if (tp->meta.piece_hash != NULL) - return bcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH); - else { - char piece_hash[SHA_DIGEST_LENGTH]; - int fd; - int err; - - err = vopen(&fd, O_RDONLY, "library/%s/torrent", tp->relpath); - if (err != 0) - btpd_err("test_hash: %s\n", strerror(err)); - - lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH, SEEK_SET); - read(fd, piece_hash, SHA_DIGEST_LENGTH); - close(fd); - - return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH); - } -} - -static void -cm_td_alloc(struct cm_op *op) -{ - struct bt_stream *bts; - off_t len = torrent_piece_size(op->tp, op->u.alloc.pos); - off_t off = op->tp->meta.piece_length * op->u.alloc.pos; - bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp); - while (len > 0) { - size_t wlen = min(ZEROBUFLEN, len); - bts_put(bts, off, m_zerobuf, wlen); - len -= wlen; - off += wlen; - } - bts_close(bts); -} - -static void -cm_td_test(struct cm_op *op) -{ - uint8_t hash[SHA_DIGEST_LENGTH]; - struct bt_stream *bts; - bts_open(&bts, &op->tp->meta, fd_cb_rd, op->tp); - bts_sha(bts, op->u.test.pos * op->tp->meta.piece_length, - torrent_piece_size(op->tp, op->u.test.piece), hash); - bts_close(bts); - op->u.test.ok = test_hash(op->tp, hash, op->u.test.piece) == 0; -} - -static void -cm_td_write(struct cm_op *op) -{ - struct data *d, *next; - off_t base = op->tp->meta.piece_length * op->u.write.pos; - struct bt_stream *bts; - bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp); - BTPDQ_FOREACH(d, &op->u.write.q, entry) - bts_put(bts, base + d->begin, d->buf, d->len); - bts_close(bts); - BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next) - free(d); -} - -static void -cm_td(void *arg) -{ - for (;;) { - pthread_mutex_lock(&m_tdq_lock); - while (BTPDQ_EMPTY(&m_tdq)) - pthread_cond_wait(&m_tdq_cond, &m_tdq_lock); - struct cm_op *op = BTPDQ_FIRST(&m_tdq); - BTPDQ_REMOVE(&m_tdq, op, td_entry); - pthread_mutex_unlock(&m_tdq_lock); - - switch (op->type) { - case CM_ALLOC: - cm_td_alloc(op); - break; - case CM_TEST: - cm_td_test(op); - break; - case CM_WRITE: - cm_td_write(op); - break; - default: - abort(); - } - td_post_begin(); - td_post(cm_td_cb, op); - td_post_end(); - } -} - -void -cm_init(void) -{ - pthread_t td; - pthread_mutex_init(&m_tdq_lock, NULL); - pthread_cond_init(&m_tdq_cond, NULL); - pthread_create(&td, NULL, (void *(*)(void *))cm_td, NULL); + else if (!cm->active) + cm_destroy(tp); } int @@ -259,6 +257,7 @@ cm_start(struct torrent *tp) int err; struct content *cm = btpd_calloc(1, sizeof(*cm)); size_t pfield_size = ceil(tp->meta.npieces / 8.0); + cm->active = 1; cm->bppbf = ceil((double)tp->meta.piece_length / (1 << 17)); cm->piece_field = btpd_calloc(pfield_size, 1); cm->hold_field = btpd_calloc(pfield_size, 1); @@ -269,11 +268,12 @@ cm_start(struct torrent *tp) if ((err = bts_open(&cm->rds, &tp->meta, fd_cb_rd, tp)) != 0) btpd_err("Error opening stream (%s).\n", strerror(err)); - if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0) - btpd_err("Error opening stream (%s).\n", strerror(err)); - tp->cm = cm; - torrent_cm_cb(tp, CM_STARTED); + + struct cm_op *op = btpd_calloc(1, sizeof(*op)); + op->tp = tp; + op->type = CM_START; + add_todo(cm, op); return 0; } @@ -289,22 +289,18 @@ cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len, return 0; } -void -cm_prealloc(struct torrent *tp, uint32_t piece) +static void +cm_post_alloc(struct torrent *tp, uint32_t piece) { struct content *cm = tp->cm; - if (has_bit(cm->pos_field, piece)) - return; - set_bit(cm->hold_field, piece); - int was_empty = BTPDQ_EMPTY(&cm->todoq); struct cm_op *op = btpd_calloc(1, sizeof(*op)); op->tp = tp; op->type = CM_ALLOC; op->u.alloc.piece = piece; op->u.alloc.pos = piece; - BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry); + add_todo(cm, op); op = btpd_calloc(1, sizeof(*op)); op->tp = tp; @@ -312,25 +308,40 @@ cm_prealloc(struct torrent *tp, uint32_t piece) op->u.write.piece = piece; op->u.write.pos = piece; BTPDQ_INIT(&op->u.write.q); - BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry); + add_todo(cm, op); +} - if (was_empty) - run_todo(cm); +void +cm_prealloc(struct torrent *tp, uint32_t piece) +{ + struct content *cm = tp->cm; + + if (cm_alloc_size == 0) + set_bit(cm->pos_field, piece); + else { + unsigned npieces = ceil((double)cm_alloc_size / tp->meta.piece_length); + uint32_t start = piece - piece % npieces; + uint32_t end = min(start + npieces, tp->meta.npieces); + + while (start < end) { + if ((!has_bit(cm->pos_field, start) + && !has_bit(cm->hold_field, start))) + cm_post_alloc(tp, start); + start++; + } + } } void cm_test_piece(struct torrent *tp, uint32_t piece) { struct content *cm = tp->cm; - int was_empty = BTPDQ_EMPTY(&cm->todoq); struct cm_op *op = btpd_calloc(1, sizeof(*op)); op->tp = tp; op->type = CM_TEST; op->u.test.piece = piece; op->u.test.pos = piece; - BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry); - if (was_empty) - run_todo(cm); + add_todo(cm, op); } int @@ -341,7 +352,7 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, struct content *cm = tp->cm; if (has_bit(cm->hold_field, piece)) { - struct data *d = btpd_calloc(1, sizeof(*d) + len); + struct cm_write_data *d = btpd_calloc(1, sizeof(*d) + len); d->begin = begin; d->len = len; d->buf = (uint8_t *)(d + 1); @@ -350,7 +361,7 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, BTPDQ_FOREACH(op, &cm->todoq, cm_entry) if (op->type == CM_WRITE && op->u.write.piece == piece) break; - struct data *it; + struct cm_write_data *it; BTPDQ_FOREACH(it, &op->u.write.q, entry) if (it->begin > begin) { BTPDQ_INSERT_BEFORE(it, d, entry); @@ -407,3 +418,383 @@ cm_has_piece(struct torrent *tp, uint32_t piece) { return has_bit(tp->cm->piece_field, piece); } + +static int +test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece) +{ + if (tp->meta.piece_hash != NULL) + return bcmp(hash, tp->meta.piece_hash[piece], SHA_DIGEST_LENGTH); + else { + char piece_hash[SHA_DIGEST_LENGTH]; + int fd; + int err; + + err = vopen(&fd, O_RDONLY, "library/%s/torrent", tp->relpath); + if (err != 0) + btpd_err("test_hash: %s\n", strerror(err)); + + lseek(fd, tp->meta.pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET); + read(fd, piece_hash, SHA_DIGEST_LENGTH); + close(fd); + + return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH); + } +} + +static int +test_piece(struct torrent *tp, uint32_t pos, uint32_t piece, int *ok) +{ + int err; + uint8_t hash[SHA_DIGEST_LENGTH]; + struct bt_stream *bts; + if ((err = bts_open(&bts, &tp->meta, fd_cb_rd, tp)) != 0) + return err; + if ((err = bts_sha(bts, pos * tp->meta.piece_length, + torrent_piece_size(tp, piece), hash)) != 0) + return err;; + bts_close(bts); + *ok = test_hash(tp, hash, piece) == 0; + return 0; +} + +static void +cm_td_alloc(struct cm_op *op) +{ + struct torrent *tp = op->tp; + struct content *cm = tp->cm; + uint32_t pos = op->u.alloc.pos; + struct bt_stream *bts; + int err; + + assert(!has_bit(cm->pos_field, pos)); + + if ((err = bts_open(&bts, &tp->meta, fd_cb_wr, tp)) != 0) + goto out; + + off_t len = torrent_piece_size(tp, pos); + off_t off = tp->meta.piece_length * pos; + while (len > 0) { + size_t wlen = min(ZEROBUFLEN, len); + if ((err = bts_put(bts, off, m_zerobuf, wlen)) != 0) { + bts_close(bts); + goto out; + } + len -= wlen; + off += wlen; + } + err = bts_close(bts); +out: + if (err != 0) + op->error = 1; +} + +static int +test_torrent(struct torrent *tp, volatile sig_atomic_t *cancel) +{ + int err; + FILE *fp; + uint8_t (*hashes)[SHA_DIGEST_LENGTH]; + uint8_t hash[SHA_DIGEST_LENGTH]; + + if ((err = vfopen(&fp, "r", "library/%s/torrent", tp->relpath)) != 0) + return err; + + hashes = btpd_malloc(tp->meta.npieces * SHA_DIGEST_LENGTH); + fseek(fp, tp->meta.pieces_off, SEEK_SET); + fread(hashes, SHA_DIGEST_LENGTH, tp->meta.npieces, fp); + fclose(fp); + + tp->meta.piece_hash = hashes; + + struct content *cm = tp->cm; + for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) { + if (!has_bit(cm->pos_field, piece)) + continue; + err = bts_sha(cm->rds, piece * tp->meta.piece_length, + torrent_piece_size(tp, piece), hash); + if (err != 0) + break; + if (test_hash(tp, hash, piece) == 0) + set_bit(tp->cm->piece_field, piece); + if (*cancel) { + err = EINTR; + break; + } + } + + tp->meta.piece_hash = NULL; + free(hashes); + return err; +} + +int +stat_and_adjust(struct torrent *tp, struct stat ret[]) +{ + char path[PATH_MAX]; + for (int i = 0; i < tp->meta.nfiles; i++) { + snprintf(path, PATH_MAX, "library/%s/content/%s", tp->relpath, + tp->meta.files[i].path); +again: + if (stat(path, &ret[i]) == -1) { + if (errno == ENOENT) { + ret[i].st_mtime = -1; + ret[i].st_size = -1; + } else + return errno; + } + if (ret[i].st_size > tp->meta.files[i].length) { + if (truncate(path, tp->meta.files[i].length) != 0) + return errno; + goto again; + } + } + return 0; +} + +static int +load_resume(struct torrent *tp, struct stat sbs[]) +{ + int err, ver; + FILE *fp; + size_t pfsiz = ceil(tp->meta.npieces / 8.0); + size_t bfsiz = tp->meta.npieces * tp->cm->bppbf; + + if ((err = vfopen(&fp, "r" , "library/%s/resume", tp->relpath)) != 0) + return err; + + if (fscanf(fp, "%d\n", &ver) != 1) + goto invalid; + if (ver != 1) + goto invalid; + for (int i = 0; i < tp->meta.nfiles; i++) { + long long size; + time_t time; + if (fscanf(fp, "%qd %ld\n", &size, &time) != 2) + goto invalid; + if (sbs[i].st_size != size || sbs[i].st_mtime != time) + goto invalid; + } + if (fread(tp->cm->piece_field, 1, pfsiz, fp) != pfsiz) + goto invalid; + if (fread(tp->cm->block_field, 1, bfsiz, fp) != bfsiz) + goto invalid; + fclose(fp); + return 0; +invalid: + fclose(fp); + bzero(tp->cm->piece_field, pfsiz); + bzero(tp->cm->block_field, bfsiz); + return EINVAL; +} + +static int +save_resume(struct torrent *tp) +{ + int err; + FILE *fp; + struct stat sbs[tp->meta.nfiles]; + if ((err = stat_and_adjust(tp, sbs)) != 0) + return err; + if ((err = vfopen(&fp, "wb", "library/%s/resume", tp->relpath)) != 0) + return err; + fprintf(fp, "%d\n", 1); + for (int i = 0; i < tp->meta.nfiles; i++) + fprintf(fp, "%qd %ld\n", (long long)sbs[i].st_size, sbs[i].st_mtime); + fwrite(tp->cm->piece_field, 1, ceil(tp->meta.npieces / 8.0), fp); + fwrite(tp->cm->block_field, 1, tp->meta.npieces * tp->cm->bppbf, fp); + if (fclose(fp) != 0) + err = errno; + return err; +} + +static void +cm_td_save(struct cm_op *op) +{ + int err; + struct torrent *tp = op->tp; + struct content *cm = tp->cm; + struct bt_stream *bts = cm->wrs; + + cm->wrs = NULL; + if ((err = bts_close(bts)) != 0) + goto out; + + for (int i = 0; i < tp->meta.nfiles; i++) { + int lerr; + if ((lerr = vfsync("library/%s/content/%s", tp->relpath, + tp->meta.files[i])) != 0 && lerr != ENOENT) + err = lerr; + } + if (err != 0) + goto out; + save_resume(tp); +out: + if (err != 0) + op->error = 1; +} + +static void +cm_td_start(struct cm_op *op) +{ + int err; + struct stat sbs[op->tp->meta.nfiles]; + struct torrent *tp = op->tp; + struct content *cm = tp->cm; + + if ((err = stat_and_adjust(op->tp, sbs)) != 0) + goto out; + + if (load_resume(tp, sbs) != 0) { + memset(cm->pos_field, 0xff, ceil(tp->meta.npieces / 8.0)); + off_t off = 0; + for (int i = 0; i < tp->meta.nfiles; i++) { + if (sbs[i].st_size == -1 || sbs[i].st_size == 0) { + uint32_t start = off / tp->meta.piece_length; + uint32_t end = (off + tp->meta.files[i].length - 1) / + tp->meta.piece_length; + while (start <= end) { + clear_bit(cm->pos_field, start); + start++; + } + } else if (sbs[i].st_size < tp->meta.files[i].length) { + uint32_t start = (off + sbs[i].st_size) / + tp->meta.piece_length; + uint32_t end = (off + tp->meta.files[i].length - 1) / + tp->meta.piece_length; + while (start <= end) { + clear_bit(cm->pos_field, start); + start++; + } + } + off += tp->meta.files[i].length; + } + if (op->u.start.cancel) + goto out; + if ((err = test_torrent(tp, &op->u.start.cancel)) != 0) + goto out; + save_resume(tp); + } + + bzero(cm->pos_field, ceil(tp->meta.npieces / 8.0)); + for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) { + if (cm_has_piece(tp, piece)) { + cm->ncontent_bytes += torrent_piece_size(tp, piece); + cm->npieces_got++; + set_bit(cm->pos_field, piece); + continue; + } + uint8_t *bf = cm->block_field + cm->bppbf * piece; + uint32_t nblocks = torrent_piece_blocks(tp, piece); + uint32_t nblocks_got = 0; + for (uint32_t i = 0; i < nblocks; i++) { + if (has_bit(bf, i)) { + nblocks_got++; + cm->ncontent_bytes += + torrent_block_size(tp, piece, nblocks, i); + } + } + if (nblocks_got == nblocks) { + int ok; + if (((err = test_piece(tp, piece, piece, &ok)) != 0 + || op->u.start.cancel)) + goto out; + if (ok) { + set_bit(cm->pos_field, piece); + set_bit(cm->piece_field, piece); + } else + bzero(bf, cm->bppbf); + } else if (nblocks_got > 0) + set_bit(cm->pos_field, piece); + } + + if (cm->npieces_got < tp->meta.npieces) + if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0) + goto out; +out: + if (!op->u.start.cancel && err != 0) + op->error = 1; +} + +static void +cm_td_test(struct cm_op *op) +{ + if (test_piece(op->tp, op->u.test.pos, op->u.test.piece, + &op->u.test.ok) != 0) + op->error = 1; +} + +static void +cm_td_write(struct cm_op *op) +{ + int err; + struct cm_write_data *d, *next; + off_t base = op->tp->meta.piece_length * op->u.write.pos; + struct bt_stream *bts; + if ((err = bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp)) != 0) + goto out; + BTPDQ_FOREACH(d, &op->u.write.q, entry) + if ((err = bts_put(bts, base + d->begin, d->buf, d->len)) != 0) { + bts_close(bts); + goto out; + } + err = bts_close(bts); +out: + BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next) + free(d); + if (err) + op->error = 1; +} + +static void +cm_td(void *arg) +{ + struct cm_comm *comm = arg; + struct cm_op *op; + for (;;) { + pthread_mutex_lock(&comm->lock); + while (BTPDQ_EMPTY(&comm->q)) + pthread_cond_wait(&comm->cond, &comm->lock); + + op = BTPDQ_FIRST(&comm->q); + BTPDQ_REMOVE(&comm->q, op, td_entry); + op->received = 1; + pthread_mutex_unlock(&comm->lock); + + switch (op->type) { + case CM_ALLOC: + cm_td_alloc(op); + break; + case CM_SAVE: + cm_td_save(op); + break; + case CM_START: + cm_td_start(op); + break; + case CM_TEST: + cm_td_test(op); + break; + case CM_WRITE: + cm_td_write(op); + break; + default: + abort(); + } + td_post_begin(); + td_post(cm_td_cb, op); + td_post_end(); + } +} + +void +cm_init(void) +{ + pthread_t td; + BTPDQ_INIT(&m_long_comm.q); + pthread_mutex_init(&m_long_comm.lock, NULL); + pthread_cond_init(&m_long_comm.cond, NULL); + pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_long_comm); + BTPDQ_INIT(&m_short_comm.q); + pthread_mutex_init(&m_short_comm.lock, NULL); + pthread_cond_init(&m_short_comm.cond, NULL); + pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_short_comm); +} diff --git a/btpd/download_subr.c b/btpd/download_subr.c index 9164e72..df2aaaf 100644 --- a/btpd/download_subr.c +++ b/btpd/download_subr.c @@ -37,9 +37,8 @@ piece_alloc(struct net *n, uint32_t index) struct piece *pc; size_t mem, field, blocks; unsigned nblocks; - off_t piece_length = torrent_piece_size(n->tp, index); - nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN); + nblocks = torrent_piece_blocks(n->tp, index); blocks = sizeof(pc->blocks[0]) * nblocks; field = (size_t)ceil(nblocks / 8.0); mem = sizeof(*pc) + field + blocks; @@ -63,7 +62,7 @@ piece_alloc(struct net *n, uint32_t index) pc->blocks = (struct block *)(pc->down_field + field); for (unsigned i = 0; i < nblocks; i++) { uint32_t start = i * PIECE_BLOCKLEN; - uint32_t len = torrent_block_size(pc, i); + uint32_t len = torrent_block_size(n->tp, index, nblocks, i); struct block *blk = &pc->blocks[i]; blk->pc = pc; BTPDQ_INIT(&blk->reqs); diff --git a/btpd/torrent.c b/btpd/torrent.c index 909b18b..c362ad5 100644 --- a/btpd/torrent.c +++ b/btpd/torrent.c @@ -31,13 +31,20 @@ torrent_piece_size(struct torrent *tp, uint32_t index) } uint32_t -torrent_block_size(struct piece *pc, uint32_t index) +torrent_piece_blocks(struct torrent *tp, uint32_t piece) { - if (index < pc->nblocks - 1) + return ceil(torrent_piece_size(tp, piece) / (double)PIECE_BLOCKLEN); +} + +uint32_t +torrent_block_size(struct torrent *tp, uint32_t piece, uint32_t nblocks, + uint32_t block) +{ + if (block < nblocks - 1) return PIECE_BLOCKLEN; else { - uint32_t allbutlast = PIECE_BLOCKLEN * (pc->nblocks - 1); - return torrent_piece_size(pc->n->tp, pc->index) - allbutlast; + uint32_t allbutlast = PIECE_BLOCKLEN * (nblocks - 1); + return torrent_piece_size(tp, piece) - allbutlast; } } @@ -52,11 +59,14 @@ torrent_activate(struct torrent *tp) void torrent_deactivate(struct torrent *tp) { - + tp->state = T_STOPPING; + tr_stop(tp); + net_del_torrent(tp); + cm_stop(tp); } int -torrent_create(struct torrent **res, const char *path) +torrent_load(struct torrent **res, const char *path) { struct metainfo *mi; int error; @@ -88,16 +98,17 @@ torrent_create(struct torrent **res, const char *path) return error; } -void torrent_cm_cb(struct torrent *tp, enum cm_state state) +void +torrent_on_cm_started(struct torrent *tp) { - switch (state) { - case CM_STARTED: - net_add_torrent(tp); - tr_start(tp); - break; - case CM_STOPPED: - break; - case CM_ERROR: - break; - } + net_add_torrent(tp); + tr_start(tp); + tp->state = T_ACTIVE; +} + +void +torrent_on_cm_stopped(struct torrent *tp) +{ + assert(tp->state == T_STOPPING); + tp->state = T_INACTIVE; } diff --git a/btpd/torrent.h b/btpd/torrent.h index 6ce07a4..73a26c7 100644 --- a/btpd/torrent.h +++ b/btpd/torrent.h @@ -25,19 +25,16 @@ struct torrent { BTPDQ_HEAD(torrent_tq, torrent); -int torrent_create(struct torrent **res, const char *path); +int torrent_load(struct torrent **res, const char *path); void torrent_activate(struct torrent *tp); void torrent_deactivate(struct torrent *tp); -off_t torrent_piece_size(struct torrent *tp, uint32_t index); -uint32_t torrent_block_size(struct piece *pc, uint32_t index); +off_t torrent_piece_size(struct torrent *tp, uint32_t piece); +uint32_t torrent_piece_blocks(struct torrent *tp, uint32_t piece); +uint32_t torrent_block_size(struct torrent *tp, uint32_t piece, + uint32_t nblocks, uint32_t block); -enum cm_state { - CM_STARTED, - CM_STOPPED, - CM_ERROR -}; - -void torrent_cm_cb(struct torrent *tp, enum cm_state state); +void torrent_on_cm_stopped(struct torrent *tp); +void torrent_on_cm_started(struct torrent *tp); #endif