From 2acdcff5a62ff7ae6279de0021a10c0411230498 Mon Sep 17 00:00:00 2001 From: Richard Nyberg Date: Tue, 2 Aug 2005 17:05:25 +0000 Subject: [PATCH] * Rearrange some code. Mostly from net to net_buf and peer. * Use the new net_bufs where it makes sense. * Take advantage of the reference count on net_bufs and only allocate the (un)choke and (un)interest messages once. --- btpd/Makefile.am | 1 + btpd/btpd.c | 9 ++ btpd/btpd.h | 6 + btpd/net.c | 276 ++------------------------------------------- btpd/net.h | 66 +---------- btpd/net_buf.c | 234 ++++++++++++++++++++++++++++++++++++++ btpd/net_buf.h | 59 ++++++++++ btpd/peer.c | 124 ++++++++++++++------ btpd/peer.h | 5 +- btpd/policy_subr.c | 31 ++--- 10 files changed, 431 insertions(+), 380 deletions(-) create mode 100644 btpd/net_buf.c create mode 100644 btpd/net_buf.h diff --git a/btpd/Makefile.am b/btpd/Makefile.am index b18b432..7135b63 100644 --- a/btpd/Makefile.am +++ b/btpd/Makefile.am @@ -3,6 +3,7 @@ btpd_SOURCES=\ btpd.c btpd.h\ cli_if.c\ net.c net.h\ + net_buf.c net_buf.h\ queue.h \ peer.c peer.h\ policy_choke.c policy_if.c policy_subr.c policy.h\ diff --git a/btpd/btpd.c b/btpd/btpd.c index 9082648..365573f 100644 --- a/btpd/btpd.c +++ b/btpd/btpd.c @@ -145,6 +145,15 @@ btpd_init(void) "More could be beneficial to the download performance.\n", nfiles); btpd.maxpeers = nfiles - 20; + + btpd.choke_msg = nb_create_choke(); + nb_hold(btpd.choke_msg); + btpd.unchoke_msg = nb_create_unchoke(); + nb_hold(btpd.unchoke_msg); + btpd.interest_msg = nb_create_interest(); + nb_hold(btpd.interest_msg); + btpd.uninterest_msg = nb_create_uninterest(); + nb_hold(btpd.uninterest_msg); } void diff --git a/btpd/btpd.h b/btpd/btpd.h index cd53e76..ef073a5 100644 --- a/btpd/btpd.h +++ b/btpd/btpd.h @@ -17,6 +17,7 @@ #include "benc.h" #include "metainfo.h" #include "iobuf.h" +#include "net_buf.h" #include "net.h" #include "peer.h" #include "torrent.h" @@ -78,6 +79,11 @@ struct btpd { struct event sigint; struct event sigterm; struct event sigchld; + + struct net_buf *choke_msg; + struct net_buf *unchoke_msg; + struct net_buf *interest_msg; + struct net_buf *uninterest_msg; }; extern struct btpd btpd; diff --git a/btpd/net.c b/btpd/net.c index 51dc120..081cdc4 100644 --- a/btpd/net.c +++ b/btpd/net.c @@ -18,8 +18,6 @@ #include "btpd.h" -#define WRITE_TIMEOUT (& (struct timeval) { 60, 0 }) - #define min(x, y) ((x) <= (y) ? (x) : (y)) static unsigned long @@ -70,101 +68,6 @@ net_read32(void *buf) return ntohl(*(uint32_t *)buf); } -static void -kill_buf_no(char *buf, size_t len) -{ - //Nothing -} - -static void -kill_buf_free(char *buf, size_t len) -{ - free(buf); -} - -int -nb_drop(struct net_buf *nb) -{ - assert(nb->refs > 0); - nb->refs--; - if (nb->refs == 0) { - nb->kill_buf(nb->buf, nb->len); - free(nb); - return 1; - } else - return 0; -} - -void -nb_hold(struct net_buf *nb) -{ - nb->refs++; -} - -struct net_buf * -nb_create_alloc(short type, size_t len) -{ - struct net_buf *nb = btpd_calloc(1, sizeof(*nb) + len); - nb->type = type; - nb->buf = (char *)(nb + 1); - nb->len = len; - nb->kill_buf = kill_buf_no; - return nb; -} - -struct net_buf * -nb_create_set(short type, char *buf, size_t len, - void (*kill_buf)(char *, size_t)) -{ - struct net_buf *nb = btpd_calloc(1, sizeof(*nb)); - nb->type = type; - nb->buf = buf; - nb->len = len; - nb->kill_buf = kill_buf; - return nb; -} - -uint32_t -nb_get_index(struct net_buf *nb) -{ - switch (nb->type) { - case NB_CANCEL: - case NB_HAVE: - case NB_PIECE: - case NB_REQUEST: - return net_read32(nb->buf + 5); - default: - abort(); - } -} - -uint32_t -nb_get_begin(struct net_buf *nb) -{ - switch (nb->type) { - case NB_CANCEL: - case NB_PIECE: - case NB_REQUEST: - return net_read32(nb->buf + 9); - default: - abort(); - } -} - -uint32_t -nb_get_length(struct net_buf *nb) -{ - switch (nb->type) { - case NB_CANCEL: - case NB_REQUEST: - return net_read32(nb->buf + 13); - case NB_PIECE: - return net_read32(nb->buf) - 9; - default: - abort(); - } -} - void kill_shake(struct input_reader *reader) { @@ -255,173 +158,6 @@ net_write(struct peer *p, unsigned long wmax) return nwritten; } -void -net_send(struct peer *p, struct net_buf *nb) -{ - struct nb_link *nl = btpd_calloc(1, sizeof(*nl)); - nl->nb = nb; - nb_hold(nb); - - if (BTPDQ_EMPTY(&p->outq)) { - assert(p->outq_off == 0); - event_add(&p->out_ev, WRITE_TIMEOUT); - } - BTPDQ_INSERT_TAIL(&p->outq, nl, entry); -} - - -/* - * Remove a network buffer from the peer's outq. - * If a part of the buffer already have been written - * to the network it cannot be removed. - * - * Returns 1 if the buffer is removed, 0 if not. - */ -int -net_unsend(struct peer *p, struct nb_link *nl) -{ - if (!(nl == BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) { - BTPDQ_REMOVE(&p->outq, nl, entry); - nb_drop(nl->nb); - free(nl); - if (BTPDQ_EMPTY(&p->outq)) { - if (p->flags & PF_ON_WRITEQ) { - BTPDQ_REMOVE(&btpd.writeq, p, wq_entry); - p->flags &= ~PF_ON_WRITEQ; - } else - event_del(&p->out_ev); - } - return 1; - } else - return 0; -} - -void -net_send_piece(struct peer *p, uint32_t index, uint32_t begin, - char *block, size_t blen) -{ - struct net_buf *head, *piece; - - btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen); - - head = nb_create_alloc(NB_PIECE, 13); - net_write32(head->buf, 9 + blen); - head->buf[4] = MSG_PIECE; - net_write32(head->buf + 5, index); - net_write32(head->buf + 9, begin); - net_send(p, head); - - piece = nb_create_set(NB_TORRENTDATA, block, blen, kill_buf_free); - net_send(p, piece); -} - -void -net_send_request(struct peer *p, struct piece_req *req) -{ - struct net_buf *out = nb_create_alloc(NB_REQUEST, 17); - net_write32(out->buf, 13); - out->buf[4] = MSG_REQUEST; - net_write32(out->buf + 5, req->index); - net_write32(out->buf + 9, req->begin); - net_write32(out->buf + 13, req->length); - net_send(p, out); -} - -void -net_send_cancel(struct peer *p, struct piece_req *req) -{ - struct net_buf *out = nb_create_alloc(NB_CANCEL, 17); - net_write32(out->buf, 13); - out->buf[4] = MSG_CANCEL; - net_write32(out->buf + 5, req->index); - net_write32(out->buf + 9, req->begin); - net_write32(out->buf + 13, req->length); - net_send(p, out); -} - -void -net_send_have(struct peer *p, uint32_t index) -{ - struct net_buf *out = nb_create_alloc(NB_HAVE, 9); - net_write32(out->buf, 5); - out->buf[4] = MSG_HAVE; - net_write32(out->buf + 5, index); - net_send(p, out); -} - -void -net_send_multihave(struct peer *p) -{ - struct torrent *tp = p->tp; - struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * tp->have_npieces); - for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) { - if (has_bit(tp->piece_field, i)) { - net_write32(out->buf + count * 9, 5); - out->buf[count * 9 + 4] = MSG_HAVE; - net_write32(out->buf + count * 9 + 5, i); - count++; - } - } - net_send(p, out); -} - -void -net_send_onesized(struct peer *p, char mtype, int btype) -{ - struct net_buf *out = nb_create_alloc(btype, 5); - net_write32(out->buf, 1); - out->buf[4] = mtype; - net_send(p, out); -} - -void -net_send_unchoke(struct peer *p) -{ - net_send_onesized(p, MSG_UNCHOKE, NB_UNCHOKE); -} - -void -net_send_choke(struct peer *p) -{ - net_send_onesized(p, MSG_CHOKE, NB_CHOKE); -} - -void -net_send_uninterest(struct peer *p) -{ - net_send_onesized(p, MSG_UNINTEREST, NB_UNINTEREST); -} - -void -net_send_interest(struct peer *p) -{ - net_send_onesized(p, MSG_INTEREST, NB_INTEREST); -} - -void -net_send_bitfield(struct peer *p) -{ - uint32_t plen = ceil(p->tp->meta.npieces / 8.0); - - struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5); - net_write32(out->buf, plen + 1); - out->buf[4] = MSG_BITFIELD; - net_send(p, out); - - out = nb_create_set(NB_BITDATA, p->tp->piece_field, plen, kill_buf_no); - net_send(p, out); -} - -void -net_send_shake(struct peer *p) -{ - struct net_buf *out = nb_create_alloc(NB_SHAKE, 68); - bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->buf, 28); - bcopy(p->tp->meta.info_hash, out->buf + 28, 20); - bcopy(btpd.peer_id, out->buf + 48, 20); - net_send(p, out); -} - static void kill_generic(struct input_reader *reader) { @@ -761,7 +497,7 @@ net_shake_read(struct peer *p, unsigned long rmax) if (tp != NULL) { hs->state = SHAKE_INFO; p->tp = tp; - net_send_shake(p); + peer_send(p, nb_create_shake(p->tp)); } else goto bad_shake; } else { @@ -791,9 +527,11 @@ net_shake_read(struct peer *p, unsigned long rmax) net_generic_reader(p); if (p->tp->have_npieces > 0) { if (p->tp->have_npieces * 9 < 5 + ceil(p->tp->meta.npieces / 8.0)) - net_send_multihave(p); - else - net_send_bitfield(p); + peer_send(p, nb_create_multihave(p->tp)); + else { + peer_send(p, nb_create_bitfield(p->tp)); + peer_send(p, nb_create_bitdata(p->tp)); + } } cm_on_new_peer(p); } else @@ -826,7 +564,7 @@ net_handshake(struct peer *p, int incoming) hs->rd.kill = kill_shake; if (!incoming) - net_send_shake(p); + peer_send(p, nb_create_shake(p->tp)); } int diff --git a/btpd/net.h b/btpd/net.h index 9fe7b30..58ea819 100644 --- a/btpd/net.h +++ b/btpd/net.h @@ -11,43 +11,7 @@ #define MSG_PIECE 7 #define MSG_CANCEL 8 -#define NB_CHOKE 0 -#define NB_UNCHOKE 1 -#define NB_INTEREST 2 -#define NB_UNINTEREST 3 -#define NB_HAVE 4 -#define NB_BITFIELD 5 -#define NB_REQUEST 6 -#define NB_PIECE 7 -#define NB_CANCEL 8 -#define NB_TORRENTDATA 10 -#define NB_MULTIHAVE 11 -#define NB_BITDATA 12 -#define NB_SHAKE 13 - -struct net_buf { - short type; - unsigned refs; - char *buf; - size_t len; - void (*kill_buf)(char *, size_t); -}; - -struct nb_link { - struct net_buf *nb; - BTPDQ_ENTRY(nb_link) entry; -}; - -BTPDQ_HEAD(nb_tq, nb_link); - -struct net_buf *nb_create_alloc(short type, size_t len); -struct net_buf *nb_create_set(short type, char *buf, size_t len, - void (*kill_buf)(char *, size_t)); -int nb_drop(struct net_buf *nb); -void nb_hold(struct net_buf *nb); -uint32_t nb_get_index(struct net_buf *nb); -uint32_t nb_get_begin(struct net_buf *nb); -uint32_t nb_get_length(struct net_buf *nb); +#define WRITE_TIMEOUT (& (struct timeval) { 60, 0 }) struct peer; @@ -94,36 +58,18 @@ struct generic_reader { char _io_buf[MAX_INPUT_LEFT]; }; -struct piece_req { - uint32_t index, begin, length; - struct iob_link *head; /* Pointer to outgoing piece. */ - BTPDQ_ENTRY(piece_req) entry; -}; - -BTPDQ_HEAD(piece_req_tq, piece_req); - void net_connection_cb(int sd, short type, void *arg); void net_bw_rate(void); void net_bw_cb(int sd, short type, void *arg); -struct peer; - -void net_send_uninterest(struct peer *p); -void net_send_interest(struct peer *p); -void net_send_unchoke(struct peer *p); -void net_send_choke(struct peer *p); - -void net_send_have(struct peer *p, uint32_t index); -void net_send_request(struct peer *p, struct piece_req *req); -void net_send_piece(struct peer *p, uint32_t index, uint32_t begin, - char *block, size_t blen); -void net_send_cancel(struct peer *p, struct piece_req *req); -int net_unsend(struct peer *p, struct nb_link *nl); -void net_handshake(struct peer *p, int incoming); - void net_read_cb(int sd, short type, void *arg); void net_write_cb(int sd, short type, void *arg); + +void net_handshake(struct peer *p, int incoming); int net_connect2(struct sockaddr *sa, socklen_t salen, int *sd); int net_connect(const char *ip, int port, int *sd); +void net_write32(void *buf, uint32_t num); +uint32_t net_read32(void *buf); + #endif diff --git a/btpd/net_buf.c b/btpd/net_buf.c new file mode 100644 index 0000000..d39f393 --- /dev/null +++ b/btpd/net_buf.c @@ -0,0 +1,234 @@ +#include +#include + +#include "btpd.h" + +static void +kill_buf_no(char *buf, size_t len) +{ + +} + +static void +kill_buf_free(char *buf, size_t len) +{ + free(buf); +} + +static struct net_buf * +nb_create_alloc(short type, size_t len) +{ + struct net_buf *nb = btpd_calloc(1, sizeof(*nb) + len); + nb->type = type; + nb->buf = (char *)(nb + 1); + nb->len = len; + nb->kill_buf = kill_buf_no; + return nb; +} + +static struct net_buf * +nb_create_set(short type, char *buf, size_t len, + void (*kill_buf)(char *, size_t)) +{ + struct net_buf *nb = btpd_calloc(1, sizeof(*nb)); + nb->type = type; + nb->buf = buf; + nb->len = len; + nb->kill_buf = kill_buf; + return nb; +} + +static struct net_buf * +nb_create_onesized(char mtype, int btype) +{ + struct net_buf *out = nb_create_alloc(btype, 5); + net_write32(out->buf, 1); + out->buf[4] = mtype; + return out; +} + +struct net_buf * +nb_create_piece(uint32_t index, uint32_t begin, size_t blen) +{ + struct net_buf *out; + + btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen); + + out = nb_create_alloc(NB_PIECE, 13); + net_write32(out->buf, 9 + blen); + out->buf[4] = MSG_PIECE; + net_write32(out->buf + 5, index); + net_write32(out->buf + 9, begin); + return out; +} + +struct net_buf * +nb_create_torrentdata(char *block, size_t blen) +{ + struct net_buf *out; + out = nb_create_set(NB_TORRENTDATA, block, blen, kill_buf_free); + return out; +} + +struct net_buf * +nb_create_request(uint32_t index, uint32_t begin, uint32_t length) +{ + struct net_buf *out = nb_create_alloc(NB_REQUEST, 17); + net_write32(out->buf, 13); + out->buf[4] = MSG_REQUEST; + net_write32(out->buf + 5, index); + net_write32(out->buf + 9, begin); + net_write32(out->buf + 13, length); + return out; +} + +struct net_buf * +nb_create_cancel(uint32_t index, uint32_t begin, uint32_t length) +{ + struct net_buf *out = nb_create_alloc(NB_CANCEL, 17); + net_write32(out->buf, 13); + out->buf[4] = MSG_CANCEL; + net_write32(out->buf + 5, index); + net_write32(out->buf + 9, begin); + net_write32(out->buf + 13, length); + return out; +} + +struct net_buf * +nb_create_have(uint32_t index) +{ + struct net_buf *out = nb_create_alloc(NB_HAVE, 9); + net_write32(out->buf, 5); + out->buf[4] = MSG_HAVE; + net_write32(out->buf + 5, index); + return out; +} + +struct net_buf * +nb_create_multihave(struct torrent *tp) +{ + struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * tp->have_npieces); + for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) { + if (has_bit(tp->piece_field, i)) { + net_write32(out->buf + count * 9, 5); + out->buf[count * 9 + 4] = MSG_HAVE; + net_write32(out->buf + count * 9 + 5, i); + count++; + } + } + return out; +} + +struct net_buf * +nb_create_unchoke(void) +{ + return nb_create_onesized(MSG_UNCHOKE, NB_UNCHOKE); +} + +struct net_buf * +nb_create_choke(void) +{ + return nb_create_onesized(MSG_CHOKE, NB_CHOKE); +} + +struct net_buf * +nb_create_uninterest(void) +{ + return nb_create_onesized(MSG_UNINTEREST, NB_UNINTEREST); +} + +struct net_buf * +nb_create_interest(void) +{ + return nb_create_onesized(MSG_INTEREST, NB_INTEREST); +} + +struct net_buf * +nb_create_bitfield(struct torrent *tp) +{ + uint32_t plen = ceil(tp->meta.npieces / 8.0); + + struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5); + net_write32(out->buf, plen + 1); + out->buf[4] = MSG_BITFIELD; + return out; +} + +struct net_buf * +nb_create_bitdata(struct torrent *tp) +{ + uint32_t plen = ceil(tp->meta.npieces / 8.0); + struct net_buf *out = + nb_create_set(NB_BITDATA, tp->piece_field, plen, kill_buf_no); + return out; +} + +struct net_buf * +nb_create_shake(struct torrent *tp) +{ + struct net_buf *out = nb_create_alloc(NB_SHAKE, 68); + bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->buf, 28); + bcopy(tp->meta.info_hash, out->buf + 28, 20); + bcopy(btpd.peer_id, out->buf + 48, 20); + return out; +} + +uint32_t +nb_get_index(struct net_buf *nb) +{ + switch (nb->type) { + case NB_CANCEL: + case NB_HAVE: + case NB_PIECE: + case NB_REQUEST: + return net_read32(nb->buf + 5); + default: + abort(); + } +} + +uint32_t +nb_get_begin(struct net_buf *nb) +{ + switch (nb->type) { + case NB_CANCEL: + case NB_PIECE: + case NB_REQUEST: + return net_read32(nb->buf + 9); + default: + abort(); + } +} + +uint32_t +nb_get_length(struct net_buf *nb) +{ + switch (nb->type) { + case NB_CANCEL: + case NB_REQUEST: + return net_read32(nb->buf + 13); + case NB_PIECE: + return net_read32(nb->buf) - 9; + default: + abort(); + } +} + +int +nb_drop(struct net_buf *nb) +{ + assert(nb->refs > 0); + nb->refs--; + if (nb->refs == 0) { + nb->kill_buf(nb->buf, nb->len); + free(nb); + return 1; + } else + return 0; +} + +void +nb_hold(struct net_buf *nb) +{ + nb->refs++; +} diff --git a/btpd/net_buf.h b/btpd/net_buf.h new file mode 100644 index 0000000..7082c88 --- /dev/null +++ b/btpd/net_buf.h @@ -0,0 +1,59 @@ +#ifndef BTPD_NET_BUF_H +#define BTPD_NET_BUF_H + +#define NB_CHOKE 0 +#define NB_UNCHOKE 1 +#define NB_INTEREST 2 +#define NB_UNINTEREST 3 +#define NB_HAVE 4 +#define NB_BITFIELD 5 +#define NB_REQUEST 6 +#define NB_PIECE 7 +#define NB_CANCEL 8 +#define NB_TORRENTDATA 10 +#define NB_MULTIHAVE 11 +#define NB_BITDATA 12 +#define NB_SHAKE 13 + +struct net_buf { + short type; + unsigned refs; + char *buf; + size_t len; + void (*kill_buf)(char *, size_t); +}; + +struct nb_link { + struct net_buf *nb; + BTPDQ_ENTRY(nb_link) entry; +}; + +BTPDQ_HEAD(nb_tq, nb_link); + +struct torrent; +struct peer; + +struct net_buf *nb_create_piece(uint32_t index, uint32_t begin, size_t blen); +struct net_buf *nb_create_torrentdata(char *block, size_t blen); +struct net_buf *nb_create_request(uint32_t index, + uint32_t begin, uint32_t length); +struct net_buf *nb_create_cancel(uint32_t index, + uint32_t begin, uint32_t length); +struct net_buf *nb_create_have(uint32_t index); +struct net_buf *nb_create_multihave(struct torrent *tp); +struct net_buf *nb_create_unchoke(void); +struct net_buf *nb_create_choke(void); +struct net_buf *nb_create_uninterest(void); +struct net_buf *nb_create_interest(void); +struct net_buf *nb_create_bitfield(struct torrent *tp); +struct net_buf *nb_create_bitdata(struct torrent *tp); +struct net_buf *nb_create_shake(struct torrent *tp); + +int nb_drop(struct net_buf *nb); +void nb_hold(struct net_buf *nb); + +uint32_t nb_get_index(struct net_buf *nb); +uint32_t nb_get_begin(struct net_buf *nb); +uint32_t nb_get_length(struct net_buf *nb); + +#endif diff --git a/btpd/peer.c b/btpd/peer.c index 7126bdc..6f1eb3d 100644 --- a/btpd/peer.c +++ b/btpd/peer.c @@ -21,7 +21,6 @@ void peer_kill(struct peer *p) { struct nb_link *nl; - struct piece_req *req; btpd_log(BTPD_L_CONN, "killed peer.\n"); @@ -45,11 +44,12 @@ peer_kill(struct peer *p) free(nl); nl = next; } - req = BTPDQ_FIRST(&p->my_reqs); - while (req != NULL) { - struct piece_req *next = BTPDQ_NEXT(req, entry); - free(req); - req = next; + nl = BTPDQ_FIRST(&p->my_reqs); + while (nl != NULL) { + struct nb_link *next = BTPDQ_NEXT(nl, entry); + nb_drop(nl->nb); + free(nl); + nl = next; } p->reader->kill(p->reader); @@ -59,32 +59,82 @@ peer_kill(struct peer *p) btpd.npeers--; } +void +peer_send(struct peer *p, struct net_buf *nb) +{ + struct nb_link *nl = btpd_calloc(1, sizeof(*nl)); + nl->nb = nb; + nb_hold(nb); + + if (BTPDQ_EMPTY(&p->outq)) { + assert(p->outq_off == 0); + event_add(&p->out_ev, WRITE_TIMEOUT); + } + BTPDQ_INSERT_TAIL(&p->outq, nl, entry); +} + + +/* + * Remove a network buffer from the peer's outq. + * If a part of the buffer already have been written + * to the network it cannot be removed. + * + * Returns 1 if the buffer is removed, 0 if not. + */ +int +peer_unsend(struct peer *p, struct nb_link *nl) +{ + if (!(nl == BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) { + BTPDQ_REMOVE(&p->outq, nl, entry); + nb_drop(nl->nb); + free(nl); + if (BTPDQ_EMPTY(&p->outq)) { + if (p->flags & PF_ON_WRITEQ) { + BTPDQ_REMOVE(&btpd.writeq, p, wq_entry); + p->flags &= ~PF_ON_WRITEQ; + } else + event_del(&p->out_ev); + } + return 1; + } else + return 0; +} + void peer_request(struct peer *p, uint32_t index, uint32_t begin, uint32_t len) { if (p->tp->endgame == 0) assert(p->nreqs_out < MAXPIPEDREQUESTS); p->nreqs_out++; - struct piece_req *req = btpd_calloc(1, sizeof(*req)); - req->index = index; - req->begin = begin; - req->length = len; - BTPDQ_INSERT_TAIL(&p->my_reqs, req, entry); - net_send_request(p, req); + struct net_buf *nb = nb_create_request(index, begin, len); + struct nb_link *nl = btpd_calloc(1, sizeof(*nl)); + nl->nb = nb; + nb_hold(nb); + BTPDQ_INSERT_TAIL(&p->my_reqs, nl, entry); + peer_send(p, nb); } void peer_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t len) { - struct piece_req *req; + struct net_buf *nb = NULL; + struct nb_link *nl; again: - BTPDQ_FOREACH(req, &p->my_reqs, entry) - if (index == req->index && begin == req->begin && len == req->length) + BTPDQ_FOREACH(nl, &p->my_reqs, entry) { + int match = nb_get_begin(nl->nb) == begin + && nb_get_index(nl->nb) == index + && nb_get_length(nl->nb) == len; + if (match) break; - if (req != NULL) { - net_send_cancel(p, req); - BTPDQ_REMOVE(&p->my_reqs, req, entry); - free(req); + } + if (nl != NULL) { + if (nb == NULL) { + nb = nb_create_cancel(index, begin, len); + peer_send(p, nb); + } + BTPDQ_REMOVE(&p->my_reqs, nl, entry); + nb_drop(nl->nb); + free(nl); p->nreqs_out--; goto again; } @@ -93,14 +143,14 @@ again: void peer_have(struct peer *p, uint32_t index) { - net_send_have(p, index); + peer_send(p, nb_create_have(index)); } void peer_unchoke(struct peer *p) { p->flags &= ~PF_I_CHOKE; - net_send_unchoke(p); + peer_send(p, btpd.unchoke_msg); } void @@ -112,14 +162,14 @@ peer_choke(struct peer *p) if (nl->nb->type == NB_PIECE) { struct nb_link *data = next; next = BTPDQ_NEXT(next, entry); - if (net_unsend(p, nl)) - net_unsend(p, data); + if (peer_unsend(p, nl)) + peer_unsend(p, data); } nl = next; } p->flags |= PF_I_CHOKE; - net_send_choke(p); + peer_send(p, btpd.choke_msg); } void @@ -129,7 +179,7 @@ peer_want(struct peer *p, uint32_t index) p->nwant++; if (p->nwant == 1) { p->flags |= PF_I_WANT; - net_send_interest(p); + peer_send(p, btpd.interest_msg); } } @@ -140,7 +190,7 @@ peer_unwant(struct peer *p, uint32_t index) p->nwant--; if (p->nwant == 0) { p->flags &= ~PF_I_WANT; - net_send_uninterest(p); + peer_send(p, btpd.uninterest_msg); } } @@ -275,16 +325,17 @@ void peer_on_piece(struct peer *p, uint32_t index, uint32_t begin, uint32_t length, const char *data) { - struct piece_req *req = BTPDQ_FIRST(&p->my_reqs); - if (req != NULL && - req->index == index && - req->begin == begin && - req->length == length) { + struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs); + if (nl != NULL && + nb_get_begin(nl->nb) == begin && + nb_get_index(nl->nb) == index && + nb_get_length(nl->nb) == length) { assert(p->nreqs_out > 0); p->nreqs_out--; - BTPDQ_REMOVE(&p->my_reqs, req, entry); - free(req); + BTPDQ_REMOVE(&p->my_reqs, nl, entry); + nb_drop(nl->nb); + free(nl); cm_on_block(p, index, begin, length, data); } @@ -296,7 +347,8 @@ peer_on_request(struct peer *p, uint32_t index, uint32_t begin, { off_t cbegin = index * p->tp->meta.piece_length + begin; char * content = torrent_get_bytes(p->tp, cbegin, length); - net_send_piece(p, index, begin, content, length); + peer_send(p, nb_create_piece(index, begin, length)); + peer_send(p, nb_create_torrentdata(content, length)); } void @@ -310,8 +362,8 @@ peer_on_cancel(struct peer *p, uint32_t index, uint32_t begin, && nb_get_index(nl->nb) == index && nb_get_length(nl->nb) == length) { struct nb_link *data = BTPDQ_NEXT(nl, entry); - if (net_unsend(p, nl)) - net_unsend(p, data); + if (peer_unsend(p, nl)) + peer_unsend(p, data); break; } } diff --git a/btpd/peer.h b/btpd/peer.h index 66cbfd2..dd9db0f 100644 --- a/btpd/peer.h +++ b/btpd/peer.h @@ -25,7 +25,7 @@ struct peer { struct torrent *tp; - struct piece_req_tq my_reqs; + struct nb_tq my_reqs; unsigned nreqs_out; @@ -48,6 +48,9 @@ struct peer { BTPDQ_HEAD(peer_tq, peer); +void peer_send(struct peer *p, struct net_buf *nb); +int peer_unsend(struct peer *p, struct nb_link *nl); + void peer_unchoke(struct peer *p); void peer_choke(struct peer *p); void peer_unwant(struct peer *p, uint32_t index); diff --git a/btpd/policy_subr.c b/btpd/policy_subr.c index 8f8e88e..142abf8 100644 --- a/btpd/policy_subr.c +++ b/btpd/policy_subr.c @@ -396,20 +396,22 @@ cm_unassign_requests(struct peer *p) while (pc != NULL) { int was_full = piece_full(pc); - struct piece_req *req = BTPDQ_FIRST(&p->my_reqs); - while (req != NULL) { - struct piece_req *next = BTPDQ_NEXT(req, entry); + struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs); + while (nl != NULL) { + struct nb_link *next = BTPDQ_NEXT(nl, entry); - if (pc->index == req->index) { + if (pc->index == nb_get_index(nl->nb)) { + uint32_t block = nb_get_begin(nl->nb) / PIECE_BLOCKLEN; // XXX: Needs to be looked at if we introduce snubbing. - assert(has_bit(pc->down_field, req->begin / PIECE_BLOCKLEN)); - clear_bit(pc->down_field, req->begin / PIECE_BLOCKLEN); + assert(has_bit(pc->down_field, block)); + clear_bit(pc->down_field, block); pc->nbusy--; - BTPDQ_REMOVE(&p->my_reqs, req, entry); - free(req); + BTPDQ_REMOVE(&p->my_reqs, nl, entry); + nb_drop(nl->nb); + free(nl); } - req = next; + nl = next; } if (was_full && !piece_full(pc)) @@ -449,11 +451,12 @@ cm_assign_requests_eg(struct peer *p) void cm_unassign_requests_eg(struct peer *p) { - struct piece_req *req = BTPDQ_FIRST(&p->my_reqs); - while (req != NULL) { - struct piece_req *next = BTPDQ_NEXT(req, entry); - free(req); - req = next; + struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs); + while (nl != NULL) { + struct nb_link *next = BTPDQ_NEXT(nl, entry); + nb_drop(nl->nb); + free(nl); + nl = next; } BTPDQ_INIT(&p->my_reqs); p->nreqs_out = 0;