瀏覽代碼

Rework the outgoing network buffers. The buffers now contain more

information on what data they hold, making it unnecessary to have
other lists tracking that information. Also they now have a reference
count, making it possible to use the same buffer on many peers.

This is only a start though. I've just done enough for btpd to work,
I haven't taken advantage of the reference count yet.
master
Richard Nyberg 20 年之前
父節點
當前提交
9cc1ffda34
共有 4 個檔案被更改,包括 204 行新增142 行删除
  1. +126
    -109
      btpd/net.c
  2. +37
    -6
      btpd/net.h
  3. +38
    -25
      btpd/peer.c
  4. +3
    -2
      btpd/peer.h

+ 126
- 109
btpd/net.c 查看文件

@@ -59,44 +59,65 @@ net_write_cb(int sd, short type, void *arg)
}

static void
nokill_iob(struct io_buffer *iob)
kill_buf_no(char *buf, size_t len)
{
//Nothing
}

static void
kill_free_buf(struct io_buffer *iob)
kill_buf_free(char *buf, size_t len)
{
free(iob->buf);
free(buf);
}

static struct iob_link *
malloc_liob(size_t len)
int
nb_drop(struct net_buf *nb)
{
assert(nb->refs > 0);
nb->refs--;
if (nb->refs == 0) {
nb->kill_buf(nb->buf, nb->len);
free(nb);
return 1;
} else
return 0;
}

void
nb_hold(struct net_buf *nb)
{
struct iob_link *iol;
iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol) + len);
iol->iob.buf = (char *)(iol + 1);
iol->iob.buf_len = len;
iol->iob.buf_off = 0;
iol->kill_buf = nokill_iob;
return iol;
nb->refs++;
}

static struct iob_link *
salloc_liob(char *buf, size_t len, void (*kill_buf)(struct io_buffer *))
struct net_buf *
nb_create_alloc(short type, size_t len)
{
struct iob_link *iol;
iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol));
iol->iob.buf = buf;
iol->iob.buf_len = len;
iol->iob.buf_off = 0;
iol->kill_buf = kill_buf;
return iol;
struct net_buf *nb = btpd_calloc(1, sizeof(*nb) + len);
nb->info.type = type;
nb->buf = (char *)(nb + 1);
nb->len = len;
nb->kill_buf = kill_buf_no;
nb_hold(nb);
return nb;
}

struct net_buf *
nb_create_set(short type, char *buf, size_t len,
void (*kill_buf)(char *, size_t))
{
struct net_buf *nb = btpd_calloc(1, sizeof(*nb));
nb->info.type = type;
nb->buf = buf;
nb->len = len;
nb->kill_buf = kill_buf;
nb_hold(nb);
return nb;
}
#if 0
void
net_unsend_piece(struct peer *p, struct piece_req *req)
{
struct nb_link *nl;
struct iob_link *piece;

BTPDQ_REMOVE(&p->p_reqs, req, entry);
@@ -119,6 +140,7 @@ net_unsend_piece(struct peer *p, struct piece_req *req)
event_del(&p->out_ev);
}
}
#endif

void
kill_shake(struct input_reader *reader)
@@ -131,8 +153,7 @@ kill_shake(struct input_reader *reader)
static unsigned long
net_write(struct peer *p, unsigned long wmax)
{
struct iob_link *iol;
struct piece_req *req;
struct nb_link *nl;
struct iovec iov[NIOV];
int niov;
int limited;
@@ -142,18 +163,22 @@ net_write(struct peer *p, unsigned long wmax)
limited = wmax > 0;

niov = 0;
assert((iol = BTPDQ_FIRST(&p->outq)) != NULL);
while (niov < NIOV && iol != NULL
&& (!limited || (limited && wmax > 0))) {
iov[niov].iov_base = iol->iob.buf + iol->iob.buf_off;
iov[niov].iov_len = iol->iob.buf_len - iol->iob.buf_off;
assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
while (niov < NIOV && nl != NULL && (!limited || (limited && wmax > 0))) {
if (niov > 0) {
iov[niov].iov_base = nl->nb->buf;
iov[niov].iov_len = nl->nb->len;
} else {
iov[niov].iov_base = nl->nb->buf + p->outq_off;
iov[niov].iov_len = nl->nb->len - p->outq_off;
}
if (limited) {
if (iov[niov].iov_len > wmax)
iov[niov].iov_len = wmax;
wmax -= iov[niov].iov_len;
}
niov++;
iol = BTPDQ_NEXT(iol, entry);
nl = BTPDQ_NEXT(nl, entry);
}

nwritten = writev(p->sd, iov, niov);
@@ -174,32 +199,26 @@ net_write(struct peer *p, unsigned long wmax)

bcount = nwritten;

req = BTPDQ_FIRST(&p->p_reqs);
iol = BTPDQ_FIRST(&p->outq);
nl = BTPDQ_FIRST(&p->outq);
while (bcount > 0) {
unsigned long bufdelta = iol->iob.buf_len - iol->iob.buf_off;
if (req != NULL && req->head == iol) {
struct piece_req *next = BTPDQ_NEXT(req, entry);
BTPDQ_REMOVE(&p->p_reqs, req, entry);
free(req);
req = next;
}
unsigned long bufdelta = nl->nb->len - p->outq_off;
if (bcount >= bufdelta) {
if (iol->upload) {
if (nl->nb->info.type == NB_TORRENTDATA) {
p->tp->uploaded += bufdelta;
p->rate_from_me[btpd.seconds % RATEHISTORY] += bufdelta;
}
bcount -= bufdelta;
BTPDQ_REMOVE(&p->outq, iol, entry);
iol->kill_buf(&iol->iob);
free(iol);
iol = BTPDQ_FIRST(&p->outq);
BTPDQ_REMOVE(&p->outq, nl, entry);
nb_drop(nl->nb);
free(nl);
p->outq_off = 0;
nl = BTPDQ_FIRST(&p->outq);
} else {
if (iol->upload) {
if (nl->nb->info.type == NB_TORRENTDATA) {
p->tp->uploaded += bcount;
p->rate_from_me[btpd.seconds % RATEHISTORY] += bcount;
}
iol->iob.buf_off += bcount;
p->outq_off += bcount;
bcount = 0;
}
}
@@ -214,11 +233,16 @@ net_write(struct peer *p, unsigned long wmax)
}

void
net_send(struct peer *p, struct iob_link *iol)
net_send(struct peer *p, struct net_buf *nb)
{
if (BTPDQ_EMPTY(&p->outq))
struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
nl->nb = nb;

if (BTPDQ_EMPTY(&p->outq)) {
assert(p->outq_off == 0);
event_add(&p->out_ev, WRITE_TIMEOUT);
BTPDQ_INSERT_TAIL(&p->outq, iol, entry);
}
BTPDQ_INSERT_TAIL(&p->outq, nl, entry);
}

void
@@ -237,64 +261,62 @@ void
net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
char *block, size_t blen)
{
struct iob_link *head, *piece;
struct piece_req *req;
struct net_buf *head, *piece;

btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen);

head = malloc_liob(13);
net_write32(head->iob.buf, 9 + blen);
head->iob.buf[4] = MSG_PIECE;
net_write32(head->iob.buf + 5, index);
net_write32(head->iob.buf + 9, begin);
head = nb_create_alloc(NB_PIECE, 13);
net_write32(head->buf, 9 + blen);
head->buf[4] = MSG_PIECE;
net_write32(head->buf + 5, index);
net_write32(head->buf + 9, begin);
net_send(p, head);

piece = salloc_liob(block, blen, kill_free_buf);
piece->upload = 1;
piece = nb_create_set(NB_TORRENTDATA, block, blen, kill_buf_free);
piece->info.index = index;
piece->info.begin = begin;
piece->info.length = blen;
net_send(p, piece);

req = btpd_malloc(sizeof(*req));
req->index = index;
req->begin = begin;
req->length = blen;
req->head = head;
BTPDQ_INSERT_TAIL(&p->p_reqs, req, entry);
}

void
net_send_request(struct peer *p, struct piece_req *req)
{
struct iob_link *out;
out = malloc_liob(17);
net_write32(out->iob.buf, 13);
out->iob.buf[4] = MSG_REQUEST;
net_write32(out->iob.buf + 5, req->index);
net_write32(out->iob.buf + 9, req->begin);
net_write32(out->iob.buf + 13, req->length);
struct net_buf *out = nb_create_alloc(NB_REQUEST, 17);
net_write32(out->buf, 13);
out->buf[4] = MSG_REQUEST;
net_write32(out->buf + 5, req->index);
net_write32(out->buf + 9, req->begin);
net_write32(out->buf + 13, req->length);
out->info.index = req->index;
out->info.begin = req->begin;
out->info.length = req->length;
net_send(p, out);
}

void
net_send_cancel(struct peer *p, struct piece_req *req)
{
struct iob_link *out;
out = malloc_liob(17);
net_write32(out->iob.buf, 13);
out->iob.buf[4] = MSG_CANCEL;
net_write32(out->iob.buf + 5, req->index);
net_write32(out->iob.buf + 9, req->begin);
net_write32(out->iob.buf + 13, req->length);
struct net_buf *out = nb_create_alloc(NB_CANCEL, 17);
net_write32(out->buf, 13);
out->buf[4] = MSG_CANCEL;
net_write32(out->buf + 5, req->index);
net_write32(out->buf + 9, req->begin);
net_write32(out->buf + 13, req->length);
out->info.index = req->index;
out->info.begin = req->begin;
out->info.length = req->length;
net_send(p, out);
}

void
net_send_have(struct peer *p, uint32_t index)
{
struct iob_link *out;
out = malloc_liob(9);
net_write32(out->iob.buf, 5);
out->iob.buf[4] = MSG_HAVE;
net_write32(out->iob.buf + 5, index);
struct net_buf *out = nb_create_alloc(NB_HAVE, 9);
net_write32(out->buf, 5);
out->buf[4] = MSG_HAVE;
net_write32(out->buf + 5, index);
out->info.index = index;
net_send(p, out);
}

@@ -302,13 +324,12 @@ void
net_send_multihave(struct peer *p)
{
struct torrent *tp = p->tp;
struct iob_link *out;
out = malloc_liob(9 * tp->have_npieces);
struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * tp->have_npieces);
for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) {
if (has_bit(tp->piece_field, i)) {
net_write32(out->iob.buf + count * 9, 5);
out->iob.buf[count * 9 + 4] = MSG_HAVE;
net_write32(out->iob.buf + count * 9 + 5, i);
net_write32(out->buf + count * 9, 5);
out->buf[count * 9 + 4] = MSG_HAVE;
net_write32(out->buf + count * 9 + 5, i);
count++;
}
}
@@ -316,63 +337,59 @@ net_send_multihave(struct peer *p)
}

void
net_send_onesized(struct peer *p, char type)
net_send_onesized(struct peer *p, char mtype, int btype)
{
struct iob_link *out;
out = malloc_liob(5);
net_write32(out->iob.buf, 1);
out->iob.buf[4] = type;
struct net_buf *out = nb_create_alloc(btype, 5);
net_write32(out->buf, 1);
out->buf[4] = mtype;
net_send(p, out);
}

void
net_send_unchoke(struct peer *p)
{
net_send_onesized(p, MSG_UNCHOKE);
net_send_onesized(p, MSG_UNCHOKE, NB_UNCHOKE);
}

void
net_send_choke(struct peer *p)
{
net_send_onesized(p, MSG_CHOKE);
net_send_onesized(p, MSG_CHOKE, NB_CHOKE);
}

void
net_send_uninterest(struct peer *p)
{
net_send_onesized(p, MSG_UNINTEREST);
net_send_onesized(p, MSG_UNINTEREST, NB_UNINTEREST);
}

void
net_send_interest(struct peer *p)
{
net_send_onesized(p, MSG_INTEREST);
net_send_onesized(p, MSG_INTEREST, NB_INTEREST);
}

void
net_send_bitfield(struct peer *p)
{
struct iob_link *out;
uint32_t plen;
uint32_t plen = ceil(p->tp->meta.npieces / 8.0);

plen = (uint32_t)ceil(p->tp->meta.npieces / 8.0);
out = malloc_liob(5);
net_write32(out->iob.buf, plen + 1);
out->iob.buf[4] = MSG_BITFIELD;
struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5);
net_write32(out->buf, plen + 1);
out->buf[4] = MSG_BITFIELD;
net_send(p, out);
out = salloc_liob(p->tp->piece_field, plen, nokill_iob);
out = nb_create_set(NB_BITDATA, p->tp->piece_field, plen, kill_buf_no);
net_send(p, out);
}

void
net_send_shake(struct peer *p)
{
struct iob_link *out;
out = malloc_liob(68);
bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->iob.buf, 28);
bcopy(p->tp->meta.info_hash, out->iob.buf + 28, 20);
bcopy(btpd.peer_id, out->iob.buf + 48, 20);
struct net_buf *out = nb_create_alloc(NB_SHAKE, 68);
bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->buf, 28);
bcopy(p->tp->meta.info_hash, out->buf + 28, 20);
bcopy(btpd.peer_id, out->buf + 48, 20);
net_send(p, out);
}



+ 37
- 6
btpd/net.h 查看文件

@@ -11,14 +11,45 @@
#define MSG_PIECE 7
#define MSG_CANCEL 8

struct iob_link {
int upload;
BTPDQ_ENTRY(iob_link) entry;
void (*kill_buf)(struct io_buffer *);
struct io_buffer iob;
#define NB_CHOKE 0
#define NB_UNCHOKE 1
#define NB_INTEREST 2
#define NB_UNINTEREST 3
#define NB_HAVE 4
#define NB_BITFIELD 5
#define NB_REQUEST 6
#define NB_PIECE 7
#define NB_CANCEL 8
#define NB_TORRENTDATA 10
#define NB_MULTIHAVE 11
#define NB_BITDATA 12
#define NB_SHAKE 13

struct net_buf {
unsigned refs;

struct {
short type;
uint32_t index, begin, length;
} info;

char *buf;
size_t len;
void (*kill_buf)(char *, size_t);
};

struct nb_link {
struct net_buf *nb;
BTPDQ_ENTRY(nb_link) entry;
};

BTPDQ_HEAD(io_tq, iob_link);
BTPDQ_HEAD(nb_tq, nb_link);

struct net_buf *nb_create_alloc(short type, size_t len);
struct net_buf *nb_create_set(short type, char *buf, size_t len,
void (*kill_buf)(char *, size_t));
int nb_drop(struct net_buf *nb);
void nb_hold(struct net_buf *nb);

struct peer;



+ 38
- 25
btpd/peer.c 查看文件

@@ -20,7 +20,7 @@ peer_get_rate(unsigned long *rates)
void
peer_kill(struct peer *p)
{
struct iob_link *iol;
struct nb_link *nl;
struct piece_req *req;

btpd_log(BTPD_L_CONN, "killed peer.\n");
@@ -38,18 +38,12 @@ peer_kill(struct peer *p)
event_del(&p->in_ev);
event_del(&p->out_ev);

iol = BTPDQ_FIRST(&p->outq);
while (iol != NULL) {
struct iob_link *next = BTPDQ_NEXT(iol, entry);
iol->kill_buf(&iol->iob);
free(iol);
iol = next;
}
req = BTPDQ_FIRST(&p->p_reqs);
while (req != NULL) {
struct piece_req *next = BTPDQ_NEXT(req, entry);
free(req);
req = next;
nl = BTPDQ_FIRST(&p->outq);
while (nl != NULL) {
struct nb_link *next = BTPDQ_NEXT(nl, entry);
nb_drop(nl->nb);
free(nl);
nl = next;
}
req = BTPDQ_FIRST(&p->my_reqs);
while (req != NULL) {
@@ -113,10 +107,22 @@ peer_unchoke(struct peer *p)
void
peer_choke(struct peer *p)
{
struct piece_req *req;

while ((req = BTPDQ_FIRST(&p->p_reqs)) != NULL)
net_unsend_piece(p, req);
struct nb_link *nl = BTPDQ_FIRST(&p->outq);
while (nl != NULL) {
struct nb_link *next = BTPDQ_NEXT(nl, entry);
if (nl->nb->info.type == NB_PIECE
&& (nl != BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) {
nb_drop(nl->nb);
BTPDQ_REMOVE(&p->outq, nl, entry);
free(nl);
nl = next;
next = BTPDQ_NEXT(next, entry);
nb_drop(nl->nb);
BTPDQ_REMOVE(&p->outq, nl, entry);
free(nl);
}
nl = next;
}

p->flags |= PF_I_CHOKE;
net_send_choke(p);
@@ -151,7 +157,6 @@ peer_create_common(int sd)

p->sd = sd;
p->flags = PF_I_CHOKE | PF_P_CHOKE;
BTPDQ_INIT(&p->p_reqs);
BTPDQ_INIT(&p->my_reqs);
BTPDQ_INIT(&p->outq);

@@ -304,15 +309,23 @@ void
peer_on_cancel(struct peer *p, uint32_t index, uint32_t begin,
uint32_t length)
{
struct piece_req *req = BTPDQ_FIRST(&p->p_reqs);
while (req != NULL) {
if (req->index == index
&& req->begin == begin && req->length == length) {
struct nb_link *nl = BTPDQ_FIRST(&p->outq);
while (nl != NULL) {
if (nl->nb->info.type == NB_PIECE
&& nl->nb->info.index == index
&& nl->nb->info.begin == begin
&& nl->nb->info.length == length
&& (nl != BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) {
btpd_log(BTPD_L_MSG, "cancel matched.\n");
net_unsend_piece(p, req);
break;
struct nb_link *data = BTPDQ_NEXT(nl, entry);
nb_drop(nl->nb);
BTPDQ_REMOVE(&p->outq, nl, entry);
free(nl);
nb_drop(data->nb);
BTPDQ_REMOVE(&p->outq, data, entry);
free(data);
}
req = BTPDQ_NEXT(req, entry);
nl = BTPDQ_NEXT(nl, entry);
}
}



+ 3
- 2
btpd/peer.h 查看文件

@@ -25,11 +25,12 @@ struct peer {

struct torrent *tp;

struct piece_req_tq p_reqs, my_reqs;
struct piece_req_tq my_reqs;

unsigned nreqs_out;

struct io_tq outq;
size_t outq_off;
struct nb_tq outq;

struct event in_ev;
struct event out_ev;


||||||
x
 
000:0
Loading…
取消
儲存