A clone of btpd with my configuration changes.

928 line
21 KiB

  1. #include <sys/types.h>
  2. #include <sys/uio.h>
  3. #include <sys/socket.h>
  4. #include <netinet/in.h>
  5. #include <netdb.h>
  6. #include <sys/mman.h>
  7. #include <sys/wait.h>
  8. #include <assert.h>
  9. #include <errno.h>
  10. #include <fcntl.h>
  11. #include <math.h>
  12. #include <stdio.h>
  13. #include <stdlib.h>
  14. #include <string.h>
  15. #include <unistd.h>
  16. #include "btpd.h"
  17. #define WRITE_TIMEOUT (& (struct timeval) { 60, 0 })
  18. #define min(x, y) ((x) <= (y) ? (x) : (y))
  19. static unsigned long
  20. net_write(struct peer *p, unsigned long wmax);
  21. void
  22. net_read_cb(int sd, short type, void *arg)
  23. {
  24. struct peer *p = (struct peer *)arg;
  25. if (btpd.ibwlim == 0) {
  26. p->reader->read(p, 0);
  27. } else if (btpd.ibw_left > 0) {
  28. btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
  29. } else {
  30. p->flags |= PF_ON_READQ;
  31. BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
  32. }
  33. }
  34. void
  35. net_write_cb(int sd, short type, void *arg)
  36. {
  37. struct peer *p = (struct peer *)arg;
  38. if (type == EV_TIMEOUT) {
  39. btpd_log(BTPD_L_ERROR, "Write attempt timed out.\n");
  40. peer_kill(p);
  41. return;
  42. }
  43. if (btpd.obwlim == 0) {
  44. net_write(p, 0);
  45. } else if (btpd.obw_left > 0) {
  46. btpd.obw_left -= net_write(p, btpd.obw_left);
  47. } else {
  48. p->flags |= PF_ON_WRITEQ;
  49. BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
  50. }
  51. }
  52. static void
  53. nokill_iob(struct io_buffer *iob)
  54. {
  55. //Nothing
  56. }
  57. static void
  58. kill_free_buf(struct io_buffer *iob)
  59. {
  60. free(iob->buf);
  61. }
  62. static struct iob_link *
  63. malloc_liob(size_t len)
  64. {
  65. struct iob_link *iol;
  66. iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol) + len);
  67. iol->iob.buf = (char *)(iol + 1);
  68. iol->iob.buf_len = len;
  69. iol->iob.buf_off = 0;
  70. iol->kill_buf = nokill_iob;
  71. return iol;
  72. }
  73. static struct iob_link *
  74. salloc_liob(char *buf, size_t len, void (*kill_buf)(struct io_buffer *))
  75. {
  76. struct iob_link *iol;
  77. iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol));
  78. iol->iob.buf = buf;
  79. iol->iob.buf_len = len;
  80. iol->iob.buf_off = 0;
  81. iol->kill_buf = kill_buf;
  82. return iol;
  83. }
  84. void
  85. net_unsend_piece(struct peer *p, struct piece_req *req)
  86. {
  87. struct iob_link *piece;
  88. BTPDQ_REMOVE(&p->p_reqs, req, entry);
  89. piece = BTPDQ_NEXT(req->head, entry);
  90. BTPDQ_REMOVE(&p->outq, piece, entry);
  91. piece->kill_buf(&piece->iob);
  92. free(piece);
  93. BTPDQ_REMOVE(&p->outq, req->head, entry);
  94. req->head->kill_buf(&req->head->iob);
  95. free(req->head);
  96. free(req);
  97. if (BTPDQ_EMPTY(&p->outq)) {
  98. if (p->flags & PF_ON_WRITEQ) {
  99. BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
  100. p->flags &= ~PF_ON_WRITEQ;
  101. } else
  102. event_del(&p->out_ev);
  103. }
  104. }
  105. void
  106. kill_shake(struct input_reader *reader)
  107. {
  108. free(reader);
  109. }
  110. #define NIOV 16
  111. static unsigned long
  112. net_write(struct peer *p, unsigned long wmax)
  113. {
  114. struct iob_link *iol;
  115. struct piece_req *req;
  116. struct iovec iov[NIOV];
  117. int niov;
  118. int limited;
  119. ssize_t nwritten;
  120. unsigned long bcount;
  121. limited = wmax > 0;
  122. niov = 0;
  123. assert((iol = BTPDQ_FIRST(&p->outq)) != NULL);
  124. while (niov < NIOV && iol != NULL
  125. && (!limited || (limited && wmax > 0))) {
  126. iov[niov].iov_base = iol->iob.buf + iol->iob.buf_off;
  127. iov[niov].iov_len = iol->iob.buf_len - iol->iob.buf_off;
  128. if (limited) {
  129. if (iov[niov].iov_len > wmax)
  130. iov[niov].iov_len = wmax;
  131. wmax -= iov[niov].iov_len;
  132. }
  133. niov++;
  134. iol = BTPDQ_NEXT(iol, entry);
  135. }
  136. nwritten = writev(p->sd, iov, niov);
  137. if (nwritten < 0) {
  138. if (errno == EAGAIN) {
  139. event_add(&p->out_ev, WRITE_TIMEOUT);
  140. return 0;
  141. } else {
  142. btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
  143. peer_kill(p);
  144. return 0;
  145. }
  146. } else if (nwritten == 0) {
  147. btpd_log(BTPD_L_CONN, "connection close by peer.\n");
  148. peer_kill(p);
  149. return 0;
  150. }
  151. bcount = nwritten;
  152. req = BTPDQ_FIRST(&p->p_reqs);
  153. iol = BTPDQ_FIRST(&p->outq);
  154. while (bcount > 0) {
  155. unsigned long bufdelta = iol->iob.buf_len - iol->iob.buf_off;
  156. if (req != NULL && req->head == iol) {
  157. struct piece_req *next = BTPDQ_NEXT(req, entry);
  158. BTPDQ_REMOVE(&p->p_reqs, req, entry);
  159. free(req);
  160. req = next;
  161. }
  162. if (bcount >= bufdelta) {
  163. if (iol->upload) {
  164. p->tp->uploaded += bufdelta;
  165. p->rate_from_me[btpd.seconds % RATEHISTORY] += bufdelta;
  166. }
  167. bcount -= bufdelta;
  168. BTPDQ_REMOVE(&p->outq, iol, entry);
  169. iol->kill_buf(&iol->iob);
  170. free(iol);
  171. iol = BTPDQ_FIRST(&p->outq);
  172. } else {
  173. if (iol->upload) {
  174. p->tp->uploaded += bcount;
  175. p->rate_from_me[btpd.seconds % RATEHISTORY] += bcount;
  176. }
  177. iol->iob.buf_off += bcount;
  178. bcount = 0;
  179. }
  180. }
  181. if (!BTPDQ_EMPTY(&p->outq))
  182. event_add(&p->out_ev, WRITE_TIMEOUT);
  183. else if (p->flags & PF_WRITE_CLOSE) {
  184. btpd_log(BTPD_L_CONN, "Closed because of write flag.\n");
  185. peer_kill(p);
  186. }
  187. return nwritten;
  188. }
  189. void
  190. net_send(struct peer *p, struct iob_link *iol)
  191. {
  192. if (BTPDQ_EMPTY(&p->outq))
  193. event_add(&p->out_ev, WRITE_TIMEOUT);
  194. BTPDQ_INSERT_TAIL(&p->outq, iol, entry);
  195. }
  196. void
  197. net_write32(void *buf, uint32_t num)
  198. {
  199. *(uint32_t *)buf = htonl(num);
  200. }
  201. uint32_t
  202. net_read32(void *buf)
  203. {
  204. return ntohl(*(uint32_t *)buf);
  205. }
  206. void
  207. net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
  208. char *block, size_t blen)
  209. {
  210. struct iob_link *head, *piece;
  211. struct piece_req *req;
  212. btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen);
  213. head = malloc_liob(13);
  214. net_write32(head->iob.buf, 9 + blen);
  215. head->iob.buf[4] = MSG_PIECE;
  216. net_write32(head->iob.buf + 5, index);
  217. net_write32(head->iob.buf + 9, begin);
  218. net_send(p, head);
  219. piece = salloc_liob(block, blen, kill_free_buf);
  220. piece->upload = 1;
  221. net_send(p, piece);
  222. req = btpd_malloc(sizeof(*req));
  223. req->index = index;
  224. req->begin = begin;
  225. req->length = blen;
  226. req->head = head;
  227. BTPDQ_INSERT_TAIL(&p->p_reqs, req, entry);
  228. }
  229. void
  230. net_send_request(struct peer *p, struct piece_req *req)
  231. {
  232. struct iob_link *out;
  233. out = malloc_liob(17);
  234. net_write32(out->iob.buf, 13);
  235. out->iob.buf[4] = MSG_REQUEST;
  236. net_write32(out->iob.buf + 5, req->index);
  237. net_write32(out->iob.buf + 9, req->begin);
  238. net_write32(out->iob.buf + 13, req->length);
  239. net_send(p, out);
  240. }
  241. void
  242. net_send_cancel(struct peer *p, struct piece_req *req)
  243. {
  244. struct iob_link *out;
  245. out = malloc_liob(17);
  246. net_write32(out->iob.buf, 13);
  247. out->iob.buf[4] = MSG_CANCEL;
  248. net_write32(out->iob.buf + 5, req->index);
  249. net_write32(out->iob.buf + 9, req->begin);
  250. net_write32(out->iob.buf + 13, req->length);
  251. net_send(p, out);
  252. }
  253. void
  254. net_send_have(struct peer *p, uint32_t index)
  255. {
  256. struct iob_link *out;
  257. out = malloc_liob(9);
  258. net_write32(out->iob.buf, 5);
  259. out->iob.buf[4] = MSG_HAVE;
  260. net_write32(out->iob.buf + 5, index);
  261. net_send(p, out);
  262. }
  263. void
  264. net_send_multihave(struct peer *p)
  265. {
  266. struct torrent *tp = p->tp;
  267. struct iob_link *out;
  268. out = malloc_liob(9 * tp->have_npieces);
  269. for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) {
  270. if (has_bit(tp->piece_field, i)) {
  271. net_write32(out->iob.buf + count * 9, 5);
  272. out->iob.buf[count * 9 + 4] = MSG_HAVE;
  273. net_write32(out->iob.buf + count * 9 + 5, i);
  274. count++;
  275. }
  276. }
  277. net_send(p, out);
  278. }
  279. void
  280. net_send_onesized(struct peer *p, char type)
  281. {
  282. struct iob_link *out;
  283. out = malloc_liob(5);
  284. net_write32(out->iob.buf, 1);
  285. out->iob.buf[4] = type;
  286. net_send(p, out);
  287. }
  288. void
  289. net_send_unchoke(struct peer *p)
  290. {
  291. net_send_onesized(p, MSG_UNCHOKE);
  292. }
  293. void
  294. net_send_choke(struct peer *p)
  295. {
  296. net_send_onesized(p, MSG_CHOKE);
  297. }
  298. void
  299. net_send_uninterest(struct peer *p)
  300. {
  301. net_send_onesized(p, MSG_UNINTEREST);
  302. }
  303. void
  304. net_send_interest(struct peer *p)
  305. {
  306. net_send_onesized(p, MSG_INTEREST);
  307. }
  308. void
  309. net_send_bitfield(struct peer *p)
  310. {
  311. struct iob_link *out;
  312. uint32_t plen;
  313. plen = (uint32_t)ceil(p->tp->meta.npieces / 8.0);
  314. out = malloc_liob(5);
  315. net_write32(out->iob.buf, plen + 1);
  316. out->iob.buf[4] = MSG_BITFIELD;
  317. net_send(p, out);
  318. out = salloc_liob(p->tp->piece_field, plen, nokill_iob);
  319. net_send(p, out);
  320. }
  321. void
  322. net_send_shake(struct peer *p)
  323. {
  324. struct iob_link *out;
  325. out = malloc_liob(68);
  326. bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->iob.buf, 28);
  327. bcopy(p->tp->meta.info_hash, out->iob.buf + 28, 20);
  328. bcopy(btpd.peer_id, out->iob.buf + 48, 20);
  329. net_send(p, out);
  330. }
  331. static void
  332. kill_generic(struct input_reader *reader)
  333. {
  334. free(reader);
  335. }
  336. static size_t
  337. net_read(struct peer *p, char *buf, size_t len)
  338. {
  339. ssize_t nread = read(p->sd, buf, len);
  340. if (nread < 0) {
  341. if (errno == EAGAIN) {
  342. event_add(&p->in_ev, NULL);
  343. return 0;
  344. } else {
  345. btpd_log(BTPD_L_CONN, "read error: %s\n", strerror(errno));
  346. peer_kill(p);
  347. return 0;
  348. }
  349. } else if (nread == 0) {
  350. btpd_log(BTPD_L_CONN, "conn closed by other side.\n");
  351. if (!BTPDQ_EMPTY(&p->outq))
  352. p->flags |= PF_WRITE_CLOSE;
  353. else
  354. peer_kill(p);
  355. return 0;
  356. } else
  357. return nread;
  358. }
  359. static size_t
  360. net_read_to_buf(struct peer *p, struct io_buffer *iob, unsigned long rmax)
  361. {
  362. if (rmax == 0)
  363. rmax = iob->buf_len - iob->buf_off;
  364. else
  365. rmax = min(rmax, iob->buf_len - iob->buf_off);
  366. assert(rmax > 0);
  367. size_t nread = net_read(p, iob->buf + iob->buf_off, rmax);
  368. if (nread > 0)
  369. iob->buf_off += nread;
  370. return nread;
  371. }
  372. void
  373. kill_bitfield(struct input_reader *rd)
  374. {
  375. free(rd);
  376. }
  377. static void net_generic_reader(struct peer *p);
  378. static unsigned long
  379. read_bitfield(struct peer *p, unsigned long rmax)
  380. {
  381. struct bitfield_reader *rd = (struct bitfield_reader *)p->reader;
  382. size_t nread = net_read_to_buf(p, &rd->iob, rmax);
  383. if (nread == 0)
  384. return 0;
  385. if (rd->iob.buf_off == rd->iob.buf_len) {
  386. peer_on_bitfield(p, rd->iob.buf);
  387. free(rd);
  388. net_generic_reader(p);
  389. } else
  390. event_add(&p->in_ev, NULL);
  391. return nread;
  392. }
  393. void
  394. kill_piece(struct input_reader *rd)
  395. {
  396. free(rd);
  397. }
  398. static unsigned long
  399. read_piece(struct peer *p, unsigned long rmax)
  400. {
  401. struct piece_reader *rd = (struct piece_reader *)p->reader;
  402. size_t nread = net_read_to_buf(p, &rd->iob, rmax);
  403. if (nread == 0)
  404. return 0;
  405. p->rate_to_me[btpd.seconds % RATEHISTORY] += nread;
  406. p->tp->downloaded += nread;
  407. if (rd->iob.buf_off == rd->iob.buf_len) {
  408. peer_on_piece(p, rd->index, rd->begin, rd->iob.buf_len, rd->iob.buf);
  409. free(rd);
  410. net_generic_reader(p);
  411. } else
  412. event_add(&p->in_ev, NULL);
  413. return nread;
  414. }
  415. #define GRBUFLEN (1 << 15)
  416. static unsigned long
  417. net_generic_read(struct peer *p, unsigned long rmax)
  418. {
  419. char buf[GRBUFLEN];
  420. struct io_buffer iob = { 0, GRBUFLEN, buf };
  421. struct generic_reader *gr = (struct generic_reader *)p->reader;
  422. size_t nread;
  423. size_t off, len;
  424. int got_part;
  425. if (gr->iob.buf_off > 0) {
  426. iob.buf_off = gr->iob.buf_off;
  427. bcopy(gr->iob.buf, iob.buf, iob.buf_off);
  428. gr->iob.buf_off = 0;
  429. }
  430. if ((nread = net_read_to_buf(p, &iob, rmax)) == 0)
  431. return 0;
  432. len = iob.buf_off;
  433. off = 0;
  434. got_part = 0;
  435. while (!got_part && len - off >= 4) {
  436. size_t msg_len = net_read32(buf + off);
  437. if (msg_len == 0) { /* Keep alive */
  438. off += 4;
  439. continue;
  440. }
  441. if (len - off < 5) {
  442. got_part = 1;
  443. break;
  444. }
  445. switch (buf[off + 4]) {
  446. case MSG_CHOKE:
  447. btpd_log(BTPD_L_MSG, "choke.\n");
  448. if (msg_len != 1)
  449. goto bad_data;
  450. peer_on_choke(p);
  451. break;
  452. case MSG_UNCHOKE:
  453. btpd_log(BTPD_L_MSG, "unchoke.\n");
  454. if (msg_len != 1)
  455. goto bad_data;
  456. peer_on_unchoke(p);
  457. break;
  458. case MSG_INTEREST:
  459. btpd_log(BTPD_L_MSG, "interested.\n");
  460. if (msg_len != 1)
  461. goto bad_data;
  462. peer_on_interest(p);
  463. break;
  464. case MSG_UNINTEREST:
  465. btpd_log(BTPD_L_MSG, "uninterested.\n");
  466. if (msg_len != 1)
  467. goto bad_data;
  468. peer_on_uninterest(p);
  469. break;
  470. case MSG_HAVE:
  471. btpd_log(BTPD_L_MSG, "have.\n");
  472. if (msg_len != 5)
  473. goto bad_data;
  474. else if (len - off >= msg_len + 4) {
  475. uint32_t index = net_read32(buf + off + 5);
  476. peer_on_have(p, index);
  477. } else
  478. got_part = 1;
  479. break;
  480. case MSG_BITFIELD:
  481. btpd_log(BTPD_L_MSG, "bitfield.\n");
  482. if (msg_len != (size_t)ceil(p->tp->meta.npieces / 8.0) + 1)
  483. goto bad_data;
  484. else if (p->npieces != 0)
  485. goto bad_data;
  486. else if (len - off >= msg_len + 4)
  487. peer_on_bitfield(p, buf + off + 5);
  488. else {
  489. struct bitfield_reader *rp;
  490. size_t mem = sizeof(*rp) + msg_len - 1;
  491. p->reader->kill(p->reader);
  492. rp = btpd_calloc(1, mem);
  493. rp->rd.kill = kill_bitfield;
  494. rp->rd.read = read_bitfield;
  495. rp->iob.buf = (char *)rp + sizeof(*rp);
  496. rp->iob.buf_len = msg_len - 1;
  497. rp->iob.buf_off = len - off - 5;
  498. bcopy(buf + off + 5, rp->iob.buf, rp->iob.buf_off);
  499. p->reader = (struct input_reader *)rp;
  500. event_add(&p->in_ev, NULL);
  501. return nread;
  502. }
  503. break;
  504. case MSG_REQUEST:
  505. btpd_log(BTPD_L_MSG, "request.\n");
  506. if (msg_len != 13)
  507. goto bad_data;
  508. else if (len - off >= msg_len + 4) {
  509. if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) != PF_P_WANT)
  510. break;
  511. uint32_t index, begin, length;
  512. index = net_read32(buf + off + 5);
  513. begin = net_read32(buf + off + 9);
  514. length = net_read32(buf + off + 13);
  515. if (length > (1 << 15))
  516. goto bad_data;
  517. if (index >= p->tp->meta.npieces)
  518. goto bad_data;
  519. if (!has_bit(p->tp->piece_field, index))
  520. goto bad_data;
  521. if (begin + length > torrent_piece_size(p->tp, index))
  522. goto bad_data;
  523. peer_on_request(p, index, begin, length);
  524. } else
  525. got_part = 1;
  526. break;
  527. case MSG_PIECE:
  528. btpd_log(BTPD_L_MSG, "piece.\n");
  529. if (msg_len < 10)
  530. goto bad_data;
  531. else if (len - off >= 13) {
  532. uint32_t index = net_read32(buf + off + 5);
  533. uint32_t begin = net_read32(buf + off + 9);
  534. uint32_t length = msg_len - 9;
  535. if (len - off >= msg_len + 4) {
  536. p->tp->downloaded += length;
  537. p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
  538. peer_on_piece(p, index, begin, length, buf + off + 13);
  539. } else {
  540. struct piece_reader *rp;
  541. size_t mem = sizeof(*rp) + length;
  542. p->reader->kill(p->reader);
  543. rp = btpd_calloc(1, mem);
  544. rp->rd.kill = kill_piece;
  545. rp->rd.read = read_piece;
  546. rp->index = index;
  547. rp->begin = begin;
  548. rp->iob.buf = (char *)rp + sizeof(*rp);
  549. rp->iob.buf_len = length;
  550. rp->iob.buf_off = len - off - 13;
  551. bcopy(buf + off + 13, rp->iob.buf, rp->iob.buf_off);
  552. p->reader = (struct input_reader *)rp;
  553. event_add(&p->in_ev, NULL);
  554. p->tp->downloaded += rp->iob.buf_off;
  555. p->rate_to_me[btpd.seconds % RATEHISTORY] +=
  556. rp->iob.buf_off;
  557. return nread;
  558. }
  559. } else
  560. got_part = 1;
  561. break;
  562. case MSG_CANCEL:
  563. if (msg_len != 13)
  564. goto bad_data;
  565. else if (len - off >= msg_len + 4) {
  566. uint32_t index = net_read32(buf + off + 5);
  567. uint32_t begin = net_read32(buf + off + 9);
  568. uint32_t length = net_read32(buf + off + 13);
  569. if (index > p->tp->meta.npieces)
  570. goto bad_data;
  571. if (begin + length > torrent_piece_size(p->tp, index))
  572. goto bad_data;
  573. btpd_log(BTPD_L_MSG, "cancel: %u, %u, %u\n",
  574. index, begin, length);
  575. peer_on_cancel(p, index, begin, length);
  576. } else
  577. got_part = 1;
  578. break;
  579. default:
  580. goto bad_data;
  581. }
  582. if (!got_part)
  583. off += 4 + msg_len;
  584. }
  585. if (off != len) {
  586. gr->iob.buf_off = len - off;
  587. assert(gr->iob.buf_off <= gr->iob.buf_len);
  588. bcopy(buf + off, gr->iob.buf, gr->iob.buf_off);
  589. }
  590. event_add(&p->in_ev, NULL);
  591. return nread;
  592. bad_data:
  593. btpd_log(BTPD_L_MSG, "bad data\n");
  594. peer_kill(p);
  595. return nread;
  596. }
  597. static void
  598. net_generic_reader(struct peer *p)
  599. {
  600. struct generic_reader *gr;
  601. gr = btpd_calloc(1, sizeof(*gr));
  602. gr->rd.read = net_generic_read;
  603. gr->rd.kill = kill_generic;
  604. gr->iob.buf = gr->_io_buf;
  605. gr->iob.buf_len = MAX_INPUT_LEFT;
  606. gr->iob.buf_off = 0;
  607. p->reader = (struct input_reader *)gr;
  608. event_add(&p->in_ev, NULL);
  609. }
  610. static unsigned long
  611. net_shake_read(struct peer *p, unsigned long rmax)
  612. {
  613. struct handshake *hs = (struct handshake *)p->reader;
  614. struct io_buffer *in = &hs->in;
  615. size_t nread = net_read_to_buf(p, in, rmax);
  616. if (nread == 0)
  617. return 0;
  618. switch (hs->state) {
  619. case SHAKE_INIT:
  620. if (in->buf_off < 20)
  621. break;
  622. else if (bcmp(in->buf, "\x13""BitTorrent protocol", 20) == 0)
  623. hs->state = SHAKE_PSTR;
  624. else
  625. goto bad_shake;
  626. case SHAKE_PSTR:
  627. if (in->buf_off < 28)
  628. break;
  629. else
  630. hs->state = SHAKE_RESERVED;
  631. case SHAKE_RESERVED:
  632. if (in->buf_off < 48)
  633. break;
  634. else if (hs->incoming) {
  635. struct torrent *tp = torrent_get_by_hash(in->buf + 28);
  636. if (tp != NULL) {
  637. hs->state = SHAKE_INFO;
  638. p->tp = tp;
  639. net_send_shake(p);
  640. } else
  641. goto bad_shake;
  642. } else {
  643. if (bcmp(in->buf + 28, p->tp->meta.info_hash, 20) == 0)
  644. hs->state = SHAKE_INFO;
  645. else
  646. goto bad_shake;
  647. }
  648. case SHAKE_INFO:
  649. if (in->buf_off < 68)
  650. break;
  651. else {
  652. if (torrent_has_peer(p->tp, in->buf + 48))
  653. goto bad_shake; // Not really, but we're already connected.
  654. else if (bcmp(in->buf + 48, btpd.peer_id, 20) == 0)
  655. goto bad_shake; // Connection from myself.
  656. bcopy(in->buf + 48, p->id, 20);
  657. hs->state = SHAKE_ID;
  658. }
  659. default:
  660. assert(hs->state == SHAKE_ID);
  661. }
  662. if (hs->state == SHAKE_ID) {
  663. btpd_log(BTPD_L_CONN, "Got whole shake.\n");
  664. free(hs);
  665. p->piece_field = btpd_calloc(1, (int)ceil(p->tp->meta.npieces / 8.0));
  666. cm_on_new_peer(p);
  667. net_generic_reader(p);
  668. if (p->tp->have_npieces > 0) {
  669. if (p->tp->have_npieces * 9 < 5 + ceil(p->tp->meta.npieces / 8.0))
  670. net_send_multihave(p);
  671. else
  672. net_send_bitfield(p);
  673. }
  674. } else
  675. event_add(&p->in_ev, NULL);
  676. return nread;
  677. bad_shake:
  678. btpd_log(BTPD_L_CONN, "Bad shake(%d)\n", hs->state);
  679. peer_kill(p);
  680. return nread;
  681. }
  682. void
  683. net_handshake(struct peer *p, int incoming)
  684. {
  685. struct handshake *hs;
  686. hs = calloc(1, sizeof(*hs));
  687. hs->incoming = incoming;
  688. hs->state = SHAKE_INIT;
  689. hs->in.buf_len = SHAKE_LEN;
  690. hs->in.buf_off = 0;
  691. hs->in.buf = hs->_io_buf;
  692. p->reader = (struct input_reader *)hs;
  693. hs->rd.read = net_shake_read;
  694. hs->rd.kill = kill_shake;
  695. if (!incoming)
  696. net_send_shake(p);
  697. }
  698. int
  699. net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
  700. {
  701. if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
  702. return errno;
  703. set_nonblocking(*sd);
  704. if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
  705. btpd_log(BTPD_L_CONN, "Botched connection %s.", strerror(errno));
  706. close(*sd);
  707. return errno;
  708. }
  709. return 0;
  710. }
  711. int
  712. net_connect(const char *ip, int port, int *sd)
  713. {
  714. struct addrinfo hints, *res;
  715. char portstr[6];
  716. assert(btpd.npeers < btpd.maxpeers);
  717. if (snprintf(portstr, sizeof(portstr), "%d", port) >= sizeof(portstr))
  718. return EINVAL;
  719. bzero(&hints, sizeof(hints));
  720. hints.ai_family = AF_UNSPEC;
  721. hints.ai_flags = AI_NUMERICHOST;
  722. hints.ai_socktype = SOCK_STREAM;
  723. if (getaddrinfo(ip, portstr, &hints, &res) != 0)
  724. return errno;
  725. int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);
  726. freeaddrinfo(res);
  727. return error;
  728. }
  729. void
  730. net_connection_cb(int sd, short type, void *arg)
  731. {
  732. int nsd;
  733. nsd = accept(sd, NULL, NULL);
  734. if (nsd < 0) {
  735. if (errno == EWOULDBLOCK || errno == ECONNABORTED)
  736. return;
  737. else
  738. btpd_err("accept4: %s\n", strerror(errno));
  739. }
  740. if (set_nonblocking(nsd) != 0) {
  741. close(nsd);
  742. return;
  743. }
  744. assert(btpd.npeers <= btpd.maxpeers);
  745. if (btpd.npeers == btpd.maxpeers) {
  746. close(nsd);
  747. return;
  748. }
  749. peer_create_in(nsd);
  750. btpd_log(BTPD_L_CONN, "got connection.\n");
  751. }
  752. void
  753. net_by_second(void)
  754. {
  755. struct peer *p;
  756. struct torrent *tp;
  757. int ri = btpd.seconds % RATEHISTORY;
  758. BTPDQ_FOREACH(tp, &btpd.cm_list, entry) {
  759. BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
  760. p->rate_to_me[ri] = 0;
  761. p->rate_from_me[ri] = 0;
  762. }
  763. }
  764. }
  765. void
  766. net_bw_rate(void)
  767. {
  768. unsigned sum = 0;
  769. for (int i = 0; i < BWCALLHISTORY - 1; i++) {
  770. btpd.bwrate[i] = btpd.bwrate[i + 1];
  771. sum += btpd.bwrate[i];
  772. }
  773. btpd.bwrate[BWCALLHISTORY - 1] = btpd.bwcalls;
  774. sum += btpd.bwrate[BWCALLHISTORY - 1];
  775. btpd.bwcalls = 0;
  776. btpd.bw_hz_avg = sum / 5.0;
  777. }
  778. void
  779. net_bw_cb(int sd, short type, void *arg)
  780. {
  781. struct peer *p;
  782. btpd.bwcalls++;
  783. double avg_hz;
  784. if (btpd.seconds < BWCALLHISTORY)
  785. avg_hz = btpd.bw_hz;
  786. else
  787. avg_hz = btpd.bw_hz_avg;
  788. btpd.obw_left = btpd.obwlim / avg_hz;
  789. btpd.ibw_left = btpd.ibwlim / avg_hz;
  790. if (btpd.ibwlim > 0) {
  791. while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left > 0) {
  792. BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
  793. p->flags &= ~PF_ON_READQ;
  794. btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
  795. }
  796. } else {
  797. while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL) {
  798. BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
  799. p->flags &= ~PF_ON_READQ;
  800. p->reader->read(p, 0);
  801. }
  802. }
  803. if (btpd.obwlim) {
  804. while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL && btpd.obw_left > 0) {
  805. BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
  806. p->flags &= ~PF_ON_WRITEQ;
  807. btpd.obw_left -= net_write(p, btpd.obw_left);
  808. }
  809. } else {
  810. while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL) {
  811. BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
  812. p->flags &= ~PF_ON_WRITEQ;
  813. net_write(p, 0);
  814. }
  815. }
  816. event_add(&btpd.bwlim, (& (struct timeval) { 0, 1000000 / btpd.bw_hz }));
  817. }