diff --git a/btpd/net.c b/btpd/net.c index 80810c3..493b280 100644 --- a/btpd/net.c +++ b/btpd/net.c @@ -59,44 +59,65 @@ net_write_cb(int sd, short type, void *arg) } static void -nokill_iob(struct io_buffer *iob) +kill_buf_no(char *buf, size_t len) { //Nothing } static void -kill_free_buf(struct io_buffer *iob) +kill_buf_free(char *buf, size_t len) { - free(iob->buf); + free(buf); } -static struct iob_link * -malloc_liob(size_t len) +int +nb_drop(struct net_buf *nb) +{ + assert(nb->refs > 0); + nb->refs--; + if (nb->refs == 0) { + nb->kill_buf(nb->buf, nb->len); + free(nb); + return 1; + } else + return 0; +} + +void +nb_hold(struct net_buf *nb) { - struct iob_link *iol; - iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol) + len); - iol->iob.buf = (char *)(iol + 1); - iol->iob.buf_len = len; - iol->iob.buf_off = 0; - iol->kill_buf = nokill_iob; - return iol; + nb->refs++; } -static struct iob_link * -salloc_liob(char *buf, size_t len, void (*kill_buf)(struct io_buffer *)) +struct net_buf * +nb_create_alloc(short type, size_t len) { - struct iob_link *iol; - iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol)); - iol->iob.buf = buf; - iol->iob.buf_len = len; - iol->iob.buf_off = 0; - iol->kill_buf = kill_buf; - return iol; + struct net_buf *nb = btpd_calloc(1, sizeof(*nb) + len); + nb->info.type = type; + nb->buf = (char *)(nb + 1); + nb->len = len; + nb->kill_buf = kill_buf_no; + nb_hold(nb); + return nb; } +struct net_buf * +nb_create_set(short type, char *buf, size_t len, + void (*kill_buf)(char *, size_t)) +{ + struct net_buf *nb = btpd_calloc(1, sizeof(*nb)); + nb->info.type = type; + nb->buf = buf; + nb->len = len; + nb->kill_buf = kill_buf; + nb_hold(nb); + return nb; +} +#if 0 void net_unsend_piece(struct peer *p, struct piece_req *req) { + struct nb_link *nl; struct iob_link *piece; BTPDQ_REMOVE(&p->p_reqs, req, entry); @@ -119,6 +140,7 @@ net_unsend_piece(struct peer *p, struct piece_req *req) event_del(&p->out_ev); } } +#endif void kill_shake(struct input_reader *reader) @@ -131,8 +153,7 @@ kill_shake(struct input_reader *reader) static unsigned long net_write(struct peer *p, unsigned long wmax) { - struct iob_link *iol; - struct piece_req *req; + struct nb_link *nl; struct iovec iov[NIOV]; int niov; int limited; @@ -142,18 +163,22 @@ net_write(struct peer *p, unsigned long wmax) limited = wmax > 0; niov = 0; - assert((iol = BTPDQ_FIRST(&p->outq)) != NULL); - while (niov < NIOV && iol != NULL - && (!limited || (limited && wmax > 0))) { - iov[niov].iov_base = iol->iob.buf + iol->iob.buf_off; - iov[niov].iov_len = iol->iob.buf_len - iol->iob.buf_off; + assert((nl = BTPDQ_FIRST(&p->outq)) != NULL); + while (niov < NIOV && nl != NULL && (!limited || (limited && wmax > 0))) { + if (niov > 0) { + iov[niov].iov_base = nl->nb->buf; + iov[niov].iov_len = nl->nb->len; + } else { + iov[niov].iov_base = nl->nb->buf + p->outq_off; + iov[niov].iov_len = nl->nb->len - p->outq_off; + } if (limited) { if (iov[niov].iov_len > wmax) iov[niov].iov_len = wmax; wmax -= iov[niov].iov_len; } niov++; - iol = BTPDQ_NEXT(iol, entry); + nl = BTPDQ_NEXT(nl, entry); } nwritten = writev(p->sd, iov, niov); @@ -174,32 +199,26 @@ net_write(struct peer *p, unsigned long wmax) bcount = nwritten; - req = BTPDQ_FIRST(&p->p_reqs); - iol = BTPDQ_FIRST(&p->outq); + nl = BTPDQ_FIRST(&p->outq); while (bcount > 0) { - unsigned long bufdelta = iol->iob.buf_len - iol->iob.buf_off; - if (req != NULL && req->head == iol) { - struct piece_req *next = BTPDQ_NEXT(req, entry); - BTPDQ_REMOVE(&p->p_reqs, req, entry); - free(req); - req = next; - } + unsigned long bufdelta = nl->nb->len - p->outq_off; if (bcount >= bufdelta) { - if (iol->upload) { + if (nl->nb->info.type == NB_TORRENTDATA) { p->tp->uploaded += bufdelta; p->rate_from_me[btpd.seconds % RATEHISTORY] += bufdelta; } bcount -= bufdelta; - BTPDQ_REMOVE(&p->outq, iol, entry); - iol->kill_buf(&iol->iob); - free(iol); - iol = BTPDQ_FIRST(&p->outq); + BTPDQ_REMOVE(&p->outq, nl, entry); + nb_drop(nl->nb); + free(nl); + p->outq_off = 0; + nl = BTPDQ_FIRST(&p->outq); } else { - if (iol->upload) { + if (nl->nb->info.type == NB_TORRENTDATA) { p->tp->uploaded += bcount; p->rate_from_me[btpd.seconds % RATEHISTORY] += bcount; } - iol->iob.buf_off += bcount; + p->outq_off += bcount; bcount = 0; } } @@ -214,11 +233,16 @@ net_write(struct peer *p, unsigned long wmax) } void -net_send(struct peer *p, struct iob_link *iol) +net_send(struct peer *p, struct net_buf *nb) { - if (BTPDQ_EMPTY(&p->outq)) + struct nb_link *nl = btpd_calloc(1, sizeof(*nl)); + nl->nb = nb; + + if (BTPDQ_EMPTY(&p->outq)) { + assert(p->outq_off == 0); event_add(&p->out_ev, WRITE_TIMEOUT); - BTPDQ_INSERT_TAIL(&p->outq, iol, entry); + } + BTPDQ_INSERT_TAIL(&p->outq, nl, entry); } void @@ -237,64 +261,62 @@ void net_send_piece(struct peer *p, uint32_t index, uint32_t begin, char *block, size_t blen) { - struct iob_link *head, *piece; - struct piece_req *req; + struct net_buf *head, *piece; btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen); - head = malloc_liob(13); - net_write32(head->iob.buf, 9 + blen); - head->iob.buf[4] = MSG_PIECE; - net_write32(head->iob.buf + 5, index); - net_write32(head->iob.buf + 9, begin); + head = nb_create_alloc(NB_PIECE, 13); + net_write32(head->buf, 9 + blen); + head->buf[4] = MSG_PIECE; + net_write32(head->buf + 5, index); + net_write32(head->buf + 9, begin); net_send(p, head); - piece = salloc_liob(block, blen, kill_free_buf); - piece->upload = 1; + piece = nb_create_set(NB_TORRENTDATA, block, blen, kill_buf_free); + piece->info.index = index; + piece->info.begin = begin; + piece->info.length = blen; net_send(p, piece); - - req = btpd_malloc(sizeof(*req)); - req->index = index; - req->begin = begin; - req->length = blen; - req->head = head; - BTPDQ_INSERT_TAIL(&p->p_reqs, req, entry); } void net_send_request(struct peer *p, struct piece_req *req) { - struct iob_link *out; - out = malloc_liob(17); - net_write32(out->iob.buf, 13); - out->iob.buf[4] = MSG_REQUEST; - net_write32(out->iob.buf + 5, req->index); - net_write32(out->iob.buf + 9, req->begin); - net_write32(out->iob.buf + 13, req->length); + struct net_buf *out = nb_create_alloc(NB_REQUEST, 17); + net_write32(out->buf, 13); + out->buf[4] = MSG_REQUEST; + net_write32(out->buf + 5, req->index); + net_write32(out->buf + 9, req->begin); + net_write32(out->buf + 13, req->length); + out->info.index = req->index; + out->info.begin = req->begin; + out->info.length = req->length; net_send(p, out); } void net_send_cancel(struct peer *p, struct piece_req *req) { - struct iob_link *out; - out = malloc_liob(17); - net_write32(out->iob.buf, 13); - out->iob.buf[4] = MSG_CANCEL; - net_write32(out->iob.buf + 5, req->index); - net_write32(out->iob.buf + 9, req->begin); - net_write32(out->iob.buf + 13, req->length); + struct net_buf *out = nb_create_alloc(NB_CANCEL, 17); + net_write32(out->buf, 13); + out->buf[4] = MSG_CANCEL; + net_write32(out->buf + 5, req->index); + net_write32(out->buf + 9, req->begin); + net_write32(out->buf + 13, req->length); + out->info.index = req->index; + out->info.begin = req->begin; + out->info.length = req->length; net_send(p, out); } void net_send_have(struct peer *p, uint32_t index) { - struct iob_link *out; - out = malloc_liob(9); - net_write32(out->iob.buf, 5); - out->iob.buf[4] = MSG_HAVE; - net_write32(out->iob.buf + 5, index); + struct net_buf *out = nb_create_alloc(NB_HAVE, 9); + net_write32(out->buf, 5); + out->buf[4] = MSG_HAVE; + net_write32(out->buf + 5, index); + out->info.index = index; net_send(p, out); } @@ -302,13 +324,12 @@ void net_send_multihave(struct peer *p) { struct torrent *tp = p->tp; - struct iob_link *out; - out = malloc_liob(9 * tp->have_npieces); + struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * tp->have_npieces); for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) { if (has_bit(tp->piece_field, i)) { - net_write32(out->iob.buf + count * 9, 5); - out->iob.buf[count * 9 + 4] = MSG_HAVE; - net_write32(out->iob.buf + count * 9 + 5, i); + net_write32(out->buf + count * 9, 5); + out->buf[count * 9 + 4] = MSG_HAVE; + net_write32(out->buf + count * 9 + 5, i); count++; } } @@ -316,63 +337,59 @@ net_send_multihave(struct peer *p) } void -net_send_onesized(struct peer *p, char type) +net_send_onesized(struct peer *p, char mtype, int btype) { - struct iob_link *out; - out = malloc_liob(5); - net_write32(out->iob.buf, 1); - out->iob.buf[4] = type; + struct net_buf *out = nb_create_alloc(btype, 5); + net_write32(out->buf, 1); + out->buf[4] = mtype; net_send(p, out); } void net_send_unchoke(struct peer *p) { - net_send_onesized(p, MSG_UNCHOKE); + net_send_onesized(p, MSG_UNCHOKE, NB_UNCHOKE); } void net_send_choke(struct peer *p) { - net_send_onesized(p, MSG_CHOKE); + net_send_onesized(p, MSG_CHOKE, NB_CHOKE); } void net_send_uninterest(struct peer *p) { - net_send_onesized(p, MSG_UNINTEREST); + net_send_onesized(p, MSG_UNINTEREST, NB_UNINTEREST); } void net_send_interest(struct peer *p) { - net_send_onesized(p, MSG_INTEREST); + net_send_onesized(p, MSG_INTEREST, NB_INTEREST); } void net_send_bitfield(struct peer *p) { - struct iob_link *out; - uint32_t plen; + uint32_t plen = ceil(p->tp->meta.npieces / 8.0); - plen = (uint32_t)ceil(p->tp->meta.npieces / 8.0); - out = malloc_liob(5); - net_write32(out->iob.buf, plen + 1); - out->iob.buf[4] = MSG_BITFIELD; + struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5); + net_write32(out->buf, plen + 1); + out->buf[4] = MSG_BITFIELD; net_send(p, out); - out = salloc_liob(p->tp->piece_field, plen, nokill_iob); + out = nb_create_set(NB_BITDATA, p->tp->piece_field, plen, kill_buf_no); net_send(p, out); } void net_send_shake(struct peer *p) { - struct iob_link *out; - out = malloc_liob(68); - bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->iob.buf, 28); - bcopy(p->tp->meta.info_hash, out->iob.buf + 28, 20); - bcopy(btpd.peer_id, out->iob.buf + 48, 20); + struct net_buf *out = nb_create_alloc(NB_SHAKE, 68); + bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->buf, 28); + bcopy(p->tp->meta.info_hash, out->buf + 28, 20); + bcopy(btpd.peer_id, out->buf + 48, 20); net_send(p, out); } diff --git a/btpd/net.h b/btpd/net.h index af01607..2dc377a 100644 --- a/btpd/net.h +++ b/btpd/net.h @@ -11,14 +11,45 @@ #define MSG_PIECE 7 #define MSG_CANCEL 8 -struct iob_link { - int upload; - BTPDQ_ENTRY(iob_link) entry; - void (*kill_buf)(struct io_buffer *); - struct io_buffer iob; +#define NB_CHOKE 0 +#define NB_UNCHOKE 1 +#define NB_INTEREST 2 +#define NB_UNINTEREST 3 +#define NB_HAVE 4 +#define NB_BITFIELD 5 +#define NB_REQUEST 6 +#define NB_PIECE 7 +#define NB_CANCEL 8 +#define NB_TORRENTDATA 10 +#define NB_MULTIHAVE 11 +#define NB_BITDATA 12 +#define NB_SHAKE 13 + +struct net_buf { + unsigned refs; + + struct { + short type; + uint32_t index, begin, length; + } info; + + char *buf; + size_t len; + void (*kill_buf)(char *, size_t); +}; + +struct nb_link { + struct net_buf *nb; + BTPDQ_ENTRY(nb_link) entry; }; -BTPDQ_HEAD(io_tq, iob_link); +BTPDQ_HEAD(nb_tq, nb_link); + +struct net_buf *nb_create_alloc(short type, size_t len); +struct net_buf *nb_create_set(short type, char *buf, size_t len, + void (*kill_buf)(char *, size_t)); +int nb_drop(struct net_buf *nb); +void nb_hold(struct net_buf *nb); struct peer; diff --git a/btpd/peer.c b/btpd/peer.c index fb6ab5e..cc57a5d 100644 --- a/btpd/peer.c +++ b/btpd/peer.c @@ -20,7 +20,7 @@ peer_get_rate(unsigned long *rates) void peer_kill(struct peer *p) { - struct iob_link *iol; + struct nb_link *nl; struct piece_req *req; btpd_log(BTPD_L_CONN, "killed peer.\n"); @@ -38,18 +38,12 @@ peer_kill(struct peer *p) event_del(&p->in_ev); event_del(&p->out_ev); - iol = BTPDQ_FIRST(&p->outq); - while (iol != NULL) { - struct iob_link *next = BTPDQ_NEXT(iol, entry); - iol->kill_buf(&iol->iob); - free(iol); - iol = next; - } - req = BTPDQ_FIRST(&p->p_reqs); - while (req != NULL) { - struct piece_req *next = BTPDQ_NEXT(req, entry); - free(req); - req = next; + nl = BTPDQ_FIRST(&p->outq); + while (nl != NULL) { + struct nb_link *next = BTPDQ_NEXT(nl, entry); + nb_drop(nl->nb); + free(nl); + nl = next; } req = BTPDQ_FIRST(&p->my_reqs); while (req != NULL) { @@ -113,10 +107,22 @@ peer_unchoke(struct peer *p) void peer_choke(struct peer *p) { - struct piece_req *req; - - while ((req = BTPDQ_FIRST(&p->p_reqs)) != NULL) - net_unsend_piece(p, req); + struct nb_link *nl = BTPDQ_FIRST(&p->outq); + while (nl != NULL) { + struct nb_link *next = BTPDQ_NEXT(nl, entry); + if (nl->nb->info.type == NB_PIECE + && (nl != BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) { + nb_drop(nl->nb); + BTPDQ_REMOVE(&p->outq, nl, entry); + free(nl); + nl = next; + next = BTPDQ_NEXT(next, entry); + nb_drop(nl->nb); + BTPDQ_REMOVE(&p->outq, nl, entry); + free(nl); + } + nl = next; + } p->flags |= PF_I_CHOKE; net_send_choke(p); @@ -151,7 +157,6 @@ peer_create_common(int sd) p->sd = sd; p->flags = PF_I_CHOKE | PF_P_CHOKE; - BTPDQ_INIT(&p->p_reqs); BTPDQ_INIT(&p->my_reqs); BTPDQ_INIT(&p->outq); @@ -304,15 +309,23 @@ void peer_on_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t length) { - struct piece_req *req = BTPDQ_FIRST(&p->p_reqs); - while (req != NULL) { - if (req->index == index - && req->begin == begin && req->length == length) { + struct nb_link *nl = BTPDQ_FIRST(&p->outq); + while (nl != NULL) { + if (nl->nb->info.type == NB_PIECE + && nl->nb->info.index == index + && nl->nb->info.begin == begin + && nl->nb->info.length == length + && (nl != BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) { btpd_log(BTPD_L_MSG, "cancel matched.\n"); - net_unsend_piece(p, req); - break; + struct nb_link *data = BTPDQ_NEXT(nl, entry); + nb_drop(nl->nb); + BTPDQ_REMOVE(&p->outq, nl, entry); + free(nl); + nb_drop(data->nb); + BTPDQ_REMOVE(&p->outq, data, entry); + free(data); } - req = BTPDQ_NEXT(req, entry); + nl = BTPDQ_NEXT(nl, entry); } } diff --git a/btpd/peer.h b/btpd/peer.h index 863005b..66cbfd2 100644 --- a/btpd/peer.h +++ b/btpd/peer.h @@ -25,11 +25,12 @@ struct peer { struct torrent *tp; - struct piece_req_tq p_reqs, my_reqs; + struct piece_req_tq my_reqs; unsigned nreqs_out; - struct io_tq outq; + size_t outq_off; + struct nb_tq outq; struct event in_ev; struct event out_ev;