@@ -1,6 +1,10 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <fcntl.h>
#include <math.h>
#include <math.h>
#include <pthread.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <string.h>
#include <unistd.h>
#include <unistd.h>
@@ -9,30 +13,36 @@
#include "btpd.h"
#include "btpd.h"
#include "stream.h"
#include "stream.h"
struct data {
struct cm_write_ data {
uint32_t begin;
uint32_t begin;
uint8_t *buf;
uint8_t *buf;
size_t len;
size_t len;
BTPDQ_ENTRY(data) entry;
BTPDQ_ENTRY(cm_write_ data) entry;
};
};
BTPDQ_HEAD(data_tq, data);
BTPDQ_HEAD(cm_write_ data_tq, cm_write_ data);
enum cm_op_type {
enum cm_op_type {
CM_ALLOC,
CM_ALLOC,
CM_SAVE,
CM_START,
CM_TEST,
CM_TEST,
CM_WRITE
CM_WRITE
};
};
struct cm_op {
struct cm_op {
struct torrent *tp;
struct torrent *tp;
int error;
int received;
enum cm_op_type type;
enum cm_op_type type;
union {
union {
struct {
struct {
uint32_t piece;
uint32_t piece;
uint32_t pos;
uint32_t pos;
} alloc;
} alloc;
struct {
volatile sig_atomic_t cancel;
} start;
struct {
struct {
uint32_t piece;
uint32_t piece;
uint32_t pos;
uint32_t pos;
@@ -41,13 +51,10 @@ struct cm_op {
struct {
struct {
uint32_t piece;
uint32_t piece;
uint32_t pos;
uint32_t pos;
struct data_tq q;
struct cm_write_ data_tq q;
} write;
} write;
} u;
} u;
int error;
char *errmsg;
BTPDQ_ENTRY(cm_op) cm_entry;
BTPDQ_ENTRY(cm_op) cm_entry;
BTPDQ_ENTRY(cm_op) td_entry;
BTPDQ_ENTRY(cm_op) td_entry;
};
};
@@ -55,6 +62,8 @@ struct cm_op {
BTPDQ_HEAD(cm_op_tq, cm_op);
BTPDQ_HEAD(cm_op_tq, cm_op);
struct content {
struct content {
int active;
uint32_t npieces_got;
uint32_t npieces_got;
off_t ncontent_bytes;
off_t ncontent_bytes;
@@ -74,10 +83,14 @@ struct content {
#define ZEROBUFLEN (1 << 14)
#define ZEROBUFLEN (1 << 14)
struct cm_comm {
struct cm_op_tq q;
pthread_mutex_t lock;
pthread_cond_t cond;
};
static struct cm_comm m_long_comm, m_short_comm;
static const uint8_t m_zerobuf[ZEROBUFLEN];
static const uint8_t m_zerobuf[ZEROBUFLEN];
static struct cm_op_tq m_tdq = BTPDQ_HEAD_INITIALIZER(m_tdq);
static pthread_mutex_t m_tdq_lock;
static pthread_cond_t m_tdq_cond;
static int
static int
fd_cb_rd(const char *path, int *fd, void *arg)
fd_cb_rd(const char *path, int *fd, void *arg)
@@ -94,6 +107,27 @@ fd_cb_wr(const char *path, int *fd, void *arg)
path);
path);
}
}
static void
cm_td_post_common(struct cm_comm *comm, struct cm_op *op)
{
pthread_mutex_lock(&comm->lock);
BTPDQ_INSERT_TAIL(&comm->q, op, td_entry);
pthread_mutex_unlock(&comm->lock);
pthread_cond_signal(&comm->cond);
}
static void
cm_td_post_long(struct cm_op *op)
{
cm_td_post_common(&m_long_comm, op);
}
static void
cm_td_post_short(struct cm_op *op)
{
cm_td_post_common(&m_short_comm, op);
}
static void
static void
run_todo(struct content *cm)
run_todo(struct content *cm)
{
{
@@ -107,10 +141,65 @@ run_todo(struct content *cm)
return;
return;
}
}
pthread_mutex_lock(&m_tdq_lock);
BTPDQ_INSERT_TAIL(&m_tdq, op, td_entry);
pthread_mutex_unlock(&m_tdq_lock);
pthread_cond_signal(&m_tdq_cond);
if (op->type != CM_START)
cm_td_post_short(op);
else
cm_td_post_long(op);
}
static void
add_todo(struct content *cm, struct cm_op *op)
{
int was_empty = BTPDQ_EMPTY(&cm->todoq);
BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
if (was_empty)
run_todo(cm);
}
void
cm_destroy(struct torrent *tp)
{
struct content *cm = tp->cm;
bts_close(cm->rds);
free(cm->piece_field);
free(cm->block_field);
free(cm->hold_field);
free(cm->pos_field);
tp->cm = NULL;
torrent_on_cm_stopped(tp);
}
void
cm_save(struct torrent *tp)
{
struct content *cm = tp->cm;
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_SAVE;
add_todo(cm, op);
}
void
cm_stop(struct torrent *tp)
{
struct content *cm = tp->cm;
struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
if (op != NULL && op->type == CM_START) {
pthread_mutex_lock(&m_long_comm.lock);
if (op->received)
op->u.start.cancel = 1;
else {
BTPDQ_REMOVE(&m_long_comm.q, op, td_entry);
BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
free(op);
}
pthread_mutex_unlock(&m_long_comm.lock);
} else if (cm->npieces_got < tp->meta.npieces)
cm_save(tp);
if (BTPDQ_EMPTY(&cm->todoq))
cm_destroy(tp);
}
}
static void
static void
@@ -121,19 +210,28 @@ cm_td_cb(void *arg)
struct content *cm = tp->cm;
struct content *cm = tp->cm;
if (op->error)
if (op->error)
btpd_err("%s", op->errmsg );
btpd_err("IO error for %s.\n", tp->relpath );
switch (op->type) {
switch (op->type) {
case CM_ALLOC:
case CM_ALLOC:
set_bit(cm->pos_field, op->u.alloc.pos);
set_bit(cm->pos_field, op->u.alloc.pos);
clear_bit(cm->hold_field, op->u.alloc.piece);
clear_bit(cm->hold_field, op->u.alloc.piece);
break;
break;
case CM_START:
if (cm->active) {
assert(!op->u.start.cancel);
torrent_on_cm_started(tp);
}
break;
case CM_TEST:
case CM_TEST:
if (op->u.test.ok) {
if (op->u.test.ok) {
assert(cm->npieces_got < tp->meta.npieces);
cm->npieces_got++;
cm->npieces_got++;
set_bit(cm->piece_field, op->u.test.piece);
set_bit(cm->piece_field, op->u.test.piece);
if (tp->net != NULL)
if (tp->net != NULL)
dl_on_ok_piece(op->tp->net, op->u.test.piece);
dl_on_ok_piece(op->tp->net, op->u.test.piece);
if (cm_full(tp))
cm_save(tp);
} else {
} else {
cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece);
cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece);
bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf);
bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf);
@@ -141,116 +239,16 @@ cm_td_cb(void *arg)
dl_on_bad_piece(tp->net, op->u.test.piece);
dl_on_bad_piece(tp->net, op->u.test.piece);
}
}
break;
break;
default:
case CM_SAVE:
case CM_WRITE:
break;
break;
}
}
BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
free(op);
free(op);
if (!BTPDQ_EMPTY(&cm->todoq))
if (!BTPDQ_EMPTY(&cm->todoq))
run_todo(cm);
run_todo(cm);
}
static int
test_hash(struct torrent *tp, uint8_t *hash, uint32_t index)
{
if (tp->meta.piece_hash != NULL)
return bcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH);
else {
char piece_hash[SHA_DIGEST_LENGTH];
int fd;
int err;
err = vopen(&fd, O_RDONLY, "library/%s/torrent", tp->relpath);
if (err != 0)
btpd_err("test_hash: %s\n", strerror(err));
lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH, SEEK_SET);
read(fd, piece_hash, SHA_DIGEST_LENGTH);
close(fd);
return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
}
}
static void
cm_td_alloc(struct cm_op *op)
{
struct bt_stream *bts;
off_t len = torrent_piece_size(op->tp, op->u.alloc.pos);
off_t off = op->tp->meta.piece_length * op->u.alloc.pos;
bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp);
while (len > 0) {
size_t wlen = min(ZEROBUFLEN, len);
bts_put(bts, off, m_zerobuf, wlen);
len -= wlen;
off += wlen;
}
bts_close(bts);
}
static void
cm_td_test(struct cm_op *op)
{
uint8_t hash[SHA_DIGEST_LENGTH];
struct bt_stream *bts;
bts_open(&bts, &op->tp->meta, fd_cb_rd, op->tp);
bts_sha(bts, op->u.test.pos * op->tp->meta.piece_length,
torrent_piece_size(op->tp, op->u.test.piece), hash);
bts_close(bts);
op->u.test.ok = test_hash(op->tp, hash, op->u.test.piece) == 0;
}
static void
cm_td_write(struct cm_op *op)
{
struct data *d, *next;
off_t base = op->tp->meta.piece_length * op->u.write.pos;
struct bt_stream *bts;
bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp);
BTPDQ_FOREACH(d, &op->u.write.q, entry)
bts_put(bts, base + d->begin, d->buf, d->len);
bts_close(bts);
BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
free(d);
}
static void
cm_td(void *arg)
{
for (;;) {
pthread_mutex_lock(&m_tdq_lock);
while (BTPDQ_EMPTY(&m_tdq))
pthread_cond_wait(&m_tdq_cond, &m_tdq_lock);
struct cm_op *op = BTPDQ_FIRST(&m_tdq);
BTPDQ_REMOVE(&m_tdq, op, td_entry);
pthread_mutex_unlock(&m_tdq_lock);
switch (op->type) {
case CM_ALLOC:
cm_td_alloc(op);
break;
case CM_TEST:
cm_td_test(op);
break;
case CM_WRITE:
cm_td_write(op);
break;
default:
abort();
}
td_post_begin();
td_post(cm_td_cb, op);
td_post_end();
}
}
void
cm_init(void)
{
pthread_t td;
pthread_mutex_init(&m_tdq_lock, NULL);
pthread_cond_init(&m_tdq_cond, NULL);
pthread_create(&td, NULL, (void *(*)(void *))cm_td, NULL);
else if (!cm->active)
cm_destroy(tp);
}
}
int
int
@@ -259,6 +257,7 @@ cm_start(struct torrent *tp)
int err;
int err;
struct content *cm = btpd_calloc(1, sizeof(*cm));
struct content *cm = btpd_calloc(1, sizeof(*cm));
size_t pfield_size = ceil(tp->meta.npieces / 8.0);
size_t pfield_size = ceil(tp->meta.npieces / 8.0);
cm->active = 1;
cm->bppbf = ceil((double)tp->meta.piece_length / (1 << 17));
cm->bppbf = ceil((double)tp->meta.piece_length / (1 << 17));
cm->piece_field = btpd_calloc(pfield_size, 1);
cm->piece_field = btpd_calloc(pfield_size, 1);
cm->hold_field = btpd_calloc(pfield_size, 1);
cm->hold_field = btpd_calloc(pfield_size, 1);
@@ -269,11 +268,12 @@ cm_start(struct torrent *tp)
if ((err = bts_open(&cm->rds, &tp->meta, fd_cb_rd, tp)) != 0)
if ((err = bts_open(&cm->rds, &tp->meta, fd_cb_rd, tp)) != 0)
btpd_err("Error opening stream (%s).\n", strerror(err));
btpd_err("Error opening stream (%s).\n", strerror(err));
if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
btpd_err("Error opening stream (%s).\n", strerror(err));
tp->cm = cm;
tp->cm = cm;
torrent_cm_cb(tp, CM_STARTED);
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_START;
add_todo(cm, op);
return 0;
return 0;
}
}
@@ -289,22 +289,18 @@ cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
return 0;
return 0;
}
}
void
cm_pre alloc(struct torrent *tp, uint32_t piece)
static void
cm_post_ alloc(struct torrent *tp, uint32_t piece)
{
{
struct content *cm = tp->cm;
struct content *cm = tp->cm;
if (has_bit(cm->pos_field, piece))
return;
set_bit(cm->hold_field, piece);
set_bit(cm->hold_field, piece);
int was_empty = BTPDQ_EMPTY(&cm->todoq);
struct cm_op *op = btpd_calloc(1, sizeof(*op));
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->tp = tp;
op->type = CM_ALLOC;
op->type = CM_ALLOC;
op->u.alloc.piece = piece;
op->u.alloc.piece = piece;
op->u.alloc.pos = piece;
op->u.alloc.pos = piece;
BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry );
add_todo(cm, op );
op = btpd_calloc(1, sizeof(*op));
op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->tp = tp;
@@ -312,25 +308,40 @@ cm_prealloc(struct torrent *tp, uint32_t piece)
op->u.write.piece = piece;
op->u.write.piece = piece;
op->u.write.pos = piece;
op->u.write.pos = piece;
BTPDQ_INIT(&op->u.write.q);
BTPDQ_INIT(&op->u.write.q);
BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
add_todo(cm, op);
}
if (was_empty)
run_todo(cm);
void
cm_prealloc(struct torrent *tp, uint32_t piece)
{
struct content *cm = tp->cm;
if (cm_alloc_size == 0)
set_bit(cm->pos_field, piece);
else {
unsigned npieces = ceil((double)cm_alloc_size / tp->meta.piece_length);
uint32_t start = piece - piece % npieces;
uint32_t end = min(start + npieces, tp->meta.npieces);
while (start < end) {
if ((!has_bit(cm->pos_field, start)
&& !has_bit(cm->hold_field, start)))
cm_post_alloc(tp, start);
start++;
}
}
}
}
void
void
cm_test_piece(struct torrent *tp, uint32_t piece)
cm_test_piece(struct torrent *tp, uint32_t piece)
{
{
struct content *cm = tp->cm;
struct content *cm = tp->cm;
int was_empty = BTPDQ_EMPTY(&cm->todoq);
struct cm_op *op = btpd_calloc(1, sizeof(*op));
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->tp = tp;
op->type = CM_TEST;
op->type = CM_TEST;
op->u.test.piece = piece;
op->u.test.piece = piece;
op->u.test.pos = piece;
op->u.test.pos = piece;
BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
if (was_empty)
run_todo(cm);
add_todo(cm, op);
}
}
int
int
@@ -341,7 +352,7 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
struct content *cm = tp->cm;
struct content *cm = tp->cm;
if (has_bit(cm->hold_field, piece)) {
if (has_bit(cm->hold_field, piece)) {
struct data *d = btpd_calloc(1, sizeof(*d) + len);
struct cm_write_ data *d = btpd_calloc(1, sizeof(*d) + len);
d->begin = begin;
d->begin = begin;
d->len = len;
d->len = len;
d->buf = (uint8_t *)(d + 1);
d->buf = (uint8_t *)(d + 1);
@@ -350,7 +361,7 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
BTPDQ_FOREACH(op, &cm->todoq, cm_entry)
BTPDQ_FOREACH(op, &cm->todoq, cm_entry)
if (op->type == CM_WRITE && op->u.write.piece == piece)
if (op->type == CM_WRITE && op->u.write.piece == piece)
break;
break;
struct data *it;
struct cm_write_ data *it;
BTPDQ_FOREACH(it, &op->u.write.q, entry)
BTPDQ_FOREACH(it, &op->u.write.q, entry)
if (it->begin > begin) {
if (it->begin > begin) {
BTPDQ_INSERT_BEFORE(it, d, entry);
BTPDQ_INSERT_BEFORE(it, d, entry);
@@ -407,3 +418,383 @@ cm_has_piece(struct torrent *tp, uint32_t piece)
{
{
return has_bit(tp->cm->piece_field, piece);
return has_bit(tp->cm->piece_field, piece);
}
}
static int
test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
{
if (tp->meta.piece_hash != NULL)
return bcmp(hash, tp->meta.piece_hash[piece], SHA_DIGEST_LENGTH);
else {
char piece_hash[SHA_DIGEST_LENGTH];
int fd;
int err;
err = vopen(&fd, O_RDONLY, "library/%s/torrent", tp->relpath);
if (err != 0)
btpd_err("test_hash: %s\n", strerror(err));
lseek(fd, tp->meta.pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET);
read(fd, piece_hash, SHA_DIGEST_LENGTH);
close(fd);
return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
}
}
static int
test_piece(struct torrent *tp, uint32_t pos, uint32_t piece, int *ok)
{
int err;
uint8_t hash[SHA_DIGEST_LENGTH];
struct bt_stream *bts;
if ((err = bts_open(&bts, &tp->meta, fd_cb_rd, tp)) != 0)
return err;
if ((err = bts_sha(bts, pos * tp->meta.piece_length,
torrent_piece_size(tp, piece), hash)) != 0)
return err;;
bts_close(bts);
*ok = test_hash(tp, hash, piece) == 0;
return 0;
}
static void
cm_td_alloc(struct cm_op *op)
{
struct torrent *tp = op->tp;
struct content *cm = tp->cm;
uint32_t pos = op->u.alloc.pos;
struct bt_stream *bts;
int err;
assert(!has_bit(cm->pos_field, pos));
if ((err = bts_open(&bts, &tp->meta, fd_cb_wr, tp)) != 0)
goto out;
off_t len = torrent_piece_size(tp, pos);
off_t off = tp->meta.piece_length * pos;
while (len > 0) {
size_t wlen = min(ZEROBUFLEN, len);
if ((err = bts_put(bts, off, m_zerobuf, wlen)) != 0) {
bts_close(bts);
goto out;
}
len -= wlen;
off += wlen;
}
err = bts_close(bts);
out:
if (err != 0)
op->error = 1;
}
static int
test_torrent(struct torrent *tp, volatile sig_atomic_t *cancel)
{
int err;
FILE *fp;
uint8_t (*hashes)[SHA_DIGEST_LENGTH];
uint8_t hash[SHA_DIGEST_LENGTH];
if ((err = vfopen(&fp, "r", "library/%s/torrent", tp->relpath)) != 0)
return err;
hashes = btpd_malloc(tp->meta.npieces * SHA_DIGEST_LENGTH);
fseek(fp, tp->meta.pieces_off, SEEK_SET);
fread(hashes, SHA_DIGEST_LENGTH, tp->meta.npieces, fp);
fclose(fp);
tp->meta.piece_hash = hashes;
struct content *cm = tp->cm;
for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) {
if (!has_bit(cm->pos_field, piece))
continue;
err = bts_sha(cm->rds, piece * tp->meta.piece_length,
torrent_piece_size(tp, piece), hash);
if (err != 0)
break;
if (test_hash(tp, hash, piece) == 0)
set_bit(tp->cm->piece_field, piece);
if (*cancel) {
err = EINTR;
break;
}
}
tp->meta.piece_hash = NULL;
free(hashes);
return err;
}
int
stat_and_adjust(struct torrent *tp, struct stat ret[])
{
char path[PATH_MAX];
for (int i = 0; i < tp->meta.nfiles; i++) {
snprintf(path, PATH_MAX, "library/%s/content/%s", tp->relpath,
tp->meta.files[i].path);
again:
if (stat(path, &ret[i]) == -1) {
if (errno == ENOENT) {
ret[i].st_mtime = -1;
ret[i].st_size = -1;
} else
return errno;
}
if (ret[i].st_size > tp->meta.files[i].length) {
if (truncate(path, tp->meta.files[i].length) != 0)
return errno;
goto again;
}
}
return 0;
}
static int
load_resume(struct torrent *tp, struct stat sbs[])
{
int err, ver;
FILE *fp;
size_t pfsiz = ceil(tp->meta.npieces / 8.0);
size_t bfsiz = tp->meta.npieces * tp->cm->bppbf;
if ((err = vfopen(&fp, "r" , "library/%s/resume", tp->relpath)) != 0)
return err;
if (fscanf(fp, "%d\n", &ver) != 1)
goto invalid;
if (ver != 1)
goto invalid;
for (int i = 0; i < tp->meta.nfiles; i++) {
long long size;
time_t time;
if (fscanf(fp, "%qd %ld\n", &size, &time) != 2)
goto invalid;
if (sbs[i].st_size != size || sbs[i].st_mtime != time)
goto invalid;
}
if (fread(tp->cm->piece_field, 1, pfsiz, fp) != pfsiz)
goto invalid;
if (fread(tp->cm->block_field, 1, bfsiz, fp) != bfsiz)
goto invalid;
fclose(fp);
return 0;
invalid:
fclose(fp);
bzero(tp->cm->piece_field, pfsiz);
bzero(tp->cm->block_field, bfsiz);
return EINVAL;
}
static int
save_resume(struct torrent *tp)
{
int err;
FILE *fp;
struct stat sbs[tp->meta.nfiles];
if ((err = stat_and_adjust(tp, sbs)) != 0)
return err;
if ((err = vfopen(&fp, "wb", "library/%s/resume", tp->relpath)) != 0)
return err;
fprintf(fp, "%d\n", 1);
for (int i = 0; i < tp->meta.nfiles; i++)
fprintf(fp, "%qd %ld\n", (long long)sbs[i].st_size, sbs[i].st_mtime);
fwrite(tp->cm->piece_field, 1, ceil(tp->meta.npieces / 8.0), fp);
fwrite(tp->cm->block_field, 1, tp->meta.npieces * tp->cm->bppbf, fp);
if (fclose(fp) != 0)
err = errno;
return err;
}
static void
cm_td_save(struct cm_op *op)
{
int err;
struct torrent *tp = op->tp;
struct content *cm = tp->cm;
struct bt_stream *bts = cm->wrs;
cm->wrs = NULL;
if ((err = bts_close(bts)) != 0)
goto out;
for (int i = 0; i < tp->meta.nfiles; i++) {
int lerr;
if ((lerr = vfsync("library/%s/content/%s", tp->relpath,
tp->meta.files[i])) != 0 && lerr != ENOENT)
err = lerr;
}
if (err != 0)
goto out;
save_resume(tp);
out:
if (err != 0)
op->error = 1;
}
static void
cm_td_start(struct cm_op *op)
{
int err;
struct stat sbs[op->tp->meta.nfiles];
struct torrent *tp = op->tp;
struct content *cm = tp->cm;
if ((err = stat_and_adjust(op->tp, sbs)) != 0)
goto out;
if (load_resume(tp, sbs) != 0) {
memset(cm->pos_field, 0xff, ceil(tp->meta.npieces / 8.0));
off_t off = 0;
for (int i = 0; i < tp->meta.nfiles; i++) {
if (sbs[i].st_size == -1 || sbs[i].st_size == 0) {
uint32_t start = off / tp->meta.piece_length;
uint32_t end = (off + tp->meta.files[i].length - 1) /
tp->meta.piece_length;
while (start <= end) {
clear_bit(cm->pos_field, start);
start++;
}
} else if (sbs[i].st_size < tp->meta.files[i].length) {
uint32_t start = (off + sbs[i].st_size) /
tp->meta.piece_length;
uint32_t end = (off + tp->meta.files[i].length - 1) /
tp->meta.piece_length;
while (start <= end) {
clear_bit(cm->pos_field, start);
start++;
}
}
off += tp->meta.files[i].length;
}
if (op->u.start.cancel)
goto out;
if ((err = test_torrent(tp, &op->u.start.cancel)) != 0)
goto out;
save_resume(tp);
}
bzero(cm->pos_field, ceil(tp->meta.npieces / 8.0));
for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) {
if (cm_has_piece(tp, piece)) {
cm->ncontent_bytes += torrent_piece_size(tp, piece);
cm->npieces_got++;
set_bit(cm->pos_field, piece);
continue;
}
uint8_t *bf = cm->block_field + cm->bppbf * piece;
uint32_t nblocks = torrent_piece_blocks(tp, piece);
uint32_t nblocks_got = 0;
for (uint32_t i = 0; i < nblocks; i++) {
if (has_bit(bf, i)) {
nblocks_got++;
cm->ncontent_bytes +=
torrent_block_size(tp, piece, nblocks, i);
}
}
if (nblocks_got == nblocks) {
int ok;
if (((err = test_piece(tp, piece, piece, &ok)) != 0
|| op->u.start.cancel))
goto out;
if (ok) {
set_bit(cm->pos_field, piece);
set_bit(cm->piece_field, piece);
} else
bzero(bf, cm->bppbf);
} else if (nblocks_got > 0)
set_bit(cm->pos_field, piece);
}
if (cm->npieces_got < tp->meta.npieces)
if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
goto out;
out:
if (!op->u.start.cancel && err != 0)
op->error = 1;
}
static void
cm_td_test(struct cm_op *op)
{
if (test_piece(op->tp, op->u.test.pos, op->u.test.piece,
&op->u.test.ok) != 0)
op->error = 1;
}
static void
cm_td_write(struct cm_op *op)
{
int err;
struct cm_write_data *d, *next;
off_t base = op->tp->meta.piece_length * op->u.write.pos;
struct bt_stream *bts;
if ((err = bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp)) != 0)
goto out;
BTPDQ_FOREACH(d, &op->u.write.q, entry)
if ((err = bts_put(bts, base + d->begin, d->buf, d->len)) != 0) {
bts_close(bts);
goto out;
}
err = bts_close(bts);
out:
BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
free(d);
if (err)
op->error = 1;
}
static void
cm_td(void *arg)
{
struct cm_comm *comm = arg;
struct cm_op *op;
for (;;) {
pthread_mutex_lock(&comm->lock);
while (BTPDQ_EMPTY(&comm->q))
pthread_cond_wait(&comm->cond, &comm->lock);
op = BTPDQ_FIRST(&comm->q);
BTPDQ_REMOVE(&comm->q, op, td_entry);
op->received = 1;
pthread_mutex_unlock(&comm->lock);
switch (op->type) {
case CM_ALLOC:
cm_td_alloc(op);
break;
case CM_SAVE:
cm_td_save(op);
break;
case CM_START:
cm_td_start(op);
break;
case CM_TEST:
cm_td_test(op);
break;
case CM_WRITE:
cm_td_write(op);
break;
default:
abort();
}
td_post_begin();
td_post(cm_td_cb, op);
td_post_end();
}
}
void
cm_init(void)
{
pthread_t td;
BTPDQ_INIT(&m_long_comm.q);
pthread_mutex_init(&m_long_comm.lock, NULL);
pthread_cond_init(&m_long_comm.cond, NULL);
pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_long_comm);
BTPDQ_INIT(&m_short_comm.q);
pthread_mutex_init(&m_short_comm.lock, NULL);
pthread_cond_init(&m_short_comm.cond, NULL);
pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_short_comm);
}