Pārlūkot izejas kodu

Use the old simpler bandwidth limiter, but run it at a configurable rate.

Let the default be 8 hz for now.

Removed a try at time correction. I don't really think it'll matter and
there was a potential bug if the clock went backwards.

Removed net_by_second. Let the peer bandwidth calculation be handled in
cm_by_second.
master
Richard Nyberg pirms 19 gadiem
vecāks
revīzija
f671c4f965
5 mainītis faili ar 73 papildinājumiem un 97 dzēšanām
  1. +21
    -12
      btpd/btpd.c
  2. +3
    -2
      btpd/btpd.h
  3. +40
    -74
      btpd/net.c
  4. +1
    -9
      btpd/net.h
  5. +8
    -0
      btpd/policy_if.c

+ 21
- 12
btpd/btpd.c Parādīt failu

@@ -116,7 +116,6 @@ btpd_init(void)
btpd.ntorrents = 0; btpd.ntorrents = 0;
BTPDQ_INIT(&btpd.cm_list); BTPDQ_INIT(&btpd.cm_list);


BTPDQ_INIT(&btpd.bwq);
BTPDQ_INIT(&btpd.readq); BTPDQ_INIT(&btpd.readq);
BTPDQ_INIT(&btpd.writeq); BTPDQ_INIT(&btpd.writeq);


@@ -124,6 +123,7 @@ btpd_init(void)


btpd.port = 6881; btpd.port = 6881;


btpd.bw_hz = 8;
btpd.obwlim = 0; btpd.obwlim = 0;
btpd.ibwlim = 0; btpd.ibwlim = 0;
btpd.obw_left = 0; btpd.obw_left = 0;
@@ -187,20 +187,13 @@ static void
heartbeat_cb(int sd, short type, void *arg) heartbeat_cb(int sd, short type, void *arg)
{ {
struct torrent *tp; struct torrent *tp;
struct timeval begin, end, wadj;
gettimeofday(&begin, NULL);


btpd.seconds++; btpd.seconds++;


BTPDQ_FOREACH(tp, &btpd.cm_list, entry) BTPDQ_FOREACH(tp, &btpd.cm_list, entry)
cm_by_second(tp); cm_by_second(tp);


net_by_second();

gettimeofday(&end, NULL);
timersub(&end, &begin, &wadj);
evtimer_add(&btpd.heartbeat,
(& (struct timeval) { 0, 1000000 - wadj.tv_usec }));
evtimer_add(&btpd.heartbeat, (& (struct timeval) { 0, 1000000 }));
} }


static void static void
@@ -210,6 +203,10 @@ usage()
"\n" "\n"
"Options:\n" "Options:\n"
"\n" "\n"
"--bw-hz n\n"
"\tRun the bandwidth limiter at n hz.\n"
"\tDefault is 8 hz.\n"
"\n"
"--bw-in n\n" "--bw-in n\n"
"\tLimit incoming BitTorrent traffic to n kB/s.\n" "\tLimit incoming BitTorrent traffic to n kB/s.\n"
"\tDefault is 0 which means unlimited.\n" "\tDefault is 0 which means unlimited.\n"
@@ -242,6 +239,7 @@ static int longval = 0;


static struct option longopts[] = { static struct option longopts[] = {
{ "port", required_argument, NULL, 'p' }, { "port", required_argument, NULL, 'p' },
{ "bw-hz", required_argument, &longval, 6 },
{ "bw-in", required_argument, &longval, 1 }, { "bw-in", required_argument, &longval, 1 },
{ "bw-out", required_argument, &longval, 2 }, { "bw-out", required_argument, &longval, 2 },
{ "logfile", required_argument, &longval, 3 }, { "logfile", required_argument, &longval, 3 },
@@ -272,11 +270,9 @@ main(int argc, char **argv)
switch (longval) { switch (longval) {
case 1: case 1:
btpd.ibwlim = atoi(optarg) * 1024; btpd.ibwlim = atoi(optarg) * 1024;
btpd.ibw_left = btpd.ibwlim;
break; break;
case 2: case 2:
btpd.obwlim = atoi(optarg) * 1024; btpd.obwlim = atoi(optarg) * 1024;
btpd.obw_left = btpd.obwlim;
break; break;
case 3: case 3:
logfile = optarg; logfile = optarg;
@@ -289,6 +285,12 @@ main(int argc, char **argv)
break; break;
case 5: case 5:
usage(); usage();
case 6:
btpd.bw_hz = atoi(optarg);
if (btpd.bw_hz <= 0 || btpd.bw_hz > 100)
btpd_err("I will only accept bw limiter hz "
"between 1 and 100.\n");
break;
default: default:
usage(); usage();
} }
@@ -370,7 +372,6 @@ main(int argc, char **argv)
setlinebuf(stdout); setlinebuf(stdout);
setlinebuf(stderr); setlinebuf(stderr);



event_init(); event_init();


signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
@@ -393,6 +394,14 @@ main(int argc, char **argv)
net_connection_cb, &btpd); net_connection_cb, &btpd);
event_add(&btpd.accept4, NULL); event_add(&btpd.accept4, NULL);


evtimer_set(&btpd.bwlim, net_bw_cb, NULL);
if (btpd.obwlim > 0 || btpd.ibwlim > 0) {
btpd.ibw_left = btpd.ibwlim / btpd.bw_hz;
btpd.obw_left = btpd.obwlim / btpd.bw_hz;
evtimer_add(&btpd.bwlim,
(& (struct timeval) { 0, 1000000 / btpd.bw_hz }));
}

error = event_dispatch(); error = event_dispatch();
btpd_err("Returned from dispatch. Error = %d.\n", error); btpd_err("Returned from dispatch. Error = %d.\n", error);




+ 3
- 2
btpd/btpd.h Parādīt failu

@@ -46,7 +46,6 @@ struct btpd {
unsigned ntorrents; unsigned ntorrents;
struct torrent_tq cm_list; struct torrent_tq cm_list;


struct bwlim_tq bwq;
struct peer_tq readq; struct peer_tq readq;
struct peer_tq writeq; struct peer_tq writeq;


@@ -56,9 +55,11 @@ struct btpd {
int peer4_sd; int peer4_sd;
int ipc_sd; int ipc_sd;


unsigned bw_hz;
unsigned long obwlim, ibwlim; unsigned long obwlim, ibwlim;
unsigned long ibw_left, obw_left; unsigned long ibw_left, obw_left;
struct event bwlim;

unsigned npeers; unsigned npeers;
unsigned maxpeers; unsigned maxpeers;




+ 40
- 74
btpd/net.c Parādīt failu

@@ -25,34 +25,6 @@
static unsigned long static unsigned long
net_write(struct peer *p, unsigned long wmax); net_write(struct peer *p, unsigned long wmax);


void
net_bw_read_cb(int sd, short type, void *arg)
{
struct peer *p;
struct bwlim *bw = arg;

btpd.ibw_left += bw->count;
assert(btpd.ibw_left <= btpd.ibwlim);

unsigned long count = 0;

while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left - count > 0) {
BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
p->flags &= ~PF_ON_READQ;
count += p->reader->read(p, btpd.ibw_left - count);
}
btpd.ibw_left -= count;

BTPDQ_REMOVE(&btpd.bwq, bw, entry);
if (count == 0)
free(bw);
else {
bw->count = count;
event_add(&bw->timer, (& (struct timeval) { 1, 0 }));
BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
}
}

void void
net_read_cb(int sd, short type, void *arg) net_read_cb(int sd, short type, void *arg)
{ {
@@ -60,49 +32,13 @@ net_read_cb(int sd, short type, void *arg)
if (btpd.ibwlim == 0) { if (btpd.ibwlim == 0) {
p->reader->read(p, 0); p->reader->read(p, 0);
} else if (btpd.ibw_left > 0) { } else if (btpd.ibw_left > 0) {
unsigned long nread = p->reader->read(p, btpd.ibw_left);
if (nread > 0) {
struct bwlim *bw = btpd_calloc(1, sizeof(*bw));
evtimer_set(&bw->timer, net_bw_read_cb, bw);
evtimer_add(&bw->timer, (& (struct timeval) { 1, 0 }));
bw->count = nread;
btpd.ibw_left -= nread;
BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
}
btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
} else { } else {
p->flags |= PF_ON_READQ; p->flags |= PF_ON_READQ;
BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry); BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
} }
} }


void
net_bw_write_cb(int sd, short type, void *arg)
{
struct peer *p;
struct bwlim *bw = arg;

btpd.obw_left += bw->count;
assert(btpd.obw_left <= btpd.obwlim);

unsigned long count = 0;

while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL && btpd.obw_left - count > 0) {
BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
p->flags &= ~PF_ON_WRITEQ;
count += net_write(p, btpd.obw_left - count);
}
btpd.obw_left -= count;

BTPDQ_REMOVE(&btpd.bwq, bw, entry);
if (count == 0)
free(bw);
else {
bw->count = count;
event_add(&bw->timer, (& (struct timeval) { 1, 0 }));
BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
}
}

void void
net_write_cb(int sd, short type, void *arg) net_write_cb(int sd, short type, void *arg)
{ {
@@ -115,15 +51,7 @@ net_write_cb(int sd, short type, void *arg)
if (btpd.obwlim == 0) { if (btpd.obwlim == 0) {
net_write(p, 0); net_write(p, 0);
} else if (btpd.obw_left > 0) { } else if (btpd.obw_left > 0) {
unsigned long nw = net_write(p, btpd.obw_left);
if (nw > 0) {
struct bwlim *bw = btpd_calloc(1, sizeof(*bw));
evtimer_set(&bw->timer, net_bw_write_cb, bw);
evtimer_add(&bw->timer, (& (struct timeval) { 1, 0 }));
bw->count = nw;
btpd.obw_left -= nw;
BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
}
btpd.obw_left -= net_write(p, btpd.obw_left);
} else { } else {
p->flags |= PF_ON_WRITEQ; p->flags |= PF_ON_WRITEQ;
BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry); BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
@@ -917,3 +845,41 @@ net_by_second(void)
} }
} }
} }

void
net_bw_cb(int sd, short type, void *arg)
{
struct peer *p;

btpd.obw_left = btpd.obwlim / btpd.bw_hz;
btpd.ibw_left = btpd.ibwlim / btpd.bw_hz;

if (btpd.ibwlim > 0) {
while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left > 0) {
BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
p->flags &= ~PF_ON_READQ;
btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
}
} else {
while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL) {
BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
p->flags &= ~PF_ON_READQ;
p->reader->read(p, 0);
}
}

if (btpd.obwlim) {
while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL && btpd.obw_left > 0) {
BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
p->flags &= ~PF_ON_WRITEQ;
btpd.obw_left -= net_write(p, btpd.obw_left);
}
} else {
while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL) {
BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
p->flags &= ~PF_ON_WRITEQ;
net_write(p, 0);
}
}
event_add(&btpd.bwlim, (& (struct timeval) { 0, 1000000 / btpd.bw_hz }));
}

+ 1
- 9
btpd/net.h Parādīt failu

@@ -11,14 +11,6 @@
#define MSG_PIECE 7 #define MSG_PIECE 7
#define MSG_CANCEL 8 #define MSG_CANCEL 8


struct bwlim {
unsigned long count;
struct event timer;
BTPDQ_ENTRY(bwlim) entry;
};

BTPDQ_HEAD(bwlim_tq, bwlim);

struct iob_link { struct iob_link {
int upload; int upload;
BTPDQ_ENTRY(iob_link) entry; BTPDQ_ENTRY(iob_link) entry;
@@ -82,7 +74,7 @@ struct piece_req {
BTPDQ_HEAD(piece_req_tq, piece_req); BTPDQ_HEAD(piece_req_tq, piece_req);


void net_connection_cb(int sd, short type, void *arg); void net_connection_cb(int sd, short type, void *arg);
void net_by_second(void);
void net_bw_cb(int sd, short type, void *arg);


struct peer; struct peer;




+ 8
- 0
btpd/policy_if.c Parādīt failu

@@ -15,6 +15,14 @@ cm_by_second(struct torrent *tp)


if (btpd.seconds == tp->choke_time) if (btpd.seconds == tp->choke_time)
choke_alg(tp); choke_alg(tp);

struct peer *p;
int ri = btpd.seconds % RATEHISTORY;

BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
p->rate_to_me[ri] = 0;
p->rate_from_me[ri] = 0;
}
} }


/* /*


Notiek ielāde…
Atcelt
Saglabāt