From 89a95cbdf5b4b0b8c82ca937156ba0db1205f73a Mon Sep 17 00:00:00 2001 From: Richard Nyberg Date: Fri, 6 Feb 2009 22:43:48 +0100 Subject: [PATCH] Split peer information. Make id hash functions available. struct peer is now peer and meta_peer. meta_peer can be used as a handle that won't be affected if a peer vanishes. The meta_peers are kept in a hash table to enable fast lookup by peer id. --- btpd/btpd.h | 3 ++ btpd/net.c | 42 +++++++++--------- btpd/net_types.h | 15 +++++-- btpd/peer.c | 111 +++++++++++++++++++++++++++++++---------------- btpd/peer.h | 2 + btpd/tlib.c | 14 +----- btpd/upload.c | 22 +++++----- btpd/util.c | 12 +++++ 8 files changed, 137 insertions(+), 84 deletions(-) diff --git a/btpd/btpd.h b/btpd/btpd.h index c473b7f..70316a7 100644 --- a/btpd/btpd.h +++ b/btpd/btpd.h @@ -85,6 +85,9 @@ void btpd_timer_del(struct timeout *to); void btpd_shutdown(void); int btpd_is_stopping(void); +int btpd_id_eq(const void *k1, const void *k2); +uint32_t btpd_id_hash(const void *k); + const uint8_t *btpd_get_peer_id(void); void td_acquire_lock(void); diff --git a/btpd/net.c b/btpd/net.c index 3e76cb7..d5ef575 100644 --- a/btpd/net.c +++ b/btpd/net.c @@ -26,16 +26,7 @@ struct peer_tq net_unattached = BTPDQ_HEAD_INITIALIZER(net_unattached); int net_torrent_has_peer(struct net *n, const uint8_t *id) { - int has = 0; - struct peer *p = BTPDQ_FIRST(&n->peers); - while (p != NULL) { - if (bcmp(p->id, id, 20) == 0) { - has = 1; - break; - } - p = BTPDQ_NEXT(p, p_entry); - } - return has; + return mptbl_find(n->mptbl, id) != NULL; } void @@ -45,8 +36,11 @@ net_create(struct torrent *tp) n->tp = tp; tp->net = n; + if ((n->mptbl = mptbl_create(3, btpd_id_eq, btpd_id_hash)) == NULL) + btpd_err("Out of memory.\n"); + BTPDQ_INIT(&n->getlst); - + n->busy_field = btpd_calloc(ceil(tp->npieces / 8.0), 1); n->piece_count = btpd_calloc(tp->npieces, sizeof(*n->piece_count)); } @@ -54,6 +48,14 @@ net_create(struct torrent *tp) void net_kill(struct torrent *tp) { + struct htbl_iter it; + struct meta_peer *mp = mptbl_iter_first(tp->net->mptbl, &it); + while (mp != NULL) { + struct meta_peer *mps = mp; + mp = mptbl_iter_del(&it); + mp_kill(mps); + } + mptbl_free(tp->net->mptbl); free(tp->net->piece_count); free(tp->net->busy_field); free(tp->net); @@ -235,7 +237,7 @@ net_dispatch_msg(struct peer *p, const char *buf) res = 1; break; case MSG_REQUEST: - if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) { + if ((p->mp->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) { index = dec_be32(buf); begin = dec_be32(buf + 4); length = dec_be32(buf + 8); @@ -310,7 +312,7 @@ net_state(struct peer *p, const char *buf) peer_set_in_state(p, SHAKE_INFO, 20); break; case SHAKE_INFO: - if (p->flags & PF_INCOMING) { + if (p->mp->flags & PF_INCOMING) { struct torrent *tp = torrent_by_hash(buf); if (tp == NULL || !net_active(tp)) goto bad; @@ -324,7 +326,7 @@ net_state(struct peer *p, const char *buf) if ((net_torrent_has_peer(p->n, buf) || bcmp(buf, btpd_get_peer_id(), 20) == 0)) goto bad; - bcopy(buf, p->id, 20); + bcopy(buf, p->mp->id, 20); peer_on_shake(p); peer_set_in_state(p, BTP_MSGSIZE, 4); break; @@ -561,14 +563,14 @@ net_bw_tick(void) while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL && m_bw_bytes_in > 0) { BTPDQ_REMOVE(&net_bw_readq, p, rq_entry); btpd_ev_enable(&p->ioev, EV_READ); - p->flags &= ~PF_ON_READQ; + p->mp->flags &= ~PF_ON_READQ; m_bw_bytes_in -= net_read(p, m_bw_bytes_in); } } else { while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL) { BTPDQ_REMOVE(&net_bw_readq, p, rq_entry); btpd_ev_enable(&p->ioev, EV_READ); - p->flags &= ~PF_ON_READQ; + p->mp->flags &= ~PF_ON_READQ; net_read(p, 0); } } @@ -578,14 +580,14 @@ net_bw_tick(void) && m_bw_bytes_out > 0)) { BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry); btpd_ev_enable(&p->ioev, EV_WRITE); - p->flags &= ~PF_ON_WRITEQ; + p->mp->flags &= ~PF_ON_WRITEQ; m_bw_bytes_out -= net_write(p, m_bw_bytes_out); } } else { while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL) { BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry); btpd_ev_enable(&p->ioev, EV_WRITE); - p->flags &= ~PF_ON_WRITEQ; + p->mp->flags &= ~PF_ON_WRITEQ; net_write(p, 0); } } @@ -622,7 +624,7 @@ net_read_cb(struct peer *p) m_bw_bytes_in -= net_read(p, m_bw_bytes_in); else { btpd_ev_disable(&p->ioev, EV_READ); - p->flags |= PF_ON_READQ; + p->mp->flags |= PF_ON_READQ; BTPDQ_INSERT_TAIL(&net_bw_readq, p, rq_entry); } } @@ -636,7 +638,7 @@ net_write_cb(struct peer *p) m_bw_bytes_out -= net_write(p, m_bw_bytes_out); else { btpd_ev_disable(&p->ioev, EV_WRITE); - p->flags |= PF_ON_WRITEQ; + p->mp->flags |= PF_ON_WRITEQ; BTPDQ_INSERT_TAIL(&net_bw_writeq, p, wq_entry); } } diff --git a/btpd/net_types.h b/btpd/net_types.h index a4db2e6..8b7438e 100644 --- a/btpd/net_types.h +++ b/btpd/net_types.h @@ -21,6 +21,7 @@ struct net { unsigned npeers; struct peer_tq peers; + struct mptbl *mptbl; }; enum input_state { @@ -33,16 +34,24 @@ enum input_state { BTP_MSGBODY }; +struct meta_peer { + struct peer *p; + HTBL_ENTRY(chain); + uint16_t flags; + uint16_t refs; + uint8_t id[20]; +}; + +HTBL_TYPE(mptbl, meta_peer, uint8_t, id, chain); + struct peer { int sd; - uint16_t flags; uint8_t *piece_field; uint32_t npieces; uint32_t nwant; - uint8_t id[20]; - struct net *n; + struct meta_peer *mp; struct block_request_tq my_reqs; diff --git a/btpd/peer.c b/btpd/peer.c index db9dd17..3f35773 100644 --- a/btpd/peer.c +++ b/btpd/peer.c @@ -2,6 +2,36 @@ #include +struct meta_peer * +mp_create(void) +{ + return btpd_calloc(1, sizeof(struct meta_peer)); +} + +void +mp_kill(struct meta_peer *mp) +{ + free(mp); +} + +void +mp_hold(struct meta_peer *mp) +{ + mp->refs++; +} + +void +mp_drop(struct meta_peer *mp, struct net *n) +{ + assert(mp->refs > 0); + mp->refs--; + if (mp->refs == 0) { + if (mp->flags & PF_ATTACHED) + assert(mptbl_remove(n->mptbl, mp->id) == mp); + mp_kill(mp); + } +} + void peer_kill(struct peer *p) { @@ -9,7 +39,7 @@ peer_kill(struct peer *p) btpd_log(BTPD_L_CONN, "killed peer %p\n", p); - if (p->flags & PF_ATTACHED) { + if (p->mp->flags & PF_ATTACHED) { BTPDQ_REMOVE(&p->n->peers, p, p_entry); p->n->npeers--; if (p->n->active) { @@ -18,9 +48,9 @@ peer_kill(struct peer *p) } } else BTPDQ_REMOVE(&net_unattached, p, p_entry); - if (p->flags & PF_ON_READQ) + if (p->mp->flags & PF_ON_READQ) BTPDQ_REMOVE(&net_bw_readq, p, rq_entry); - if (p->flags & PF_ON_WRITEQ) + if (p->mp->flags & PF_ON_WRITEQ) BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry); btpd_ev_del(&p->ioev); @@ -34,6 +64,8 @@ peer_kill(struct peer *p) nl = next; } + p->mp->p = NULL; + mp_drop(p->mp, p->n); if (p->in.buf != NULL) free(p->in.buf); if (p->piece_field != NULL) @@ -83,9 +115,9 @@ peer_unsend(struct peer *p, struct nb_link *nl) nb_drop(nl->nb); free(nl); if (BTPDQ_EMPTY(&p->outq)) { - if (p->flags & PF_ON_WRITEQ) { + if (p->mp->flags & PF_ON_WRITEQ) { BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry); - p->flags &= ~PF_ON_WRITEQ; + p->mp->flags &= ~PF_ON_WRITEQ; } else btpd_ev_disable(&p->ioev, EV_WRITE); } @@ -106,7 +138,7 @@ peer_sent(struct peer *p, struct net_buf *nb) break; case NB_UNCHOKE: btpd_log(BTPD_L_MSG, "sent unchoke to %p\n", p); - p->flags &= ~PF_NO_REQUESTS; + p->mp->flags &= ~PF_NO_REQUESTS; break; case NB_INTEREST: btpd_log(BTPD_L_MSG, "sent interest to %p\n", p); @@ -199,7 +231,7 @@ peer_cancel(struct peer *p, struct block_request *req, struct net_buf *nb) void peer_unchoke(struct peer *p) { - p->flags &= ~PF_I_CHOKE; + p->mp->flags &= ~PF_I_CHOKE; peer_send(p, nb_create_unchoke()); } @@ -218,7 +250,7 @@ peer_choke(struct peer *p) nl = next; } - p->flags |= PF_I_CHOKE; + p->mp->flags |= PF_I_CHOKE; peer_send(p, nb_create_choke()); } @@ -229,7 +261,7 @@ peer_want(struct peer *p, uint32_t index) p->nwant++; if (p->nwant == 1) { if (p->nreqs_out == 0) { - assert((p->flags & PF_DO_UNWANT) == 0); + assert((p->mp->flags & PF_DO_UNWANT) == 0); int unsent = 0; struct nb_link *nl = BTPDQ_LAST(&p->outq, nb_tq); if (nl != NULL && nl->nb->type == NB_UNINTEREST) @@ -237,10 +269,10 @@ peer_want(struct peer *p, uint32_t index) if (!unsent) peer_send(p, nb_create_interest()); } else { - assert((p->flags & PF_DO_UNWANT) != 0); - p->flags &= ~PF_DO_UNWANT; + assert((p->mp->flags & PF_DO_UNWANT) != 0); + p->mp->flags &= ~PF_DO_UNWANT; } - p->flags |= PF_I_WANT; + p->mp->flags |= PF_I_WANT; } } @@ -250,12 +282,12 @@ peer_unwant(struct peer *p, uint32_t index) assert(p->nwant > 0); p->nwant--; if (p->nwant == 0) { - p->flags &= ~PF_I_WANT; + p->mp->flags &= ~PF_I_WANT; p->t_nointerest = btpd_seconds; if (p->nreqs_out == 0) peer_send(p, nb_create_uninterest()); else - p->flags |= PF_DO_UNWANT; + p->mp->flags |= PF_DO_UNWANT; } } @@ -264,8 +296,12 @@ peer_create_common(int sd) { struct peer *p = btpd_calloc(1, sizeof(*p)); + p->mp = mp_create(); + mp_hold(p->mp); + p->mp->p = p; + p->sd = sd; - p->flags = PF_I_CHOKE | PF_P_CHOKE; + p->mp->flags = PF_I_CHOKE | PF_P_CHOKE; p->t_created = btpd_seconds; p->t_lastwrite = btpd_seconds; p->t_nointerest = btpd_seconds; @@ -285,7 +321,7 @@ void peer_create_in(int sd) { struct peer *p = peer_create_common(sd); - p->flags |= PF_INCOMING; + p->mp->flags |= PF_INCOMING; } void @@ -344,9 +380,9 @@ peer_create_out_compact(struct net *n, int family, const char *compact) void peer_on_no_reqs(struct peer *p) { - if ((p->flags & PF_DO_UNWANT) != 0) { + if ((p->mp->flags & PF_DO_UNWANT) != 0) { assert(p->nwant == 0); - p->flags &= ~PF_DO_UNWANT; + p->mp->flags &= ~PF_DO_UNWANT; peer_send(p, nb_create_uninterest()); } } @@ -362,8 +398,8 @@ peer_on_shake(struct peer *p) { uint8_t printid[21]; int i; - for (i = 0; i < 20 && isprint(p->id[i]); i++) - printid[i] = p->id[i]; + for (i = 0; i < 20 && isprint(p->mp->id[i]); i++) + printid[i] = p->mp->id[i]; printid[i] = '\0'; btpd_log(BTPD_L_MSG, "received shake(%s) from %p\n", printid, p); p->piece_field = btpd_calloc(1, (int)ceil(p->n->tp->npieces / 8.0)); @@ -377,9 +413,10 @@ peer_on_shake(struct peer *p) } } + mptbl_insert(p->n->mptbl, p->mp); BTPDQ_REMOVE(&net_unattached, p, p_entry); BTPDQ_INSERT_HEAD(&p->n->peers, p, p_entry); - p->flags |= PF_ATTACHED; + p->mp->flags |= PF_ATTACHED; p->n->npeers++; ul_on_new_peer(p); @@ -390,10 +427,10 @@ void peer_on_choke(struct peer *p) { btpd_log(BTPD_L_MSG, "received choke from %p\n", p); - if ((p->flags & PF_P_CHOKE) != 0) + if ((p->mp->flags & PF_P_CHOKE) != 0) return; else { - p->flags |= PF_P_CHOKE; + p->mp->flags |= PF_P_CHOKE; dl_on_choke(p); struct nb_link *nl = BTPDQ_FIRST(&p->outq); while (nl != NULL) { @@ -409,10 +446,10 @@ void peer_on_unchoke(struct peer *p) { btpd_log(BTPD_L_MSG, "received unchoke from %p\n", p); - if ((p->flags & PF_P_CHOKE) == 0) + if ((p->mp->flags & PF_P_CHOKE) == 0) return; else { - p->flags &= ~PF_P_CHOKE; + p->mp->flags &= ~PF_P_CHOKE; dl_on_unchoke(p); } } @@ -421,10 +458,10 @@ void peer_on_interest(struct peer *p) { btpd_log(BTPD_L_MSG, "received interest from %p\n", p); - if ((p->flags & PF_P_WANT) != 0) + if ((p->mp->flags & PF_P_WANT) != 0) return; else { - p->flags |= PF_P_WANT; + p->mp->flags |= PF_P_WANT; ul_on_interest(p); } } @@ -433,10 +470,10 @@ void peer_on_uninterest(struct peer *p) { btpd_log(BTPD_L_MSG, "received uninterest from %p\n", p); - if ((p->flags & PF_P_WANT) == 0) + if ((p->mp->flags & PF_P_WANT) == 0) return; else { - p->flags &= ~PF_P_WANT; + p->mp->flags &= ~PF_P_WANT; p->t_nointerest = btpd_seconds; ul_on_uninterest(p); } @@ -497,14 +534,14 @@ peer_on_request(struct peer *p, uint32_t index, uint32_t begin, { btpd_log(BTPD_L_MSG, "received request(%u,%u,%u) from %p\n", index, begin, length, p); - if ((p->flags & PF_NO_REQUESTS) == 0) { + if ((p->mp->flags & PF_NO_REQUESTS) == 0) { peer_send(p, nb_create_piece(index, begin, length)); peer_send(p, nb_create_torrentdata()); p->npiece_msgs++; if (p->npiece_msgs >= MAXPIECEMSGS) { peer_send(p, nb_create_choke()); peer_send(p, nb_create_unchoke()); - p->flags |= PF_NO_REQUESTS; + p->mp->flags |= PF_NO_REQUESTS; } } } @@ -531,7 +568,7 @@ peer_on_cancel(struct peer *p, uint32_t index, uint32_t begin, void peer_on_tick(struct peer *p) { - if (p->flags & PF_ATTACHED) { + if (p->mp->flags & PF_ATTACHED) { if (BTPDQ_EMPTY(&p->outq)) { if (btpd_seconds - p->t_lastwrite >= 120) peer_keepalive(p); @@ -539,7 +576,7 @@ peer_on_tick(struct peer *p) btpd_log(BTPD_L_CONN, "write attempt timed out.\n"); goto kill; } - if ((cm_full(p->n->tp) && !(p->flags & PF_P_WANT) && + if ((cm_full(p->n->tp) && !(p->mp->flags & PF_P_WANT) && btpd_seconds - p->t_nointerest >= 600)) { btpd_log(BTPD_L_CONN, "no interest for 10 minutes.\n"); goto kill; @@ -556,7 +593,7 @@ kill: int peer_chokes(struct peer *p) { - return p->flags & PF_P_CHOKE; + return p->mp->flags & PF_P_CHOKE; } int @@ -574,13 +611,13 @@ peer_laden(struct peer *p) int peer_wanted(struct peer *p) { - return (p->flags & PF_I_WANT) == PF_I_WANT; + return (p->mp->flags & PF_I_WANT) == PF_I_WANT; } int peer_leech_ok(struct peer *p) { - return (p->flags & (PF_I_WANT|PF_P_CHOKE)) == PF_I_WANT; + return (p->mp->flags & (PF_I_WANT|PF_P_CHOKE)) == PF_I_WANT; } int @@ -592,7 +629,7 @@ peer_active_down(struct peer *p) int peer_active_up(struct peer *p) { - return (p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT + return (p->mp->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT || p->npiece_msgs > 0; } diff --git a/btpd/peer.h b/btpd/peer.h index 61b022a..0dc38bb 100644 --- a/btpd/peer.h +++ b/btpd/peer.h @@ -64,4 +64,6 @@ int peer_has(struct peer *p, uint32_t index); int peer_leech_ok(struct peer *p); int peer_full(struct peer *p); +void mp_kill(struct meta_peer *mp); + #endif diff --git a/btpd/tlib.c b/btpd/tlib.c index 12f786e..8909c1a 100644 --- a/btpd/tlib.c +++ b/btpd/tlib.c @@ -295,18 +295,6 @@ num_hash(const void *k) return *(const unsigned *)k; } -static int -id_test(const void *k1, const void *k2) -{ - return bcmp(k1, k2, 20) == 0; -} - -static uint32_t -id_hash(const void *k) -{ - return dec_be32(k + 16); -} - void tlib_init(void) { @@ -316,7 +304,7 @@ tlib_init(void) char file[PATH_MAX]; m_numtbl = numtbl_create(1, num_test, num_hash); - m_hashtbl = hashtbl_create(1, id_test, id_hash); + m_hashtbl = hashtbl_create(1, btpd_id_eq, btpd_id_hash); if (m_numtbl == NULL || m_hashtbl == NULL) btpd_err("Out of memory.\n"); diff --git a/btpd/upload.c b/btpd/upload.c index 8e6a0dc..ddde8a1 100644 --- a/btpd/upload.c +++ b/btpd/upload.c @@ -33,12 +33,12 @@ choke_do(void) if (m_max_uploads < 0) { struct peer *p; BTPDQ_FOREACH(p, &m_peerq, ul_entry) - if (p->flags & PF_I_CHOKE) + if (p->mp->flags & PF_I_CHOKE) peer_unchoke(p); } else if (m_max_uploads == 0) { struct peer *p; BTPDQ_FOREACH(p, &m_peerq, ul_entry) - if ((p->flags & PF_I_CHOKE) == 0) + if ((p->mp->flags & PF_I_CHOKE) == 0) peer_choke(p); } else { struct peer_sort worthy[m_npeers]; @@ -68,9 +68,9 @@ choke_do(void) bzero(unchoked, sizeof(unchoked)); for (i = nworthy - 1; i >= 0 && found < m_max_uploads - 1; i--) { - if ((worthy[i].p->flags & PF_P_WANT) != 0) + if ((worthy[i].p->mp->flags & PF_P_WANT) != 0) found++; - if ((worthy[i].p->flags & PF_I_CHOKE) != 0) + if ((worthy[i].p->mp->flags & PF_I_CHOKE) != 0) peer_unchoke(worthy[i].p); unchoked[worthy[i].i] = 1; } @@ -79,12 +79,12 @@ choke_do(void) BTPDQ_FOREACH(p, &m_peerq, ul_entry) { if (!unchoked[i]) { if (found < m_max_uploads && !peer_full(p)) { - if (p->flags & PF_P_WANT) + if (p->mp->flags & PF_P_WANT) found++; - if (p->flags & PF_I_CHOKE) + if (p->mp->flags & PF_I_CHOKE) peer_unchoke(p); } else { - if ((p->flags & PF_I_CHOKE) == 0) + if ((p->mp->flags & PF_I_CHOKE) == 0) peer_choke(p); } } @@ -98,7 +98,7 @@ shuffle_optimists(void) { for (int i = 0; i < m_npeers; i++) { struct peer *p = BTPDQ_FIRST(&m_peerq); - if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == (PF_P_WANT|PF_I_CHOKE)) { + if ((p->mp->flags & (PF_P_WANT|PF_I_CHOKE)) == (PF_P_WANT|PF_I_CHOKE)) { break; } else { BTPDQ_REMOVE(&m_peerq, p, ul_entry); @@ -143,7 +143,7 @@ ul_on_lost_peer(struct peer *p) assert(m_npeers > 0); BTPDQ_REMOVE(&m_peerq, p, ul_entry); m_npeers--; - if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) + if ((p->mp->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) choke_do(); } @@ -161,14 +161,14 @@ ul_on_lost_torrent(struct net *n) void ul_on_interest(struct peer *p) { - if ((p->flags & PF_I_CHOKE) == 0) + if ((p->mp->flags & PF_I_CHOKE) == 0) choke_do(); } void ul_on_uninterest(struct peer *p) { - if ((p->flags & PF_I_CHOKE) == 0) + if ((p->mp->flags & PF_I_CHOKE) == 0) choke_do(); } diff --git a/btpd/util.c b/btpd/util.c index 9019bfd..a3ce23e 100644 --- a/btpd/util.c +++ b/btpd/util.c @@ -3,6 +3,18 @@ #include #include +int +btpd_id_eq(const void *k1, const void *k2) +{ + return bcmp(k1, k2, 20) == 0; +} + +uint32_t +btpd_id_hash(const void *k) +{ + return dec_be32(k + 16); +} + void * btpd_malloc(size_t size) {