Browse Source

Rewrite of the code for receiving data from peers.

It's not quite how I want it yet, but it's getting there.
master
Richard Nyberg 19 years ago
parent
commit
32a88ff5d8
4 changed files with 265 additions and 448 deletions
  1. +246
    -404
      btpd/net.c
  2. +6
    -36
      btpd/net.h
  3. +8
    -6
      btpd/peer.c
  4. +5
    -2
      btpd/peer.h

+ 246
- 404
btpd/net.c View File

@@ -18,43 +18,11 @@

#include "btpd.h"

#define min(x, y) ((x) <= (y) ? (x) : (y))

static unsigned long
net_write(struct peer *p, unsigned long wmax);

void
net_read_cb(int sd, short type, void *arg)
{
struct peer *p = (struct peer *)arg;
if (btpd.ibwlim == 0) {
p->reader->read(p, 0);
} else if (btpd.ibw_left > 0) {
btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
} else {
p->flags |= PF_ON_READQ;
BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
}
}
#ifndef IOV_MAX
#define IOV_MAX 1024
#endif

void
net_write_cb(int sd, short type, void *arg)
{
struct peer *p = (struct peer *)arg;
if (type == EV_TIMEOUT) {
btpd_log(BTPD_L_CONN, "Write attempt timed out.\n");
peer_kill(p);
return;
}
if (btpd.obwlim == 0) {
net_write(p, 0);
} else if (btpd.obw_left > 0) {
btpd.obw_left -= net_write(p, btpd.obw_left);
} else {
p->flags |= PF_ON_WRITEQ;
BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
}
}
#define min(x, y) ((x) <= (y) ? (x) : (y))

void
net_write32(void *buf, uint32_t num)
@@ -68,19 +36,11 @@ net_read32(void *buf)
return ntohl(*(uint32_t *)buf);
}

void
kill_shake(struct input_reader *reader)
{
free(reader);
}

#define NIOV 16

static unsigned long
net_write(struct peer *p, unsigned long wmax)
{
struct nb_link *nl;
struct iovec iov[NIOV];
struct iovec iov[IOV_MAX];
int niov;
int limited;
ssize_t nwritten;
@@ -90,7 +50,8 @@ net_write(struct peer *p, unsigned long wmax)

niov = 0;
assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
while (niov < NIOV && nl != NULL && (!limited || (limited && wmax > 0))) {
while ((niov < IOV_MAX && nl != NULL
&& (!limited || (limited && wmax > 0)))) {
if (niov > 0) {
iov[niov].iov_base = nl->nb->buf;
iov[niov].iov_len = nl->nb->len;
@@ -159,363 +120,136 @@ net_write(struct peer *p, unsigned long wmax)
return nwritten;
}

static void
kill_generic(struct input_reader *reader)
{
free(reader);
}

static size_t
net_read(struct peer *p, char *buf, size_t len)
{
ssize_t nread = read(p->sd, buf, len);
if (nread < 0) {
if (errno == EAGAIN) {
event_add(&p->in_ev, NULL);
return 0;
} else {
btpd_log(BTPD_L_CONN, "read error: %s\n", strerror(errno));
peer_kill(p);
return 0;
}
} else if (nread == 0) {
btpd_log(BTPD_L_CONN, "conn closed by other side.\n");
if (!BTPDQ_EMPTY(&p->outq))
p->flags |= PF_WRITE_CLOSE;
else
peer_kill(p);
return 0;
} else
return nread;
}

static size_t
net_read_to_buf(struct peer *p, struct io_buffer *iob, unsigned long rmax)
{
if (rmax == 0)
rmax = iob->buf_len - iob->buf_off;
else
rmax = min(rmax, iob->buf_len - iob->buf_off);

assert(rmax > 0);
size_t nread = net_read(p, iob->buf + iob->buf_off, rmax);
if (nread > 0)
iob->buf_off += nread;
return nread;
}

void
kill_bitfield(struct input_reader *rd)
net_set_state(struct peer *p, int state, size_t size)
{
free(rd);
p->net_state = state;
p->state_bytes = size;
}

static void net_generic_reader(struct peer *p);

static unsigned long
read_bitfield(struct peer *p, unsigned long rmax)
static int
net_dispatch_msg(struct peer *p, uint32_t mlen, uint8_t mnum, uint8_t *buf)
{
struct bitfield_reader *rd = (struct bitfield_reader *)p->reader;

size_t nread = net_read_to_buf(p, &rd->iob, rmax);
if (nread == 0)
return 0;

if (rd->iob.buf_off == rd->iob.buf_len) {
peer_on_bitfield(p, rd->iob.buf);
free(rd);
net_generic_reader(p);
} else
event_add(&p->in_ev, NULL);

return nread;
}

void
kill_piece(struct input_reader *rd)
{
free(rd);
uint32_t index, begin, length;
int res = 0;

switch (mnum) {
case MSG_CHOKE:
peer_on_choke(p);
break;
case MSG_UNCHOKE:
peer_on_unchoke(p);
break;
case MSG_INTEREST:
peer_on_interest(p);
break;
case MSG_UNINTEREST:
peer_on_uninterest(p);
break;
case MSG_HAVE:
peer_on_have(p, net_read32(buf + 5));
break;
case MSG_BITFIELD:
peer_on_bitfield(p, buf + 5);
break;
case MSG_REQUEST:
if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
index = net_read32(buf + 5);
begin = net_read32(buf + 9);
length = net_read32(buf + 13);
if ((length > PIECE_BLOCKLEN
|| index >= p->tp->meta.npieces
|| !has_bit(p->tp->piece_field, index)
|| begin + length < torrent_piece_size(p->tp, index))) {
res = 1;
break;
}
peer_on_request(p, index, begin, length);
}
break;
case MSG_CANCEL:
index = net_read32(buf + 5);
begin = net_read32(buf + 9);
length = net_read32(buf + 13);
peer_on_cancel(p, index, begin, length);
break;
case MSG_PIECE:
index = net_read32(buf + 5);
begin = net_read32(buf + 9);
length = mlen - 9;
peer_on_piece(p, index, begin, length, buf + 13);
break;
default:
abort();
}
return res;
}

static unsigned long
read_piece(struct peer *p, unsigned long rmax)
static int
net_mh_ok(struct peer *p, uint32_t mlen, uint8_t mnum)
{
struct piece_reader *rd = (struct piece_reader *)p->reader;

size_t nread = net_read_to_buf(p, &rd->iob, rmax);
if (nread == 0)
switch (mnum) {
case MSG_CHOKE:
case MSG_UNCHOKE:
case MSG_INTEREST:
case MSG_UNINTEREST:
return mlen == 1;
case MSG_HAVE:
return mlen == 5;
case MSG_BITFIELD:
return mlen == (uint32_t)ceil(p->tp->meta.npieces / 8.0) + 1;
case MSG_REQUEST:
case MSG_CANCEL:
return mlen == 13;
case MSG_PIECE:
return mlen <= PIECE_BLOCKLEN + 9;
default:
return 0;

p->rate_to_me[btpd.seconds % RATEHISTORY] += nread;
p->tp->downloaded += nread;
if (rd->iob.buf_off == rd->iob.buf_len) {
peer_on_piece(p, rd->index, rd->begin, rd->iob.buf_len, rd->iob.buf);
free(rd);
net_generic_reader(p);
} else
event_add(&p->in_ev, NULL);

return nread;
}

#define GRBUFLEN (1 << 15)

static unsigned long
net_generic_read(struct peer *p, unsigned long rmax)
{
char buf[GRBUFLEN];
struct io_buffer iob = { 0, GRBUFLEN, buf };
struct generic_reader *gr = (struct generic_reader *)p->reader;
size_t nread;
size_t off, len;
int got_part;

if (gr->iob.buf_off > 0) {
iob.buf_off = gr->iob.buf_off;
bcopy(gr->iob.buf, iob.buf, iob.buf_off);
gr->iob.buf_off = 0;
}
if ((nread = net_read_to_buf(p, &iob, rmax)) == 0)
return 0;

len = iob.buf_off;
off = 0;

got_part = 0;
while (!got_part && len - off >= 4) {
size_t msg_len = net_read32(buf + off);

if (msg_len == 0) { /* Keep alive */
off += 4;
continue;
}
if (len - off < 5) {
got_part = 1;
break;
}

switch (buf[off + 4]) {
case MSG_CHOKE:
if (msg_len != 1)
goto bad_data;
peer_on_choke(p);
break;
case MSG_UNCHOKE:
if (msg_len != 1)
goto bad_data;
peer_on_unchoke(p);
break;
case MSG_INTEREST:
if (msg_len != 1)
goto bad_data;
peer_on_interest(p);
break;
case MSG_UNINTEREST:
if (msg_len != 1)
goto bad_data;
peer_on_uninterest(p);
break;
case MSG_HAVE:
if (msg_len != 5)
goto bad_data;
else if (len - off >= msg_len + 4) {
uint32_t index = net_read32(buf + off + 5);
peer_on_have(p, index);
} else
got_part = 1;
break;
case MSG_BITFIELD:
if (msg_len != (size_t)ceil(p->tp->meta.npieces / 8.0) + 1)
goto bad_data;
else if (p->npieces != 0)
goto bad_data;
else if (len - off >= msg_len + 4)
peer_on_bitfield(p, buf + off + 5);
else {
struct bitfield_reader *rp;
size_t mem = sizeof(*rp) + msg_len - 1;
p->reader->kill(p->reader);
rp = btpd_calloc(1, mem);
rp->rd.kill = kill_bitfield;
rp->rd.read = read_bitfield;
rp->iob.buf = (char *)rp + sizeof(*rp);
rp->iob.buf_len = msg_len - 1;
rp->iob.buf_off = len - off - 5;
bcopy(buf + off + 5, rp->iob.buf, rp->iob.buf_off);
p->reader = (struct input_reader *)rp;
event_add(&p->in_ev, NULL);
return nread;
}
break;
case MSG_REQUEST:
if (msg_len != 13)
goto bad_data;
else if (len - off >= msg_len + 4) {
if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) != PF_P_WANT)
break;
uint32_t index, begin, length;
index = net_read32(buf + off + 5);
begin = net_read32(buf + off + 9);
length = net_read32(buf + off + 13);
if (length > (1 << 15))
goto bad_data;
if (index >= p->tp->meta.npieces)
goto bad_data;
if (!has_bit(p->tp->piece_field, index))
goto bad_data;
if (begin + length > torrent_piece_size(p->tp, index))
goto bad_data;
peer_on_request(p, index, begin, length);
} else
got_part = 1;
break;
case MSG_PIECE:
if (msg_len < 10)
goto bad_data;
else if (len - off >= 13) {
uint32_t index = net_read32(buf + off + 5);
uint32_t begin = net_read32(buf + off + 9);
uint32_t length = msg_len - 9;
if (len - off >= msg_len + 4) {
p->tp->downloaded += length;
p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
peer_on_piece(p, index, begin, length, buf + off + 13);
} else {
struct piece_reader *rp;
size_t mem = sizeof(*rp) + length;
p->reader->kill(p->reader);
rp = btpd_calloc(1, mem);
rp->rd.kill = kill_piece;
rp->rd.read = read_piece;
rp->index = index;
rp->begin = begin;
rp->iob.buf = (char *)rp + sizeof(*rp);
rp->iob.buf_len = length;
rp->iob.buf_off = len - off - 13;
bcopy(buf + off + 13, rp->iob.buf, rp->iob.buf_off);
p->reader = (struct input_reader *)rp;
event_add(&p->in_ev, NULL);
p->tp->downloaded += rp->iob.buf_off;
p->rate_to_me[btpd.seconds % RATEHISTORY] +=
rp->iob.buf_off;
return nread;
}
} else
got_part = 1;
break;
case MSG_CANCEL:
if (msg_len != 13)
goto bad_data;
else if (len - off >= msg_len + 4) {
uint32_t index = net_read32(buf + off + 5);
uint32_t begin = net_read32(buf + off + 9);
uint32_t length = net_read32(buf + off + 13);
if (index > p->tp->meta.npieces)
goto bad_data;
if (begin + length > torrent_piece_size(p->tp, index))
goto bad_data;
peer_on_cancel(p, index, begin, length);
} else
got_part = 1;
break;
default:
goto bad_data;
}
if (!got_part)
off += 4 + msg_len;
}
if (off != len) {
gr->iob.buf_off = len - off;
assert(gr->iob.buf_off <= gr->iob.buf_len);
bcopy(buf + off, gr->iob.buf, gr->iob.buf_off);
}
event_add(&p->in_ev, NULL);
return nread;

bad_data:
btpd_log(BTPD_L_MSG, "received bad data from %p\n", p);
peer_kill(p);
return nread;
}

static void
net_generic_reader(struct peer *p)
net_progress(struct peer *p, size_t length)
{
struct generic_reader *gr;
gr = btpd_calloc(1, sizeof(*gr));

gr->rd.read = net_generic_read;
gr->rd.kill = kill_generic;

gr->iob.buf = gr->_io_buf;
gr->iob.buf_len = MAX_INPUT_LEFT;
gr->iob.buf_off = 0;

p->reader = (struct input_reader *)gr;

event_add(&p->in_ev, NULL);
if (p->net_state == NET_MSGPIECE) {
p->tp->downloaded += length;
p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
}
}

static unsigned long
net_shake_read(struct peer *p, unsigned long rmax)
static int
net_state_foo(struct peer *p, struct io_buffer *iob)
{
struct handshake *hs = (struct handshake *)p->reader;
struct io_buffer *in = &hs->in;
uint32_t mlen;
uint32_t mnum;

size_t nread = net_read_to_buf(p, in, rmax);
if (nread == 0)
return 0;

switch (hs->state) {
case SHAKE_INIT:
if (in->buf_off < 20)
break;
else if (bcmp(in->buf, "\x13""BitTorrent protocol", 20) == 0)
hs->state = SHAKE_PSTR;
else
goto bad_shake;
switch (p->net_state) {
case SHAKE_PSTR:
if (in->buf_off < 28)
break;
else
hs->state = SHAKE_RESERVED;
case SHAKE_RESERVED:
if (in->buf_off < 48)
break;
else if (hs->incoming) {
struct torrent *tp = torrent_get_by_hash(in->buf + 28);
if (tp != NULL) {
hs->state = SHAKE_INFO;
p->tp = tp;
peer_send(p, nb_create_shake(p->tp));
} else
goto bad_shake;
} else {
if (bcmp(in->buf + 28, p->tp->meta.info_hash, 20) == 0)
hs->state = SHAKE_INFO;
else
goto bad_shake;
}
assert(iob->buf_len >= 28);
if (bcmp(iob->buf, "\x13""BitTorrent protocol", 20) != 0)
goto bad;
net_set_state(p, SHAKE_INFO, 20);
return 28;
case SHAKE_INFO:
if (in->buf_off < 68)
break;
else {
if (torrent_has_peer(p->tp, in->buf + 48))
goto bad_shake; // Not really, but we're already connected.
else if (bcmp(in->buf + 48, btpd.peer_id, 20) == 0)
goto bad_shake; // Connection from myself.
bcopy(in->buf + 48, p->id, 20);
hs->state = SHAKE_ID;
}
default:
assert(hs->state == SHAKE_ID);
}
if (hs->state == SHAKE_ID) {
assert(iob->buf_len >= 20);
if (p->flags & PF_INCOMING) {
struct torrent *tp = torrent_get_by_hash(iob->buf);
if (tp == NULL)
goto bad;
p->tp = tp;
peer_send(p, nb_create_shake(p->tp));
} else if (bcmp(iob->buf, p->tp->meta.info_hash, 20) != 0)
goto bad;
net_set_state(p, SHAKE_ID, 20);
return 20;
case SHAKE_ID:
assert(iob->buf_len >= 20);
if ((torrent_has_peer(p->tp, iob->buf)
|| bcmp(iob->buf, btpd.peer_id, 20) == 0))
goto bad;
bcopy(iob->buf, p->id, 20);
btpd_log(BTPD_L_CONN, "Got whole shake.\n");
free(hs);
p->piece_field = btpd_calloc(1, (int)ceil(p->tp->meta.npieces / 8.0));
net_generic_reader(p);
if (p->tp->have_npieces > 0) {
if (p->tp->have_npieces * 9 < 5 + ceil(p->tp->meta.npieces / 8.0))
peer_send(p, nb_create_multihave(p->tp));
@@ -525,37 +259,112 @@ net_shake_read(struct peer *p, unsigned long rmax)
}
}
cm_on_new_peer(p);
} else
event_add(&p->in_ev, NULL);

return nread;
net_set_state(p, NET_MSGSIZE, 4);
return 20;
case NET_MSGSIZE:
assert(iob->buf_len >= 4);
if (bcmp(iob->buf, "\0\0\0\0", 4) == 0)
return 4;
else {
net_set_state(p, NET_MSGHEAD, 5);
return 0;
}
case NET_MSGHEAD:
assert(iob->buf_len >= 5);
mlen = net_read32(iob->buf);
mnum = iob->buf[4];
if (!net_mh_ok(p, mlen, mnum)) {
btpd_log(BTPD_L_ERROR, "error in head\n");
goto bad;
} else if (mlen == 1) {
if (net_dispatch_msg(p, mlen, mnum, iob->buf) != 0) {
btpd_log(BTPD_L_ERROR, "error in dispatch\n");
goto bad;
}
net_set_state(p, NET_MSGSIZE, 4);
return 5;
} else {
uint8_t nstate = mnum == MSG_PIECE ? NET_MSGPIECE : NET_MSGBODY;
net_set_state(p, nstate, mlen + 4);
return 0;
}
case NET_MSGPIECE:
case NET_MSGBODY:
mlen = net_read32(iob->buf);
mnum = iob->buf[4];
assert(iob->buf_len >= mlen + 4);
if (net_dispatch_msg(p, mlen, mnum, iob->buf) != 0) {
btpd_log(BTPD_L_ERROR, "error in dispatch\n");
goto bad;
}
net_set_state(p, NET_MSGSIZE, 4);
return mlen + 4;
default:
abort();
}

bad_shake:
btpd_log(BTPD_L_CONN, "Bad shake(%d)\n", hs->state);
bad:
btpd_log(BTPD_L_CONN, "bad data.\n");
peer_kill(p);
return nread;
return -1;
}

#define GRBUFLEN (1 << 15)

void
net_handshake(struct peer *p, int incoming)
static unsigned long
net_read(struct peer *p, unsigned long rmax)
{
struct handshake *hs;
size_t baggage = p->net_in.buf_len;
char buf[GRBUFLEN + baggage];
struct io_buffer sbuf = { baggage, sizeof(buf), buf };
if (baggage > 0) {
bcopy(p->net_in.buf, buf, baggage);
free(p->net_in.buf);
p->net_in.buf = NULL;
p->net_in.buf_off = 0;
p->net_in.buf_len = 0;
}

if (rmax > 0)
rmax = min(rmax, sbuf.buf_len - sbuf.buf_off);
else
rmax = sbuf.buf_len - sbuf.buf_off;

ssize_t nread = read(p->sd, sbuf.buf + sbuf.buf_off, rmax);
if (nread < 0 && errno == EAGAIN)
goto out;
else if (nread < 0) {
btpd_log(BTPD_L_CONN, "Read error (%s) on %p.\n", strerror(errno), p);
peer_kill(p);
return 0;
} else if (nread == 0) {
btpd_log(BTPD_L_CONN, "Connection closed by %p.\n", p);
peer_kill(p);
return 0;
}

hs = calloc(1, sizeof(*hs));
hs->incoming = incoming;
hs->state = SHAKE_INIT;
sbuf.buf_len = sbuf.buf_off + nread;
sbuf.buf_off = 0;
while (p->state_bytes <= sbuf.buf_len) {
ssize_t chomped = net_state_foo(p, &sbuf);
if (chomped < 0)
return nread;
sbuf.buf += chomped;
sbuf.buf_len -= chomped;
baggage = 0;
}

hs->in.buf_len = SHAKE_LEN;
hs->in.buf_off = 0;
hs->in.buf = hs->_io_buf;
net_progress(p, sbuf.buf_len - baggage);

p->reader = (struct input_reader *)hs;
hs->rd.read = net_shake_read;
hs->rd.kill = kill_shake;
if (sbuf.buf_len > 0) {
p->net_in = sbuf;
p->net_in.buf = btpd_malloc(sbuf.buf_len);
bcopy(sbuf.buf, p->net_in.buf, sbuf.buf_len);
}

if (!incoming)
peer_send(p, nb_create_shake(p->tp));
out:
event_add(&p->in_ev, NULL);
return nread > 0 ? nread : 0;
}

int
@@ -660,13 +469,13 @@ net_bw_cb(int sd, short type, void *arg)
while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left > 0) {
BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
p->flags &= ~PF_ON_READQ;
btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
btpd.ibw_left -= net_read(p, btpd.ibw_left);
}
} else {
while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL) {
BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
p->flags &= ~PF_ON_READQ;
p->reader->read(p, 0);
net_read(p, 0);
}
}

@@ -685,3 +494,36 @@ net_bw_cb(int sd, short type, void *arg)
}
event_add(&btpd.bwlim, (& (struct timeval) { 0, 1000000 / btpd.bw_hz }));
}

void
net_read_cb(int sd, short type, void *arg)
{
struct peer *p = (struct peer *)arg;
if (btpd.ibwlim == 0)
net_read(p, 0);
else if (btpd.ibw_left > 0)
btpd.ibw_left -= net_read(p, btpd.ibw_left);
else {
p->flags |= PF_ON_READQ;
BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
}
}

void
net_write_cb(int sd, short type, void *arg)
{
struct peer *p = (struct peer *)arg;
if (type == EV_TIMEOUT) {
btpd_log(BTPD_L_CONN, "Write attempt timed out.\n");
peer_kill(p);
return;
}
if (btpd.obwlim == 0) {
net_write(p, 0);
} else if (btpd.obw_left > 0) {
btpd.obw_left -= net_write(p, btpd.obw_left);
} else {
p->flags |= PF_ON_WRITEQ;
BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
}
}

+ 6
- 36
btpd/net.h View File

@@ -13,25 +13,6 @@

#define WRITE_TIMEOUT (& (struct timeval) { 60, 0 })

struct peer;

struct input_reader {
unsigned long (*read)(struct peer *, unsigned long);
void (*kill)(struct input_reader *);
};

struct bitfield_reader {
struct input_reader rd;
struct io_buffer iob;
};

struct piece_reader {
struct input_reader rd;
struct io_buffer iob;
uint32_t index;
uint32_t begin;
};

#define SHAKE_LEN 68

enum shake_state {
@@ -39,24 +20,14 @@ enum shake_state {
SHAKE_PSTR,
SHAKE_RESERVED,
SHAKE_INFO,
SHAKE_ID
};

struct handshake {
struct input_reader rd;
enum shake_state state;
int incoming;
struct io_buffer in;
char _io_buf[SHAKE_LEN];
SHAKE_ID,
NET_MSGSIZE,
NET_MSGHEAD,
NET_MSGBODY,
NET_MSGPIECE
};

#define MAX_INPUT_LEFT 16

struct generic_reader {
struct input_reader rd;
struct io_buffer iob;
char _io_buf[MAX_INPUT_LEFT];
};
void net_set_state(struct peer *p, int state, size_t size);

void net_connection_cb(int sd, short type, void *arg);
void net_bw_rate(void);
@@ -65,7 +36,6 @@ void net_bw_cb(int sd, short type, void *arg);
void net_read_cb(int sd, short type, void *arg);
void net_write_cb(int sd, short type, void *arg);

void net_handshake(struct peer *p, int incoming);
int net_connect2(struct sockaddr *sa, socklen_t salen, int *sd);
int net_connect(const char *ip, int port, int *sd);



+ 8
- 6
btpd/peer.c View File

@@ -22,7 +22,7 @@ peer_kill(struct peer *p)
{
struct nb_link *nl;

btpd_log(BTPD_L_CONN, "killed peer.\n");
btpd_log(BTPD_L_CONN, "killed peer %p\n", p);

if (p->flags & PF_ATTACHED)
cm_on_lost_peer(p);
@@ -45,7 +45,8 @@ peer_kill(struct peer *p)
nl = next;
}

p->reader->kill(p->reader);
if (p->net_in.buf != NULL)
free(p->net_in.buf);
if (p->piece_field != NULL)
free(p->piece_field);
free(p);
@@ -66,7 +67,6 @@ peer_send(struct peer *p, struct net_buf *nb)
BTPDQ_INSERT_TAIL(&p->outq, nl, entry);
}


/*
* Remove a network buffer from the peer's outq.
* If a part of the buffer already have been written
@@ -250,6 +250,8 @@ peer_create_common(int sd)
BTPDQ_INIT(&p->my_reqs);
BTPDQ_INIT(&p->outq);

net_set_state(p, SHAKE_PSTR, 28);

event_set(&p->out_ev, p->sd, EV_WRITE, net_write_cb, p);
event_set(&p->in_ev, p->sd, EV_READ, net_read_cb, p);
event_add(&p->in_ev, NULL);
@@ -263,7 +265,7 @@ void
peer_create_in(int sd)
{
struct peer *p = peer_create_common(sd);
net_handshake(p, 1);
p->flags |= PF_INCOMING;
}

void
@@ -278,7 +280,7 @@ peer_create_out(struct torrent *tp, const uint8_t *id,

p = peer_create_common(sd);
p->tp = tp;
net_handshake(p, 0);
peer_send(p, nb_create_shake(p->tp));
}

void
@@ -297,7 +299,7 @@ peer_create_out_compact(struct torrent *tp, const char *compact)

p = peer_create_common(sd);
p->tp = tp;
net_handshake(p, 0);
peer_send(p, nb_create_shake(p->tp));
}

void


+ 5
- 2
btpd/peer.h View File

@@ -10,6 +10,7 @@
#define PF_ATTACHED 0x40
#define PF_WRITE_CLOSE 0x80 /* Close connection after writing all data */
#define PF_NO_REQUESTS 0x100
#define PF_INCOMING 0x200

#define RATEHISTORY 20
#define MAXPIECEMSGS 128
@@ -46,11 +47,13 @@ struct peer {
struct event in_ev;
struct event out_ev;

struct input_reader *reader;

unsigned long rate_to_me[RATEHISTORY];
unsigned long rate_from_me[RATEHISTORY];

size_t state_bytes;
uint8_t net_state;
struct io_buffer net_in;

BTPDQ_ENTRY(peer) cm_entry;

BTPDQ_ENTRY(peer) rq_entry;


Loading…
Cancel
Save