A clone of btpd with my configuration changes.

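/*
 * net.c -- peer network I/O for btpd.
 *
 * Handles the BitTorrent peer wire protocol: the handshake, message
 * encoding and decoding, rate-limited reading and writing driven by
 * event callbacks, and the per-second transfer-rate bookkeeping.
 */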

#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/mman.h>
#include <sys/wait.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "btpd.h"

#define WRITE_TIMEOUT (& (struct timeval) { 60, 0 })
#define min(x, y) ((x) <= (y) ? (x) : (y))

static unsigned long
net_write(struct peer *p, unsigned long wmax);

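/*
 * Timer callback for the download rate limiter. The bandwidth reserved
 * by this bwlim entry is handed back, peers parked on btpd.readq get to
 * read within the refreshed budget, and the entry is rescheduled for
 * another second if any bandwidth was consumed.
 */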
void
net_bw_read_cb(int sd, short type, void *arg)
{
    struct peer *p;
    struct bwlim *bw = arg;

    btpd.ibw_left += bw->count;
    assert(btpd.ibw_left <= btpd.ibwlim);

    unsigned long count = 0;

    while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left - count > 0) {
        BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
        p->flags &= ~PF_ON_READQ;
        count += p->reader->read(p, btpd.ibw_left - count);
    }
    btpd.ibw_left -= count;

    BTPDQ_REMOVE(&btpd.bwq, bw, entry);
    if (count == 0)
        free(bw);
    else {
        bw->count = count;
        event_add(&bw->timer, (& (struct timeval) { 1, 0 }));
        BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
    }
}

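/*
 * Read-event callback for a peer. With no download limit configured the
 * peer's reader runs unrestricted; otherwise it may use what is left of
 * this second's budget, and a bwlim entry is scheduled to return that
 * bandwidth after one second. Peers that find the budget exhausted are
 * parked on btpd.readq.
 */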
void
net_read_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;

    if (btpd.ibwlim == 0) {
        p->reader->read(p, 0);
    } else if (btpd.ibw_left > 0) {
        unsigned long nread = p->reader->read(p, btpd.ibw_left);
        if (nread > 0) {
            struct bwlim *bw = btpd_calloc(1, sizeof(*bw));
            evtimer_set(&bw->timer, net_bw_read_cb, bw);
            evtimer_add(&bw->timer, (& (struct timeval) { 1, 0 }));
            bw->count = nread;
            btpd.ibw_left -= nread;
            BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
        }
    } else {
        p->flags |= PF_ON_READQ;
        BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
    }
}

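/*
 * Upload-side counterparts of the two callbacks above: net_bw_write_cb
 * replays the write budget to peers parked on btpd.writeq, and
 * net_write_cb drives net_write() for a single peer, killing it if a
 * write attempt times out.
 */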
void
net_bw_write_cb(int sd, short type, void *arg)
{
    struct peer *p;
    struct bwlim *bw = arg;

    btpd.obw_left += bw->count;
    assert(btpd.obw_left <= btpd.obwlim);

    unsigned long count = 0;

    while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL && btpd.obw_left - count > 0) {
        BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
        p->flags &= ~PF_ON_WRITEQ;
        count += net_write(p, btpd.obw_left - count);
    }
    btpd.obw_left -= count;

    BTPDQ_REMOVE(&btpd.bwq, bw, entry);
    if (count == 0)
        free(bw);
    else {
        bw->count = count;
        event_add(&bw->timer, (& (struct timeval) { 1, 0 }));
        BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
    }
}

void
net_write_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;

    if (type == EV_TIMEOUT) {
        btpd_log(BTPD_L_ERROR, "Write attempt timed out.\n");
        peer_kill(p);
        return;
    }

    if (btpd.obwlim == 0) {
        net_write(p, 0);
    } else if (btpd.obw_left > 0) {
        unsigned long nw = net_write(p, btpd.obw_left);
        if (nw > 0) {
            struct bwlim *bw = btpd_calloc(1, sizeof(*bw));
            evtimer_set(&bw->timer, net_bw_write_cb, bw);
            evtimer_add(&bw->timer, (& (struct timeval) { 1, 0 }));
            bw->count = nw;
            btpd.obw_left -= nw;
            BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
        }
    } else {
        p->flags |= PF_ON_WRITEQ;
        BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
    }
}

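/*
 * Output data lives in iob_link nodes on each peer's outq.
 * malloc_liob() allocates the buffer together with the link, while
 * salloc_liob() wraps a caller-supplied buffer and records how to
 * dispose of it via kill_buf.
 */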
static void
nokill_iob(struct io_buffer *iob)
{
    //Nothing
}

static void
kill_free_buf(struct io_buffer *iob)
{
    free(iob->buf);
}

static struct iob_link *
malloc_liob(size_t len)
{
    struct iob_link *iol;
    iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol) + len);
    iol->iob.buf = (char *)(iol + 1);
    iol->iob.buf_len = len;
    iol->iob.buf_off = 0;
    iol->kill_buf = nokill_iob;
    return iol;
}

static struct iob_link *
salloc_liob(char *buf, size_t len, void (*kill_buf)(struct io_buffer *))
{
    struct iob_link *iol;
    iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol));
    iol->iob.buf = buf;
    iol->iob.buf_len = len;
    iol->iob.buf_off = 0;
    iol->kill_buf = kill_buf;
    return iol;
}

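/*
 * Drop a queued piece message that has not been sent yet: both the
 * header link and the data link are unlinked from outq and freed, and
 * the write event is cancelled if the output queue becomes empty.
 */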
void
net_unsend_piece(struct peer *p, struct piece_req *req)
{
    struct iob_link *piece;

    BTPDQ_REMOVE(&p->p_reqs, req, entry);

    piece = BTPDQ_NEXT(req->head, entry);
    BTPDQ_REMOVE(&p->outq, piece, entry);
    piece->kill_buf(&piece->iob);
    free(piece);

    BTPDQ_REMOVE(&p->outq, req->head, entry);
    req->head->kill_buf(&req->head->iob);
    free(req->head);
    free(req);

    if (BTPDQ_EMPTY(&p->outq)) {
        if (p->flags & PF_ON_WRITEQ) {
            BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
        } else
            event_del(&p->out_ev);
    }
}

void
kill_shake(struct input_reader *reader)
{
    free(reader);
}

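/*
 * Gather up to NIOV buffers from the peer's output queue and send them
 * with writev(), honoring the byte budget in wmax (0 means unlimited).
 * Written bytes are accounted as upload for links flagged as such,
 * fully written links are freed, and completed piece requests are
 * retired from p_reqs.
 */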
#define NIOV 16

static unsigned long
net_write(struct peer *p, unsigned long wmax)
{
    struct iob_link *iol;
    struct piece_req *req;
    struct iovec iov[NIOV];
    int niov;
    int limited;
    ssize_t nwritten;
    unsigned long bcount;

    limited = wmax > 0;

    niov = 0;
    assert((iol = BTPDQ_FIRST(&p->outq)) != NULL);
    while (niov < NIOV && iol != NULL
           && (!limited || (limited && wmax > 0))) {
        iov[niov].iov_base = iol->iob.buf + iol->iob.buf_off;
        iov[niov].iov_len = iol->iob.buf_len - iol->iob.buf_off;
        if (limited) {
            if (iov[niov].iov_len > wmax)
                iov[niov].iov_len = wmax;
            wmax -= iov[niov].iov_len;
        }
        niov++;
        iol = BTPDQ_NEXT(iol, entry);
    }

    nwritten = writev(p->sd, iov, niov);
    if (nwritten < 0) {
        if (errno == EAGAIN) {
            event_add(&p->out_ev, WRITE_TIMEOUT);
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    } else if (nwritten == 0) {
        btpd_log(BTPD_L_CONN, "connection closed by peer.\n");
        peer_kill(p);
        return 0;
    }

    bcount = nwritten;
    req = BTPDQ_FIRST(&p->p_reqs);
    iol = BTPDQ_FIRST(&p->outq);
    while (bcount > 0) {
        unsigned long bufdelta = iol->iob.buf_len - iol->iob.buf_off;
        if (req != NULL && req->head == iol) {
            struct piece_req *next = BTPDQ_NEXT(req, entry);
            BTPDQ_REMOVE(&p->p_reqs, req, entry);
            free(req);
            req = next;
        }
        if (bcount >= bufdelta) {
            if (iol->upload) {
                p->tp->uploaded += bufdelta;
                p->rate_from_me[btpd.seconds % RATEHISTORY] += bufdelta;
            }
            bcount -= bufdelta;
            BTPDQ_REMOVE(&p->outq, iol, entry);
            iol->kill_buf(&iol->iob);
            free(iol);
            iol = BTPDQ_FIRST(&p->outq);
        } else {
            if (iol->upload) {
                p->tp->uploaded += bcount;
                p->rate_from_me[btpd.seconds % RATEHISTORY] += bcount;
            }
            iol->iob.buf_off += bcount;
            bcount = 0;
        }
    }

    if (!BTPDQ_EMPTY(&p->outq))
        event_add(&p->out_ev, WRITE_TIMEOUT);
    else if (p->flags & PF_WRITE_CLOSE) {
        btpd_log(BTPD_L_CONN, "Closed because of write flag.\n");
        peer_kill(p);
    }

    return nwritten;
}

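/*
 * net_send() appends a buffer link to the peer's output queue and arms
 * the write event when the queue was previously empty.
 * net_write32()/net_read32() store and load 32-bit values in network
 * byte order.
 */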
void
net_send(struct peer *p, struct iob_link *iol)
{
    if (BTPDQ_EMPTY(&p->outq))
        event_add(&p->out_ev, WRITE_TIMEOUT);
    BTPDQ_INSERT_TAIL(&p->outq, iol, entry);
}

void
net_write32(void *buf, uint32_t num)
{
    *(uint32_t *)buf = htonl(num);
}

uint32_t
net_read32(void *buf)
{
    return ntohl(*(uint32_t *)buf);
}

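/*
 * Constructors for outgoing wire messages. Each builds the
 * <length><type><payload> framing with net_write32() and queues it via
 * net_send(); net_send_piece() additionally records the queued block in
 * p_reqs so it can be withdrawn with net_unsend_piece().
 */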
void
net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
               char *block, size_t blen)
{
    struct iob_link *head, *piece;
    struct piece_req *req;

    btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n",
             index, begin, (unsigned)blen);

    head = malloc_liob(13);
    net_write32(head->iob.buf, 9 + blen);
    head->iob.buf[4] = MSG_PIECE;
    net_write32(head->iob.buf + 5, index);
    net_write32(head->iob.buf + 9, begin);
    net_send(p, head);

    piece = salloc_liob(block, blen, kill_free_buf);
    piece->upload = 1;
    net_send(p, piece);

    req = btpd_malloc(sizeof(*req));
    req->index = index;
    req->begin = begin;
    req->length = blen;
    req->head = head;
    BTPDQ_INSERT_TAIL(&p->p_reqs, req, entry);
}

void
net_send_request(struct peer *p, struct piece_req *req)
{
    struct iob_link *out;
    out = malloc_liob(17);
    net_write32(out->iob.buf, 13);
    out->iob.buf[4] = MSG_REQUEST;
    net_write32(out->iob.buf + 5, req->index);
    net_write32(out->iob.buf + 9, req->begin);
    net_write32(out->iob.buf + 13, req->length);
    net_send(p, out);
}

void
net_send_cancel(struct peer *p, struct piece_req *req)
{
    struct iob_link *out;
    out = malloc_liob(17);
    net_write32(out->iob.buf, 13);
    out->iob.buf[4] = MSG_CANCEL;
    net_write32(out->iob.buf + 5, req->index);
    net_write32(out->iob.buf + 9, req->begin);
    net_write32(out->iob.buf + 13, req->length);
    net_send(p, out);
}

void
net_send_have(struct peer *p, uint32_t index)
{
    struct iob_link *out;
    out = malloc_liob(9);
    net_write32(out->iob.buf, 5);
    out->iob.buf[4] = MSG_HAVE;
    net_write32(out->iob.buf + 5, index);
    net_send(p, out);
}

void
net_send_onesized(struct peer *p, char type)
{
    struct iob_link *out;
    out = malloc_liob(5);
    net_write32(out->iob.buf, 1);
    out->iob.buf[4] = type;
    net_send(p, out);
}

void
net_send_unchoke(struct peer *p)
{
    net_send_onesized(p, MSG_UNCHOKE);
}

void
net_send_choke(struct peer *p)
{
    net_send_onesized(p, MSG_CHOKE);
}

void
net_send_uninterest(struct peer *p)
{
    net_send_onesized(p, MSG_UNINTEREST);
}

void
net_send_interest(struct peer *p)
{
    net_send_onesized(p, MSG_INTEREST);
}

void
net_send_bitfield(struct peer *p)
{
    struct iob_link *out;
    uint32_t plen;

    plen = (uint32_t)ceil(p->tp->meta.npieces / 8.0);

    out = malloc_liob(5);
    net_write32(out->iob.buf, plen + 1);
    out->iob.buf[4] = MSG_BITFIELD;
    net_send(p, out);

    out = salloc_liob(p->tp->piece_field, plen, nokill_iob);
    net_send(p, out);
}

void
net_send_shake(struct peer *p)
{
    struct iob_link *out;
    out = malloc_liob(68);
    bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->iob.buf, 28);
    bcopy(p->tp->meta.info_hash, out->iob.buf + 28, 20);
    bcopy(btpd.peer_id, out->iob.buf + 48, 20);
    net_send(p, out);
    if (p->tp->have_npieces > 0)
        net_send_bitfield(p);
}

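/*
 * Low-level input helpers. net_read() performs one non-blocking read on
 * the peer's socket and handles EAGAIN, read errors and EOF;
 * net_read_to_buf() reads into an io_buffer, bounded by both the
 * remaining buffer space and the rate-limit allowance rmax.
 */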
static void
kill_generic(struct input_reader *reader)
{
    free(reader);
}

static size_t
net_read(struct peer *p, char *buf, size_t len)
{
    ssize_t nread = read(p->sd, buf, len);
    if (nread < 0) {
        if (errno == EAGAIN) {
            event_add(&p->in_ev, NULL);
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "read error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    } else if (nread == 0) {
        btpd_log(BTPD_L_CONN, "conn closed by other side.\n");
        if (!BTPDQ_EMPTY(&p->outq))
            p->flags |= PF_WRITE_CLOSE;
        else
            peer_kill(p);
        return 0;
    } else
        return nread;
}

static size_t
net_read_to_buf(struct peer *p, struct io_buffer *iob, unsigned long rmax)
{
    if (rmax == 0)
        rmax = iob->buf_len - iob->buf_off;
    else
        rmax = min(rmax, iob->buf_len - iob->buf_off);

    assert(rmax > 0);
    size_t nread = net_read(p, iob->buf + iob->buf_off, rmax);
    if (nread > 0)
        iob->buf_off += nread;

    return nread;
}

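/*
 * Continuation readers, used when a bitfield or piece message does not
 * arrive in one chunk: they keep filling a dedicated buffer and hand
 * the completed message to peer_on_bitfield()/peer_on_piece() before
 * switching the peer back to the generic reader.
 */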
void
kill_bitfield(struct input_reader *rd)
{
    free(rd);
}

static void net_generic_reader(struct peer *p);

static unsigned long
read_bitfield(struct peer *p, unsigned long rmax)
{
    struct bitfield_reader *rd = (struct bitfield_reader *)p->reader;

    size_t nread = net_read_to_buf(p, &rd->iob, rmax);
    if (nread == 0)
        return 0;

    if (rd->iob.buf_off == rd->iob.buf_len) {
        peer_on_bitfield(p, rd->iob.buf);
        free(rd);
        net_generic_reader(p);
    } else
        event_add(&p->in_ev, NULL);

    return nread;
}

void
kill_piece(struct input_reader *rd)
{
    free(rd);
}

static unsigned long
read_piece(struct peer *p, unsigned long rmax)
{
    struct piece_reader *rd = (struct piece_reader *)p->reader;

    size_t nread = net_read_to_buf(p, &rd->iob, rmax);
    if (nread == 0)
        return 0;

    p->rate_to_me[btpd.seconds % RATEHISTORY] += nread;
    p->tp->downloaded += nread;
    if (rd->iob.buf_off == rd->iob.buf_len) {
        peer_on_piece(p, rd->index, rd->begin, rd->iob.buf_len, rd->iob.buf);
        free(rd);
        net_generic_reader(p);
    } else
        event_add(&p->in_ev, NULL);

    return nread;
}

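/*
 * Generic message parser. Bytes left over from the previous call are
 * copied in front of the newly read data, complete messages are
 * dispatched to the peer_on_* handlers, and a trailing partial message
 * is either stashed in the reader's buffer or, for large bitfield and
 * piece payloads, handed to a dedicated continuation reader. Malformed
 * input ends up at bad_data and kills the peer.
 */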
#define GRBUFLEN (1 << 15)

static unsigned long
net_generic_read(struct peer *p, unsigned long rmax)
{
    char buf[GRBUFLEN];
    struct io_buffer iob = { 0, GRBUFLEN, buf };
    struct generic_reader *gr = (struct generic_reader *)p->reader;
    size_t nread;
    size_t off, len;
    int got_part;

    if (gr->iob.buf_off > 0) {
        iob.buf_off = gr->iob.buf_off;
        bcopy(gr->iob.buf, iob.buf, iob.buf_off);
        gr->iob.buf_off = 0;
    }

    if ((nread = net_read_to_buf(p, &iob, rmax)) == 0)
        return 0;

    len = iob.buf_off;
    off = 0;

    got_part = 0;
    while (!got_part && len - off >= 4) {
        size_t msg_len = net_read32(buf + off);

        if (msg_len == 0) { /* Keep alive */
            off += 4;
            continue;
        }
        if (len - off < 5) {
            got_part = 1;
            break;
        }

        switch (buf[off + 4]) {
        case MSG_CHOKE:
            btpd_log(BTPD_L_MSG, "choke.\n");
            if (msg_len != 1)
                goto bad_data;
            peer_on_choke(p);
            break;
        case MSG_UNCHOKE:
            btpd_log(BTPD_L_MSG, "unchoke.\n");
            if (msg_len != 1)
                goto bad_data;
            peer_on_unchoke(p);
            break;
        case MSG_INTEREST:
            btpd_log(BTPD_L_MSG, "interested.\n");
            if (msg_len != 1)
                goto bad_data;
            peer_on_interest(p);
            break;
        case MSG_UNINTEREST:
            btpd_log(BTPD_L_MSG, "uninterested.\n");
            if (msg_len != 1)
                goto bad_data;
            peer_on_uninterest(p);
            break;
        case MSG_HAVE:
            btpd_log(BTPD_L_MSG, "have.\n");
            if (msg_len != 5)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                uint32_t index = net_read32(buf + off + 5);
                peer_on_have(p, index);
            } else
                got_part = 1;
            break;
        case MSG_BITFIELD:
            btpd_log(BTPD_L_MSG, "bitfield.\n");
            if (msg_len != (size_t)ceil(p->tp->meta.npieces / 8.0) + 1)
                goto bad_data;
            else if (p->npieces != 0)
                goto bad_data;
            else if (len - off >= msg_len + 4)
                peer_on_bitfield(p, buf + off + 5);
            else {
                struct bitfield_reader *rp;
                size_t mem = sizeof(*rp) + msg_len - 1;
                p->reader->kill(p->reader);
                rp = btpd_calloc(1, mem);
                rp->rd.kill = kill_bitfield;
                rp->rd.read = read_bitfield;
                rp->iob.buf = (char *)rp + sizeof(*rp);
                rp->iob.buf_len = msg_len - 1;
                rp->iob.buf_off = len - off - 5;
                bcopy(buf + off + 5, rp->iob.buf, rp->iob.buf_off);
                p->reader = (struct input_reader *)rp;
                event_add(&p->in_ev, NULL);
                return nread;
            }
            break;
        case MSG_REQUEST:
            btpd_log(BTPD_L_MSG, "request.\n");
            if (msg_len != 13)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) != PF_P_WANT)
                    break;
                uint32_t index, begin, length;
                index = net_read32(buf + off + 5);
                begin = net_read32(buf + off + 9);
                length = net_read32(buf + off + 13);
                if (length > (1 << 15))
                    goto bad_data;
                if (index >= p->tp->meta.npieces)
                    goto bad_data;
                if (!has_bit(p->tp->piece_field, index))
                    goto bad_data;
                if (begin + length > torrent_piece_size(p->tp, index))
                    goto bad_data;
                peer_on_request(p, index, begin, length);
            } else
                got_part = 1;
            break;
        case MSG_PIECE:
            btpd_log(BTPD_L_MSG, "piece.\n");
            if (msg_len < 10)
                goto bad_data;
            else if (len - off >= 13) {
                uint32_t index = net_read32(buf + off + 5);
                uint32_t begin = net_read32(buf + off + 9);
                uint32_t length = msg_len - 9;
                if (len - off >= msg_len + 4) {
                    p->tp->downloaded += length;
                    p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
                    peer_on_piece(p, index, begin, length, buf + off + 13);
                } else {
                    struct piece_reader *rp;
                    size_t mem = sizeof(*rp) + length;
                    p->reader->kill(p->reader);
                    rp = btpd_calloc(1, mem);
                    rp->rd.kill = kill_piece;
                    rp->rd.read = read_piece;
                    rp->index = index;
                    rp->begin = begin;
                    rp->iob.buf = (char *)rp + sizeof(*rp);
                    rp->iob.buf_len = length;
                    rp->iob.buf_off = len - off - 13;
                    bcopy(buf + off + 13, rp->iob.buf, rp->iob.buf_off);
                    p->reader = (struct input_reader *)rp;
                    event_add(&p->in_ev, NULL);
                    p->tp->downloaded += rp->iob.buf_off;
                    p->rate_to_me[btpd.seconds % RATEHISTORY] +=
                        rp->iob.buf_off;
                    return nread;
                }
            } else
                got_part = 1;
            break;
        case MSG_CANCEL:
            if (msg_len != 13)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                uint32_t index = net_read32(buf + off + 5);
                uint32_t begin = net_read32(buf + off + 9);
                uint32_t length = net_read32(buf + off + 13);
                if (index >= p->tp->meta.npieces)
                    goto bad_data;
                if (begin + length > torrent_piece_size(p->tp, index))
                    goto bad_data;
                btpd_log(BTPD_L_MSG, "cancel: %u, %u, %u\n",
                         index, begin, length);
                peer_on_cancel(p, index, begin, length);
            } else
                got_part = 1;
            break;
        default:
            goto bad_data;
        }
        if (!got_part)
            off += 4 + msg_len;
    }

    if (off != len) {
        gr->iob.buf_off = len - off;
        assert(gr->iob.buf_off <= gr->iob.buf_len);
        bcopy(buf + off, gr->iob.buf, gr->iob.buf_off);
    }
    event_add(&p->in_ev, NULL);
    return nread;

bad_data:
    btpd_log(BTPD_L_MSG, "bad data\n");
    peer_kill(p);
    return nread;
}

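/*
 * Install the generic message reader on a peer and arm its read event.
 */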
static void
net_generic_reader(struct peer *p)
{
    struct generic_reader *gr;
    gr = btpd_calloc(1, sizeof(*gr));

    gr->rd.read = net_generic_read;
    gr->rd.kill = kill_generic;

    gr->iob.buf = gr->_io_buf;
    gr->iob.buf_len = MAX_INPUT_LEFT;
    gr->iob.buf_off = 0;

    p->reader = (struct input_reader *)gr;

    event_add(&p->in_ev, NULL);
}

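/*
 * Handshake parser. The switch falls through from one SHAKE_* state to
 * the next as more of the 68-byte handshake arrives: protocol string,
 * reserved bytes, info hash (looked up for incoming connections) and
 * finally the peer id. A completed handshake allocates the peer's
 * piece field and switches to the generic reader.
 */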
static unsigned long
net_shake_read(struct peer *p, unsigned long rmax)
{
    struct handshake *hs = (struct handshake *)p->reader;
    struct io_buffer *in = &hs->in;

    size_t nread = net_read_to_buf(p, in, rmax);
    if (nread == 0)
        return 0;

    switch (hs->state) {
    case SHAKE_INIT:
        if (in->buf_off < 20)
            break;
        else if (bcmp(in->buf, "\x13""BitTorrent protocol", 20) == 0)
            hs->state = SHAKE_PSTR;
        else
            goto bad_shake;
        /* FALLTHROUGH */
    case SHAKE_PSTR:
        if (in->buf_off < 28)
            break;
        else
            hs->state = SHAKE_RESERVED;
        /* FALLTHROUGH */
    case SHAKE_RESERVED:
        if (in->buf_off < 48)
            break;
        else if (hs->incoming) {
            struct torrent *tp = torrent_get_by_hash(in->buf + 28);
            if (tp != NULL) {
                hs->state = SHAKE_INFO;
                p->tp = tp;
                net_send_shake(p);
            } else
                goto bad_shake;
        } else {
            if (bcmp(in->buf + 28, p->tp->meta.info_hash, 20) == 0)
                hs->state = SHAKE_INFO;
            else
                goto bad_shake;
        }
        /* FALLTHROUGH */
    case SHAKE_INFO:
        if (in->buf_off < 68)
            break;
        else {
            if (torrent_has_peer(p->tp, in->buf + 48))
                goto bad_shake; // Not really, but we're already connected.
            else if (bcmp(in->buf + 48, btpd.peer_id, 20) == 0)
                goto bad_shake; // Connection from myself.
            bcopy(in->buf + 48, p->id, 20);
            hs->state = SHAKE_ID;
        }
        /* FALLTHROUGH */
    default:
        assert(hs->state == SHAKE_ID);
    }
    if (hs->state == SHAKE_ID) {
        btpd_log(BTPD_L_CONN, "Got whole shake.\n");
        free(hs);
        p->piece_field = btpd_calloc(1, (int)ceil(p->tp->meta.npieces / 8.0));
        cm_on_new_peer(p);
        net_generic_reader(p);
    } else
        event_add(&p->in_ev, NULL);

    return nread;

bad_shake:
    btpd_log(BTPD_L_CONN, "Bad shake(%d)\n", hs->state);
    peer_kill(p);
    return nread;
}

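/*
 * Set up the handshake reader for a new peer; for outgoing connections
 * our side of the handshake is sent immediately.
 */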
void
net_handshake(struct peer *p, int incoming)
{
    struct handshake *hs;

    hs = btpd_calloc(1, sizeof(*hs));
    hs->incoming = incoming;
    hs->state = SHAKE_INIT;

    hs->in.buf_len = SHAKE_LEN;
    hs->in.buf_off = 0;
    hs->in.buf = hs->_io_buf;

    p->reader = (struct input_reader *)hs;
    hs->rd.read = net_shake_read;
    hs->rd.kill = kill_shake;

    if (!incoming)
        net_send_shake(p);
}

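/*
 * Non-blocking connect helpers. net_connect() resolves a numeric host
 * and port with getaddrinfo() and hands the result to net_connect2(),
 * which creates the socket and starts the connection.
 */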
int
net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
{
    if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        return errno;

    set_nonblocking(*sd);

    if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
        btpd_log(BTPD_L_CONN, "Botched connection %s.\n", strerror(errno));
        close(*sd);
        return errno;
    }

    return 0;
}

int
net_connect(const char *ip, int port, int *sd)
{
    struct addrinfo hints, *res;
    char portstr[6];

    assert(btpd.npeers < btpd.maxpeers);

    if (snprintf(portstr, sizeof(portstr), "%d", port) >= sizeof(portstr))
        return EINVAL;
    bzero(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_flags = AI_NUMERICHOST;
    hints.ai_socktype = SOCK_STREAM;
    if (getaddrinfo(ip, portstr, &hints, &res) != 0)
        return errno;

    int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);

    freeaddrinfo(res);

    return error;
}

void
net_connection_cb(int sd, short type, void *arg)
{
    int nsd;

    nsd = accept(sd, NULL, NULL);
    if (nsd < 0) {
        if (errno == EWOULDBLOCK || errno == ECONNABORTED)
            return;
        else
            btpd_err("accept: %s\n", strerror(errno));
    }

    if (set_nonblocking(nsd) != 0) {
        close(nsd);
        return;
    }

    assert(btpd.npeers <= btpd.maxpeers);
    if (btpd.npeers == btpd.maxpeers) {
        close(nsd);
        return;
    }

    peer_create_in(nsd);

    btpd_log(BTPD_L_CONN, "got connection.\n");
}

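/*
 * Once-a-second housekeeping: clear the rate-history slot for the
 * current second index for every peer of every torrent.
 */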
void
net_by_second(void)
{
    struct peer *p;
    struct torrent *tp;
    int ri = btpd.seconds % RATEHISTORY;

    BTPDQ_FOREACH(tp, &btpd.cm_list, entry) {
        BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
            p->rate_to_me[ri] = 0;
            p->rate_from_me[ri] = 0;
        }
    }
}