A clone of btpd with my configuration changes.

831 lines
21 KiB

  1. #include <sys/types.h>
  2. #include <sys/stat.h>
  3. #include <fcntl.h>
  4. #include <math.h>
  5. #include <pthread.h>
  6. #include <stdio.h>
  7. #include <string.h>
  8. #include <unistd.h>
  9. #include <openssl/sha.h>
  10. #include "btpd.h"
  11. #include "stream.h"
/*
 * One buffered block write destined for a piece that is still being
 * allocated on disk. The payload is stored inline after the struct
 * (see cm_put_bytes) and queued on the pending CM_WRITE op, ordered
 * by begin offset.
 */
struct cm_write_data {
    uint32_t begin;     /* byte offset within the piece */
    uint8_t *buf;       /* points just past this struct */
    size_t len;         /* payload length in bytes */
    BTPDQ_ENTRY(cm_write_data) entry;
};
BTPDQ_HEAD(cm_write_data_tq, cm_write_data);
/* Kinds of work the content worker threads perform. */
enum cm_op_type {
    CM_ALLOC,   /* zero-fill a piece slot on disk */
    CM_SAVE,    /* write resume data */
    CM_START,   /* load/verify on-disk state (long-running) */
    CM_TEST,    /* hash-test one piece */
    CM_WRITE    /* flush buffered block writes for one piece */
};
/*
 * A unit of work handed to a content worker thread. Each op is linked
 * both on its torrent's todo queue (cm_entry) and, while queued for a
 * worker, on a cm_comm queue (td_entry). Results come back to the main
 * thread through cm_td_cb().
 */
struct cm_op {
    struct torrent *tp;
    int error;              /* set by the worker on IO failure */
    int received;           /* set (under the comm lock) once a worker picked it up */
    enum cm_op_type type;
    union {
        struct {
            uint32_t piece;
            uint32_t pos;   /* on-disk slot; set equal to piece in cm_post_alloc */
        } alloc;
        struct {
            /* Set under m_long_comm.lock by cm_stop to abort a running start. */
            volatile sig_atomic_t cancel;
        } start;
        struct {
            uint32_t piece;
            uint32_t pos;
            int ok;         /* hash-test verdict, filled in by the worker */
        } test;
        struct {
            uint32_t piece;
            uint32_t pos;
            struct cm_write_data_tq q;  /* buffered writes, ordered by begin */
        } write;
    } u;
    BTPDQ_ENTRY(cm_op) cm_entry;    /* link on content.todoq */
    BTPDQ_ENTRY(cm_op) td_entry;    /* link on cm_comm.q */
};
BTPDQ_HEAD(cm_op_tq, cm_op);
/*
 * Per-torrent content-manager state, hung off torrent.cm while the
 * torrent's data is being managed.
 */
struct content {
    int active;                 /* cleared by cm_stop() */
    uint32_t npieces_got;       /* pieces that passed their hash test */
    off_t ncontent_bytes;       /* bytes of content received/verified */
    size_t bppbf;               // bytes per piece block field
    uint8_t *piece_field;       /* bitmap: piece hash-verified */
    uint8_t *block_field;       /* per-piece block bitmaps, bppbf bytes each */
    uint8_t *hold_field;        /* bitmap: piece is being zero-allocated */
    uint8_t *pos_field;         /* bitmap: on-disk piece slot in use */
    struct cm_op_tq todoq;      /* pending ops; the head is in flight */
    struct bt_stream *rds;      /* read stream over the content files */
    struct bt_stream *wrs;      /* write stream; open until the torrent is full */
    struct event save_timer;    /* periodic resume-data save timer */
};
/* Chunk size of the shared zero buffer used for piece preallocation. */
#define ZEROBUFLEN (1 << 14)
/* A work queue shared between the main thread and one worker thread. */
struct cm_comm {
    struct cm_op_tq q;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};
/* Separate workers so long CM_START ops don't starve short ones. */
static struct cm_comm m_long_comm, m_short_comm;
static const uint8_t m_zerobuf[ZEROBUFLEN];
  76. static int
  77. fd_cb_rd(const char *path, int *fd, void *arg)
  78. {
  79. struct torrent *tp = arg;
  80. return vopen(fd, O_RDONLY, "torrents/%s/content/%s", tp->relpath, path);
  81. }
  82. static int
  83. fd_cb_wr(const char *path, int *fd, void *arg)
  84. {
  85. struct torrent *tp = arg;
  86. return vopen(fd, O_RDWR|O_CREAT, "torrents/%s/content/%s", tp->relpath,
  87. path);
  88. }
/*
 * Queue an op on a worker's comm queue and wake the worker.
 * The signal is sent after releasing the lock; the worker re-checks the
 * queue under the lock, so no wakeup is lost.
 */
static void
cm_td_post_common(struct cm_comm *comm, struct cm_op *op)
{
    pthread_mutex_lock(&comm->lock);
    BTPDQ_INSERT_TAIL(&comm->q, op, td_entry);
    pthread_mutex_unlock(&comm->lock);
    pthread_cond_signal(&comm->cond);
}
/* Post to the long-op worker (CM_START). */
static void
cm_td_post_long(struct cm_op *op)
{
    cm_td_post_common(&m_long_comm, op);
}
/* Post to the short-op worker (everything else). */
static void
cm_td_post_short(struct cm_op *op)
{
    cm_td_post_common(&m_short_comm, op);
}
  107. static void
  108. run_todo(struct content *cm)
  109. {
  110. struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
  111. if (op->type == CM_WRITE && BTPDQ_EMPTY(&op->u.write.q)) {
  112. BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
  113. free(op);
  114. if (!BTPDQ_EMPTY(&cm->todoq))
  115. run_todo(cm);
  116. return;
  117. }
  118. if (op->type != CM_START)
  119. cm_td_post_short(op);
  120. else
  121. cm_td_post_long(op);
  122. }
  123. static void
  124. add_todo(struct content *cm, struct cm_op *op)
  125. {
  126. int was_empty = BTPDQ_EMPTY(&cm->todoq);
  127. BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
  128. if (was_empty)
  129. run_todo(cm);
  130. }
  131. void
  132. cm_destroy(struct torrent *tp)
  133. {
  134. struct content *cm = tp->cm;
  135. bts_close(cm->rds);
  136. free(cm->piece_field);
  137. free(cm->block_field);
  138. free(cm->hold_field);
  139. free(cm->pos_field);
  140. tp->cm = NULL;
  141. torrent_on_cm_stopped(tp);
  142. }
  143. void
  144. cm_save(struct torrent *tp)
  145. {
  146. struct content *cm = tp->cm;
  147. struct cm_op *op = btpd_calloc(1, sizeof(*op));
  148. op->tp = tp;
  149. op->type = CM_SAVE;
  150. add_todo(cm, op);
  151. }
/*
 * Finish the writing phase: close the write stream, stop the periodic
 * save timer and queue one final resume-data save. Called when the
 * torrent becomes complete or is stopped before completion.
 * A failing close is fatal (btpd_err).
 */
static void
cm_write_done(struct torrent *tp)
{
    int err;
    struct content *cm = tp->cm;
    if ((err = bts_close(cm->wrs)) != 0)
        btpd_err("Error closing write stream for %s (%s).\n", tp->relpath,
            strerror(err));
    cm->wrs = NULL;
    event_del(&cm->save_timer);
    cm_save(tp);
}
/*
 * Begin shutting down content handling for tp. If a CM_START op is at
 * the head of the todo queue: when a worker has already received it, ask
 * it to cancel via the cancel flag; otherwise unlink and free it — both
 * under m_long_comm.lock so the decision races with no worker. If no
 * start op is pending and the torrent is incomplete, flush the write
 * stream and queue a final save. State is destroyed here only if the
 * todo queue is already empty; otherwise cm_td_cb destroys it once the
 * queue drains.
 */
void
cm_stop(struct torrent *tp)
{
    struct content *cm = tp->cm;
    cm->active = 0;
    struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
    if (op != NULL && op->type == CM_START) {
        pthread_mutex_lock(&m_long_comm.lock);
        if (op->received)
            op->u.start.cancel = 1;
        else {
            BTPDQ_REMOVE(&m_long_comm.q, op, td_entry);
            BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
            free(op);
        }
        pthread_mutex_unlock(&m_long_comm.lock);
    } else if (!cm_full(tp))
        cm_write_done(tp);
    if (BTPDQ_EMPTY(&cm->todoq))
        cm_destroy(tp);
}
/* Resume-data save interval: a pointer to a 15-second compound-literal
 * timeval, in the form event_add expects. */
#define SAVE_INTERVAL (& (struct timeval) { 15, 0 })
/* Timer callback: rearm the periodic timer and queue a resume save. */
static void
save_timer_cb(int fd, short type, void *arg)
{
    struct torrent *tp = arg;
    event_add(&tp->cm->save_timer, SAVE_INTERVAL);
    cm_save(tp);
}
/*
 * Main-thread completion callback for a finished worker op. Applies the
 * op's result to the content state, frees the op, then either dispatches
 * the next queued op or — when the torrent has been stopped and the
 * queue is empty — destroys the content state. Any worker IO error is
 * fatal (btpd_err).
 */
static void
cm_td_cb(void *arg)
{
    int err;
    struct cm_op *op = arg;
    struct torrent *tp = op->tp;
    struct content *cm = tp->cm;
    if (op->error)
        btpd_err("IO error for %s.\n", tp->relpath);
    switch (op->type) {
    case CM_ALLOC:
        /* The slot now exists on disk; the piece is no longer on hold. */
        set_bit(cm->pos_field, op->u.alloc.pos);
        clear_bit(cm->hold_field, op->u.alloc.piece);
        break;
    case CM_START:
        if (cm->active) {
            assert(!op->u.start.cancel);
            if (!cm_full(tp)) {
                /* More to download: open the write stream and start
                 * periodic resume saves. */
                if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
                    btpd_err("Couldn't open write stream for %s (%s).\n",
                        tp->relpath, strerror(err));
                event_add(&cm->save_timer, SAVE_INTERVAL);
            }
            torrent_on_cm_started(tp);
        }
        break;
    case CM_TEST:
        if (op->u.test.ok) {
            assert(cm->npieces_got < tp->meta.npieces);
            cm->npieces_got++;
            set_bit(cm->piece_field, op->u.test.piece);
            if (tp->net != NULL)
                dl_on_ok_piece(op->tp->net, op->u.test.piece);
            if (cm_full(tp))
                cm_write_done(tp);
        } else {
            /* Bad hash: forget the piece's blocks and tell the net layer. */
            cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece);
            bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf);
            if (tp->net != NULL)
                dl_on_bad_piece(tp->net, op->u.test.piece);
        }
        break;
    case CM_SAVE:
    case CM_WRITE:
        break;
    }
    BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
    free(op);
    if (!BTPDQ_EMPTY(&cm->todoq))
        run_todo(cm);
    else if (!cm->active)
        cm_destroy(tp);
}
/*
 * Allocate and initialize content state for tp and queue the CM_START op
 * that loads and verifies the on-disk data asynchronously. Always
 * returns 0; unrecoverable errors abort via btpd_err.
 */
int
cm_start(struct torrent *tp)
{
    int err;
    struct content *cm = btpd_calloc(1, sizeof(*cm));
    size_t pfield_size = ceil(tp->meta.npieces / 8.0);
    cm->active = 1;
    /* One bit per block, eight blocks per byte: 1 << 17 presumably equals
     * PIECE_BLOCKLEN * 8 — TODO confirm against PIECE_BLOCKLEN's definition. */
    cm->bppbf = ceil((double)tp->meta.piece_length / (1 << 17));
    cm->piece_field = btpd_calloc(pfield_size, 1);
    cm->hold_field = btpd_calloc(pfield_size, 1);
    cm->pos_field = btpd_calloc(pfield_size, 1);
    cm->block_field = btpd_calloc(tp->meta.npieces * cm->bppbf, 1);
    BTPDQ_INIT(&cm->todoq);
    if ((err = bts_open(&cm->rds, &tp->meta, fd_cb_rd, tp)) != 0)
        btpd_err("Error opening stream (%s).\n", strerror(err));
    tp->cm = cm;
    evtimer_set(&cm->save_timer, save_timer_cb, tp);
    struct cm_op *op = btpd_calloc(1, sizeof(*op));
    op->tp = tp;
    op->type = CM_START;
    add_todo(cm, op);
    return 0;
}
  269. int
  270. cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
  271. uint8_t **buf)
  272. {
  273. *buf = btpd_malloc(len);
  274. int err =
  275. bts_get(tp->cm->rds, piece * tp->meta.piece_length + begin, *buf, len);
  276. if (err != 0)
  277. btpd_err("Io error (%s)\n", strerror(err));
  278. return 0;
  279. }
  280. static void
  281. cm_post_alloc(struct torrent *tp, uint32_t piece)
  282. {
  283. struct content *cm = tp->cm;
  284. set_bit(cm->hold_field, piece);
  285. struct cm_op *op = btpd_calloc(1, sizeof(*op));
  286. op->tp = tp;
  287. op->type = CM_ALLOC;
  288. op->u.alloc.piece = piece;
  289. op->u.alloc.pos = piece;
  290. add_todo(cm, op);
  291. op = btpd_calloc(1, sizeof(*op));
  292. op->tp = tp;
  293. op->type = CM_WRITE;
  294. op->u.write.piece = piece;
  295. op->u.write.pos = piece;
  296. BTPDQ_INIT(&op->u.write.q);
  297. add_todo(cm, op);
  298. }
  299. void
  300. cm_prealloc(struct torrent *tp, uint32_t piece)
  301. {
  302. struct content *cm = tp->cm;
  303. if (cm_alloc_size == 0)
  304. set_bit(cm->pos_field, piece);
  305. else {
  306. unsigned npieces = ceil((double)cm_alloc_size / tp->meta.piece_length);
  307. uint32_t start = piece - piece % npieces;
  308. uint32_t end = min(start + npieces, tp->meta.npieces);
  309. while (start < end) {
  310. if ((!has_bit(cm->pos_field, start)
  311. && !has_bit(cm->hold_field, start)))
  312. cm_post_alloc(tp, start);
  313. start++;
  314. }
  315. }
  316. }
  317. void
  318. cm_test_piece(struct torrent *tp, uint32_t piece)
  319. {
  320. struct content *cm = tp->cm;
  321. struct cm_op *op = btpd_calloc(1, sizeof(*op));
  322. op->tp = tp;
  323. op->type = CM_TEST;
  324. op->u.test.piece = piece;
  325. op->u.test.pos = piece;
  326. add_todo(cm, op);
  327. }
  328. int
  329. cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
  330. const uint8_t *buf, size_t len)
  331. {
  332. int err;
  333. struct content *cm = tp->cm;
  334. if (has_bit(cm->hold_field, piece)) {
  335. struct cm_write_data *d = btpd_calloc(1, sizeof(*d) + len);
  336. d->begin = begin;
  337. d->len = len;
  338. d->buf = (uint8_t *)(d + 1);
  339. bcopy(buf, d->buf, len);
  340. struct cm_op *op;
  341. BTPDQ_FOREACH(op, &cm->todoq, cm_entry)
  342. if (op->type == CM_WRITE && op->u.write.piece == piece)
  343. break;
  344. struct cm_write_data *it;
  345. BTPDQ_FOREACH(it, &op->u.write.q, entry)
  346. if (it->begin > begin) {
  347. BTPDQ_INSERT_BEFORE(it, d, entry);
  348. break;
  349. }
  350. if (it == NULL)
  351. BTPDQ_INSERT_TAIL(&op->u.write.q, d, entry);
  352. } else {
  353. err = bts_put(cm->wrs, piece * tp->meta.piece_length + begin, buf,
  354. len);
  355. if (err != 0)
  356. btpd_err("Io error (%s)\n", strerror(err));
  357. }
  358. cm->ncontent_bytes += len;
  359. uint8_t *bf = cm->block_field + piece * cm->bppbf;
  360. set_bit(bf, begin / PIECE_BLOCKLEN);
  361. return 0;
  362. }
/* True when every piece of the torrent has been verified. */
int
cm_full(struct torrent *tp)
{
    return tp->cm->npieces_got == tp->meta.npieces;
}
/* Bytes of content received/verified so far. */
off_t
cm_get_size(struct torrent *tp)
{
    return tp->cm->ncontent_bytes;
}
/* Number of hash-verified pieces. */
uint32_t
cm_get_npieces(struct torrent *tp)
{
    return tp->cm->npieces_got;
}
/* Bitmap of verified pieces (one bit per piece). */
uint8_t *
cm_get_piece_field(struct torrent *tp)
{
    return tp->cm->piece_field;
}
/* Block bitmap of one piece (bppbf bytes). */
uint8_t *
cm_get_block_field(struct torrent *tp, uint32_t piece)
{
    return tp->cm->block_field + piece * tp->cm->bppbf;
}
/* True if the given piece has been verified. */
int
cm_has_piece(struct torrent *tp, uint32_t piece)
{
    return has_bit(tp->cm->piece_field, piece);
}
  393. static int
  394. test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
  395. {
  396. if (tp->meta.piece_hash != NULL)
  397. return bcmp(hash, tp->meta.piece_hash[piece], SHA_DIGEST_LENGTH);
  398. else {
  399. char piece_hash[SHA_DIGEST_LENGTH];
  400. int fd;
  401. int err;
  402. err = vopen(&fd, O_RDONLY, "torrents/%s/torrent", tp->relpath);
  403. if (err != 0)
  404. btpd_err("test_hash: %s\n", strerror(err));
  405. lseek(fd, tp->meta.pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET);
  406. read(fd, piece_hash, SHA_DIGEST_LENGTH);
  407. close(fd);
  408. return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
  409. }
  410. }
  411. static int
  412. test_piece(struct torrent *tp, uint32_t pos, uint32_t piece, int *ok)
  413. {
  414. int err;
  415. uint8_t hash[SHA_DIGEST_LENGTH];
  416. struct bt_stream *bts;
  417. if ((err = bts_open(&bts, &tp->meta, fd_cb_rd, tp)) != 0)
  418. return err;
  419. if ((err = bts_sha(bts, pos * tp->meta.piece_length,
  420. torrent_piece_size(tp, piece), hash)) != 0)
  421. return err;;
  422. bts_close(bts);
  423. *ok = test_hash(tp, hash, piece) == 0;
  424. return 0;
  425. }
/*
 * Worker for CM_ALLOC: zero-fill piece slot pos on disk, ZEROBUFLEN
 * bytes at a time from the shared zero buffer, through a private write
 * stream. Any stream error flags the op; cm_td_cb treats that as fatal.
 */
static void
cm_td_alloc(struct cm_op *op)
{
    struct torrent *tp = op->tp;
    struct content *cm = tp->cm;
    uint32_t pos = op->u.alloc.pos;
    struct bt_stream *bts;
    int err;
    assert(!has_bit(cm->pos_field, pos));
    if ((err = bts_open(&bts, &tp->meta, fd_cb_wr, tp)) != 0)
        goto out;
    off_t len = torrent_piece_size(tp, pos);
    off_t off = tp->meta.piece_length * pos;
    while (len > 0) {
        size_t wlen = min(ZEROBUFLEN, len);
        if ((err = bts_put(bts, off, m_zerobuf, wlen)) != 0) {
            bts_close(bts);
            goto out;
        }
        len -= wlen;
        off += wlen;
    }
    err = bts_close(bts);
out:
    if (err != 0)
        op->error = 1;
}
  453. static int
  454. test_torrent(struct torrent *tp, volatile sig_atomic_t *cancel)
  455. {
  456. int err;
  457. FILE *fp;
  458. uint8_t (*hashes)[SHA_DIGEST_LENGTH];
  459. uint8_t hash[SHA_DIGEST_LENGTH];
  460. if ((err = vfopen(&fp, "r", "torrents/%s/torrent", tp->relpath)) != 0)
  461. return err;
  462. hashes = btpd_malloc(tp->meta.npieces * SHA_DIGEST_LENGTH);
  463. fseek(fp, tp->meta.pieces_off, SEEK_SET);
  464. fread(hashes, SHA_DIGEST_LENGTH, tp->meta.npieces, fp);
  465. fclose(fp);
  466. tp->meta.piece_hash = hashes;
  467. struct content *cm = tp->cm;
  468. for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) {
  469. if (!has_bit(cm->pos_field, piece))
  470. continue;
  471. err = bts_sha(cm->rds, piece * tp->meta.piece_length,
  472. torrent_piece_size(tp, piece), hash);
  473. if (err != 0)
  474. break;
  475. if (test_hash(tp, hash, piece) == 0)
  476. set_bit(tp->cm->piece_field, piece);
  477. if (*cancel) {
  478. err = EINTR;
  479. break;
  480. }
  481. }
  482. tp->meta.piece_hash = NULL;
  483. free(hashes);
  484. return err;
  485. }
/* Stat snapshot of one content file; both fields are -1 when the file
 * does not exist (see stat_and_adjust). */
struct rstat {
    time_t mtime;
    off_t size;
};
/*
 * Stat every content file of tp into ret[] (sized tp->meta.nfiles).
 * Missing files get size and mtime -1. Files larger than their metainfo
 * length are truncated and then re-stat:ed (the `again` loop) so ret[]
 * reflects the adjusted state. Returns 0 or an errno value.
 */
int
stat_and_adjust(struct torrent *tp, struct rstat ret[])
{
    char path[PATH_MAX];
    struct stat sb;
    for (int i = 0; i < tp->meta.nfiles; i++) {
        snprintf(path, PATH_MAX, "torrents/%s/content/%s", tp->relpath,
            tp->meta.files[i].path);
again:
        if (stat(path, &sb) == -1) {
            if (errno == ENOENT) {
                ret[i].mtime = -1;
                ret[i].size = -1;
            } else
                return errno;
        } else {
            ret[i].mtime = sb.st_mtime;
            ret[i].size = sb.st_size;
        }
        if (ret[i].size > tp->meta.files[i].length) {
            if (truncate(path, tp->meta.files[i].length) != 0)
                return errno;
            goto again;
        }
    }
    return 0;
}
  517. static int
  518. load_resume(struct torrent *tp, struct rstat sbs[])
  519. {
  520. int err, ver;
  521. FILE *fp;
  522. size_t pfsiz = ceil(tp->meta.npieces / 8.0);
  523. size_t bfsiz = tp->meta.npieces * tp->cm->bppbf;
  524. if ((err = vfopen(&fp, "r" , "torrents/%s/resume", tp->relpath)) != 0)
  525. return err;
  526. if (fscanf(fp, "%d\n", &ver) != 1)
  527. goto invalid;
  528. if (ver != 1)
  529. goto invalid;
  530. for (int i = 0; i < tp->meta.nfiles; i++) {
  531. long long size;
  532. time_t time;
  533. if (fscanf(fp, "%qd %ld\n", &size, &time) != 2)
  534. goto invalid;
  535. if (sbs[i].size != size || sbs[i].mtime != time)
  536. err = EINVAL;
  537. }
  538. if (fread(tp->cm->piece_field, 1, pfsiz, fp) != pfsiz)
  539. goto invalid;
  540. if (fread(tp->cm->block_field, 1, bfsiz, fp) != bfsiz)
  541. goto invalid;
  542. fclose(fp);
  543. return err;
  544. invalid:
  545. fclose(fp);
  546. bzero(tp->cm->piece_field, pfsiz);
  547. bzero(tp->cm->block_field, bfsiz);
  548. return EINVAL;
  549. }
  550. static int
  551. save_resume(struct torrent *tp, struct rstat sbs[])
  552. {
  553. int err;
  554. FILE *fp;
  555. if ((err = vfopen(&fp, "wb", "torrents/%s/resume", tp->relpath)) != 0)
  556. return err;
  557. fprintf(fp, "%d\n", 1);
  558. for (int i = 0; i < tp->meta.nfiles; i++)
  559. fprintf(fp, "%qd %ld\n", (long long)sbs[i].size, sbs[i].mtime);
  560. fwrite(tp->cm->piece_field, 1, ceil(tp->meta.npieces / 8.0), fp);
  561. fwrite(tp->cm->block_field, 1, tp->meta.npieces * tp->cm->bppbf, fp);
  562. if (fclose(fp) != 0)
  563. err = errno;
  564. return err;
  565. }
  566. static void
  567. cm_td_save(struct cm_op *op)
  568. {
  569. struct torrent *tp = op->tp;
  570. struct rstat sbs[tp->meta.nfiles];
  571. if (stat_and_adjust(tp, sbs) == 0)
  572. save_resume(tp, sbs);
  573. }
/*
 * Worker for CM_START: bring the in-memory content state in sync with
 * the data on disk.
 *   1. Stat (and possibly truncate) the content files.
 *   2. Try the resume data; when it is stale or missing, assume every
 *      slot may hold data, clear state for pieces overlapping missing or
 *      short files, and hash-test the whole torrent.
 *   3. Rebuild pos_field and the got/byte counters from piece_field and
 *      block_field; a piece with all blocks present that the full test
 *      didn't already vouch for is hash-tested individually.
 *   4. Rewrite the resume data when anything changed.
 * op->u.start.cancel is checked between long steps; when set, the op
 * gives up without flagging an error.
 */
static void
cm_td_start(struct cm_op *op)
{
    int err, resume_clean = 0, tested_torrent = 0;
    struct rstat sbs[op->tp->meta.nfiles];
    struct torrent *tp = op->tp;
    struct content *cm = tp->cm;
    if ((err = stat_and_adjust(op->tp, sbs)) != 0)
        goto out;
    resume_clean = load_resume(tp, sbs) == 0;
    if (!resume_clean) {
        /* Assume every slot may hold data, then clear pieces that
         * overlap files which are missing, empty or short. */
        memset(cm->pos_field, 0xff, ceil(tp->meta.npieces / 8.0));
        off_t off = 0;
        for (int i = 0; i < tp->meta.nfiles; i++) {
            if (sbs[i].size == -1 || sbs[i].size == 0) {
                uint32_t start = off / tp->meta.piece_length;
                uint32_t end = (off + tp->meta.files[i].length - 1) /
                    tp->meta.piece_length;
                while (start <= end) {
                    clear_bit(cm->pos_field, start);
                    clear_bit(cm->piece_field, start);
                    bzero(cm->block_field + start * cm->bppbf, cm->bppbf);
                    start++;
                }
            } else if (sbs[i].size < tp->meta.files[i].length) {
                /* Only the truncated tail is unreliable. */
                uint32_t start = (off + sbs[i].size) /
                    tp->meta.piece_length;
                uint32_t end = (off + tp->meta.files[i].length - 1) /
                    tp->meta.piece_length;
                while (start <= end) {
                    clear_bit(cm->pos_field, start);
                    start++;
                }
            }
            off += tp->meta.files[i].length;
        }
        if (op->u.start.cancel)
            goto out;
        if ((err = test_torrent(tp, &op->u.start.cancel)) != 0)
            goto out;
        tested_torrent = 1;
    }
    /* Rebuild pos_field and the counters from scratch. */
    bzero(cm->pos_field, ceil(tp->meta.npieces / 8.0));
    for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) {
        if (cm_has_piece(tp, piece)) {
            cm->ncontent_bytes += torrent_piece_size(tp, piece);
            cm->npieces_got++;
            set_bit(cm->pos_field, piece);
            continue;
        }
        uint8_t *bf = cm->block_field + cm->bppbf * piece;
        uint32_t nblocks = torrent_piece_blocks(tp, piece);
        uint32_t nblocks_got = 0;
        for (uint32_t i = 0; i < nblocks; i++) {
            if (has_bit(bf, i)) {
                nblocks_got++;
                cm->ncontent_bytes +=
                    torrent_block_size(tp, piece, nblocks, i);
            }
        }
        if (nblocks_got == nblocks) {
            /* All blocks present but not verified: test this piece now
             * (the full test already covered it when tested_torrent,
             * and it failed — otherwise cm_has_piece would be true). */
            resume_clean = 0;
            int ok = 0;
            if (!tested_torrent) {
                if (((err = test_piece(tp, piece, piece, &ok)) != 0
                    || op->u.start.cancel))
                    goto out;
            }
            if (ok) {
                set_bit(cm->pos_field, piece);
                set_bit(cm->piece_field, piece);
            } else
                bzero(bf, cm->bppbf);
        } else if (nblocks_got > 0)
            set_bit(cm->pos_field, piece);
    }
    if (!resume_clean)
        save_resume(tp, sbs);
out:
    if (!op->u.start.cancel && err != 0)
        op->error = 1;
}
  656. static void
  657. cm_td_test(struct cm_op *op)
  658. {
  659. if (test_piece(op->tp, op->u.test.pos, op->u.test.piece,
  660. &op->u.test.ok) != 0)
  661. op->error = 1;
  662. }
/*
 * Worker for CM_WRITE: flush the op's buffered cm_write_data entries to
 * piece slot pos through a private write stream. The buffers are freed
 * whether or not the writes succeed; a stream error flags the op.
 */
static void
cm_td_write(struct cm_op *op)
{
    int err;
    struct cm_write_data *d, *next;
    off_t base = op->tp->meta.piece_length * op->u.write.pos;
    struct bt_stream *bts;
    if ((err = bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp)) != 0)
        goto out;
    BTPDQ_FOREACH(d, &op->u.write.q, entry)
        if ((err = bts_put(bts, base + d->begin, d->buf, d->len)) != 0) {
            bts_close(bts);
            goto out;
        }
    err = bts_close(bts);
out:
    BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
        free(d);
    if (err)
        op->error = 1;
}
/*
 * Worker-thread main loop: block until an op appears on the comm queue,
 * run it, and post it back to the main thread via cm_td_cb. The received
 * flag is set while still holding the lock so cm_stop can tell whether
 * an op may still be dequeued or must be cancelled. Never returns.
 */
static void
cm_td(void *arg)
{
    struct cm_comm *comm = arg;
    struct cm_op *op;
    for (;;) {
        pthread_mutex_lock(&comm->lock);
        while (BTPDQ_EMPTY(&comm->q))
            pthread_cond_wait(&comm->cond, &comm->lock);
        op = BTPDQ_FIRST(&comm->q);
        BTPDQ_REMOVE(&comm->q, op, td_entry);
        op->received = 1;
        pthread_mutex_unlock(&comm->lock);
        switch (op->type) {
        case CM_ALLOC:
            cm_td_alloc(op);
            break;
        case CM_SAVE:
            cm_td_save(op);
            break;
        case CM_START:
            cm_td_start(op);
            break;
        case CM_TEST:
            cm_td_test(op);
            break;
        case CM_WRITE:
            cm_td_write(op);
            break;
        default:
            abort();
        }
        /* Hand the finished op back to the main thread. */
        td_post_begin();
        td_post(cm_td_cb, op);
        td_post_end();
    }
}
  721. void
  722. cm_init(void)
  723. {
  724. pthread_t td;
  725. BTPDQ_INIT(&m_long_comm.q);
  726. pthread_mutex_init(&m_long_comm.lock, NULL);
  727. pthread_cond_init(&m_long_comm.cond, NULL);
  728. pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_long_comm);
  729. BTPDQ_INIT(&m_short_comm.q);
  730. pthread_mutex_init(&m_short_comm.lock, NULL);
  731. pthread_cond_init(&m_short_comm.cond, NULL);
  732. pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_short_comm);
  733. }