From 0d3575689dfc1d475e4d4e7330b910e2de9b0ddf Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 8 Jan 2023 11:38:36 -0500 Subject: [PATCH] adds new contact-info with forward compatible sockets The commit implement new ContactInfo where * Ports and IP addresses are specified separately so that unique IP addresses can only be specified once. * Different sockets (tvu, tpu, etc) are specified by opaque u8 tags so that adding and removing sockets is backward and forward compatible. * solana_version::Version is also embedded in so that it won't need to be gossiped separately. * NodeInstance is also rolled in by adding a field identifying when the instance was first created so that it won't need to be gossiped separately. Update plan: * Once the cluster is able to ingest the new type (i.e. this patch), a 2nd patch will start gossiping the new ContactInfo along with the LegacyContactInfo. * Once all nodes in the cluster gossip the new ContactInfo, a 3rd patch will start solely using the new ContactInfo while still gossiping the old LegacyContactInfo. * Once all nodes in the cluster solely use the new ContactInfo, a 4th patch will stop gossiping the old LegacyContactInfo. --- Cargo.lock | 1 + frozen-abi/src/abi_example.rs | 6 + gossip/Cargo.toml | 1 + gossip/src/cluster_info.rs | 3 +- gossip/src/cluster_info_metrics.rs | 4 + gossip/src/contact_info.rs | 619 +++++++++++++++++++++++++++++ gossip/src/crds.rs | 4 +- gossip/src/crds_gossip_pull.rs | 7 +- gossip/src/crds_gossip_push.rs | 7 +- gossip/src/crds_value.rs | 9 + gossip/src/lib.rs | 1 + programs/sbf/Cargo.lock | 1 + 12 files changed, 659 insertions(+), 4 deletions(-) create mode 100644 gossip/src/contact_info.rs diff --git a/Cargo.lock b/Cargo.lock index a6b82eecf0041b..4df975e98e0409 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5528,6 +5528,7 @@ dependencies = [ "solana-tpu-client", "solana-version", "solana-vote-program", + "static_assertions", "thiserror", ] diff --git a/frozen-abi/src/abi_example.rs b/frozen-abi/src/abi_example.rs index 8936ccde63f156..4ad447136e0f85 100644 --- a/frozen-abi/src/abi_example.rs +++ b/frozen-abi/src/abi_example.rs @@ -464,6 +464,12 @@ impl AbiExample for SocketAddr { } } +impl AbiExample for IpAddr { + fn example() -> Self { + IpAddr::V4(Ipv4Addr::UNSPECIFIED) + } +} + // This is a control flow indirection needed for digesting all variants of an enum pub trait AbiEnumVisitor: Serialize { fn visit_for_abi(&self, digester: &mut AbiDigester) -> DigestResult; diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 600770e03883be..715af411189fd5 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -47,6 +47,7 @@ solana-thin-client = { path = "../thin-client", version = "=1.15.0" } solana-tpu-client = { path = "../tpu-client", version = "=1.15.0", default-features = false } solana-version = { path = "../version", version = "=1.15.0" } solana-vote-program = { path = "../programs/vote", version = "=1.15.0" } +static_assertions = "1.1.0" thiserror = "1.0" [dev-dependencies] diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index e1cf8809387511..e96a35c8ae80e2 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -267,7 +267,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "Aui5aMV3SK41tRQN14sgCMK3qp6r9dboLXNAHEBKFzii")] +#[frozen_abi(digest = "avtjwK4ZjdAVfR8ZqqJkxbbe7SXSvhX5Cv7hnNjbZ7g")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -377,6 +377,7 @@ impl Sanitize for Protocol { fn retain_staked(values: &mut Vec, stakes: &HashMap) { values.retain(|value| { match value.data { + CrdsData::ContactInfo(_) => true, CrdsData::LegacyContactInfo(_) => true, // May Impact new validators starting up without any stake yet. CrdsData::Vote(_, _) => true, diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 1a94793a4f1efa..fa35fd3411ca28 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -641,6 +641,8 @@ pub(crate) fn submit_gossip_stats( crds_stats.pull.counts[10], i64 ), + ("ContactInfo-push", crds_stats.push.counts[11], i64), + ("ContactInfo-pull", crds_stats.pull.counts[11], i64), ( "all-push", crds_stats.push.counts.iter().sum::(), @@ -684,6 +686,8 @@ pub(crate) fn submit_gossip_stats( crds_stats.pull.fails[10], i64 ), + ("ContactInfo-push", crds_stats.push.fails[11], i64), + ("ContactInfo-pull", crds_stats.pull.fails[11], i64), ("all-push", crds_stats.push.fails.iter().sum::(), i64), ("all-pull", crds_stats.pull.fails.iter().sum::(), i64), ); diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs new file mode 100644 index 00000000000000..c99322516a2f0c --- /dev/null +++ b/gossip/src/contact_info.rs @@ -0,0 +1,619 @@ +pub use crate::legacy_contact_info::LegacyContactInfo; +use { + crate::crds_value::MAX_WALLCLOCK, + matches::debug_assert_matches, + serde::{Deserialize, Deserializer, Serialize}, + solana_sdk::{ + pubkey::Pubkey, + sanitize::{Sanitize, SanitizeError}, + serde_varint, short_vec, + }, + static_assertions::const_assert_eq, + std::{ + collections::HashSet, + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::{SystemTime, UNIX_EPOCH}, + }, + thiserror::Error, +}; + +const SOCKET_TAG_GOSSIP: u8 = 0; +const SOCKET_TAG_REPAIR: u8 = 1; +const SOCKET_TAG_RPC: u8 = 2; +const SOCKET_TAG_RPC_PUBSUB: u8 = 3; +const SOCKET_TAG_SERVE_REPAIR: u8 = 4; +const SOCKET_TAG_TPU: u8 = 5; +const SOCKET_TAG_TPU_FORWARDS: u8 = 6; +const SOCKET_TAG_TPU_FORWARDS_QUIC: u8 = 7; +const SOCKET_TAG_TPU_QUIC: u8 = 8; +const SOCKET_TAG_TPU_VOTE: u8 = 9; +const SOCKET_TAG_TVU: u8 = 10; +const SOCKET_TAG_TVU_FORWARDS: u8 = 11; +const_assert_eq!(SOCKET_CACHE_SIZE, 12); +const SOCKET_CACHE_SIZE: usize = SOCKET_TAG_TVU_FORWARDS as usize + 1usize; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Duplicate IP address: {0}")] + DuplicateIpAddr(IpAddr), + #[error("Duplicate socket: {0}")] + DuplicateSocket(/*key:*/ u8), + #[error("Invalid IP address index: {index}, num addrs: {num_addrs}")] + InvalidIpAddrIndex { index: u8, num_addrs: usize }, + #[error("Invalid port: {0}")] + InvalidPort(/*port:*/ u16), + #[error("IP addresses saturated")] + IpAddrsSaturated, + #[error("Multicast IP address: {0}")] + MulticastIpAddr(IpAddr), + #[error("Port offsets overflow")] + PortOffsetsOverflow, + #[error("Socket not found: {0}")] + SocketNotFound(/*key:*/ u8), + #[error("Unspecified IP address: {0}")] + UnspecifiedIpAddr(IpAddr), + #[error("Unused IP address: {0}")] + UnusedIpAddr(IpAddr), +} + +#[derive(Clone, Debug, Eq, PartialEq, AbiExample, Serialize)] +pub struct ContactInfo { + pubkey: Pubkey, + #[serde(with = "serde_varint")] + wallclock: u64, + // When the node instance was first created. + // Identifies duplicate running instances. + outset: u64, + shred_version: u16, + version: solana_version::Version, + // All IP addresses are unique and referenced at least once in sockets. + #[serde(with = "short_vec")] + addrs: Vec, + // All sockets have a unique key and a valid IP address index. + #[serde(with = "short_vec")] + sockets: Vec, + #[serde(skip_serializing)] + cache: [SocketAddr; SOCKET_CACHE_SIZE], +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq, AbiExample, Deserialize, Serialize)] +struct SocketEntry { + key: u8, // Protocol identifier, e.g. tvu, tpu, etc + index: u8, // IpAddr index in the accompanying addrs vector. + #[serde(with = "serde_varint")] + offset: u16, // Port offset with respect to the previous entry. +} + +// As part of deserialization, self.addrs and self.sockets should be cross +// verified and self.cache needs to be populated. This type serves as a +// workaround since serde does not have an initializer. +// https://github.com/serde-rs/serde/issues/642 +#[derive(Deserialize)] +struct ContactInfoLite { + pubkey: Pubkey, + #[serde(with = "serde_varint")] + wallclock: u64, + outset: u64, + shred_version: u16, + version: solana_version::Version, + #[serde(with = "short_vec")] + addrs: Vec, + #[serde(with = "short_vec")] + sockets: Vec, +} + +macro_rules! get_socket { + ($name:ident, $key:ident) => { + pub fn $name(&self) -> Result { + let socket = self.cache[usize::from($key)]; + sanitize_socket(&socket)?; + Ok(socket) + } + }; +} + +impl ContactInfo { + pub fn new(pubkey: Pubkey, wallclock: u64, shred_version: u16) -> Self { + Self { + pubkey, + wallclock, + outset: { + let now = SystemTime::now(); + let elapsed = now.duration_since(UNIX_EPOCH).unwrap(); + u64::try_from(elapsed.as_micros()).unwrap() + }, + shred_version, + version: solana_version::Version::default(), + addrs: Vec::::default(), + sockets: Vec::::default(), + cache: [socket_addr_unspecified(); SOCKET_CACHE_SIZE], + } + } + + #[inline] + pub(crate) fn pubkey(&self) -> &Pubkey { + &self.pubkey + } + + #[inline] + pub(crate) fn wallclock(&self) -> u64 { + self.wallclock + } + + get_socket!(gossip, SOCKET_TAG_GOSSIP); + get_socket!(repair, SOCKET_TAG_REPAIR); + get_socket!(rpc, SOCKET_TAG_RPC); + get_socket!(rpc_pubsub, SOCKET_TAG_RPC_PUBSUB); + get_socket!(serve_repair, SOCKET_TAG_SERVE_REPAIR); + get_socket!(tpu, SOCKET_TAG_TPU); + get_socket!(tpu_forwards, SOCKET_TAG_TPU_FORWARDS); + get_socket!(tpu_forwards_quic, SOCKET_TAG_TPU_FORWARDS_QUIC); + get_socket!(tpu_quic, SOCKET_TAG_TPU_QUIC); + get_socket!(tpu_vote, SOCKET_TAG_TPU_VOTE); + get_socket!(tvu, SOCKET_TAG_TVU); + get_socket!(tvu_forwards, SOCKET_TAG_TVU_FORWARDS); + + #[cfg(test)] + fn get_socket(&self, key: u8) -> Result { + let mut port = 0u16; + for entry in &self.sockets { + port += entry.offset; + if entry.key == key { + let addr = + self.addrs + .get(usize::from(entry.index)) + .ok_or(Error::InvalidIpAddrIndex { + index: entry.index, + num_addrs: self.addrs.len(), + })?; + let socket = SocketAddr::new(*addr, port); + sanitize_socket(&socket)?; + return Ok(socket); + } + } + Err(Error::SocketNotFound(key)) + } + + // Adds given IP address to self.addrs returning respective index. + fn push_addr(&mut self, addr: IpAddr) -> Result { + match self.addrs.iter().position(|k| k == &addr) { + Some(index) => u8::try_from(index).map_err(|_| Error::IpAddrsSaturated), + None => { + let index = u8::try_from(self.addrs.len()).map_err(|_| Error::IpAddrsSaturated)?; + self.addrs.push(addr); + Ok(index) + } + } + } + + pub fn set_socket(&mut self, key: u8, socket: SocketAddr) -> Result<(), Error> { + sanitize_socket(&socket)?; + // Remove the old entry associated with this key (if any). + self.remove_socket(key); + // Find the index at which the new socket entry would be inserted into + // self.sockets, and the respective port offset. + let mut offset = socket.port(); + let index = self.sockets.iter().position(|entry| { + offset = match offset.checked_sub(entry.offset) { + None => return true, + Some(offset) => offset, + }; + false + }); + let entry = SocketEntry { + key, + index: self.push_addr(socket.ip())?, + offset, + }; + // Insert the new entry into self.sockets. + // Adjust the port offset of the next entry (if any). + match index { + None => self.sockets.push(entry), + Some(index) => { + self.sockets[index].offset -= entry.offset; + self.sockets.insert(index, entry); + } + } + if let Some(entry) = self.cache.get_mut(usize::from(key)) { + *entry = socket; + } + debug_assert_matches!(sanitize_entries(&self.addrs, &self.sockets), Ok(())); + Ok(()) + } + + // Removes the socket associated with the specified key. + fn remove_socket(&mut self, key: u8) { + if let Some(index) = self.sockets.iter().position(|entry| entry.key == key) { + let entry = self.sockets.remove(index); + if let Some(next_entry) = self.sockets.get_mut(index) { + next_entry.offset += entry.offset; + } + self.maybe_remove_addr(entry.index); + if let Some(entry) = self.cache.get_mut(usize::from(key)) { + *entry = socket_addr_unspecified(); + } + } + } + + // Removes the IP address at the given index if + // no socket entry refrences that index. + fn maybe_remove_addr(&mut self, index: u8) { + if !self.sockets.iter().any(|entry| entry.index == index) { + self.addrs.remove(usize::from(index)); + for entry in &mut self.sockets { + if entry.index > index { + entry.index -= 1; + } + } + } + } +} + +impl<'de> Deserialize<'de> for ContactInfo { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let node = ContactInfoLite::deserialize(deserializer)?; + ContactInfo::try_from(node).map_err(serde::de::Error::custom) + } +} + +impl TryFrom for ContactInfo { + type Error = Error; + + fn try_from(node: ContactInfoLite) -> Result { + let ContactInfoLite { + pubkey, + wallclock, + outset, + shred_version, + version, + addrs, + sockets, + } = node; + sanitize_entries(&addrs, &sockets)?; + let mut node = ContactInfo { + pubkey, + wallclock, + outset, + shred_version, + version, + addrs, + sockets, + cache: [socket_addr_unspecified(); SOCKET_CACHE_SIZE], + }; + // Populate node.cache. + let mut port = 0u16; + for &SocketEntry { key, index, offset } in &node.sockets { + port += offset; + let entry = match node.cache.get_mut(usize::from(key)) { + None => continue, + Some(entry) => entry, + }; + let addr = match node.addrs.get(usize::from(index)) { + None => continue, + Some(&addr) => addr, + }; + let socket = SocketAddr::new(addr, port); + if sanitize_socket(&socket).is_ok() { + *entry = socket; + } + } + Ok(node) + } +} + +impl Sanitize for ContactInfo { + fn sanitize(&self) -> Result<(), SanitizeError> { + if self.wallclock >= MAX_WALLCLOCK { + return Err(SanitizeError::ValueOutOfBounds); + } + Ok(()) + } +} + +// Workaround until feature(const_socketaddr) is stable. +fn socket_addr_unspecified() -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), /*port:*/ 0u16) +} + +fn sanitize_socket(socket: &SocketAddr) -> Result<(), Error> { + if socket.port() == 0u16 { + return Err(Error::InvalidPort(socket.port())); + } + let addr = socket.ip(); + if addr.is_unspecified() { + return Err(Error::UnspecifiedIpAddr(addr)); + } + if addr.is_multicast() { + return Err(Error::MulticastIpAddr(addr)); + } + Ok(()) +} + +// Sanitizes deserialized IpAddr and socket entries. +fn sanitize_entries(addrs: &[IpAddr], sockets: &[SocketEntry]) -> Result<(), Error> { + // Verify that all IP addresses are unique. + { + let mut seen = HashSet::with_capacity(addrs.len()); + for addr in addrs { + if !seen.insert(addr) { + return Err(Error::DuplicateIpAddr(*addr)); + } + } + } + // Verify that all socket entries have unique key. + { + let mut mask = [0u64; 4]; // 256-bit bitmask. + for &SocketEntry { key, .. } in sockets { + let mask = &mut mask[usize::from(key / 64u8)]; + let bit = 1u64 << (key % 64u8); + if (*mask & bit) != 0u64 { + return Err(Error::DuplicateSocket(key)); + } + *mask |= bit; + } + } + // Verify that all socket entries reference a valid IP address, and + // that all IP addresses are referenced in the sockets. + { + let num_addrs = addrs.len(); + let mut hits = vec![false; num_addrs]; + for &SocketEntry { index, .. } in sockets { + *hits + .get_mut(usize::from(index)) + .ok_or(Error::InvalidIpAddrIndex { index, num_addrs })? = true; + } + if let Some(index) = hits.into_iter().position(|hit| !hit) { + return Err(Error::UnusedIpAddr(addrs[index])); + } + } + // Verify that port offsets don't overflow. + if sockets + .iter() + .fold(Some(0u16), |offset, entry| { + offset?.checked_add(entry.offset) + }) + .is_none() + { + return Err(Error::PortOffsetsOverflow); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use { + super::*, + rand::{seq::SliceRandom, Rng}, + std::{ + collections::{HashMap, HashSet}, + iter::repeat_with, + net::{Ipv4Addr, Ipv6Addr}, + ops::Range, + }, + }; + + fn new_rand_addr(rng: &mut R) -> IpAddr { + if rng.gen() { + let addr = Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()); + IpAddr::V4(addr) + } else { + let addr = Ipv6Addr::new( + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + ); + IpAddr::V6(addr) + } + } + + fn new_rand_port(rng: &mut R) -> u16 { + let port = rng.gen::(); + let bits = u16::BITS - port.leading_zeros(); + let shift = rng.gen_range(0u32, bits + 1u32); + port.checked_shr(shift).unwrap_or_default() + } + + #[test] + fn test_sanitize_entries() { + let mut rng = rand::thread_rng(); + let addrs: Vec = repeat_with(|| new_rand_addr(&mut rng)).take(5).collect(); + let mut keys: Vec = (0u8..=u8::MAX).collect(); + keys.shuffle(&mut rng); + // Duplicate IP addresses. + { + let addrs = [addrs[0], addrs[1], addrs[2], addrs[0], addrs[3]]; + assert_matches!( + sanitize_entries(&addrs, /*sockets:*/ &[]), + Err(Error::DuplicateIpAddr(_)) + ); + } + // Duplicate socket keys. + { + let keys = [0u8, 1, 5, 1, 3]; + let (index, offset) = (0u8, 0u16); + let sockets: Vec<_> = keys + .iter() + .map(|&key| SocketEntry { key, index, offset }) + .collect(); + assert_matches!( + sanitize_entries(/*addrs:*/ &[], &sockets), + Err(Error::DuplicateSocket(_)) + ); + } + // Invalid IP address index. + { + let offset = 0u16; + let sockets: Vec<_> = [1u8, 2, 1, 3, 5, 3] + .into_iter() + .zip(&keys) + .map(|(index, &key)| SocketEntry { key, index, offset }) + .collect(); + assert_matches!( + sanitize_entries(&addrs, &sockets), + Err(Error::InvalidIpAddrIndex { .. }) + ); + } + // Unused IP address. + { + let sockets: Vec<_> = (0..4u8) + .map(|key| SocketEntry { + key, + index: key, + offset: 0u16, + }) + .collect(); + assert_matches!( + sanitize_entries(&addrs, &sockets), + Err(Error::UnusedIpAddr(_)) + ); + } + // Port offsets overflow. + { + let sockets: Vec<_> = keys + .iter() + .map(|&key| SocketEntry { + key, + index: rng.gen_range(0u8, addrs.len() as u8), + offset: rng.gen_range(0u16, u16::MAX / 64), + }) + .collect(); + assert_matches!( + sanitize_entries(&addrs, &sockets), + Err(Error::PortOffsetsOverflow) + ); + } + { + let sockets: Vec<_> = keys + .iter() + .map(|&key| SocketEntry { + key, + index: rng.gen_range(0u8, addrs.len() as u8), + offset: rng.gen_range(0u16, u16::MAX / 256), + }) + .collect(); + assert_matches!(sanitize_entries(&addrs, &sockets), Ok(())); + } + } + + #[test] + fn test_round_trip() { + const KEYS: Range = 0u8..16u8; + let mut rng = rand::thread_rng(); + let addrs: Vec = repeat_with(|| new_rand_addr(&mut rng)).take(8).collect(); + let mut node = ContactInfo { + pubkey: Pubkey::new_unique(), + wallclock: rng.gen(), + outset: rng.gen(), + shred_version: rng.gen(), + version: solana_version::Version::default(), + addrs: Vec::default(), + sockets: Vec::default(), + cache: [socket_addr_unspecified(); SOCKET_CACHE_SIZE], + }; + let mut sockets = HashMap::::new(); + for _ in 0..1 << 14 { + let addr = addrs.choose(&mut rng).unwrap(); + let socket = SocketAddr::new(*addr, new_rand_port(&mut rng)); + let key = rng.gen_range(KEYS.start, KEYS.end); + if sanitize_socket(&socket).is_ok() { + sockets.insert(key, socket); + assert_matches!(node.set_socket(key, socket), Ok(())); + assert_matches!(sanitize_entries(&node.addrs, &node.sockets), Ok(())); + } else { + assert_matches!(node.set_socket(key, socket), Err(_)); + } + for key in KEYS.clone() { + let socket = sockets.get(&key); + assert_eq!(node.get_socket(key).ok().as_ref(), socket); + if usize::from(key) < SOCKET_CACHE_SIZE { + assert_eq!( + &node.cache[usize::from(key)], + socket.unwrap_or(&socket_addr_unspecified()) + ) + } + } + assert_eq!(node.gossip().ok().as_ref(), sockets.get(&SOCKET_TAG_GOSSIP)); + assert_eq!(node.repair().ok().as_ref(), sockets.get(&SOCKET_TAG_REPAIR)); + assert_eq!(node.rpc().ok().as_ref(), sockets.get(&SOCKET_TAG_RPC)); + assert_eq!( + node.rpc_pubsub().ok().as_ref(), + sockets.get(&SOCKET_TAG_RPC_PUBSUB) + ); + assert_eq!( + node.serve_repair().ok().as_ref(), + sockets.get(&SOCKET_TAG_SERVE_REPAIR) + ); + assert_eq!(node.tpu().ok().as_ref(), sockets.get(&SOCKET_TAG_TPU)); + assert_eq!( + node.tpu_forwards().ok().as_ref(), + sockets.get(&SOCKET_TAG_TPU_FORWARDS) + ); + assert_eq!( + node.tpu_forwards_quic().ok().as_ref(), + sockets.get(&SOCKET_TAG_TPU_FORWARDS_QUIC) + ); + assert_eq!( + node.tpu_quic().ok().as_ref(), + sockets.get(&SOCKET_TAG_TPU_QUIC) + ); + assert_eq!( + node.tpu_vote().ok().as_ref(), + sockets.get(&SOCKET_TAG_TPU_VOTE) + ); + assert_eq!(node.tvu().ok().as_ref(), sockets.get(&SOCKET_TAG_TVU)); + assert_eq!( + node.tvu_forwards().ok().as_ref(), + sockets.get(&SOCKET_TAG_TVU_FORWARDS) + ); + // Assert that all IP addresses are unique. + assert_eq!( + node.addrs.len(), + node.addrs + .iter() + .copied() + .collect::>() + .len() + ); + // Assert that all sockets have unique key. + assert_eq!( + node.sockets.len(), + node.sockets + .iter() + .map(|entry| entry.key) + .collect::>() + .len() + ); + // Assert that only mapped addresses are stored. + assert_eq!( + node.addrs.iter().copied().collect::>(), + sockets.values().map(SocketAddr::ip).collect::>(), + ); + // Assert that all sockets reference a valid IP address. + assert!(node + .sockets + .iter() + .map(|entry| node.addrs.get(usize::from(entry.index))) + .all(|addr| addr.is_some())); + // Assert that port offsets don't overflow. + assert!(u16::try_from( + node.sockets + .iter() + .map(|entry| u64::from(entry.offset)) + .sum::() + ) + .is_ok()); + // Assert that serde round trips. + let bytes = bincode::serialize(&node).unwrap(); + let other: ContactInfo = bincode::deserialize(&bytes).unwrap(); + assert_eq!(node, other); + } + } +} diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index b7521ed1206037..5ee14c2e194a38 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -94,7 +94,7 @@ pub enum GossipRoute { PushMessage, } -type CrdsCountsArray = [usize; 11]; +type CrdsCountsArray = [usize; 12]; pub(crate) struct CrdsDataStats { pub(crate) counts: CrdsCountsArray, @@ -682,6 +682,8 @@ impl CrdsDataStats { CrdsData::NodeInstance(_) => 8, CrdsData::DuplicateShred(_, _) => 9, CrdsData::IncrementalSnapshotHashes(_) => 10, + CrdsData::ContactInfo(_) => 11, + // Update CrdsCountsArray if new items are added here. } } } diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index cb1302272c2c68..7f2011178dcc53 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -18,7 +18,7 @@ use { crds::{Crds, GossipRoute, VersionedCrdsValue}, crds_gossip, crds_gossip_error::CrdsGossipError, - crds_value::CrdsValue, + crds_value::{CrdsData, CrdsValue}, legacy_contact_info::LegacyContactInfo as ContactInfo, ping_pong::PingCache, }, @@ -482,6 +482,11 @@ impl CrdsGossipPull { let out: Vec<_> = crds .filter_bitmask(filter.mask, filter.mask_bits) .filter(pred) + .filter(|entry| { + // Exclude the new ContactInfo from the pull responses + // until the cluster has upgraded. + !matches!(&entry.value.data, CrdsData::ContactInfo(_)) + }) .map(|entry| entry.value.clone()) .take(output_size_limit.load(Ordering::Relaxed).max(0) as usize) .collect(); diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index c4e9c22bc01970..4b390ac12ccfca 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -16,7 +16,7 @@ use { cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, crds::{Crds, CrdsError, Cursor, GossipRoute}, crds_gossip, - crds_value::CrdsValue, + crds_value::{CrdsData, CrdsValue}, ping_pong::PingCache, push_active_set::PushActiveSet, received_cache::ReceivedCache, @@ -189,6 +189,11 @@ impl CrdsGossipPush { let crds = crds.read().unwrap(); let entries = crds .get_entries(crds_cursor.deref_mut()) + .filter(|entry| { + // Exclude the new ContactInfo from outgoing push messages + // until the cluster has upgraded. + !matches!(&entry.value.data, CrdsData::ContactInfo(_)) + }) .map(|entry| &entry.value) .filter(|value| wallclock_window.contains(&value.wallclock())); for value in entries { diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 2c2c2802aca7ca..d5f18c14b5bc9c 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -1,6 +1,7 @@ use { crate::{ cluster_info::MAX_SNAPSHOT_HASHES, + contact_info::ContactInfo, deprecated, duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, epoch_slots::EpochSlots, @@ -92,6 +93,7 @@ pub enum CrdsData { NodeInstance(NodeInstance), DuplicateShred(DuplicateShredIndex, DuplicateShred), IncrementalSnapshotHashes(IncrementalSnapshotHashes), + ContactInfo(ContactInfo), } impl Sanitize for CrdsData { @@ -129,6 +131,7 @@ impl Sanitize for CrdsData { } } CrdsData::IncrementalSnapshotHashes(val) => val.sanitize(), + CrdsData::ContactInfo(node) => node.sanitize(), } } } @@ -492,6 +495,7 @@ pub enum CrdsValueLabel { NodeInstance(Pubkey), DuplicateShred(DuplicateShredIndex, Pubkey), IncrementalSnapshotHashes(Pubkey), + ContactInfo(Pubkey), } impl fmt::Display for CrdsValueLabel { @@ -512,6 +516,7 @@ impl fmt::Display for CrdsValueLabel { CrdsValueLabel::IncrementalSnapshotHashes(_) => { write!(f, "IncrementalSnapshotHashes({})", self.pubkey()) } + CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), } } } @@ -530,6 +535,7 @@ impl CrdsValueLabel { CrdsValueLabel::NodeInstance(p) => *p, CrdsValueLabel::DuplicateShred(_, p) => *p, CrdsValueLabel::IncrementalSnapshotHashes(p) => *p, + CrdsValueLabel::ContactInfo(pubkey) => *pubkey, } } } @@ -579,6 +585,7 @@ impl CrdsValue { CrdsData::NodeInstance(node) => node.wallclock, CrdsData::DuplicateShred(_, shred) => shred.wallclock, CrdsData::IncrementalSnapshotHashes(hash) => hash.wallclock, + CrdsData::ContactInfo(node) => node.wallclock(), } } pub fn pubkey(&self) -> Pubkey { @@ -594,6 +601,7 @@ impl CrdsValue { CrdsData::NodeInstance(node) => node.from, CrdsData::DuplicateShred(_, shred) => shred.from, CrdsData::IncrementalSnapshotHashes(hash) => hash.from, + CrdsData::ContactInfo(node) => *node.pubkey(), } } pub fn label(&self) -> CrdsValueLabel { @@ -611,6 +619,7 @@ impl CrdsValue { CrdsData::IncrementalSnapshotHashes(_) => { CrdsValueLabel::IncrementalSnapshotHashes(self.pubkey()) } + CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()), } } pub fn contact_info(&self) -> Option<&LegacyContactInfo> { diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 2eb5336e6f49d1..8fd9d5f6dd6fa1 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -3,6 +3,7 @@ pub mod cluster_info; pub mod cluster_info_metrics; +pub mod contact_info; pub mod crds; pub mod crds_entry; pub mod crds_gossip; diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index a28285f7e8726b..8f45084d354ee6 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4734,6 +4734,7 @@ dependencies = [ "solana-tpu-client", "solana-version", "solana-vote-program", + "static_assertions", "thiserror", ]