Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Fix issues during block sync (#11265)
Browse files Browse the repository at this point in the history
  • Loading branch information
grbIzl authored Jun 30, 2020
1 parent f81c576 commit 63e2781
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ethcore/client-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ pub trait BlockChainClient:
/// Schedule state-altering transaction to be executed on the next pending
/// block with the given gas and nonce parameters.
fn transact(&self, tx_request: TransactionRequest) -> Result<(), transaction::Error>;

/// Returns true, if underlying import queue is processing possible fork at the moment
fn is_processing_fork(&self) -> bool;
}

/// The data required for a `Client` to create a transaction.
Expand Down
5 changes: 5 additions & 0 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,11 @@ impl BlockChainClient for Client {
}
}

fn is_processing_fork(&self) -> bool {
let chain = self.chain.read();
self.importer.block_queue.is_processing_fork(&chain.best_block_hash(), &chain)
}

fn block_total_difficulty(&self, id: BlockId) -> Option<U256> {
let chain = self.chain.read();

Expand Down
2 changes: 2 additions & 0 deletions ethcore/src/test_helpers/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,8 @@ impl BlockChainClient for TestBlockChainClient {
}
}

fn is_processing_fork(&self) -> bool { false }

// works only if blocks are one after another 1 -> 2 -> 3
fn tree_route(&self, from: &H256, to: &H256) -> Option<TreeRoute> {
Some(TreeRoute {
Expand Down
1 change: 1 addition & 0 deletions ethcore/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bytes = { package = "parity-bytes", version = "0.1" }
client-traits = { path = "../client-traits" }
common-types = { path = "../types" }
devp2p = { package = "ethcore-network-devp2p", path = "../../util/network-devp2p" }
derive_more = "0.99"
enum-primitive-derive = "0.2"
ethcore-io = { path = "../../util/io" }
ethcore-private-tx = { path = "../private-tx" }
Expand Down
3 changes: 3 additions & 0 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ const MAINTAIN_SYNC_TIMER: TimerToken = 1;
const CONTINUE_SYNC_TIMER: TimerToken = 2;
const TX_TIMER: TimerToken = 3;
const PRIORITY_TIMER: TimerToken = 4;
const DELAYED_PROCESSING_TIMER: TimerToken = 5;

pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);

Expand All @@ -489,6 +490,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
io.register_timer(MAINTAIN_SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(CONTINUE_SYNC_TIMER, Duration::from_millis(2500)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
io.register_timer(DELAYED_PROCESSING_TIMER, Duration::from_millis(2100)).expect("Error registering delayed processing timer");

io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer");
}
Expand Down Expand Up @@ -539,6 +541,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
CONTINUE_SYNC_TIMER => self.sync.write().continue_sync(&mut io),
TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
DELAYED_PROCESSING_TIMER => self.sync.process_delayed_requests(&mut io),
_ => warn!("Unknown timer {} triggered.", timer),
}
}
Expand Down
41 changes: 30 additions & 11 deletions ethcore/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,19 @@ impl BlockDownloader {
self.state = State::Blocks;
}

/// Reset sync to the specified block
fn reset_to_block(&mut self, start_hash: &H256, start_number: BlockNumber) {
self.reset();
self.last_imported_block = start_number;
self.last_imported_hash = start_hash.clone();
self.last_round_start = start_number;
self.last_round_start_hash = start_hash.clone();
self.imported_this_round = None;
self.round_parents = VecDeque::new();
self.target_hash = None;
self.retract_step = 1;
}

/// Returns best imported block number.
pub fn last_imported_block_number(&self) -> BlockNumber {
self.last_imported_block
Expand Down Expand Up @@ -439,22 +452,28 @@ impl BlockDownloader {
trace_sync!(self, "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
} else {
let best = io.chain().chain_info().best_block_number;
let best_hash = io.chain().chain_info().best_block_hash;
let oldest_reorg = io.chain().pruning_info().earliest_state;
if self.block_set == BlockSet::NewBlocks && best > start && start < oldest_reorg {
debug_sync!(self, "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
self.reset();
self.reset_to_block(&best_hash, best);
} else {
let n = start - cmp::min(self.retract_step, start);
self.retract_step *= 2;
match io.chain().block_hash(BlockId::Number(n)) {
Some(h) => {
self.last_imported_block = n;
self.last_imported_hash = h;
trace_sync!(self, "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
}
None => {
debug_sync!(self, "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
self.reset();
if n == 0 {
debug_sync!(self, "Header not found, bottom line reached, resetting, last imported: {}", self.last_imported_hash);
self.reset_to_block(&best_hash, best);
} else {
self.retract_step *= 2;
match io.chain().block_hash(BlockId::Number(n)) {
Some(h) => {
self.last_imported_block = n;
self.last_imported_hash = h;
trace_sync!(self, "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
}
None => {
debug_sync!(self, "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
self.reset_to_block(&best_hash, best);
}
}
}
}
Expand Down
23 changes: 14 additions & 9 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
SnapshotDataPacket, SnapshotManifestPacket, StatusPacket,
}
},
BlockSet, ChainSync, ForkConfirmation, PacketDecodeError, PeerAsking, PeerInfo, SyncRequester,
BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester,
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4,
}
Expand Down Expand Up @@ -114,6 +114,7 @@ impl SyncHandler {
debug!(target: "sync", "Disconnected {}", peer_id);
sync.clear_peer_download(peer_id);
sync.peers.remove(&peer_id);
sync.delayed_requests.retain(|(request_peer_id, _, _)| *request_peer_id != peer_id);
sync.active_peers.remove(&peer_id);

if sync.state == SyncState::SnapshotManifest {
Expand Down Expand Up @@ -149,23 +150,27 @@ impl SyncHandler {
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
return Ok(());
}
let difficulty: U256 = r.val_at(1)?;
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
peer.difficulty = Some(difficulty);
}
}
let block = Unverified::from_rlp(r.at(0)?.as_raw().to_vec())?;
let hash = block.header.hash();
let number = block.header.number();
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, hash);
if number > sync.highest_block.unwrap_or(0) {
sync.highest_block = Some(number);
}
let parent_hash = block.header.parent_hash();
let difficulty: U256 = r.val_at(1)?;
// Most probably the sent block is being imported by peer right now
// Use td and hash, that peer must have for now
let parent_td = difficulty.checked_sub(*block.header.difficulty());
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
if peer.difficulty.map_or(true, |pd| parent_td.map_or(false, |td| td > pd)) {
peer.difficulty = parent_td;
}
}
let mut unknown = false;

if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
peer.latest_hash = hash;
peer.latest_hash = *parent_hash;
}

let last_imported_number = sync.new_blocks.last_imported_block_number();
Expand Down Expand Up @@ -675,7 +680,7 @@ impl SyncHandler {
}

/// Called when peer sends us new transactions
pub fn on_peer_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: Rlp) -> Result<(), PacketDecodeError> {
pub fn on_peer_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: Rlp) -> Result<(), PacketProcessError> {
// Accept transactions only when fully synced
if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
Expand Down
58 changes: 54 additions & 4 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,14 @@ use crate::{

use bytes::Bytes;
use client_traits::BlockChainClient;
use derive_more::Display;
use ethereum_types::{H256, U256};
use fastmap::{H256FastMap, H256FastSet};
use futures::sync::mpsc as futures_mpsc;
use keccak_hash::keccak;
use log::{error, trace, debug, warn};
use network::client_version::ClientVersion;
use network::{self, PeerId, PacketId};
use network::{self, PeerId};
use parity_util_mem::{MallocSizeOfExt, malloc_size_of_is_0};
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use rand::{Rng, seq::SliceRandom};
Expand Down Expand Up @@ -147,7 +148,23 @@ pub(crate) use self::supplier::SyncSupplier;

malloc_size_of_is_0!(PeerInfo);

pub type PacketDecodeError = DecoderError;
/// Possible errors during packet's processing
#[derive(Debug, Display)]
pub enum PacketProcessError {
/// Error of RLP decoder
#[display(fmt = "Decoder Error: {}", _0)]
Decoder(DecoderError),
/// Underlying client is busy and cannot process the packet
/// The packet should be postponed for later response
#[display(fmt = "Underlying client is busy")]
ClientBusy,
}

impl From<DecoderError> for PacketProcessError {
fn from(err: DecoderError) -> Self {
PacketProcessError::Decoder(err).into()
}
}

/// Version 64 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11);
Expand Down Expand Up @@ -411,7 +428,7 @@ pub mod random {
}
}

pub type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
pub type RlpResponseResult = Result<Option<(SyncPacket, RlpStream)>, PacketProcessError>;
pub type Peers = HashMap<PeerId, PeerInfo>;

/// Thread-safe wrapper for `ChainSync`.
Expand Down Expand Up @@ -468,6 +485,17 @@ impl ChainSyncApi {
SyncSupplier::dispatch_packet(&self.sync, io, peer, packet_id, data)
}

/// Process the queue with requests, that were delayed with response.
pub fn process_delayed_requests(&self, io: &mut dyn SyncIo) {
let requests = self.sync.write().retrieve_delayed_requests();
if !requests.is_empty() {
debug!(target: "sync", "Processing {} delayed requests", requests.len());
for (peer_id, packet_id, packet_data) in requests {
SyncSupplier::dispatch_delayed_request(&self.sync, io, peer_id, packet_id, &packet_data);
}
}
}

/// Process a priority propagation queue.
/// This task is run from a timer and should be time constrained.
/// Hence we set up a deadline for the execution and cancel the task if the deadline is exceeded.
Expand Down Expand Up @@ -672,6 +700,10 @@ pub struct ChainSync {
/// Connected peers pending Status message.
/// Value is request timestamp.
handshaking_peers: HashMap<PeerId, Instant>,
/// Requests, that can not be processed at the moment
delayed_requests: Vec<(PeerId, u8, Vec<u8>)>,
/// Ids of delayed requests, used for lookup, id is composed from peer id and packet id
delayed_requests_ids: HashSet<(PeerId, u8)>,
/// Sync start timestamp. Measured when first peer is connected
sync_start_time: Option<Instant>,
/// Transactions propagation statistics
Expand Down Expand Up @@ -707,6 +739,8 @@ impl ChainSync {
peers: HashMap::new(),
handshaking_peers: HashMap::new(),
active_peers: HashSet::new(),
delayed_requests: Vec::new(),
delayed_requests_ids: HashSet::new(),
new_blocks: BlockDownloader::new(BlockSet::NewBlocks, &chain_info.best_block_hash, chain_info.best_block_number),
old_blocks: None,
last_sent_block_number: 0,
Expand Down Expand Up @@ -821,6 +855,22 @@ impl ChainSync {
self.active_peers = self.peers.keys().cloned().collect();
}

/// Add a request for later processing
pub fn add_delayed_request(&mut self, peer: PeerId, packet_id: u8, data: &[u8]) {
// Ignore the request, if there is a request already in queue with the same id
if !self.delayed_requests_ids.contains(&(peer, packet_id)) {
self.delayed_requests_ids.insert((peer, packet_id));
self.delayed_requests.push((peer, packet_id, data.to_vec()));
debug!(target: "sync", "Delayed request with packet id {} from peer {} added", packet_id, peer);
}
}

/// Drain and return all delayed requests
pub fn retrieve_delayed_requests(&mut self) -> Vec<(PeerId, u8, Vec<u8>)> {
self.delayed_requests_ids.clear();
self.delayed_requests.drain(..).collect()
}

/// Restart sync
pub fn reset_and_continue(&mut self, io: &mut dyn SyncIo) {
trace!(target: "sync", "Restarting");
Expand Down Expand Up @@ -1261,7 +1311,7 @@ impl ChainSync {
packet.append(&chain.total_difficulty);
packet.append(&chain.best_block_hash);
packet.append(&chain.genesis_hash);
if eth_protocol_version >= ETH_PROTOCOL_VERSION_64.0 {
if eth_protocol_version >= ETH_PROTOCOL_VERSION_64.0 {
packet.append(&self.fork_filter.current(io.chain()));
}
if warp_protocol {
Expand Down
Loading

0 comments on commit 63e2781

Please sign in to comment.