A clone of btpd with my configuration changes.


#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/mman.h>
#include <sys/wait.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "btpd.h"

#ifndef IOV_MAX
#define IOV_MAX 1024
#endif

#define min(x, y) ((x) <= (y) ? (x) : (y))
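
/*
 * Byte-order helpers: integers on the BitTorrent wire are big-endian,
 * so every 32-bit read and write goes through htonl/ntohl.
 */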
void
net_write32(void *buf, uint32_t num)
{
    *(uint32_t *)buf = htonl(num);
}

uint32_t
net_read32(const void *buf)
{
    return ntohl(*(uint32_t *)buf);
}
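
/*
 * Write as much of the peer's output queue as the socket and the wmax
 * byte budget allow (wmax == 0 means unlimited), gathered into a single
 * writev(). Fully sent buffers are released; a partially sent head buffer
 * is tracked via p->outq_off. Returns the number of bytes written.
 */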
static unsigned long
net_write(struct peer *p, unsigned long wmax)
{
    struct nb_link *nl;
    struct iovec iov[IOV_MAX];
    int niov;
    int limited;
    ssize_t nwritten;
    unsigned long bcount;

    limited = wmax > 0;

    niov = 0;
    assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
    while (niov < IOV_MAX && nl != NULL && (!limited || wmax > 0)) {
        if (niov > 0) {
            iov[niov].iov_base = nl->nb->buf;
            iov[niov].iov_len = nl->nb->len;
        } else {
            /* The head buffer may already be partially sent. */
            iov[niov].iov_base = nl->nb->buf + p->outq_off;
            iov[niov].iov_len = nl->nb->len - p->outq_off;
        }
        if (limited) {
            if (iov[niov].iov_len > wmax)
                iov[niov].iov_len = wmax;
            wmax -= iov[niov].iov_len;
        }
        niov++;
        nl = BTPDQ_NEXT(nl, entry);
    }

    nwritten = writev(p->sd, iov, niov);
    if (nwritten < 0) {
        if (errno == EAGAIN) {
            event_add(&p->out_ev, WRITE_TIMEOUT);
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    } else if (nwritten == 0) {
        btpd_log(BTPD_L_CONN, "connection closed by peer.\n");
        peer_kill(p);
        return 0;
    }

    bcount = nwritten;
    nl = BTPDQ_FIRST(&p->outq);
    while (bcount > 0) {
        unsigned long bufdelta = nl->nb->len - p->outq_off;
        if (bcount >= bufdelta) {
            /* This buffer was sent in full; account for it and drop it. */
            peer_sent(p, nl->nb);
            if (nl->nb->type == NB_TORRENTDATA) {
                p->tp->uploaded += bufdelta;
                p->rate_from_me[btpd.seconds % RATEHISTORY] += bufdelta;
            }
            bcount -= bufdelta;
            BTPDQ_REMOVE(&p->outq, nl, entry);
            nb_drop(nl->nb);
            free(nl);
            p->outq_off = 0;
            nl = BTPDQ_FIRST(&p->outq);
        } else {
            /* Partial write; remember how far into the head buffer we got. */
            if (nl->nb->type == NB_TORRENTDATA) {
                p->tp->uploaded += bcount;
                p->rate_from_me[btpd.seconds % RATEHISTORY] += bcount;
            }
            p->outq_off += bcount;
            bcount = 0;
        }
    }
    if (!BTPDQ_EMPTY(&p->outq))
        event_add(&p->out_ev, WRITE_TIMEOUT);
    return nwritten;
}
void
net_set_state(struct peer *p, int state, size_t size)
{
    p->net.state = state;
    p->net.st_bytes = size;
}
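
/*
 * Dispatch a fully received message to the matching peer_on_* handler.
 * Returns nonzero for a bogus message, in which case the caller drops the
 * peer. A request is only honored while the peer is interested and we
 * haven't choked it (PF_P_WANT set, PF_I_CHOKE clear).
 */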
static int
net_dispatch_msg(struct peer *p, const char *buf)
{
    uint32_t index, begin, length;
    int res = 0;

    switch (p->net.msg_num) {
    case MSG_CHOKE:
        peer_on_choke(p);
        break;
    case MSG_UNCHOKE:
        peer_on_unchoke(p);
        break;
    case MSG_INTEREST:
        peer_on_interest(p);
        break;
    case MSG_UNINTEREST:
        peer_on_uninterest(p);
        break;
    case MSG_HAVE:
        peer_on_have(p, net_read32(buf));
        break;
    case MSG_BITFIELD:
        if (p->npieces == 0)
            peer_on_bitfield(p, buf);
        else
            res = 1;
        break;
    case MSG_REQUEST:
        if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
            index = net_read32(buf);
            begin = net_read32(buf + 4);
            length = net_read32(buf + 8);
            if (length > PIECE_BLOCKLEN
                || index >= p->tp->meta.npieces
                || !has_bit(p->tp->piece_field, index)
                || begin + length > torrent_piece_size(p->tp, index)) {
                btpd_log(BTPD_L_MSG, "bad request: (%u, %u, %u) from %p\n",
                    index, begin, length, p);
                res = 1;
                break;
            }
            peer_on_request(p, index, begin, length);
        }
        break;
    case MSG_CANCEL:
        index = net_read32(buf);
        begin = net_read32(buf + 4);
        length = net_read32(buf + 8);
        peer_on_cancel(p, index, begin, length);
        break;
    case MSG_PIECE:
        index = net_read32(buf);
        begin = net_read32(buf + 4);
        length = p->net.msg_len - 9;
        peer_on_piece(p, index, begin, length, buf + 8);
        break;
    default:
        abort();
    }
    return res;
}
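
/*
 * Sanity-check a message header: the declared length must match what the
 * message type allows (e.g. HAVE is always 5 bytes, and a bitfield must
 * cover exactly the torrent's piece count).
 */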
static int
net_mh_ok(struct peer *p)
{
    uint32_t mlen = p->net.msg_len;
    switch (p->net.msg_num) {
    case MSG_CHOKE:
    case MSG_UNCHOKE:
    case MSG_INTEREST:
    case MSG_UNINTEREST:
        return mlen == 1;
    case MSG_HAVE:
        return mlen == 5;
    case MSG_BITFIELD:
        return mlen == (uint32_t)ceil(p->tp->meta.npieces / 8.0) + 1;
    case MSG_REQUEST:
    case MSG_CANCEL:
        return mlen == 13;
    case MSG_PIECE:
        /* Must hold the type byte plus index and begin (9 bytes total);
         * without the lower bound, msg_len - 9 underflows in the
         * dispatcher. Payload is capped at one block. */
        return mlen >= 9 && mlen <= PIECE_BLOCKLEN + 9;
    default:
        return 0;
    }
}
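
/* Credit incoming piece payload to the download counters and rate history. */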
static void
net_progress(struct peer *p, size_t length)
{
    if (p->net.state == NET_MSGBODY && p->net.msg_num == MSG_PIECE) {
        p->tp->downloaded += length;
        p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
    }
}
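
/*
 * Advance the per-peer input state machine by one complete unit: handshake
 * protocol string -> info hash -> peer id, then alternating message size,
 * message header and message body. Returns -1 and kills the peer on any
 * protocol violation.
 */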
static int
net_state(struct peer *p, const char *buf)
{
    switch (p->net.state) {
    case SHAKE_PSTR:
        if (bcmp(buf, "\x13""BitTorrent protocol", 20) != 0)
            goto bad;
        net_set_state(p, SHAKE_INFO, 20);
        break;
    case SHAKE_INFO:
        if (p->flags & PF_INCOMING) {
            struct torrent *tp = torrent_get_by_hash(buf);
            if (tp == NULL)
                goto bad;
            p->tp = tp;
            peer_send(p, nb_create_shake(p->tp));
        } else if (bcmp(buf, p->tp->meta.info_hash, 20) != 0)
            goto bad;
        net_set_state(p, SHAKE_ID, 20);
        break;
    case SHAKE_ID:
        if (torrent_has_peer(p->tp, buf)
            || bcmp(buf, btpd.peer_id, 20) == 0)
            goto bad;
        bcopy(buf, p->id, 20);
        peer_on_shake(p);
        net_set_state(p, NET_MSGSIZE, 4);
        break;
    case NET_MSGSIZE:
        p->net.msg_len = net_read32(buf);
        /* A zero length is a keep-alive; stay put and read the next size. */
        if (p->net.msg_len != 0)
            net_set_state(p, NET_MSGHEAD, 1);
        break;
    case NET_MSGHEAD:
        p->net.msg_num = buf[0];
        if (!net_mh_ok(p))
            goto bad;
        else if (p->net.msg_len == 1) {
            if (net_dispatch_msg(p, buf) != 0)
                goto bad;
            net_set_state(p, NET_MSGSIZE, 4);
        } else
            net_set_state(p, NET_MSGBODY, p->net.msg_len - 1);
        break;
    case NET_MSGBODY:
        if (net_dispatch_msg(p, buf) != 0)
            goto bad;
        net_set_state(p, NET_MSGSIZE, 4);
        break;
    default:
        abort();
    }
    return 0;

bad:
    btpd_log(BTPD_L_CONN, "bad data from %p (%u, %u, %u).\n",
        p, p->net.state, p->net.msg_len, p->net.msg_num);
    peer_kill(p);
    return -1;
}
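
/*
 * Read from the peer's socket, limited to rmax bytes when a rate limit is
 * in effect (rmax == 0 means unlimited). Incoming data first completes any
 * buffered partial state unit, then whole units are fed straight to
 * net_state() from a stack buffer; a trailing fragment is saved in
 * p->net.buf until the rest of its unit arrives.
 */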
#define GRBUFLEN (1 << 15)

static unsigned long
net_read(struct peer *p, unsigned long rmax)
{
    size_t rest = p->net.buf != NULL ? p->net.st_bytes - p->net.off : 0;
    char buf[GRBUFLEN];
    /* iov[0] completes the partially buffered unit; iov[1] takes fresh data. */
    struct iovec iov[2] = {
        {
            p->net.buf + p->net.off,
            rest
        }, {
            buf,
            sizeof(buf)
        }
    };

    if (rmax > 0) {
        if (iov[0].iov_len > rmax)
            iov[0].iov_len = rmax;
        iov[1].iov_len = min(rmax - iov[0].iov_len, iov[1].iov_len);
    }

    ssize_t nread = readv(p->sd, iov, 2);
    if (nread < 0 && errno == EAGAIN)
        goto out;
    else if (nread < 0) {
        btpd_log(BTPD_L_CONN, "Read error (%s) on %p.\n", strerror(errno), p);
        peer_kill(p);
        return 0;
    } else if (nread == 0) {
        btpd_log(BTPD_L_CONN, "Connection closed by %p.\n", p);
        peer_kill(p);
        return 0;
    }

    if (rest > 0) {
        if (nread < rest) {
            p->net.off += nread;
            net_progress(p, nread);
            goto out;
        }
        net_progress(p, rest);
        if (net_state(p, p->net.buf) != 0)
            return nread;
        free(p->net.buf);
        p->net.buf = NULL;
        p->net.off = 0;
    }

    /* Feed whole state units straight from the stack buffer. */
    iov[1].iov_len = nread - rest;
    while (p->net.st_bytes <= iov[1].iov_len) {
        ssize_t consumed = p->net.st_bytes;
        net_progress(p, consumed);
        if (net_state(p, iov[1].iov_base) != 0)
            return nread;
        iov[1].iov_base = (char *)iov[1].iov_base + consumed;
        iov[1].iov_len -= consumed;
    }

    /* Save any trailing fragment until the rest of its unit arrives. */
    if (iov[1].iov_len > 0) {
        net_progress(p, iov[1].iov_len);
        p->net.off = iov[1].iov_len;
        p->net.buf = btpd_malloc(p->net.st_bytes);
        bcopy(iov[1].iov_base, p->net.buf, iov[1].iov_len);
    }

out:
    event_add(&p->in_ev, NULL);
    return nread > 0 ? nread : 0;
}
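
/*
 * Create a nonblocking socket and start connecting it; EINPROGRESS is the
 * normal outcome. Returns 0 on success or an errno value.
 */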
int
net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
{
    if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        return errno;

    set_nonblocking(*sd);

    if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
        btpd_log(BTPD_L_CONN, "Botched connection %s.", strerror(errno));
        close(*sd);
        return errno;
    }

    return 0;
}
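
/* Resolve a numeric IP address and port, then kick off the connect. */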
int
net_connect(const char *ip, int port, int *sd)
{
    struct addrinfo hints, *res;
    char portstr[6];

    assert(btpd.npeers < btpd.maxpeers);

    if (snprintf(portstr, sizeof(portstr), "%d", port) >= (int)sizeof(portstr))
        return EINVAL;
    bzero(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_flags = AI_NUMERICHOST;
    hints.ai_socktype = SOCK_STREAM;
    /* getaddrinfo() reports failure via its return value, not errno; with
     * AI_NUMERICHOST a failure means the address didn't parse. */
    if (getaddrinfo(ip, portstr, &hints, &res) != 0)
        return EINVAL;

    int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);

    freeaddrinfo(res);

    return error;
}
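
/*
 * Callback for the listening socket: accept an incoming connection and
 * hand it to the peer layer, unless we are already at the peer limit.
 */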
void
net_connection_cb(int sd, short type, void *arg)
{
    int nsd;

    nsd = accept(sd, NULL, NULL);
    if (nsd < 0) {
        if (errno == EWOULDBLOCK || errno == ECONNABORTED)
            return;
        else
            btpd_err("accept: %s\n", strerror(errno));
    }

    if (set_nonblocking(nsd) != 0) {
        close(nsd);
        return;
    }

    assert(btpd.npeers <= btpd.maxpeers);
    if (btpd.npeers == btpd.maxpeers) {
        close(nsd);
        return;
    }

    peer_create_in(nsd);

    btpd_log(BTPD_L_CONN, "got connection.\n");
}
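
/*
 * Called once per second: shift the history of bandwidth-callback counts
 * and recompute the average callback frequency over the window (the 5.0
 * divisor presumably mirrors BWCALLHISTORY).
 */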
void
net_bw_rate(void)
{
    unsigned sum = 0;
    for (int i = 0; i < BWCALLHISTORY - 1; i++) {
        btpd.bwrate[i] = btpd.bwrate[i + 1];
        sum += btpd.bwrate[i];
    }
    btpd.bwrate[BWCALLHISTORY - 1] = btpd.bwcalls;
    sum += btpd.bwrate[BWCALLHISTORY - 1];
    btpd.bwcalls = 0;
    btpd.bw_hz_avg = sum / 5.0;
}
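
/*
 * Periodic bandwidth scheduler, meant to run btpd.bw_hz times per second:
 * refill the in/out byte budgets for this tick and drain the queues of
 * peers that were deferred for lack of budget. A limit of zero means
 * unlimited.
 */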
void
net_bw_cb(int sd, short type, void *arg)
{
    struct peer *p;

    btpd.bwcalls++;

    double avg_hz;
    if (btpd.seconds < BWCALLHISTORY)
        avg_hz = btpd.bw_hz;
    else
        avg_hz = btpd.bw_hz_avg;

    btpd.obw_left = btpd.obwlim / avg_hz;
    btpd.ibw_left = btpd.ibwlim / avg_hz;

    if (btpd.ibwlim > 0) {
        while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left > 0) {
            BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            btpd.ibw_left -= net_read(p, btpd.ibw_left);
        }
    } else {
        while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL) {
            BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            net_read(p, 0);
        }
    }

    if (btpd.obwlim) {
        while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL && btpd.obw_left > 0) {
            BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            btpd.obw_left -= net_write(p, btpd.obw_left);
        }
    } else {
        while ((p = BTPDQ_FIRST(&btpd.writeq)) != NULL) {
            BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            net_write(p, 0);
        }
    }
    event_add(&btpd.bwlim, (& (struct timeval) { 0, 1000000 / btpd.bw_hz }));
}
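
/*
 * Socket readability/writability callbacks: serve the peer immediately if
 * there is budget left in this tick, otherwise park it on the read or
 * write queue for net_bw_cb() to pick up.
 */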
void
net_read_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;

    if (btpd.ibwlim == 0)
        net_read(p, 0);
    else if (btpd.ibw_left > 0)
        btpd.ibw_left -= net_read(p, btpd.ibw_left);
    else {
        p->flags |= PF_ON_READQ;
        BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
    }
}
void
net_write_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;

    if (type == EV_TIMEOUT) {
        btpd_log(BTPD_L_CONN, "Write attempt timed out.\n");
        peer_kill(p);
        return;
    }

    if (btpd.obwlim == 0) {
        net_write(p, 0);
    } else if (btpd.obw_left > 0) {
        btpd.obw_left -= net_write(p, btpd.obw_left);
    } else {
        p->flags |= PF_ON_WRITEQ;
        BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
    }
}