A clone of btpd with my configuration changes.

617 lines
14 KiB

  1. #include <sys/types.h>
  2. #include <sys/uio.h>
  3. #include <sys/socket.h>
  4. #include <netinet/in.h>
  5. #include <netdb.h>
  6. #include <sys/mman.h>
  7. #include <sys/wait.h>
  8. #include <assert.h>
  9. #include <errno.h>
  10. #include <fcntl.h>
  11. #include <math.h>
  12. #include <stdio.h>
  13. #include <stdlib.h>
  14. #include <string.h>
  15. #include <unistd.h>
  16. #include "btpd.h"
  /*
   * Evaluates to the smaller of x and y (x when they compare equal).
   * Function-like macro: each argument may be evaluated more than once,
   * so do not pass expressions with side effects.
   */
  #define min(x, y) (!((x) <= (y)) ? (y) : (x))
  /* Timer event; re-armed for one second by net_init() and net_bw_cb(). */
  18. static struct event m_bw_timer;
  /*
   * Remaining byte budgets for the current tick. Both are reset to
   * net_bw_limit_in / net_bw_limit_out by net_init() and net_bw_cb() and
   * decremented by the amounts net_read() / net_write() report.
   */
  19. static unsigned long m_bw_bytes_in;
  20. static unsigned long m_bw_bytes_out;
  /* Read event on the listening socket; dispatches to net_connection_cb(). */
  21. static struct event m_net_incoming;
  /* Number of torrents currently linked into m_torrents. */
  22. static unsigned m_ntorrents;
  /* Torrents registered through net_add_torrent(). */
  23. static struct torrent_tq m_torrents = BTPDQ_HEAD_INITIALIZER(m_torrents);
  /* Peer counter compared against net_max_peers; NOTE(review): updated outside this file. */
  24. unsigned net_npeers;
  /* Peers parked by net_read_cb() / net_write_cb() until the next bandwidth tick. */
  25. struct peer_tq net_bw_readq = BTPDQ_HEAD_INITIALIZER(net_bw_readq);
  26. struct peer_tq net_bw_writeq = BTPDQ_HEAD_INITIALIZER(net_bw_writeq);
  /* Scanned by net_del_torrent(); presumably peers not yet owned by a torrent - TODO confirm. */
  27. struct peer_tq net_unattached = BTPDQ_HEAD_INITIALIZER(net_unattached);
  28. void
  29. net_add_torrent(struct torrent *tp)
  30. {
  31. BTPDQ_INSERT_HEAD(&m_torrents, tp, net_entry);
  32. m_ntorrents++;
  33. dl_start(tp);
  34. }
  35. void
  36. net_del_torrent(struct torrent *tp)
  37. {
  38. assert(m_ntorrents > 0);
  39. m_ntorrents--;
  40. BTPDQ_REMOVE(&m_torrents, tp, net_entry);
  41. ul_on_lost_torrent(tp);
  42. dl_stop(tp);
  43. struct peer *p = BTPDQ_FIRST(&net_unattached);
  44. while (p != NULL) {
  45. struct peer *next = BTPDQ_NEXT(p, p_entry);
  46. if (p->tp == tp)
  47. peer_kill(p);
  48. p = next;
  49. }
  50. }
  /*
   * Store a 32-bit value at buf in network (big-endian) byte order.
   *
   * buf must have room for 4 bytes. It need not be aligned: the value is
   * copied with memcpy instead of being stored through a casted pointer,
   * which would be undefined behavior for misaligned protocol buffers.
   */
  void
  net_write32(void *buf, uint32_t num)
  {
      const uint32_t be = htonl(num);

      memcpy(buf, &be, sizeof(be));
  }
  /*
   * Read a 32-bit network (big-endian) value from buf and return it in
   * host byte order.
   *
   * buf must hold at least 4 readable bytes. It need not be aligned: the
   * bytes are copied out with memcpy instead of being loaded through a
   * casted pointer, which would be undefined behavior for misaligned
   * protocol buffers.
   */
  uint32_t
  net_read32(const void *buf)
  {
      uint32_t be;

      memcpy(&be, buf, sizeof(be));
      return ntohl(be);
  }
  61. static unsigned long
  62. net_write(struct peer *p, unsigned long wmax)
  63. {
  64. struct nb_link *nl;
  65. struct iovec iov[IOV_MAX];
  66. int niov;
  67. int limited;
  68. ssize_t nwritten;
  69. unsigned long bcount;
  70. limited = wmax > 0;
  71. niov = 0;
  72. assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
  73. while ((niov < IOV_MAX && nl != NULL
  74. && (!limited || (limited && wmax > 0)))) {
  75. if (niov > 0) {
  76. iov[niov].iov_base = nl->nb->buf;
  77. iov[niov].iov_len = nl->nb->len;
  78. } else {
  79. iov[niov].iov_base = nl->nb->buf + p->outq_off;
  80. iov[niov].iov_len = nl->nb->len - p->outq_off;
  81. }
  82. if (limited) {
  83. if (iov[niov].iov_len > wmax)
  84. iov[niov].iov_len = wmax;
  85. wmax -= iov[niov].iov_len;
  86. }
  87. niov++;
  88. nl = BTPDQ_NEXT(nl, entry);
  89. }
  90. nwritten = writev(p->sd, iov, niov);
  91. if (nwritten < 0) {
  92. if (errno == EAGAIN) {
  93. event_add(&p->out_ev, WRITE_TIMEOUT);
  94. return 0;
  95. } else {
  96. btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
  97. peer_kill(p);
  98. return 0;
  99. }
  100. } else if (nwritten == 0) {
  101. btpd_log(BTPD_L_CONN, "connection closed by peer.\n");
  102. peer_kill(p);
  103. return 0;
  104. }
  105. bcount = nwritten;
  106. nl = BTPDQ_FIRST(&p->outq);
  107. while (bcount > 0) {
  108. unsigned long bufdelta = nl->nb->len - p->outq_off;
  109. if (bcount >= bufdelta) {
  110. peer_sent(p, nl->nb);
  111. if (nl->nb->type == NB_TORRENTDATA) {
  112. p->tp->uploaded += bufdelta;
  113. p->count_up += bufdelta;
  114. }
  115. bcount -= bufdelta;
  116. BTPDQ_REMOVE(&p->outq, nl, entry);
  117. nb_drop(nl->nb);
  118. free(nl);
  119. p->outq_off = 0;
  120. nl = BTPDQ_FIRST(&p->outq);
  121. } else {
  122. if (nl->nb->type == NB_TORRENTDATA) {
  123. p->tp->uploaded += bcount;
  124. p->count_up += bcount;
  125. }
  126. p->outq_off += bcount;
  127. bcount = 0;
  128. }
  129. }
  130. if (!BTPDQ_EMPTY(&p->outq))
  131. event_add(&p->out_ev, WRITE_TIMEOUT);
  132. return nwritten;
  133. }
  134. void
  135. net_set_state(struct peer *p, enum net_state state, size_t size)
  136. {
  137. p->net.state = state;
  138. p->net.st_bytes = size;
  139. }
  140. static int
  141. net_dispatch_msg(struct peer *p, const char *buf)
  142. {
  143. uint32_t index, begin, length;
  144. int res = 0;
  145. switch (p->net.msg_num) {
  146. case MSG_CHOKE:
  147. peer_on_choke(p);
  148. break;
  149. case MSG_UNCHOKE:
  150. peer_on_unchoke(p);
  151. break;
  152. case MSG_INTEREST:
  153. peer_on_interest(p);
  154. break;
  155. case MSG_UNINTEREST:
  156. peer_on_uninterest(p);
  157. break;
  158. case MSG_HAVE:
  159. peer_on_have(p, net_read32(buf));
  160. break;
  161. case MSG_BITFIELD:
  162. if (p->npieces == 0)
  163. peer_on_bitfield(p, buf);
  164. else
  165. res = 1;
  166. break;
  167. case MSG_REQUEST:
  168. if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
  169. index = net_read32(buf);
  170. begin = net_read32(buf + 4);
  171. length = net_read32(buf + 8);
  172. if ((length > PIECE_BLOCKLEN
  173. || index >= p->tp->meta.npieces
  174. || !has_bit(p->tp->piece_field, index)
  175. || begin + length > torrent_piece_size(p->tp, index))) {
  176. btpd_log(BTPD_L_MSG, "bad request: (%u, %u, %u) from %p\n",
  177. index, begin, length, p);
  178. res = 1;
  179. break;
  180. }
  181. peer_on_request(p, index, begin, length);
  182. }
  183. break;
  184. case MSG_CANCEL:
  185. index = net_read32(buf);
  186. begin = net_read32(buf + 4);
  187. length = net_read32(buf + 8);
  188. peer_on_cancel(p, index, begin, length);
  189. break;
  190. case MSG_PIECE:
  191. length = p->net.msg_len - 9;
  192. peer_on_piece(p, p->net.pc_index, p->net.pc_begin, length, buf);
  193. break;
  194. default:
  195. abort();
  196. }
  197. return res;
  198. }
  199. static int
  200. net_mh_ok(struct peer *p)
  201. {
  202. uint32_t mlen = p->net.msg_len;
  203. switch (p->net.msg_num) {
  204. case MSG_CHOKE:
  205. case MSG_UNCHOKE:
  206. case MSG_INTEREST:
  207. case MSG_UNINTEREST:
  208. return mlen == 1;
  209. case MSG_HAVE:
  210. return mlen == 5;
  211. case MSG_BITFIELD:
  212. return mlen == (uint32_t)ceil(p->tp->meta.npieces / 8.0) + 1;
  213. case MSG_REQUEST:
  214. case MSG_CANCEL:
  215. return mlen == 13;
  216. case MSG_PIECE:
  217. return mlen <= PIECE_BLOCKLEN + 9;
  218. default:
  219. return 0;
  220. }
  221. }
  222. static void
  223. net_progress(struct peer *p, size_t length)
  224. {
  225. if (p->net.state == BTP_MSGBODY && p->net.msg_num == MSG_PIECE) {
  226. p->tp->downloaded += length;
  227. p->count_dwn += length;
  228. }
  229. }
  230. static int
  231. net_state(struct peer *p, const char *buf)
  232. {
  233. switch (p->net.state) {
  234. case SHAKE_PSTR:
  235. if (bcmp(buf, "\x13""BitTorrent protocol", 20) != 0)
  236. goto bad;
  237. net_set_state(p, SHAKE_INFO, 20);
  238. break;
  239. case SHAKE_INFO:
  240. if (p->flags & PF_INCOMING) {
  241. struct torrent *tp;
  242. BTPDQ_FOREACH(tp, &m_torrents, net_entry)
  243. if (bcmp(buf, tp->meta.info_hash, 20) == 0)
  244. break;
  245. if (tp == NULL)
  246. goto bad;
  247. p->tp = tp;
  248. peer_send(p, nb_create_shake(p->tp));
  249. } else if (bcmp(buf, p->tp->meta.info_hash, 20) != 0)
  250. goto bad;
  251. net_set_state(p, SHAKE_ID, 20);
  252. break;
  253. case SHAKE_ID:
  254. if ((torrent_has_peer(p->tp, buf)
  255. || bcmp(buf, btpd_get_peer_id(), 20) == 0))
  256. goto bad;
  257. bcopy(buf, p->id, 20);
  258. peer_on_shake(p);
  259. net_set_state(p, BTP_MSGSIZE, 4);
  260. break;
  261. case BTP_MSGSIZE:
  262. p->net.msg_len = net_read32(buf);
  263. if (p->net.msg_len == 0)
  264. peer_on_keepalive(p);
  265. else
  266. net_set_state(p, BTP_MSGHEAD, 1);
  267. break;
  268. case BTP_MSGHEAD:
  269. p->net.msg_num = buf[0];
  270. if (!net_mh_ok(p))
  271. goto bad;
  272. else if (p->net.msg_len == 1) {
  273. if (net_dispatch_msg(p, buf) != 0)
  274. goto bad;
  275. net_set_state(p, BTP_MSGSIZE, 4);
  276. } else if (p->net.msg_num == MSG_PIECE)
  277. net_set_state(p, BTP_PIECEMETA, 8);
  278. else
  279. net_set_state(p, BTP_MSGBODY, p->net.msg_len - 1);
  280. break;
  281. case BTP_PIECEMETA:
  282. p->net.pc_index = net_read32(buf);
  283. p->net.pc_begin = net_read32(buf + 4);
  284. net_set_state(p, BTP_MSGBODY, p->net.msg_len - 9);
  285. break;
  286. case BTP_MSGBODY:
  287. if (net_dispatch_msg(p, buf) != 0)
  288. goto bad;
  289. net_set_state(p, BTP_MSGSIZE, 4);
  290. break;
  291. default:
  292. abort();
  293. }
  294. return 0;
  295. bad:
  296. btpd_log(BTPD_L_CONN, "bad data from %p (%u, %u, %u).\n",
  297. p, p->net.state, p->net.msg_len, p->net.msg_num);
  298. peer_kill(p);
  299. return -1;
  300. }
  301. #define GRBUFLEN (1 << 15)
  302. static unsigned long
  303. net_read(struct peer *p, unsigned long rmax)
  304. {
  305. size_t rest = p->net.buf != NULL ? p->net.st_bytes - p->net.off : 0;
  306. char buf[GRBUFLEN];
  307. struct iovec iov[2] = {
  308. {
  309. p->net.buf + p->net.off,
  310. rest
  311. }, {
  312. buf,
  313. sizeof(buf)
  314. }
  315. };
  316. if (rmax > 0) {
  317. if (iov[0].iov_len > rmax)
  318. iov[0].iov_len = rmax;
  319. iov[1].iov_len = min(rmax - iov[0].iov_len, iov[1].iov_len);
  320. }
  321. ssize_t nread = readv(p->sd, iov, 2);
  322. if (nread < 0 && errno == EAGAIN)
  323. goto out;
  324. else if (nread < 0) {
  325. btpd_log(BTPD_L_CONN, "Read error (%s) on %p.\n", strerror(errno), p);
  326. peer_kill(p);
  327. return 0;
  328. } else if (nread == 0) {
  329. btpd_log(BTPD_L_CONN, "Connection closed by %p.\n", p);
  330. peer_kill(p);
  331. return 0;
  332. }
  333. if (rest > 0) {
  334. if (nread < rest) {
  335. p->net.off += nread;
  336. net_progress(p, nread);
  337. goto out;
  338. }
  339. net_progress(p, rest);
  340. if (net_state(p, p->net.buf) != 0)
  341. return nread;
  342. free(p->net.buf);
  343. p->net.buf = NULL;
  344. p->net.off = 0;
  345. }
  346. iov[1].iov_len = nread - rest;
  347. while (p->net.st_bytes <= iov[1].iov_len) {
  348. size_t consumed = p->net.st_bytes;
  349. net_progress(p, consumed);
  350. if (net_state(p, iov[1].iov_base) != 0)
  351. return nread;
  352. iov[1].iov_base += consumed;
  353. iov[1].iov_len -= consumed;
  354. }
  355. if (iov[1].iov_len > 0) {
  356. net_progress(p, iov[1].iov_len);
  357. p->net.off = iov[1].iov_len;
  358. p->net.buf = btpd_malloc(p->net.st_bytes);
  359. bcopy(iov[1].iov_base, p->net.buf, iov[1].iov_len);
  360. }
  361. out:
  362. event_add(&p->in_ev, NULL);
  363. return nread > 0 ? nread : 0;
  364. }
  365. int
  366. net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
  367. {
  368. if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
  369. return errno;
  370. set_nonblocking(*sd);
  371. if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
  372. btpd_log(BTPD_L_CONN, "Botched connection %s.", strerror(errno));
  373. close(*sd);
  374. return errno;
  375. }
  376. return 0;
  377. }
  /*
   * Start a non-blocking TCP connection to a numeric address.
   *
   * ip    numeric host address (resolved with AI_NUMERICHOST, no DNS).
   * port  TCP port; must format to at most 5 characters.
   * sd    receives the new socket descriptor on success.
   *
   * Returns 0 on success, EINVAL when the port or address cannot be
   * parsed, or an errno value from the resolver or net_connect2().
   */
  int
  net_connect(const char *ip, int port, int *sd)
  {
      struct addrinfo hints, *res;
      char portstr[6];
      int n, gai, error;

      assert(net_npeers < net_max_peers);

      n = snprintf(portstr, sizeof(portstr), "%d", port);
      if (n < 0 || (size_t)n >= sizeof(portstr))
          return EINVAL;

      memset(&hints, 0, sizeof(hints));
      hints.ai_family = AF_UNSPEC;
      hints.ai_flags = AI_NUMERICHOST;
      hints.ai_socktype = SOCK_STREAM;

      /*
       * getaddrinfo reports failure through its return value and only sets
       * errno for EAI_SYSTEM. Returning a stale (possibly zero) errno would
       * signal success to the caller with *sd left unset.
       */
      gai = getaddrinfo(ip, portstr, &hints, &res);
      if (gai != 0)
          return (gai == EAI_SYSTEM && errno != 0) ? errno : EINVAL;

      error = net_connect2(res->ai_addr, res->ai_addrlen, sd);
      freeaddrinfo(res);
      return error;
  }
  396. void
  397. net_connection_cb(int sd, short type, void *arg)
  398. {
  399. int nsd;
  400. nsd = accept(sd, NULL, NULL);
  401. if (nsd < 0) {
  402. if (errno == EWOULDBLOCK || errno == ECONNABORTED)
  403. return;
  404. else
  405. btpd_err("accept4: %s\n", strerror(errno));
  406. }
  407. if (set_nonblocking(nsd) != 0) {
  408. close(nsd);
  409. return;
  410. }
  411. assert(net_npeers <= net_max_peers);
  412. if (net_npeers == net_max_peers) {
  413. close(nsd);
  414. return;
  415. }
  416. peer_create_in(nsd);
  417. btpd_log(BTPD_L_CONN, "got connection.\n");
  418. }
  /* Divisor applied to a rate to obtain its per-tick decay. */
  #define RATEHISTORY 20

  /*
   * Amount to subtract from a running rate on each tick: rate / RATEHISTORY,
   * but never less than 256.
   */
  long
  compute_rate_sub(long rate)
  {
      const long floor_sub = 256;

      return rate > floor_sub * RATEHISTORY ? rate / RATEHISTORY : floor_sub;
  }
  428. static void
  429. compute_peer_rates(void) {
  430. struct torrent *tp;
  431. BTPDQ_FOREACH(tp, &m_torrents, net_entry) {
  432. struct peer *p;
  433. BTPDQ_FOREACH(p, &tp->peers, p_entry) {
  434. if (p->count_up > 0 || peer_active_up(p)) {
  435. p->rate_up += p->count_up - compute_rate_sub(p->rate_up);
  436. if (p->rate_up < 0)
  437. p->rate_up = 0;
  438. p->count_up = 0;
  439. }
  440. if (p->count_dwn > 0 || peer_active_down(p)) {
  441. p->rate_dwn += p->count_dwn - compute_rate_sub(p->rate_dwn);
  442. if (p->rate_dwn < 0)
  443. p->rate_dwn = 0;
  444. p->count_dwn = 0;
  445. }
  446. }
  447. }
  448. }
  449. void
  450. net_bw_cb(int sd, short type, void *arg)
  451. {
  452. struct peer *p;
  453. evtimer_add(&m_bw_timer, (& (struct timeval) { 1, 0 }));
  454. compute_peer_rates();
  455. m_bw_bytes_out = net_bw_limit_out;
  456. m_bw_bytes_in = net_bw_limit_in;
  457. if (net_bw_limit_in > 0) {
  458. while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL && m_bw_bytes_in > 0) {
  459. BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
  460. p->flags &= ~PF_ON_READQ;
  461. m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
  462. }
  463. } else {
  464. while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL) {
  465. BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
  466. p->flags &= ~PF_ON_READQ;
  467. net_read(p, 0);
  468. }
  469. }
  470. if (net_bw_limit_out) {
  471. while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL && m_bw_bytes_out > 0) {
  472. BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
  473. p->flags &= ~PF_ON_WRITEQ;
  474. m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
  475. }
  476. } else {
  477. while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL) {
  478. BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
  479. p->flags &= ~PF_ON_WRITEQ;
  480. net_write(p, 0);
  481. }
  482. }
  483. }
  484. void
  485. net_read_cb(int sd, short type, void *arg)
  486. {
  487. struct peer *p = (struct peer *)arg;
  488. if (net_bw_limit_in == 0)
  489. net_read(p, 0);
  490. else if (m_bw_bytes_in > 0)
  491. m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
  492. else {
  493. p->flags |= PF_ON_READQ;
  494. BTPDQ_INSERT_TAIL(&net_bw_readq, p, rq_entry);
  495. }
  496. }
  497. void
  498. net_write_cb(int sd, short type, void *arg)
  499. {
  500. struct peer *p = (struct peer *)arg;
  501. if (type == EV_TIMEOUT) {
  502. btpd_log(BTPD_L_CONN, "Write attempt timed out.\n");
  503. peer_kill(p);
  504. return;
  505. }
  506. if (net_bw_limit_out == 0) {
  507. net_write(p, 0);
  508. } else if (m_bw_bytes_out > 0) {
  509. m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
  510. } else {
  511. p->flags |= PF_ON_WRITEQ;
  512. BTPDQ_INSERT_TAIL(&net_bw_writeq, p, wq_entry);
  513. }
  514. }
  515. void
  516. net_init(void)
  517. {
  518. m_bw_bytes_out = net_bw_limit_out;
  519. m_bw_bytes_in = net_bw_limit_in;
  520. int nfiles = getdtablesize();
  521. if (nfiles <= 20)
  522. btpd_err("Too few open files allowed (%d). "
  523. "Check \"ulimit -n\"\n", nfiles);
  524. else if (nfiles < 64)
  525. btpd_log(BTPD_L_BTPD,
  526. "You have restricted the number of open files to %d. "
  527. "More could be beneficial to the download performance.\n",
  528. nfiles);
  529. net_max_peers = nfiles - 20;
  530. int sd;
  531. int flag = 1;
  532. struct sockaddr_in addr;
  533. addr.sin_family = AF_INET;
  534. addr.sin_addr.s_addr = htonl(INADDR_ANY);
  535. addr.sin_port = htons(net_port);
  536. if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
  537. btpd_err("socket: %s\n", strerror(errno));
  538. setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
  539. if (bind(sd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
  540. btpd_err("bind: %s\n", strerror(errno));
  541. listen(sd, 10);
  542. set_nonblocking(sd);
  543. event_set(&m_net_incoming, sd, EV_READ | EV_PERSIST,
  544. net_connection_cb, NULL);
  545. event_add(&m_net_incoming, NULL);
  546. evtimer_set(&m_bw_timer, net_bw_cb, NULL);
  547. evtimer_add(&m_bw_timer, (& (struct timeval) { 1, 0 }));
  548. }