A clone of btpd with my configuration changes.

#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/mman.h>
#include <sys/wait.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "btpd.h"

#define min(x, y) ((x) <= (y) ? (x) : (y))

static unsigned long
net_write(struct peer *p, unsigned long wmax);

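/*
 * Fires one second after bandwidth was charged on the receive side.
 * The expired slot's byte count is returned to btpd.ibw_left, peers
 * waiting on the read queue are serviced with the refilled budget, and
 * if they consumed anything the slot is recycled for another second.
 */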
void
net_bw_read_cb(int sd, short type, void *arg)
{
    struct peer *p;
    struct bwlim *bw = arg;

    btpd.ibw_left += bw->count;

    unsigned long count = 0;

    while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left - count > 0) {
        BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
        p->flags &= ~PF_ON_READQ;
        count += p->reader->read(p, btpd.ibw_left - count);
    }
    btpd.ibw_left -= count;

    BTPDQ_REMOVE(&btpd.bwq, bw, entry);
    if (count == 0)
        free(bw);
    else {
        bw->count = count;
        event_add(&bw->timer, (& (struct timeval) { 1, 0 }));
        BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
    }
}

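/*
 * libevent read callback for a peer's socket. With no download limit the
 * peer reads freely; otherwise it reads within the remaining budget and
 * charges the bytes to a one-second bwlim slot, or is parked on the read
 * queue until the budget is refilled.
 */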
void
net_read_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;

    if (btpd.ibwlim == 0) {
        p->reader->read(p, 0);
    } else if (btpd.ibw_left > 0) {
        unsigned long nread = p->reader->read(p, btpd.ibw_left);
        if (nread > 0) {
            struct bwlim *bw = btpd_calloc(1, sizeof(*bw));
            evtimer_set(&bw->timer, net_bw_read_cb, bw);
            evtimer_add(&bw->timer, (& (struct timeval) { 1, 0 }));
            bw->count = nread;
            btpd.ibw_left -= nread;
            BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
        }
    } else {
        p->flags |= PF_ON_READQ;
        BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
    }
}

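/*
 * net_bw_write_cb and net_write_cb mirror the read-side callbacks above,
 * but account against the upload budget (btpd.obw_left / btpd.obwlim)
 * and drain the write queue with net_write().
 */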
void
net_bw_write_cb(int sd, short type, void *arg)
{
    struct peer *p;
    struct bwlim *bw = arg;

    btpd.obw_left += bw->count;

    unsigned long count = 0;

    while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL && btpd.obw_left - count > 0) {
        BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
        p->flags &= ~PF_ON_WRITEQ;
        count += net_write(p, btpd.obw_left - count);
    }
    btpd.obw_left -= count;

    BTPDQ_REMOVE(&btpd.bwq, bw, entry);
    if (count == 0)
        free(bw);
    else {
        bw->count = count;
        event_add(&bw->timer, (& (struct timeval) { 1, 0 }));
        BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
    }
}

void
net_write_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;

    if (btpd.obwlim == 0) {
        net_write(p, 0);
    } else if (btpd.obw_left > 0) {
        unsigned long nw = net_write(p, btpd.obw_left);
        if (nw > 0) {
            struct bwlim *bw = btpd_calloc(1, sizeof(*bw));
            evtimer_set(&bw->timer, net_bw_write_cb, bw);
            evtimer_add(&bw->timer, (& (struct timeval) { 1, 0 }));
            bw->count = nw;
            btpd.obw_left -= nw;
            BTPDQ_INSERT_TAIL(&btpd.bwq, bw, entry);
        }
    } else {
        p->flags |= PF_ON_WRITEQ;
        BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
    }
}

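/*
 * Output buffer links (struct iob_link) carry the data queued on a peer's
 * outq. malloc_liob allocates the link and its buffer in one chunk, while
 * salloc_liob wraps an existing buffer and records how to dispose of it.
 */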
static void
nokill_iob(struct io_buffer *iob)
{
    //Nothing
}

static void
kill_free_buf(struct io_buffer *iob)
{
    free(iob->buf);
}

static struct iob_link *
malloc_liob(size_t len)
{
    struct iob_link *iol;
    iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol) + len);
    iol->iob.buf = (char *)(iol + 1);
    iol->iob.buf_len = len;
    iol->iob.buf_off = 0;
    iol->kill_buf = nokill_iob;
    return iol;
}

static struct iob_link *
salloc_liob(char *buf, size_t len, void (*kill_buf)(struct io_buffer *))
{
    struct iob_link *iol;
    iol = (struct iob_link *)btpd_calloc(1, sizeof(*iol));
    iol->iob.buf = buf;
    iol->iob.buf_len = len;
    iol->iob.buf_off = 0;
    iol->kill_buf = kill_buf;
    return iol;
}

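/*
 * Withdraw a queued piece message: drop both its header and payload links
 * from the peer's outq and forget the pending request. If that emptied the
 * queue, stop waiting for writability on the socket.
 */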
void
net_unsend_piece(struct peer *p, struct piece_req *req)
{
    struct iob_link *piece;

    BTPDQ_REMOVE(&p->p_reqs, req, entry);

    piece = BTPDQ_NEXT(req->head, entry);
    BTPDQ_REMOVE(&p->outq, piece, entry);
    piece->kill_buf(&piece->iob);
    free(piece);

    BTPDQ_REMOVE(&p->outq, req->head, entry);
    req->head->kill_buf(&req->head->iob);
    free(req->head);
    free(req);

    if (BTPDQ_EMPTY(&p->outq)) {
        if (p->flags & PF_ON_WRITEQ) {
            BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
        } else
            event_del(&p->out_ev);
    }
}

void
kill_shake(struct input_reader *reader)
{
    free(reader);
}

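/*
 * Flush as much of the peer's outq as possible with a single writev(),
 * honoring wmax as a byte cap when it is non-zero. Completed links are
 * freed, upload accounting is updated for links flagged as payload, and
 * the write event is re-armed if data remains queued.
 */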
#define NIOV 16

static unsigned long
net_write(struct peer *p, unsigned long wmax)
{
    struct iob_link *iol;
    struct piece_req *req;
    struct iovec iov[NIOV];
    int niov;
    int limited;
    ssize_t nwritten;
    unsigned long bcount;

    limited = wmax > 0;

    niov = 0;
    assert((iol = BTPDQ_FIRST(&p->outq)) != NULL);
    while (niov < NIOV && iol != NULL
           && (!limited || (limited && wmax > 0))) {
        iov[niov].iov_base = iol->iob.buf + iol->iob.buf_off;
        iov[niov].iov_len = iol->iob.buf_len - iol->iob.buf_off;
        if (limited) {
            if (iov[niov].iov_len > wmax)
                iov[niov].iov_len = wmax;
            wmax -= iov[niov].iov_len;
        }
        niov++;
        iol = BTPDQ_NEXT(iol, entry);
    }

again:
    nwritten = writev(p->sd, iov, niov);
    if (nwritten < 0) {
        if (errno == EINTR)
            goto again;
        else if (errno == EAGAIN) {
            event_add(&p->out_ev, NULL);
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    }

    bcount = nwritten;

    req = BTPDQ_FIRST(&p->p_reqs);
    iol = BTPDQ_FIRST(&p->outq);
    while (bcount > 0) {
        unsigned long bufdelta = iol->iob.buf_len - iol->iob.buf_off;
        if (req != NULL && req->head == iol) {
            struct piece_req *next = BTPDQ_NEXT(req, entry);
            BTPDQ_REMOVE(&p->p_reqs, req, entry);
            free(req);
            req = next;
        }
        if (bcount >= bufdelta) {
            if (iol->upload) {
                p->tp->uploaded += bufdelta;
                p->rate_from_me[btpd.seconds % RATEHISTORY] += bufdelta;
            }
            bcount -= bufdelta;
            BTPDQ_REMOVE(&p->outq, iol, entry);
            iol->kill_buf(&iol->iob);
            free(iol);
            iol = BTPDQ_FIRST(&p->outq);
        } else {
            if (iol->upload) {
                p->tp->uploaded += bcount;
                p->rate_from_me[btpd.seconds % RATEHISTORY] += bcount;
            }
            iol->iob.buf_off += bcount;
            bcount = 0;
        }
    }
    if (!BTPDQ_EMPTY(&p->outq))
        event_add(&p->out_ev, NULL);
    else if (p->flags & PF_WRITE_CLOSE) {
        btpd_log(BTPD_L_CONN, "Closed because of write flag.\n");
        peer_kill(p);
    }

    return nwritten;
}

void
net_send(struct peer *p, struct iob_link *iol)
{
    if (BTPDQ_EMPTY(&p->outq))
        event_add(&p->out_ev, NULL);
    BTPDQ_INSERT_TAIL(&p->outq, iol, entry);
}

void
net_write32(void *buf, uint32_t num)
{
    *(uint32_t *)buf = htonl(num);
}

uint32_t
net_read32(void *buf)
{
    return ntohl(*(uint32_t *)buf);
}

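/*
 * Queue a PIECE message: a 4-byte big-endian length (9 + block length),
 * the message id, then index and begin, followed by the block itself as a
 * separate upload-accounted link. The request is remembered on p_reqs so
 * a later CANCEL can withdraw it with net_unsend_piece().
 */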
void
net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
               char *block, size_t blen)
{
    struct iob_link *head, *piece;
    struct piece_req *req;

    btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n",
        index, begin, (unsigned)blen);

    head = malloc_liob(13);
    net_write32(head->iob.buf, 9 + blen);
    head->iob.buf[4] = MSG_PIECE;
    net_write32(head->iob.buf + 5, index);
    net_write32(head->iob.buf + 9, begin);
    net_send(p, head);

    piece = salloc_liob(block, blen, kill_free_buf);
    piece->upload = 1;
    net_send(p, piece);

    req = btpd_malloc(sizeof(*req));
    req->index = index;
    req->begin = begin;
    req->length = blen;
    req->head = head;
    BTPDQ_INSERT_TAIL(&p->p_reqs, req, entry);
}

void
net_send_request(struct peer *p, struct piece_req *req)
{
    struct iob_link *out;
    out = malloc_liob(17);
    net_write32(out->iob.buf, 13);
    out->iob.buf[4] = MSG_REQUEST;
    net_write32(out->iob.buf + 5, req->index);
    net_write32(out->iob.buf + 9, req->begin);
    net_write32(out->iob.buf + 13, req->length);
    net_send(p, out);
}

void
net_send_cancel(struct peer *p, struct piece_req *req)
{
    struct iob_link *out;
    out = malloc_liob(17);
    net_write32(out->iob.buf, 13);
    out->iob.buf[4] = MSG_CANCEL;
    net_write32(out->iob.buf + 5, req->index);
    net_write32(out->iob.buf + 9, req->begin);
    net_write32(out->iob.buf + 13, req->length);
    net_send(p, out);
}

void
net_send_have(struct peer *p, uint32_t index)
{
    struct iob_link *out;
    out = malloc_liob(9);
    net_write32(out->iob.buf, 5);
    out->iob.buf[4] = MSG_HAVE;
    net_write32(out->iob.buf + 5, index);
    net_send(p, out);
}

void
net_send_onesized(struct peer *p, char type)
{
    struct iob_link *out;
    out = malloc_liob(5);
    net_write32(out->iob.buf, 1);
    out->iob.buf[4] = type;
    net_send(p, out);
}

void
net_send_unchoke(struct peer *p)
{
    net_send_onesized(p, MSG_UNCHOKE);
}

void
net_send_choke(struct peer *p)
{
    net_send_onesized(p, MSG_CHOKE);
}

void
net_send_uninterest(struct peer *p)
{
    net_send_onesized(p, MSG_UNINTEREST);
}

void
net_send_interest(struct peer *p)
{
    net_send_onesized(p, MSG_INTEREST);
}

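/*
 * The bitfield is sent as two links: a freshly allocated 5-byte header and
 * a link that points straight at the torrent's piece_field, which must not
 * be freed when the link is released (hence nokill_iob).
 */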
void
net_send_bitfield(struct peer *p)
{
    struct iob_link *out;
    uint32_t plen;

    plen = (uint32_t)ceil(p->tp->meta.npieces / 8.0);

    out = malloc_liob(5);
    net_write32(out->iob.buf, plen + 1);
    out->iob.buf[4] = MSG_BITFIELD;
    net_send(p, out);

    out = salloc_liob(p->tp->piece_field, plen, nokill_iob);
    net_send(p, out);
}

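/*
 * Queue the 68-byte handshake: protocol string and reserved bytes (28),
 * info hash (20) and our peer id (20), followed by our bitfield if we
 * already have pieces to advertise.
 */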
void
net_send_shake(struct peer *p)
{
    struct iob_link *out;

    out = malloc_liob(68);
    bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->iob.buf, 28);
    bcopy(p->tp->meta.info_hash, out->iob.buf + 28, 20);
    bcopy(btpd.peer_id, out->iob.buf + 48, 20);
    net_send(p, out);

    if (p->tp->have_npieces > 0)
        net_send_bitfield(p);
}

static void
kill_generic(struct input_reader *reader)
{
    free(reader);
}

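/*
 * Thin wrapper around read(2) used by all readers. Returns the number of
 * bytes read, or 0 for "nothing to report": EINTR/EAGAIN re-arm the read
 * event, hard errors tear the peer down, and EOF is deferred via
 * PF_WRITE_CLOSE while output is still pending.
 */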
static size_t
net_read(struct peer *p, char *buf, size_t len)
{
    ssize_t nread = read(p->sd, buf, len);
    if (nread < 0) {
        if (errno == EINTR || errno == EAGAIN) {
            event_add(&p->in_ev, NULL);
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "read error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    } else if (nread == 0) {
        btpd_log(BTPD_L_CONN, "conn closed by other side.\n");
        if (!BTPDQ_EMPTY(&p->outq))
            p->flags |= PF_WRITE_CLOSE;
        else
            peer_kill(p);
        return 0;
    } else
        return nread;
}

void
kill_bitfield(struct input_reader *rd)
{
    free(rd);
}

static void net_generic_reader(struct peer *p);

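/*
 * Continuation reader used when a BITFIELD message arrives split across
 * reads: buffer the remaining bytes, and once complete copy them into the
 * peer's piece_field, announce each piece, and fall back to the generic
 * reader.
 */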
static unsigned long
read_bitfield(struct peer *p, unsigned long rmax)
{
    ssize_t nread;
    struct bitfield_reader *rd = (struct bitfield_reader *)p->reader;

    if (rmax == 0)
        rmax = rd->iob.buf_len - rd->iob.buf_off;
    else
        rmax = min(rmax, rd->iob.buf_len - rd->iob.buf_off);

    if ((nread = net_read(p, rd->iob.buf + rd->iob.buf_off, rmax)) == 0)
        return 0;

    rd->iob.buf_off += nread;
    if (rd->iob.buf_off == rd->iob.buf_len) {
        bcopy(rd->iob.buf, p->piece_field, rd->iob.buf_len);
        for (unsigned i = 0; i < p->tp->meta.npieces; i++)
            if (has_bit(p->piece_field, i)) {
                p->npieces++;
                cm_on_piece_ann(p, i);
            }
        free(rd);
        net_generic_reader(p);
    } else
        event_add(&p->in_ev, NULL);

    return nread;
}

void
kill_piece(struct input_reader *rd)
{
    free(rd);
}

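/*
 * Continuation reader for a PIECE payload that did not fit in the generic
 * read buffer. Downloaded bytes are accounted as they arrive; when the
 * block is complete it is written to disk only if it still matches the
 * request at the head of my_reqs.
 */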
static unsigned long
read_piece(struct peer *p, unsigned long rmax)
{
    ssize_t nread;
    struct piece_reader *rd = (struct piece_reader *)p->reader;

    if (rmax == 0)
        rmax = rd->iob.buf_len - rd->iob.buf_off;
    else
        rmax = min(rmax, rd->iob.buf_len - rd->iob.buf_off);

    if ((nread = net_read(p, rd->iob.buf + rd->iob.buf_off, rmax)) == 0)
        return 0;

    rd->iob.buf_off += nread;
    p->rate_to_me[btpd.seconds % RATEHISTORY] += nread;
    p->tp->downloaded += nread;
    if (rd->iob.buf_off == rd->iob.buf_len) {
        struct piece_req *req = BTPDQ_FIRST(&p->my_reqs);
        if (req != NULL &&
            req->index == rd->index &&
            req->begin == rd->begin &&
            req->length == rd->iob.buf_len) {
            //
            off_t cbegin = rd->index * p->tp->meta.piece_length + rd->begin;
            torrent_put_bytes(p->tp, rd->iob.buf, cbegin, rd->iob.buf_len);
            cm_on_block(p);
        }
        free(rd);
        net_generic_reader(p);
    } else
        event_add(&p->in_ev, NULL);

    return nread;
}

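/*
 * Main protocol parser. Any unconsumed partial message from the previous
 * call is copied in front of the fresh data, then complete messages are
 * handled in place. Oversized BITFIELD and PIECE messages switch the peer
 * to a dedicated continuation reader; any other incomplete message is
 * stashed back into the generic reader's buffer for the next read.
 */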
#define GRBUFLEN (1 << 15)

static unsigned long
net_generic_read(struct peer *p, unsigned long rmax)
{
    char buf[GRBUFLEN];
    struct generic_reader *gr = (struct generic_reader *)p->reader;
    ssize_t nread;
    size_t off, len;
    int got_part;

    len = 0;
    if (gr->iob.buf_off > 0) {
        len = gr->iob.buf_off;
        gr->iob.buf_off = 0;
        bcopy(gr->iob.buf, buf, len);
    }
    if (rmax == 0)
        rmax = GRBUFLEN - len;
    else
        rmax = min(rmax, GRBUFLEN - len);

    if ((nread = net_read(p, buf + len, rmax)) == 0)
        return 0;

    len += nread;

    off = 0;
    got_part = 0;
    while (!got_part && len - off >= 4) {
        size_t msg_len = net_read32(buf + off);

        if (msg_len == 0) { /* Keep alive */
            off += 4;
            continue;
        }
        if (len - off < 5) {
            got_part = 1;
            break;
        }

        switch (buf[off + 4]) {
        case MSG_CHOKE:
            btpd_log(BTPD_L_MSG, "choke.\n");
            if (msg_len != 1)
                goto bad_data;
            if ((p->flags & (PF_P_CHOKE|PF_I_WANT)) == PF_I_WANT) {
                p->flags |= PF_P_CHOKE;
                cm_on_undownload(p);
            } else
                p->flags |= PF_P_CHOKE;
            break;
        case MSG_UNCHOKE:
            btpd_log(BTPD_L_MSG, "unchoke.\n");
            if (msg_len != 1)
                goto bad_data;
            if ((p->flags & (PF_P_CHOKE|PF_I_WANT))
                == (PF_P_CHOKE|PF_I_WANT)) {
                p->flags &= ~PF_P_CHOKE;
                cm_on_download(p);
            } else
                p->flags &= ~PF_P_CHOKE;
            break;
        case MSG_INTEREST:
            btpd_log(BTPD_L_MSG, "interested.\n");
            if (msg_len != 1)
                goto bad_data;
            if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == 0) {
                p->flags |= PF_P_WANT;
                cm_on_upload(p);
            } else
                p->flags |= PF_P_WANT;
            break;
        case MSG_UNINTEREST:
            btpd_log(BTPD_L_MSG, "uninterested.\n");
            if (msg_len != 1)
                goto bad_data;
            if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
                p->flags &= ~PF_P_WANT;
                cm_on_unupload(p);
            } else
                p->flags &= ~PF_P_WANT;
            break;
        case MSG_HAVE:
            btpd_log(BTPD_L_MSG, "have.\n");
            if (msg_len != 5)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                unsigned long piece = net_read32(buf + off + 5);
                if (!has_bit(p->piece_field, piece)) {
                    set_bit(p->piece_field, piece);
                    p->npieces++;
                    cm_on_piece_ann(p, piece);
                }
            } else
                got_part = 1;
            break;
        case MSG_BITFIELD:
            btpd_log(BTPD_L_MSG, "bitfield.\n");
            if (msg_len != (size_t)ceil(p->tp->meta.npieces / 8.0) + 1)
                goto bad_data;
            else if (p->npieces != 0)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                bcopy(buf + off + 5, p->piece_field, msg_len - 1);
                for (unsigned i = 0; i < p->tp->meta.npieces; i++)
                    if (has_bit(p->piece_field, i)) {
                        p->npieces++;
                        cm_on_piece_ann(p, i);
                    }
            } else {
                struct bitfield_reader *rp;
                size_t mem = sizeof(*rp) + msg_len - 1;
                p->reader->kill(p->reader);
                rp = btpd_calloc(1, mem);
                rp->rd.kill = kill_bitfield;
                rp->rd.read = read_bitfield;
                rp->iob.buf = (char *)rp + sizeof(*rp);
                rp->iob.buf_len = msg_len - 1;
                rp->iob.buf_off = len - off - 5;
                bcopy(buf + off + 5, rp->iob.buf, rp->iob.buf_off);
                p->reader = (struct input_reader *)rp;
                event_add(&p->in_ev, NULL);
                return nread;
            }
            break;
        case MSG_REQUEST:
            btpd_log(BTPD_L_MSG, "request.\n");
            if (msg_len != 13)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) != PF_P_WANT)
                    break;

                uint32_t index, begin, length;
                off_t cbegin;
                char *content;

                index = net_read32(buf + off + 5);
                begin = net_read32(buf + off + 9);
                length = net_read32(buf + off + 13);

                if (length > (1 << 15))
                    goto bad_data;
                if (index >= p->tp->meta.npieces
                    || !has_bit(p->tp->piece_field, index))
                    goto bad_data;
                if (begin + length > p->tp->meta.piece_length)
                    goto bad_data;
                cbegin = index * p->tp->meta.piece_length + begin;
                if (cbegin + length > p->tp->meta.total_length)
                    goto bad_data;

                content = torrent_get_bytes(p->tp, cbegin, length);
                net_send_piece(p, index, begin, content, length);
            } else
                got_part = 1;
            break;
        case MSG_PIECE:
            btpd_log(BTPD_L_MSG, "piece.\n");
            if (msg_len < 10)
                goto bad_data;
            else if (len - off >= 13) {
                uint32_t index = net_read32(buf + off + 5);
                uint32_t begin = net_read32(buf + off + 9);
                uint32_t length = msg_len - 9;
#if 0
                struct piece_req *req = BTPDQ_FIRST(&p->my_reqs);
                if (req == NULL)
                    goto bad_data;
                if (!(index == req->index &&
                      begin == req->begin &&
                      length == req->length))
                    goto bad_data;
#endif
                if (len - off >= msg_len + 4) {
                    off_t cbegin = index * p->tp->meta.piece_length + begin;
                    p->tp->downloaded += length;
                    p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
                    struct piece_req *req = BTPDQ_FIRST(&p->my_reqs);
                    if (req != NULL &&
                        req->index == index &&
                        req->begin == begin &&
                        req->length == length) {
                        //
                        torrent_put_bytes(p->tp, buf + off + 13, cbegin, length);
                        cm_on_block(p);
                    }
                } else {
                    struct piece_reader *rp;
                    size_t mem = sizeof(*rp) + length;
                    p->reader->kill(p->reader);
                    rp = btpd_calloc(1, mem);
                    rp->rd.kill = kill_piece;
                    rp->rd.read = read_piece;
                    rp->index = index;
                    rp->begin = begin;
                    rp->iob.buf = (char *)rp + sizeof(*rp);
                    rp->iob.buf_len = length;
                    rp->iob.buf_off = len - off - 13;
                    bcopy(buf + off + 13, rp->iob.buf, rp->iob.buf_off);
                    p->reader = (struct input_reader *)rp;
                    event_add(&p->in_ev, NULL);

                    p->tp->downloaded += rp->iob.buf_off;
                    p->rate_to_me[btpd.seconds % RATEHISTORY] +=
                        rp->iob.buf_off;

                    return nread;
                }
            } else
                got_part = 1;
            break;
        case MSG_CANCEL:
            if (msg_len != 13)
                goto bad_data;
            else if (len - off >= msg_len + 4) {
                struct piece_req *req;
                uint32_t index, begin, length;

                index = net_read32(buf + off + 5);
                begin = net_read32(buf + off + 9);
                length = net_read32(buf + off + 13);

                btpd_log(BTPD_L_MSG, "cancel: %u, %u, %u\n",
                    index, begin, length);

                req = BTPDQ_FIRST(&p->p_reqs);
                while (req != NULL) {
                    if (req->index == index &&
                        req->begin == begin &&
                        req->length == length) {
                        btpd_log(BTPD_L_MSG, "cancel matched.\n");
                        net_unsend_piece(p, req);
                        break;
                    }
                    req = BTPDQ_NEXT(req, entry);
                }
            } else
                got_part = 1;
            break;
        default:
            goto bad_data;
        }
        if (!got_part)
            off += 4 + msg_len;
    }
    if (off != len) {
        gr->iob.buf_off = len - off;
        bcopy(buf + off, gr->iob.buf, gr->iob.buf_off);
    }
    event_add(&p->in_ev, NULL);
    return nread;

bad_data:
    btpd_log(BTPD_L_MSG, "bad data\n");
    peer_kill(p);
    return nread;
}

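/*
 * Install the default message reader on a peer. Its buffer only needs to
 * hold the partial message prefix kept between reads (MAX_INPUT_LEFT).
 */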
static void
net_generic_reader(struct peer *p)
{
    struct generic_reader *gr;

    gr = btpd_calloc(1, sizeof(*gr));

    gr->rd.read = net_generic_read;
    gr->rd.kill = kill_generic;

    gr->iob.buf = gr->_io_buf;
    gr->iob.buf_len = MAX_INPUT_LEFT;
    gr->iob.buf_off = 0;

    p->reader = (struct input_reader *)gr;

    event_add(&p->in_ev, NULL);
}

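/*
 * Handshake state machine. Each state waits until enough of the 68-byte
 * handshake has accumulated, validates its part (protocol string, info
 * hash, peer id) and falls through to the next. Incoming connections pick
 * their torrent from the received info hash; outgoing ones verify it
 * against the torrent they were created for.
 */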
static unsigned long
net_shake_read(struct peer *p, unsigned long rmax)
{
    ssize_t nread;
    struct handshake *hs = (struct handshake *)p->reader;
    struct io_buffer *in = &hs->in;

    if (rmax == 0)
        rmax = in->buf_len - in->buf_off;
    else
        rmax = min(rmax, in->buf_len - in->buf_off);

    nread = net_read(p, in->buf + in->buf_off, rmax);
    if (nread == 0)
        return 0;

    in->buf_off += nread;

    switch (hs->state) {
    case SHAKE_INIT:
        if (in->buf_off < 20)
            break;
        else if (bcmp(in->buf, "\x13""BitTorrent protocol", 20) == 0)
            hs->state = SHAKE_PSTR;
        else
            goto bad_shake;
    case SHAKE_PSTR:
        if (in->buf_off < 28)
            break;
        else
            hs->state = SHAKE_RESERVED;
#if 0
        else if (bcmp(in->buf + 20, "\0\0\0\0\0\0\0\0", 8) == 0)
            hs->state = SHAKE_RESERVED;
        else
            goto bad_shake;
#endif
    case SHAKE_RESERVED:
        if (in->buf_off < 48)
            break;
        else if (hs->incoming) {
            struct torrent *tp = torrent_get_by_hash(in->buf + 28);
#if 0
            tp = BTPDQ_FIRST(&btpd.cm_list);
            while (tp != NULL) {
                if (bcmp(in->buf + 28, tp->meta.info_hash, 20) == 0)
                    break;
                else
                    tp = BTPDQ_NEXT(tp, entry);
            }
#endif
            if (tp != NULL) {
                hs->state = SHAKE_INFO;
                p->tp = tp;
                net_send_shake(p);
            } else
                goto bad_shake;
        } else {
            if (bcmp(in->buf + 28, p->tp->meta.info_hash, 20) == 0)
                hs->state = SHAKE_INFO;
            else
                goto bad_shake;
        }
    case SHAKE_INFO:
        if (in->buf_off < 68)
            break;
        else {
            if (torrent_has_peer(p->tp, in->buf + 48))
                goto bad_shake; // Not really, but we're already connected.
            else if (bcmp(in->buf + 48, btpd.peer_id, 20) == 0)
                goto bad_shake; // Connection from myself.
            bcopy(in->buf + 48, p->id, 20);
            hs->state = SHAKE_ID;
        }
    default:
        assert(hs->state == SHAKE_ID);
    }
    if (hs->state == SHAKE_ID) {
        btpd_log(BTPD_L_CONN, "Got whole shake.\n");
        free(hs);
        p->piece_field = btpd_calloc(1, (int)ceil(p->tp->meta.npieces / 8.0));
        cm_on_new_peer(p);
        net_generic_reader(p);
    } else
        event_add(&p->in_ev, NULL);

    return nread;

bad_shake:
    btpd_log(BTPD_L_CONN, "Bad shake(%d)\n", hs->state);
    peer_kill(p);
    return nread;
}

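/*
 * Attach a handshake reader to a new peer. For outgoing connections the
 * torrent is already known, so our side of the handshake is sent at once;
 * incoming peers must reveal their info hash first.
 */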
void
net_handshake(struct peer *p, int incoming)
{
    struct handshake *hs;

    hs = calloc(1, sizeof(*hs));
    hs->incoming = incoming;
    hs->state = SHAKE_INIT;

    hs->in.buf_len = SHAKE_LEN;
    hs->in.buf_off = 0;
    hs->in.buf = hs->_io_buf;

    p->reader = (struct input_reader *)hs;
    hs->rd.read = net_shake_read;
    hs->rd.kill = kill_shake;

    if (!incoming)
        net_send_shake(p);
}

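/*
 * Open a non-blocking TCP connection to a peer. net_connect resolves the
 * numeric address with getaddrinfo() and counts the new connection against
 * btpd.maxpeers; both functions return 0 on success or an errno value.
 */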
int
net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
{
    if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        return errno;

    set_nonblocking(*sd);

    if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
        btpd_log(BTPD_L_CONN, "Botched connection %s.\n", strerror(errno));
        close(*sd);
        return errno;
    }

    return 0;
}

int
net_connect(const char *ip, int port, int *sd)
{
    struct addrinfo hints, *res;
    char portstr[6];

    assert(btpd.npeers < btpd.maxpeers);

    if (snprintf(portstr, sizeof(portstr), "%d", port) >= sizeof(portstr))
        return EINVAL;
    bzero(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_flags = AI_NUMERICHOST;
    hints.ai_socktype = SOCK_STREAM;
    if (getaddrinfo(ip, portstr, &hints, &res) != 0)
        return errno;

    int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);

    freeaddrinfo(res);

    if (error == 0)
        btpd.npeers++;

    return error;
}

void
net_connection_cb(int sd, short type, void *arg)
{
    int nsd;

    nsd = accept(sd, NULL, NULL);
    if (nsd < 0) {
        if (errno == EWOULDBLOCK || errno == ECONNABORTED || errno == EINTR)
            return;
        else
            btpd_err("accept: %s\n", strerror(errno));
    }

    if (set_nonblocking(nsd) != 0) {
        close(nsd);
        return;
    }

    assert(btpd.npeers <= btpd.maxpeers);
    if (btpd.npeers == btpd.maxpeers) {
        close(nsd);
        return;
    }

    btpd.npeers++;
    peer_create_in(nsd);

    btpd_log(BTPD_L_CONN, "got connection.\n");
}

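/*
 * Called once per second: clear the current slot of every peer's transfer
 * rate history. The disabled block below refills the global bandwidth
 * budgets and drains the read/write queues here instead of using the
 * per-slot bwlim timers above.
 */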
void
net_by_second(void)
{
    struct peer *p;
    struct torrent *tp;
    int ri = btpd.seconds % RATEHISTORY;

    BTPDQ_FOREACH(tp, &btpd.cm_list, entry) {
        BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
            p->rate_to_me[ri] = 0;
            p->rate_from_me[ri] = 0;
        }
    }

#if 0
    btpd.obw_left = btpd.obwlim;
    btpd.ibw_left = btpd.ibwlim;

    if (btpd.ibwlim > 0) {
        while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left > 0) {
            BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
        }
    } else {
        while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL) {
            BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            p->reader->read(p, 0);
        }
    }

    if (btpd.obwlim) {
        while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL && btpd.obw_left > 0) {
            BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            btpd.obw_left -= net_write(p, btpd.obw_left);
        }
    } else {
        while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL) {
            BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            net_write(p, 0);
        }
    }
#endif
}