Skip to content

Commit

Permalink
Support bench vote using quic (#4298)
Browse files Browse the repository at this point in the history
* bench vote using quic
  • Loading branch information
lijunwangs authored Jan 8, 2025
1 parent ae6e34c commit a1844d1
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 40 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion bench-vote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ edition = { workspace = true }

[dependencies]
bincode = { workspace = true }
clap = { version = "3.1.5", features = ["cargo"] }
clap = { workspace = true }
crossbeam-channel = { workspace = true }
solana-clap-utils = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-logger = { workspace = true }
solana-net-utils = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
Expand Down
175 changes: 137 additions & 38 deletions bench-vote/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
#![allow(clippy::arithmetic_side_effects)]

use {
clap::{crate_description, crate_name, Arg, Command},
clap::{crate_description, crate_name, value_t, value_t_or_exit, App, Arg},
crossbeam_channel::unbounded,
solana_clap_utils::{input_parsers::keypair_of, input_validators::is_keypair_or_ask_keyword},
solana_client::connection_cache::ConnectionCache,
solana_connection_cache::client_connection::ClientConnection,
solana_net_utils::{bind_to_unspecified, SocketConfig},
solana_sdk::{
hash::Hash, message::Message, signature::Keypair, signer::Signer, transaction::Transaction,
hash::Hash, message::Message, pubkey::Pubkey, signature::Keypair, signer::Signer,
transaction::Transaction,
},
solana_streamer::{
packet::PacketBatchRecycler,
streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats},
quic::{spawn_server_multi, QuicServerParams},
streamer::{receiver, PacketBatchReceiver, StakedNodes, StreamerReceiveStats},
},
solana_vote_program::{vote_instruction, vote_state::Vote},
std::{
cmp::max,
collections::HashMap,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Arc, RwLock,
},
thread::{self, spawn, JoinHandle, Result},
time::{Duration, Instant, SystemTime},
Expand Down Expand Up @@ -57,82 +61,124 @@ fn sink(
const TRANSACTIONS_PER_THREAD: u64 = 1_000_000; // Number of transactions per thread

fn main() -> Result<()> {
let matches = Command::new(crate_name!())
let matches = App::new(crate_name!())
.about(crate_description!())
.version(solana_version::version!())
.arg(
Arg::new("num-recv-sockets")
Arg::with_name("identity")
.short("i")
.long("identity")
.value_name("KEYPAIR")
.takes_value(true)
.validator(is_keypair_or_ask_keyword)
.help("Identity keypair for the QUIC endpoint when '--use-quic' is set true. If it is not specified a dynamic key is created."),
)
.arg(
Arg::with_name("num-recv-sockets")
.long("num-recv-sockets")
.value_name("NUM")
.takes_value(true)
.help("Use NUM receive sockets"),
)
.arg(
Arg::new("num-producers")
Arg::with_name("num-producers")
.long("num-producers")
.value_name("NUM")
.takes_value(true)
.help("Use this many producer threads."),
)
.arg(
Arg::new("server-only")
Arg::with_name("server-only")
.long("server-only")
.takes_value(false)
.help("Run the bench tool as a server only."),
)
.arg(
Arg::new("client-only")
Arg::with_name("client-only")
.long("client-only")
.takes_value(false)
.requires("server-address")
.help("Run the bench tool as a client only."),
)
.arg(
Arg::with_name("server-address")
.short('n')
.short("n")
.long("server-address")
.value_name("HOST:PORT")
.takes_value(true)
.validator(|arg| solana_net_utils::is_host_port(arg.to_string()))
.help("The destination streamer address to which the client will send transactions to"),
)
.arg(
Arg::new("use-connection-cache")
Arg::with_name("use-connection-cache")
.long("use-connection-cache")
.takes_value(false)
.help("Use this many producer threads."),
)
.arg(
Arg::new("verbose")
Arg::with_name("verbose")
.long("verbose")
.takes_value(false)
.help("Show verbose messages."),
)
.arg(
Arg::with_name("use-quic")
.long("use-quic")
.value_name("Boolean")
.takes_value(true)
.default_value("false")
.help("Controls if to use QUIC for sending/receiving vote transactions."),
)
.get_matches();

solana_logger::setup();

let mut num_sockets = 1usize;
if let Some(n) = matches.value_of("num-recv-sockets") {
num_sockets = max(num_sockets, n.to_string().parse().expect("integer"));
}

let num_producers: u64 = matches.value_of_t("num-producers").unwrap_or(4);

let vote_use_quic = value_t_or_exit!(matches, "use-quic", bool);
let num_producers: u64 = value_t!(matches, "num-producers", u64).unwrap_or(4);
let use_connection_cache = matches.is_present("use-connection-cache");

let server_only = matches.is_present("server-only");
let client_only = matches.is_present("client-only");
let verbose = matches.is_present("verbose");

let destination = matches.is_present("server-address").then(|| {
let addr = matches
.value_of("server-address")
.expect("Destination must be set when --client-only is used");
.expect("Server address must be set when --client-only is used");
solana_net_utils::parse_host_port(addr).expect("Expecting a valid server address")
});

let port = destination.map_or(0, |addr| addr.port());
let ip_addr = destination.map_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED), |addr| addr.ip());

let quic_params = vote_use_quic.then(|| {
let identity_keypair = keypair_of(&matches, "identity").or_else(|| {
println!("--identity is not specified when --use-quic is on. Will generate a key dynamically.");
Some(Keypair::new())
}).unwrap();

let stake: u64 = 1024;
let total_stake: u64 = 1024;

let stakes = HashMap::from([
(identity_keypair.pubkey(), stake),
(Pubkey::new_unique(), total_stake.saturating_sub(stake)),
]);
let staked_nodes: Arc<RwLock<StakedNodes>> = Arc::new(RwLock::new(StakedNodes::new(
Arc::new(stakes),
HashMap::<Pubkey, u64>::default(), // overrides
)));

QuicParams {
identity_keypair,
staked_nodes
}
});

let (exit, read_threads, sink_threads, destination) = if !client_only {
let exit = Arc::new(AtomicBool::new(false));

Expand All @@ -147,24 +193,48 @@ fn main() -> Result<()> {
num_sockets,
)
.unwrap();
let stats = Arc::new(StreamerReceiveStats::new("bench-streamer-test"));
for read in read_sockets {
read.set_read_timeout(Some(SOCKET_RECEIVE_TIMEOUT)).unwrap();

let stats = Arc::new(StreamerReceiveStats::new("bench-vote-test"));

if let Some(quic_params) = &quic_params {
let quic_server_params = QuicServerParams {
max_connections_per_ipaddr_per_min: 1024,
max_connections_per_peer: 1024,
..Default::default()
};
let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader);
read_threads.push(receiver(
"solRcvrBenStrmr".to_string(),
Arc::new(read),
exit.clone(),

let server = spawn_server_multi(
"solRcvrBenVote",
"bench_vote_metrics",
read_sockets,
&quic_params.identity_keypair,
s_reader,
recycler.clone(),
stats.clone(),
COALESCE_TIME, // coalesce
true, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
));
exit.clone(),
quic_params.staked_nodes.clone(),
quic_server_params,
)
.unwrap();
read_threads.push(server.thread);
} else {
for read in read_sockets {
read.set_read_timeout(Some(SOCKET_RECEIVE_TIMEOUT)).unwrap();

let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader);
read_threads.push(receiver(
"solRcvrBenVote".to_string(),
Arc::new(read),
exit.clone(),
s_reader,
recycler.clone(),
stats.clone(),
COALESCE_TIME, // coalesce
true, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
));
}
}

let received_size = Arc::new(AtomicUsize::new(0));
Expand All @@ -187,8 +257,15 @@ fn main() -> Result<()> {

let start = SystemTime::now();

let producer_threads =
(!server_only).then(|| producer(destination, num_producers, use_connection_cache, verbose));
let producer_threads = (!server_only).then(|| {
producer(
destination,
num_producers,
use_connection_cache,
verbose,
quic_params,
)
});

producer_threads
.into_iter()
Expand Down Expand Up @@ -231,18 +308,40 @@ enum Transporter {
DirectSocket(Arc<UdpSocket>),
}

struct QuicParams {
identity_keypair: Keypair,
staked_nodes: Arc<RwLock<StakedNodes>>,
}

fn producer(
sock: SocketAddr,
num_producers: u64,
use_connection_cache: bool,
verbose: bool,
quic_params: Option<QuicParams>,
) -> Vec<JoinHandle<()>> {
println!("Running clients against {sock:?}");
let transporter = if use_connection_cache {
Transporter::Cache(Arc::new(ConnectionCache::with_udp(
"connection_cache_vote_udp",
1, // connection_pool_size
)))
let transporter = if use_connection_cache || quic_params.is_some() {
if let Some(quic_params) = &quic_params {
Transporter::Cache(Arc::new(ConnectionCache::new_with_client_options(
"connection_cache_vote_quic",
256, // connection_pool_size
None, // client_endpoint
Some((
&quic_params.identity_keypair,
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)),
Some((
&quic_params.staked_nodes,
&quic_params.identity_keypair.pubkey(),
)),
)))
} else {
Transporter::Cache(Arc::new(ConnectionCache::with_udp(
"connection_cache_vote_udp",
1, // connection_pool_size
)))
}
} else {
Transporter::DirectSocket(Arc::new(bind_to_unspecified().unwrap()))
};
Expand Down
1 change: 1 addition & 0 deletions quic-client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl QuicLazyInitializedEndpoint {
)
.expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
.1;
info!("Local endpoint is : {client_socket:?}");

QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
};
Expand Down

0 comments on commit a1844d1

Please sign in to comment.