From 95e83bb2473003dc3d1413a2203ae32a2834318a Mon Sep 17 00:00:00 2001
From: Richard Nyberg <rnyberg@murmeldjur.se>
Date: Fri, 3 Nov 2006 08:59:48 +0000
Subject: [PATCH] Make the content code unthreaded and remove the no longer
 needed inter thread messaging code. Also simplify some interfaces by polling
 for state changes when appropriate instead of being called directly at any
 time.

---
 btpd/btpd.c        |  17 +-
 btpd/btpd.h        |   9 -
 btpd/content.c     | 784 +++++++++++++++++----------------------------
 btpd/content.h     |   3 +-
 btpd/td.c          |  87 -----
 btpd/torrent.h     |   4 +-
 btpd/tracker_req.c |   1 -
 7 files changed, 301 insertions(+), 604 deletions(-)
 delete mode 100644 btpd/td.c

diff --git a/btpd/btpd.c b/btpd/btpd.c
index 76303d2..01f34c9 100644
--- a/btpd/btpd.c
+++ b/btpd/btpd.c
@@ -40,8 +40,9 @@ btpd_shutdown(int grace_seconds)
             if (tp->state != T_STOPPING)
                 torrent_stop(tp);
         if (grace_seconds >= 0) {
-            event_once(-1, EV_TIMEOUT, grace_cb, NULL,
-                (& (struct timeval) { grace_seconds, 0 }));
+            if (event_once(-1, EV_TIMEOUT, grace_cb, NULL,
+                    (& (struct timeval) { grace_seconds, 0 })) != 0)
+                btpd_err("failed to add event (%s).\n", strerror(errno));
         }
     }
 }
@@ -57,13 +58,6 @@ btpd_get_peer_id(void)
     return m_peer_id;
 }
 
-void
-btpd_on_no_torrents(void)
-{
-    if (m_shutdown)
-        btpd_exit(0);
-}
-
 static void
 signal_cb(int signal, short type, void *arg)
 {
@@ -77,9 +71,11 @@ heartbeat_cb(int fd, short type, void *arg)
     btpd_ev_add(&m_heartbeat, (& (struct timeval) { 1, 0 }));
     btpd_seconds++;
     net_on_tick();
+    torrent_on_tick_all();
+    if (m_shutdown && torrent_count() == 0)
+        btpd_exit(0);
 }
 
-void td_init(void);
 void tr_init(void);
 void ipc_init(void);
 
@@ -93,7 +89,6 @@ btpd_init(void)
     for (int i = sizeof(BTPD_VERSION); i < 20; i++)
         m_peer_id[i] = rand_between(0, 255);
 
-    td_init();
     net_init();
     ipc_init();
     ul_init();
diff --git a/btpd/btpd.h b/btpd/btpd.h
index cc8d94d..6651b49 100644
--- a/btpd/btpd.h
+++ b/btpd/btpd.h
@@ -67,13 +67,4 @@ int btpd_is_stopping(void);
 
 const uint8_t *btpd_get_peer_id(void);
 
-void td_acquire_lock(void);
-void td_release_lock(void);
-
-#define td_post_begin td_acquire_lock
-void td_post(void (*fun)(void *), void *arg);
-void td_post_end(void);
-
-void btpd_on_no_torrents(void);
-
 #endif
diff --git a/btpd/content.c b/btpd/content.c
index bfe6779..5fef38f 100644
--- a/btpd/content.c
+++ b/btpd/content.c
@@ -3,8 +3,6 @@
 
 #include <fcntl.h>
 #include <math.h>
-#include <pthread.h>
-#include <signal.h>
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
@@ -14,56 +12,13 @@
 #include "btpd.h"
 #include "stream.h"
 
-struct cm_write_data {
-    uint32_t begin;
-    uint8_t *buf;
-    size_t len;
-    BTPDQ_ENTRY(cm_write_data) entry;
-};
-
-BTPDQ_HEAD(cm_write_data_tq, cm_write_data);
-
-enum cm_op_type {
-    CM_ALLOC,
-    CM_SAVE,
-    CM_START,
-    CM_TEST,
-    CM_WRITE
-};
-
-struct cm_op {
-    struct torrent *tp;
-    int error;
-    int received;
-    enum cm_op_type type;
-    union {
-        struct {
-            uint32_t piece;
-            uint32_t pos;
-        } alloc;
-        struct {
-            volatile sig_atomic_t cancel;
-        } start;
-        struct {
-            uint32_t piece;
-            uint32_t pos;
-            int ok;
-        } test;
-        struct {
-            uint32_t piece;
-            uint32_t pos;
-            struct cm_write_data_tq q;
-        } write;
-    } u;
-
-    BTPDQ_ENTRY(cm_op) cm_entry;
-    BTPDQ_ENTRY(cm_op) td_entry;
+struct rstat {
+    time_t mtime;
+    off_t size;
 };
 
-BTPDQ_HEAD(cm_op_tq, cm_op);
-
 struct content {
-    int active;
+    enum { CM_INACTIVE, CM_STARTING, CM_ACTIVE } state;
 
     uint32_t npieces_got;
 
@@ -73,11 +28,8 @@ struct content {
 
     uint8_t *piece_field;
     uint8_t *block_field;
-    uint8_t *hold_field;
     uint8_t *pos_field;
 
-    struct cm_op_tq todoq;
-
     struct bt_stream *rds;
     struct bt_stream *wrs;
 
@@ -86,15 +38,11 @@ struct content {
 
 #define ZEROBUFLEN (1 << 14)
 
-struct cm_comm {
-    struct cm_op_tq q;
-    pthread_mutex_t lock;
-    pthread_cond_t cond;
-};
-
-static struct cm_comm m_long_comm, m_short_comm;
 static const uint8_t m_zerobuf[ZEROBUFLEN];
 
+int stat_and_adjust(struct torrent *tp, struct rstat ret[]);
+static int save_resume(struct torrent *tp, struct rstat sbs[]);
+
 static int
 fd_cb_rd(const char *path, int *fd, void *arg)
 {
@@ -109,53 +57,153 @@ fd_cb_wr(const char *path, int *fd, void *arg)
     return vopen(fd, O_RDWR, "%s/%s", tp->tl->dir, path);
 }
 
-static void
-cm_td_post_common(struct cm_comm *comm, struct cm_op *op)
+struct pct_data {
+    off_t off, remain;
+    struct torrent *tp;
+    SHA_CTX sha;
+    BTPDQ_ENTRY(pct_data) entry;
+    uint32_t piece;
+    void (*cb)(struct torrent *, uint32_t, int);
+};
+
+BTPDQ_HEAD(pct_tq, pct_data);
+
+static struct pct_tq m_pctq = BTPDQ_HEAD_INITIALIZER(m_pctq);
+static void cm_write_done(struct torrent *tp);
+
+struct start_test_data {
+    struct torrent *tp;
+    struct rstat *rstat;
+    uint32_t start;
+    BTPDQ_ENTRY(start_test_data) entry;
+};
+
+BTPDQ_HEAD(std_tq, start_test_data);
+
+static struct std_tq m_startq = BTPDQ_HEAD_INITIALIZER(m_startq);
+
+static struct event m_workev;
+
+#define READBUFLEN (1 << 14)
+
+static int
+test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
 {
-    pthread_mutex_lock(&comm->lock);
-    BTPDQ_INSERT_TAIL(&comm->q, op, td_entry);
-    pthread_mutex_unlock(&comm->lock);
-    pthread_cond_signal(&comm->cond);
+    char piece_hash[SHA_DIGEST_LENGTH];
+    int fd;
+    int err;
+
+    err = vopen(&fd, O_RDONLY, "torrents/%s/torrent", tp->relpath);
+    if (err != 0)
+        btpd_err("failed to open 'torrents/%s/torrent' (%s).\n",
+            tp->relpath, strerror(err));
+
+    lseek(fd, tp->pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET);
+    read(fd, piece_hash, SHA_DIGEST_LENGTH);
+    close(fd);
+
+    return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
 }
 
-static void
-cm_td_post_long(struct cm_op *op)
+void
+pct_create(struct torrent *tp, uint32_t piece,
+    void (*cb)(struct torrent *, uint32_t, int))
 {
-    cm_td_post_common(&m_long_comm, op);
+    struct pct_data *p = btpd_calloc(1, sizeof(*p));
+    p->piece = piece;
+    p->tp = tp;
+    p->off = piece * tp->piece_length;
+    p->remain = torrent_piece_size(tp, piece);
+    SHA1_Init(&p->sha);
+    p->cb = cb;
+    BTPDQ_INSERT_TAIL(&m_pctq, p, entry);
+    btpd_ev_add(&m_workev, (& (struct timeval) { 0, 0 }));
 }
 
-static void
-cm_td_post_short(struct cm_op *op)
+void
+pct_kill(struct pct_data *p)
 {
-    cm_td_post_common(&m_short_comm, op);
+    BTPDQ_REMOVE(&m_pctq, p, entry);
+    free(p);
 }
 
-static void
-run_todo(struct content *cm)
-{
-    struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
+void
+pct_run(struct pct_data *p)
+{
+    char buf[READBUFLEN];
+    size_t unit = (10 << 14);
+
+    while (p->remain > 0 && unit > 0) {
+        size_t wantread = min(p->remain, sizeof(buf));
+        if (wantread > unit)
+            wantread = unit;
+        if ((errno = bts_get(p->tp->cm->rds, p->off, buf, wantread)) != 0)
+            btpd_err("IO error on '%s' (%s).\n", bts_filename(p->tp->cm->rds),
+                strerror(errno));
+        p->remain -= wantread;
+        unit -= wantread;
+        p->off += wantread;
+        SHA1_Update(&p->sha, buf, wantread);
+    }
+    if (p->remain == 0) {
+        uint8_t hash[SHA_DIGEST_LENGTH];
+        SHA1_Final(hash, &p->sha);
+        p->cb(p->tp, p->piece, test_hash(p->tp, hash, p->piece) == 0);
+        pct_kill(p);
+    }
+}
 
-    if (op->type == CM_WRITE && BTPDQ_EMPTY(&op->u.write.q)) {
-        BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
-        free(op);
-        if (!BTPDQ_EMPTY(&cm->todoq))
-            run_todo(cm);
-        return;
+void
+pct_cb(struct torrent *tp, uint32_t piece, int ok)
+{
+    struct content *cm = tp->cm;
+    if (ok) {
+        assert(cm->npieces_got < tp->npieces);
+        cm->npieces_got++;
+        set_bit(cm->piece_field, piece);
+        if (net_active(tp))
+            dl_on_ok_piece(tp->net, piece);
+        if (cm_full(tp))
+            cm_write_done(tp);
+    } else {
+        cm->ncontent_bytes -= torrent_piece_size(tp, piece);
+        bzero(cm->block_field + piece * cm->bppbf, cm->bppbf);
+        if (net_active(tp))
+            dl_on_bad_piece(tp->net, piece);
     }
+}
 
-    if (op->type != CM_START)
-        cm_td_post_short(op);
-    else
-        cm_td_post_long(op);
+void
+work_stop(struct torrent *tp)
+{
+    struct content *cm = tp->cm;
+    struct pct_data *pct, *next;
+    if (cm->state == CM_STARTING) {
+        struct start_test_data *std;
+        BTPDQ_FOREACH(std, &m_startq, entry)
+            if (std->tp == tp) {
+                BTPDQ_REMOVE(&m_startq, std, entry);
+                free(std->rstat);
+                free(std);
+                break;
+            }
+    }
+    BTPDQ_FOREACH_MUTABLE(pct, &m_pctq, entry, next)
+        if (pct->tp == tp)
+            pct_kill(pct);
 }
 
-static void
-add_todo(struct content *cm, struct cm_op *op)
+static int test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece);
+
+void
+worker_cb(int fd, short type, void *arg)
 {
-    int was_empty = BTPDQ_EMPTY(&cm->todoq);
-    BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
-    if (was_empty)
-        run_todo(cm);
+    struct pct_data *p = BTPDQ_FIRST(&m_pctq);
+    if (p == NULL)
+        return;
+    pct_run(p);
+    if (!BTPDQ_EMPTY(&m_pctq))
+        event_add(&m_workev, (& (struct timeval) { 0, 0 }));
 }
 
 void
@@ -165,7 +213,6 @@ cm_kill(struct torrent *tp)
     bts_close(cm->rds);
     free(cm->piece_field);
     free(cm->block_field);
-    free(cm->hold_field);
     free(cm->pos_field);
     free(cm);
     tp->cm = NULL;
@@ -174,11 +221,9 @@ cm_kill(struct torrent *tp)
 void
 cm_save(struct torrent *tp)
 {
-    struct content *cm = tp->cm;
-    struct cm_op *op = btpd_calloc(1, sizeof(*op));
-    op->tp = tp;
-    op->type = CM_SAVE;
-    add_todo(cm, op);
+    struct rstat sbs[tp->nfiles];
+    if (stat_and_adjust(tp, sbs) == 0)
+        save_resume(tp, sbs);
 }
 
 static void
@@ -187,7 +232,7 @@ cm_write_done(struct torrent *tp)
     struct content *cm = tp->cm;
 
     if ((errno = bts_close(cm->wrs)) != 0)
-        btpd_err("Error closing write stream for '%s' (%s).\n",
+        btpd_err("error closing write stream for '%s' (%s).\n",
             torrent_name(tp), strerror(errno));
     cm->wrs = NULL;
     btpd_ev_del(&cm->save_timer);
@@ -198,30 +243,27 @@ void
 cm_stop(struct torrent *tp)
 {
     struct content *cm = tp->cm;
-    cm->active = 0;
-    struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
-    if (op != NULL && op->type == CM_START) {
-        pthread_mutex_lock(&m_long_comm.lock);
-        if (op->received)
-            op->u.start.cancel = 1;
-        else {
-            BTPDQ_REMOVE(&m_long_comm.q, op, td_entry);
-            BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
-            free(op);
-        }
-        pthread_mutex_unlock(&m_long_comm.lock);
-    } else if (!cm_full(tp))
+
+    if (cm->state == CM_ACTIVE && !cm_full(tp))
         cm_write_done(tp);
 
-    if (BTPDQ_EMPTY(&cm->todoq))
-        torrent_on_cm_stopped(tp);
+    work_stop(tp);
+
+    cm->state = CM_INACTIVE;
 }
 
 int
 cm_active(struct torrent *tp)
 {
     struct content *cm = tp->cm;
-    return cm->active || !BTPDQ_EMPTY(&cm->todoq);
+    return cm->state != CM_INACTIVE;
+}
+
+int
+cm_started(struct torrent *tp)
+{
+    struct content *cm = tp->cm;
+    return cm->state == CM_ACTIVE;
 }
 
 #define SAVE_INTERVAL (& (struct timeval) { 15, 0 })
@@ -234,95 +276,21 @@ save_timer_cb(int fd, short type, void *arg)
     cm_save(tp);
 }
 
-static void
-cm_td_cb(void *arg)
-{
-    int err;
-    struct cm_op *op = arg;
-    struct torrent *tp = op->tp;
-    struct content *cm = tp->cm;
-
-    if (op->error)
-        btpd_err("IO error for '%s'.\n", torrent_name(tp));
-
-    switch (op->type) {
-    case CM_ALLOC:
-        set_bit(cm->pos_field, op->u.alloc.pos);
-        clear_bit(cm->hold_field, op->u.alloc.piece);
-        break;
-    case CM_START:
-        if (cm->active) {
-            assert(!op->u.start.cancel);
-            if (!cm_full(tp)) {
-                if ((err = bts_open(&cm->wrs, tp->nfiles, tp->files,
-                        fd_cb_wr, tp)) != 0)
-                    btpd_err("Couldn't open write stream for '%s' (%s).\n",
-                        torrent_name(tp), strerror(err));
-                btpd_ev_add(&cm->save_timer, SAVE_INTERVAL);
-            }
-            torrent_on_cm_started(tp);
-        }
-        break;
-    case CM_TEST:
-        if (op->u.test.ok) {
-            assert(cm->npieces_got < tp->npieces);
-            cm->npieces_got++;
-            set_bit(cm->piece_field, op->u.test.piece);
-            if (net_active(tp))
-                dl_on_ok_piece(op->tp->net, op->u.test.piece);
-            if (cm_full(tp))
-                cm_write_done(tp);
-        } else {
-            cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece);
-            bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf);
-            if (net_active(tp))
-                dl_on_bad_piece(tp->net, op->u.test.piece);
-        }
-        break;
-    case CM_SAVE:
-    case CM_WRITE:
-        break;
-    }
-    BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
-    free(op);
-    if (!BTPDQ_EMPTY(&cm->todoq))
-        run_todo(cm);
-    else if (!cm->active)
-        torrent_on_cm_stopped(tp);
-}
-
 void
-cm_create(struct torrent *tp)
+cm_create(struct torrent *tp, const char *mi)
 {
     size_t pfield_size = ceil(tp->npieces / 8.0);
     struct content *cm = btpd_calloc(1, sizeof(*cm));
     cm->bppbf = ceil((double)tp->piece_length / (1 << 17));
     cm->piece_field = btpd_calloc(pfield_size, 1);
-    cm->hold_field = btpd_calloc(pfield_size, 1);
     cm->pos_field = btpd_calloc(pfield_size, 1);
     cm->block_field = btpd_calloc(tp->npieces * cm->bppbf, 1);
 
-    BTPDQ_INIT(&cm->todoq);
     evtimer_set(&cm->save_timer, save_timer_cb, tp);
 
     tp->cm = cm;
 }
 
-void
-cm_start(struct torrent *tp)
-{
-    struct content *cm = tp->cm;
-
-    if ((errno = bts_open(&cm->rds, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0)
-        btpd_err("Error opening stream (%s).\n", strerror(errno));
-
-    cm->active = 1;
-    struct cm_op *op = btpd_calloc(1, sizeof(*op));
-    op->tp = tp;
-    op->type = CM_START;
-    add_todo(cm, op);
-}
-
 int
 cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
     uint8_t **buf)
@@ -331,32 +299,11 @@ cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
     int err =
         bts_get(tp->cm->rds, piece * tp->piece_length + begin, *buf, len);
     if (err != 0)
-        btpd_err("Io error (%s)\n", strerror(err));
+        btpd_err("IO error on '%s' (%s).\n", bts_filename(tp->cm->rds),
+            strerror(err));
     return 0;
 }
 
-static void
-cm_post_alloc(struct torrent *tp, uint32_t piece)
-{
-    struct content *cm = tp->cm;
-    set_bit(cm->hold_field, piece);
-
-    struct cm_op *op = btpd_calloc(1, sizeof(*op));
-    op->tp = tp;
-    op->type = CM_ALLOC;
-    op->u.alloc.piece = piece;
-    op->u.alloc.pos = piece;
-    add_todo(cm, op);
-
-    op = btpd_calloc(1, sizeof(*op));
-    op->tp = tp;
-    op->type = CM_WRITE;
-    op->u.write.piece = piece;
-    op->u.write.pos = piece;
-    BTPDQ_INIT(&op->u.write.q);
-    add_todo(cm, op);
-}
-
 void
 cm_prealloc(struct torrent *tp, uint32_t piece)
 {
@@ -364,30 +311,12 @@ cm_prealloc(struct torrent *tp, uint32_t piece)
 
     if (cm_alloc_size <= 0)
         set_bit(cm->pos_field, piece);
-    else {
-        unsigned npieces = ceil((double)cm_alloc_size / tp->piece_length);
-        uint32_t start = piece - piece % npieces;
-        uint32_t end = min(start + npieces, tp->npieces);
-
-        while (start < end) {
-            if ((!has_bit(cm->pos_field, start)
-                    && !has_bit(cm->hold_field, start)))
-                cm_post_alloc(tp, start);
-            start++;
-        }
-    }
 }
 
 void
 cm_test_piece(struct torrent *tp, uint32_t piece)
 {
-    struct content *cm = tp->cm;
-    struct cm_op *op = btpd_calloc(1, sizeof(*op));
-    op->tp = tp;
-    op->type = CM_TEST;
-    op->u.test.piece = piece;
-    op->u.test.pos = piece;
-    add_todo(cm, op);
+    pct_create(tp, piece, pct_cb);
 }
 
 int
@@ -397,30 +326,33 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
     int err;
     struct content *cm = tp->cm;
 
-    if (has_bit(cm->hold_field, piece)) {
-        struct cm_write_data *d = btpd_calloc(1, sizeof(*d) + len);
-        d->begin = begin;
-        d->len = len;
-        d->buf = (uint8_t *)(d + 1);
-        bcopy(buf, d->buf, len);
-        struct cm_op *op;
-        BTPDQ_FOREACH(op, &cm->todoq, cm_entry)
-            if (op->type == CM_WRITE && op->u.write.piece == piece)
-                break;
-        struct cm_write_data *it;
-        BTPDQ_FOREACH(it, &op->u.write.q, entry)
-            if (it->begin > begin) {
-                BTPDQ_INSERT_BEFORE(it, d, entry);
-                break;
+    if (!has_bit(cm->pos_field, piece)) {
+        unsigned npieces = ceil((double)cm_alloc_size / tp->piece_length);
+        uint32_t start = piece - piece % npieces;
+        uint32_t end = min(start + npieces, tp->npieces);
+
+        while (start < end) {
+            if (!has_bit(cm->pos_field, start)) {
+                off_t len = torrent_piece_size(tp, start);
+                off_t off = tp->piece_length * start;
+                while (len > 0) {
+                    size_t wlen = min(ZEROBUFLEN, len);
+                    if ((err = bts_put(cm->wrs, off, m_zerobuf, wlen)) != 0)
+                        btpd_err("IO error on '%s' (%s).\n",
+                            bts_filename(cm->wrs), strerror(errno));
+
+                    len -= wlen;
+                    off += wlen;
+                }
+                set_bit(cm->pos_field, start);
             }
-        if (it == NULL)
-            BTPDQ_INSERT_TAIL(&op->u.write.q, d, entry);
-    } else {
-        err = bts_put(cm->wrs, piece * tp->piece_length + begin, buf,
-            len);
-        if (err != 0)
-            btpd_err("Io error (%s)\n", strerror(err));
+            start++;
+        }
     }
+    err = bts_put(cm->wrs, piece * tp->piece_length + begin, buf, len);
+    if (err != 0)
+        btpd_err("IO error on '%s' (%s)\n", bts_filename(cm->wrs),
+            strerror(err));
 
     cm->ncontent_bytes += len;
     uint8_t *bf = cm->block_field + piece * cm->bppbf;
@@ -465,114 +397,6 @@ cm_has_piece(struct torrent *tp, uint32_t piece)
     return has_bit(tp->cm->piece_field, piece);
 }
 
-static int
-test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
-{
-    char piece_hash[SHA_DIGEST_LENGTH];
-    int fd;
-    int err;
-
-    err = vopen(&fd, O_RDONLY, "torrents/%s/torrent", tp->relpath);
-    if (err != 0)
-        btpd_err("test_hash: %s\n", strerror(err));
-
-    lseek(fd, tp->pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET);
-    read(fd, piece_hash, SHA_DIGEST_LENGTH);
-    close(fd);
-
-    return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
-}
-
-static int
-test_piece(struct torrent *tp, uint32_t pos, uint32_t piece, int *ok)
-{
-    int err;
-    uint8_t hash[SHA_DIGEST_LENGTH];
-    struct bt_stream *bts;
-    if ((err = bts_open(&bts, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0)
-        return err;
-    if ((err = bts_sha(bts, pos * tp->piece_length,
-             torrent_piece_size(tp, piece), hash)) != 0)
-        return err;;
-    bts_close(bts);
-    *ok = test_hash(tp, hash, piece) == 0;
-    return 0;
-}
-
-static void
-cm_td_alloc(struct cm_op *op)
-{
-    struct torrent *tp = op->tp;
-    struct content *cm = tp->cm;
-    uint32_t pos = op->u.alloc.pos;
-    struct bt_stream *bts;
-    int err;
-
-    assert(!has_bit(cm->pos_field, pos));
-
-    if ((err = bts_open(&bts, tp->nfiles, tp->files, fd_cb_wr, tp)) != 0)
-        goto out;
-
-    off_t len = torrent_piece_size(tp, pos);
-    off_t off = tp->piece_length * pos;
-    while (len > 0) {
-        size_t wlen = min(ZEROBUFLEN, len);
-        if ((err = bts_put(bts, off, m_zerobuf, wlen)) != 0) {
-            bts_close(bts);
-            goto out;
-        }
-        len -= wlen;
-        off += wlen;
-    }
-    err = bts_close(bts);
-out:
-    if (err != 0)
-        op->error = 1;
-}
-
-static int
-test_torrent(struct torrent *tp, volatile sig_atomic_t *cancel)
-{
-    int err;
-    FILE *fp;
-    uint8_t (*hashes)[SHA_DIGEST_LENGTH];
-    uint8_t hash[SHA_DIGEST_LENGTH];
-
-    if ((err = vfopen(&fp, "r", "torrents/%s/torrent", tp->relpath)) != 0)
-        return err;
-
-    hashes = btpd_malloc(tp->npieces * SHA_DIGEST_LENGTH);
-    fseek(fp, tp->pieces_off, SEEK_SET);
-    fread(hashes, SHA_DIGEST_LENGTH, tp->npieces, fp);
-    fclose(fp);
-
-    struct content *cm = tp->cm;
-    for (uint32_t piece = 0; piece < tp->npieces; piece++) {
-        if (!has_bit(cm->pos_field, piece))
-            continue;
-        err = bts_sha(cm->rds, piece * tp->piece_length,
-            torrent_piece_size(tp, piece), hash);
-        if (err != 0)
-            break;
-        if (bcmp(hashes[piece], hash, SHA_DIGEST_LENGTH) == 0)
-            set_bit(tp->cm->piece_field, piece);
-        else
-            clear_bit(tp->cm->piece_field, piece);
-        if (*cancel) {
-            err = EINTR;
-            break;
-        }
-    }
-
-    free(hashes);
-    return err;
-}
-
-struct rstat {
-    time_t mtime;
-    off_t size;
-};
-
 int
 stat_and_adjust(struct torrent *tp, struct rstat ret[])
 {
@@ -655,52 +479,13 @@ save_resume(struct torrent *tp, struct rstat sbs[])
     return err;
 }
 
-static void
-cm_td_save(struct cm_op *op)
-{
-    struct torrent *tp = op->tp;
-    struct rstat sbs[tp->nfiles];
-    if (stat_and_adjust(tp, sbs) == 0)
-        save_resume(tp, sbs);
-}
+void start_test_cb(struct torrent *tp, uint32_t piece, int ok);
 
-static void
-cm_td_start(struct cm_op *op)
+void
+start_test_end(struct torrent *tp, int unclean)
 {
-    int err, resume_clean = 0, tested_torrent = 0;
-    struct rstat sbs[op->tp->nfiles];
-    struct torrent *tp = op->tp;
     struct content *cm = tp->cm;
 
-    if ((err = stat_and_adjust(op->tp, sbs)) != 0)
-        goto out;
-
-    resume_clean = load_resume(tp, sbs) == 0;
-    if (!resume_clean) {
-        memset(cm->pos_field, 0xff, ceil(tp->npieces / 8.0));
-        off_t off = 0;
-        for (int i = 0; i < tp->nfiles; i++) {
-            if (sbs[i].size != tp->files[i].length) {
-                uint32_t start, end;
-                end = (off + tp->files[i].length - 1)
-                    / tp->piece_length;
-                start = (off + sbs[i].size) / tp->piece_length;
-                while (start <= end) {
-                    clear_bit(cm->pos_field, start);
-                    clear_bit(cm->piece_field, start);
-                    bzero(cm->block_field + start * cm->bppbf, cm->bppbf);
-                    start++;
-                }
-            }
-            off += tp->files[i].length;
-        }
-        if (op->u.start.cancel)
-            goto out;
-        if ((err = test_torrent(tp, &op->u.start.cancel)) != 0)
-            goto out;
-        tested_torrent = 1;
-    }
-
     bzero(cm->pos_field, ceil(tp->npieces / 8.0));
     for (uint32_t piece = 0; piece < tp->npieces; piece++) {
         if (cm_has_piece(tp, piece)) {
@@ -720,113 +505,128 @@ cm_td_start(struct cm_op *op)
             }
         }
         if (nblocks_got == nblocks) {
-            resume_clean = 0;
-            int ok = 0;
-            if (!tested_torrent) {
-                if (((err = test_piece(tp, piece, piece, &ok)) != 0
-                        || op->u.start.cancel))
-                    goto out;
-            }
-            if (ok) {
-                set_bit(cm->pos_field, piece);
-                set_bit(cm->piece_field, piece);
-            } else {
-                bzero(bf, cm->bppbf);
-                cm->ncontent_bytes -= torrent_piece_size(tp, piece);
-            }
+            bzero(bf, cm->bppbf);
+            cm->ncontent_bytes -= torrent_piece_size(tp, piece);
         } else if (nblocks_got > 0)
             set_bit(cm->pos_field, piece);
     }
+    if (!cm_full(tp)) {
+        int err;
+        if ((err = bts_open(&cm->wrs, tp->nfiles, tp->files,
+                 fd_cb_wr, tp)) != 0)
+            btpd_err("failed to open write stream for '%s' (%s).\n",
+                torrent_name(tp), strerror(err));
+        btpd_ev_add(&cm->save_timer, SAVE_INTERVAL);
+    }
+    if (unclean) {
+        struct start_test_data *std = BTPDQ_FIRST(&m_startq);
 
-    if (!resume_clean)
-        save_resume(tp, sbs);
+        assert(std->tp == tp);
+        BTPDQ_REMOVE(&m_startq, std, entry);
+        save_resume(tp, std->rstat);
+        free(std->rstat);
+        free(std);
 
-out:
-    if (!op->u.start.cancel && err != 0)
-        op->error = 1;
+        if ((std = BTPDQ_FIRST(&m_startq)) != NULL)
+            pct_create(std->tp, std->start, start_test_cb);
+    }
+    cm->state = CM_ACTIVE;
 }
 
-static void
-cm_td_test(struct cm_op *op)
+void
+start_test_cb(struct torrent *tp, uint32_t piece, int ok)
 {
-    if (test_piece(op->tp, op->u.test.pos, op->u.test.piece,
-            &op->u.test.ok) != 0)
-        op->error = 1;
+    struct content *cm = tp->cm;
+    if (ok)
+        set_bit(cm->piece_field, piece);
+    else
+        clear_bit(cm->piece_field, piece);
+    piece++;
+    while (piece < tp->npieces && !has_bit(cm->pos_field, piece))
+        piece++;
+    if (piece < tp->npieces)
+        pct_create(tp, piece, start_test_cb);
+    else
+        start_test_end(tp, 1);
 }
 
-static void
-cm_td_write(struct cm_op *op)
+void
+start_test(struct torrent *tp, struct rstat *sbs)
 {
-    int err;
-    struct cm_write_data *d, *next;
-    off_t base = op->tp->piece_length * op->u.write.pos;
-    struct bt_stream *bts;
-    if ((err = bts_open(&bts, op->tp->nfiles, op->tp->files,
-            fd_cb_wr, op->tp)) != 0)
-        goto out;
-    BTPDQ_FOREACH(d, &op->u.write.q, entry)
-        if ((err = bts_put(bts, base + d->begin, d->buf, d->len)) != 0) {
-            bts_close(bts);
-            goto out;
-        }
-    err = bts_close(bts);
-out:
-    BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
-        free(d);
-    if (err)
-        op->error = 1;
+    uint32_t piece = 0;
+    struct content *cm = tp->cm;
+    while (piece < tp->npieces && !has_bit(cm->pos_field, piece))
+        piece++;
+    if (piece < tp->npieces) {
+        struct start_test_data *std = btpd_calloc(1, sizeof(*std));
+        std->tp = tp;
+        std->start = piece;
+        std->rstat = sbs;
+        BTPDQ_INSERT_TAIL(&m_startq, std, entry);
+        if (std == BTPDQ_FIRST(&m_startq))
+            pct_create(tp, piece, start_test_cb);
+    } else {
+        free(sbs);
+        start_test_end(tp, 0);
+    }
 }
 
-static void
-cm_td(void *arg)
-{
-    struct cm_comm *comm = arg;
-    struct cm_op *op;
-    for (;;) {
-        pthread_mutex_lock(&comm->lock);
-        while (BTPDQ_EMPTY(&comm->q))
-            pthread_cond_wait(&comm->cond, &comm->lock);
-
-        op = BTPDQ_FIRST(&comm->q);
-        BTPDQ_REMOVE(&comm->q, op, td_entry);
-        op->received = 1;
-        pthread_mutex_unlock(&comm->lock);
-
-        switch (op->type) {
-        case CM_ALLOC:
-            cm_td_alloc(op);
-            break;
-        case CM_SAVE:
-            cm_td_save(op);
-            break;
-        case CM_START:
-            cm_td_start(op);
-            break;
-        case CM_TEST:
-            cm_td_test(op);
-            break;
-        case CM_WRITE:
-            cm_td_write(op);
-            break;
-        default:
-            abort();
+void
+cm_start(struct torrent *tp)
+{
+    int err, resume_clean = 0;
+    struct rstat *sbs;
+    struct content *cm = tp->cm;
+
+    if ((errno = bts_open(&cm->rds, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0)
+        btpd_err("failed to open stream for '%s' (%s).\n",
+            torrent_name(tp), strerror(errno));
+
+    cm->state = CM_STARTING;
+
+    sbs = btpd_calloc(tp->nfiles, sizeof(*sbs));
+
+    if ((err = stat_and_adjust(tp, sbs)) != 0)
+        btpd_err("failed stat_and_adjust for '%s' (%s).\n",
+            torrent_name(tp), strerror(err));
+
+    resume_clean = load_resume(tp, sbs) == 0;
+    if (!resume_clean) {
+        memset(cm->pos_field, 0xff, ceil(tp->npieces / 8.0));
+        off_t off = 0;
+        for (int i = 0; i < tp->nfiles; i++) {
+            if (sbs[i].size != tp->files[i].length) {
+                uint32_t start, end;
+                end = (off + tp->files[i].length - 1)
+                    / tp->piece_length;
+                start = (off + sbs[i].size) / tp->piece_length;
+                while (start <= end) {
+                    clear_bit(cm->pos_field, start);
+                    clear_bit(cm->piece_field, start);
+                    bzero(cm->block_field + start * cm->bppbf, cm->bppbf);
+                    start++;
+                }
+            }
+            off += tp->files[i].length;
         }
-        td_post_begin();
-        td_post(cm_td_cb, op);
-        td_post_end();
     }
+    for (uint32_t piece = 0; piece < tp->npieces; piece++) {
+        if (has_bit(cm->piece_field, piece))
+            continue;
+        uint8_t *bf = cm->block_field + cm->bppbf * piece;
+        uint32_t nblocks = torrent_piece_blocks(tp, piece);
+        uint32_t block = 0;
+        while (block < nblocks && has_bit(bf, block))
+            block++;
+        if (block == nblocks)
+            set_bit(cm->pos_field, piece);
+    }
+
+    start_test(tp, sbs);
 }
 
 void
 cm_init(void)
 {
-    pthread_t td;
-    BTPDQ_INIT(&m_long_comm.q);
-    pthread_mutex_init(&m_long_comm.lock, NULL);
-    pthread_cond_init(&m_long_comm.cond, NULL);
-    pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_long_comm);
-    BTPDQ_INIT(&m_short_comm.q);
-    pthread_mutex_init(&m_short_comm.lock, NULL);
-    pthread_cond_init(&m_short_comm.cond, NULL);
-    pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_short_comm);
+    evtimer_set(&m_workev, worker_cb, NULL);
 }
diff --git a/btpd/content.h b/btpd/content.h
index 9c92dfe..610b353 100644
--- a/btpd/content.h
+++ b/btpd/content.h
@@ -3,13 +3,14 @@
 
 void cm_init(void);
 
-void cm_create(struct torrent *tp);
+void cm_create(struct torrent *tp, const char *mi);
 void cm_kill(struct torrent *tp);
 
 void cm_start(struct torrent *tp);
 void cm_stop(struct torrent * tp);
 
 int cm_active(struct torrent *tp);
+int cm_started(struct torrent *tp);
 int cm_full(struct torrent *tp);
 
 off_t cm_content(struct torrent *tp);
diff --git a/btpd/td.c b/btpd/td.c
deleted file mode 100644
index f9d10f5..0000000
--- a/btpd/td.c
+++ /dev/null
@@ -1,87 +0,0 @@
-#include <sys/types.h>
-
-#include <pthread.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "btpd.h"
-
-struct td_cb {
-    void (*cb)(void *);
-    void *arg;
-    BTPDQ_ENTRY(td_cb) entry;
-};
-
-BTPDQ_HEAD(td_cb_tq, td_cb);
-
-static int m_td_rd, m_td_wr;
-static struct event m_td_ev;
-static struct td_cb_tq m_td_cbs = BTPDQ_HEAD_INITIALIZER(m_td_cbs);
-static pthread_mutex_t m_td_lock;
-
-void
-td_acquire_lock(void)
-{
-    pthread_mutex_lock(&m_td_lock);
-}
-
-void
-td_release_lock(void)
-{
-    pthread_mutex_unlock(&m_td_lock);
-}
-
-void
-td_post(void (*fun)(void *), void *arg)
-{
-    struct td_cb *cb = btpd_calloc(1, sizeof(*cb));
-    cb->cb = fun;
-    cb->arg = arg;
-    BTPDQ_INSERT_TAIL(&m_td_cbs, cb, entry);
-}
-
-void
-td_post_end(void)
-{
-    char c = '1';
-    td_release_lock();
-    write(m_td_wr, &c, sizeof(c));
-}
-
-static void
-td_cb(int fd, short type, void *arg)
-{
-    char buf[1024];
-    struct td_cb_tq tmpq =  BTPDQ_HEAD_INITIALIZER(tmpq);
-    struct td_cb *cb, *next;
-
-    read(fd, buf, sizeof(buf));
-    td_acquire_lock();
-    BTPDQ_FOREACH_MUTABLE(cb, &m_td_cbs, entry, next)
-        BTPDQ_INSERT_TAIL(&tmpq, cb, entry);
-    BTPDQ_INIT(&m_td_cbs);
-    td_release_lock();
-
-    BTPDQ_FOREACH_MUTABLE(cb, &tmpq, entry, next) {
-        cb->cb(cb->arg);
-        free(cb);
-    }
-}
-
-void
-td_init(void)
-{
-    int err;
-    int fds[2];
-    if (pipe(fds) == -1) {
-        btpd_err("Couldn't create thread callback pipe (%s).\n",
-            strerror(errno));
-    }
-    m_td_rd = fds[0];
-    m_td_wr = fds[1];
-    if ((err = pthread_mutex_init(&m_td_lock, NULL)) != 0)
-        btpd_err("Couldn't create mutex (%s).\n", strerror(err));
-
-    event_set(&m_td_ev, m_td_rd, EV_READ|EV_PERSIST, td_cb, NULL);
-    btpd_ev_add(&m_td_ev, NULL);
-}
diff --git a/btpd/torrent.h b/btpd/torrent.h
index 9fa3792..f35cdff 100644
--- a/btpd/torrent.h
+++ b/btpd/torrent.h
@@ -47,8 +47,6 @@ uint32_t torrent_block_size(struct torrent *tp, uint32_t piece,
     uint32_t nblocks, uint32_t block);
 const char *torrent_name(struct torrent *tp);
 
-void torrent_on_cm_stopped(struct torrent *tp);
-void torrent_on_cm_started(struct torrent *tp);
-void torrent_on_tr_stopped(struct torrent *tp);
+void torrent_on_tick_all(void);
 
 #endif
diff --git a/btpd/tracker_req.c b/btpd/tracker_req.c
index add1462..72f1ebd 100644
--- a/btpd/tracker_req.c
+++ b/btpd/tracker_req.c
@@ -133,7 +133,6 @@ tr_set_stopped(struct torrent *tp)
     tr->ttype = TIMER_NONE;
     if (tr->req != NULL)
         tr_cancel(tr);
-    torrent_on_tr_stopped(tp);
 }
 
 void