Skip to content

Commit

Permalink
v1.18: pings received contact-infos on gossip socket address (backpor…
Browse files Browse the repository at this point in the history
…t of #1615) (#1635)

* pings received contact-infos on gossip socket address (#1615)

(cherry picked from commit 329a186)

# Conflicts:
#	gossip/src/cluster_info.rs
#	gossip/src/legacy_contact_info.rs

* resolves merge conflicts

---------

Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
2 people authored and willhickey committed Jun 8, 2024
1 parent e8335cf commit bfacaf6
Showing 1 changed file with 83 additions and 9 deletions.
92 changes: 83 additions & 9 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use {
gossip_error::GossipError,
ping_pong::{self, PingCache, Pong},
restart_crds_values::{RestartLastVotedForkSlots, RestartLastVotedForkSlotsError},
socketaddr, socketaddr_any,
weighted_shuffle::WeightedShuffle,
},
bincode::{serialize, serialized_size},
Expand Down Expand Up @@ -127,7 +126,7 @@ pub const MAX_INCREMENTAL_SNAPSHOT_HASHES: usize = 25;
const MAX_PRUNE_DATA_NODES: usize = 32;
/// Number of bytes in the randomly generated token sent with ping messages.
const GOSSIP_PING_TOKEN_SIZE: usize = 32;
const GOSSIP_PING_CACHE_CAPACITY: usize = 65536;
const GOSSIP_PING_CACHE_CAPACITY: usize = 126976;
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
const GOSSIP_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(1280 / 64);
pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000;
Expand Down Expand Up @@ -2422,6 +2421,20 @@ impl ClusterInfo {
}
Ok(())
};
let mut pings = Vec::new();
let mut rng = rand::thread_rng();
let keypair: Arc<Keypair> = self.keypair().clone();
let mut verify_gossip_addr = |value: &CrdsValue| {
verify_gossip_addr(
&mut rng,
&keypair,
value,
stakes,
&self.socket_addr_space,
&self.ping_cache,
&mut pings,
)
};
// Split packets based on their types.
let mut pull_requests = vec![];
let mut pull_responses = vec![];
Expand All @@ -2432,15 +2445,23 @@ impl ClusterInfo {
for (from_addr, packet) in packets {
match packet {
Protocol::PullRequest(filter, caller) => {
pull_requests.push((from_addr, filter, caller))
if verify_gossip_addr(&caller) {
pull_requests.push((from_addr, filter, caller))
}
}
Protocol::PullResponse(_, mut data) => {
check_duplicate_instance(&data)?;
pull_responses.append(&mut data);
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
pull_responses.append(&mut data);
}
}
Protocol::PushMessage(from, data) => {
Protocol::PushMessage(from, mut data) => {
check_duplicate_instance(&data)?;
push_messages.push((from, data));
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
push_messages.push((from, data));
}
}
Protocol::PruneMessage(_from, data) => prune_messages.push(data),
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
Expand All @@ -2454,6 +2475,17 @@ impl ClusterInfo {
}
push_messages.retain(|(_, data)| !data.is_empty());
}
if !pings.is_empty() {
self.stats
.packets_sent_gossip_requests_count
.add_relaxed(pings.len() as u64);
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler,
"ping_contact_infos",
&pings,
);
let _ = response_sender.send(packet_batch);
}
self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
self.handle_batch_prune_messages(prune_messages, stakes);
self.handle_batch_push_messages(
Expand Down Expand Up @@ -2722,9 +2754,12 @@ impl ClusterInfo {
shred_version: u16,
) -> (ContactInfo, UdpSocket, Option<TcpListener>) {
let bind_ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
let (_, gossip_socket) = bind_in_range(bind_ip_addr, VALIDATOR_PORT_RANGE).unwrap();
let contact_info = Self::gossip_contact_info(id, socketaddr_any!(), shred_version);

let (port, gossip_socket) = bind_in_range(bind_ip_addr, VALIDATOR_PORT_RANGE).unwrap();
let contact_info = Self::gossip_contact_info(
id,
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
shred_version,
);
(contact_info, gossip_socket, None)
}
}
Expand Down Expand Up @@ -3145,6 +3180,44 @@ fn filter_on_shred_version(
}
}

// If the CRDS value is an unstaked contact-info, verifies if
// it has responded to ping on its gossip socket address.
// Returns false if the CRDS value should be discarded.
#[must_use]
fn verify_gossip_addr<R: Rng + CryptoRng>(
rng: &mut R,
keypair: &Keypair,
value: &CrdsValue,
stakes: &HashMap<Pubkey, u64>,
socket_addr_space: &SocketAddrSpace,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Protocol /* ::PingMessage */)>,
) -> bool {
let (pubkey, addr) = match &value.data {
CrdsData::ContactInfo(node) => (node.pubkey(), node.gossip()),
CrdsData::LegacyContactInfo(node) => (node.pubkey(), node.gossip()),
_ => return true, // If not a contact-info, nothing to verify.
};
// For (sufficiently) staked nodes, don't bother with ping/pong.
if stakes.get(pubkey) >= Some(&MIN_STAKE_FOR_GOSSIP) {
return true;
}
// Invalid addresses are not verifiable.
let Some(addr) = addr.ok().filter(|addr| socket_addr_space.check(addr)) else {
return false;
};
let (out, ping) = {
let node = (*pubkey, addr);
let mut pingf = move || Ping::new_rand(rng, keypair).ok();
let mut ping_cache = ping_cache.lock().unwrap();
ping_cache.check(Instant::now(), node, &mut pingf)
};
if let Some(ping) = ping {
pings.push((addr, Protocol::PingMessage(ping)));
}
out
}

#[cfg(test)]
mod tests {
use {
Expand All @@ -3153,6 +3226,7 @@ mod tests {
crds_gossip_pull::tests::MIN_NUM_BLOOM_FILTERS,
crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote},
duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS},
socketaddr,
},
itertools::izip,
solana_ledger::shred::Shredder,
Expand Down

0 comments on commit bfacaf6

Please sign in to comment.