A clone of btpd with my configuration changes.

#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/mman.h>
#include <sys/wait.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "btpd.h"

static unsigned long m_bw_bytes_in;
static unsigned long m_bw_bytes_out;

static unsigned long m_rate_up;
static unsigned long m_rate_dwn;

static struct event m_net_incoming;

unsigned net_npeers;

struct peer_tq net_bw_readq = BTPDQ_HEAD_INITIALIZER(net_bw_readq);
struct peer_tq net_bw_writeq = BTPDQ_HEAD_INITIALIZER(net_bw_writeq);
struct peer_tq net_unattached = BTPDQ_HEAD_INITIALIZER(net_unattached);

int
net_torrent_has_peer(struct net *n, const uint8_t *id)
{
    int has = 0;
    struct peer *p = BTPDQ_FIRST(&n->peers);
    while (p != NULL) {
        if (bcmp(p->id, id, 20) == 0) {
            has = 1;
            break;
        }
        p = BTPDQ_NEXT(p, p_entry);
    }
    return has;
}

void
net_create(struct torrent *tp)
{
    size_t field_size = ceil(tp->npieces / 8.0);
    size_t mem = sizeof(*(tp->net)) + field_size +
        tp->npieces * sizeof(*(tp->net->piece_count));

    struct net *n = btpd_calloc(1, mem);
    n->tp = tp;
    tp->net = n;
    BTPDQ_INIT(&n->getlst);
    n->busy_field = (uint8_t *)(n + 1);
    n->piece_count = (unsigned *)(n->busy_field + field_size);
}
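
/*
 * Allocation note: net_create packs three things into one calloc:
 * the struct net itself, the busy_field bitmap (one bit per piece,
 * rounded up to whole bytes), and the piece_count array, in that
 * order. For example, a torrent with 1000 pieces gets field_size =
 * ceil(1000 / 8.0) = 125 bitmap bytes followed by 1000 counters.
 */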

void
net_kill(struct torrent *tp)
{
    free(tp->net);
    tp->net = NULL;
}

void
net_start(struct torrent *tp)
{
    struct net *n = tp->net;
    n->active = 1;
}

void
net_stop(struct torrent *tp)
{
    struct net *n = tp->net;

    n->active = 0;
    n->rate_up = 0;
    n->rate_dwn = 0;

    ul_on_lost_torrent(n);

    struct piece *pc;
    while ((pc = BTPDQ_FIRST(&n->getlst)) != NULL)
        piece_free(pc);
    BTPDQ_INIT(&n->getlst);

    /* Kill this torrent's peers, both those still unattached... */
    struct peer *p = BTPDQ_FIRST(&net_unattached);
    while (p != NULL) {
        struct peer *next = BTPDQ_NEXT(p, p_entry);
        if (p->n == n)
            peer_kill(p);
        p = next;
    }

    /* ...and those already attached to it. */
    p = BTPDQ_FIRST(&n->peers);
    while (p != NULL) {
        struct peer *next = BTPDQ_NEXT(p, p_entry);
        peer_kill(p);
        p = next;
    }
}

int
net_active(struct torrent *tp)
{
    return tp->net->active;
}

void
net_write32(void *buf, uint32_t num)
{
    uint8_t *p = buf;
    *p = (num >> 24) & 0xff;
    *(p + 1) = (num >> 16) & 0xff;
    *(p + 2) = (num >> 8) & 0xff;
    *(p + 3) = num & 0xff;
}

uint32_t
net_read32(const void *buf)
{
    const uint8_t *p = buf;
    return (uint32_t)*p << 24 | (uint32_t)*(p + 1) << 16
        | (uint16_t)*(p + 2) << 8 | *(p + 3);
}
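
/*
 * Byte-order sketch: BitTorrent integers are big-endian on the wire,
 * so net_write32(buf, 0x01020304) stores the bytes 01 02 03 04 in
 * that order, and net_read32 recovers 0x01020304 on any host. The
 * (uint16_t) cast is harmless; integer promotion widens the shifted
 * value to int before it is or'ed in.
 */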

static unsigned long
net_write(struct peer *p, unsigned long wmax)
{
    struct nb_link *nl;
    struct iovec iov[IOV_MAX];
    int niov;
    int limited;
    ssize_t nwritten;
    unsigned long bcount;

    limited = wmax > 0;

    niov = 0;
    assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
    /* Gather queued buffers into an iovec array, honoring the write
     * budget and the partial offset into the first buffer. */
    while ((niov < IOV_MAX && nl != NULL
               && (!limited || (limited && wmax > 0)))) {
        if (niov > 0) {
            iov[niov].iov_base = nl->nb->buf;
            iov[niov].iov_len = nl->nb->len;
        } else {
            iov[niov].iov_base = nl->nb->buf + p->outq_off;
            iov[niov].iov_len = nl->nb->len - p->outq_off;
        }
        if (limited) {
            if (iov[niov].iov_len > wmax)
                iov[niov].iov_len = wmax;
            wmax -= iov[niov].iov_len;
        }
        niov++;
        nl = BTPDQ_NEXT(nl, entry);
    }

    nwritten = writev(p->sd, iov, niov);
    if (nwritten < 0) {
        if (errno == EAGAIN) {
            btpd_ev_add(&p->out_ev, NULL);
            p->t_wantwrite = btpd_seconds;
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    } else if (nwritten == 0) {
        btpd_log(BTPD_L_CONN, "connection closed by peer.\n");
        peer_kill(p);
        return 0;
    }

    /* Pop fully written buffers off the queue and account uploaded
     * torrent data; a partially written buffer keeps its offset. */
    bcount = nwritten;
    nl = BTPDQ_FIRST(&p->outq);
    while (bcount > 0) {
        unsigned long bufdelta = nl->nb->len - p->outq_off;
        if (bcount >= bufdelta) {
            peer_sent(p, nl->nb);
            if (nl->nb->type == NB_TORRENTDATA) {
                p->n->uploaded += bufdelta;
                p->count_up += bufdelta;
            }
            bcount -= bufdelta;
            BTPDQ_REMOVE(&p->outq, nl, entry);
            nb_drop(nl->nb);
            free(nl);
            p->outq_off = 0;
            nl = BTPDQ_FIRST(&p->outq);
        } else {
            if (nl->nb->type == NB_TORRENTDATA) {
                p->n->uploaded += bcount;
                p->count_up += bcount;
            }
            p->outq_off += bcount;
            bcount = 0;
        }
    }
    if (!BTPDQ_EMPTY(&p->outq)) {
        btpd_ev_add(&p->out_ev, NULL);
        p->t_wantwrite = btpd_seconds;
    }
    p->t_lastwrite = btpd_seconds;
    return nwritten;
}
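
/*
 * Budget convention shared by net_write and net_read: a max argument
 * of 0 means unlimited, anything else is a byte budget for this call.
 * The return value is the number of bytes actually transferred, which
 * the bandwidth tick subtracts from its per-second allowance.
 */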

static int
net_dispatch_msg(struct peer *p, const char *buf)
{
    uint32_t index, begin, length;
    int res = 0;

    /* For messages that carry a payload, buf points at the payload
     * bytes (everything after the id); for MSG_PIECE it is the block
     * data itself, index and begin having been parsed earlier. */
    switch (p->in.msg_num) {
    case MSG_CHOKE:
        peer_on_choke(p);
        break;
    case MSG_UNCHOKE:
        peer_on_unchoke(p);
        break;
    case MSG_INTEREST:
        peer_on_interest(p);
        break;
    case MSG_UNINTEREST:
        peer_on_uninterest(p);
        break;
    case MSG_HAVE:
        peer_on_have(p, net_read32(buf));
        break;
    case MSG_BITFIELD:
        if (p->npieces == 0)
            peer_on_bitfield(p, buf);
        else
            res = 1;
        break;
    case MSG_REQUEST:
        if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
            index = net_read32(buf);
            begin = net_read32(buf + 4);
            length = net_read32(buf + 8);
            if ((length > PIECE_BLOCKLEN
                    || index >= p->n->tp->npieces
                    || !cm_has_piece(p->n->tp, index)
                    || begin + length > torrent_piece_size(p->n->tp, index))) {
                btpd_log(BTPD_L_MSG, "bad request: (%u, %u, %u) from %p\n",
                    index, begin, length, p);
                res = 1;
                break;
            }
            peer_on_request(p, index, begin, length);
        }
        break;
    case MSG_CANCEL:
        index = net_read32(buf);
        begin = net_read32(buf + 4);
        length = net_read32(buf + 8);
        peer_on_cancel(p, index, begin, length);
        break;
    case MSG_PIECE:
        length = p->in.msg_len - 9;
        peer_on_piece(p, p->in.pc_index, p->in.pc_begin, length, buf);
        break;
    default:
        abort();
    }
    return res;
}
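
/*
 * Framing reminder for the checks below: after the handshake, every
 * message is <4-byte big-endian length><1-byte id><payload>, where
 * the length counts the id byte plus the payload. So HAVE is exactly
 * 5 (id + 4-byte index), REQUEST and CANCEL are 13, a BITFIELD must
 * match the torrent's piece count, and a bare length of 0 is a
 * keepalive with no id at all.
 */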

static int
net_mh_ok(struct peer *p)
{
    uint32_t mlen = p->in.msg_len;
    switch (p->in.msg_num) {
    case MSG_CHOKE:
    case MSG_UNCHOKE:
    case MSG_INTEREST:
    case MSG_UNINTEREST:
        return mlen == 1;
    case MSG_HAVE:
        return mlen == 5;
    case MSG_BITFIELD:
        return mlen == (uint32_t)ceil(p->n->tp->npieces / 8.0) + 1;
    case MSG_REQUEST:
    case MSG_CANCEL:
        return mlen == 13;
    case MSG_PIECE:
        return mlen <= PIECE_BLOCKLEN + 9;
    default:
        return 0;
    }
}

static void
net_progress(struct peer *p, size_t length)
{
    /* Only piece payload counts toward the download rate. */
    if (p->in.state == BTP_MSGBODY && p->in.msg_num == MSG_PIECE) {
        p->n->downloaded += length;
        p->count_dwn += length;
    }
}

static int
net_state(struct peer *p, const char *buf)
{
    switch (p->in.state) {
    case SHAKE_PSTR:
        if (bcmp(buf, "\x13""BitTorrent protocol", 20) != 0)
            goto bad;
        peer_set_in_state(p, SHAKE_INFO, 20);
        break;
    case SHAKE_INFO:
        if (p->flags & PF_INCOMING) {
            struct torrent *tp = torrent_by_hash(buf);
            if (tp == NULL || !net_active(tp))
                goto bad;
            p->n = tp->net;
            peer_send(p, nb_create_shake(tp));
        } else if (bcmp(buf, p->n->tp->tl->hash, 20) != 0)
            goto bad;
        peer_set_in_state(p, SHAKE_ID, 20);
        break;
    case SHAKE_ID:
        /* Drop duplicate connections and connections to ourselves. */
        if ((net_torrent_has_peer(p->n, buf)
                || bcmp(buf, btpd_get_peer_id(), 20) == 0))
            goto bad;
        bcopy(buf, p->id, 20);
        peer_on_shake(p);
        peer_set_in_state(p, BTP_MSGSIZE, 4);
        break;
    case BTP_MSGSIZE:
        p->in.msg_len = net_read32(buf);
        if (p->in.msg_len == 0)
            peer_on_keepalive(p);
        else
            peer_set_in_state(p, BTP_MSGHEAD, 1);
        break;
    case BTP_MSGHEAD:
        p->in.msg_num = buf[0];
        if (!net_mh_ok(p))
            goto bad;
        else if (p->in.msg_len == 1) {
            if (net_dispatch_msg(p, buf) != 0)
                goto bad;
            peer_set_in_state(p, BTP_MSGSIZE, 4);
        } else if (p->in.msg_num == MSG_PIECE)
            peer_set_in_state(p, BTP_PIECEMETA, 8);
        else
            peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 1);
        break;
    case BTP_PIECEMETA:
        p->in.pc_index = net_read32(buf);
        p->in.pc_begin = net_read32(buf + 4);
        peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 9);
        break;
    case BTP_MSGBODY:
        if (net_dispatch_msg(p, buf) != 0)
            goto bad;
        peer_set_in_state(p, BTP_MSGSIZE, 4);
        break;
    default:
        abort();
    }
    return 0;
bad:
    btpd_log(BTPD_L_CONN, "bad data from %p (%u, %u, %u).\n",
        p, p->in.state, p->in.msg_len, p->in.msg_num);
    peer_kill(p);
    return -1;
}
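
/*
 * Handshake layout driving the SHAKE_* states (standard BitTorrent):
 * <19><"BitTorrent protocol"><8 reserved bytes><20-byte info hash>
 * <20-byte peer id>. The read length for SHAKE_PSTR is set where the
 * peer is created (in peer.c), presumably spanning the protocol
 * string plus the reserved bytes, since only the first 20 bytes are
 * compared above.
 */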

#define GRBUFLEN (1 << 15)

static unsigned long
net_read(struct peer *p, unsigned long rmax)
{
    size_t rest = p->in.buf != NULL ? p->in.st_bytes - p->in.off : 0;
    char buf[GRBUFLEN];
    struct iovec iov[2] = {
        {
            p->in.buf + p->in.off,
            rest
        }, {
            buf,
            sizeof(buf)
        }
    };

    if (rmax > 0) {
        if (iov[0].iov_len > rmax)
            iov[0].iov_len = rmax;
        iov[1].iov_len = min(rmax - iov[0].iov_len, iov[1].iov_len);
    }

    ssize_t nread = readv(p->sd, iov, 2);
    if (nread < 0 && errno == EAGAIN)
        goto out;
    else if (nread < 0) {
        btpd_log(BTPD_L_CONN, "Read error (%s) on %p.\n", strerror(errno), p);
        peer_kill(p);
        return 0;
    } else if (nread == 0) {
        btpd_log(BTPD_L_CONN, "Connection closed by %p.\n", p);
        peer_kill(p);
        return 0;
    }

    /* First finish the partially received message, if there is one. */
    if (rest > 0) {
        if (nread < rest) {
            p->in.off += nread;
            net_progress(p, nread);
            goto out;
        }
        net_progress(p, rest);
        if (net_state(p, p->in.buf) != 0)
            return nread;
        free(p->in.buf);
        p->in.buf = NULL;
        p->in.off = 0;
    }

    /* Then consume whole messages from the stack buffer... */
    iov[1].iov_len = nread - rest;
    while (p->in.st_bytes <= iov[1].iov_len) {
        size_t consumed = p->in.st_bytes;
        net_progress(p, consumed);
        if (net_state(p, iov[1].iov_base) != 0)
            return nread;
        iov[1].iov_base += consumed;
        iov[1].iov_len -= consumed;
    }

    /* ...and save any trailing fragment for the next read. */
    if (iov[1].iov_len > 0) {
        net_progress(p, iov[1].iov_len);
        p->in.off = iov[1].iov_len;
        p->in.buf = btpd_malloc(p->in.st_bytes);
        bcopy(iov[1].iov_base, p->in.buf, iov[1].iov_len);
    }
out:
    btpd_ev_add(&p->in_ev, NULL);
    return nread > 0 ? nread : 0;
}
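
/*
 * Reading strategy: instead of allocating a buffer per message up
 * front, net_read scatters into the tail of a partially received
 * message (if any) plus a 32K stack buffer, walks every complete
 * message in the stack buffer through net_state, and heap-allocates
 * only for the fragment of an incomplete message left at the end.
 */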

int
net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
{
    if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        return errno;

    set_nonblocking(*sd);

    if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
        int err = errno;
        btpd_log(BTPD_L_CONN, "Botched connection %s.\n", strerror(errno));
        close(*sd);
        return err;
    }
    return 0;
}
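
/*
 * Nonblocking connect: EINPROGRESS is the expected outcome above, so
 * it is not treated as an error. The caller is presumably expected
 * to wait for writability on the returned socket to learn whether
 * the connection actually succeeded.
 */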

int
net_connect(const char *ip, int port, int *sd)
{
    struct addrinfo hints, *res;
    char portstr[6];

    assert(net_npeers < net_max_peers);

    if (snprintf(portstr, sizeof(portstr), "%d", port) >= sizeof(portstr))
        return EINVAL;

    /* AI_NUMERICHOST: ip must already be a numeric address; no DNS
     * lookup is performed here. */
    bzero(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_flags = AI_NUMERICHOST;
    hints.ai_socktype = SOCK_STREAM;
    if (getaddrinfo(ip, portstr, &hints, &res) != 0)
        return errno;

    int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);

    freeaddrinfo(res);
    return error;
}

void
net_connection_cb(int sd, short type, void *arg)
{
    int nsd;

    nsd = accept(sd, NULL, NULL);
    if (nsd < 0) {
        if (errno == EWOULDBLOCK || errno == ECONNABORTED)
            return;
        else
            btpd_err("accept: %s\n", strerror(errno));
    }

    if (set_nonblocking(nsd) != 0) {
        close(nsd);
        return;
    }

    assert(net_npeers <= net_max_peers);
    if (net_npeers == net_max_peers) {
        close(nsd);
        return;
    }

    peer_create_in(nsd);
    btpd_log(BTPD_L_CONN, "got connection.\n");
}

static unsigned long
compute_rate_sub(unsigned long rate)
{
    if (rate > 256 * RATEHISTORY)
        return rate / RATEHISTORY;
    else
        return min(256, rate);
}

static void
compute_rates(void)
{
    unsigned long tot_up = 0, tot_dwn = 0;
    struct torrent *tp;
    BTPDQ_FOREACH(tp, torrent_get_all(), entry) {
        unsigned long tp_up = 0, tp_dwn = 0;
        struct net *n = tp->net;
        struct peer *p;
        BTPDQ_FOREACH(p, &n->peers, p_entry) {
            if (p->count_up > 0 || peer_active_up(p)) {
                tp_up += p->count_up;
                p->rate_up += p->count_up - compute_rate_sub(p->rate_up);
                p->count_up = 0;
            }
            if (p->count_dwn > 0 || peer_active_down(p)) {
                tp_dwn += p->count_dwn;
                p->rate_dwn += p->count_dwn - compute_rate_sub(p->rate_dwn);
                p->count_dwn = 0;
            }
        }
        n->rate_up += tp_up - compute_rate_sub(n->rate_up);
        n->rate_dwn += tp_dwn - compute_rate_sub(n->rate_dwn);
        tot_up += tp_up;
        tot_dwn += tp_dwn;
    }
    m_rate_up += tot_up - compute_rate_sub(m_rate_up);
    m_rate_dwn += tot_dwn - compute_rate_sub(m_rate_dwn);
}
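
/*
 * Rate bookkeeping sketch: every tick each counter is updated as
 * rate += count - sub(rate), where sub(rate) is rate / RATEHISTORY
 * once rate is large. At a steady count of C bytes per tick the
 * counter settles where sub(rate) = C, i.e. rate = C * RATEHISTORY,
 * so rate / RATEHISTORY recovers the per-tick average over roughly
 * the last RATEHISTORY seconds. The min(256, rate) branch lets
 * small rates decay to zero quickly instead of trickling down.
 */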

static void
net_bw_tick(void)
{
    struct peer *p;

    /* Refill this second's global byte allowances. */
    m_bw_bytes_out = net_bw_limit_out;
    m_bw_bytes_in = net_bw_limit_in;

    if (net_bw_limit_in > 0) {
        while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL && m_bw_bytes_in > 0) {
            BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
        }
    } else {
        while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL) {
            BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            net_read(p, 0);
        }
    }

    if (net_bw_limit_out) {
        while (((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL
                   && m_bw_bytes_out > 0)) {
            BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
        }
    } else {
        while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL) {
            BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            net_write(p, 0);
        }
    }
}
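
/*
 * Bandwidth model in short: each tick refills a global in/out byte
 * allowance from the configured limits and then services the peers
 * that were parked on the read/write queues when the previous
 * allowance ran out. A limit of 0 disables the accounting, so
 * parked peers are simply drained without a budget.
 */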

static void
run_peer_ticks(void)
{
    struct torrent *tp;
    struct peer *p, *next;

    BTPDQ_FOREACH_MUTABLE(p, &net_unattached, p_entry, next)
        peer_on_tick(p);

    BTPDQ_FOREACH(tp, torrent_get_all(), entry)
        BTPDQ_FOREACH_MUTABLE(p, &tp->net->peers, p_entry, next)
            peer_on_tick(p);
}

void
net_on_tick(void)
{
    run_peer_ticks();
    compute_rates();
    net_bw_tick();
}

void
net_read_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;
    if (net_bw_limit_in == 0)
        net_read(p, 0);
    else if (m_bw_bytes_in > 0)
        m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
    else {
        /* Out of budget; park the peer until the next tick. */
        p->flags |= PF_ON_READQ;
        BTPDQ_INSERT_TAIL(&net_bw_readq, p, rq_entry);
    }
}

void
net_write_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;
    if (net_bw_limit_out == 0)
        net_write(p, 0);
    else if (m_bw_bytes_out > 0)
        m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
    else {
        p->flags |= PF_ON_WRITEQ;
        BTPDQ_INSERT_TAIL(&net_bw_writeq, p, wq_entry);
    }
}

void
net_init(void)
{
    m_bw_bytes_out = net_bw_limit_out;
    m_bw_bytes_in = net_bw_limit_in;

    /* Leave a fifth of the descriptor table for files and such. */
    int safe_fds = min(getdtablesize(), FD_SETSIZE) * 4 / 5;
    if (net_max_peers == 0 || net_max_peers > safe_fds)
        net_max_peers = safe_fds;

    int sd;
    int flag = 1;
    struct sockaddr_in addr;
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(net_port);

    if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
        btpd_err("socket: %s\n", strerror(errno));
    setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
    if (bind(sd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
        btpd_err("bind: %s\n", strerror(errno));
    listen(sd, 10);
    set_nonblocking(sd);

    event_set(&m_net_incoming, sd, EV_READ | EV_PERSIST,
        net_connection_cb, NULL);
    btpd_ev_add(&m_net_incoming, NULL);
}