Browse Source

o Make errors when reading or writing torrent data non fatal. Instead of

killing btpd, only the troublesome torrent will be stopped.
o Some code shuffle.
master
Richard Nyberg 18 years ago
parent
commit
8d7898d728
8 changed files with 315 additions and 284 deletions
  1. +6
    -6
      btpd/cli_if.c
  2. +189
    -260
      btpd/content.c
  3. +2
    -1
      btpd/content.h
  4. +1
    -12
      btpd/download.c
  5. +75
    -0
      btpd/tlib.c
  6. +16
    -0
      btpd/tlib.h
  7. +24
    -4
      btpd/torrent.c
  8. +2
    -1
      btpd/torrent.h

+ 6
- 6
btpd/cli_if.c View File

@@ -156,13 +156,13 @@ write_ans(struct io_buffer *iob, struct tlib *tl, enum ipc_tval val)
ts = IPC_TSTATE_START;
break;
case T_STOPPING:
ts= IPC_TSTATE_STOP;
ts = IPC_TSTATE_STOP;
break;
case T_ACTIVE:
if (cm_full(tl->tp))
ts = IPC_TSTATE_SEED;
else
ts = IPC_TSTATE_LEECH;
case T_SEED:
ts = IPC_TSTATE_SEED;
break;
case T_LEECH:
ts = IPC_TSTATE_LEECH;
break;
}
}


+ 189
- 260
btpd/content.c View File

@@ -12,14 +12,11 @@
#include "btpd.h"
#include "stream.h"

struct rstat {
time_t mtime;
off_t size;
};

struct content {
enum { CM_INACTIVE, CM_STARTING, CM_ACTIVE } state;

int error;

uint32_t npieces_got;

off_t ncontent_bytes;
@@ -40,9 +37,6 @@ struct content {

static const uint8_t m_zerobuf[ZEROBUFLEN];

int stat_and_adjust(struct torrent *tp, struct rstat ret[]);
static int save_resume(struct torrent *tp, struct rstat sbs[]);

static int
fd_cb_rd(const char *path, int *fd, void *arg)
{
@@ -57,23 +51,9 @@ fd_cb_wr(const char *path, int *fd, void *arg)
return vopen(fd, O_RDWR, "%s/%s", tp->tl->dir, path);
}

struct pct_data {
off_t off, remain;
struct torrent *tp;
SHA_CTX sha;
BTPDQ_ENTRY(pct_data) entry;
uint32_t piece;
void (*cb)(struct torrent *, uint32_t, int);
};

BTPDQ_HEAD(pct_tq, pct_data);

static struct pct_tq m_pctq = BTPDQ_HEAD_INITIALIZER(m_pctq);
static void cm_write_done(struct torrent *tp);

struct start_test_data {
struct torrent *tp;
struct rstat *rstat;
struct file_time_size *fts;
uint32_t start;
BTPDQ_ENTRY(start_test_data) entry;
};
@@ -90,127 +70,39 @@ static int
test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
{
char piece_hash[SHA_DIGEST_LENGTH];
int fd;
int err;

err = vopen(&fd, O_RDONLY, "torrents/%s/torrent", tp->relpath);
if (err != 0)
btpd_err("failed to open 'torrents/%s/torrent' (%s).\n",
tp->relpath, strerror(err));

lseek(fd, tp->pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET);
read(fd, piece_hash, SHA_DIGEST_LENGTH);
close(fd);

tlib_read_hash(tp->tl, tp->pieces_off, piece, piece_hash);
return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
}

void
pct_create(struct torrent *tp, uint32_t piece,
void (*cb)(struct torrent *, uint32_t, int))
{
struct pct_data *p = btpd_calloc(1, sizeof(*p));
p->piece = piece;
p->tp = tp;
p->off = piece * tp->piece_length;
p->remain = torrent_piece_size(tp, piece);
SHA1_Init(&p->sha);
p->cb = cb;
BTPDQ_INSERT_TAIL(&m_pctq, p, entry);
btpd_ev_add(&m_workev, (& (struct timeval) { 0, 0 }));
}

void
pct_kill(struct pct_data *p)
{
BTPDQ_REMOVE(&m_pctq, p, entry);
free(p);
}

void
pct_run(struct pct_data *p)
{
char buf[READBUFLEN];
size_t unit = (10 << 14);

while (p->remain > 0 && unit > 0) {
size_t wantread = min(p->remain, sizeof(buf));
if (wantread > unit)
wantread = unit;
if ((errno = bts_get(p->tp->cm->rds, p->off, buf, wantread)) != 0)
btpd_err("IO error on '%s' (%s).\n", bts_filename(p->tp->cm->rds),
strerror(errno));
p->remain -= wantread;
unit -= wantread;
p->off += wantread;
SHA1_Update(&p->sha, buf, wantread);
}
if (p->remain == 0) {
uint8_t hash[SHA_DIGEST_LENGTH];
SHA1_Final(hash, &p->sha);
p->cb(p->tp, p->piece, test_hash(p->tp, hash, p->piece) == 0);
pct_kill(p);
}
}

void
pct_cb(struct torrent *tp, uint32_t piece, int ok)
{
struct content *cm = tp->cm;
if (ok) {
assert(cm->npieces_got < tp->npieces);
cm->npieces_got++;
set_bit(cm->piece_field, piece);
if (net_active(tp))
dl_on_ok_piece(tp->net, piece);
if (cm_full(tp))
cm_write_done(tp);
} else {
cm->ncontent_bytes -= torrent_piece_size(tp, piece);
bzero(cm->block_field + piece * cm->bppbf, cm->bppbf);
if (net_active(tp))
dl_on_bad_piece(tp->net, piece);
}
}

void
work_stop(struct torrent *tp)
static int
test_piece(struct torrent *tp, uint32_t piece, int *ok)
{
struct content *cm = tp->cm;
struct pct_data *pct, *next;
if (cm->state == CM_STARTING) {
struct start_test_data *std;
BTPDQ_FOREACH(std, &m_startq, entry)
if (std->tp == tp) {
BTPDQ_REMOVE(&m_startq, std, entry);
free(std->rstat);
free(std);
break;
}
int err;
uint8_t hash[SHA_DIGEST_LENGTH];
if ((err = bts_sha(tp->cm->rds, piece * tp->piece_length,
torrent_piece_size(tp, piece), hash)) != 0) {
btpd_log(BTPD_L_ERROR, "io error on '%s' (%s).\n",
bts_filename(tp->cm->rds), strerror(err));
return err;;
}
BTPDQ_FOREACH_MUTABLE(pct, &m_pctq, entry, next)
if (pct->tp == tp)
pct_kill(pct);
*ok = test_hash(tp, hash, piece) == 0;
return 0;
}

static int test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece);

static void startup_test_run(void);

void
worker_cb(int fd, short type, void *arg)
{
struct pct_data *p = BTPDQ_FIRST(&m_pctq);
if (p == NULL)
return;
pct_run(p);
if (!BTPDQ_EMPTY(&m_pctq))
event_add(&m_workev, (& (struct timeval) { 0, 0 }));
startup_test_run();
}

void
cm_kill(struct torrent *tp)
{
struct content *cm = tp->cm;
bts_close(cm->rds);
free(cm->piece_field);
free(cm->block_field);
free(cm->pos_field);
@@ -218,25 +110,41 @@ cm_kill(struct torrent *tp)
tp->cm = NULL;
}

static int stat_and_adjust(struct torrent *tp, struct file_time_size ret[]);

void
cm_save(struct torrent *tp)
{
struct rstat sbs[tp->nfiles];
if (stat_and_adjust(tp, sbs) == 0)
save_resume(tp, sbs);
struct file_time_size fts[tp->nfiles];
stat_and_adjust(tp, fts);
tlib_save_resume(tp->tl, tp->nfiles, fts,
ceil(tp->npieces / 8.0), tp->cm->piece_field,
tp->cm->bppbf * tp->npieces, tp->cm->block_field);
}

static void
cm_on_error(struct torrent *tp)
{
if (!tp->cm->error) {
tp->cm->error = 1;
cm_stop(tp);
}
}

static void
cm_write_done(struct torrent *tp)
{
int err = 0;
struct content *cm = tp->cm;

if ((errno = bts_close(cm->wrs)) != 0)
btpd_err("error closing write stream for '%s' (%s).\n",
torrent_name(tp), strerror(errno));
if ((err = bts_close(cm->wrs)) != 0)
btpd_log(BTPD_L_ERROR, "error closing write stream for '%s' (%s).\n",
torrent_name(tp), strerror(err));
cm->wrs = NULL;
btpd_ev_del(&cm->save_timer);
cm_save(tp);
if (!err)
cm_save(tp);
cm_on_error(tp);
}

void
@@ -244,10 +152,24 @@ cm_stop(struct torrent *tp)
{
struct content *cm = tp->cm;

if (cm->state == CM_ACTIVE && !cm_full(tp))
cm_write_done(tp);
if (cm->state != CM_STARTING && cm->state != CM_ACTIVE)
return;

if (cm->state == CM_STARTING) {
struct start_test_data *std;
BTPDQ_FOREACH(std, &m_startq, entry)
if (std->tp == tp) {
BTPDQ_REMOVE(&m_startq, std, entry);
free(std->fts);
free(std);
break;
}
}

work_stop(tp);
if (cm->rds != NULL)
bts_close(cm->rds);
if (cm->wrs != NULL)
cm_write_done(tp);

cm->state = CM_INACTIVE;
}
@@ -259,6 +181,12 @@ cm_active(struct torrent *tp)
return cm->state != CM_INACTIVE;
}

int
cm_error(struct torrent *tp)
{
return tp->cm->error;
}

int
cm_started(struct torrent *tp)
{
@@ -295,13 +223,18 @@ int
cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
uint8_t **buf)
{
if (tp->cm->error)
return EIO;

*buf = btpd_malloc(len);
int err =
bts_get(tp->cm->rds, piece * tp->piece_length + begin, *buf, len);
if (err != 0)
btpd_err("IO error on '%s' (%s).\n", bts_filename(tp->cm->rds),
strerror(err));
return 0;
if (err != 0) {
btpd_log(BTPD_L_ERROR, "io error on '%s' (%s).\n",
bts_filename(tp->cm->rds), strerror(err));
cm_on_error(tp);
}
return err;
}

void
@@ -316,7 +249,24 @@ cm_prealloc(struct torrent *tp, uint32_t piece)
void
cm_test_piece(struct torrent *tp, uint32_t piece)
{
pct_create(tp, piece, pct_cb);
int ok;
struct content *cm = tp->cm;
if ((errno = test_piece(tp, piece, &ok)) != 0)
cm_on_error(tp);
else if (ok) {
assert(cm->npieces_got < tp->npieces);
cm->npieces_got++;
set_bit(cm->piece_field, piece);
if (net_active(tp))
dl_on_ok_piece(tp->net,piece);
if (cm_full(tp))
cm_write_done(tp);
} else {
cm->ncontent_bytes -= torrent_piece_size(tp,piece);
bzero(cm->block_field + piece * cm->bppbf, cm->bppbf);
if (net_active(tp))
dl_on_bad_piece(tp->net, piece);
}
}

int
@@ -326,6 +276,13 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
int err;
struct content *cm = tp->cm;

if (cm->error)
return EIO;

uint8_t *bf = cm->block_field + piece * cm->bppbf;
assert(!has_bit(bf, begin / PIECE_BLOCKLEN));
assert(!has_bit(cm->piece_field, piece));

if (!has_bit(cm->pos_field, piece)) {
unsigned npieces = ceil((double)cm_alloc_size / tp->piece_length);
uint32_t start = piece - piece % npieces;
@@ -333,13 +290,17 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,

while (start < end) {
if (!has_bit(cm->pos_field, start)) {
assert(!has_bit(cm->piece_field, start));
off_t len = torrent_piece_size(tp, start);
off_t off = tp->piece_length * start;
while (len > 0) {
size_t wlen = min(ZEROBUFLEN, len);
if ((err = bts_put(cm->wrs, off, m_zerobuf, wlen)) != 0)
btpd_err("IO error on '%s' (%s).\n",
if ((err = bts_put(cm->wrs, off, m_zerobuf, wlen)) != 0) {
btpd_log(BTPD_L_ERROR, "io error on '%s' (%s).\n",
bts_filename(cm->wrs), strerror(errno));
cm_on_error(tp);
return err;
}

len -= wlen;
off += wlen;
@@ -350,12 +311,14 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
}
}
err = bts_put(cm->wrs, piece * tp->piece_length + begin, buf, len);
if (err != 0)
btpd_err("IO error on '%s' (%s)\n", bts_filename(cm->wrs),
strerror(err));
if (err != 0) {
btpd_log(BTPD_L_ERROR, "io error on '%s' (%s)\n",
bts_filename(cm->wrs), strerror(err));
cm_on_error(tp);
return err;
}

cm->ncontent_bytes += len;
uint8_t *bf = cm->block_field + piece * cm->bppbf;
set_bit(bf, begin / PIECE_BLOCKLEN);

return 0;
@@ -398,7 +361,7 @@ cm_has_piece(struct torrent *tp, uint32_t piece)
}

int
stat_and_adjust(struct torrent *tp, struct rstat ret[])
stat_and_adjust(struct torrent *tp, struct file_time_size ret[])
{
int fd;
char path[PATH_MAX];
@@ -426,63 +389,8 @@ again:
return 0;
}

static int
load_resume(struct torrent *tp, struct rstat sbs[])
{
int err, ver;
FILE *fp;
size_t pfsiz = ceil(tp->npieces / 8.0);
size_t bfsiz = tp->npieces * tp->cm->bppbf;

if ((err = vfopen(&fp, "r" , "torrents/%s/resume", tp->relpath)) != 0)
return err;

if (fscanf(fp, "%d\n", &ver) != 1)
goto invalid;
if (ver != 1)
goto invalid;
for (int i = 0; i < tp->nfiles; i++) {
quad_t size;
long time;
if (fscanf(fp, "%qd %ld\n", &size, &time) != 2)
goto invalid;
if (sbs[i].size != size || sbs[i].mtime != time)
err = EINVAL;
}
if (fread(tp->cm->piece_field, 1, pfsiz, fp) != pfsiz)
goto invalid;
if (fread(tp->cm->block_field, 1, bfsiz, fp) != bfsiz)
goto invalid;
fclose(fp);
return err;
invalid:
fclose(fp);
bzero(tp->cm->piece_field, pfsiz);
bzero(tp->cm->block_field, bfsiz);
return EINVAL;
}

static int
save_resume(struct torrent *tp, struct rstat sbs[])
{
int err;
FILE *fp;
if ((err = vfopen(&fp, "wb", "torrents/%s/resume", tp->relpath)) != 0)
return err;
fprintf(fp, "%d\n", 1);
for (int i = 0; i < tp->nfiles; i++)
fprintf(fp, "%lld %ld\n", (long long)sbs[i].size, (long)sbs[i].mtime);
fwrite(tp->cm->piece_field, 1, ceil(tp->npieces / 8.0), fp);
fwrite(tp->cm->block_field, 1, tp->npieces * tp->cm->bppbf, fp);
if (fclose(fp) != 0)
err = errno;
return err;
}

void start_test_cb(struct torrent *tp, uint32_t piece, int ok);

void
start_test_end(struct torrent *tp, int unclean)
startup_test_end(struct torrent *tp, int unclean)
{
struct content *cm = tp->cm;

@@ -510,48 +418,62 @@ start_test_end(struct torrent *tp, int unclean)
} else if (nblocks_got > 0)
set_bit(cm->pos_field, piece);
}
if (unclean) {
struct start_test_data *std = BTPDQ_FIRST(&m_startq);
BTPDQ_REMOVE(&m_startq, std, entry);
tlib_save_resume(tp->tl, tp->nfiles, std->fts,
ceil(tp->npieces / 8.0), cm->piece_field, cm->bppbf * 8,
cm->block_field);
free(std->fts);
free(std);
}
if (!cm_full(tp)) {
int err;
if ((err = bts_open(&cm->wrs, tp->nfiles, tp->files,
fd_cb_wr, tp)) != 0)
btpd_err("failed to open write stream for '%s' (%s).\n",
fd_cb_wr, tp)) != 0) {
btpd_log(BTPD_L_ERROR,
"failed to open write stream for '%s' (%s).\n",
torrent_name(tp), strerror(err));
cm_on_error(tp);
return;
}
btpd_ev_add(&cm->save_timer, SAVE_INTERVAL);
}
if (unclean) {
struct start_test_data *std = BTPDQ_FIRST(&m_startq);

assert(std->tp == tp);
BTPDQ_REMOVE(&m_startq, std, entry);
save_resume(tp, std->rstat);
free(std->rstat);
free(std);

if ((std = BTPDQ_FIRST(&m_startq)) != NULL)
pct_create(std->tp, std->start, start_test_cb);
}
cm->state = CM_ACTIVE;
}

void
start_test_cb(struct torrent *tp, uint32_t piece, int ok)
startup_test_run(void)
{
struct content *cm = tp->cm;
int ok;
struct torrent *tp;
struct content *cm;
struct start_test_data * std = BTPDQ_FIRST(&m_startq);
uint32_t this;
if (std == NULL)
return;
tp = std->tp;
cm = tp->cm;
if (test_piece(std->tp, std->start, &ok) != 0) {
cm_on_error(std->tp);
return;
}
if (ok)
set_bit(cm->piece_field, piece);
set_bit(cm->piece_field, std->start);
else
clear_bit(cm->piece_field, piece);
piece++;
while (piece < tp->npieces && !has_bit(cm->pos_field, piece))
piece++;
if (piece < tp->npieces)
pct_create(tp, piece, start_test_cb);
else
start_test_end(tp, 1);
clear_bit(cm->piece_field, std->start);
this = std->start;
do
std->start++;
while (std->start < tp->npieces && !has_bit(cm->pos_field, std->start));
if (std->start >= tp->npieces)
startup_test_end(tp, 1);
if (!BTPDQ_EMPTY(&m_startq))
event_add(&m_workev, (& (struct timeval) { 0, 0 }));
}

void
start_test(struct torrent *tp, struct rstat *sbs)
startup_test_begin(struct torrent *tp, struct file_time_size *fts)
{
uint32_t piece = 0;
struct content *cm = tp->cm;
@@ -561,45 +483,63 @@ start_test(struct torrent *tp, struct rstat *sbs)
struct start_test_data *std = btpd_calloc(1, sizeof(*std));
std->tp = tp;
std->start = piece;
std->rstat = sbs;
std->fts = fts;
BTPDQ_INSERT_TAIL(&m_startq, std, entry);
if (std == BTPDQ_FIRST(&m_startq))
pct_create(tp, piece, start_test_cb);
event_add(&m_workev, (& (struct timeval) { 0, 0 }));
} else {
free(sbs);
start_test_end(tp, 0);
free(fts);
startup_test_end(tp, 0);
}
}

void
cm_start(struct torrent *tp)
cm_start(struct torrent *tp, int force_test)
{
int err, resume_clean = 0;
struct rstat *sbs;
int err, run_test = force_test;
struct file_time_size *fts;
struct content *cm = tp->cm;

if ((errno = bts_open(&cm->rds, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0)
btpd_err("failed to open stream for '%s' (%s).\n",
torrent_name(tp), strerror(errno));

cm->state = CM_STARTING;

sbs = btpd_calloc(tp->nfiles, sizeof(*sbs));
if ((errno =
bts_open(&cm->rds, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0) {
btpd_log(BTPD_L_ERROR, "failed to open stream for '%s' (%s).\n",
torrent_name(tp), strerror(errno));
cm_on_error(tp);
return;
}

if ((err = stat_and_adjust(tp, sbs)) != 0)
btpd_err("failed stat_and_adjust for '%s' (%s).\n",
fts = btpd_calloc(tp->nfiles * 2, sizeof(*fts));

if ((err = stat_and_adjust(tp, fts)) != 0) {
btpd_log(BTPD_L_ERROR, "failed stat_and_adjust for '%s' (%s).\n",
torrent_name(tp), strerror(err));
free(fts);
cm_on_error(tp);
return;
}

resume_clean = load_resume(tp, sbs) == 0;
if (!resume_clean) {
if (tlib_load_resume(tp->tl, tp->nfiles, fts + tp->nfiles,
ceil(tp->npieces / 8.0), cm->piece_field,
cm->bppbf * tp->npieces, cm->block_field) != 0)
run_test = 1;
for (int i = 0; i < tp->nfiles; i++) {
if ((fts[i].mtime != fts[i + tp->nfiles].mtime ||
fts[i].size != fts[i + tp->nfiles].size)) {
run_test = 1;
break;
}
}
if (run_test) {
memset(cm->pos_field, 0xff, ceil(tp->npieces / 8.0));
off_t off = 0;
for (int i = 0; i < tp->nfiles; i++) {
if (sbs[i].size != tp->files[i].length) {
if (fts[i].size != tp->files[i].length) {
uint32_t start, end;
end = (off + tp->files[i].length - 1)
/ tp->piece_length;
start = (off + sbs[i].size) / tp->piece_length;
start = (off + fts[i].size) / tp->piece_length;
while (start <= end) {
clear_bit(cm->pos_field, start);
clear_bit(cm->piece_field, start);
@@ -610,19 +550,8 @@ cm_start(struct torrent *tp)
off += tp->files[i].length;
}
}
for (uint32_t piece = 0; piece < tp->npieces; piece++) {
if (has_bit(cm->piece_field, piece))
continue;
uint8_t *bf = cm->block_field + cm->bppbf * piece;
uint32_t nblocks = torrent_piece_blocks(tp, piece);
uint32_t block = 0;
while (block < nblocks && has_bit(bf, block))
block++;
if (block == nblocks)
set_bit(cm->pos_field, piece);
}

start_test(tp, sbs);
startup_test_begin(tp, fts);
}

void


+ 2
- 1
btpd/content.h View File

@@ -6,10 +6,11 @@ void cm_init(void);
void cm_create(struct torrent *tp, const char *mi);
void cm_kill(struct torrent *tp);

void cm_start(struct torrent *tp);
void cm_start(struct torrent *tp, int force_test);
void cm_stop(struct torrent * tp);

int cm_active(struct torrent *tp);
int cm_error(struct torrent *tp);
int cm_started(struct torrent *tp);
int cm_full(struct torrent *tp);



+ 1
- 12
btpd/download.c View File

@@ -79,7 +79,7 @@ dl_on_choke(struct peer *p)
void
dl_on_ok_piece(struct net *n, uint32_t piece)
{
struct peer *p, *next;
struct peer *p;
struct piece *pc = dl_find_piece(n, piece);

btpd_log(BTPD_L_POL, "Got piece: %u.\n", pc->index);
@@ -98,17 +98,6 @@ dl_on_ok_piece(struct net *n, uint32_t piece)

assert(pc->nreqs == 0);
piece_free(pc);

if (cm_full(n->tp)) {
btpd_log(BTPD_L_BTPD, "Finished downloading '%s'.\n",
torrent_name(n->tp));
tr_complete(n->tp);
BTPDQ_FOREACH_MUTABLE(p, &n->peers, p_entry, next) {
assert(p->nwant == 0);
if (peer_full(p))
peer_kill(p);
}
}
}

/*


+ 75
- 0
btpd/tlib.c View File

@@ -2,6 +2,7 @@
#include <sys/stat.h>

#include <dirent.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

@@ -297,3 +298,77 @@ tlib_init(void)
}
closedir(dirp);
}

void
tlib_read_hash(struct tlib *tl, size_t off, uint32_t piece, uint8_t *hash)
{
int fd;
ssize_t nread;
char relpath[RELPATH_SIZE];
bin2hex(tl->hash, relpath, 20);

if ((errno = vopen(&fd, O_RDONLY, "torrents/%s/torrent", relpath)) != 0)
btpd_err("failed to open 'torrents/%s/torrent' (%s).\n",
relpath, strerror(errno));
lseek(fd, off + piece * 20, SEEK_SET);
if ((nread = read(fd, hash, 20)) != 20) {
if (nread == -1)
btpd_err("failed to read 'torrents/%s/torrent' (%s).\n", relpath,
strerror(errno));
else
btpd_err("corrupt file: 'torrents/%s/torrent'.\n", relpath);
}

close(fd);
}

int
tlib_load_resume(struct tlib *tl, unsigned nfiles, struct file_time_size *fts,
size_t pfsize, uint8_t *pc_field, size_t bfsize, uint8_t *blk_field)
{
int err, ver;
FILE *fp;

if ((err = vfopen(&fp, "r" , "torrents/%s/resume", tl->tp->relpath)) != 0)
return err;

if (fscanf(fp, "%d\n", &ver) != 1)
goto invalid;
if (ver != 1)
goto invalid;
for (int i = 0; i < nfiles; i++) {
quad_t size;
long time;
if (fscanf(fp, "%qd %ld\n", &size, &time) != 2)
goto invalid;
fts[i].size = size;
fts[i].mtime = time;
}
if (fread(pc_field, 1, pfsize, fp) != pfsize)
goto invalid;
if (fread(blk_field, 1, bfsize, fp) != bfsize)
goto invalid;
fclose(fp);
return 0;
invalid:
fclose(fp);
bzero(pc_field, pfsize);
bzero(blk_field, bfsize);
return EINVAL;
}

void
tlib_save_resume(struct tlib *tl, unsigned nfiles, struct file_time_size *fts,
size_t pfsize, uint8_t *pc_field, size_t bfsize, uint8_t *blk_field)
{
int err;
FILE *fp;
if ((err = vfopen(&fp, "wb", "torrents/%s/resume", tl->tp->relpath)) != 0)
return;
fprintf(fp, "%d\n", 1);
for (int i = 0; i < nfiles; i++)
fprintf(fp, "%lld %ld\n", (long long)fts[i].size, (long)fts[i].mtime);
fwrite(pc_field, 1, pfsize, fp);
fwrite(blk_field, 1, bfsize, fp);
if (fclose(fp) != 0); //XXX
}

+ 16
- 0
btpd/tlib.h View File

@@ -16,6 +16,11 @@ struct tlib {
HTBL_ENTRY(hchain);
};

struct file_time_size {
off_t size;
time_t mtime;
};

void tlib_init(void);
void tlib_put_all(struct tlib **v);

@@ -29,4 +34,15 @@ struct tlib *tlib_by_hash(const uint8_t *hash);
struct tlib *tlib_by_num(unsigned num);
unsigned tlib_count(void);

void tlib_read_hash(struct tlib *tl, size_t off, uint32_t piece,
uint8_t *hash);

int tlib_load_resume(struct tlib *tl, unsigned nfiles,
struct file_time_size *fts, size_t pfsize, uint8_t *pc_field,
size_t bfsize, uint8_t *blk_field);

void tlib_save_resume(struct tlib *tl, unsigned nfiles,
struct file_time_size *fts, size_t pfsize, uint8_t *pc_field,
size_t bfsize, uint8_t *blk_field);

#endif

+ 24
- 4
btpd/torrent.c View File

@@ -140,12 +140,12 @@ torrent_start(struct tlib *tl)

btpd_log(BTPD_L_BTPD, "Starting torrent '%s'.\n", torrent_name(tp));
if (tr_create(tp, mi) == 0) {
tl->tp = tp;
net_create(tp);
cm_create(tp, mi);
BTPDQ_INSERT_TAIL(&m_torrents, tp, entry);
m_ntorrents++;
cm_start(tp);
tl->tp = tp;
cm_start(tp, 0);
free(mi);
return IPC_OK;
} else {
@@ -180,7 +180,8 @@ void
torrent_stop(struct torrent *tp)
{
switch (tp->state) {
case T_ACTIVE:
case T_LEECH:
case T_SEED:
case T_STARTING:
tp->state = T_STOPPING;
if (net_active(tp))
@@ -200,14 +201,33 @@ torrent_stop(struct torrent *tp)
void
torrent_on_tick(struct torrent *tp)
{
if (tp->state != T_STOPPING && cm_error(tp))
torrent_stop(tp);
switch (tp->state) {
case T_STARTING:
if (cm_started(tp)) {
tp->state = T_ACTIVE;
if (cm_full(tp))
tp->state = T_SEED;
else
tp->state = T_LEECH;
net_start(tp);
tr_start(tp);
}
break;
case T_LEECH:
if (cm_full(tp)) {
struct peer *p, *next;
tp->state = T_SEED;
btpd_log(BTPD_L_BTPD, "Finished downloading '%s'.\n",
torrent_name(tp));
tr_complete(tp);
BTPDQ_FOREACH_MUTABLE(p, &tp->net->peers, p_entry, next) {
assert(p->nwant == 0);
if (peer_full(p))
peer_kill(p);
}
}
break;
case T_STOPPING:
if (!(cm_active(tp) || tr_active(tp)))
torrent_kill(tp);


+ 2
- 1
btpd/torrent.h View File

@@ -6,7 +6,8 @@

enum torrent_state {
T_STARTING,
T_ACTIVE,
T_LEECH,
T_SEED,
T_STOPPING
};



Loading…
Cancel
Save