Browse Source

* Allocate request messages on piece creation. The request objects can

be shared by several peers. At least in end game.
* Link blocks with the peers we are loading them from and vice versa.
* Limit the number of requests / peer in end game too.
* Improve end game by using some sort of round robin for block requests.
master
Richard Nyberg 19 years ago
parent
commit
f31e2d8b89
6 changed files with 278 additions and 137 deletions
  1. +33
    -50
      btpd/peer.c
  2. +14
    -5
      btpd/peer.h
  3. +3
    -3
      btpd/policy.h
  4. +33
    -18
      btpd/policy_if.c
  5. +183
    -60
      btpd/policy_subr.c
  6. +12
    -1
      btpd/torrent.h

+ 33
- 50
btpd/peer.c View File

@@ -44,13 +44,6 @@ peer_kill(struct peer *p)
free(nl);
nl = next;
}
nl = BTPDQ_FIRST(&p->my_reqs);
while (nl != NULL) {
struct nb_link *next = BTPDQ_NEXT(nl, entry);
nb_drop(nl->nb);
free(nl);
nl = next;
}

p->reader->kill(p->reader);
if (p->piece_field != NULL)
@@ -119,49 +112,40 @@ peer_sent(struct peer *p, struct net_buf *nb)
}

void
peer_request(struct peer *p, uint32_t index, uint32_t begin, uint32_t len)
peer_request(struct peer *p, struct block_request *req)
{
if (p->tp->endgame == 0)
assert(p->nreqs_out < MAXPIPEDREQUESTS);
assert(p->nreqs_out < MAXPIPEDREQUESTS);
p->nreqs_out++;
struct net_buf *nb = nb_create_request(index, begin, len);
struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
nl->nb = nb;
nb_hold(nb);
BTPDQ_INSERT_TAIL(&p->my_reqs, nl, entry);
peer_send(p, nb);
BTPDQ_INSERT_TAIL(&p->my_reqs, req, p_entry);
peer_send(p, req->blk->msg);
}

int
peer_requested(struct peer *p, struct block *blk)
{
struct block_request *req;
BTPDQ_FOREACH(req, &p->my_reqs, p_entry)
if (req->blk == blk)
return 1;
return 0;
}

void
peer_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t len)
peer_cancel(struct peer *p, struct block_request *req, struct net_buf *nb)
{
struct net_buf *nb = NULL;
BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
p->nreqs_out--;

int removed = 0;
struct nb_link *nl;
again:
BTPDQ_FOREACH(nl, &p->my_reqs, entry) {
int match = nb_get_begin(nl->nb) == begin
&& nb_get_index(nl->nb) == index
&& nb_get_length(nl->nb) == len;
if (match)
BTPDQ_FOREACH(nl, &p->outq, entry) {
if (nl->nb == req->blk->msg) {
removed = peer_unsend(p, nl);
break;
}
if (nl != NULL) {
if (nb == NULL) {
nb = nb_create_cancel(index, begin, len);
peer_send(p, nb);
}
BTPDQ_REMOVE(&p->my_reqs, nl, entry);
nb_drop(nl->nb);
free(nl);
p->nreqs_out--;
goto again;
}
}

void
peer_have(struct peer *p, uint32_t index)
{
peer_send(p, nb_create_have(index));
if (!removed)
peer_send(p, nb);
}

void
@@ -343,19 +327,18 @@ void
peer_on_piece(struct peer *p, uint32_t index, uint32_t begin,
uint32_t length, const char *data)
{
struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
if (nl != NULL &&
nb_get_begin(nl->nb) == begin &&
nb_get_index(nl->nb) == index &&
nb_get_length(nl->nb) == length) {
struct block_request *req = BTPDQ_FIRST(&p->my_reqs);
if (req == NULL)
return;
struct net_buf *nb = req->blk->msg;
if (nb_get_begin(nb) == begin &&
nb_get_index(nb) == index &&
nb_get_length(nb) == length) {

assert(p->nreqs_out > 0);
p->nreqs_out--;
BTPDQ_REMOVE(&p->my_reqs, nl, entry);
nb_drop(nl->nb);
free(nl);
cm_on_block(p, index, begin, length, data);
BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
cm_on_block(p, req, index, begin, length, data);
}
}



+ 14
- 5
btpd/peer.h View File

@@ -15,6 +15,15 @@
#define MAXPIECEMSGS 128
#define MAXPIPEDREQUESTS 10

struct block_request {
struct peer *p;
struct block *blk;
BTPDQ_ENTRY(block_request) p_entry;
BTPDQ_ENTRY(block_request) blk_entry;
};

BTPDQ_HEAD(block_request_tq, block_request);

struct peer {
int sd;
uint16_t flags;
@@ -26,7 +35,7 @@ struct peer {

struct torrent *tp;

struct nb_tq my_reqs;
struct block_request_tq my_reqs;

unsigned nreqs_out;
unsigned npiece_msgs;
@@ -58,11 +67,11 @@ void peer_unchoke(struct peer *p);
void peer_choke(struct peer *p);
void peer_unwant(struct peer *p, uint32_t index);
void peer_want(struct peer *p, uint32_t index);
void peer_request(struct peer *p, uint32_t index,
uint32_t begin, uint32_t len);
void peer_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t len);
void peer_request(struct peer *p, struct block_request *req);
void peer_cancel(struct peer *p, struct block_request *req,
struct net_buf *nb);

void peer_have(struct peer *p, uint32_t index);
int peer_requested(struct peer *p, struct block *blk);

unsigned long peer_get_rate(unsigned long *rates);



+ 3
- 3
btpd/policy.h View File

@@ -17,11 +17,11 @@ void cm_on_piece(struct piece *pc);
struct piece *cm_new_piece(struct torrent *tp, uint32_t index);
struct piece *cm_find_piece(struct torrent *tp, uint32_t index);
unsigned cm_piece_assign_requests(struct piece *pc, struct peer *p);
void cm_piece_assign_requests_eg(struct piece *pc, struct peer *p);
unsigned cm_assign_requests(struct peer *p);
void cm_assign_requests_eg(struct peer *p);
void cm_unassign_requests(struct peer *p);
void cm_unassign_requests_eg(struct peer *p);
void cm_piece_reorder_eg(struct piece *pc);

// policy_if.c

@@ -39,8 +39,8 @@ void cm_on_uninterest(struct peer *p);
void cm_on_download(struct peer *p);
void cm_on_undownload(struct peer *p);
void cm_on_piece_ann(struct peer *p, uint32_t index);
void cm_on_block(struct peer *p, uint32_t index, uint32_t begin,
uint32_t length, const char *data);
void cm_on_block(struct peer *p, struct block_request *req,
uint32_t index, uint32_t begin, uint32_t length, const char *data);

void cm_on_ok_piece(struct piece *pc);
void cm_on_bad_piece(struct piece *pc);


+ 33
- 18
btpd/policy_if.c View File

@@ -40,11 +40,10 @@ cm_on_piece_ann(struct peer *p, uint32_t index)
return;
struct piece *pc = cm_find_piece(tp, index);
if (tp->endgame) {
if (pc != NULL) {
peer_want(p, index);
if (!peer_chokes(p))
cm_piece_assign_requests_eg(pc, p);
}
assert(pc != NULL);
peer_want(p, index);
if (!peer_chokes(p) && !peer_laden(p))
cm_assign_requests_eg(p);
} else if (pc == NULL) {
peer_want(p, index);
if (!peer_chokes(p) && !peer_laden(p)) {
@@ -146,6 +145,7 @@ cm_on_ok_piece(struct piece *pc)
if (peer_has(p, pc->index))
peer_unwant(p, pc->index);

assert(pc->nreqs == 0);
piece_free(pc);

if (torrent_has_all(tp)) {
@@ -178,8 +178,8 @@ cm_on_bad_piece(struct piece *pc)
if (tp->endgame) {
struct peer *p;
BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
if (peer_has(p, pc->index) && peer_leech_ok(p))
cm_piece_assign_requests_eg(pc, p);
if (peer_has(p, pc->index) && peer_leech_ok(p) && !peer_laden(p))
cm_assign_requests_eg(p);
}
} else
cm_on_piece_unfull(pc); // XXX: May get bad data again.
@@ -245,31 +245,46 @@ cm_on_lost_peer(struct peer *p)
}

void
cm_on_block(struct peer *p, uint32_t index, uint32_t begin, uint32_t length,
const char *data)
cm_on_block(struct peer *p, struct block_request *req,
uint32_t index, uint32_t begin, uint32_t length, const char *data)
{
struct torrent *tp = p->tp;
struct block *blk = req->blk;
struct piece *pc = blk->pc;

BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
free(req);
pc->nreqs--;

off_t cbegin = index * p->tp->meta.piece_length + begin;
torrent_put_bytes(p->tp, data, cbegin, length);

struct piece *pc = cm_find_piece(tp, index);
assert(pc != NULL);

uint32_t block = begin / PIECE_BLOCKLEN;
set_bit(pc->have_field, block);
set_bit(pc->have_field, begin / PIECE_BLOCKLEN);
pc->ngot++;

if (tp->endgame) {
BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
if (peer_has(p, index) && p->nreqs_out > 0)
peer_cancel(p, index, begin, length);
if (!BTPDQ_EMPTY(&blk->reqs)) {
struct net_buf *nb = nb_create_cancel(index, begin, length);
nb_hold(nb);
struct block_request *req = BTPDQ_FIRST(&blk->reqs);
while (req != NULL) {
struct block_request *next = BTPDQ_NEXT(req, blk_entry);
peer_cancel(req->p, req, nb);
free(req);
pc->nreqs--;
req = next;
}
BTPDQ_INIT(&blk->reqs);
nb_drop(nb);
}
cm_piece_reorder_eg(pc);
if (pc->ngot == pc->nblocks)
cm_on_piece(pc);
if (peer_leech_ok(p) && !peer_laden(p))
cm_assign_requests_eg(p);
} else {
// XXX: Needs to be looked at if we introduce snubbing.
clear_bit(pc->down_field, block);
clear_bit(pc->down_field, begin / PIECE_BLOCKLEN);
pc->nbusy--;
if (pc->ngot == pc->nblocks)
cm_on_piece(pc);


+ 183
- 60
btpd/policy_subr.c View File

@@ -35,13 +35,14 @@ piece_alloc(struct torrent *tp, uint32_t index)
assert(!has_bit(tp->busy_field, index)
&& tp->npcs_busy < tp->meta.npieces);
struct piece *pc;
size_t mem, field;
size_t mem, field, blocks;
unsigned nblocks;
off_t piece_length = torrent_piece_size(tp, index);

nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN);
blocks = sizeof(pc->blocks[0]) * nblocks;
field = (size_t)ceil(nblocks / 8.0);
mem = sizeof(*pc) + field;
mem = sizeof(*pc) + field + blocks;

pc = btpd_calloc(1, mem);
pc->tp = tp;
@@ -49,13 +50,28 @@ piece_alloc(struct torrent *tp, uint32_t index)
pc->have_field =
tp->block_field +
index * (size_t)ceil(tp->meta.piece_length / (double)(1 << 17));
pc->nblocks = nblocks;
pc->index = index;
pc->nblocks = nblocks;

pc->nreqs = 0;
pc->next_block = 0;

for (unsigned i = 0; i < nblocks; i++)
if (has_bit(pc->have_field, i))
pc->ngot++;

pc->blocks = (struct block *)(pc->down_field + field);
for (unsigned i = 0; i < nblocks; i++) {
uint32_t start = i * PIECE_BLOCKLEN;
uint32_t len = torrent_block_size(pc, i);
struct block *blk = &pc->blocks[i];
blk->pc = pc;
BTPDQ_INIT(&blk->reqs);
blk->msg = nb_create_request(index, start, len);
nb_hold(blk->msg);
}

tp->npcs_busy++;
set_bit(tp->busy_field, index);
BTPDQ_INSERT_HEAD(&tp->getlst, pc, entry);
@@ -70,6 +86,15 @@ piece_free(struct piece *pc)
tp->npcs_busy--;
clear_bit(tp->busy_field, pc->index);
BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
for (unsigned i = 0; i < pc->nblocks; i++) {
struct block_request *req = BTPDQ_FIRST(&pc->blocks[i].reqs);
while (req != NULL) {
struct block_request *next = BTPDQ_NEXT(req, blk_entry);
free(req);
req = next;
}
nb_drop(pc->blocks[i].msg);
}
free(pc);
}

@@ -97,27 +122,66 @@ cm_should_enter_endgame(struct torrent *tp)
return should;
}

static void
cm_piece_insert_eg(struct piece *pc)
{
struct piece_tq *getlst = &pc->tp->getlst;
if (pc->nblocks == pc->ngot)
BTPDQ_INSERT_TAIL(getlst, pc, entry);
else {
unsigned r = pc->nreqs / (pc->nblocks - pc->ngot);
struct piece *it;
BTPDQ_FOREACH(it, getlst, entry) {
if ((it->nblocks == it->ngot
|| r < it->nreqs / (it->nblocks - it->ngot))) {
BTPDQ_INSERT_BEFORE(it, pc, entry);
break;
}
}
if (it == NULL)
BTPDQ_INSERT_TAIL(getlst, pc, entry);
}
}

void
cm_piece_reorder_eg(struct piece *pc)
{
BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
cm_piece_insert_eg(pc);
}

static void
cm_enter_endgame(struct torrent *tp)
{
struct peer *p;
struct piece *pc;
struct piece *pcs[tp->npcs_busy];
unsigned pi;

btpd_log(BTPD_L_POL, "Entering end game\n");
tp->endgame = 1;

pi = 0;
BTPDQ_FOREACH(pc, &tp->getlst, entry) {
for (uint32_t i = 0; i < pc->nblocks; i++)
for (unsigned i = 0; i < pc->nblocks; i++)
clear_bit(pc->down_field, i);
pc->nbusy = 0;
pcs[pi] = pc;
pi++;
}
BTPDQ_INIT(&tp->getlst);
while (pi > 0) {
pi--;
cm_piece_insert_eg(pcs[pi]);
}
BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
assert(p->nwant == 0);
BTPDQ_FOREACH(pc, &tp->getlst, entry) {
if (peer_has(p, pc->index)) {
if (peer_has(p, pc->index))
peer_want(p, pc->index);
if (peer_leech_ok(p))
cm_piece_assign_requests_eg(pc, p);
}
}
if (p->nwant > 0 && peer_leech_ok(p) && !peer_laden(p))
cm_assign_requests_eg(p);
}
}

@@ -320,6 +384,10 @@ cm_on_piece_unfull(struct piece *pc)
}
}

#define INCNEXTBLOCK(pc) \
(pc)->next_block = ((pc)->next_block + 1) % (pc)->nblocks


/*
* Request as many blocks as possible on this piece from
* the peer. If the piece becomes full we call cm_on_piece_full.
@@ -331,18 +399,29 @@ cm_piece_assign_requests(struct piece *pc, struct peer *p)
{
assert(!piece_full(pc) && !peer_laden(p));
unsigned count = 0;
for (uint32_t i = 0; !piece_full(pc) && !peer_laden(p); i++) {
if (has_bit(pc->have_field, i) || has_bit(pc->down_field, i))
continue;
set_bit(pc->down_field, i);
do {
while ((has_bit(pc->have_field, pc->next_block)
|| has_bit(pc->down_field, pc->next_block)))
INCNEXTBLOCK(pc);

struct block *blk = &pc->blocks[pc->next_block];
struct block_request *req = btpd_malloc(sizeof(*req));
req->p = p;
req->blk = blk;
BTPDQ_INSERT_TAIL(&blk->reqs, req, blk_entry);
peer_request(p, req);

set_bit(pc->down_field, pc->next_block);
pc->nbusy++;
uint32_t start = i * PIECE_BLOCKLEN;
uint32_t len = torrent_block_size(pc, i);
peer_request(p, pc->index, start, len);
pc->nreqs++;
count++;
}
INCNEXTBLOCK(pc);
} while (!piece_full(pc) && !peer_laden(p));

if (piece_full(pc))
cm_on_piece_full(pc);

return count;
}

@@ -390,74 +469,118 @@ cm_assign_requests(struct peer *p)
void
cm_unassign_requests(struct peer *p)
{
struct torrent *tp = p->tp;

struct piece *pc = BTPDQ_FIRST(&tp->getlst);
while (pc != NULL) {
while (p->nreqs_out > 0) {
struct block_request *req = BTPDQ_FIRST(&p->my_reqs);
struct piece *pc = req->blk->pc;
int was_full = piece_full(pc);

struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
while (nl != NULL) {
struct nb_link *next = BTPDQ_NEXT(nl, entry);

if (pc->index == nb_get_index(nl->nb)) {
uint32_t block = nb_get_begin(nl->nb) / PIECE_BLOCKLEN;
// XXX: Needs to be looked at if we introduce snubbing.
assert(has_bit(pc->down_field, block));
clear_bit(pc->down_field, block);
pc->nbusy--;
BTPDQ_REMOVE(&p->my_reqs, nl, entry);
nb_drop(nl->nb);
free(nl);
}
nl = next;
while (req != NULL) {
struct block_request *next = BTPDQ_NEXT(req, p_entry);

uint32_t blki = nb_get_begin(req->blk->msg) / PIECE_BLOCKLEN;
struct block *blk = req->blk;
// XXX: Needs to be looked at if we introduce snubbing.
assert(has_bit(pc->down_field, blki));
clear_bit(pc->down_field, blki);
pc->nbusy--;
BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
p->nreqs_out--;
BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
free(req);
pc->nreqs--;

while (next != NULL && next->blk->pc != pc)
next = BTPDQ_NEXT(next, p_entry);
req = next;
}
if (was_full && !piece_full(pc))
cm_on_piece_unfull(pc);

pc = BTPDQ_NEXT(pc, entry);
}

assert(BTPDQ_EMPTY(&p->my_reqs));
p->nreqs_out = 0;
}


void
static void
cm_piece_assign_requests_eg(struct piece *pc, struct peer *p)
{
for (uint32_t i = 0; i < pc->nblocks; i++) {
if (!has_bit(pc->have_field, i)) {
uint32_t start = i * PIECE_BLOCKLEN;
uint32_t len = torrent_block_size(pc, i);
peer_request(p, pc->index, start, len);
unsigned first_block = pc->next_block;
do {
if ((has_bit(pc->have_field, pc->next_block)
|| peer_requested(p, &pc->blocks[pc->next_block]))) {
INCNEXTBLOCK(pc);
continue;
}
}
struct block_request *req = btpd_calloc(1, sizeof(*req));
req->blk = &pc->blocks[pc->next_block];
req->p = p;
BTPDQ_INSERT_TAIL(&pc->blocks[pc->next_block].reqs, req, blk_entry);
pc->nreqs++;
INCNEXTBLOCK(pc);
peer_request(p, req);
} while (!peer_laden(p) && pc->next_block != first_block);
}

void
cm_assign_requests_eg(struct peer *p)
{
assert(!peer_laden(p));
struct torrent *tp = p->tp;
struct piece *pc;
BTPDQ_FOREACH(pc, &tp->getlst, entry) {
if (peer_has(p, pc->index))
struct piece_tq tmp;
BTPDQ_INIT(&tmp);

struct piece *pc = BTPDQ_FIRST(&tp->getlst);
while (!peer_laden(p) && pc != NULL) {
struct piece *next = BTPDQ_NEXT(pc, entry);
if (peer_has(p, pc->index) && pc->nblocks != pc->ngot) {
cm_piece_assign_requests_eg(pc, p);
BTPDQ_REMOVE(&tp->getlst, pc, entry);
BTPDQ_INSERT_HEAD(&tmp, pc, entry);
}
pc = next;
}

pc = BTPDQ_FIRST(&tmp);
while (pc != NULL) {
struct piece *next = BTPDQ_NEXT(pc, entry);
cm_piece_insert_eg(pc);
pc = next;
}
}

void
cm_unassign_requests_eg(struct peer *p)
{
struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
while (nl != NULL) {
struct nb_link *next = BTPDQ_NEXT(nl, entry);
nb_drop(nl->nb);
free(nl);
nl = next;
struct block_request *req;
struct piece *pc;
struct piece_tq tmp;
BTPDQ_INIT(&tmp);

while (p->nreqs_out > 0) {
req = BTPDQ_FIRST(&p->my_reqs);

pc = req->blk->pc;
BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
BTPDQ_INSERT_HEAD(&tmp, pc, entry);
while (req != NULL) {
struct block_request *next = BTPDQ_NEXT(req, p_entry);
BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
p->nreqs_out--;
BTPDQ_REMOVE(&req->blk->reqs, req, blk_entry);
free(req);
pc->nreqs--;

while (next != NULL && next->blk->pc != pc)
next = BTPDQ_NEXT(next, p_entry);
req = next;
}
}
assert(BTPDQ_EMPTY(&p->my_reqs));

pc = BTPDQ_FIRST(&tmp);
while (pc != NULL) {
struct piece *next = BTPDQ_NEXT(pc, entry);
cm_piece_insert_eg(pc);
pc = next;
}
BTPDQ_INIT(&p->my_reqs);
p->nreqs_out = 0;
}

+ 12
- 1
btpd/torrent.h View File

@@ -3,14 +3,25 @@

#define PIECE_BLOCKLEN (1 << 14)

struct block {
struct piece *pc;
struct net_buf *msg;
struct block_request_tq reqs;
};

struct piece {
struct torrent *tp;

uint32_t index;
unsigned nblocks;

unsigned nreqs;

unsigned nblocks;
unsigned ngot;
unsigned nbusy;
unsigned next_block;

struct block *blocks;

uint8_t *have_field;
uint8_t *down_field;


Loading…
Cancel
Save