From 32a88ff5d8ba98700ef5c52d3b689ab00718fe5c Mon Sep 17 00:00:00 2001 From: Richard Nyberg Date: Tue, 4 Oct 2005 17:52:56 +0000 Subject: [PATCH] Rewrite of the code for receiving data from peers. It's not quite how I want it yet, but it's getting there. --- btpd/net.c | 650 ++++++++++++++++++++-------------------------------- btpd/net.h | 42 +--- btpd/peer.c | 14 +- btpd/peer.h | 7 +- 4 files changed, 265 insertions(+), 448 deletions(-) diff --git a/btpd/net.c b/btpd/net.c index 6435064..90f1203 100644 --- a/btpd/net.c +++ b/btpd/net.c @@ -18,43 +18,11 @@ #include "btpd.h" -#define min(x, y) ((x) <= (y) ? (x) : (y)) - -static unsigned long -net_write(struct peer *p, unsigned long wmax); - -void -net_read_cb(int sd, short type, void *arg) -{ - struct peer *p = (struct peer *)arg; - if (btpd.ibwlim == 0) { - p->reader->read(p, 0); - } else if (btpd.ibw_left > 0) { - btpd.ibw_left -= p->reader->read(p, btpd.ibw_left); - } else { - p->flags |= PF_ON_READQ; - BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry); - } -} +#ifndef IOV_MAX +#define IOV_MAX 1024 +#endif -void -net_write_cb(int sd, short type, void *arg) -{ - struct peer *p = (struct peer *)arg; - if (type == EV_TIMEOUT) { - btpd_log(BTPD_L_CONN, "Write attempt timed out.\n"); - peer_kill(p); - return; - } - if (btpd.obwlim == 0) { - net_write(p, 0); - } else if (btpd.obw_left > 0) { - btpd.obw_left -= net_write(p, btpd.obw_left); - } else { - p->flags |= PF_ON_WRITEQ; - BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry); - } -} +#define min(x, y) ((x) <= (y) ? (x) : (y)) void net_write32(void *buf, uint32_t num) @@ -68,19 +36,11 @@ net_read32(void *buf) return ntohl(*(uint32_t *)buf); } -void -kill_shake(struct input_reader *reader) -{ - free(reader); -} - -#define NIOV 16 - static unsigned long net_write(struct peer *p, unsigned long wmax) { struct nb_link *nl; - struct iovec iov[NIOV]; + struct iovec iov[IOV_MAX]; int niov; int limited; ssize_t nwritten; @@ -90,7 +50,8 @@ net_write(struct peer *p, unsigned long wmax) niov = 0; assert((nl = BTPDQ_FIRST(&p->outq)) != NULL); - while (niov < NIOV && nl != NULL && (!limited || (limited && wmax > 0))) { + while ((niov < IOV_MAX && nl != NULL + && (!limited || (limited && wmax > 0)))) { if (niov > 0) { iov[niov].iov_base = nl->nb->buf; iov[niov].iov_len = nl->nb->len; @@ -159,363 +120,136 @@ net_write(struct peer *p, unsigned long wmax) return nwritten; } -static void -kill_generic(struct input_reader *reader) -{ - free(reader); -} - -static size_t -net_read(struct peer *p, char *buf, size_t len) -{ - ssize_t nread = read(p->sd, buf, len); - if (nread < 0) { - if (errno == EAGAIN) { - event_add(&p->in_ev, NULL); - return 0; - } else { - btpd_log(BTPD_L_CONN, "read error: %s\n", strerror(errno)); - peer_kill(p); - return 0; - } - } else if (nread == 0) { - btpd_log(BTPD_L_CONN, "conn closed by other side.\n"); - if (!BTPDQ_EMPTY(&p->outq)) - p->flags |= PF_WRITE_CLOSE; - else - peer_kill(p); - return 0; - } else - return nread; -} - -static size_t -net_read_to_buf(struct peer *p, struct io_buffer *iob, unsigned long rmax) -{ - if (rmax == 0) - rmax = iob->buf_len - iob->buf_off; - else - rmax = min(rmax, iob->buf_len - iob->buf_off); - - assert(rmax > 0); - size_t nread = net_read(p, iob->buf + iob->buf_off, rmax); - if (nread > 0) - iob->buf_off += nread; - return nread; -} - void -kill_bitfield(struct input_reader *rd) +net_set_state(struct peer *p, int state, size_t size) { - free(rd); + p->net_state = state; + p->state_bytes = size; } -static void net_generic_reader(struct peer *p); - -static unsigned long -read_bitfield(struct peer *p, unsigned long rmax) +static int +net_dispatch_msg(struct peer *p, uint32_t mlen, uint8_t mnum, uint8_t *buf) { - struct bitfield_reader *rd = (struct bitfield_reader *)p->reader; - - size_t nread = net_read_to_buf(p, &rd->iob, rmax); - if (nread == 0) - return 0; - - if (rd->iob.buf_off == rd->iob.buf_len) { - peer_on_bitfield(p, rd->iob.buf); - free(rd); - net_generic_reader(p); - } else - event_add(&p->in_ev, NULL); - - return nread; -} - -void -kill_piece(struct input_reader *rd) -{ - free(rd); + uint32_t index, begin, length; + int res = 0; + + switch (mnum) { + case MSG_CHOKE: + peer_on_choke(p); + break; + case MSG_UNCHOKE: + peer_on_unchoke(p); + break; + case MSG_INTEREST: + peer_on_interest(p); + break; + case MSG_UNINTEREST: + peer_on_uninterest(p); + break; + case MSG_HAVE: + peer_on_have(p, net_read32(buf + 5)); + break; + case MSG_BITFIELD: + peer_on_bitfield(p, buf + 5); + break; + case MSG_REQUEST: + if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) { + index = net_read32(buf + 5); + begin = net_read32(buf + 9); + length = net_read32(buf + 13); + if ((length > PIECE_BLOCKLEN + || index >= p->tp->meta.npieces + || !has_bit(p->tp->piece_field, index) + || begin + length < torrent_piece_size(p->tp, index))) { + res = 1; + break; + } + peer_on_request(p, index, begin, length); + } + break; + case MSG_CANCEL: + index = net_read32(buf + 5); + begin = net_read32(buf + 9); + length = net_read32(buf + 13); + peer_on_cancel(p, index, begin, length); + break; + case MSG_PIECE: + index = net_read32(buf + 5); + begin = net_read32(buf + 9); + length = mlen - 9; + peer_on_piece(p, index, begin, length, buf + 13); + break; + default: + abort(); + } + return res; } -static unsigned long -read_piece(struct peer *p, unsigned long rmax) +static int +net_mh_ok(struct peer *p, uint32_t mlen, uint8_t mnum) { - struct piece_reader *rd = (struct piece_reader *)p->reader; - - size_t nread = net_read_to_buf(p, &rd->iob, rmax); - if (nread == 0) + switch (mnum) { + case MSG_CHOKE: + case MSG_UNCHOKE: + case MSG_INTEREST: + case MSG_UNINTEREST: + return mlen == 1; + case MSG_HAVE: + return mlen == 5; + case MSG_BITFIELD: + return mlen == (uint32_t)ceil(p->tp->meta.npieces / 8.0) + 1; + case MSG_REQUEST: + case MSG_CANCEL: + return mlen == 13; + case MSG_PIECE: + return mlen <= PIECE_BLOCKLEN + 9; + default: return 0; - - p->rate_to_me[btpd.seconds % RATEHISTORY] += nread; - p->tp->downloaded += nread; - if (rd->iob.buf_off == rd->iob.buf_len) { - peer_on_piece(p, rd->index, rd->begin, rd->iob.buf_len, rd->iob.buf); - free(rd); - net_generic_reader(p); - } else - event_add(&p->in_ev, NULL); - - return nread; -} - -#define GRBUFLEN (1 << 15) - -static unsigned long -net_generic_read(struct peer *p, unsigned long rmax) -{ - char buf[GRBUFLEN]; - struct io_buffer iob = { 0, GRBUFLEN, buf }; - struct generic_reader *gr = (struct generic_reader *)p->reader; - size_t nread; - size_t off, len; - int got_part; - - if (gr->iob.buf_off > 0) { - iob.buf_off = gr->iob.buf_off; - bcopy(gr->iob.buf, iob.buf, iob.buf_off); - gr->iob.buf_off = 0; } - - if ((nread = net_read_to_buf(p, &iob, rmax)) == 0) - return 0; - - len = iob.buf_off; - off = 0; - - got_part = 0; - while (!got_part && len - off >= 4) { - size_t msg_len = net_read32(buf + off); - - if (msg_len == 0) { /* Keep alive */ - off += 4; - continue; - } - if (len - off < 5) { - got_part = 1; - break; - } - - switch (buf[off + 4]) { - case MSG_CHOKE: - if (msg_len != 1) - goto bad_data; - peer_on_choke(p); - break; - case MSG_UNCHOKE: - if (msg_len != 1) - goto bad_data; - peer_on_unchoke(p); - break; - case MSG_INTEREST: - if (msg_len != 1) - goto bad_data; - peer_on_interest(p); - break; - case MSG_UNINTEREST: - if (msg_len != 1) - goto bad_data; - peer_on_uninterest(p); - break; - case MSG_HAVE: - if (msg_len != 5) - goto bad_data; - else if (len - off >= msg_len + 4) { - uint32_t index = net_read32(buf + off + 5); - peer_on_have(p, index); - } else - got_part = 1; - break; - case MSG_BITFIELD: - if (msg_len != (size_t)ceil(p->tp->meta.npieces / 8.0) + 1) - goto bad_data; - else if (p->npieces != 0) - goto bad_data; - else if (len - off >= msg_len + 4) - peer_on_bitfield(p, buf + off + 5); - else { - struct bitfield_reader *rp; - size_t mem = sizeof(*rp) + msg_len - 1; - p->reader->kill(p->reader); - rp = btpd_calloc(1, mem); - rp->rd.kill = kill_bitfield; - rp->rd.read = read_bitfield; - rp->iob.buf = (char *)rp + sizeof(*rp); - rp->iob.buf_len = msg_len - 1; - rp->iob.buf_off = len - off - 5; - bcopy(buf + off + 5, rp->iob.buf, rp->iob.buf_off); - p->reader = (struct input_reader *)rp; - event_add(&p->in_ev, NULL); - return nread; - } - break; - case MSG_REQUEST: - if (msg_len != 13) - goto bad_data; - else if (len - off >= msg_len + 4) { - if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) != PF_P_WANT) - break; - uint32_t index, begin, length; - index = net_read32(buf + off + 5); - begin = net_read32(buf + off + 9); - length = net_read32(buf + off + 13); - if (length > (1 << 15)) - goto bad_data; - if (index >= p->tp->meta.npieces) - goto bad_data; - if (!has_bit(p->tp->piece_field, index)) - goto bad_data; - if (begin + length > torrent_piece_size(p->tp, index)) - goto bad_data; - peer_on_request(p, index, begin, length); - } else - got_part = 1; - break; - case MSG_PIECE: - if (msg_len < 10) - goto bad_data; - else if (len - off >= 13) { - uint32_t index = net_read32(buf + off + 5); - uint32_t begin = net_read32(buf + off + 9); - uint32_t length = msg_len - 9; - if (len - off >= msg_len + 4) { - p->tp->downloaded += length; - p->rate_to_me[btpd.seconds % RATEHISTORY] += length; - peer_on_piece(p, index, begin, length, buf + off + 13); - } else { - struct piece_reader *rp; - size_t mem = sizeof(*rp) + length; - p->reader->kill(p->reader); - rp = btpd_calloc(1, mem); - rp->rd.kill = kill_piece; - rp->rd.read = read_piece; - rp->index = index; - rp->begin = begin; - rp->iob.buf = (char *)rp + sizeof(*rp); - rp->iob.buf_len = length; - rp->iob.buf_off = len - off - 13; - bcopy(buf + off + 13, rp->iob.buf, rp->iob.buf_off); - p->reader = (struct input_reader *)rp; - event_add(&p->in_ev, NULL); - p->tp->downloaded += rp->iob.buf_off; - p->rate_to_me[btpd.seconds % RATEHISTORY] += - rp->iob.buf_off; - return nread; - } - } else - got_part = 1; - break; - case MSG_CANCEL: - if (msg_len != 13) - goto bad_data; - else if (len - off >= msg_len + 4) { - uint32_t index = net_read32(buf + off + 5); - uint32_t begin = net_read32(buf + off + 9); - uint32_t length = net_read32(buf + off + 13); - if (index > p->tp->meta.npieces) - goto bad_data; - if (begin + length > torrent_piece_size(p->tp, index)) - goto bad_data; - peer_on_cancel(p, index, begin, length); - } else - got_part = 1; - break; - default: - goto bad_data; - } - if (!got_part) - off += 4 + msg_len; - } - if (off != len) { - gr->iob.buf_off = len - off; - assert(gr->iob.buf_off <= gr->iob.buf_len); - bcopy(buf + off, gr->iob.buf, gr->iob.buf_off); - } - event_add(&p->in_ev, NULL); - return nread; - -bad_data: - btpd_log(BTPD_L_MSG, "received bad data from %p\n", p); - peer_kill(p); - return nread; } static void -net_generic_reader(struct peer *p) +net_progress(struct peer *p, size_t length) { - struct generic_reader *gr; - gr = btpd_calloc(1, sizeof(*gr)); - - gr->rd.read = net_generic_read; - gr->rd.kill = kill_generic; - - gr->iob.buf = gr->_io_buf; - gr->iob.buf_len = MAX_INPUT_LEFT; - gr->iob.buf_off = 0; - - p->reader = (struct input_reader *)gr; - - event_add(&p->in_ev, NULL); + if (p->net_state == NET_MSGPIECE) { + p->tp->downloaded += length; + p->rate_to_me[btpd.seconds % RATEHISTORY] += length; + } } -static unsigned long -net_shake_read(struct peer *p, unsigned long rmax) +static int +net_state_foo(struct peer *p, struct io_buffer *iob) { - struct handshake *hs = (struct handshake *)p->reader; - struct io_buffer *in = &hs->in; + uint32_t mlen; + uint32_t mnum; - size_t nread = net_read_to_buf(p, in, rmax); - if (nread == 0) - return 0; - - switch (hs->state) { - case SHAKE_INIT: - if (in->buf_off < 20) - break; - else if (bcmp(in->buf, "\x13""BitTorrent protocol", 20) == 0) - hs->state = SHAKE_PSTR; - else - goto bad_shake; + switch (p->net_state) { case SHAKE_PSTR: - if (in->buf_off < 28) - break; - else - hs->state = SHAKE_RESERVED; - case SHAKE_RESERVED: - if (in->buf_off < 48) - break; - else if (hs->incoming) { - struct torrent *tp = torrent_get_by_hash(in->buf + 28); - if (tp != NULL) { - hs->state = SHAKE_INFO; - p->tp = tp; - peer_send(p, nb_create_shake(p->tp)); - } else - goto bad_shake; - } else { - if (bcmp(in->buf + 28, p->tp->meta.info_hash, 20) == 0) - hs->state = SHAKE_INFO; - else - goto bad_shake; - } + assert(iob->buf_len >= 28); + if (bcmp(iob->buf, "\x13""BitTorrent protocol", 20) != 0) + goto bad; + net_set_state(p, SHAKE_INFO, 20); + return 28; case SHAKE_INFO: - if (in->buf_off < 68) - break; - else { - if (torrent_has_peer(p->tp, in->buf + 48)) - goto bad_shake; // Not really, but we're already connected. - else if (bcmp(in->buf + 48, btpd.peer_id, 20) == 0) - goto bad_shake; // Connection from myself. - bcopy(in->buf + 48, p->id, 20); - hs->state = SHAKE_ID; - } - default: - assert(hs->state == SHAKE_ID); - } - if (hs->state == SHAKE_ID) { + assert(iob->buf_len >= 20); + if (p->flags & PF_INCOMING) { + struct torrent *tp = torrent_get_by_hash(iob->buf); + if (tp == NULL) + goto bad; + p->tp = tp; + peer_send(p, nb_create_shake(p->tp)); + } else if (bcmp(iob->buf, p->tp->meta.info_hash, 20) != 0) + goto bad; + net_set_state(p, SHAKE_ID, 20); + return 20; + case SHAKE_ID: + assert(iob->buf_len >= 20); + if ((torrent_has_peer(p->tp, iob->buf) + || bcmp(iob->buf, btpd.peer_id, 20) == 0)) + goto bad; + bcopy(iob->buf, p->id, 20); btpd_log(BTPD_L_CONN, "Got whole shake.\n"); - free(hs); p->piece_field = btpd_calloc(1, (int)ceil(p->tp->meta.npieces / 8.0)); - net_generic_reader(p); if (p->tp->have_npieces > 0) { if (p->tp->have_npieces * 9 < 5 + ceil(p->tp->meta.npieces / 8.0)) peer_send(p, nb_create_multihave(p->tp)); @@ -525,37 +259,112 @@ net_shake_read(struct peer *p, unsigned long rmax) } } cm_on_new_peer(p); - } else - event_add(&p->in_ev, NULL); - - return nread; + net_set_state(p, NET_MSGSIZE, 4); + return 20; + case NET_MSGSIZE: + assert(iob->buf_len >= 4); + if (bcmp(iob->buf, "\0\0\0\0", 4) == 0) + return 4; + else { + net_set_state(p, NET_MSGHEAD, 5); + return 0; + } + case NET_MSGHEAD: + assert(iob->buf_len >= 5); + mlen = net_read32(iob->buf); + mnum = iob->buf[4]; + if (!net_mh_ok(p, mlen, mnum)) { + btpd_log(BTPD_L_ERROR, "error in head\n"); + goto bad; + } else if (mlen == 1) { + if (net_dispatch_msg(p, mlen, mnum, iob->buf) != 0) { + btpd_log(BTPD_L_ERROR, "error in dispatch\n"); + goto bad; + } + net_set_state(p, NET_MSGSIZE, 4); + return 5; + } else { + uint8_t nstate = mnum == MSG_PIECE ? NET_MSGPIECE : NET_MSGBODY; + net_set_state(p, nstate, mlen + 4); + return 0; + } + case NET_MSGPIECE: + case NET_MSGBODY: + mlen = net_read32(iob->buf); + mnum = iob->buf[4]; + assert(iob->buf_len >= mlen + 4); + if (net_dispatch_msg(p, mlen, mnum, iob->buf) != 0) { + btpd_log(BTPD_L_ERROR, "error in dispatch\n"); + goto bad; + } + net_set_state(p, NET_MSGSIZE, 4); + return mlen + 4; + default: + abort(); + } -bad_shake: - btpd_log(BTPD_L_CONN, "Bad shake(%d)\n", hs->state); +bad: + btpd_log(BTPD_L_CONN, "bad data.\n"); peer_kill(p); - return nread; + return -1; } +#define GRBUFLEN (1 << 15) -void -net_handshake(struct peer *p, int incoming) +static unsigned long +net_read(struct peer *p, unsigned long rmax) { - struct handshake *hs; + size_t baggage = p->net_in.buf_len; + char buf[GRBUFLEN + baggage]; + struct io_buffer sbuf = { baggage, sizeof(buf), buf }; + if (baggage > 0) { + bcopy(p->net_in.buf, buf, baggage); + free(p->net_in.buf); + p->net_in.buf = NULL; + p->net_in.buf_off = 0; + p->net_in.buf_len = 0; + } + + if (rmax > 0) + rmax = min(rmax, sbuf.buf_len - sbuf.buf_off); + else + rmax = sbuf.buf_len - sbuf.buf_off; + + ssize_t nread = read(p->sd, sbuf.buf + sbuf.buf_off, rmax); + if (nread < 0 && errno == EAGAIN) + goto out; + else if (nread < 0) { + btpd_log(BTPD_L_CONN, "Read error (%s) on %p.\n", strerror(errno), p); + peer_kill(p); + return 0; + } else if (nread == 0) { + btpd_log(BTPD_L_CONN, "Connection closed by %p.\n", p); + peer_kill(p); + return 0; + } - hs = calloc(1, sizeof(*hs)); - hs->incoming = incoming; - hs->state = SHAKE_INIT; + sbuf.buf_len = sbuf.buf_off + nread; + sbuf.buf_off = 0; + while (p->state_bytes <= sbuf.buf_len) { + ssize_t chomped = net_state_foo(p, &sbuf); + if (chomped < 0) + return nread; + sbuf.buf += chomped; + sbuf.buf_len -= chomped; + baggage = 0; + } - hs->in.buf_len = SHAKE_LEN; - hs->in.buf_off = 0; - hs->in.buf = hs->_io_buf; + net_progress(p, sbuf.buf_len - baggage); - p->reader = (struct input_reader *)hs; - hs->rd.read = net_shake_read; - hs->rd.kill = kill_shake; + if (sbuf.buf_len > 0) { + p->net_in = sbuf; + p->net_in.buf = btpd_malloc(sbuf.buf_len); + bcopy(sbuf.buf, p->net_in.buf, sbuf.buf_len); + } - if (!incoming) - peer_send(p, nb_create_shake(p->tp)); +out: + event_add(&p->in_ev, NULL); + return nread > 0 ? nread : 0; } int @@ -660,13 +469,13 @@ net_bw_cb(int sd, short type, void *arg) while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left > 0) { BTPDQ_REMOVE(&btpd.readq, p, rq_entry); p->flags &= ~PF_ON_READQ; - btpd.ibw_left -= p->reader->read(p, btpd.ibw_left); + btpd.ibw_left -= net_read(p, btpd.ibw_left); } } else { while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL) { BTPDQ_REMOVE(&btpd.readq, p, rq_entry); p->flags &= ~PF_ON_READQ; - p->reader->read(p, 0); + net_read(p, 0); } } @@ -685,3 +494,36 @@ net_bw_cb(int sd, short type, void *arg) } event_add(&btpd.bwlim, (& (struct timeval) { 0, 1000000 / btpd.bw_hz })); } + +void +net_read_cb(int sd, short type, void *arg) +{ + struct peer *p = (struct peer *)arg; + if (btpd.ibwlim == 0) + net_read(p, 0); + else if (btpd.ibw_left > 0) + btpd.ibw_left -= net_read(p, btpd.ibw_left); + else { + p->flags |= PF_ON_READQ; + BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry); + } +} + +void +net_write_cb(int sd, short type, void *arg) +{ + struct peer *p = (struct peer *)arg; + if (type == EV_TIMEOUT) { + btpd_log(BTPD_L_CONN, "Write attempt timed out.\n"); + peer_kill(p); + return; + } + if (btpd.obwlim == 0) { + net_write(p, 0); + } else if (btpd.obw_left > 0) { + btpd.obw_left -= net_write(p, btpd.obw_left); + } else { + p->flags |= PF_ON_WRITEQ; + BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry); + } +} diff --git a/btpd/net.h b/btpd/net.h index 58ea819..fd288f9 100644 --- a/btpd/net.h +++ b/btpd/net.h @@ -13,25 +13,6 @@ #define WRITE_TIMEOUT (& (struct timeval) { 60, 0 }) -struct peer; - -struct input_reader { - unsigned long (*read)(struct peer *, unsigned long); - void (*kill)(struct input_reader *); -}; - -struct bitfield_reader { - struct input_reader rd; - struct io_buffer iob; -}; - -struct piece_reader { - struct input_reader rd; - struct io_buffer iob; - uint32_t index; - uint32_t begin; -}; - #define SHAKE_LEN 68 enum shake_state { @@ -39,24 +20,14 @@ enum shake_state { SHAKE_PSTR, SHAKE_RESERVED, SHAKE_INFO, - SHAKE_ID -}; - -struct handshake { - struct input_reader rd; - enum shake_state state; - int incoming; - struct io_buffer in; - char _io_buf[SHAKE_LEN]; + SHAKE_ID, + NET_MSGSIZE, + NET_MSGHEAD, + NET_MSGBODY, + NET_MSGPIECE }; -#define MAX_INPUT_LEFT 16 - -struct generic_reader { - struct input_reader rd; - struct io_buffer iob; - char _io_buf[MAX_INPUT_LEFT]; -}; +void net_set_state(struct peer *p, int state, size_t size); void net_connection_cb(int sd, short type, void *arg); void net_bw_rate(void); @@ -65,7 +36,6 @@ void net_bw_cb(int sd, short type, void *arg); void net_read_cb(int sd, short type, void *arg); void net_write_cb(int sd, short type, void *arg); -void net_handshake(struct peer *p, int incoming); int net_connect2(struct sockaddr *sa, socklen_t salen, int *sd); int net_connect(const char *ip, int port, int *sd); diff --git a/btpd/peer.c b/btpd/peer.c index 572e2cc..e95a7b5 100644 --- a/btpd/peer.c +++ b/btpd/peer.c @@ -22,7 +22,7 @@ peer_kill(struct peer *p) { struct nb_link *nl; - btpd_log(BTPD_L_CONN, "killed peer.\n"); + btpd_log(BTPD_L_CONN, "killed peer %p\n", p); if (p->flags & PF_ATTACHED) cm_on_lost_peer(p); @@ -45,7 +45,8 @@ peer_kill(struct peer *p) nl = next; } - p->reader->kill(p->reader); + if (p->net_in.buf != NULL) + free(p->net_in.buf); if (p->piece_field != NULL) free(p->piece_field); free(p); @@ -66,7 +67,6 @@ peer_send(struct peer *p, struct net_buf *nb) BTPDQ_INSERT_TAIL(&p->outq, nl, entry); } - /* * Remove a network buffer from the peer's outq. * If a part of the buffer already have been written @@ -250,6 +250,8 @@ peer_create_common(int sd) BTPDQ_INIT(&p->my_reqs); BTPDQ_INIT(&p->outq); + net_set_state(p, SHAKE_PSTR, 28); + event_set(&p->out_ev, p->sd, EV_WRITE, net_write_cb, p); event_set(&p->in_ev, p->sd, EV_READ, net_read_cb, p); event_add(&p->in_ev, NULL); @@ -263,7 +265,7 @@ void peer_create_in(int sd) { struct peer *p = peer_create_common(sd); - net_handshake(p, 1); + p->flags |= PF_INCOMING; } void @@ -278,7 +280,7 @@ peer_create_out(struct torrent *tp, const uint8_t *id, p = peer_create_common(sd); p->tp = tp; - net_handshake(p, 0); + peer_send(p, nb_create_shake(p->tp)); } void @@ -297,7 +299,7 @@ peer_create_out_compact(struct torrent *tp, const char *compact) p = peer_create_common(sd); p->tp = tp; - net_handshake(p, 0); + peer_send(p, nb_create_shake(p->tp)); } void diff --git a/btpd/peer.h b/btpd/peer.h index a8eb80d..c92f925 100644 --- a/btpd/peer.h +++ b/btpd/peer.h @@ -10,6 +10,7 @@ #define PF_ATTACHED 0x40 #define PF_WRITE_CLOSE 0x80 /* Close connection after writing all data */ #define PF_NO_REQUESTS 0x100 +#define PF_INCOMING 0x200 #define RATEHISTORY 20 #define MAXPIECEMSGS 128 @@ -46,11 +47,13 @@ struct peer { struct event in_ev; struct event out_ev; - struct input_reader *reader; - unsigned long rate_to_me[RATEHISTORY]; unsigned long rate_from_me[RATEHISTORY]; + size_t state_bytes; + uint8_t net_state; + struct io_buffer net_in; + BTPDQ_ENTRY(peer) cm_entry; BTPDQ_ENTRY(peer) rq_entry;