Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Implement eth/65 (EIP-2464)
Browse files Browse the repository at this point in the history
  • Loading branch information
vorot93 committed Sep 10, 2020
1 parent 63e2781 commit b4af110
Show file tree
Hide file tree
Showing 13 changed files with 229 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion ethcore/client-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ pub trait BlockChainClient:
fn list_storage(&self, id: BlockId, account: &Address, after: Option<&H256>, count: Option<u64>) -> Option<Vec<H256>>;

/// Get transaction with given hash.
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction>;
fn block_transaction(&self, id: TransactionId) -> Option<LocalizedTransaction>;

/// Get pool transaction with a given hash.
fn queued_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>>;

/// Get uncle with given id.
fn uncle(&self, id: UncleId) -> Option<encoded::Header>;
Expand Down
6 changes: 5 additions & 1 deletion ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1898,10 +1898,14 @@ impl BlockChainClient for Client {
Some(keys)
}

fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> {
fn block_transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> {
self.transaction_address(id).and_then(|address| self.chain.read().transaction(&address))
}

fn queued_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>> {
self.importer.miner.transaction(&hash)
}

fn uncle(&self, id: UncleId) -> Option<encoded::Header> {
let index = id.position;
self.block_body(id.block).and_then(|body| body.view().uncle_rlp_at(index))
Expand Down
5 changes: 4 additions & 1 deletion ethcore/src/test_helpers/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,9 +737,12 @@ impl BlockChainClient for TestBlockChainClient {
fn list_storage(&self, _id: BlockId, _account: &Address, _after: Option<&H256>, _count: Option<u64>) -> Option<Vec<H256>> {
None
}
fn transaction(&self, _id: TransactionId) -> Option<LocalizedTransaction> {
fn block_transaction(&self, _id: TransactionId) -> Option<LocalizedTransaction> {
None // Simple default.
}
fn queued_transaction(&self, _hash: H256) -> Option<Arc<VerifiedTransaction>> {
None
}

fn uncle(&self, _id: UncleId) -> Option<encoded::Header> {
None // Simple default.
Expand Down
1 change: 1 addition & 0 deletions ethcore/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ rlp = "0.4.5"
snapshot = { path = "../snapshot" }
trace-time = "0.1"
triehash-ethereum = { version = "0.2", path = "../../util/triehash-ethereum" }
transaction-pool = "2"

[dev-dependencies]
env_logger = "0.5"
Expand Down
70 changes: 64 additions & 6 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ use crate::{
sync_packet::{
PacketInfo,
SyncPacket::{
self, BlockBodiesPacket, BlockHeadersPacket, NewBlockHashesPacket, NewBlockPacket,
PrivateStatePacket, PrivateTransactionPacket, ReceiptsPacket, SignedPrivateTransactionPacket,
SnapshotDataPacket, SnapshotManifestPacket, StatusPacket,
self, *,
}
},
BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester,
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4,
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65,
MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4,
}
};

Expand All @@ -53,6 +51,7 @@ use common_types::{
verification::Unverified,
snapshot::{ManifestData, RestorationStatus},
};
use transaction_pool::VerifiedTransaction;


/// The Chain Sync Handler: handles responses from peers
Expand All @@ -70,6 +69,8 @@ impl SyncHandler {
ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
NewPooledTransactionHashesPacket => SyncHandler::on_peer_new_pooled_transactions(sync, io, peer, &rlp),
PooledTransactionsPacket => SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp),
SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
Expand Down Expand Up @@ -595,9 +596,11 @@ impl SyncHandler {
difficulty,
latest_hash,
genesis,
unsent_pooled_hashes: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(io.chain().transactions_to_propagate().into_iter().map(|tx| *tx.hash()).collect()) } else { None },
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
asking_pooled_transactions: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(Vec::new()) } else { None },
asking_private_state: None,
ask_time: Instant::now(),
last_sent_transactions: Default::default(),
Expand Down Expand Up @@ -656,7 +659,7 @@ impl SyncHandler {

if false
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_4.0))
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_64.0))
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_65.0))
{
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
return Err(DownloaderImportError::Invalid);
Expand Down Expand Up @@ -703,6 +706,61 @@ impl SyncHandler {
Ok(())
}

/// Called when peer sends us a set of new pooled transactions
pub fn on_peer_new_pooled_transactions(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> {
for item in tx_rlp {
let hash = item.as_val::<H256>().map_err(|_| DownloaderImportError::Invalid)?;

if io.chain().queued_transaction(hash).is_none() {
let unfetched = sync.unfetched_pooled_transactions.entry(hash).or_insert_with(|| super::UnfetchedTransaction {
announcer: peer_id,
next_fetch: Instant::now(),
tries: 0,
});

// Only reset the budget if we hear from multiple sources
if unfetched.announcer != peer_id {
unfetched.next_fetch = Instant::now();
unfetched.tries = 0;
}
}
}

Ok(())
}

/// Called when peer sends us a list of pooled transactions
pub fn on_peer_pooled_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> {
let peer = match sync.peers.get(&peer_id).filter(|p| p.can_sync()) {
Some(peer) => peer,
None => {
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
return Ok(());
}
};

// TODO: actually check against asked hashes
let item_count = tx_rlp.item_count()?;
if let Some(p) = &peer.asking_pooled_transactions {
if item_count > p.len() {
trace!(target: "sync", "{} Peer sent us more transactions than was supposed to", peer_id);
return Err(DownloaderImportError::Invalid);
}
} else {
trace!(target: "sync", "{} Peer sent us pooled transactions but does not declare support for them", peer_id);
return Err(DownloaderImportError::Invalid);
}
trace!(target: "sync", "{:02} -> PooledTransactions ({} entries)", peer_id, item_count);
let mut transactions = Vec::with_capacity(item_count);
for i in 0 .. item_count {
let rlp = tx_rlp.at(i)?;
let tx = rlp.as_raw().to_vec();
transactions.push(tx);
}
io.chain().queue_transactions(transactions, peer_id);
Ok(())
}

/// Called when peer sends us signed private transaction packet
fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
Expand Down
78 changes: 75 additions & 3 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ impl From<DecoderError> for PacketProcessError {
}
}

/// Version 65 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
pub const ETH_PROTOCOL_VERSION_65: (u8, u8) = (65, 0x11);
/// Version 64 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11);
/// Version 63 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
Expand Down Expand Up @@ -217,6 +219,7 @@ const STATUS_TIMEOUT: Duration = Duration::from_secs(10);
const HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
const BODIES_TIMEOUT: Duration = Duration::from_secs(20);
const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10);
const POOLED_TRANSACTIONS_TIMEOUT: Duration = Duration::from_secs(10);
const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
/// Max time to wait for the Snapshot Manifest packet to arrive from a peer after it's being asked.
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -318,6 +321,7 @@ pub enum PeerAsking {
BlockHeaders,
BlockBodies,
BlockReceipts,
PooledTransactions,
SnapshotManifest,
SnapshotData,
PrivateState,
Expand Down Expand Up @@ -352,6 +356,8 @@ pub struct PeerInfo {
network_id: u64,
/// Peer best block hash
latest_hash: H256,
/// Unpropagated tx pool hashes
unsent_pooled_hashes: Option<H256FastSet>,
/// Peer total difficulty if known
difficulty: Option<U256>,
/// Type of data currently being requested by us from a peer.
Expand All @@ -360,6 +366,8 @@ pub struct PeerInfo {
asking_blocks: Vec<H256>,
/// Holds requested header hash if currently requesting block header by hash
asking_hash: Option<H256>,
/// Holds requested transaction IDs
asking_pooled_transactions: Option<Vec<H256>>,
/// Holds requested private state hash
asking_private_state: Option<H256>,
/// Holds requested snapshot chunk hash if any.
Expand Down Expand Up @@ -669,6 +677,13 @@ enum PeerState {
SameBlock
}

#[derive(Clone, MallocSizeOf)]
struct UnfetchedTransaction {
announcer: PeerId,
next_fetch: Instant,
tries: usize,
}

/// Blockchain sync handler.
/// See module documentation for more details.
#[derive(MallocSizeOf)]
Expand Down Expand Up @@ -708,6 +723,8 @@ pub struct ChainSync {
sync_start_time: Option<Instant>,
/// Transactions propagation statistics
transactions_stats: TransactionsStats,
/// Transactions whose hash has been announced, but that we have not fetched
unfetched_pooled_transactions: H256FastMap<UnfetchedTransaction>,
/// Enable ancient block downloading
download_old_blocks: bool,
/// Shared private tx service.
Expand Down Expand Up @@ -751,6 +768,7 @@ impl ChainSync {
snapshot: Snapshot::new(),
sync_start_time: None,
transactions_stats: TransactionsStats::default(),
unfetched_pooled_transactions: Default::default(),
private_tx_handler,
warp_sync: config.warp_sync,
status_sinks: Vec::new()
Expand All @@ -764,7 +782,7 @@ impl ChainSync {
let last_imported_number = self.new_blocks.last_imported_block_number();
SyncStatus {
state: self.state.clone(),
protocol_version: ETH_PROTOCOL_VERSION_64.0,
protocol_version: ETH_PROTOCOL_VERSION_65.0,
network_id: self.network_id,
start_block_number: self.starting_block,
last_imported_block_number: Some(last_imported_number),
Expand Down Expand Up @@ -798,8 +816,17 @@ impl ChainSync {

/// Updates the set of transactions recently sent to this peer to avoid spamming.
pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) {
if let Some(peer_info) = self.peers.get_mut(&peer_id) {
peer_info.last_sent_transactions.extend(txs.iter().map(|tx| tx.hash()));
for (id, peer) in &mut self.peers {
let hashes = txs.iter().map(|tx| tx.hash());
if *id == peer_id {
peer.last_sent_transactions.extend(hashes);
} else if let Some(s) = &mut peer.unsent_pooled_hashes {
s.extend(hashes);
}
}

for tx in txs {
self.unfetched_pooled_transactions.remove(&tx.hash());
}
}

Expand Down Expand Up @@ -1149,6 +1176,48 @@ impl ChainSync {
}
}

// get the peer to give us at least some of announced but unfetched transactions
if !self.unfetched_pooled_transactions.is_empty() {
if let Some(s) = &mut self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions {
let now = Instant::now();

let mut new_asking_pooled_transactions = s.iter().copied().collect::<HashSet<_>>();
let mut remaining_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
if new_asking_pooled_transactions.len() >= 256 {
// can't request any more transactions
break;
}

// if enough time has passed since last attempt...
if item.next_fetch < now {
// ...queue this hash for requesting
new_asking_pooled_transactions.insert(hash);
item.tries += 1;

// if we just started asking for it, queue it to be asked later on again
if item.tries < 5 {
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
remaining_unfetched_pooled_transactions.insert(hash, item);
} else {
// ...otherwise we assume this transaction does not exist and remove its hash from request queue
remaining_unfetched_pooled_transactions.remove(&hash);
}
}
}

let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::<Vec<_>>();
SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions);

self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions = Some(new_asking_pooled_transactions);
self.unfetched_pooled_transactions = remaining_unfetched_pooled_transactions;

return;
} else {
trace!(target: "sync", "Skipping transaction fetch for peer {} as they don't support eth/65", peer_id);
}
}

// Only ask for old blocks if the peer has an equal or higher difficulty
let equal_or_higher_difficulty = peer_difficulty.map_or(true, |pd| pd >= syncing_difficulty);

Expand Down Expand Up @@ -1340,6 +1409,7 @@ impl ChainSync {
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT,
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT,
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT,
PeerAsking::PooledTransactions => elapsed > POOLED_TRANSACTIONS_TIMEOUT,
PeerAsking::Nothing => false,
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT,
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT,
Expand Down Expand Up @@ -1668,10 +1738,12 @@ pub mod tests {
genesis: H256::zero(),
network_id: 0,
latest_hash: peer_latest_hash,
unsent_pooled_hashes: Some(Default::default()),
difficulty: None,
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
asking_pooled_transactions: Some(Vec::new()),
asking_private_state: None,
ask_time: Instant::now(),
last_sent_transactions: Default::default(),
Expand Down
Loading

0 comments on commit b4af110

Please sign in to comment.