A clone of btpd with my configuration changes.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

671 lines
17 KiB

  1. #include <sys/types.h>
  2. #include <sys/uio.h>
  3. #include <sys/socket.h>
  4. #include <netinet/in.h>
  5. #include <netdb.h>
  6. #include <sys/mman.h>
  7. #include <sys/wait.h>
  8. #include <assert.h>
  9. #include <errno.h>
  10. #include <fcntl.h>
  11. #include <math.h>
  12. #include <stdio.h>
  13. #include <stdlib.h>
  14. #include <string.h>
  15. #include <unistd.h>
  16. #include "btpd.h"
  17. static struct event m_bw_timer;
  18. static unsigned long m_bw_bytes_in;
  19. static unsigned long m_bw_bytes_out;
  20. static unsigned long m_rate_up;
  21. static unsigned long m_rate_dwn;
  22. static struct event m_net_incoming;
  23. static unsigned m_ntorrents;
  24. static struct net_tq m_torrents = BTPDQ_HEAD_INITIALIZER(m_torrents);
  25. unsigned net_npeers;
  26. struct peer_tq net_bw_readq = BTPDQ_HEAD_INITIALIZER(net_bw_readq);
  27. struct peer_tq net_bw_writeq = BTPDQ_HEAD_INITIALIZER(net_bw_writeq);
  28. struct peer_tq net_unattached = BTPDQ_HEAD_INITIALIZER(net_unattached);
  29. int
  30. net_torrent_has_peer(struct net *n, const uint8_t *id)
  31. {
  32. int has = 0;
  33. struct peer *p = BTPDQ_FIRST(&n->peers);
  34. while (p != NULL) {
  35. if (bcmp(p->id, id, 20) == 0) {
  36. has = 1;
  37. break;
  38. }
  39. p = BTPDQ_NEXT(p, p_entry);
  40. }
  41. return has;
  42. }
  43. void
  44. net_create(struct torrent *tp)
  45. {
  46. size_t field_size = ceil(tp->meta.npieces / 8.0);
  47. size_t mem = sizeof(*(tp->net)) + field_size +
  48. tp->meta.npieces * sizeof(*(tp->net->piece_count));
  49. struct net *n = btpd_calloc(1, mem);
  50. n->tp = tp;
  51. tp->net = n;
  52. BTPDQ_INIT(&n->getlst);
  53. n->busy_field = (uint8_t *)(n + 1);
  54. n->piece_count = (unsigned *)(n->busy_field + field_size);
  55. }
  56. void
  57. net_kill(struct torrent *tp)
  58. {
  59. free(tp->net);
  60. tp->net = NULL;
  61. }
  62. void
  63. net_start(struct torrent *tp)
  64. {
  65. struct net *n = tp->net;
  66. BTPDQ_INSERT_HEAD(&m_torrents, n, entry);
  67. m_ntorrents++;
  68. n->active = 1;
  69. }
  70. void
  71. net_stop(struct torrent *tp)
  72. {
  73. struct net *n = tp->net;
  74. assert(m_ntorrents > 0);
  75. m_ntorrents--;
  76. BTPDQ_REMOVE(&m_torrents, n, entry);
  77. n->active = 0;
  78. ul_on_lost_torrent(n);
  79. struct piece *pc;
  80. while ((pc = BTPDQ_FIRST(&n->getlst)) != NULL)
  81. piece_free(pc);
  82. struct peer *p = BTPDQ_FIRST(&net_unattached);
  83. while (p != NULL) {
  84. struct peer *next = BTPDQ_NEXT(p, p_entry);
  85. if (p->n == n)
  86. peer_kill(p);
  87. p = next;
  88. }
  89. p = BTPDQ_FIRST(&n->peers);
  90. while (p != NULL) {
  91. struct peer *next = BTPDQ_NEXT(p, p_entry);
  92. peer_kill(p);
  93. p = next;
  94. }
  95. }
  96. int
  97. net_active(struct torrent *tp)
  98. {
  99. return tp->net->active;
  100. }
  101. void
  102. net_write32(void *buf, uint32_t num)
  103. {
  104. *(uint32_t *)buf = htonl(num);
  105. }
  106. uint32_t
  107. net_read32(const void *buf)
  108. {
  109. return ntohl(*(uint32_t *)buf);
  110. }
  111. static unsigned long
  112. net_write(struct peer *p, unsigned long wmax)
  113. {
  114. struct nb_link *nl;
  115. struct iovec iov[IOV_MAX];
  116. int niov;
  117. int limited;
  118. ssize_t nwritten;
  119. unsigned long bcount;
  120. limited = wmax > 0;
  121. niov = 0;
  122. assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
  123. while ((niov < IOV_MAX && nl != NULL
  124. && (!limited || (limited && wmax > 0)))) {
  125. if (niov > 0) {
  126. iov[niov].iov_base = nl->nb->buf;
  127. iov[niov].iov_len = nl->nb->len;
  128. } else {
  129. iov[niov].iov_base = nl->nb->buf + p->outq_off;
  130. iov[niov].iov_len = nl->nb->len - p->outq_off;
  131. }
  132. if (limited) {
  133. if (iov[niov].iov_len > wmax)
  134. iov[niov].iov_len = wmax;
  135. wmax -= iov[niov].iov_len;
  136. }
  137. niov++;
  138. nl = BTPDQ_NEXT(nl, entry);
  139. }
  140. nwritten = writev(p->sd, iov, niov);
  141. if (nwritten < 0) {
  142. if (errno == EAGAIN) {
  143. event_add(&p->out_ev, WRITE_TIMEOUT);
  144. return 0;
  145. } else {
  146. btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
  147. peer_kill(p);
  148. return 0;
  149. }
  150. } else if (nwritten == 0) {
  151. btpd_log(BTPD_L_CONN, "connection closed by peer.\n");
  152. peer_kill(p);
  153. return 0;
  154. }
  155. bcount = nwritten;
  156. nl = BTPDQ_FIRST(&p->outq);
  157. while (bcount > 0) {
  158. unsigned long bufdelta = nl->nb->len - p->outq_off;
  159. if (bcount >= bufdelta) {
  160. peer_sent(p, nl->nb);
  161. if (nl->nb->type == NB_TORRENTDATA) {
  162. p->n->uploaded += bufdelta;
  163. p->count_up += bufdelta;
  164. }
  165. bcount -= bufdelta;
  166. BTPDQ_REMOVE(&p->outq, nl, entry);
  167. nb_drop(nl->nb);
  168. free(nl);
  169. p->outq_off = 0;
  170. nl = BTPDQ_FIRST(&p->outq);
  171. } else {
  172. if (nl->nb->type == NB_TORRENTDATA) {
  173. p->n->uploaded += bcount;
  174. p->count_up += bcount;
  175. }
  176. p->outq_off += bcount;
  177. bcount = 0;
  178. }
  179. }
  180. if (!BTPDQ_EMPTY(&p->outq))
  181. event_add(&p->out_ev, WRITE_TIMEOUT);
  182. return nwritten;
  183. }
  184. static int
  185. net_dispatch_msg(struct peer *p, const char *buf)
  186. {
  187. uint32_t index, begin, length;
  188. int res = 0;
  189. switch (p->in.msg_num) {
  190. case MSG_CHOKE:
  191. peer_on_choke(p);
  192. break;
  193. case MSG_UNCHOKE:
  194. peer_on_unchoke(p);
  195. break;
  196. case MSG_INTEREST:
  197. peer_on_interest(p);
  198. break;
  199. case MSG_UNINTEREST:
  200. peer_on_uninterest(p);
  201. break;
  202. case MSG_HAVE:
  203. peer_on_have(p, net_read32(buf));
  204. break;
  205. case MSG_BITFIELD:
  206. if (p->npieces == 0)
  207. peer_on_bitfield(p, buf);
  208. else
  209. res = 1;
  210. break;
  211. case MSG_REQUEST:
  212. if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
  213. index = net_read32(buf);
  214. begin = net_read32(buf + 4);
  215. length = net_read32(buf + 8);
  216. if ((length > PIECE_BLOCKLEN
  217. || index >= p->n->tp->meta.npieces
  218. || !cm_has_piece(p->n->tp, index)
  219. || begin + length > torrent_piece_size(p->n->tp, index))) {
  220. btpd_log(BTPD_L_MSG, "bad request: (%u, %u, %u) from %p\n",
  221. index, begin, length, p);
  222. res = 1;
  223. break;
  224. }
  225. peer_on_request(p, index, begin, length);
  226. }
  227. break;
  228. case MSG_CANCEL:
  229. index = net_read32(buf);
  230. begin = net_read32(buf + 4);
  231. length = net_read32(buf + 8);
  232. peer_on_cancel(p, index, begin, length);
  233. break;
  234. case MSG_PIECE:
  235. length = p->in.msg_len - 9;
  236. peer_on_piece(p, p->in.pc_index, p->in.pc_begin, length, buf);
  237. break;
  238. default:
  239. abort();
  240. }
  241. return res;
  242. }
  243. static int
  244. net_mh_ok(struct peer *p)
  245. {
  246. uint32_t mlen = p->in.msg_len;
  247. switch (p->in.msg_num) {
  248. case MSG_CHOKE:
  249. case MSG_UNCHOKE:
  250. case MSG_INTEREST:
  251. case MSG_UNINTEREST:
  252. return mlen == 1;
  253. case MSG_HAVE:
  254. return mlen == 5;
  255. case MSG_BITFIELD:
  256. return mlen == (uint32_t)ceil(p->n->tp->meta.npieces / 8.0) + 1;
  257. case MSG_REQUEST:
  258. case MSG_CANCEL:
  259. return mlen == 13;
  260. case MSG_PIECE:
  261. return mlen <= PIECE_BLOCKLEN + 9;
  262. default:
  263. return 0;
  264. }
  265. }
  266. static void
  267. net_progress(struct peer *p, size_t length)
  268. {
  269. if (p->in.state == BTP_MSGBODY && p->in.msg_num == MSG_PIECE) {
  270. p->n->downloaded += length;
  271. p->count_dwn += length;
  272. }
  273. }
  274. static int
  275. net_state(struct peer *p, const char *buf)
  276. {
  277. switch (p->in.state) {
  278. case SHAKE_PSTR:
  279. if (bcmp(buf, "\x13""BitTorrent protocol", 20) != 0)
  280. goto bad;
  281. peer_set_in_state(p, SHAKE_INFO, 20);
  282. break;
  283. case SHAKE_INFO:
  284. if (p->flags & PF_INCOMING) {
  285. struct net *n;
  286. BTPDQ_FOREACH(n, &m_torrents, entry)
  287. if (bcmp(buf, n->tp->meta.info_hash, 20) == 0)
  288. break;
  289. if (n == NULL)
  290. goto bad;
  291. p->n = n;
  292. peer_send(p, nb_create_shake(p->n->tp));
  293. } else if (bcmp(buf, p->n->tp->meta.info_hash, 20) != 0)
  294. goto bad;
  295. peer_set_in_state(p, SHAKE_ID, 20);
  296. break;
  297. case SHAKE_ID:
  298. if ((net_torrent_has_peer(p->n, buf)
  299. || bcmp(buf, btpd_get_peer_id(), 20) == 0))
  300. goto bad;
  301. bcopy(buf, p->id, 20);
  302. peer_on_shake(p);
  303. peer_set_in_state(p, BTP_MSGSIZE, 4);
  304. break;
  305. case BTP_MSGSIZE:
  306. p->in.msg_len = net_read32(buf);
  307. if (p->in.msg_len == 0)
  308. peer_on_keepalive(p);
  309. else
  310. peer_set_in_state(p, BTP_MSGHEAD, 1);
  311. break;
  312. case BTP_MSGHEAD:
  313. p->in.msg_num = buf[0];
  314. if (!net_mh_ok(p))
  315. goto bad;
  316. else if (p->in.msg_len == 1) {
  317. if (net_dispatch_msg(p, buf) != 0)
  318. goto bad;
  319. peer_set_in_state(p, BTP_MSGSIZE, 4);
  320. } else if (p->in.msg_num == MSG_PIECE)
  321. peer_set_in_state(p, BTP_PIECEMETA, 8);
  322. else
  323. peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 1);
  324. break;
  325. case BTP_PIECEMETA:
  326. p->in.pc_index = net_read32(buf);
  327. p->in.pc_begin = net_read32(buf + 4);
  328. peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 9);
  329. break;
  330. case BTP_MSGBODY:
  331. if (net_dispatch_msg(p, buf) != 0)
  332. goto bad;
  333. peer_set_in_state(p, BTP_MSGSIZE, 4);
  334. break;
  335. default:
  336. abort();
  337. }
  338. return 0;
  339. bad:
  340. btpd_log(BTPD_L_CONN, "bad data from %p (%u, %u, %u).\n",
  341. p, p->in.state, p->in.msg_len, p->in.msg_num);
  342. peer_kill(p);
  343. return -1;
  344. }
  345. #define GRBUFLEN (1 << 15)
  346. static unsigned long
  347. net_read(struct peer *p, unsigned long rmax)
  348. {
  349. size_t rest = p->in.buf != NULL ? p->in.st_bytes - p->in.off : 0;
  350. char buf[GRBUFLEN];
  351. struct iovec iov[2] = {
  352. {
  353. p->in.buf + p->in.off,
  354. rest
  355. }, {
  356. buf,
  357. sizeof(buf)
  358. }
  359. };
  360. if (rmax > 0) {
  361. if (iov[0].iov_len > rmax)
  362. iov[0].iov_len = rmax;
  363. iov[1].iov_len = min(rmax - iov[0].iov_len, iov[1].iov_len);
  364. }
  365. ssize_t nread = readv(p->sd, iov, 2);
  366. if (nread < 0 && errno == EAGAIN)
  367. goto out;
  368. else if (nread < 0) {
  369. btpd_log(BTPD_L_CONN, "Read error (%s) on %p.\n", strerror(errno), p);
  370. peer_kill(p);
  371. return 0;
  372. } else if (nread == 0) {
  373. btpd_log(BTPD_L_CONN, "Connection closed by %p.\n", p);
  374. peer_kill(p);
  375. return 0;
  376. }
  377. if (rest > 0) {
  378. if (nread < rest) {
  379. p->in.off += nread;
  380. net_progress(p, nread);
  381. goto out;
  382. }
  383. net_progress(p, rest);
  384. if (net_state(p, p->in.buf) != 0)
  385. return nread;
  386. free(p->in.buf);
  387. p->in.buf = NULL;
  388. p->in.off = 0;
  389. }
  390. iov[1].iov_len = nread - rest;
  391. while (p->in.st_bytes <= iov[1].iov_len) {
  392. size_t consumed = p->in.st_bytes;
  393. net_progress(p, consumed);
  394. if (net_state(p, iov[1].iov_base) != 0)
  395. return nread;
  396. iov[1].iov_base += consumed;
  397. iov[1].iov_len -= consumed;
  398. }
  399. if (iov[1].iov_len > 0) {
  400. net_progress(p, iov[1].iov_len);
  401. p->in.off = iov[1].iov_len;
  402. p->in.buf = btpd_malloc(p->in.st_bytes);
  403. bcopy(iov[1].iov_base, p->in.buf, iov[1].iov_len);
  404. }
  405. out:
  406. event_add(&p->in_ev, NULL);
  407. return nread > 0 ? nread : 0;
  408. }
  409. int
  410. net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
  411. {
  412. if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
  413. return errno;
  414. set_nonblocking(*sd);
  415. if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
  416. int err = errno;
  417. btpd_log(BTPD_L_CONN, "Botched connection %s.\n", strerror(errno));
  418. close(*sd);
  419. return err;
  420. }
  421. return 0;
  422. }
  423. int
  424. net_connect(const char *ip, int port, int *sd)
  425. {
  426. struct addrinfo hints, *res;
  427. char portstr[6];
  428. assert(net_npeers < net_max_peers);
  429. if (snprintf(portstr, sizeof(portstr), "%d", port) >= sizeof(portstr))
  430. return EINVAL;
  431. bzero(&hints, sizeof(hints));
  432. hints.ai_family = AF_UNSPEC;
  433. hints.ai_flags = AI_NUMERICHOST;
  434. hints.ai_socktype = SOCK_STREAM;
  435. if (getaddrinfo(ip, portstr, &hints, &res) != 0)
  436. return errno;
  437. int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);
  438. freeaddrinfo(res);
  439. return error;
  440. }
  441. void
  442. net_connection_cb(int sd, short type, void *arg)
  443. {
  444. int nsd;
  445. nsd = accept(sd, NULL, NULL);
  446. if (nsd < 0) {
  447. if (errno == EWOULDBLOCK || errno == ECONNABORTED)
  448. return;
  449. else
  450. btpd_err("accept4: %s\n", strerror(errno));
  451. }
  452. if (set_nonblocking(nsd) != 0) {
  453. close(nsd);
  454. return;
  455. }
  456. assert(net_npeers <= net_max_peers);
  457. if (net_npeers == net_max_peers) {
  458. close(nsd);
  459. return;
  460. }
  461. peer_create_in(nsd);
  462. btpd_log(BTPD_L_CONN, "got connection.\n");
  463. }
  464. #define RATEHISTORY 20
  465. static unsigned long
  466. compute_rate_sub(unsigned long rate)
  467. {
  468. if (rate > 256 * RATEHISTORY)
  469. return rate / RATEHISTORY;
  470. else
  471. return min(256, rate);
  472. }
  473. static void
  474. compute_rates(void) {
  475. unsigned long tot_up = 0, tot_dwn = 0;
  476. struct net *n;
  477. BTPDQ_FOREACH(n, &m_torrents, entry) {
  478. unsigned long tp_up = 0, tp_dwn = 0;
  479. struct peer *p;
  480. BTPDQ_FOREACH(p, &n->peers, p_entry) {
  481. if (p->count_up > 0 || peer_active_up(p)) {
  482. tp_up += p->count_up;
  483. p->rate_up += p->count_up - compute_rate_sub(p->rate_up);
  484. p->count_up = 0;
  485. }
  486. if (p->count_dwn > 0 || peer_active_down(p)) {
  487. tp_dwn += p->count_dwn;
  488. p->rate_dwn += p->count_dwn - compute_rate_sub(p->rate_dwn);
  489. p->count_dwn = 0;
  490. }
  491. }
  492. n->rate_up += tp_up - compute_rate_sub(n->rate_up);
  493. n->rate_dwn += tp_dwn - compute_rate_sub(n->rate_dwn);
  494. tot_up += tp_up;
  495. tot_dwn += tp_dwn;
  496. }
  497. m_rate_up += tot_up - compute_rate_sub(m_rate_up);
  498. m_rate_dwn += tot_dwn - compute_rate_sub(m_rate_dwn);
  499. }
  500. void
  501. net_bw_cb(int sd, short type, void *arg)
  502. {
  503. struct peer *p;
  504. evtimer_add(&m_bw_timer, (& (struct timeval) { 1, 0 }));
  505. compute_rates();
  506. m_bw_bytes_out = net_bw_limit_out;
  507. m_bw_bytes_in = net_bw_limit_in;
  508. if (net_bw_limit_in > 0) {
  509. while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL && m_bw_bytes_in > 0) {
  510. BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
  511. p->flags &= ~PF_ON_READQ;
  512. m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
  513. }
  514. } else {
  515. while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL) {
  516. BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
  517. p->flags &= ~PF_ON_READQ;
  518. net_read(p, 0);
  519. }
  520. }
  521. if (net_bw_limit_out) {
  522. while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL && m_bw_bytes_out > 0) {
  523. BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
  524. p->flags &= ~PF_ON_WRITEQ;
  525. m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
  526. }
  527. } else {
  528. while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL) {
  529. BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
  530. p->flags &= ~PF_ON_WRITEQ;
  531. net_write(p, 0);
  532. }
  533. }
  534. }
  535. void
  536. net_read_cb(int sd, short type, void *arg)
  537. {
  538. struct peer *p = (struct peer *)arg;
  539. if (net_bw_limit_in == 0)
  540. net_read(p, 0);
  541. else if (m_bw_bytes_in > 0)
  542. m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
  543. else {
  544. p->flags |= PF_ON_READQ;
  545. BTPDQ_INSERT_TAIL(&net_bw_readq, p, rq_entry);
  546. }
  547. }
  548. void
  549. net_write_cb(int sd, short type, void *arg)
  550. {
  551. struct peer *p = (struct peer *)arg;
  552. if (type == EV_TIMEOUT) {
  553. btpd_log(BTPD_L_CONN, "Write attempt timed out.\n");
  554. peer_kill(p);
  555. return;
  556. }
  557. if (net_bw_limit_out == 0) {
  558. net_write(p, 0);
  559. } else if (m_bw_bytes_out > 0) {
  560. m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
  561. } else {
  562. p->flags |= PF_ON_WRITEQ;
  563. BTPDQ_INSERT_TAIL(&net_bw_writeq, p, wq_entry);
  564. }
  565. }
  566. void
  567. net_init(void)
  568. {
  569. m_bw_bytes_out = net_bw_limit_out;
  570. m_bw_bytes_in = net_bw_limit_in;
  571. int safe_fds = min(getdtablesize(), FD_SETSIZE) * 4 / 5;
  572. if (net_max_peers == 0 || net_max_peers > safe_fds)
  573. net_max_peers = safe_fds;
  574. int sd;
  575. int flag = 1;
  576. struct sockaddr_in addr;
  577. addr.sin_family = AF_INET;
  578. addr.sin_addr.s_addr = htonl(INADDR_ANY);
  579. addr.sin_port = htons(net_port);
  580. if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
  581. btpd_err("socket: %s\n", strerror(errno));
  582. setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
  583. if (bind(sd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
  584. btpd_err("bind: %s\n", strerror(errno));
  585. listen(sd, 10);
  586. set_nonblocking(sd);
  587. event_set(&m_net_incoming, sd, EV_READ | EV_PERSIST,
  588. net_connection_cb, NULL);
  589. event_add(&m_net_incoming, NULL);
  590. evtimer_set(&m_bw_timer, net_bw_cb, NULL);
  591. evtimer_add(&m_bw_timer, (& (struct timeval) { 1, 0 }));
  592. }