A clone of btpd with my configuration changes.

563 lines
14 KiB

  1. #include <sys/types.h>
  2. #include <sys/stat.h>
  3. #include <fcntl.h>
  4. #include <math.h>
  5. #include <stdio.h>
  6. #include <string.h>
  7. #include <unistd.h>
  8. #include <openssl/sha.h>
  9. #include "btpd.h"
  10. #include "stream.h"
  11. struct content {
  12. enum { CM_INACTIVE, CM_STARTING, CM_ACTIVE } state;
  13. int error;
  14. uint32_t npieces_got;
  15. off_t ncontent_bytes;
  16. size_t bppbf; // bytes per piece block field
  17. uint8_t *piece_field;
  18. uint8_t *block_field;
  19. uint8_t *pos_field;
  20. struct bt_stream *rds;
  21. struct bt_stream *wrs;
  22. struct event save_timer;
  23. };
  24. #define ZEROBUFLEN (1 << 14)
  25. static const uint8_t m_zerobuf[ZEROBUFLEN];
  26. static int
  27. fd_cb_rd(const char *path, int *fd, void *arg)
  28. {
  29. struct torrent *tp = arg;
  30. return vopen(fd, O_RDONLY, "%s/%s", tp->tl->dir, path);
  31. }
  32. static int
  33. fd_cb_wr(const char *path, int *fd, void *arg)
  34. {
  35. struct torrent *tp = arg;
  36. return vopen(fd, O_RDWR, "%s/%s", tp->tl->dir, path);
  37. }
  38. struct start_test_data {
  39. struct torrent *tp;
  40. struct file_time_size *fts;
  41. uint32_t start;
  42. BTPDQ_ENTRY(start_test_data) entry;
  43. };
  44. BTPDQ_HEAD(std_tq, start_test_data);
  45. static struct std_tq m_startq = BTPDQ_HEAD_INITIALIZER(m_startq);
  46. static struct event m_workev;
  47. #define READBUFLEN (1 << 14)
  48. static int
  49. test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
  50. {
  51. char piece_hash[SHA_DIGEST_LENGTH];
  52. tlib_read_hash(tp->tl, tp->pieces_off, piece, piece_hash);
  53. return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
  54. }
  55. static int
  56. test_piece(struct torrent *tp, uint32_t piece, int *ok)
  57. {
  58. int err;
  59. uint8_t hash[SHA_DIGEST_LENGTH];
  60. if ((err = bts_sha(tp->cm->rds, piece * tp->piece_length,
  61. torrent_piece_size(tp, piece), hash)) != 0) {
  62. btpd_log(BTPD_L_ERROR, "io error on '%s' (%s).\n",
  63. bts_filename(tp->cm->rds), strerror(err));
  64. return err;;
  65. }
  66. *ok = test_hash(tp, hash, piece) == 0;
  67. return 0;
  68. }
  69. static int test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece);
  70. static void startup_test_run(void);
  71. void
  72. worker_cb(int fd, short type, void *arg)
  73. {
  74. startup_test_run();
  75. }
  76. void
  77. cm_kill(struct torrent *tp)
  78. {
  79. struct content *cm = tp->cm;
  80. free(cm->piece_field);
  81. free(cm->block_field);
  82. free(cm->pos_field);
  83. free(cm);
  84. tp->cm = NULL;
  85. }
  86. static int stat_and_adjust(struct torrent *tp, struct file_time_size ret[]);
  87. void
  88. cm_save(struct torrent *tp)
  89. {
  90. struct file_time_size fts[tp->nfiles];
  91. stat_and_adjust(tp, fts);
  92. tlib_save_resume(tp->tl, tp->nfiles, fts,
  93. ceil(tp->npieces / 8.0), tp->cm->piece_field,
  94. tp->cm->bppbf * tp->npieces, tp->cm->block_field);
  95. }
  96. static void
  97. cm_on_error(struct torrent *tp)
  98. {
  99. if (!tp->cm->error) {
  100. tp->cm->error = 1;
  101. cm_stop(tp);
  102. }
  103. }
  104. static void
  105. cm_write_done(struct torrent *tp)
  106. {
  107. int err = 0;
  108. struct content *cm = tp->cm;
  109. if ((err = bts_close(cm->wrs)) != 0)
  110. btpd_log(BTPD_L_ERROR, "error closing write stream for '%s' (%s).\n",
  111. torrent_name(tp), strerror(err));
  112. cm->wrs = NULL;
  113. btpd_ev_del(&cm->save_timer);
  114. if (!err)
  115. cm_save(tp);
  116. else
  117. cm_on_error(tp);
  118. }
  119. void
  120. cm_stop(struct torrent *tp)
  121. {
  122. struct content *cm = tp->cm;
  123. if (cm->state != CM_STARTING && cm->state != CM_ACTIVE)
  124. return;
  125. if (cm->state == CM_STARTING) {
  126. struct start_test_data *std;
  127. BTPDQ_FOREACH(std, &m_startq, entry)
  128. if (std->tp == tp) {
  129. BTPDQ_REMOVE(&m_startq, std, entry);
  130. free(std->fts);
  131. free(std);
  132. break;
  133. }
  134. }
  135. if (cm->rds != NULL)
  136. bts_close(cm->rds);
  137. if (cm->wrs != NULL)
  138. cm_write_done(tp);
  139. cm->state = CM_INACTIVE;
  140. }
  141. int
  142. cm_active(struct torrent *tp)
  143. {
  144. struct content *cm = tp->cm;
  145. return cm->state != CM_INACTIVE;
  146. }
  147. int
  148. cm_error(struct torrent *tp)
  149. {
  150. return tp->cm->error;
  151. }
  152. int
  153. cm_started(struct torrent *tp)
  154. {
  155. struct content *cm = tp->cm;
  156. return cm->state == CM_ACTIVE;
  157. }
  158. #define SAVE_INTERVAL (& (struct timeval) { 15, 0 })
  159. static void
  160. save_timer_cb(int fd, short type, void *arg)
  161. {
  162. struct torrent *tp = arg;
  163. btpd_ev_add(&tp->cm->save_timer, SAVE_INTERVAL);
  164. cm_save(tp);
  165. }
  166. void
  167. cm_create(struct torrent *tp, const char *mi)
  168. {
  169. size_t pfield_size = ceil(tp->npieces / 8.0);
  170. struct content *cm = btpd_calloc(1, sizeof(*cm));
  171. cm->bppbf = ceil((double)tp->piece_length / (1 << 17));
  172. cm->piece_field = btpd_calloc(pfield_size, 1);
  173. cm->pos_field = btpd_calloc(pfield_size, 1);
  174. cm->block_field = btpd_calloc(tp->npieces * cm->bppbf, 1);
  175. evtimer_set(&cm->save_timer, save_timer_cb, tp);
  176. tp->cm = cm;
  177. }
  178. int
  179. cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
  180. uint8_t **buf)
  181. {
  182. if (tp->cm->error)
  183. return EIO;
  184. *buf = btpd_malloc(len);
  185. int err =
  186. bts_get(tp->cm->rds, piece * tp->piece_length + begin, *buf, len);
  187. if (err != 0) {
  188. btpd_log(BTPD_L_ERROR, "io error on '%s' (%s).\n",
  189. bts_filename(tp->cm->rds), strerror(err));
  190. cm_on_error(tp);
  191. }
  192. return err;
  193. }
  194. void
  195. cm_prealloc(struct torrent *tp, uint32_t piece)
  196. {
  197. struct content *cm = tp->cm;
  198. if (cm_alloc_size <= 0)
  199. set_bit(cm->pos_field, piece);
  200. }
  201. void
  202. cm_test_piece(struct torrent *tp, uint32_t piece)
  203. {
  204. int ok;
  205. struct content *cm = tp->cm;
  206. if ((errno = test_piece(tp, piece, &ok)) != 0)
  207. cm_on_error(tp);
  208. else if (ok) {
  209. assert(cm->npieces_got < tp->npieces);
  210. cm->npieces_got++;
  211. set_bit(cm->piece_field, piece);
  212. if (net_active(tp))
  213. dl_on_ok_piece(tp->net,piece);
  214. if (cm_full(tp))
  215. cm_write_done(tp);
  216. } else {
  217. cm->ncontent_bytes -= torrent_piece_size(tp,piece);
  218. bzero(cm->block_field + piece * cm->bppbf, cm->bppbf);
  219. if (net_active(tp))
  220. dl_on_bad_piece(tp->net, piece);
  221. }
  222. }
  223. int
  224. cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
  225. const uint8_t *buf, size_t len)
  226. {
  227. int err;
  228. struct content *cm = tp->cm;
  229. if (cm->error)
  230. return EIO;
  231. uint8_t *bf = cm->block_field + piece * cm->bppbf;
  232. assert(!has_bit(bf, begin / PIECE_BLOCKLEN));
  233. assert(!has_bit(cm->piece_field, piece));
  234. if (!has_bit(cm->pos_field, piece)) {
  235. unsigned npieces = ceil((double)cm_alloc_size / tp->piece_length);
  236. uint32_t start = piece - piece % npieces;
  237. uint32_t end = min(start + npieces, tp->npieces);
  238. while (start < end) {
  239. if (!has_bit(cm->pos_field, start)) {
  240. assert(!has_bit(cm->piece_field, start));
  241. off_t len = torrent_piece_size(tp, start);
  242. off_t off = tp->piece_length * start;
  243. while (len > 0) {
  244. size_t wlen = min(ZEROBUFLEN, len);
  245. if ((err = bts_put(cm->wrs, off, m_zerobuf, wlen)) != 0) {
  246. btpd_log(BTPD_L_ERROR, "io error on '%s' (%s).\n",
  247. bts_filename(cm->wrs), strerror(errno));
  248. cm_on_error(tp);
  249. return err;
  250. }
  251. len -= wlen;
  252. off += wlen;
  253. }
  254. set_bit(cm->pos_field, start);
  255. }
  256. start++;
  257. }
  258. }
  259. err = bts_put(cm->wrs, piece * tp->piece_length + begin, buf, len);
  260. if (err != 0) {
  261. btpd_log(BTPD_L_ERROR, "io error on '%s' (%s)\n",
  262. bts_filename(cm->wrs), strerror(err));
  263. cm_on_error(tp);
  264. return err;
  265. }
  266. cm->ncontent_bytes += len;
  267. set_bit(bf, begin / PIECE_BLOCKLEN);
  268. return 0;
  269. }
  270. int
  271. cm_full(struct torrent *tp)
  272. {
  273. return tp->cm->npieces_got == tp->npieces;
  274. }
  275. off_t
  276. cm_content(struct torrent *tp)
  277. {
  278. return tp->cm->ncontent_bytes;
  279. }
  280. uint32_t
  281. cm_pieces(struct torrent *tp)
  282. {
  283. return tp->cm->npieces_got;
  284. }
  285. uint8_t *
  286. cm_get_piece_field(struct torrent *tp)
  287. {
  288. return tp->cm->piece_field;
  289. }
  290. uint8_t *
  291. cm_get_block_field(struct torrent *tp, uint32_t piece)
  292. {
  293. return tp->cm->block_field + piece * tp->cm->bppbf;
  294. }
  295. int
  296. cm_has_piece(struct torrent *tp, uint32_t piece)
  297. {
  298. return has_bit(tp->cm->piece_field, piece);
  299. }
  300. int
  301. stat_and_adjust(struct torrent *tp, struct file_time_size ret[])
  302. {
  303. int fd;
  304. char path[PATH_MAX];
  305. struct stat sb;
  306. for (int i = 0; i < tp->nfiles; i++) {
  307. snprintf(path, PATH_MAX, "%s/%s", tp->tl->dir, tp->files[i].path);
  308. again:
  309. if (stat(path, &sb) == -1) {
  310. if (errno == ENOENT) {
  311. errno = vopen(&fd, O_CREAT|O_RDWR, "%s", path);
  312. if (errno != 0 || close(fd) != 0)
  313. return errno;
  314. goto again;
  315. } else
  316. return errno;
  317. } else if (sb.st_size > tp->files[i].length) {
  318. if (truncate(path, tp->files[i].length) != 0)
  319. return errno;
  320. goto again;
  321. } else {
  322. ret[i].mtime = sb.st_mtime;
  323. ret[i].size = sb.st_size;
  324. }
  325. }
  326. return 0;
  327. }
  328. void
  329. startup_test_end(struct torrent *tp, int unclean)
  330. {
  331. struct content *cm = tp->cm;
  332. bzero(cm->pos_field, ceil(tp->npieces / 8.0));
  333. for (uint32_t piece = 0; piece < tp->npieces; piece++) {
  334. if (cm_has_piece(tp, piece)) {
  335. cm->ncontent_bytes += torrent_piece_size(tp, piece);
  336. cm->npieces_got++;
  337. set_bit(cm->pos_field, piece);
  338. continue;
  339. }
  340. uint8_t *bf = cm->block_field + cm->bppbf * piece;
  341. uint32_t nblocks = torrent_piece_blocks(tp, piece);
  342. uint32_t nblocks_got = 0;
  343. for (uint32_t i = 0; i < nblocks; i++) {
  344. if (has_bit(bf, i)) {
  345. nblocks_got++;
  346. cm->ncontent_bytes +=
  347. torrent_block_size(tp, piece, nblocks, i);
  348. }
  349. }
  350. if (nblocks_got == nblocks) {
  351. bzero(bf, cm->bppbf);
  352. cm->ncontent_bytes -= torrent_piece_size(tp, piece);
  353. } else if (nblocks_got > 0)
  354. set_bit(cm->pos_field, piece);
  355. }
  356. if (unclean) {
  357. struct start_test_data *std = BTPDQ_FIRST(&m_startq);
  358. BTPDQ_REMOVE(&m_startq, std, entry);
  359. tlib_save_resume(tp->tl, tp->nfiles, std->fts,
  360. ceil(tp->npieces / 8.0), cm->piece_field, cm->bppbf * 8,
  361. cm->block_field);
  362. free(std->fts);
  363. free(std);
  364. }
  365. if (!cm_full(tp)) {
  366. int err;
  367. if ((err = bts_open(&cm->wrs, tp->nfiles, tp->files,
  368. fd_cb_wr, tp)) != 0) {
  369. btpd_log(BTPD_L_ERROR,
  370. "failed to open write stream for '%s' (%s).\n",
  371. torrent_name(tp), strerror(err));
  372. cm_on_error(tp);
  373. return;
  374. }
  375. btpd_ev_add(&cm->save_timer, SAVE_INTERVAL);
  376. }
  377. cm->state = CM_ACTIVE;
  378. }
  379. void
  380. startup_test_run(void)
  381. {
  382. int ok;
  383. struct torrent *tp;
  384. struct content *cm;
  385. struct start_test_data * std = BTPDQ_FIRST(&m_startq);
  386. uint32_t this;
  387. if (std == NULL)
  388. return;
  389. tp = std->tp;
  390. cm = tp->cm;
  391. if (test_piece(std->tp, std->start, &ok) != 0) {
  392. cm_on_error(std->tp);
  393. return;
  394. }
  395. if (ok)
  396. set_bit(cm->piece_field, std->start);
  397. else
  398. clear_bit(cm->piece_field, std->start);
  399. this = std->start;
  400. do
  401. std->start++;
  402. while (std->start < tp->npieces && !has_bit(cm->pos_field, std->start));
  403. if (std->start >= tp->npieces)
  404. startup_test_end(tp, 1);
  405. if (!BTPDQ_EMPTY(&m_startq))
  406. event_add(&m_workev, (& (struct timeval) { 0, 0 }));
  407. }
  408. void
  409. startup_test_begin(struct torrent *tp, struct file_time_size *fts)
  410. {
  411. uint32_t piece = 0;
  412. struct content *cm = tp->cm;
  413. while (piece < tp->npieces && !has_bit(cm->pos_field, piece))
  414. piece++;
  415. if (piece < tp->npieces) {
  416. struct start_test_data *std = btpd_calloc(1, sizeof(*std));
  417. std->tp = tp;
  418. std->start = piece;
  419. std->fts = fts;
  420. BTPDQ_INSERT_TAIL(&m_startq, std, entry);
  421. if (std == BTPDQ_FIRST(&m_startq))
  422. event_add(&m_workev, (& (struct timeval) { 0, 0 }));
  423. } else {
  424. free(fts);
  425. startup_test_end(tp, 0);
  426. }
  427. }
  428. void
  429. cm_start(struct torrent *tp, int force_test)
  430. {
  431. int err, run_test = force_test;
  432. struct file_time_size *fts;
  433. struct content *cm = tp->cm;
  434. cm->state = CM_STARTING;
  435. if ((errno =
  436. bts_open(&cm->rds, tp->nfiles, tp->files, fd_cb_rd, tp)) != 0) {
  437. btpd_log(BTPD_L_ERROR, "failed to open stream for '%s' (%s).\n",
  438. torrent_name(tp), strerror(errno));
  439. cm_on_error(tp);
  440. return;
  441. }
  442. fts = btpd_calloc(tp->nfiles * 2, sizeof(*fts));
  443. if ((err = stat_and_adjust(tp, fts)) != 0) {
  444. btpd_log(BTPD_L_ERROR, "failed stat_and_adjust for '%s' (%s).\n",
  445. torrent_name(tp), strerror(err));
  446. free(fts);
  447. cm_on_error(tp);
  448. return;
  449. }
  450. if (tlib_load_resume(tp->tl, tp->nfiles, fts + tp->nfiles,
  451. ceil(tp->npieces / 8.0), cm->piece_field,
  452. cm->bppbf * tp->npieces, cm->block_field) != 0)
  453. run_test = 1;
  454. for (int i = 0; i < tp->nfiles; i++) {
  455. if ((fts[i].mtime != fts[i + tp->nfiles].mtime ||
  456. fts[i].size != fts[i + tp->nfiles].size)) {
  457. run_test = 1;
  458. break;
  459. }
  460. }
  461. if (run_test) {
  462. memset(cm->pos_field, 0xff, ceil(tp->npieces / 8.0));
  463. off_t off = 0;
  464. for (int i = 0; i < tp->nfiles; i++) {
  465. if (fts[i].size != tp->files[i].length) {
  466. uint32_t start, end;
  467. end = (off + tp->files[i].length - 1)
  468. / tp->piece_length;
  469. start = (off + fts[i].size) / tp->piece_length;
  470. while (start <= end) {
  471. clear_bit(cm->pos_field, start);
  472. clear_bit(cm->piece_field, start);
  473. bzero(cm->block_field + start * cm->bppbf, cm->bppbf);
  474. start++;
  475. }
  476. }
  477. off += tp->files[i].length;
  478. }
  479. }
  480. startup_test_begin(tp, fts);
  481. }
  482. void
  483. cm_init(void)
  484. {
  485. evtimer_set(&m_workev, worker_cb, NULL);
  486. }