A clone of btpd with my configuration changes.

#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/mman.h>
#include <sys/wait.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "btpd.h"
static struct event m_bw_timer;
static unsigned long m_bw_bytes_in;   /* bytes left of this second's download allowance */
static unsigned long m_bw_bytes_out;  /* bytes left of this second's upload allowance */

static unsigned long m_rate_up;
static unsigned long m_rate_dwn;

static struct event m_net_incoming;

static unsigned m_ntorrents;
static struct net_tq m_torrents = BTPDQ_HEAD_INITIALIZER(m_torrents);

unsigned net_npeers;

/* Peers waiting for bandwidth, and peers not yet tied to a torrent. */
struct peer_tq net_bw_readq = BTPDQ_HEAD_INITIALIZER(net_bw_readq);
struct peer_tq net_bw_writeq = BTPDQ_HEAD_INITIALIZER(net_bw_writeq);
struct peer_tq net_unattached = BTPDQ_HEAD_INITIALIZER(net_unattached);
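
/* Return nonzero if the torrent already has a peer with the given id. */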
int
net_torrent_has_peer(struct net *n, const uint8_t *id)
{
    int has = 0;
    struct peer *p = BTPDQ_FIRST(&n->peers);
    while (p != NULL) {
        if (bcmp(p->id, id, 20) == 0) {
            has = 1;
            break;
        }
        p = BTPDQ_NEXT(p, p_entry);
    }
    return has;
}
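
/*
 * Start networking for a torrent. The net struct, the busy_field bit
 * field and the piece_count array are allocated as a single block.
 */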
void
net_add_torrent(struct torrent *tp)
{
    size_t field_size = ceil(tp->meta.npieces / 8.0);
    size_t mem = sizeof(*(tp->net)) + field_size +
        tp->meta.npieces * sizeof(*(tp->net->piece_count));

    struct net *n = btpd_calloc(1, mem);
    n->tp = tp;
    tp->net = n;
    n->active = 1;
    BTPDQ_INIT(&n->getlst);
    n->busy_field = (uint8_t *)(n + 1);
    n->piece_count = (unsigned *)(n->busy_field + field_size);
    BTPDQ_INSERT_HEAD(&m_torrents, n, entry);
    m_ntorrents++;
}
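
/*
 * Stop networking for a torrent: free its pending pieces and kill
 * every peer that belongs to it, attached or not.
 */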
void
net_del_torrent(struct torrent *tp)
{
    struct net *n = tp->net;
    tp->net = NULL;

    assert(m_ntorrents > 0);
    m_ntorrents--;
    BTPDQ_REMOVE(&m_torrents, n, entry);
    n->active = 0;

    ul_on_lost_torrent(n);

    struct piece *pc;
    while ((pc = BTPDQ_FIRST(&n->getlst)) != NULL)
        piece_free(pc);

    struct peer *p = BTPDQ_FIRST(&net_unattached);
    while (p != NULL) {
        struct peer *next = BTPDQ_NEXT(p, p_entry);
        if (p->n == n)
            peer_kill(p);
        p = next;
    }

    p = BTPDQ_FIRST(&n->peers);
    while (p != NULL) {
        struct peer *next = BTPDQ_NEXT(p, p_entry);
        peer_kill(p);
        p = next;
    }

    free(n);
}
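
/* Helpers for reading and writing 32-bit values in network byte order. */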
void
net_write32(void *buf, uint32_t num)
{
    *(uint32_t *)buf = htonl(num);
}

uint32_t
net_read32(const void *buf)
{
    return ntohl(*(uint32_t *)buf);
}
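
/*
 * Write at most wmax bytes (unlimited if wmax is 0) from the peer's
 * output queue with a single writev. Finished buffers are released;
 * a partial write is remembered in p->outq_off. Returns the number of
 * bytes written, or 0 when the call would block or the peer was killed.
 */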
static unsigned long
net_write(struct peer *p, unsigned long wmax)
{
    struct nb_link *nl;
    struct iovec iov[IOV_MAX];
    int niov;
    int limited;
    ssize_t nwritten;
    unsigned long bcount;

    limited = wmax > 0;

    niov = 0;
    assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
    while (niov < IOV_MAX && nl != NULL && (!limited || wmax > 0)) {
        if (niov > 0) {
            iov[niov].iov_base = nl->nb->buf;
            iov[niov].iov_len = nl->nb->len;
        } else {
            iov[niov].iov_base = nl->nb->buf + p->outq_off;
            iov[niov].iov_len = nl->nb->len - p->outq_off;
        }
        if (limited) {
            if (iov[niov].iov_len > wmax)
                iov[niov].iov_len = wmax;
            wmax -= iov[niov].iov_len;
        }
        niov++;
        nl = BTPDQ_NEXT(nl, entry);
    }

    nwritten = writev(p->sd, iov, niov);
    if (nwritten < 0) {
        if (errno == EAGAIN) {
            event_add(&p->out_ev, WRITE_TIMEOUT);
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    } else if (nwritten == 0) {
        btpd_log(BTPD_L_CONN, "connection closed by peer.\n");
        peer_kill(p);
        return 0;
    }

    bcount = nwritten;
    nl = BTPDQ_FIRST(&p->outq);
    while (bcount > 0) {
        unsigned long bufdelta = nl->nb->len - p->outq_off;
        if (bcount >= bufdelta) {
            peer_sent(p, nl->nb);
            if (nl->nb->type == NB_TORRENTDATA) {
                p->n->uploaded += bufdelta;
                p->count_up += bufdelta;
            }
            bcount -= bufdelta;
            BTPDQ_REMOVE(&p->outq, nl, entry);
            nb_drop(nl->nb);
            free(nl);
            p->outq_off = 0;
            nl = BTPDQ_FIRST(&p->outq);
        } else {
            if (nl->nb->type == NB_TORRENTDATA) {
                p->n->uploaded += bcount;
                p->count_up += bcount;
            }
            p->outq_off += bcount;
            bcount = 0;
        }
    }
    if (!BTPDQ_EMPTY(&p->outq))
        event_add(&p->out_ev, WRITE_TIMEOUT);
    return nwritten;
}
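
/*
 * Dispatch a completely received message to the matching peer_on_*
 * handler. Returns nonzero if the message was invalid, in which case
 * the caller drops the peer.
 */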
static int
net_dispatch_msg(struct peer *p, const char *buf)
{
    uint32_t index, begin, length;
    int res = 0;

    switch (p->in.msg_num) {
    case MSG_CHOKE:
        peer_on_choke(p);
        break;
    case MSG_UNCHOKE:
        peer_on_unchoke(p);
        break;
    case MSG_INTEREST:
        peer_on_interest(p);
        break;
    case MSG_UNINTEREST:
        peer_on_uninterest(p);
        break;
    case MSG_HAVE:
        peer_on_have(p, net_read32(buf));
        break;
    case MSG_BITFIELD:
        if (p->npieces == 0)
            peer_on_bitfield(p, buf);
        else
            res = 1;
        break;
    case MSG_REQUEST:
        if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
            index = net_read32(buf);
            begin = net_read32(buf + 4);
            length = net_read32(buf + 8);
            /* A request is only valid for a piece we actually have. */
            if (length > PIECE_BLOCKLEN
                || index >= p->n->tp->meta.npieces
                || !cm_has_piece(p->n->tp, index)
                || begin + length > torrent_piece_size(p->n->tp, index)) {
                btpd_log(BTPD_L_MSG, "bad request: (%u, %u, %u) from %p\n",
                    index, begin, length, p);
                res = 1;
                break;
            }
            peer_on_request(p, index, begin, length);
        }
        break;
    case MSG_CANCEL:
        index = net_read32(buf);
        begin = net_read32(buf + 4);
        length = net_read32(buf + 8);
        peer_on_cancel(p, index, begin, length);
        break;
    case MSG_PIECE:
        length = p->in.msg_len - 9;
        peer_on_piece(p, p->in.pc_index, p->in.pc_begin, length, buf);
        break;
    default:
        abort();
    }
    return res;
}
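
/* Check that the advertised message length is sane for the message type. */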
static int
net_mh_ok(struct peer *p)
{
    uint32_t mlen = p->in.msg_len;
    switch (p->in.msg_num) {
    case MSG_CHOKE:
    case MSG_UNCHOKE:
    case MSG_INTEREST:
    case MSG_UNINTEREST:
        return mlen == 1;
    case MSG_HAVE:
        return mlen == 5;
    case MSG_BITFIELD:
        return mlen == (uint32_t)ceil(p->n->tp->meta.npieces / 8.0) + 1;
    case MSG_REQUEST:
    case MSG_CANCEL:
        return mlen == 13;
    case MSG_PIECE:
        return mlen <= PIECE_BLOCKLEN + 9;
    default:
        return 0;
    }
}
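
/* Only bytes belonging to a piece message body count as download. */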
static void
net_progress(struct peer *p, size_t length)
{
    if (p->in.state == BTP_MSGBODY && p->in.msg_num == MSG_PIECE) {
        p->n->downloaded += length;
        p->count_dwn += length;
    }
}
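
/*
 * Advance the input state machine one step; buf holds exactly the
 * bytes the current state asked for. Returns 0 on success or -1 if
 * the peer sent garbage and was killed.
 */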
static int
net_state(struct peer *p, const char *buf)
{
    switch (p->in.state) {
    case SHAKE_PSTR:
        if (bcmp(buf, "\x13""BitTorrent protocol", 20) != 0)
            goto bad;
        peer_set_in_state(p, SHAKE_INFO, 20);
        break;
    case SHAKE_INFO:
        if (p->flags & PF_INCOMING) {
            struct net *n;
            BTPDQ_FOREACH(n, &m_torrents, entry)
                if (bcmp(buf, n->tp->meta.info_hash, 20) == 0)
                    break;
            if (n == NULL)
                goto bad;
            p->n = n;
            peer_send(p, nb_create_shake(p->n->tp));
        } else if (bcmp(buf, p->n->tp->meta.info_hash, 20) != 0)
            goto bad;
        peer_set_in_state(p, SHAKE_ID, 20);
        break;
    case SHAKE_ID:
        if (net_torrent_has_peer(p->n, buf)
            || bcmp(buf, btpd_get_peer_id(), 20) == 0)
            goto bad;
        bcopy(buf, p->id, 20);
        peer_on_shake(p);
        peer_set_in_state(p, BTP_MSGSIZE, 4);
        break;
    case BTP_MSGSIZE:
        p->in.msg_len = net_read32(buf);
        if (p->in.msg_len == 0)
            peer_on_keepalive(p);
        else
            peer_set_in_state(p, BTP_MSGHEAD, 1);
        break;
    case BTP_MSGHEAD:
        p->in.msg_num = buf[0];
        if (!net_mh_ok(p))
            goto bad;
        else if (p->in.msg_len == 1) {
            if (net_dispatch_msg(p, buf) != 0)
                goto bad;
            peer_set_in_state(p, BTP_MSGSIZE, 4);
        } else if (p->in.msg_num == MSG_PIECE)
            peer_set_in_state(p, BTP_PIECEMETA, 8);
        else
            peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 1);
        break;
    case BTP_PIECEMETA:
        p->in.pc_index = net_read32(buf);
        p->in.pc_begin = net_read32(buf + 4);
        peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 9);
        break;
    case BTP_MSGBODY:
        if (net_dispatch_msg(p, buf) != 0)
            goto bad;
        peer_set_in_state(p, BTP_MSGSIZE, 4);
        break;
    default:
        abort();
    }
    return 0;
bad:
    btpd_log(BTPD_L_CONN, "bad data from %p (%u, %u, %u).\n",
        p, p->in.state, p->in.msg_len, p->in.msg_num);
    peer_kill(p);
    return -1;
}

#define GRBUFLEN (1 << 15)
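
/*
 * Read at most rmax bytes (unlimited if rmax is 0) and feed complete
 * chunks to net_state. A chunk that arrives only partially is parked
 * in p->in.buf until the remaining bytes show up.
 */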
static unsigned long
net_read(struct peer *p, unsigned long rmax)
{
    size_t rest = p->in.buf != NULL ? p->in.st_bytes - p->in.off : 0;
    char buf[GRBUFLEN];
    struct iovec iov[2] = {
        {
            p->in.buf + p->in.off,
            rest
        }, {
            buf,
            sizeof(buf)
        }
    };

    if (rmax > 0) {
        if (iov[0].iov_len > rmax)
            iov[0].iov_len = rmax;
        iov[1].iov_len = min(rmax - iov[0].iov_len, iov[1].iov_len);
    }

    ssize_t nread = readv(p->sd, iov, 2);
    if (nread < 0 && errno == EAGAIN)
        goto out;
    else if (nread < 0) {
        btpd_log(BTPD_L_CONN, "Read error (%s) on %p.\n", strerror(errno), p);
        peer_kill(p);
        return 0;
    } else if (nread == 0) {
        btpd_log(BTPD_L_CONN, "Connection closed by %p.\n", p);
        peer_kill(p);
        return 0;
    }

    if (rest > 0) {
        if (nread < rest) {
            p->in.off += nread;
            net_progress(p, nread);
            goto out;
        }
        net_progress(p, rest);
        if (net_state(p, p->in.buf) != 0)
            return nread;
        free(p->in.buf);
        p->in.buf = NULL;
        p->in.off = 0;
    }

    iov[1].iov_len = nread - rest;
    while (p->in.st_bytes <= iov[1].iov_len) {
        size_t consumed = p->in.st_bytes;
        net_progress(p, consumed);
        if (net_state(p, iov[1].iov_base) != 0)
            return nread;
        iov[1].iov_base += consumed;
        iov[1].iov_len -= consumed;
    }
    if (iov[1].iov_len > 0) {
        net_progress(p, iov[1].iov_len);
        p->in.off = iov[1].iov_len;
        p->in.buf = btpd_malloc(p->in.st_bytes);
        bcopy(iov[1].iov_base, p->in.buf, iov[1].iov_len);
    }
out:
    event_add(&p->in_ev, NULL);
    return nread > 0 ? nread : 0;
}
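
/* Begin a non-blocking connect to the given address. Returns 0 or errno. */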
int
net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
{
    if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        return errno;

    set_nonblocking(*sd);

    if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
        btpd_log(BTPD_L_CONN, "Botched connection %s.", strerror(errno));
        close(*sd);
        return errno;
    }
    return 0;
}
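
/* Resolve a numeric host address (AI_NUMERICHOST) and start connecting. */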
int
net_connect(const char *ip, int port, int *sd)
{
    struct addrinfo hints, *res;
    char portstr[6];

    assert(net_npeers < net_max_peers);

    if (snprintf(portstr, sizeof(portstr), "%d", port) >= sizeof(portstr))
        return EINVAL;
    bzero(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_flags = AI_NUMERICHOST;
    hints.ai_socktype = SOCK_STREAM;
    if (getaddrinfo(ip, portstr, &hints, &res) != 0)
        return errno;

    int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);

    freeaddrinfo(res);

    return error;
}
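
/* Accept an incoming connection and hand it to the peer layer. */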
void
net_connection_cb(int sd, short type, void *arg)
{
    int nsd;

    nsd = accept(sd, NULL, NULL);
    if (nsd < 0) {
        if (errno == EWOULDBLOCK || errno == ECONNABORTED)
            return;
        else
            btpd_err("accept: %s\n", strerror(errno));
    }

    if (set_nonblocking(nsd) != 0) {
        close(nsd);
        return;
    }

    assert(net_npeers <= net_max_peers);
    if (net_npeers == net_max_peers) {
        close(nsd);
        return;
    }

    peer_create_in(nsd);

    btpd_log(BTPD_L_CONN, "got connection.\n");
}
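
/*
 * The rates are decaying byte counts covering roughly the last
 * RATEHISTORY seconds; compute_rate_sub gives the amount a rate
 * should decay by in one one-second tick.
 */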
#define RATEHISTORY 20

static unsigned long
compute_rate_sub(unsigned long rate)
{
    if (rate > 256 * RATEHISTORY)
        return rate / RATEHISTORY;
    else
        return min(256, rate);
}

static void
compute_rates(void)
{
    unsigned long tot_up = 0, tot_dwn = 0;
    struct net *n;
    BTPDQ_FOREACH(n, &m_torrents, entry) {
        unsigned long tp_up = 0, tp_dwn = 0;
        struct peer *p;
        BTPDQ_FOREACH(p, &n->peers, p_entry) {
            if (p->count_up > 0 || peer_active_up(p)) {
                tp_up += p->count_up;
                p->rate_up += p->count_up - compute_rate_sub(p->rate_up);
                p->count_up = 0;
            }
            if (p->count_dwn > 0 || peer_active_down(p)) {
                tp_dwn += p->count_dwn;
                p->rate_dwn += p->count_dwn - compute_rate_sub(p->rate_dwn);
                p->count_dwn = 0;
            }
        }
        n->rate_up += tp_up - compute_rate_sub(n->rate_up);
        n->rate_dwn += tp_dwn - compute_rate_sub(n->rate_dwn);
        tot_up += tp_up;
        tot_dwn += tp_dwn;
    }
    m_rate_up += tot_up - compute_rate_sub(m_rate_up);
    m_rate_dwn += tot_dwn - compute_rate_sub(m_rate_dwn);
}
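
/*
 * One-second timer: update rate estimates, refill the bandwidth
 * budgets and service peers that were queued while the previous
 * budget was exhausted.
 */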
void
net_bw_cb(int sd, short type, void *arg)
{
    struct peer *p;

    evtimer_add(&m_bw_timer, (& (struct timeval) { 1, 0 }));

    compute_rates();

    m_bw_bytes_out = net_bw_limit_out;
    m_bw_bytes_in = net_bw_limit_in;

    if (net_bw_limit_in > 0) {
        while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL
               && m_bw_bytes_in > 0) {
            BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
        }
    } else {
        while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL) {
            BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            net_read(p, 0);
        }
    }

    if (net_bw_limit_out) {
        while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL
               && m_bw_bytes_out > 0) {
            BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
        }
    } else {
        while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL) {
            BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            net_write(p, 0);
        }
    }
}
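
/* The peer's socket became readable: read now or queue for bandwidth. */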
void
net_read_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;
    if (net_bw_limit_in == 0)
        net_read(p, 0);
    else if (m_bw_bytes_in > 0)
        m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
    else {
        p->flags |= PF_ON_READQ;
        BTPDQ_INSERT_TAIL(&net_bw_readq, p, rq_entry);
    }
}
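
/* The peer's socket became writable, or a pending write timed out. */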
void
net_write_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;

    if (type == EV_TIMEOUT) {
        btpd_log(BTPD_L_CONN, "Write attempt timed out.\n");
        peer_kill(p);
        return;
    }

    if (net_bw_limit_out == 0) {
        net_write(p, 0);
    } else if (m_bw_bytes_out > 0) {
        m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
    } else {
        p->flags |= PF_ON_WRITEQ;
        BTPDQ_INSERT_TAIL(&net_bw_writeq, p, wq_entry);
    }
}
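
/*
 * Set up networking: derive net_max_peers from the file descriptor
 * limit, open the listening socket and arm the bandwidth timer.
 */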
void
net_init(void)
{
    m_bw_bytes_out = net_bw_limit_out;
    m_bw_bytes_in = net_bw_limit_in;

    int nfiles = getdtablesize();
    if (nfiles <= 20)
        btpd_err("Too few open files allowed (%d). "
            "Check \"ulimit -n\"\n", nfiles);
    else if (nfiles < 64)
        btpd_log(BTPD_L_BTPD,
            "You have restricted the number of open files to %d. "
            "More could be beneficial to the download performance.\n",
            nfiles);
    net_max_peers = nfiles - 20;

    int sd;
    int flag = 1;
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(net_port);

    if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
        btpd_err("socket: %s\n", strerror(errno));
    setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
    if (bind(sd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
        btpd_err("bind: %s\n", strerror(errno));
    listen(sd, 10);
    set_nonblocking(sd);

    event_set(&m_net_incoming, sd, EV_READ | EV_PERSIST,
        net_connection_cb, NULL);
    event_add(&m_net_incoming, NULL);

    evtimer_set(&m_bw_timer, net_bw_cb, NULL);
    evtimer_add(&m_bw_timer, (& (struct timeval) { 1, 0 }));
}