
* Rearrange some code, mostly moving it from net to net_buf and peer.

* Use the new net_bufs where it makes sense.
* Take advantage of the reference count on net_bufs and only allocate
  the (un)choke and (un)interest messages once.
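
The reference-counting idea behind that last point is shown below as a minimal, self-contained C sketch; the names mirror net_buf.c but this is illustrative code, not part of the commit. The daemon takes one permanent reference on each of the four one-off messages at startup, so queueing one of them to a peer only bumps a counter and drops it later, instead of reallocating the buffer every time.

#include <assert.h>
#include <stdlib.h>

struct net_buf_sketch {
    unsigned refs;   /* number of owners; buffer is freed when this hits 0 */
    char *buf;
    size_t len;
};

static struct net_buf_sketch *
nbs_create(size_t len)
{
    struct net_buf_sketch *nb = calloc(1, sizeof(*nb) + len);
    nb->buf = (char *)(nb + 1);   /* payload lives right after the header */
    nb->len = len;
    return nb;                    /* starts with refs == 0, like nb_create_alloc */
}

static void
nbs_hold(struct net_buf_sketch *nb)
{
    nb->refs++;
}

static int
nbs_drop(struct net_buf_sketch *nb)
{
    assert(nb->refs > 0);
    nb->refs--;
    if (nb->refs == 0) {
        free(nb);
        return 1;                 /* last reference gone, buffer freed */
    }
    return 0;
}

int
main(void)
{
    /* Allocate the "choke" message once and pin it with a permanent
     * reference, as btpd_init() does with btpd.choke_msg below. */
    struct net_buf_sketch *choke_msg = nbs_create(5);
    nbs_hold(choke_msg);

    /* Sending to a peer takes another reference for the out-queue entry
     * and drops it when the entry is freed. */
    nbs_hold(choke_msg);
    nbs_drop(choke_msg);          /* message survives: refs back to 1 */

    nbs_drop(choke_msg);          /* permanent ref released: now freed */
    return 0;
}

The same pattern appears in the diff: btpd_init() pairs each nb_create_*() with nb_hold(), and peer_send()/peer_unsend() take and drop one reference per out-queue entry.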
master
Richard Nyberg 19 years ago
parent
commit
2acdcff5a6
10 changed files with 431 additions and 380 deletions
  1. btpd/Makefile.am      +1    -0
  2. btpd/btpd.c           +9    -0
  3. btpd/btpd.h           +6    -0
  4. btpd/net.c            +7    -269
  5. btpd/net.h            +6    -60
  6. btpd/net_buf.c        +234  -0
  7. btpd/net_buf.h        +59   -0
  8. btpd/peer.c           +88   -36
  9. btpd/peer.h           +4    -1
  10. btpd/policy_subr.c   +17   -14

btpd/Makefile.am  +1  -0

@@ -3,6 +3,7 @@ btpd_SOURCES=\
btpd.c btpd.h\
cli_if.c\
net.c net.h\
net_buf.c net_buf.h\
queue.h \
peer.c peer.h\
policy_choke.c policy_if.c policy_subr.c policy.h\


btpd/btpd.c  +9  -0

@@ -145,6 +145,15 @@ btpd_init(void)
"More could be beneficial to the download performance.\n",
nfiles);
btpd.maxpeers = nfiles - 20;

btpd.choke_msg = nb_create_choke();
nb_hold(btpd.choke_msg);
btpd.unchoke_msg = nb_create_unchoke();
nb_hold(btpd.unchoke_msg);
btpd.interest_msg = nb_create_interest();
nb_hold(btpd.interest_msg);
btpd.uninterest_msg = nb_create_uninterest();
nb_hold(btpd.uninterest_msg);
}

void


btpd/btpd.h  +6  -0

@@ -17,6 +17,7 @@
#include "benc.h"
#include "metainfo.h"
#include "iobuf.h"
#include "net_buf.h"
#include "net.h"
#include "peer.h"
#include "torrent.h"
@@ -78,6 +79,11 @@ struct btpd {
struct event sigint;
struct event sigterm;
struct event sigchld;

struct net_buf *choke_msg;
struct net_buf *unchoke_msg;
struct net_buf *interest_msg;
struct net_buf *uninterest_msg;
};

extern struct btpd btpd;


btpd/net.c  +7  -269

@@ -18,8 +18,6 @@

#include "btpd.h"

#define WRITE_TIMEOUT (& (struct timeval) { 60, 0 })

#define min(x, y) ((x) <= (y) ? (x) : (y))

static unsigned long
@@ -70,101 +68,6 @@ net_read32(void *buf)
return ntohl(*(uint32_t *)buf);
}

static void
kill_buf_no(char *buf, size_t len)
{
//Nothing
}

static void
kill_buf_free(char *buf, size_t len)
{
free(buf);
}

int
nb_drop(struct net_buf *nb)
{
assert(nb->refs > 0);
nb->refs--;
if (nb->refs == 0) {
nb->kill_buf(nb->buf, nb->len);
free(nb);
return 1;
} else
return 0;
}

void
nb_hold(struct net_buf *nb)
{
nb->refs++;
}

struct net_buf *
nb_create_alloc(short type, size_t len)
{
struct net_buf *nb = btpd_calloc(1, sizeof(*nb) + len);
nb->type = type;
nb->buf = (char *)(nb + 1);
nb->len = len;
nb->kill_buf = kill_buf_no;
return nb;
}

struct net_buf *
nb_create_set(short type, char *buf, size_t len,
void (*kill_buf)(char *, size_t))
{
struct net_buf *nb = btpd_calloc(1, sizeof(*nb));
nb->type = type;
nb->buf = buf;
nb->len = len;
nb->kill_buf = kill_buf;
return nb;
}

uint32_t
nb_get_index(struct net_buf *nb)
{
switch (nb->type) {
case NB_CANCEL:
case NB_HAVE:
case NB_PIECE:
case NB_REQUEST:
return net_read32(nb->buf + 5);
default:
abort();
}
}

uint32_t
nb_get_begin(struct net_buf *nb)
{
switch (nb->type) {
case NB_CANCEL:
case NB_PIECE:
case NB_REQUEST:
return net_read32(nb->buf + 9);
default:
abort();
}
}

uint32_t
nb_get_length(struct net_buf *nb)
{
switch (nb->type) {
case NB_CANCEL:
case NB_REQUEST:
return net_read32(nb->buf + 13);
case NB_PIECE:
return net_read32(nb->buf) - 9;
default:
abort();
}
}

void
kill_shake(struct input_reader *reader)
{
@@ -255,173 +158,6 @@ net_write(struct peer *p, unsigned long wmax)
return nwritten;
}

void
net_send(struct peer *p, struct net_buf *nb)
{
struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
nl->nb = nb;
nb_hold(nb);

if (BTPDQ_EMPTY(&p->outq)) {
assert(p->outq_off == 0);
event_add(&p->out_ev, WRITE_TIMEOUT);
}
BTPDQ_INSERT_TAIL(&p->outq, nl, entry);
}


/*
* Remove a network buffer from the peer's outq.
* If a part of the buffer already have been written
* to the network it cannot be removed.
*
* Returns 1 if the buffer is removed, 0 if not.
*/
int
net_unsend(struct peer *p, struct nb_link *nl)
{
if (!(nl == BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) {
BTPDQ_REMOVE(&p->outq, nl, entry);
nb_drop(nl->nb);
free(nl);
if (BTPDQ_EMPTY(&p->outq)) {
if (p->flags & PF_ON_WRITEQ) {
BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
p->flags &= ~PF_ON_WRITEQ;
} else
event_del(&p->out_ev);
}
return 1;
} else
return 0;
}

void
net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
char *block, size_t blen)
{
struct net_buf *head, *piece;

btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen);

head = nb_create_alloc(NB_PIECE, 13);
net_write32(head->buf, 9 + blen);
head->buf[4] = MSG_PIECE;
net_write32(head->buf + 5, index);
net_write32(head->buf + 9, begin);
net_send(p, head);

piece = nb_create_set(NB_TORRENTDATA, block, blen, kill_buf_free);
net_send(p, piece);
}

void
net_send_request(struct peer *p, struct piece_req *req)
{
struct net_buf *out = nb_create_alloc(NB_REQUEST, 17);
net_write32(out->buf, 13);
out->buf[4] = MSG_REQUEST;
net_write32(out->buf + 5, req->index);
net_write32(out->buf + 9, req->begin);
net_write32(out->buf + 13, req->length);
net_send(p, out);
}

void
net_send_cancel(struct peer *p, struct piece_req *req)
{
struct net_buf *out = nb_create_alloc(NB_CANCEL, 17);
net_write32(out->buf, 13);
out->buf[4] = MSG_CANCEL;
net_write32(out->buf + 5, req->index);
net_write32(out->buf + 9, req->begin);
net_write32(out->buf + 13, req->length);
net_send(p, out);
}

void
net_send_have(struct peer *p, uint32_t index)
{
struct net_buf *out = nb_create_alloc(NB_HAVE, 9);
net_write32(out->buf, 5);
out->buf[4] = MSG_HAVE;
net_write32(out->buf + 5, index);
net_send(p, out);
}

void
net_send_multihave(struct peer *p)
{
struct torrent *tp = p->tp;
struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * tp->have_npieces);
for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) {
if (has_bit(tp->piece_field, i)) {
net_write32(out->buf + count * 9, 5);
out->buf[count * 9 + 4] = MSG_HAVE;
net_write32(out->buf + count * 9 + 5, i);
count++;
}
}
net_send(p, out);
}

void
net_send_onesized(struct peer *p, char mtype, int btype)
{
struct net_buf *out = nb_create_alloc(btype, 5);
net_write32(out->buf, 1);
out->buf[4] = mtype;
net_send(p, out);
}

void
net_send_unchoke(struct peer *p)
{
net_send_onesized(p, MSG_UNCHOKE, NB_UNCHOKE);
}

void
net_send_choke(struct peer *p)
{
net_send_onesized(p, MSG_CHOKE, NB_CHOKE);
}

void
net_send_uninterest(struct peer *p)
{
net_send_onesized(p, MSG_UNINTEREST, NB_UNINTEREST);
}

void
net_send_interest(struct peer *p)
{
net_send_onesized(p, MSG_INTEREST, NB_INTEREST);
}

void
net_send_bitfield(struct peer *p)
{
uint32_t plen = ceil(p->tp->meta.npieces / 8.0);

struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5);
net_write32(out->buf, plen + 1);
out->buf[4] = MSG_BITFIELD;
net_send(p, out);
out = nb_create_set(NB_BITDATA, p->tp->piece_field, plen, kill_buf_no);
net_send(p, out);
}

void
net_send_shake(struct peer *p)
{
struct net_buf *out = nb_create_alloc(NB_SHAKE, 68);
bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->buf, 28);
bcopy(p->tp->meta.info_hash, out->buf + 28, 20);
bcopy(btpd.peer_id, out->buf + 48, 20);
net_send(p, out);
}

static void
kill_generic(struct input_reader *reader)
{
@@ -761,7 +497,7 @@ net_shake_read(struct peer *p, unsigned long rmax)
if (tp != NULL) {
hs->state = SHAKE_INFO;
p->tp = tp;
net_send_shake(p);
peer_send(p, nb_create_shake(p->tp));
} else
goto bad_shake;
} else {
@@ -791,9 +527,11 @@ net_shake_read(struct peer *p, unsigned long rmax)
net_generic_reader(p);
if (p->tp->have_npieces > 0) {
if (p->tp->have_npieces * 9 < 5 + ceil(p->tp->meta.npieces / 8.0))
net_send_multihave(p);
else
net_send_bitfield(p);
peer_send(p, nb_create_multihave(p->tp));
else {
peer_send(p, nb_create_bitfield(p->tp));
peer_send(p, nb_create_bitdata(p->tp));
}
}
cm_on_new_peer(p);
} else
@@ -826,7 +564,7 @@ net_handshake(struct peer *p, int incoming)
hs->rd.kill = kill_shake;

if (!incoming)
net_send_shake(p);
peer_send(p, nb_create_shake(p->tp));
}

int


btpd/net.h  +6  -60

@@ -11,43 +11,7 @@
#define MSG_PIECE 7
#define MSG_CANCEL 8

#define NB_CHOKE 0
#define NB_UNCHOKE 1
#define NB_INTEREST 2
#define NB_UNINTEREST 3
#define NB_HAVE 4
#define NB_BITFIELD 5
#define NB_REQUEST 6
#define NB_PIECE 7
#define NB_CANCEL 8
#define NB_TORRENTDATA 10
#define NB_MULTIHAVE 11
#define NB_BITDATA 12
#define NB_SHAKE 13

struct net_buf {
short type;
unsigned refs;
char *buf;
size_t len;
void (*kill_buf)(char *, size_t);
};

struct nb_link {
struct net_buf *nb;
BTPDQ_ENTRY(nb_link) entry;
};

BTPDQ_HEAD(nb_tq, nb_link);

struct net_buf *nb_create_alloc(short type, size_t len);
struct net_buf *nb_create_set(short type, char *buf, size_t len,
void (*kill_buf)(char *, size_t));
int nb_drop(struct net_buf *nb);
void nb_hold(struct net_buf *nb);
uint32_t nb_get_index(struct net_buf *nb);
uint32_t nb_get_begin(struct net_buf *nb);
uint32_t nb_get_length(struct net_buf *nb);
#define WRITE_TIMEOUT (& (struct timeval) { 60, 0 })

struct peer;

@@ -94,36 +58,18 @@ struct generic_reader {
char _io_buf[MAX_INPUT_LEFT];
};

struct piece_req {
uint32_t index, begin, length;
struct iob_link *head; /* Pointer to outgoing piece. */
BTPDQ_ENTRY(piece_req) entry;
};

BTPDQ_HEAD(piece_req_tq, piece_req);

void net_connection_cb(int sd, short type, void *arg);
void net_bw_rate(void);
void net_bw_cb(int sd, short type, void *arg);

struct peer;

void net_send_uninterest(struct peer *p);
void net_send_interest(struct peer *p);
void net_send_unchoke(struct peer *p);
void net_send_choke(struct peer *p);

void net_send_have(struct peer *p, uint32_t index);
void net_send_request(struct peer *p, struct piece_req *req);
void net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
char *block, size_t blen);
void net_send_cancel(struct peer *p, struct piece_req *req);
int net_unsend(struct peer *p, struct nb_link *nl);
void net_handshake(struct peer *p, int incoming);

void net_read_cb(int sd, short type, void *arg);
void net_write_cb(int sd, short type, void *arg);

void net_handshake(struct peer *p, int incoming);
int net_connect2(struct sockaddr *sa, socklen_t salen, int *sd);
int net_connect(const char *ip, int port, int *sd);

void net_write32(void *buf, uint32_t num);
uint32_t net_read32(void *buf);

#endif

btpd/net_buf.c  +234  -0

@@ -0,0 +1,234 @@
#include <math.h>
#include <string.h>

#include "btpd.h"

static void
kill_buf_no(char *buf, size_t len)
{

}

static void
kill_buf_free(char *buf, size_t len)
{
free(buf);
}

static struct net_buf *
nb_create_alloc(short type, size_t len)
{
struct net_buf *nb = btpd_calloc(1, sizeof(*nb) + len);
nb->type = type;
nb->buf = (char *)(nb + 1);
nb->len = len;
nb->kill_buf = kill_buf_no;
return nb;
}

static struct net_buf *
nb_create_set(short type, char *buf, size_t len,
void (*kill_buf)(char *, size_t))
{
struct net_buf *nb = btpd_calloc(1, sizeof(*nb));
nb->type = type;
nb->buf = buf;
nb->len = len;
nb->kill_buf = kill_buf;
return nb;
}

static struct net_buf *
nb_create_onesized(char mtype, int btype)
{
struct net_buf *out = nb_create_alloc(btype, 5);
net_write32(out->buf, 1);
out->buf[4] = mtype;
return out;
}

struct net_buf *
nb_create_piece(uint32_t index, uint32_t begin, size_t blen)
{
struct net_buf *out;

btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen);

out = nb_create_alloc(NB_PIECE, 13);
net_write32(out->buf, 9 + blen);
out->buf[4] = MSG_PIECE;
net_write32(out->buf + 5, index);
net_write32(out->buf + 9, begin);
return out;
}

struct net_buf *
nb_create_torrentdata(char *block, size_t blen)
{
struct net_buf *out;
out = nb_create_set(NB_TORRENTDATA, block, blen, kill_buf_free);
return out;
}

struct net_buf *
nb_create_request(uint32_t index, uint32_t begin, uint32_t length)
{
struct net_buf *out = nb_create_alloc(NB_REQUEST, 17);
net_write32(out->buf, 13);
out->buf[4] = MSG_REQUEST;
net_write32(out->buf + 5, index);
net_write32(out->buf + 9, begin);
net_write32(out->buf + 13, length);
return out;
}

struct net_buf *
nb_create_cancel(uint32_t index, uint32_t begin, uint32_t length)
{
struct net_buf *out = nb_create_alloc(NB_CANCEL, 17);
net_write32(out->buf, 13);
out->buf[4] = MSG_CANCEL;
net_write32(out->buf + 5, index);
net_write32(out->buf + 9, begin);
net_write32(out->buf + 13, length);
return out;
}

struct net_buf *
nb_create_have(uint32_t index)
{
struct net_buf *out = nb_create_alloc(NB_HAVE, 9);
net_write32(out->buf, 5);
out->buf[4] = MSG_HAVE;
net_write32(out->buf + 5, index);
return out;
}

struct net_buf *
nb_create_multihave(struct torrent *tp)
{
struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * tp->have_npieces);
for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) {
if (has_bit(tp->piece_field, i)) {
net_write32(out->buf + count * 9, 5);
out->buf[count * 9 + 4] = MSG_HAVE;
net_write32(out->buf + count * 9 + 5, i);
count++;
}
}
return out;
}

struct net_buf *
nb_create_unchoke(void)
{
return nb_create_onesized(MSG_UNCHOKE, NB_UNCHOKE);
}

struct net_buf *
nb_create_choke(void)
{
return nb_create_onesized(MSG_CHOKE, NB_CHOKE);
}

struct net_buf *
nb_create_uninterest(void)
{
return nb_create_onesized(MSG_UNINTEREST, NB_UNINTEREST);
}

struct net_buf *
nb_create_interest(void)
{
return nb_create_onesized(MSG_INTEREST, NB_INTEREST);
}

struct net_buf *
nb_create_bitfield(struct torrent *tp)
{
uint32_t plen = ceil(tp->meta.npieces / 8.0);

struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5);
net_write32(out->buf, plen + 1);
out->buf[4] = MSG_BITFIELD;
return out;
}

struct net_buf *
nb_create_bitdata(struct torrent *tp)
{
uint32_t plen = ceil(tp->meta.npieces / 8.0);
struct net_buf *out =
nb_create_set(NB_BITDATA, tp->piece_field, plen, kill_buf_no);
return out;
}

struct net_buf *
nb_create_shake(struct torrent *tp)
{
struct net_buf *out = nb_create_alloc(NB_SHAKE, 68);
bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->buf, 28);
bcopy(tp->meta.info_hash, out->buf + 28, 20);
bcopy(btpd.peer_id, out->buf + 48, 20);
return out;
}

uint32_t
nb_get_index(struct net_buf *nb)
{
switch (nb->type) {
case NB_CANCEL:
case NB_HAVE:
case NB_PIECE:
case NB_REQUEST:
return net_read32(nb->buf + 5);
default:
abort();
}
}

uint32_t
nb_get_begin(struct net_buf *nb)
{
switch (nb->type) {
case NB_CANCEL:
case NB_PIECE:
case NB_REQUEST:
return net_read32(nb->buf + 9);
default:
abort();
}
}

uint32_t
nb_get_length(struct net_buf *nb)
{
switch (nb->type) {
case NB_CANCEL:
case NB_REQUEST:
return net_read32(nb->buf + 13);
case NB_PIECE:
return net_read32(nb->buf) - 9;
default:
abort();
}
}

int
nb_drop(struct net_buf *nb)
{
assert(nb->refs > 0);
nb->refs--;
if (nb->refs == 0) {
nb->kill_buf(nb->buf, nb->len);
free(nb);
return 1;
} else
return 0;
}

void
nb_hold(struct net_buf *nb)
{
nb->refs++;
}

btpd/net_buf.h  +59  -0

@@ -0,0 +1,59 @@
#ifndef BTPD_NET_BUF_H
#define BTPD_NET_BUF_H

#define NB_CHOKE 0
#define NB_UNCHOKE 1
#define NB_INTEREST 2
#define NB_UNINTEREST 3
#define NB_HAVE 4
#define NB_BITFIELD 5
#define NB_REQUEST 6
#define NB_PIECE 7
#define NB_CANCEL 8
#define NB_TORRENTDATA 10
#define NB_MULTIHAVE 11
#define NB_BITDATA 12
#define NB_SHAKE 13

struct net_buf {
short type;
unsigned refs;
char *buf;
size_t len;
void (*kill_buf)(char *, size_t);
};

struct nb_link {
struct net_buf *nb;
BTPDQ_ENTRY(nb_link) entry;
};

BTPDQ_HEAD(nb_tq, nb_link);

struct torrent;
struct peer;

struct net_buf *nb_create_piece(uint32_t index, uint32_t begin, size_t blen);
struct net_buf *nb_create_torrentdata(char *block, size_t blen);
struct net_buf *nb_create_request(uint32_t index,
uint32_t begin, uint32_t length);
struct net_buf *nb_create_cancel(uint32_t index,
uint32_t begin, uint32_t length);
struct net_buf *nb_create_have(uint32_t index);
struct net_buf *nb_create_multihave(struct torrent *tp);
struct net_buf *nb_create_unchoke(void);
struct net_buf *nb_create_choke(void);
struct net_buf *nb_create_uninterest(void);
struct net_buf *nb_create_interest(void);
struct net_buf *nb_create_bitfield(struct torrent *tp);
struct net_buf *nb_create_bitdata(struct torrent *tp);
struct net_buf *nb_create_shake(struct torrent *tp);

int nb_drop(struct net_buf *nb);
void nb_hold(struct net_buf *nb);

uint32_t nb_get_index(struct net_buf *nb);
uint32_t nb_get_begin(struct net_buf *nb);
uint32_t nb_get_length(struct net_buf *nb);

#endif

btpd/peer.c  +88  -36

@@ -21,7 +21,6 @@ void
peer_kill(struct peer *p)
{
struct nb_link *nl;
struct piece_req *req;

btpd_log(BTPD_L_CONN, "killed peer.\n");

@@ -45,11 +44,12 @@ peer_kill(struct peer *p)
free(nl);
nl = next;
}
req = BTPDQ_FIRST(&p->my_reqs);
while (req != NULL) {
struct piece_req *next = BTPDQ_NEXT(req, entry);
free(req);
req = next;
nl = BTPDQ_FIRST(&p->my_reqs);
while (nl != NULL) {
struct nb_link *next = BTPDQ_NEXT(nl, entry);
nb_drop(nl->nb);
free(nl);
nl = next;
}

p->reader->kill(p->reader);
@@ -59,32 +59,82 @@ peer_kill(struct peer *p)
btpd.npeers--;
}

void
peer_send(struct peer *p, struct net_buf *nb)
{
struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
nl->nb = nb;
nb_hold(nb);

if (BTPDQ_EMPTY(&p->outq)) {
assert(p->outq_off == 0);
event_add(&p->out_ev, WRITE_TIMEOUT);
}
BTPDQ_INSERT_TAIL(&p->outq, nl, entry);
}


/*
* Remove a network buffer from the peer's outq.
* If a part of the buffer already have been written
* to the network it cannot be removed.
*
* Returns 1 if the buffer is removed, 0 if not.
*/
int
peer_unsend(struct peer *p, struct nb_link *nl)
{
if (!(nl == BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) {
BTPDQ_REMOVE(&p->outq, nl, entry);
nb_drop(nl->nb);
free(nl);
if (BTPDQ_EMPTY(&p->outq)) {
if (p->flags & PF_ON_WRITEQ) {
BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
p->flags &= ~PF_ON_WRITEQ;
} else
event_del(&p->out_ev);
}
return 1;
} else
return 0;
}

void
peer_request(struct peer *p, uint32_t index, uint32_t begin, uint32_t len)
{
if (p->tp->endgame == 0)
assert(p->nreqs_out < MAXPIPEDREQUESTS);
p->nreqs_out++;
struct piece_req *req = btpd_calloc(1, sizeof(*req));
req->index = index;
req->begin = begin;
req->length = len;
BTPDQ_INSERT_TAIL(&p->my_reqs, req, entry);
net_send_request(p, req);
struct net_buf *nb = nb_create_request(index, begin, len);
struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
nl->nb = nb;
nb_hold(nb);
BTPDQ_INSERT_TAIL(&p->my_reqs, nl, entry);
peer_send(p, nb);
}

void
peer_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t len)
{
struct piece_req *req;
struct net_buf *nb = NULL;
struct nb_link *nl;
again:
BTPDQ_FOREACH(req, &p->my_reqs, entry)
if (index == req->index && begin == req->begin && len == req->length)
BTPDQ_FOREACH(nl, &p->my_reqs, entry) {
int match = nb_get_begin(nl->nb) == begin
&& nb_get_index(nl->nb) == index
&& nb_get_length(nl->nb) == len;
if (match)
break;
if (req != NULL) {
net_send_cancel(p, req);
BTPDQ_REMOVE(&p->my_reqs, req, entry);
free(req);
}
if (nl != NULL) {
if (nb == NULL) {
nb = nb_create_cancel(index, begin, len);
peer_send(p, nb);
}
BTPDQ_REMOVE(&p->my_reqs, nl, entry);
nb_drop(nl->nb);
free(nl);
p->nreqs_out--;
goto again;
}
@@ -93,14 +143,14 @@ again:
void
peer_have(struct peer *p, uint32_t index)
{
net_send_have(p, index);
peer_send(p, nb_create_have(index));
}

void
peer_unchoke(struct peer *p)
{
p->flags &= ~PF_I_CHOKE;
net_send_unchoke(p);
peer_send(p, btpd.unchoke_msg);
}

void
@@ -112,14 +162,14 @@ peer_choke(struct peer *p)
if (nl->nb->type == NB_PIECE) {
struct nb_link *data = next;
next = BTPDQ_NEXT(next, entry);
if (net_unsend(p, nl))
net_unsend(p, data);
if (peer_unsend(p, nl))
peer_unsend(p, data);
}
nl = next;
}

p->flags |= PF_I_CHOKE;
net_send_choke(p);
peer_send(p, btpd.choke_msg);
}

void
@@ -129,7 +179,7 @@ peer_want(struct peer *p, uint32_t index)
p->nwant++;
if (p->nwant == 1) {
p->flags |= PF_I_WANT;
net_send_interest(p);
peer_send(p, btpd.interest_msg);
}
}

@@ -140,7 +190,7 @@ peer_unwant(struct peer *p, uint32_t index)
p->nwant--;
if (p->nwant == 0) {
p->flags &= ~PF_I_WANT;
net_send_uninterest(p);
peer_send(p, btpd.uninterest_msg);
}
}

@@ -275,16 +325,17 @@ void
peer_on_piece(struct peer *p, uint32_t index, uint32_t begin,
uint32_t length, const char *data)
{
struct piece_req *req = BTPDQ_FIRST(&p->my_reqs);
if (req != NULL &&
req->index == index &&
req->begin == begin &&
req->length == length) {
struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
if (nl != NULL &&
nb_get_begin(nl->nb) == begin &&
nb_get_index(nl->nb) == index &&
nb_get_length(nl->nb) == length) {

assert(p->nreqs_out > 0);
p->nreqs_out--;
BTPDQ_REMOVE(&p->my_reqs, req, entry);
free(req);
BTPDQ_REMOVE(&p->my_reqs, nl, entry);
nb_drop(nl->nb);
free(nl);
cm_on_block(p, index, begin, length, data);
}
@@ -296,7 +347,8 @@ peer_on_request(struct peer *p, uint32_t index, uint32_t begin,
{
off_t cbegin = index * p->tp->meta.piece_length + begin;
char * content = torrent_get_bytes(p->tp, cbegin, length);
net_send_piece(p, index, begin, content, length);
peer_send(p, nb_create_piece(index, begin, length));
peer_send(p, nb_create_torrentdata(content, length));
}

void
@@ -310,8 +362,8 @@ peer_on_cancel(struct peer *p, uint32_t index, uint32_t begin,
&& nb_get_index(nl->nb) == index
&& nb_get_length(nl->nb) == length) {
struct nb_link *data = BTPDQ_NEXT(nl, entry);
if (net_unsend(p, nl))
net_unsend(p, data);
if (peer_unsend(p, nl))
peer_unsend(p, data);
break;
}
}


btpd/peer.h  +4  -1

@@ -25,7 +25,7 @@ struct peer {

struct torrent *tp;

struct piece_req_tq my_reqs;
struct nb_tq my_reqs;

unsigned nreqs_out;

@@ -48,6 +48,9 @@ struct peer {

BTPDQ_HEAD(peer_tq, peer);

void peer_send(struct peer *p, struct net_buf *nb);
int peer_unsend(struct peer *p, struct nb_link *nl);

void peer_unchoke(struct peer *p);
void peer_choke(struct peer *p);
void peer_unwant(struct peer *p, uint32_t index);


btpd/policy_subr.c  +17  -14

@@ -396,20 +396,22 @@ cm_unassign_requests(struct peer *p)
while (pc != NULL) {
int was_full = piece_full(pc);

struct piece_req *req = BTPDQ_FIRST(&p->my_reqs);
while (req != NULL) {
struct piece_req *next = BTPDQ_NEXT(req, entry);
struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
while (nl != NULL) {
struct nb_link *next = BTPDQ_NEXT(nl, entry);

if (pc->index == req->index) {
if (pc->index == nb_get_index(nl->nb)) {
uint32_t block = nb_get_begin(nl->nb) / PIECE_BLOCKLEN;
// XXX: Needs to be looked at if we introduce snubbing.
assert(has_bit(pc->down_field, req->begin / PIECE_BLOCKLEN));
clear_bit(pc->down_field, req->begin / PIECE_BLOCKLEN);
assert(has_bit(pc->down_field, block));
clear_bit(pc->down_field, block);
pc->nbusy--;
BTPDQ_REMOVE(&p->my_reqs, req, entry);
free(req);
BTPDQ_REMOVE(&p->my_reqs, nl, entry);
nb_drop(nl->nb);
free(nl);
}
req = next;
nl = next;
}
if (was_full && !piece_full(pc))
@@ -449,11 +451,12 @@ cm_assign_requests_eg(struct peer *p)
void
cm_unassign_requests_eg(struct peer *p)
{
struct piece_req *req = BTPDQ_FIRST(&p->my_reqs);
while (req != NULL) {
struct piece_req *next = BTPDQ_NEXT(req, entry);
free(req);
req = next;
struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
while (nl != NULL) {
struct nb_link *next = BTPDQ_NEXT(nl, entry);
nb_drop(nl->nb);
free(nl);
nl = next;
}
BTPDQ_INIT(&p->my_reqs);
p->nreqs_out = 0;

