Skip to content

Commit

Permalink
adds new contact-info with forward compatible sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Jan 12, 2023
1 parent 1b6024a commit e52dabb
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions frozen-abi/src/abi_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,14 @@ impl<T: std::cmp::Ord + AbiExample, S: AbiExample> AbiExample for BTreeMap<T, S>
}
}

// + Clone is needed to work around this rustc bug:
// https://github.com/rust-lang/rust/issues/106710
impl<T: AbiExample + Clone> AbiExample for std::borrow::Cow<'_, T> {
fn example() -> Self {
Self::Owned(T::example())
}
}

impl<T: AbiExample> AbiExample for Vec<T> {
fn example() -> Self {
info!("AbiExample for (Vec<T>): {}", type_name::<Self>());
Expand Down Expand Up @@ -464,6 +472,12 @@ impl AbiExample for SocketAddr {
}
}

impl AbiExample for IpAddr {
fn example() -> Self {
IpAddr::V4(Ipv4Addr::UNSPECIFIED)
}
}

// This is a control flow indirection needed for digesting all variants of an enum
pub trait AbiEnumVisitor: Serialize {
fn visit_for_abi(&self, digester: &mut AbiDigester) -> DigestResult;
Expand Down
1 change: 1 addition & 0 deletions gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ solana-thin-client = { path = "../thin-client", version = "=1.15.0" }
solana-tpu-client = { path = "../tpu-client", version = "=1.15.0", default-features = false }
solana-version = { path = "../version", version = "=1.15.0" }
solana-vote-program = { path = "../programs/vote", version = "=1.15.0" }
static_assertions = "1.1.0"
thiserror = "1.0"

[dev-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "Hsj6a2bmzxno1RUcSM1gzHAg2zxgw15E3feb2SimieBA")]
#[frozen_abi(digest = "B52ZEhM7hXGk8MzKuoPRG8xCQZ2cBANrhoBHaurnQ58u")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down Expand Up @@ -377,6 +377,7 @@ impl Sanitize for Protocol {
fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
values.retain(|value| {
match value.data {
CrdsData::ContactInfo(_) => true,
CrdsData::LegacyContactInfo(_) => true,
// May Impact new validators starting up without any stake yet.
CrdsData::Vote(_, _) => true,
Expand Down
4 changes: 4 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.counts[10],
i64
),
("ContactInfo-push", crds_stats.push.counts[11], i64),
("ContactInfo-pull", crds_stats.pull.counts[11], i64),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
Expand Down Expand Up @@ -686,6 +688,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.fails[10],
i64
),
("ContactInfo-push", crds_stats.push.fails[11], i64),
("ContactInfo-pull", crds_stats.pull.fails[11], i64),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
Expand Down
295 changes: 295 additions & 0 deletions gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
#![allow(dead_code)]
#![allow(unused_variables)]
#![allow(unused_imports)]
pub use crate::legacy_contact_info::LegacyContactInfo;
use {
crate::crds_value::MAX_WALLCLOCK,
serde::{Deserialize, Deserializer, Serialize, Serializer},
solana_sdk::{
pubkey::Pubkey,
sanitize::{Sanitize, SanitizeError},
serde_varint, short_vec,
},
static_assertions::const_assert_eq,
std::{
borrow::Cow,
collections::HashMap,
net::{IpAddr, SocketAddr},
num::TryFromIntError,
},
thiserror::Error,
};

const SOCKET_TAG_GOSSIP: u8 = 0;
const SOCKET_TAG_REPAIR: u8 = 1;
const SOCKET_TAG_RPC: u8 = 2;
const SOCKET_TAG_RPC_PUBSUB: u8 = 3;
const SOCKET_TAG_SERVE_REPAIR: u8 = 4;
const SOCKET_TAG_TPU: u8 = 5;
const SOCKET_TAG_TPU_QUIC: u8 = 6;
const SOCKET_TAG_TPU_FORWARDS: u8 = 7;
const SOCKET_TAG_TPU_FORWARDS_QUIC: u8 = 8;
const SOCKET_TAG_TPU_VOTE: u8 = 9;
const SOCKET_TAG_TVU: u8 = 10;
const SOCKET_TAG_TVU_FORWARDS: u8 = 11;
const_assert_eq!(SOCKET_CACHE_SIZE, 12);
const SOCKET_CACHE_SIZE: usize = SOCKET_TAG_TVU_FORWARDS as usize + 1usize;

#[derive(Debug, Error)]
enum Error {
#[error("Invalid IP address index: {index}, num addrs: {num_addrs}")]
InvalidAddrIndex { index: u8, num_addrs: usize },
#[error("Invalid port: {0}")]
InvalidPort(/*port:*/ u16),
#[error("IP addresses saturated")]
IpAddrsSaturated,
#[error("Multicast IP address: {0}")]
MulticastAddr(IpAddr),
#[error("Socket tag not found: {0}")]
SocketNotFound(/*tag:*/ u8),
#[error("Unspecified IP address: {0}")]
UnspecifiedAddr(IpAddr),
}

#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ContactInfo {
pubkey: Pubkey,
wallclock: u64,
shred_version: u16,
version: solana_version::Version,
addrs: Vec<IpAddr>,
// TODO: use port offset with varint? should be sorted!
sockets: Vec<(/*tag:*/ u8, /*addr-idx:*/ u8, /*port:*/ u16)>,
cache: [Option<SocketAddr>; SOCKET_CACHE_SIZE],
}

// Workaround since serde does not have an initializer for skipped fields.
// https://github.com/serde-rs/serde/issues/642
#[derive(Deserialize, Serialize)]
struct ContactInfoLite<'a> {
pubkey: Cow<'a, Pubkey>,
#[serde(with = "serde_varint")]
wallclock: u64,
shred_version: u16,
version: Cow<'a, solana_version::Version>,
#[serde(with = "short_vec")]
addrs: Cow<'a, [IpAddr]>,
#[serde(with = "short_vec")]
sockets: Cow<'a, [(u8, u8, u16)]>,
}

impl ContactInfo {
#[inline]
pub fn pubkey(&self) -> &Pubkey {
&self.pubkey
}

#[inline]
pub(crate) fn wallclock(&self) -> u64 {
self.wallclock
}

fn get_socket(&self, tag: u8) -> Option<SocketAddr> {
let &(_, index, port) = self.sockets.iter().find(|(k, _, _)| *k == tag)?;
let addr = self.addrs.get(usize::from(index))?;
let socket = SocketAddr::new(*addr, port);
sanitize_socket(&socket).ok()?;
Some(socket)
}

// Adds given IP address to self.addrs returning respective index.
fn push_addr(&mut self, addr: IpAddr) -> Result<u8, Error> {
match self.addrs.iter().position(|k| k == &addr) {
Some(index) => u8::try_from(index).map_err(|_| Error::IpAddrsSaturated),
None => {
let index = u8::try_from(self.addrs.len()).map_err(|_| Error::IpAddrsSaturated)?;
self.addrs.push(addr);
Ok(index)
}
}
}

fn set_socket(&mut self, tag: u8, socket: SocketAddr) -> Result<(), Error> {
sanitize_socket(&socket)?;
self.remove_socket(tag);
let addr = self.push_addr(socket.ip())?;
self.sockets.push((tag, addr, socket.port()));
if let Some(entry) = self.cache.get_mut(usize::from(tag)) {
*entry = Some(socket);
}
Ok(())
}

// Removes the socket with the specified tag.
fn remove_socket(&mut self, tag: u8) {
if let Some(index) = self.sockets.iter().position(|(k, _, _)| *k == tag) {
let (_, index, _) = self.sockets.remove(index);
self.maybe_remove_addr(index);
if let Some(entry) = self.cache.get_mut(usize::from(tag)) {
*entry = None;
}
}
}

// If no socket uses the IP address at the given index,
// removes the IP address and adjusts indices.
fn maybe_remove_addr(&mut self, index: u8) {
if !self.sockets.iter().any(|(_, k, _)| *k == index) {
self.addrs.remove(usize::from(index));
for (_, k, _) in &mut self.sockets {
if *k > index {
*k -= 1;
}
}
}
}
}

fn sanitize_socket(socket: &SocketAddr) -> Result<(), Error> {
if socket.port() == 0u16 {
return Err(Error::InvalidPort(socket.port()));
}
let addr = socket.ip();
if addr.is_unspecified() {
return Err(Error::UnspecifiedAddr(addr));
}
if addr.is_multicast() {
return Err(Error::MulticastAddr(addr));
}
Ok(())
}

impl Sanitize for ContactInfo {
fn sanitize(&self) -> Result<(), SanitizeError> {
if self.wallclock >= MAX_WALLCLOCK {
return Err(SanitizeError::ValueOutOfBounds);
}
Ok(())
}
}

impl Serialize for ContactInfo {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
ContactInfoLite {
pubkey: Cow::Borrowed(&self.pubkey),
wallclock: self.wallclock,
shred_version: self.shred_version,
version: Cow::Borrowed(&self.version),
addrs: Cow::Borrowed(&self.addrs),
sockets: Cow::Borrowed(&self.sockets),
}
.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for ContactInfo {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let ContactInfoLite {
pubkey,
wallclock,
shred_version,
version,
addrs,
sockets,
} = ContactInfoLite::deserialize(deserializer)?;
// TODO: sanity check on the fields.
// TODO: verify port offsets don't overflow.
let mut node = ContactInfo {
pubkey: pubkey.into_owned(),
wallclock,
shred_version,
version: version.into_owned(),
addrs: addrs.into_owned(),
sockets: sockets.into_owned(),
..ContactInfo::default()
};
node.cache = std::array::from_fn(|k| node.get_socket(k as u8));
Ok(node)
}
}

#[cfg(test)]
mod tests {
use {
super::*,
rand::{seq::SliceRandom, Rng},
std::{
collections::{HashMap, HashSet},
iter::repeat_with,
net::{Ipv4Addr, Ipv6Addr},
ops::Range,
},
};

fn new_rand_addr<R: Rng>(rng: &mut R) -> IpAddr {
if rng.gen() {
let addr = Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen());
IpAddr::V4(addr)
} else {
let addr = Ipv6Addr::new(
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
);
IpAddr::V6(addr)
}
}

fn new_rand_port<R: Rng>(rng: &mut R) -> u16 {
let port = rng.gen::<u16>();
let bits = u16::BITS - port.leading_zeros();
let shift = rng.gen_range(0u32, bits + 1u32);
port.checked_shr(shift).unwrap_or_default()
}

#[test]
fn test_set_socket() {
const TAGS: Range<u8> = 0u8..16u8;
let mut rng = rand::thread_rng();
let addrs: Vec<IpAddr> = repeat_with(|| new_rand_addr(&mut rng)).take(8).collect();
let mut node = ContactInfo {
pubkey: Pubkey::new_unique(),
wallclock: rng.gen(),
shred_version: rng.gen(),
..ContactInfo::default()
};
let mut sockets = HashMap::<u8, SocketAddr>::new();
for _ in 0..1 << 14 {
let addr = addrs.choose(&mut rng).unwrap();
let socket = SocketAddr::new(*addr, new_rand_port(&mut rng));
let tag = rng.gen_range(TAGS.start, TAGS.end);
if sanitize_socket(&socket).is_ok() {
sockets.insert(tag, socket);
assert_matches!(node.set_socket(tag, socket), Ok(()));
} else {
assert_matches!(node.set_socket(tag, socket), Err(_));
}
for tag in TAGS.clone() {
let socket = sockets.get(&tag);
assert_eq!(node.get_socket(tag).as_ref(), socket);
if usize::from(tag) < SOCKET_CACHE_SIZE {
assert_eq!(node.cache[usize::from(tag)].as_ref(), socket);
}
}
// Assert that all addresses are unique.
let num_unique_addrs = node.addrs.iter().copied().collect::<HashSet<_>>().len();
assert_eq!(node.addrs.len(), num_unique_addrs);
// Assert that only mapped addresses are stored.
assert_eq!(
node.addrs.iter().copied().collect::<HashSet<_>>(),
sockets.values().map(SocketAddr::ip).collect::<HashSet<_>>(),
);
}
}
}
4 changes: 3 additions & 1 deletion gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub enum GossipRoute {
PushMessage,
}

type CrdsCountsArray = [usize; 11];
type CrdsCountsArray = [usize; 12];

pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
Expand Down Expand Up @@ -700,6 +700,8 @@ impl CrdsDataStats {
CrdsData::NodeInstance(_) => 8,
CrdsData::DuplicateShred(_, _) => 9,
CrdsData::IncrementalSnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
// Update CrdsCountsArray if new items are added here.
}
}
}
Expand Down
Loading

0 comments on commit e52dabb

Please sign in to comment.