A clone of btpd with my configuration changes.


#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/mman.h>
#include <sys/wait.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "btpd.h"

#define min(x, y) ((x) <= (y) ? (x) : (y))

static unsigned long
net_write(struct peer *p, unsigned long wmax);
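
/*
 * Bandwidth limiting: every chunk of limited bandwidth that gets used
 * is tracked by a struct bwlim on btpd.bwq with a one second timer.
 * When the timer fires, the counted bytes are returned to
 * btpd.ibw_left/obw_left and peers parked on btpd.readq/writeq are
 * given another chance to transfer.
 */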
void
net_bw_read_cb(int sd, short type, void *arg)
{
    struct peer *p;
    struct bwlim *bw = arg;

    btpd.ibw_left += bw->count;
    assert(btpd.ibw_left <= btpd.ibwlim);

    unsigned long count = 0;
    while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left - count > 0) {
        BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
        p->flags &= ~PF_ON_READQ;
        count += p->reader->read(p, btpd.ibw_left - count);
    }
    btpd.ibw_left -= count;

    BTPDQ_REMOVE(&btpd.bwq, bw, entry);
    if (count == 0)
        free(bw);
    else {
        bw->count = count;
        event_add(&bw->timer, (& (struct timeval) { 1, 0 }));
        BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
    }
}

void
net_read_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;

    if (btpd.ibwlim == 0) {
        p->reader->read(p, 0);
    } else if (btpd.ibw_left > 0) {
        unsigned long nread = p->reader->read(p, btpd.ibw_left);
        if (nread > 0) {
            struct bwlim *bw = btpd_calloc(1, sizeof(*bw));
            evtimer_set(&bw->timer, net_bw_read_cb, bw);
            evtimer_add(&bw->timer, (& (struct timeval) { 1, 0 }));
            bw->count = nread;
            btpd.ibw_left -= nread;
            BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
        }
    } else {
        p->flags |= PF_ON_READQ;
        BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
    }
}

void
net_bw_write_cb(int sd, short type, void *arg)
{
    struct peer *p;
    struct bwlim *bw = arg;

    btpd.obw_left += bw->count;
    assert(btpd.obw_left <= btpd.obwlim);

    unsigned long count = 0;
    while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL && btpd.obw_left - count > 0) {
        BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
        p->flags &= ~PF_ON_WRITEQ;
        count += net_write(p, btpd.obw_left - count);
    }
    btpd.obw_left -= count;

    BTPDQ_REMOVE(&btpd.bwq, bw, entry);
    if (count == 0)
        free(bw);
    else {
        bw->count = count;
        event_add(&bw->timer, (& (struct timeval) { 1, 0 }));
        BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
    }
}

void
net_write_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;

    if (btpd.obwlim == 0) {
        net_write(p, 0);
    } else if (btpd.obw_left > 0) {
        unsigned long nw = net_write(p, btpd.obw_left);
        if (nw > 0) {
            struct bwlim *bw = btpd_calloc(1, sizeof(*bw));
            evtimer_set(&bw->timer, net_bw_write_cb, bw);
            evtimer_add(&bw->timer, (& (struct timeval) { 1, 0 }));
            bw->count = nw;
            btpd.obw_left -= nw;
            BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
        }
    } else {
        p->flags |= PF_ON_WRITEQ;
        BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
    }
}
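
/*
 * Outgoing data is kept as a chain of iob_link buffers on each peer's
 * outq. malloc_liob allocates the link and its buffer as one block
 * (nothing extra to free), while salloc_liob wraps a caller supplied
 * buffer and lets the caller decide how it is released via kill_buf.
 */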
static void
nokill_iob(struct io_buffer *iob)
{
    //Nothing
}

static void
kill_free_buf(struct io_buffer *iob)
{
    free(iob->buf);
}

static struct iob_link *
malloc_liob(size_t len)
{
    struct iob_link *iol;
    iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol) + len);
    iol->iob.buf = (char *)(iol + 1);
    iol->iob.buf_len = len;
    iol->iob.buf_off = 0;
    iol->kill_buf = nokill_iob;
    return iol;
}

static struct iob_link *
salloc_liob(char *buf, size_t len, void (*kill_buf)(struct io_buffer *))
{
    struct iob_link *iol;
    iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol));
    iol->iob.buf = buf;
    iol->iob.buf_len = len;
    iol->iob.buf_off = 0;
    iol->kill_buf = kill_buf;
    return iol;
}

void
net_unsend_piece(struct peer *p, struct piece_req *req)
{
    struct iob_link *piece;

    BTPDQ_REMOVE(&p->p_reqs, req, entry);

    piece = BTPDQ_NEXT(req->head, entry);
    BTPDQ_REMOVE(&p->outq, piece, entry);
    piece->kill_buf(&piece->iob);
    free(piece);

    BTPDQ_REMOVE(&p->outq, req->head, entry);
    req->head->kill_buf(&req->head->iob);
    free(req->head);
    free(req);

    if (BTPDQ_EMPTY(&p->outq)) {
        if (p->flags & PF_ON_WRITEQ) {
            BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
        } else
            event_del(&p->out_ev);
    }
}

void
kill_shake(struct input_reader *reader)
{
    free(reader);
}
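
/*
 * net_write gathers up to NIOV queued buffers into an iovec and sends
 * them with writev(), honoring the wmax byte budget when bandwidth is
 * limited (wmax > 0). Fully sent buffers are unlinked and freed, bytes
 * from buffers marked 'upload' are added to the torrent's upload
 * statistics, and a queued piece request is dropped from p_reqs once
 * its header buffer is reached.
 */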
#define NIOV 16

static unsigned long
net_write(struct peer *p, unsigned long wmax)
{
    struct iob_link *iol;
    struct piece_req *req;
    struct iovec iov[NIOV];
    int niov;
    int limited;
    ssize_t nwritten;
    unsigned long bcount;

    limited = wmax > 0;

    niov = 0;
    assert((iol = BTPDQ_FIRST(&p->outq)) != NULL);
    while (niov < NIOV && iol != NULL
           && (!limited || (limited && wmax > 0))) {
        iov[niov].iov_base = iol->iob.buf + iol->iob.buf_off;
        iov[niov].iov_len = iol->iob.buf_len - iol->iob.buf_off;
        if (limited) {
            if (iov[niov].iov_len > wmax)
                iov[niov].iov_len = wmax;
            wmax -= iov[niov].iov_len;
        }
        niov++;
        iol = BTPDQ_NEXT(iol, entry);
    }

    nwritten = writev(p->sd, iov, niov);
    if (nwritten < 0) {
        if (errno == EAGAIN) {
            event_add(&p->out_ev, NULL);
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    }

    bcount = nwritten;

    req = BTPDQ_FIRST(&p->p_reqs);
    iol = BTPDQ_FIRST(&p->outq);
    while (bcount > 0) {
        unsigned long bufdelta = iol->iob.buf_len - iol->iob.buf_off;
        if (req != NULL && req->head == iol) {
            struct piece_req *next = BTPDQ_NEXT(req, entry);
            BTPDQ_REMOVE(&p->p_reqs, req, entry);
            free(req);
            req = next;
        }
        if (bcount >= bufdelta) {
            if (iol->upload) {
                p->tp->uploaded += bufdelta;
                p->rate_from_me[btpd.seconds % RATEHISTORY] += bufdelta;
            }
            bcount -= bufdelta;
            BTPDQ_REMOVE(&p->outq, iol, entry);
            iol->kill_buf(&iol->iob);
            free(iol);
            iol = BTPDQ_FIRST(&p->outq);
        } else {
            if (iol->upload) {
                p->tp->uploaded += bcount;
                p->rate_from_me[btpd.seconds % RATEHISTORY] += bcount;
            }
            iol->iob.buf_off += bcount;
            bcount = 0;
        }
    }

    if (!BTPDQ_EMPTY(&p->outq))
        event_add(&p->out_ev, NULL);
    else if (p->flags & PF_WRITE_CLOSE) {
        btpd_log(BTPD_L_CONN, "Closed because of write flag.\n");
        peer_kill(p);
    }

    return nwritten;
}
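
/*
 * The senders below build peer wire messages: a four byte big-endian
 * length prefix followed by a one byte message id and its payload.
 * net_send appends a buffer to the peer's outq and arms the write
 * event if the queue was empty.
 */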
void
net_send(struct peer *p, struct iob_link *iol)
{
    if (BTPDQ_EMPTY(&p->outq))
        event_add(&p->out_ev, NULL);
    BTPDQ_INSERT_TAIL(&p->outq, iol, entry);
}

void
net_write32(void *buf, uint32_t num)
{
    *(uint32_t *)buf = htonl(num);
}

uint32_t
net_read32(void *buf)
{
    return ntohl(*(uint32_t *)buf);
}

void
net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
               char *block, size_t blen)
{
    struct iob_link *head, *piece;
    struct piece_req *req;

    btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n",
             index, begin, (unsigned)blen);

    head = malloc_liob(13);
    net_write32(head->iob.buf, 9 + blen);
    head->iob.buf[4] = MSG_PIECE;
    net_write32(head->iob.buf + 5, index);
    net_write32(head->iob.buf + 9, begin);
    net_send(p, head);

    piece = salloc_liob(block, blen, kill_free_buf);
    piece->upload = 1;
    net_send(p, piece);

    req = btpd_malloc(sizeof(*req));
    req->index = index;
    req->begin = begin;
    req->length = blen;
    req->head = head;
    BTPDQ_INSERT_TAIL(&p->p_reqs, req, entry);
}

void
net_send_request(struct peer *p, struct piece_req *req)
{
    struct iob_link *out;
    out = malloc_liob(17);
    net_write32(out->iob.buf, 13);
    out->iob.buf[4] = MSG_REQUEST;
    net_write32(out->iob.buf + 5, req->index);
    net_write32(out->iob.buf + 9, req->begin);
    net_write32(out->iob.buf + 13, req->length);
    net_send(p, out);
}

void
net_send_cancel(struct peer *p, struct piece_req *req)
{
    struct iob_link *out;
    out = malloc_liob(17);
    net_write32(out->iob.buf, 13);
    out->iob.buf[4] = MSG_CANCEL;
    net_write32(out->iob.buf + 5, req->index);
    net_write32(out->iob.buf + 9, req->begin);
    net_write32(out->iob.buf + 13, req->length);
    net_send(p, out);
}

void
net_send_have(struct peer *p, uint32_t index)
{
    struct iob_link *out;
    out = malloc_liob(9);
    net_write32(out->iob.buf, 5);
    out->iob.buf[4] = MSG_HAVE;
    net_write32(out->iob.buf + 5, index);
    net_send(p, out);
}

void
net_send_onesized(struct peer *p, char type)
{
    struct iob_link *out;
    out = malloc_liob(5);
    net_write32(out->iob.buf, 1);
    out->iob.buf[4] = type;
    net_send(p, out);
}

void
net_send_unchoke(struct peer *p)
{
    net_send_onesized(p, MSG_UNCHOKE);
}

void
net_send_choke(struct peer *p)
{
    net_send_onesized(p, MSG_CHOKE);
}

void
net_send_uninterest(struct peer *p)
{
    net_send_onesized(p, MSG_UNINTEREST);
}

void
net_send_interest(struct peer *p)
{
    net_send_onesized(p, MSG_INTEREST);
}

void
net_send_bitfield(struct peer *p)
{
    struct iob_link *out;
    uint32_t plen;

    plen = (uint32_t)ceil(p->tp->meta.npieces / 8.0);

    out = malloc_liob(5);
    net_write32(out->iob.buf, plen + 1);
    out->iob.buf[4] = MSG_BITFIELD;
    net_send(p, out);

    out = salloc_liob(p->tp->piece_field, plen, nokill_iob);
    net_send(p, out);
}
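
/*
 * The handshake is 68 bytes: the length-prefixed "BitTorrent protocol"
 * string, 8 reserved bytes, the 20 byte info hash and the 20 byte peer
 * id. Our bitfield is sent right after it if we already have pieces.
 */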
void
net_send_shake(struct peer *p)
{
    struct iob_link *out;
    out = malloc_liob(68);
    bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->iob.buf, 28);
    bcopy(p->tp->meta.info_hash, out->iob.buf + 28, 20);
    bcopy(btpd.peer_id, out->iob.buf + 48, 20);
    net_send(p, out);

    if (p->tp->have_npieces > 0)
        net_send_bitfield(p);
}

static void
kill_generic(struct input_reader *reader)
{
    free(reader);
}

static size_t
net_read(struct peer *p, char *buf, size_t len)
{
    ssize_t nread = read(p->sd, buf, len);
    if (nread < 0) {
        if (errno == EAGAIN) {
            event_add(&p->in_ev, NULL);
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "read error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    } else if (nread == 0) {
        btpd_log(BTPD_L_CONN, "conn closed by other side.\n");
        if (!BTPDQ_EMPTY(&p->outq))
            p->flags |= PF_WRITE_CLOSE;
        else
            peer_kill(p);
        return 0;
    } else
        return nread;
}

static size_t
net_read_to_buf(struct peer *p, struct io_buffer *iob, unsigned long rmax)
{
    if (rmax == 0)
        rmax = iob->buf_len - iob->buf_off;
    else
        rmax = min(rmax, iob->buf_len - iob->buf_off);
    assert(rmax > 0);
    size_t nread = net_read(p, iob->buf + iob->buf_off, rmax);
    if (nread > 0)
        iob->buf_off += nread;
    return nread;
}
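
/*
 * Bitfield and piece messages that arrive only partially in the
 * generic read buffer are handed to a dedicated reader, which
 * accumulates the rest of the body in its own io_buffer across reads
 * before passing the complete message to peer_on_bitfield or
 * peer_on_piece.
 */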
void
kill_bitfield(struct input_reader *rd)
{
    free(rd);
}

static void net_generic_reader(struct peer *p);

static unsigned long
read_bitfield(struct peer *p, unsigned long rmax)
{
    struct bitfield_reader *rd = (struct bitfield_reader *)p->reader;
    size_t nread = net_read_to_buf(p, &rd->iob, rmax);
    if (nread == 0)
        return 0;

    if (rd->iob.buf_off == rd->iob.buf_len) {
        peer_on_bitfield(p, rd->iob.buf);
        free(rd);
        net_generic_reader(p);
    } else
        event_add(&p->in_ev, NULL);

    return nread;
}

void
kill_piece(struct input_reader *rd)
{
    free(rd);
}

static unsigned long
read_piece(struct peer *p, unsigned long rmax)
{
    struct piece_reader *rd = (struct piece_reader *)p->reader;
    size_t nread = net_read_to_buf(p, &rd->iob, rmax);
    if (nread == 0)
        return 0;

    p->rate_to_me[btpd.seconds % RATEHISTORY] += nread;
    p->tp->downloaded += nread;
    if (rd->iob.buf_off == rd->iob.buf_len) {
        peer_on_piece(p, rd->index, rd->begin, rd->iob.buf_len, rd->iob.buf);
        free(rd);
        net_generic_reader(p);
    } else
        event_add(&p->in_ev, NULL);

    return nread;
}
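
/*
 * net_generic_read is the normal input path: it reads into a stack
 * buffer, walks the complete messages in it (length prefix, id,
 * payload) and dispatches them to the peer_on_* handlers. A trailing
 * partial message is either copied into the reader's small carry
 * buffer or, for bitfield and piece bodies, handed to one of the
 * dedicated readers above.
 */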
#define GRBUFLEN (1 << 15)

static unsigned long
net_generic_read(struct peer *p, unsigned long rmax)
{
    char buf[GRBUFLEN];
    struct io_buffer iob = { 0, GRBUFLEN, buf };
    struct generic_reader *gr = (struct generic_reader *)p->reader;
    size_t nread;
    size_t off, len;
    int got_part;

    if (gr->iob.buf_off > 0) {
        iob.buf_off = gr->iob.buf_off;
        bcopy(gr->iob.buf, iob.buf, iob.buf_off);
        gr->iob.buf_off = 0;
    }

    if ((nread = net_read_to_buf(p, &iob, rmax)) == 0)
        return 0;

    len = iob.buf_off;
    off = 0;
    got_part = 0;
    while (!got_part && len - off >= 4) {
        size_t msg_len = net_read32(buf + off);
        if (msg_len == 0) { /* Keep alive */
            off += 4;
            continue;
        }
        if (len - off < 5) {
            got_part = 1;
            break;
        }
        switch (buf[off + 4]) {
        case MSG_CHOKE:
            btpd_log(BTPD_L_MSG, "choke.\n");
            if (msg_len != 1)
                goto bad_data;
            peer_on_choke(p);
            break;
        case MSG_UNCHOKE:
            btpd_log(BTPD_L_MSG, "unchoke.\n");
            if (msg_len != 1)
                goto bad_data;
            peer_on_unchoke(p);
            break;
        case MSG_INTEREST:
            btpd_log(BTPD_L_MSG, "interested.\n");
            if (msg_len != 1)
                goto bad_data;
            peer_on_interest(p);
            break;
        case MSG_UNINTEREST:
            btpd_log(BTPD_L_MSG, "uninterested.\n");
            if (msg_len != 1)
                goto bad_data;
            peer_on_uninterest(p);
            break;
        case MSG_HAVE:
            btpd_log(BTPD_L_MSG, "have.\n");
            if (msg_len != 5)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                uint32_t index = net_read32(buf + off + 5);
                peer_on_have(p, index);
            } else
                got_part = 1;
            break;
        case MSG_BITFIELD:
            btpd_log(BTPD_L_MSG, "bitfield.\n");
            if (msg_len != (size_t)ceil(p->tp->meta.npieces / 8.0) + 1)
                goto bad_data;
            else if (p->npieces != 0)
                goto bad_data;
            else if (len - off >= msg_len + 4)
                peer_on_bitfield(p, buf + off + 5);
            else {
                struct bitfield_reader *rp;
                size_t mem = sizeof(*rp) + msg_len - 1;
                p->reader->kill(p->reader);
                rp = btpd_calloc(1, mem);
                rp->rd.kill = kill_bitfield;
                rp->rd.read = read_bitfield;
                rp->iob.buf = (char *)rp + sizeof(*rp);
                rp->iob.buf_len = msg_len - 1;
                rp->iob.buf_off = len - off - 5;
                bcopy(buf + off + 5, rp->iob.buf, rp->iob.buf_off);
                p->reader = (struct input_reader *)rp;
                event_add(&p->in_ev, NULL);
                return nread;
            }
            break;
        case MSG_REQUEST:
            btpd_log(BTPD_L_MSG, "request.\n");
            if (msg_len != 13)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) != PF_P_WANT)
                    break;
                uint32_t index, begin, length;
                index = net_read32(buf + off + 5);
                begin = net_read32(buf + off + 9);
                length = net_read32(buf + off + 13);
                if (length > (1 << 15))
                    goto bad_data;
                if (index >= p->tp->meta.npieces)
                    goto bad_data;
                if (!has_bit(p->tp->piece_field, index))
                    goto bad_data;
                if (begin + length > torrent_piece_size(p->tp, index))
                    goto bad_data;
                peer_on_request(p, index, begin, length);
            } else
                got_part = 1;
            break;
        case MSG_PIECE:
            btpd_log(BTPD_L_MSG, "piece.\n");
            if (msg_len < 10)
                goto bad_data;
            else if (len - off >= 13) {
                uint32_t index = net_read32(buf + off + 5);
                uint32_t begin = net_read32(buf + off + 9);
                uint32_t length = msg_len - 9;
                if (len - off >= msg_len + 4) {
                    p->tp->downloaded += length;
                    p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
                    peer_on_piece(p, index, begin, length, buf + off + 13);
                } else {
                    struct piece_reader *rp;
                    size_t mem = sizeof(*rp) + length;
                    p->reader->kill(p->reader);
                    rp = btpd_calloc(1, mem);
                    rp->rd.kill = kill_piece;
                    rp->rd.read = read_piece;
                    rp->index = index;
                    rp->begin = begin;
                    rp->iob.buf = (char *)rp + sizeof(*rp);
                    rp->iob.buf_len = length;
                    rp->iob.buf_off = len - off - 13;
                    bcopy(buf + off + 13, rp->iob.buf, rp->iob.buf_off);
                    p->reader = (struct input_reader *)rp;
                    event_add(&p->in_ev, NULL);
                    p->tp->downloaded += rp->iob.buf_off;
                    p->rate_to_me[btpd.seconds % RATEHISTORY] +=
                        rp->iob.buf_off;
                    return nread;
                }
            } else
                got_part = 1;
            break;
        case MSG_CANCEL:
            if (msg_len != 13)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                uint32_t index = net_read32(buf + off + 5);
                uint32_t begin = net_read32(buf + off + 9);
                uint32_t length = net_read32(buf + off + 13);
                if (index >= p->tp->meta.npieces)
                    goto bad_data;
                if (begin + length > torrent_piece_size(p->tp, index))
                    goto bad_data;
                btpd_log(BTPD_L_MSG, "cancel: %u, %u, %u\n",
                         index, begin, length);
                peer_on_cancel(p, index, begin, length);
            } else
                got_part = 1;
            break;
        default:
            goto bad_data;
        }
        if (!got_part)
            off += 4 + msg_len;
    }

    if (off != len) {
        gr->iob.buf_off = len - off;
        assert(gr->iob.buf_off <= gr->iob.buf_len);
        bcopy(buf + off, gr->iob.buf, gr->iob.buf_off);
    }

    event_add(&p->in_ev, NULL);
    return nread;

bad_data:
    btpd_log(BTPD_L_MSG, "bad data\n");
    peer_kill(p);
    return nread;
}

static void
net_generic_reader(struct peer *p)
{
    struct generic_reader *gr;
    gr = btpd_calloc(1, sizeof(*gr));
    gr->rd.read = net_generic_read;
    gr->rd.kill = kill_generic;
    gr->iob.buf = gr->_io_buf;
    gr->iob.buf_len = MAX_INPUT_LEFT;
    gr->iob.buf_off = 0;
    p->reader = (struct input_reader *)gr;
    event_add(&p->in_ev, NULL);
}
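
/*
 * net_shake_read runs the handshake state machine. The switch cases
 * fall through on purpose: each state only checks that enough of the
 * 68 byte handshake has arrived for its part (protocol string,
 * reserved bytes, info hash, peer id) and then advances, so a single
 * read may complete several states at once.
 */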
static unsigned long
net_shake_read(struct peer *p, unsigned long rmax)
{
    struct handshake *hs = (struct handshake *)p->reader;
    struct io_buffer *in = &hs->in;
    size_t nread = net_read_to_buf(p, in, rmax);

    if (nread == 0)
        return 0;

    switch (hs->state) {
    case SHAKE_INIT:
        if (in->buf_off < 20)
            break;
        else if (bcmp(in->buf, "\x13""BitTorrent protocol", 20) == 0)
            hs->state = SHAKE_PSTR;
        else
            goto bad_shake;
    case SHAKE_PSTR:
        if (in->buf_off < 28)
            break;
        else
            hs->state = SHAKE_RESERVED;
    case SHAKE_RESERVED:
        if (in->buf_off < 48)
            break;
        else if (hs->incoming) {
            struct torrent *tp = torrent_get_by_hash(in->buf + 28);
            if (tp != NULL) {
                hs->state = SHAKE_INFO;
                p->tp = tp;
                net_send_shake(p);
            } else
                goto bad_shake;
        } else {
            if (bcmp(in->buf + 28, p->tp->meta.info_hash, 20) == 0)
                hs->state = SHAKE_INFO;
            else
                goto bad_shake;
        }
    case SHAKE_INFO:
        if (in->buf_off < 68)
            break;
        else {
            if (torrent_has_peer(p->tp, in->buf + 48))
                goto bad_shake; // Not really, but we're already connected.
            else if (bcmp(in->buf + 48, btpd.peer_id, 20) == 0)
                goto bad_shake; // Connection from myself.
            bcopy(in->buf + 48, p->id, 20);
            hs->state = SHAKE_ID;
        }
    default:
        assert(hs->state == SHAKE_ID);
    }

    if (hs->state == SHAKE_ID) {
        btpd_log(BTPD_L_CONN, "Got whole shake.\n");
        free(hs);
        p->piece_field = btpd_calloc(1, (int)ceil(p->tp->meta.npieces / 8.0));
        cm_on_new_peer(p);
        net_generic_reader(p);
    } else
        event_add(&p->in_ev, NULL);

    return nread;

bad_shake:
    btpd_log(BTPD_L_CONN, "Bad shake(%d)\n", hs->state);
    peer_kill(p);
    return nread;
}

void
net_handshake(struct peer *p, int incoming)
{
    struct handshake *hs;

    hs = calloc(1, sizeof(*hs));
    hs->incoming = incoming;
    hs->state = SHAKE_INIT;

    hs->in.buf_len = SHAKE_LEN;
    hs->in.buf_off = 0;
    hs->in.buf = hs->_io_buf;

    p->reader = (struct input_reader *)hs;
    hs->rd.read = net_shake_read;
    hs->rd.kill = kill_shake;

    if (!incoming)
        net_send_shake(p);
}
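
/*
 * Connection setup: net_connect resolves a numeric address with
 * getaddrinfo and net_connect2 starts a non-blocking connect, treating
 * EINPROGRESS as success. net_connection_cb accepts incoming
 * connections and hands them to peer_create_in, closing them once
 * btpd.maxpeers is reached.
 */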
int
net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
{
    if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        return errno;

    set_nonblocking(*sd);

    if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
        btpd_log(BTPD_L_CONN, "Botched connection %s.\n", strerror(errno));
        close(*sd);
        return errno;
    }

    return 0;
}

int
net_connect(const char *ip, int port, int *sd)
{
    struct addrinfo hints, *res;
    char portstr[6];

    assert(btpd.npeers < btpd.maxpeers);

    if (snprintf(portstr, sizeof(portstr), "%d", port) >= sizeof(portstr))
        return EINVAL;

    bzero(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_flags = AI_NUMERICHOST;
    hints.ai_socktype = SOCK_STREAM;
    if (getaddrinfo(ip, portstr, &hints, &res) != 0)
        return EINVAL;

    int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);

    freeaddrinfo(res);

    return error;
}

void
net_connection_cb(int sd, short type, void *arg)
{
    int nsd;

    nsd = accept(sd, NULL, NULL);
    if (nsd < 0) {
        if (errno == EWOULDBLOCK || errno == ECONNABORTED)
            return;
        else
            btpd_err("accept: %s\n", strerror(errno));
    }

    if (set_nonblocking(nsd) != 0) {
        close(nsd);
        return;
    }

    assert(btpd.npeers <= btpd.maxpeers);
    if (btpd.npeers == btpd.maxpeers) {
        close(nsd);
        return;
    }

    peer_create_in(nsd);

    btpd_log(BTPD_L_CONN, "got connection.\n");
}

void
net_by_second(void)
{
    struct peer *p;
    struct torrent *tp;
    int ri = btpd.seconds % RATEHISTORY;

    BTPDQ_FOREACH(tp, &btpd.cm_list, entry) {
        BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
            p->rate_to_me[ri] = 0;
            p->rate_from_me[ri] = 0;
        }
    }
}