@@ -79,6 +79,8 @@ struct content {
struct bt_stream *rds;
struct bt_stream *wrs;
struct event save_timer;
};
#define ZEROBUFLEN (1 << 14)
@@ -179,6 +181,20 @@ cm_save(struct torrent *tp)
add_todo(cm, op);
}
static void
cm_write_done(struct torrent *tp)
{
int err;
struct content *cm = tp->cm;
if ((err = bts_close(cm->wrs)) != 0)
btpd_err("Error closing write stream for %s (%s).\n", tp->relpath,
strerror(err));
cm->wrs = NULL;
event_del(&cm->save_timer);
cm_save(tp);
}
void
cm_stop(struct torrent *tp)
{
@@ -195,16 +211,27 @@ cm_stop(struct torrent *tp)
free(op);
}
pthread_mutex_unlock(&m_long_comm.lock);
} else if (cm->npieces_got < tp->meta.npieces )
cm_sav e(tp);
} else if (!cm_full(tp) )
cm_write_don e(tp);
if (BTPDQ_EMPTY(&cm->todoq))
cm_destroy(tp);
}
#define SAVE_INTERVAL (& (struct timeval) { 15, 0 })
static void
save_timer_cb(int fd, short type, void *arg)
{
struct torrent *tp = arg;
event_add(&tp->cm->save_timer, SAVE_INTERVAL);
cm_save(tp);
}
static void
cm_td_cb(void *arg)
{
int err;
struct cm_op *op = arg;
struct torrent *tp = op->tp;
struct content *cm = tp->cm;
@@ -220,6 +247,12 @@ cm_td_cb(void *arg)
case CM_START:
if (cm->active) {
assert(!op->u.start.cancel);
if (!cm_full(tp)) {
if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
btpd_err("Couldn't open write stream for %s (%s).\n",
tp->relpath, strerror(err));
event_add(&cm->save_timer, SAVE_INTERVAL);
}
torrent_on_cm_started(tp);
}
break;
@@ -231,7 +264,7 @@ cm_td_cb(void *arg)
if (tp->net != NULL)
dl_on_ok_piece(op->tp->net, op->u.test.piece);
if (cm_full(tp))
cm_sav e(tp);
cm_write_don e(tp);
} else {
cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece);
bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf);
@@ -270,6 +303,8 @@ cm_start(struct torrent *tp)
btpd_err("Error opening stream (%s).\n", strerror(err));
tp->cm = cm;
evtimer_set(&cm->save_timer, save_timer_cb, tp);
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_START;
@@ -572,14 +607,14 @@ load_resume(struct torrent *tp, struct stat sbs[])
if (fscanf(fp, "%qd %ld\n", &size, &time) != 2)
goto invalid;
if (sbs[i].st_size != size || sbs[i].st_mtime != time)
goto invalid ;
err = EINVAL ;
}
if (fread(tp->cm->piece_field, 1, pfsiz, fp) != pfsiz)
goto invalid;
if (fread(tp->cm->block_field, 1, bfsiz, fp) != bfsiz)
goto invalid;
fclose(fp);
return 0 ;
return err ;
invalid:
fclose(fp);
bzero(tp->cm->piece_field, pfsiz);
@@ -610,33 +645,13 @@ save_resume(struct torrent *tp)
static void
cm_td_save(struct cm_op *op)
{
int err;
struct torrent *tp = op->tp;
struct content *cm = tp->cm;
struct bt_stream *bts = cm->wrs;
cm->wrs = NULL;
if ((err = bts_close(bts)) != 0)
goto out;
for (int i = 0; i < tp->meta.nfiles; i++) {
int lerr;
if ((lerr = vfsync("library/%s/content/%s", tp->relpath,
tp->meta.files[i])) != 0 && lerr != ENOENT)
err = lerr;
}
if (err != 0)
goto out;
save_resume(tp);
out:
if (err != 0)
op->error = 1;
save_resume(op->tp);
}
static void
cm_td_start(struct cm_op *op)
{
int err;
int err, resume_clean = 0, tested_torrent = 0;
struct stat sbs[op->tp->meta.nfiles];
struct torrent *tp = op->tp;
struct content *cm = tp->cm;
@@ -644,7 +659,8 @@ cm_td_start(struct cm_op *op)
if ((err = stat_and_adjust(op->tp, sbs)) != 0)
goto out;
if (load_resume(tp, sbs) != 0) {
resume_clean = load_resume(tp, sbs) == 0;
if (!resume_clean) {
memset(cm->pos_field, 0xff, ceil(tp->meta.npieces / 8.0));
off_t off = 0;
for (int i = 0; i < tp->meta.nfiles; i++) {
@@ -654,6 +670,8 @@ cm_td_start(struct cm_op *op)
tp->meta.piece_length;
while (start <= end) {
clear_bit(cm->pos_field, start);
clear_bit(cm->piece_field, start);
bzero(cm->block_field + start * cm->bppbf, cm->bppbf);
start++;
}
} else if (sbs[i].st_size < tp->meta.files[i].length) {
@@ -672,7 +690,7 @@ cm_td_start(struct cm_op *op)
goto out;
if ((err = test_torrent(tp, &op->u.start.cancel)) != 0)
goto out;
save_resume(tp) ;
tested_torrent = 1 ;
}
bzero(cm->pos_field, ceil(tp->meta.npieces / 8.0));
@@ -694,10 +712,13 @@ cm_td_start(struct cm_op *op)
}
}
if (nblocks_got == nblocks) {
int ok;
if (((err = test_piece(tp, piece, piece, &ok)) != 0
|| op->u.start.cancel))
goto out;
resume_clean = 0;
int ok = 0;
if (!tested_torrent) {
if (((err = test_piece(tp, piece, piece, &ok)) != 0
|| op->u.start.cancel))
goto out;
}
if (ok) {
set_bit(cm->pos_field, piece);
set_bit(cm->piece_field, piece);
@@ -707,9 +728,9 @@ cm_td_start(struct cm_op *op)
set_bit(cm->pos_field, piece);
}
if (cm->npieces_got < tp->meta.npieces )
if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
goto out;
if (!resume_clean )
save_resume(tp);
out:
if (!op->u.start.cancel && err != 0)
op->error = 1;