Browse Source

Implemented a service thread for operations such as testing piece hashes,

preallocating disk space, etc. Tweaked the cm <-> net api a bit.
master
Richard Nyberg 19 years ago
parent
commit
d324b49aaf
7 changed files with 369 additions and 87 deletions
  1. +1
    -0
      btpd/btpd.c
  2. +347
    -70
      btpd/content.c
  3. +8
    -5
      btpd/content.h
  4. +8
    -8
      btpd/download.c
  5. +3
    -3
      btpd/download.h
  6. +1
    -0
      btpd/download_subr.c
  7. +1
    -1
      btpd/tracker_req.c

+ 1
- 0
btpd/btpd.c View File

@@ -218,6 +218,7 @@ btpd_init(void)
net_init();
//ipc_init();
ul_init();
cm_init();

load_library();



+ 347
- 70
btpd/content.c View File

@@ -1,132 +1,409 @@
#include <fcntl.h>
#include <math.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>

#include <openssl/sha.h>

#include "btpd.h"
#include "stream.h"

struct data {
uint32_t begin;
uint8_t *buf;
size_t len;
BTPDQ_ENTRY(data) entry;
};

BTPDQ_HEAD(data_tq, data);

enum cm_op_type {
CM_ALLOC,
CM_TEST,
CM_WRITE
};

struct cm_op {
struct torrent *tp;

enum cm_op_type type;
union {
struct {
uint32_t piece;
uint32_t pos;
} alloc;
struct {
uint32_t piece;
uint32_t pos;
int ok;
} test;
struct {
uint32_t piece;
uint32_t pos;
struct data_tq q;
} write;
} u;

int error;
char *errmsg;

BTPDQ_ENTRY(cm_op) cm_entry;
BTPDQ_ENTRY(cm_op) td_entry;
};

BTPDQ_HEAD(cm_op_tq, cm_op);

struct content {
uint32_t npieces;
uint8_t *piece_field;
uint32_t npieces_got;

off_t ncontent_bytes;

size_t bppbf; // bytes per piece block field

uint64_t nblocks;
uint8_t *piece_field;
uint8_t *block_field;
uint8_t *hold_field;
uint8_t *pos_field;

uint32_t *piece_map;
struct cm_op_tq todoq;

struct event done;
struct bt_stream *rds;
struct bt_stream *wrs;
};

void
done_cb(int fd, short type, void *arg)
#define ZEROBUFLEN (1 << 14)

static const uint8_t m_zerobuf[ZEROBUFLEN];
static struct cm_op_tq m_tdq = BTPDQ_HEAD_INITIALIZER(m_tdq);
static pthread_mutex_t m_tdq_lock;
static pthread_cond_t m_tdq_cond;

static int
fd_cb_rd(const char *path, int *fd, void *arg)
{
struct torrent *tp = arg;
if (tp->state == T_STARTING)
torrent_cm_cb(tp, CM_STARTED);
else
torrent_cm_cb(tp, CM_STOPPED);
return vopen(fd, O_RDONLY, "library/%s/content/%s", tp->relpath, path);
}

int
cm_start(struct torrent *tp)
static int
fd_cb_wr(const char *path, int *fd, void *arg)
{
struct torrent *tp = arg;
return vopen(fd, O_RDWR|O_CREAT, "library/%s/content/%s", tp->relpath,
path);
}

static void
run_todo(struct content *cm)
{
size_t mem =
sizeof(struct content)
+ sizeof(uint32_t) * tp->meta.npieces
+ ceil(tp->meta.npieces / 8.0)
+ tp->meta.npieces * ceil(tp->meta.piece_length / (double)(1 << 17));
struct cm_op *op = BTPDQ_FIRST(&cm->todoq);

if (op->type == CM_WRITE && BTPDQ_EMPTY(&op->u.write.q)) {
BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
free(op);
if (!BTPDQ_EMPTY(&cm->todoq))
run_todo(cm);
return;
}

tp->cm = btpd_calloc(mem, 1);
tp->cm->piece_map = (uint32_t *)(tp->cm + 1);
tp->cm->piece_field = (uint8_t *)(tp->cm->piece_map + tp->meta.npieces);
tp->cm->block_field = tp->cm->piece_field
+ (size_t)ceil(tp->meta.npieces / 8.0);
pthread_mutex_lock(&m_tdq_lock);
BTPDQ_INSERT_TAIL(&m_tdq, op, td_entry);
pthread_mutex_unlock(&m_tdq_lock);
pthread_cond_signal(&m_tdq_cond);
}

evtimer_set(&tp->cm->done, done_cb, tp);
evtimer_add(&tp->cm->done, (& (struct timeval) { 0, 0 }));
static void
cm_td_cb(void *arg)
{
struct cm_op *op = arg;
struct torrent *tp = op->tp;
struct content *cm = tp->cm;

return 0;
if (op->error)
btpd_err("%s", op->errmsg);

switch (op->type) {
case CM_ALLOC:
set_bit(cm->pos_field, op->u.alloc.pos);
clear_bit(cm->hold_field, op->u.alloc.piece);
break;
case CM_TEST:
if (op->u.test.ok) {
cm->npieces_got++;
set_bit(cm->piece_field, op->u.test.piece);
if (tp->net != NULL)
dl_on_ok_piece(op->tp->net, op->u.test.piece);
} else {
cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece);
bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf);
if (tp->net != NULL)
dl_on_bad_piece(tp->net, op->u.test.piece);
}
break;
default:
break;
}
BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
free(op);
if (!BTPDQ_EMPTY(&cm->todoq))
run_todo(cm);
}

void
cm_stop(struct torrent *tp)
static int
test_hash(struct torrent *tp, uint8_t *hash, uint32_t index)
{
evtimer_add(&tp->cm->done, (& (struct timeval) { 0, 0 }));
if (tp->meta.piece_hash != NULL)
return bcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH);
else {
char piece_hash[SHA_DIGEST_LENGTH];
int fd;
int err;

err = vopen(&fd, O_RDONLY, "library/%s/torrent", tp->relpath);
if (err != 0)
btpd_err("test_hash: %s\n", strerror(err));

lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH, SEEK_SET);
read(fd, piece_hash, SHA_DIGEST_LENGTH);
close(fd);

return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
}
}

int
cm_full(struct torrent *tp)
static void
cm_td_alloc(struct cm_op *op)
{
return tp->cm->npieces == tp->meta.npieces;
struct bt_stream *bts;
off_t len = torrent_piece_size(op->tp, op->u.alloc.pos);
off_t off = op->tp->meta.piece_length * op->u.alloc.pos;
bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp);
while (len > 0) {
size_t wlen = min(ZEROBUFLEN, len);
bts_put(bts, off, m_zerobuf, wlen);
len -= wlen;
off += wlen;
}
bts_close(bts);
}

uint8_t *
cm_get_piece_field(struct torrent *tp)
static void
cm_td_test(struct cm_op *op)
{
return tp->cm->piece_field;
uint8_t hash[SHA_DIGEST_LENGTH];
struct bt_stream *bts;
bts_open(&bts, &op->tp->meta, fd_cb_rd, op->tp);
bts_sha(bts, op->u.test.pos * op->tp->meta.piece_length,
torrent_piece_size(op->tp, op->u.test.piece), hash);
bts_close(bts);
op->u.test.ok = test_hash(op->tp, hash, op->u.test.piece) == 0;
}

uint8_t *
cm_get_block_field(struct torrent *tp, uint32_t piece)
static void
cm_td_write(struct cm_op *op)
{
return tp->cm->block_field +
piece * (size_t)ceil(tp->meta.piece_length / (double)(1 << 17));
struct data *d, *next;
off_t base = op->tp->meta.piece_length * op->u.write.pos;
struct bt_stream *bts;
bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp);
BTPDQ_FOREACH(d, &op->u.write.q, entry)
bts_put(bts, base + d->begin, d->buf, d->len);
bts_close(bts);
BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
free(d);
}

int
cm_has_piece(struct torrent *tp, uint32_t piece)
static void
cm_td(void *arg)
{
return has_bit(tp->cm->piece_field, piece);
for (;;) {
pthread_mutex_lock(&m_tdq_lock);
while (BTPDQ_EMPTY(&m_tdq))
pthread_cond_wait(&m_tdq_cond, &m_tdq_lock);
struct cm_op *op = BTPDQ_FIRST(&m_tdq);
BTPDQ_REMOVE(&m_tdq, op, td_entry);
pthread_mutex_unlock(&m_tdq_lock);

switch (op->type) {
case CM_ALLOC:
cm_td_alloc(op);
break;
case CM_TEST:
cm_td_test(op);
break;
case CM_WRITE:
cm_td_write(op);
break;
default:
abort();
}
td_post_begin();
td_post(cm_td_cb, op);
td_post_end();
}
}

void
cm_init(void)
{
pthread_t td;
pthread_mutex_init(&m_tdq_lock, NULL);
pthread_cond_init(&m_tdq_cond, NULL);
pthread_create(&td, NULL, (void *(*)(void *))cm_td, NULL);
}

int
cm_put_block(struct torrent *tp, uint32_t piece, uint32_t block,
const char *data)
cm_start(struct torrent *tp)
{
set_bit(tp->cm->block_field +
piece * (size_t)ceil(tp->meta.piece_length / (double)(1 << 17)),
block);
tp->cm->nblocks++;
int err;
struct content *cm = btpd_calloc(1, sizeof(*cm));
size_t pfield_size = ceil(tp->meta.npieces / 8.0);
cm->bppbf = ceil((double)tp->meta.piece_length / (1 << 17));
cm->piece_field = btpd_calloc(pfield_size, 1);
cm->hold_field = btpd_calloc(pfield_size, 1);
cm->pos_field = btpd_calloc(pfield_size, 1);
cm->block_field = btpd_calloc(tp->meta.npieces * cm->bppbf, 1);

BTPDQ_INIT(&cm->todoq);

if ((err = bts_open(&cm->rds, &tp->meta, fd_cb_rd, tp)) != 0)
btpd_err("Error opening stream (%s).\n", strerror(err));
if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
btpd_err("Error opening stream (%s).\n", strerror(err));

tp->cm = cm;
torrent_cm_cb(tp, CM_STARTED);
return 0;
}

int
cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
char **buf)
uint8_t **buf)
{
*buf = btpd_malloc(len);
int err =
bts_get(tp->cm->rds, piece * tp->meta.piece_length + begin, *buf, len);
if (err != 0)
btpd_err("Io error (%s)\n", strerror(err));
return 0;
}

struct test {
struct piece *pc;
struct event test;
};

static
void test_cb(int fd, short type, void *arg)
void
cm_prealloc(struct torrent *tp, uint32_t piece)
{
struct test *t = arg;
set_bit(t->pc->n->tp->cm->piece_field, t->pc->index);
t->pc->n->tp->cm->npieces++;
dl_on_ok_piece(t->pc);
free(t);
struct content *cm = tp->cm;
if (has_bit(cm->pos_field, piece))
return;

set_bit(cm->hold_field, piece);

int was_empty = BTPDQ_EMPTY(&cm->todoq);
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_ALLOC;
op->u.alloc.piece = piece;
op->u.alloc.pos = piece;
BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);

op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_WRITE;
op->u.write.piece = piece;
op->u.write.pos = piece;
BTPDQ_INIT(&op->u.write.q);
BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);

if (was_empty)
run_todo(cm);
}

void
cm_test_piece(struct piece *pc)
cm_test_piece(struct torrent *tp, uint32_t piece)
{
struct content *cm = tp->cm;
int was_empty = BTPDQ_EMPTY(&cm->todoq);
struct cm_op *op = btpd_calloc(1, sizeof(*op));
op->tp = tp;
op->type = CM_TEST;
op->u.test.piece = piece;
op->u.test.pos = piece;
BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
if (was_empty)
run_todo(cm);
}

int
cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
const uint8_t *buf, size_t len)
{
int err;
struct content *cm = tp->cm;

if (has_bit(cm->hold_field, piece)) {
struct data *d = btpd_calloc(1, sizeof(*d) + len);
d->begin = begin;
d->len = len;
d->buf = (uint8_t *)(d + 1);
bcopy(buf, d->buf, len);
struct cm_op *op;
BTPDQ_FOREACH(op, &cm->todoq, cm_entry)
if (op->type == CM_WRITE && op->u.write.piece == piece)
break;
struct data *it;
BTPDQ_FOREACH(it, &op->u.write.q, entry)
if (it->begin > begin) {
BTPDQ_INSERT_BEFORE(it, d, entry);
break;
}
if (it == NULL)
BTPDQ_INSERT_TAIL(&op->u.write.q, d, entry);
} else {
err = bts_put(cm->wrs, piece * tp->meta.piece_length + begin, buf,
len);
if (err != 0)
btpd_err("Io error (%s)\n", strerror(err));
}

cm->ncontent_bytes += len;
uint8_t *bf = cm->block_field + piece * cm->bppbf;
set_bit(bf, begin / PIECE_BLOCKLEN);

return 0;
}

int
cm_full(struct torrent *tp)
{
return tp->cm->npieces_got == tp->meta.npieces;
}

off_t
cm_get_size(struct torrent *tp)
{
struct test *t = btpd_calloc(1, sizeof(*t));
t->pc = pc;
evtimer_set(&t->test, test_cb, t);
evtimer_add(&t->test, (& (struct timeval) { 0 , 0 }));
return tp->cm->ncontent_bytes;
}

uint32_t
cm_get_npieces(struct torrent *tp)
{
return tp->cm->npieces;
return tp->cm->npieces_got;
}

off_t
cm_bytes_left(struct torrent *tp)
uint8_t *
cm_get_piece_field(struct torrent *tp)
{
return cm_full(tp) ? 0 : tp->meta.total_length;
return tp->cm->piece_field;
}

uint8_t *
cm_get_block_field(struct torrent *tp, uint32_t piece)
{
return tp->cm->block_field + piece * tp->cm->bppbf;
}

int
cm_has_piece(struct torrent *tp, uint32_t piece)
{
return has_bit(tp->cm->piece_field, piece);
}

+ 8
- 5
btpd/content.h View File

@@ -1,6 +1,8 @@
#ifndef BTPD_CONTENT_H
#define BTPD_CONTENT_H

void cm_init(void);

int cm_start(struct torrent *tp);
void cm_stop(struct torrent * tp);

@@ -13,13 +15,14 @@ uint32_t cm_get_npieces(struct torrent *tp);

int cm_has_piece(struct torrent *tp, uint32_t piece);

int cm_put_block(struct torrent *tp, uint32_t piece, uint32_t block,
const char *buf);
int cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
const uint8_t *buf, size_t len);
int cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
size_t len, char **buf);
size_t len, uint8_t **buf);

void cm_test_piece(struct piece *pc);
void cm_prealloc(struct torrent *tp, uint32_t piece);
void cm_test_piece(struct torrent *tp, uint32_t piece);

off_t cm_bytes_left(struct torrent *tp);
off_t cm_get_size(struct torrent *tp);

#endif

+ 8
- 8
btpd/download.c View File

@@ -77,10 +77,10 @@ dl_on_choke(struct peer *p)
* Called when a piece has been tested positively.
*/
void
dl_on_ok_piece(struct piece *pc)
dl_on_ok_piece(struct net *n, uint32_t piece)
{
struct peer *p;
struct net *n = pc->n;
struct piece *pc = dl_find_piece(n, piece);

btpd_log(BTPD_L_POL, "Got piece: %u.\n", pc->index);

@@ -108,9 +108,9 @@ dl_on_ok_piece(struct piece *pc)
* Called when a piece has been tested negatively.
*/
void
dl_on_bad_piece(struct piece *pc)
dl_on_bad_piece(struct net *n, uint32_t piece)
{
struct net *n = pc->n;
struct piece *pc = dl_find_piece(n, piece);

btpd_log(BTPD_L_ERROR, "Bad hash for piece %u of %s.\n",
pc->index, n->tp->relpath);
@@ -151,13 +151,13 @@ dl_on_lost_peer(struct peer *p)

void
dl_on_block(struct peer *p, struct block_request *req,
uint32_t index, uint32_t begin, uint32_t length, const char *data)
uint32_t index, uint32_t begin, uint32_t length, const uint8_t *data)
{
struct net *n = p->n;
struct block *blk = req->blk;
struct piece *pc = blk->pc;

cm_put_block(p->n->tp, index, begin / PIECE_BLOCKLEN, data);
cm_put_bytes(p->n->tp, index, begin, data, length);
pc->ngot++;

if (n->endgame) {
@@ -181,7 +181,7 @@ dl_on_block(struct peer *p, struct block_request *req,
}
BTPDQ_INIT(&blk->reqs);
if (pc->ngot == pc->nblocks)
cm_test_piece(pc);
cm_test_piece(pc->n->tp, pc->index);
} else {
BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
free(req);
@@ -190,7 +190,7 @@ dl_on_block(struct peer *p, struct block_request *req,
clear_bit(pc->down_field, begin / PIECE_BLOCKLEN);
pc->nbusy--;
if (pc->ngot == pc->nblocks)
cm_test_piece(pc);
cm_test_piece(pc->n->tp, pc->index);
if (peer_leech_ok(p) && !peer_laden(p))
dl_assign_requests(p);
}


+ 3
- 3
btpd/download.h View File

@@ -28,9 +28,9 @@ void dl_on_download(struct peer *p);
void dl_on_undownload(struct peer *p);
void dl_on_piece_ann(struct peer *p, uint32_t index);
void dl_on_block(struct peer *p, struct block_request *req,
uint32_t index, uint32_t begin, uint32_t length, const char *data);
uint32_t index, uint32_t begin, uint32_t length, const uint8_t *data);

void dl_on_ok_piece(struct piece *pc);
void dl_on_bad_piece(struct piece *pc);
void dl_on_ok_piece(struct net *n, uint32_t piece);
void dl_on_bad_piece(struct net *n, uint32_t piece);

#endif

+ 1
- 0
btpd/download_subr.c View File

@@ -278,6 +278,7 @@ struct piece *
dl_new_piece(struct net *n, uint32_t index)
{
btpd_log(BTPD_L_POL, "Started on piece %u.\n", index);
cm_prealloc(n->tp, index);
return piece_alloc(n, index);
}



+ 1
- 1
btpd/tracker_req.c View File

@@ -181,7 +181,7 @@ tr_send(struct torrent *tp, enum tr_event event)
"&downloaded=%ju&left=%ju&compact=1%s%s",
tp->meta.announce, qc, e_hash, e_id, net_port,
(intmax_t)tp->net->uploaded, (intmax_t)tp->net->downloaded,
(intmax_t)cm_bytes_left(tp),
(intmax_t)tp->meta.total_length - cm_get_size(tp),
event == TR_EV_EMPTY ? "" : "&event=", m_events[event]);
}



Loading…
Cancel
Save