A clone of btpd with my configuration changes.

928 řádky
21 KiB

  1. #include <sys/types.h>
  2. #include <sys/uio.h>
  3. #include <sys/socket.h>
  4. #include <netinet/in.h>
  5. #include <netdb.h>
  6. #include <sys/mman.h>
  7. #include <sys/wait.h>
  8. #include <assert.h>
  9. #include <errno.h>
  10. #include <fcntl.h>
  11. #include <math.h>
  12. #include <stdio.h>
  13. #include <stdlib.h>
  14. #include <string.h>
  15. #include <unistd.h>
  16. #include "btpd.h"
  17. #define WRITE_TIMEOUT (& (struct timeval) { 60, 0 })
  18. #define min(x, y) ((x) <= (y) ? (x) : (y))
  19. static unsigned long
  20. net_write(struct peer *p, unsigned long wmax);
  21. void
  22. net_read_cb(int sd, short type, void *arg)
  23. {
  24. struct peer *p = (struct peer *)arg;
  25. if (btpd.ibwlim == 0) {
  26. p->reader->read(p, 0);
  27. } else if (btpd.ibw_left > 0) {
  28. btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
  29. } else {
  30. p->flags |= PF_ON_READQ;
  31. BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
  32. }
  33. }
  34. void
  35. net_write_cb(int sd, short type, void *arg)
  36. {
  37. struct peer *p = (struct peer *)arg;
  38. if (type == EV_TIMEOUT) {
  39. btpd_log(BTPD_L_ERROR, "Write attempt timed out.\n");
  40. peer_kill(p);
  41. return;
  42. }
  43. if (btpd.obwlim == 0) {
  44. net_write(p, 0);
  45. } else if (btpd.obw_left > 0) {
  46. btpd.obw_left -= net_write(p, btpd.obw_left);
  47. } else {
  48. p->flags |= PF_ON_WRITEQ;
  49. BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
  50. }
  51. }
  52. static void
  53. kill_buf_no(char *buf, size_t len)
  54. {
  55. //Nothing
  56. }
  57. static void
  58. kill_buf_free(char *buf, size_t len)
  59. {
  60. free(buf);
  61. }
  62. int
  63. nb_drop(struct net_buf *nb)
  64. {
  65. assert(nb->refs > 0);
  66. nb->refs--;
  67. if (nb->refs == 0) {
  68. nb->kill_buf(nb->buf, nb->len);
  69. free(nb);
  70. return 1;
  71. } else
  72. return 0;
  73. }
  74. void
  75. nb_hold(struct net_buf *nb)
  76. {
  77. nb->refs++;
  78. }
  79. struct net_buf *
  80. nb_create_alloc(short type, size_t len)
  81. {
  82. struct net_buf *nb = btpd_calloc(1, sizeof(*nb) + len);
  83. nb->info.type = type;
  84. nb->buf = (char *)(nb + 1);
  85. nb->len = len;
  86. nb->kill_buf = kill_buf_no;
  87. return nb;
  88. }
  89. struct net_buf *
  90. nb_create_set(short type, char *buf, size_t len,
  91. void (*kill_buf)(char *, size_t))
  92. {
  93. struct net_buf *nb = btpd_calloc(1, sizeof(*nb));
  94. nb->info.type = type;
  95. nb->buf = buf;
  96. nb->len = len;
  97. nb->kill_buf = kill_buf;
  98. return nb;
  99. }
  100. void
  101. kill_shake(struct input_reader *reader)
  102. {
  103. free(reader);
  104. }
  105. #define NIOV 16
  106. static unsigned long
  107. net_write(struct peer *p, unsigned long wmax)
  108. {
  109. struct nb_link *nl;
  110. struct iovec iov[NIOV];
  111. int niov;
  112. int limited;
  113. ssize_t nwritten;
  114. unsigned long bcount;
  115. limited = wmax > 0;
  116. niov = 0;
  117. assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
  118. while (niov < NIOV && nl != NULL && (!limited || (limited && wmax > 0))) {
  119. if (niov > 0) {
  120. iov[niov].iov_base = nl->nb->buf;
  121. iov[niov].iov_len = nl->nb->len;
  122. } else {
  123. iov[niov].iov_base = nl->nb->buf + p->outq_off;
  124. iov[niov].iov_len = nl->nb->len - p->outq_off;
  125. }
  126. if (limited) {
  127. if (iov[niov].iov_len > wmax)
  128. iov[niov].iov_len = wmax;
  129. wmax -= iov[niov].iov_len;
  130. }
  131. niov++;
  132. nl = BTPDQ_NEXT(nl, entry);
  133. }
  134. nwritten = writev(p->sd, iov, niov);
  135. if (nwritten < 0) {
  136. if (errno == EAGAIN) {
  137. event_add(&p->out_ev, WRITE_TIMEOUT);
  138. return 0;
  139. } else {
  140. btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
  141. peer_kill(p);
  142. return 0;
  143. }
  144. } else if (nwritten == 0) {
  145. btpd_log(BTPD_L_CONN, "connection close by peer.\n");
  146. peer_kill(p);
  147. return 0;
  148. }
  149. bcount = nwritten;
  150. nl = BTPDQ_FIRST(&p->outq);
  151. while (bcount > 0) {
  152. unsigned long bufdelta = nl->nb->len - p->outq_off;
  153. if (bcount >= bufdelta) {
  154. if (nl->nb->info.type == NB_TORRENTDATA) {
  155. p->tp->uploaded += bufdelta;
  156. p->rate_from_me[btpd.seconds % RATEHISTORY] += bufdelta;
  157. }
  158. bcount -= bufdelta;
  159. BTPDQ_REMOVE(&p->outq, nl, entry);
  160. nb_drop(nl->nb);
  161. free(nl);
  162. p->outq_off = 0;
  163. nl = BTPDQ_FIRST(&p->outq);
  164. } else {
  165. if (nl->nb->info.type == NB_TORRENTDATA) {
  166. p->tp->uploaded += bcount;
  167. p->rate_from_me[btpd.seconds % RATEHISTORY] += bcount;
  168. }
  169. p->outq_off += bcount;
  170. bcount = 0;
  171. }
  172. }
  173. if (!BTPDQ_EMPTY(&p->outq))
  174. event_add(&p->out_ev, WRITE_TIMEOUT);
  175. else if (p->flags & PF_WRITE_CLOSE) {
  176. btpd_log(BTPD_L_CONN, "Closed because of write flag.\n");
  177. peer_kill(p);
  178. }
  179. return nwritten;
  180. }
  181. void
  182. net_send(struct peer *p, struct net_buf *nb)
  183. {
  184. struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
  185. nl->nb = nb;
  186. nb_hold(nb);
  187. if (BTPDQ_EMPTY(&p->outq)) {
  188. assert(p->outq_off == 0);
  189. event_add(&p->out_ev, WRITE_TIMEOUT);
  190. }
  191. BTPDQ_INSERT_TAIL(&p->outq, nl, entry);
  192. }
  193. /*
  194. * Remove a network buffer from the peer's outq.
  195. * If a part of the buffer already have been written
  196. * to the network it cannot be removed.
  197. *
  198. * Returns 1 if the buffer is removed, 0 if not.
  199. */
  200. int
  201. net_unsend(struct peer *p, struct nb_link *nl)
  202. {
  203. if (!(nl == BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) {
  204. BTPDQ_REMOVE(&p->outq, nl, entry);
  205. nb_drop(nl->nb);
  206. free(nl);
  207. if (BTPDQ_EMPTY(&p->outq)) {
  208. if (p->flags & PF_ON_WRITEQ) {
  209. BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
  210. p->flags &= ~PF_ON_WRITEQ;
  211. } else
  212. event_del(&p->out_ev);
  213. }
  214. return 1;
  215. } else
  216. return 0;
  217. }
  218. void
  219. net_write32(void *buf, uint32_t num)
  220. {
  221. *(uint32_t *)buf = htonl(num);
  222. }
  223. uint32_t
  224. net_read32(void *buf)
  225. {
  226. return ntohl(*(uint32_t *)buf);
  227. }
  228. void
  229. net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
  230. char *block, size_t blen)
  231. {
  232. struct net_buf *head, *piece;
  233. btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen);
  234. head = nb_create_alloc(NB_PIECE, 13);
  235. net_write32(head->buf, 9 + blen);
  236. head->buf[4] = MSG_PIECE;
  237. net_write32(head->buf + 5, index);
  238. net_write32(head->buf + 9, begin);
  239. net_send(p, head);
  240. piece = nb_create_set(NB_TORRENTDATA, block, blen, kill_buf_free);
  241. piece->info.index = index;
  242. piece->info.begin = begin;
  243. piece->info.length = blen;
  244. net_send(p, piece);
  245. }
  246. void
  247. net_send_request(struct peer *p, struct piece_req *req)
  248. {
  249. struct net_buf *out = nb_create_alloc(NB_REQUEST, 17);
  250. net_write32(out->buf, 13);
  251. out->buf[4] = MSG_REQUEST;
  252. net_write32(out->buf + 5, req->index);
  253. net_write32(out->buf + 9, req->begin);
  254. net_write32(out->buf + 13, req->length);
  255. out->info.index = req->index;
  256. out->info.begin = req->begin;
  257. out->info.length = req->length;
  258. net_send(p, out);
  259. }
  260. void
  261. net_send_cancel(struct peer *p, struct piece_req *req)
  262. {
  263. struct net_buf *out = nb_create_alloc(NB_CANCEL, 17);
  264. net_write32(out->buf, 13);
  265. out->buf[4] = MSG_CANCEL;
  266. net_write32(out->buf + 5, req->index);
  267. net_write32(out->buf + 9, req->begin);
  268. net_write32(out->buf + 13, req->length);
  269. out->info.index = req->index;
  270. out->info.begin = req->begin;
  271. out->info.length = req->length;
  272. net_send(p, out);
  273. }
  274. void
  275. net_send_have(struct peer *p, uint32_t index)
  276. {
  277. struct net_buf *out = nb_create_alloc(NB_HAVE, 9);
  278. net_write32(out->buf, 5);
  279. out->buf[4] = MSG_HAVE;
  280. net_write32(out->buf + 5, index);
  281. out->info.index = index;
  282. net_send(p, out);
  283. }
  284. void
  285. net_send_multihave(struct peer *p)
  286. {
  287. struct torrent *tp = p->tp;
  288. struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * tp->have_npieces);
  289. for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) {
  290. if (has_bit(tp->piece_field, i)) {
  291. net_write32(out->buf + count * 9, 5);
  292. out->buf[count * 9 + 4] = MSG_HAVE;
  293. net_write32(out->buf + count * 9 + 5, i);
  294. count++;
  295. }
  296. }
  297. net_send(p, out);
  298. }
  299. void
  300. net_send_onesized(struct peer *p, char mtype, int btype)
  301. {
  302. struct net_buf *out = nb_create_alloc(btype, 5);
  303. net_write32(out->buf, 1);
  304. out->buf[4] = mtype;
  305. net_send(p, out);
  306. }
  307. void
  308. net_send_unchoke(struct peer *p)
  309. {
  310. net_send_onesized(p, MSG_UNCHOKE, NB_UNCHOKE);
  311. }
  312. void
  313. net_send_choke(struct peer *p)
  314. {
  315. net_send_onesized(p, MSG_CHOKE, NB_CHOKE);
  316. }
  317. void
  318. net_send_uninterest(struct peer *p)
  319. {
  320. net_send_onesized(p, MSG_UNINTEREST, NB_UNINTEREST);
  321. }
  322. void
  323. net_send_interest(struct peer *p)
  324. {
  325. net_send_onesized(p, MSG_INTEREST, NB_INTEREST);
  326. }
  327. void
  328. net_send_bitfield(struct peer *p)
  329. {
  330. uint32_t plen = ceil(p->tp->meta.npieces / 8.0);
  331. struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5);
  332. net_write32(out->buf, plen + 1);
  333. out->buf[4] = MSG_BITFIELD;
  334. net_send(p, out);
  335. out = nb_create_set(NB_BITDATA, p->tp->piece_field, plen, kill_buf_no);
  336. net_send(p, out);
  337. }
  338. void
  339. net_send_shake(struct peer *p)
  340. {
  341. struct net_buf *out = nb_create_alloc(NB_SHAKE, 68);
  342. bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->buf, 28);
  343. bcopy(p->tp->meta.info_hash, out->buf + 28, 20);
  344. bcopy(btpd.peer_id, out->buf + 48, 20);
  345. net_send(p, out);
  346. }
  347. static void
  348. kill_generic(struct input_reader *reader)
  349. {
  350. free(reader);
  351. }
  352. static size_t
  353. net_read(struct peer *p, char *buf, size_t len)
  354. {
  355. ssize_t nread = read(p->sd, buf, len);
  356. if (nread < 0) {
  357. if (errno == EAGAIN) {
  358. event_add(&p->in_ev, NULL);
  359. return 0;
  360. } else {
  361. btpd_log(BTPD_L_CONN, "read error: %s\n", strerror(errno));
  362. peer_kill(p);
  363. return 0;
  364. }
  365. } else if (nread == 0) {
  366. btpd_log(BTPD_L_CONN, "conn closed by other side.\n");
  367. if (!BTPDQ_EMPTY(&p->outq))
  368. p->flags |= PF_WRITE_CLOSE;
  369. else
  370. peer_kill(p);
  371. return 0;
  372. } else
  373. return nread;
  374. }
  375. static size_t
  376. net_read_to_buf(struct peer *p, struct io_buffer *iob, unsigned long rmax)
  377. {
  378. if (rmax == 0)
  379. rmax = iob->buf_len - iob->buf_off;
  380. else
  381. rmax = min(rmax, iob->buf_len - iob->buf_off);
  382. assert(rmax > 0);
  383. size_t nread = net_read(p, iob->buf + iob->buf_off, rmax);
  384. if (nread > 0)
  385. iob->buf_off += nread;
  386. return nread;
  387. }
  388. void
  389. kill_bitfield(struct input_reader *rd)
  390. {
  391. free(rd);
  392. }
  393. static void net_generic_reader(struct peer *p);
  394. static unsigned long
  395. read_bitfield(struct peer *p, unsigned long rmax)
  396. {
  397. struct bitfield_reader *rd = (struct bitfield_reader *)p->reader;
  398. size_t nread = net_read_to_buf(p, &rd->iob, rmax);
  399. if (nread == 0)
  400. return 0;
  401. if (rd->iob.buf_off == rd->iob.buf_len) {
  402. peer_on_bitfield(p, rd->iob.buf);
  403. free(rd);
  404. net_generic_reader(p);
  405. } else
  406. event_add(&p->in_ev, NULL);
  407. return nread;
  408. }
  409. void
  410. kill_piece(struct input_reader *rd)
  411. {
  412. free(rd);
  413. }
  414. static unsigned long
  415. read_piece(struct peer *p, unsigned long rmax)
  416. {
  417. struct piece_reader *rd = (struct piece_reader *)p->reader;
  418. size_t nread = net_read_to_buf(p, &rd->iob, rmax);
  419. if (nread == 0)
  420. return 0;
  421. p->rate_to_me[btpd.seconds % RATEHISTORY] += nread;
  422. p->tp->downloaded += nread;
  423. if (rd->iob.buf_off == rd->iob.buf_len) {
  424. peer_on_piece(p, rd->index, rd->begin, rd->iob.buf_len, rd->iob.buf);
  425. free(rd);
  426. net_generic_reader(p);
  427. } else
  428. event_add(&p->in_ev, NULL);
  429. return nread;
  430. }
  431. #define GRBUFLEN (1 << 15)
  432. static unsigned long
  433. net_generic_read(struct peer *p, unsigned long rmax)
  434. {
  435. char buf[GRBUFLEN];
  436. struct io_buffer iob = { 0, GRBUFLEN, buf };
  437. struct generic_reader *gr = (struct generic_reader *)p->reader;
  438. size_t nread;
  439. size_t off, len;
  440. int got_part;
  441. if (gr->iob.buf_off > 0) {
  442. iob.buf_off = gr->iob.buf_off;
  443. bcopy(gr->iob.buf, iob.buf, iob.buf_off);
  444. gr->iob.buf_off = 0;
  445. }
  446. if ((nread = net_read_to_buf(p, &iob, rmax)) == 0)
  447. return 0;
  448. len = iob.buf_off;
  449. off = 0;
  450. got_part = 0;
  451. while (!got_part && len - off >= 4) {
  452. size_t msg_len = net_read32(buf + off);
  453. if (msg_len == 0) { /* Keep alive */
  454. off += 4;
  455. continue;
  456. }
  457. if (len - off < 5) {
  458. got_part = 1;
  459. break;
  460. }
  461. switch (buf[off + 4]) {
  462. case MSG_CHOKE:
  463. btpd_log(BTPD_L_MSG, "choke.\n");
  464. if (msg_len != 1)
  465. goto bad_data;
  466. peer_on_choke(p);
  467. break;
  468. case MSG_UNCHOKE:
  469. btpd_log(BTPD_L_MSG, "unchoke.\n");
  470. if (msg_len != 1)
  471. goto bad_data;
  472. peer_on_unchoke(p);
  473. break;
  474. case MSG_INTEREST:
  475. btpd_log(BTPD_L_MSG, "interested.\n");
  476. if (msg_len != 1)
  477. goto bad_data;
  478. peer_on_interest(p);
  479. break;
  480. case MSG_UNINTEREST:
  481. btpd_log(BTPD_L_MSG, "uninterested.\n");
  482. if (msg_len != 1)
  483. goto bad_data;
  484. peer_on_uninterest(p);
  485. break;
  486. case MSG_HAVE:
  487. btpd_log(BTPD_L_MSG, "have.\n");
  488. if (msg_len != 5)
  489. goto bad_data;
  490. else if (len - off >= msg_len + 4) {
  491. uint32_t index = net_read32(buf + off + 5);
  492. peer_on_have(p, index);
  493. } else
  494. got_part = 1;
  495. break;
  496. case MSG_BITFIELD:
  497. btpd_log(BTPD_L_MSG, "bitfield.\n");
  498. if (msg_len != (size_t)ceil(p->tp->meta.npieces / 8.0) + 1)
  499. goto bad_data;
  500. else if (p->npieces != 0)
  501. goto bad_data;
  502. else if (len - off >= msg_len + 4)
  503. peer_on_bitfield(p, buf + off + 5);
  504. else {
  505. struct bitfield_reader *rp;
  506. size_t mem = sizeof(*rp) + msg_len - 1;
  507. p->reader->kill(p->reader);
  508. rp = btpd_calloc(1, mem);
  509. rp->rd.kill = kill_bitfield;
  510. rp->rd.read = read_bitfield;
  511. rp->iob.buf = (char *)rp + sizeof(*rp);
  512. rp->iob.buf_len = msg_len - 1;
  513. rp->iob.buf_off = len - off - 5;
  514. bcopy(buf + off + 5, rp->iob.buf, rp->iob.buf_off);
  515. p->reader = (struct input_reader *)rp;
  516. event_add(&p->in_ev, NULL);
  517. return nread;
  518. }
  519. break;
  520. case MSG_REQUEST:
  521. btpd_log(BTPD_L_MSG, "request.\n");
  522. if (msg_len != 13)
  523. goto bad_data;
  524. else if (len - off >= msg_len + 4) {
  525. if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) != PF_P_WANT)
  526. break;
  527. uint32_t index, begin, length;
  528. index = net_read32(buf + off + 5);
  529. begin = net_read32(buf + off + 9);
  530. length = net_read32(buf + off + 13);
  531. if (length > (1 << 15))
  532. goto bad_data;
  533. if (index >= p->tp->meta.npieces)
  534. goto bad_data;
  535. if (!has_bit(p->tp->piece_field, index))
  536. goto bad_data;
  537. if (begin + length > torrent_piece_size(p->tp, index))
  538. goto bad_data;
  539. peer_on_request(p, index, begin, length);
  540. } else
  541. got_part = 1;
  542. break;
  543. case MSG_PIECE:
  544. btpd_log(BTPD_L_MSG, "piece.\n");
  545. if (msg_len < 10)
  546. goto bad_data;
  547. else if (len - off >= 13) {
  548. uint32_t index = net_read32(buf + off + 5);
  549. uint32_t begin = net_read32(buf + off + 9);
  550. uint32_t length = msg_len - 9;
  551. if (len - off >= msg_len + 4) {
  552. p->tp->downloaded += length;
  553. p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
  554. peer_on_piece(p, index, begin, length, buf + off + 13);
  555. } else {
  556. struct piece_reader *rp;
  557. size_t mem = sizeof(*rp) + length;
  558. p->reader->kill(p->reader);
  559. rp = btpd_calloc(1, mem);
  560. rp->rd.kill = kill_piece;
  561. rp->rd.read = read_piece;
  562. rp->index = index;
  563. rp->begin = begin;
  564. rp->iob.buf = (char *)rp + sizeof(*rp);
  565. rp->iob.buf_len = length;
  566. rp->iob.buf_off = len - off - 13;
  567. bcopy(buf + off + 13, rp->iob.buf, rp->iob.buf_off);
  568. p->reader = (struct input_reader *)rp;
  569. event_add(&p->in_ev, NULL);
  570. p->tp->downloaded += rp->iob.buf_off;
  571. p->rate_to_me[btpd.seconds % RATEHISTORY] +=
  572. rp->iob.buf_off;
  573. return nread;
  574. }
  575. } else
  576. got_part = 1;
  577. break;
  578. case MSG_CANCEL:
  579. if (msg_len != 13)
  580. goto bad_data;
  581. else if (len - off >= msg_len + 4) {
  582. uint32_t index = net_read32(buf + off + 5);
  583. uint32_t begin = net_read32(buf + off + 9);
  584. uint32_t length = net_read32(buf + off + 13);
  585. if (index > p->tp->meta.npieces)
  586. goto bad_data;
  587. if (begin + length > torrent_piece_size(p->tp, index))
  588. goto bad_data;
  589. btpd_log(BTPD_L_MSG, "cancel: %u, %u, %u\n",
  590. index, begin, length);
  591. peer_on_cancel(p, index, begin, length);
  592. } else
  593. got_part = 1;
  594. break;
  595. default:
  596. goto bad_data;
  597. }
  598. if (!got_part)
  599. off += 4 + msg_len;
  600. }
  601. if (off != len) {
  602. gr->iob.buf_off = len - off;
  603. assert(gr->iob.buf_off <= gr->iob.buf_len);
  604. bcopy(buf + off, gr->iob.buf, gr->iob.buf_off);
  605. }
  606. event_add(&p->in_ev, NULL);
  607. return nread;
  608. bad_data:
  609. btpd_log(BTPD_L_MSG, "bad data\n");
  610. peer_kill(p);
  611. return nread;
  612. }
  613. static void
  614. net_generic_reader(struct peer *p)
  615. {
  616. struct generic_reader *gr;
  617. gr = btpd_calloc(1, sizeof(*gr));
  618. gr->rd.read = net_generic_read;
  619. gr->rd.kill = kill_generic;
  620. gr->iob.buf = gr->_io_buf;
  621. gr->iob.buf_len = MAX_INPUT_LEFT;
  622. gr->iob.buf_off = 0;
  623. p->reader = (struct input_reader *)gr;
  624. event_add(&p->in_ev, NULL);
  625. }
  626. static unsigned long
  627. net_shake_read(struct peer *p, unsigned long rmax)
  628. {
  629. struct handshake *hs = (struct handshake *)p->reader;
  630. struct io_buffer *in = &hs->in;
  631. size_t nread = net_read_to_buf(p, in, rmax);
  632. if (nread == 0)
  633. return 0;
  634. switch (hs->state) {
  635. case SHAKE_INIT:
  636. if (in->buf_off < 20)
  637. break;
  638. else if (bcmp(in->buf, "\x13""BitTorrent protocol", 20) == 0)
  639. hs->state = SHAKE_PSTR;
  640. else
  641. goto bad_shake;
  642. case SHAKE_PSTR:
  643. if (in->buf_off < 28)
  644. break;
  645. else
  646. hs->state = SHAKE_RESERVED;
  647. case SHAKE_RESERVED:
  648. if (in->buf_off < 48)
  649. break;
  650. else if (hs->incoming) {
  651. struct torrent *tp = torrent_get_by_hash(in->buf + 28);
  652. if (tp != NULL) {
  653. hs->state = SHAKE_INFO;
  654. p->tp = tp;
  655. net_send_shake(p);
  656. } else
  657. goto bad_shake;
  658. } else {
  659. if (bcmp(in->buf + 28, p->tp->meta.info_hash, 20) == 0)
  660. hs->state = SHAKE_INFO;
  661. else
  662. goto bad_shake;
  663. }
  664. case SHAKE_INFO:
  665. if (in->buf_off < 68)
  666. break;
  667. else {
  668. if (torrent_has_peer(p->tp, in->buf + 48))
  669. goto bad_shake; // Not really, but we're already connected.
  670. else if (bcmp(in->buf + 48, btpd.peer_id, 20) == 0)
  671. goto bad_shake; // Connection from myself.
  672. bcopy(in->buf + 48, p->id, 20);
  673. hs->state = SHAKE_ID;
  674. }
  675. default:
  676. assert(hs->state == SHAKE_ID);
  677. }
  678. if (hs->state == SHAKE_ID) {
  679. btpd_log(BTPD_L_CONN, "Got whole shake.\n");
  680. free(hs);
  681. p->piece_field = btpd_calloc(1, (int)ceil(p->tp->meta.npieces / 8.0));
  682. cm_on_new_peer(p);
  683. net_generic_reader(p);
  684. if (p->tp->have_npieces > 0) {
  685. if (p->tp->have_npieces * 9 < 5 + ceil(p->tp->meta.npieces / 8.0))
  686. net_send_multihave(p);
  687. else
  688. net_send_bitfield(p);
  689. }
  690. } else
  691. event_add(&p->in_ev, NULL);
  692. return nread;
  693. bad_shake:
  694. btpd_log(BTPD_L_CONN, "Bad shake(%d)\n", hs->state);
  695. peer_kill(p);
  696. return nread;
  697. }
  698. void
  699. net_handshake(struct peer *p, int incoming)
  700. {
  701. struct handshake *hs;
  702. hs = calloc(1, sizeof(*hs));
  703. hs->incoming = incoming;
  704. hs->state = SHAKE_INIT;
  705. hs->in.buf_len = SHAKE_LEN;
  706. hs->in.buf_off = 0;
  707. hs->in.buf = hs->_io_buf;
  708. p->reader = (struct input_reader *)hs;
  709. hs->rd.read = net_shake_read;
  710. hs->rd.kill = kill_shake;
  711. if (!incoming)
  712. net_send_shake(p);
  713. }
  714. int
  715. net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
  716. {
  717. if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
  718. return errno;
  719. set_nonblocking(*sd);
  720. if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
  721. btpd_log(BTPD_L_CONN, "Botched connection %s.", strerror(errno));
  722. close(*sd);
  723. return errno;
  724. }
  725. return 0;
  726. }
  727. int
  728. net_connect(const char *ip, int port, int *sd)
  729. {
  730. struct addrinfo hints, *res;
  731. char portstr[6];
  732. assert(btpd.npeers < btpd.maxpeers);
  733. if (snprintf(portstr, sizeof(portstr), "%d", port) >= sizeof(portstr))
  734. return EINVAL;
  735. bzero(&hints, sizeof(hints));
  736. hints.ai_family = AF_UNSPEC;
  737. hints.ai_flags = AI_NUMERICHOST;
  738. hints.ai_socktype = SOCK_STREAM;
  739. if (getaddrinfo(ip, portstr, &hints, &res) != 0)
  740. return errno;
  741. int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);
  742. freeaddrinfo(res);
  743. return error;
  744. }
  745. void
  746. net_connection_cb(int sd, short type, void *arg)
  747. {
  748. int nsd;
  749. nsd = accept(sd, NULL, NULL);
  750. if (nsd < 0) {
  751. if (errno == EWOULDBLOCK || errno == ECONNABORTED)
  752. return;
  753. else
  754. btpd_err("accept4: %s\n", strerror(errno));
  755. }
  756. if (set_nonblocking(nsd) != 0) {
  757. close(nsd);
  758. return;
  759. }
  760. assert(btpd.npeers <= btpd.maxpeers);
  761. if (btpd.npeers == btpd.maxpeers) {
  762. close(nsd);
  763. return;
  764. }
  765. peer_create_in(nsd);
  766. btpd_log(BTPD_L_CONN, "got connection.\n");
  767. }
  768. void
  769. net_bw_rate(void)
  770. {
  771. unsigned sum = 0;
  772. for (int i = 0; i < BWCALLHISTORY - 1; i++) {
  773. btpd.bwrate[i] = btpd.bwrate[i + 1];
  774. sum += btpd.bwrate[i];
  775. }
  776. btpd.bwrate[BWCALLHISTORY - 1] = btpd.bwcalls;
  777. sum += btpd.bwrate[BWCALLHISTORY - 1];
  778. btpd.bwcalls = 0;
  779. btpd.bw_hz_avg = sum / 5.0;
  780. }
  781. void
  782. net_bw_cb(int sd, short type, void *arg)
  783. {
  784. struct peer *p;
  785. btpd.bwcalls++;
  786. double avg_hz;
  787. if (btpd.seconds < BWCALLHISTORY)
  788. avg_hz = btpd.bw_hz;
  789. else
  790. avg_hz = btpd.bw_hz_avg;
  791. btpd.obw_left = btpd.obwlim / avg_hz;
  792. btpd.ibw_left = btpd.ibwlim / avg_hz;
  793. if (btpd.ibwlim > 0) {
  794. while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left > 0) {
  795. BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
  796. p->flags &= ~PF_ON_READQ;
  797. btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
  798. }
  799. } else {
  800. while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL) {
  801. BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
  802. p->flags &= ~PF_ON_READQ;
  803. p->reader->read(p, 0);
  804. }
  805. }
  806. if (btpd.obwlim) {
  807. while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL && btpd.obw_left > 0) {
  808. BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
  809. p->flags &= ~PF_ON_WRITEQ;
  810. btpd.obw_left -= net_write(p, btpd.obw_left);
  811. }
  812. } else {
  813. while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL) {
  814. BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
  815. p->flags &= ~PF_ON_WRITEQ;
  816. net_write(p, 0);
  817. }
  818. }
  819. event_add(&btpd.bwlim, (& (struct timeval) { 0, 1000000 / btpd.bw_hz }));
  820. }