Pārlūkot izejas kodu

o Rename net_(write|read)32 to (enc|dec)_be32. Add similar functions for 64

bits as well. Implement them in libmisc instead of in btpd.
o Change resume file format and related APIs. The resume files are now memory
  mapped.
master
Richard Nyberg pirms 18 gadiem
vecāks
revīzija
9bd0fc6cdc
9 mainītis faili ar 230 papildinājumiem un 174 dzēšanām
  1. +21
    -38
      btpd/content.c
  2. +10
    -28
      btpd/net.c
  3. +0
    -3
      btpd/net.h
  4. +22
    -22
      btpd/net_buf.c
  5. +118
    -43
      btpd/tlib.c
  6. +11
    -6
      btpd/tlib.h
  7. +1
    -34
      btpd/torrent.c
  8. +42
    -0
      misc/subr.c
  9. +5
    -0
      misc/subr.h

+ 21
- 38
btpd/content.c Parādīt failu

@@ -30,7 +30,7 @@ struct content {
struct bt_stream *rds;
struct bt_stream *wrs;

struct event save_timer;
struct resume_data *resd;
};

#define ZEROBUFLEN (1 << 14)
@@ -103,8 +103,7 @@ void
cm_kill(struct torrent *tp)
{
struct content *cm = tp->cm;
free(cm->piece_field);
free(cm->block_field);
tlib_close_resume(cm->resd);
free(cm->pos_field);
free(cm);
tp->cm = NULL;
@@ -117,9 +116,8 @@ cm_save(struct torrent *tp)
{
struct file_time_size fts[tp->nfiles];
stat_and_adjust(tp, fts);
tlib_save_resume(tp->tl, tp->nfiles, fts,
ceil(tp->npieces / 8.0), tp->cm->piece_field,
tp->cm->bppbf * tp->npieces, tp->cm->block_field);
for (int i = 0; i < tp->nfiles; i++)
resume_set_fts(tp->cm->resd, i, fts + i);
}

static void
@@ -134,18 +132,18 @@ cm_on_error(struct torrent *tp)
static void
cm_write_done(struct torrent *tp)
{
int err = 0;
int err;
struct content *cm = tp->cm;

if ((err = bts_close(cm->wrs)) != 0)
err = bts_close(cm->wrs);
cm->wrs = NULL;
if (err && !cm->error) {
btpd_log(BTPD_L_ERROR, "error closing write stream for '%s' (%s).\n",
torrent_name(tp), strerror(err));
cm->wrs = NULL;
btpd_ev_del(&cm->save_timer);
if (!err)
cm_save(tp);
else
cm_on_error(tp);
}
if (!cm->error)
cm_save(tp);
}

void
@@ -195,27 +193,17 @@ cm_started(struct torrent *tp)
return cm->state == CM_ACTIVE;
}

#define SAVE_INTERVAL (& (struct timeval) { 15, 0 })

static void
save_timer_cb(int fd, short type, void *arg)
{
struct torrent *tp = arg;
btpd_ev_add(&tp->cm->save_timer, SAVE_INTERVAL);
cm_save(tp);
}

void
cm_create(struct torrent *tp, const char *mi)
{
size_t pfield_size = ceil(tp->npieces / 8.0);
struct content *cm = btpd_calloc(1, sizeof(*cm));
cm->bppbf = ceil((double)tp->piece_length / (1 << 17));
cm->piece_field = btpd_calloc(pfield_size, 1);
cm->pos_field = btpd_calloc(pfield_size, 1);
cm->block_field = btpd_calloc(tp->npieces * cm->bppbf, 1);

evtimer_set(&cm->save_timer, save_timer_cb, tp);
cm->resd = tlib_open_resume(tp->tl, tp->nfiles, pfield_size,
cm->bppbf * tp->npieces);
cm->piece_field = resume_piece_field(cm->resd);
cm->block_field = resume_block_field(cm->resd);

tp->cm = cm;
}
@@ -431,9 +419,8 @@ startup_test_end(struct torrent *tp, int unclean)
if (unclean) {
struct start_test_data *std = BTPDQ_FIRST(&m_startq);
BTPDQ_REMOVE(&m_startq, std, entry);
tlib_save_resume(tp->tl, tp->nfiles, std->fts,
ceil(tp->npieces / 8.0), cm->piece_field, cm->bppbf * 8,
cm->block_field);
for (int i = 0; i < tp->nfiles; i++)
resume_set_fts(cm->resd, i, std->fts + i);
free(std->fts);
free(std);
}
@@ -447,7 +434,6 @@ startup_test_end(struct torrent *tp, int unclean)
cm_on_error(tp);
return;
}
btpd_ev_add(&cm->save_timer, SAVE_INTERVAL);
}
cm->state = CM_ACTIVE;
}
@@ -520,7 +506,7 @@ cm_start(struct torrent *tp, int force_test)
return;
}

fts = btpd_calloc(tp->nfiles * 2, sizeof(*fts));
fts = btpd_calloc(tp->nfiles, sizeof(*fts));

if ((err = stat_and_adjust(tp, fts)) != 0) {
free(fts);
@@ -528,13 +514,10 @@ cm_start(struct torrent *tp, int force_test)
return;
}

if (tlib_load_resume(tp->tl, tp->nfiles, fts + tp->nfiles,
ceil(tp->npieces / 8.0), cm->piece_field,
cm->bppbf * tp->npieces, cm->block_field) != 0)
run_test = 1;
for (int i = 0; i < tp->nfiles; i++) {
if ((fts[i].mtime != fts[i + tp->nfiles].mtime ||
fts[i].size != fts[i + tp->nfiles].size)) {
struct file_time_size rfts;
resume_get_fts(cm->resd, i, &rfts);
if ((fts[i].mtime != rfts.mtime || fts[i].size != rfts.size)) {
run_test = 1;
break;
}


+ 10
- 28
btpd/net.c Parādīt failu

@@ -116,24 +116,6 @@ net_active(struct torrent *tp)
return tp->net->active;
}

void
net_write32(void *buf, uint32_t num)
{
uint8_t *p = buf;
*p = (num >> 24) & 0xff;
*(p + 1) = (num >> 16) & 0xff;
*(p + 2) = (num >> 8) & 0xff;
*(p + 3) = num & 0xff;
}

uint32_t
net_read32(const void *buf)
{
const uint8_t *p = buf;
return (uint32_t)*p << 24 | (uint32_t)*(p + 1) << 16
| (uint16_t)*(p + 2) << 8 | *(p + 3);
}

#define BLOCK_MEM_COUNT 4

static unsigned long
@@ -256,7 +238,7 @@ net_dispatch_msg(struct peer *p, const char *buf)
peer_on_uninterest(p);
break;
case MSG_HAVE:
peer_on_have(p, net_read32(buf));
peer_on_have(p, dec_be32(buf));
break;
case MSG_BITFIELD:
if (p->npieces == 0)
@@ -266,9 +248,9 @@ net_dispatch_msg(struct peer *p, const char *buf)
break;
case MSG_REQUEST:
if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
index = net_read32(buf);
begin = net_read32(buf + 4);
length = net_read32(buf + 8);
index = dec_be32(buf);
begin = dec_be32(buf + 4);
length = dec_be32(buf + 8);
if ((length > PIECE_BLOCKLEN
|| index >= p->n->tp->npieces
|| !cm_has_piece(p->n->tp, index)
@@ -282,9 +264,9 @@ net_dispatch_msg(struct peer *p, const char *buf)
}
break;
case MSG_CANCEL:
index = net_read32(buf);
begin = net_read32(buf + 4);
length = net_read32(buf + 8);
index = dec_be32(buf);
begin = dec_be32(buf + 4);
length = dec_be32(buf + 8);
peer_on_cancel(p, index, begin, length);
break;
case MSG_PIECE:
@@ -359,7 +341,7 @@ net_state(struct peer *p, const char *buf)
peer_set_in_state(p, BTP_MSGSIZE, 4);
break;
case BTP_MSGSIZE:
p->in.msg_len = net_read32(buf);
p->in.msg_len = dec_be32(buf);
if (p->in.msg_len == 0)
peer_on_keepalive(p);
else
@@ -379,8 +361,8 @@ net_state(struct peer *p, const char *buf)
peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 1);
break;
case BTP_PIECEMETA:
p->in.pc_index = net_read32(buf);
p->in.pc_begin = net_read32(buf + 4);
p->in.pc_index = dec_be32(buf);
p->in.pc_begin = dec_be32(buf + 4);
peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 9);
break;
case BTP_MSGBODY:


+ 0
- 3
btpd/net.h Parādīt failu

@@ -37,7 +37,4 @@ void net_write_cb(int sd, short type, void *arg);
int net_connect2(struct sockaddr *sa, socklen_t salen, int *sd);
int net_connect(const char *ip, int port, int *sd);

void net_write32(void *buf, uint32_t num);
uint32_t net_read32(const void *buf);

#endif

+ 22
- 22
btpd/net_buf.c Parādīt failu

@@ -53,7 +53,7 @@ static struct net_buf *
nb_create_onesized(char mtype, int btype)
{
struct net_buf *out = nb_create_alloc(btype, 5);
net_write32(out->buf, 1);
enc_be32(out->buf, 1);
out->buf[4] = mtype;
return out;
}
@@ -71,7 +71,7 @@ nb_create_keepalive(void)
{
if (m_keepalive == NULL) {
m_keepalive = nb_create_alloc(NB_KEEPALIVE, 4);
net_write32(m_keepalive->buf, 0);
enc_be32(m_keepalive->buf, 0);
nb_singleton(m_keepalive);
}
return m_keepalive;
@@ -82,10 +82,10 @@ nb_create_piece(uint32_t index, uint32_t begin, size_t blen)
{
struct net_buf *out;
out = nb_create_alloc(NB_PIECE, 13);
net_write32(out->buf, 9 + blen);
enc_be32(out->buf, 9 + blen);
out->buf[4] = MSG_PIECE;
net_write32(out->buf + 5, index);
net_write32(out->buf + 9, begin);
enc_be32(out->buf + 5, index);
enc_be32(out->buf + 9, begin);
return out;
}

@@ -116,11 +116,11 @@ struct net_buf *
nb_create_request(uint32_t index, uint32_t begin, uint32_t length)
{
struct net_buf *out = nb_create_alloc(NB_REQUEST, 17);
net_write32(out->buf, 13);
enc_be32(out->buf, 13);
out->buf[4] = MSG_REQUEST;
net_write32(out->buf + 5, index);
net_write32(out->buf + 9, begin);
net_write32(out->buf + 13, length);
enc_be32(out->buf + 5, index);
enc_be32(out->buf + 9, begin);
enc_be32(out->buf + 13, length);
return out;
}

@@ -128,11 +128,11 @@ struct net_buf *
nb_create_cancel(uint32_t index, uint32_t begin, uint32_t length)
{
struct net_buf *out = nb_create_alloc(NB_CANCEL, 17);
net_write32(out->buf, 13);
enc_be32(out->buf, 13);
out->buf[4] = MSG_CANCEL;
net_write32(out->buf + 5, index);
net_write32(out->buf + 9, begin);
net_write32(out->buf + 13, length);
enc_be32(out->buf + 5, index);
enc_be32(out->buf + 9, begin);
enc_be32(out->buf + 13, length);
return out;
}

@@ -140,9 +140,9 @@ struct net_buf *
nb_create_have(uint32_t index)
{
struct net_buf *out = nb_create_alloc(NB_HAVE, 9);
net_write32(out->buf, 5);
enc_be32(out->buf, 5);
out->buf[4] = MSG_HAVE;
net_write32(out->buf + 5, index);
enc_be32(out->buf + 5, index);
return out;
}

@@ -153,9 +153,9 @@ nb_create_multihave(struct torrent *tp)
struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * have_npieces);
for (uint32_t i = 0, count = 0; count < have_npieces; i++) {
if (cm_has_piece(tp, i)) {
net_write32(out->buf + count * 9, 5);
enc_be32(out->buf + count * 9, 5);
out->buf[count * 9 + 4] = MSG_HAVE;
net_write32(out->buf + count * 9 + 5, i);
enc_be32(out->buf + count * 9 + 5, i);
count++;
}
}
@@ -202,7 +202,7 @@ nb_create_bitfield(struct torrent *tp)
uint32_t plen = ceil(tp->npieces / 8.0);

struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5);
net_write32(out->buf, plen + 1);
enc_be32(out->buf, plen + 1);
out->buf[4] = MSG_BITFIELD;
return out;
}
@@ -234,7 +234,7 @@ nb_get_index(struct net_buf *nb)
case NB_HAVE:
case NB_PIECE:
case NB_REQUEST:
return net_read32(nb->buf + 5);
return dec_be32(nb->buf + 5);
default:
abort();
}
@@ -247,7 +247,7 @@ nb_get_begin(struct net_buf *nb)
case NB_CANCEL:
case NB_PIECE:
case NB_REQUEST:
return net_read32(nb->buf + 9);
return dec_be32(nb->buf + 9);
default:
abort();
}
@@ -259,9 +259,9 @@ nb_get_length(struct net_buf *nb)
switch (nb->type) {
case NB_CANCEL:
case NB_REQUEST:
return net_read32(nb->buf + 13);
return dec_be32(nb->buf + 13);
case NB_PIECE:
return net_read32(nb->buf) - 9;
return dec_be32(nb->buf) - 9;
default:
abort();
}


+ 118
- 43
btpd/tlib.c Parādīt failu

@@ -1,5 +1,6 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>

#include <dirent.h>
#include <fcntl.h>
@@ -265,7 +266,7 @@ id_test(const void *k1, const void *k2)
static uint32_t
id_hash(const void *k)
{
return net_read32(k + 16);
return dec_be32(k + 16);
}

void
@@ -323,57 +324,131 @@ tlib_read_hash(struct tlib *tl, size_t off, uint32_t piece, uint8_t *hash)
}

int
tlib_load_resume(struct tlib *tl, unsigned nfiles, struct file_time_size *fts,
size_t pfsize, uint8_t *pc_field, size_t bfsize, uint8_t *blk_field)
tlib_load_mi(struct tlib *tl, char **res)
{
int err, ver;
FILE *fp;
char file[PATH_MAX];
char relpath[RELPATH_SIZE];
char *mi;
bin2hex(tl->hash, relpath, 20);

if ((err = vfopen(&fp, "r" , "torrents/%s/resume", relpath)) != 0)
return err;

if (fscanf(fp, "%d\n", &ver) != 1)
goto invalid;
if (ver != 1)
goto invalid;
for (int i = 0; i < nfiles; i++) {
quad_t size;
long time;
if (fscanf(fp, "%qd %ld\n", &size, &time) != 2)
goto invalid;
fts[i].size = size;
fts[i].mtime = time;
snprintf(file, sizeof(file), "torrents/%s/torrent", relpath);
if ((mi = mi_load(file, NULL)) == NULL) {
btpd_log(BTPD_L_ERROR,
"torrent '%s': failed to load metainfo (%s).\n",
tl->name, strerror(errno));
return errno;
}
if (fread(pc_field, 1, pfsize, fp) != pfsize)
goto invalid;
if (fread(blk_field, 1, bfsize, fp) != bfsize)
goto invalid;
fclose(fp);
*res = mi;
return 0;
invalid:
fclose(fp);
bzero(pc_field, pfsize);
bzero(blk_field, bfsize);
return EINVAL;
}

void
tlib_save_resume(struct tlib *tl, unsigned nfiles, struct file_time_size *fts,
size_t pfsize, uint8_t *pc_field, size_t bfsize, uint8_t *blk_field)
struct resume_data {
void *base;
size_t size;
uint8_t *pc_field;
uint8_t *blk_field;
};

static void *
resume_file_size(struct resume_data *resd, int i)
{
int err;
FILE *fp;
return resd->base + 8 + 16 * i;
}

static void *
resume_file_time(struct resume_data *resd, int i)
{
return resd->base + 16 + 16 * i;
}

static void
init_resume(int fd, size_t size)
{
char buf[1024];
uint32_t ver;
bzero(buf, sizeof(buf));
enc_be32(&ver, 2);
if (write(fd, "RESD", 4) == -1 || write(fd, &ver, 4) == -1)
goto fatal;
size -= 8;
while (size > 0) {
ssize_t nw = write(fd, buf, min(sizeof(buf), size));
if (nw < 1)
goto fatal;
size -= nw;
}
return;
fatal:
btpd_err("failed to initialize resume file (%s).\n", strerror(errno));
}

struct resume_data *
tlib_open_resume(struct tlib *tl, unsigned nfiles, size_t pfsize,
size_t bfsize)
{
int fd;
char relpath[RELPATH_SIZE];
struct stat sb;
struct resume_data *resd = btpd_calloc(1, sizeof(*resd));
bin2hex(tl->hash, relpath, 20);

if ((err = vfopen(&fp, "wb", "torrents/%s/resume", relpath)) != 0)
return;
fprintf(fp, "%d\n", 1);
for (int i = 0; i < nfiles; i++)
fprintf(fp, "%lld %ld\n", (long long)fts[i].size, (long)fts[i].mtime);
fwrite(pc_field, 1, pfsize, fp);
fwrite(blk_field, 1, bfsize, fp);
if (fclose(fp) != 0); //XXX
resd->size = 8 + nfiles * 16 + pfsize + bfsize;

if ((errno =
vopen(&fd, O_RDWR|O_CREAT, "torrents/%s/resume", relpath)) != 0)
goto fatal;
if (fstat(fd, &sb) != 0)
goto fatal;
if (sb.st_size != resd->size) {
if (sb.st_size != 0 && ftruncate(fd, 0) != 0)
goto fatal;
init_resume(fd, resd->size);
}
resd->base =
mmap(NULL, resd->size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
if (resd->base == MAP_FAILED)
goto fatal;
if (bcmp(resd->base, "RESD", 4) != 0 || dec_be32(resd->base + 4) != 2)
init_resume(fd, resd->size);
close(fd);

resd->pc_field = resd->base + 8 + nfiles * 16;
resd->blk_field = resd->pc_field + pfsize;

return resd;
fatal:
btpd_err("file operation failed on 'torrents/%s/resume' (%s).\n",
relpath, strerror(errno));
}

uint8_t *
resume_piece_field(struct resume_data *resd)
{
return resd->pc_field;
}

uint8_t *
resume_block_field(struct resume_data *resd)
{
return resd->blk_field;
}

void
resume_set_fts(struct resume_data *resd, int i, struct file_time_size *fts)
{
enc_be64(resume_file_size(resd, i), (uint64_t)fts->size);
enc_be64(resume_file_time(resd, i), (uint64_t)fts->mtime);
}

void
resume_get_fts(struct resume_data *resd, int i, struct file_time_size *fts)
{
fts->size = dec_be64(resume_file_size(resd, i));
fts->mtime = dec_be64(resume_file_time(resd, i));
}

void
tlib_close_resume(struct resume_data *resd)
{
munmap(resd->base, resd->size);
free(resd);
}

+ 11
- 6
btpd/tlib.h Parādīt failu

@@ -34,15 +34,20 @@ struct tlib *tlib_by_hash(const uint8_t *hash);
struct tlib *tlib_by_num(unsigned num);
unsigned tlib_count(void);

int tlib_load_mi(struct tlib *tl, char **res);

void tlib_read_hash(struct tlib *tl, size_t off, uint32_t piece,
uint8_t *hash);

int tlib_load_resume(struct tlib *tl, unsigned nfiles,
struct file_time_size *fts, size_t pfsize, uint8_t *pc_field,
size_t bfsize, uint8_t *blk_field);
struct resume_data *tlib_open_resume(struct tlib *tl, unsigned nfiles,
size_t pfsize, size_t bfsize);
void tlib_close_resume(struct resume_data *resume);

void tlib_save_resume(struct tlib *tl, unsigned nfiles,
struct file_time_size *fts, size_t pfsize, uint8_t *pc_field,
size_t bfsize, uint8_t *blk_field);
uint8_t *resume_piece_field(struct resume_data *resd);
uint8_t *resume_block_field(struct resume_data *resd);
void resume_set_fts(struct resume_data *resd, int i,
struct file_time_size *fts);
void resume_get_fts(struct resume_data *resd, int i,
struct file_time_size *fts);

#endif

+ 1
- 34
btpd/torrent.c Parādīt failu

@@ -1,7 +1,4 @@
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
@@ -86,44 +83,14 @@ torrent_block_size(struct torrent *tp, uint32_t piece, uint32_t nblocks,
enum ipc_err
torrent_start(struct tlib *tl)
{
struct stat sb;
struct torrent *tp;
char *mi;
char relpath[RELPATH_SIZE];
char file[PATH_MAX];

if (tl->dir == NULL)
return IPC_EBADTENT;

if (stat(tl->dir, &sb) == 0) {
if ((sb.st_mode & S_IFMT) != S_IFDIR) {
btpd_log(BTPD_L_ERROR,
"torrent '%s': content dir '%s' is not a directory\n",
tl->name, tl->dir);
return IPC_EBADCDIR;
}
} else if (errno == ENOENT) {
if (mkdirs(tl->dir, 0777) != 0 && errno != EEXIST) {
btpd_log(BTPD_L_ERROR, "torrent '%s': "
"failed to create content dir '%s' (%s).\n",
tl->name, tl->dir, strerror(errno));
return IPC_ECREATECDIR;
}
} else {
btpd_log(BTPD_L_ERROR,
"torrent '%s': couldn't stat content dir '%s' (%s)\n",
tl->name, tl->dir, strerror(errno));
return IPC_EBADCDIR;
}

bin2hex(tl->hash, relpath, 20);
snprintf(file, PATH_MAX, "torrents/%s/torrent", relpath);
if ((mi = mi_load(file, NULL)) == NULL) {
btpd_log(BTPD_L_ERROR,
"torrent '%s': failed to load metainfo (%s).\n",
tl->name, strerror(errno));
if (tlib_load_mi(tl, &mi) != 0)
return IPC_EBADTENT;
}

tp = btpd_calloc(1, sizeof(*tp));
tp->tl = tl;


+ 42
- 0
misc/subr.c Parādīt failu

@@ -13,6 +13,48 @@
#include <string.h>
#include <unistd.h>

void
enc_be32(void *buf, uint32_t num)
{
uint8_t *p = buf;
*p = (num >> 24) & 0xff;
*(p + 1) = (num >> 16) & 0xff;
*(p + 2) = (num >> 8) & 0xff;
*(p + 3) = num & 0xff;
}

uint32_t
dec_be32(const void *buf)
{
const uint8_t *p = buf;
return (uint32_t)*p << 24 | (uint32_t)*(p + 1) << 16
| (uint16_t)*(p + 2) << 8 | *(p + 3);
}

void
enc_be64(void *buf, uint64_t num)
{
uint8_t *p = buf;
*p = (num >> 56) & 0xff;
*(p + 1) = (num >> 48) & 0xff;
*(p + 2) = (num >> 40) & 0xff;
*(p + 3) = (num >> 32) & 0xff;
*(p + 4) = (num >> 24) & 0xff;
*(p + 5) = (num >> 16) & 0xff;
*(p + 6) = (num >> 8) & 0xff;
*(p + 7) = num & 0xff;
}

uint64_t
dec_be64(const void *buf)
{
const uint8_t *p = buf;
return (uint64_t)*p << 56 | (uint64_t)*(p + 1) << 48
| (uint64_t)*(p + 2) << 40 | (uint64_t)*(p + 3) << 32
| (uint64_t)*(p + 4) << 24 | (uint64_t)*(p + 5) << 16
| (uint64_t)*(p + 6) << 8 | (uint64_t)*(p + 7);
}

void
set_bit(uint8_t *bits, unsigned long index)
{


+ 5
- 0
misc/subr.h Parādīt failu

@@ -9,6 +9,11 @@

#define SHAHEXSIZE 41

uint32_t dec_be32(const void *buf);
uint64_t dec_be64(const void *buf);
void enc_be32(void *buf, uint32_t num);
void enc_be64(void *buf, uint64_t num);

int set_nonblocking(int fd);
int set_blocking(int fd);



Notiek ielāde…
Atcelt
Saglabāt