From b4af110e8870a0e6b4f7d587108c908979be50e5 Mon Sep 17 00:00:00 2001 From: Artem Vorotnikov Date: Fri, 3 Apr 2020 21:57:11 +0300 Subject: [PATCH] Implement eth/65 (EIP-2464) --- Cargo.lock | 1 + ethcore/client-traits/src/lib.rs | 5 +- ethcore/src/client/client.rs | 6 +- ethcore/src/test_helpers/test_client.rs | 5 +- ethcore/sync/Cargo.toml | 1 + ethcore/sync/src/chain/handler.rs | 70 ++++++++++++++++++++-- ethcore/sync/src/chain/mod.rs | 78 ++++++++++++++++++++++++- ethcore/sync/src/chain/propagator.rs | 36 ++++++++++-- ethcore/sync/src/chain/requester.rs | 20 ++++--- ethcore/sync/src/chain/supplier.rs | 43 ++++++++------ ethcore/sync/src/chain/sync_packet.rs | 6 ++ ethcore/sync/src/tests/helpers.rs | 4 +- rpc/src/v1/impls/eth.rs | 2 +- 13 files changed, 229 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9cbb6b9302..255e7211d88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1609,6 +1609,7 @@ dependencies = [ "snapshot", "spec", "trace-time", + "transaction-pool", "triehash-ethereum", ] diff --git a/ethcore/client-traits/src/lib.rs b/ethcore/client-traits/src/lib.rs index 7692e9fc016..3ddd03dbe56 100644 --- a/ethcore/client-traits/src/lib.rs +++ b/ethcore/client-traits/src/lib.rs @@ -285,7 +285,10 @@ pub trait BlockChainClient: fn list_storage(&self, id: BlockId, account: &Address, after: Option<&H256>, count: Option) -> Option>; /// Get transaction with given hash. - fn transaction(&self, id: TransactionId) -> Option; + fn block_transaction(&self, id: TransactionId) -> Option; + + /// Get pool transaction with a given hash. + fn queued_transaction(&self, hash: H256) -> Option>; /// Get uncle with given id. fn uncle(&self, id: UncleId) -> Option; diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 043e04d808d..3e527033aff 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1898,10 +1898,14 @@ impl BlockChainClient for Client { Some(keys) } - fn transaction(&self, id: TransactionId) -> Option { + fn block_transaction(&self, id: TransactionId) -> Option { self.transaction_address(id).and_then(|address| self.chain.read().transaction(&address)) } + fn queued_transaction(&self, hash: H256) -> Option> { + self.importer.miner.transaction(&hash) + } + fn uncle(&self, id: UncleId) -> Option { let index = id.position; self.block_body(id.block).and_then(|body| body.view().uncle_rlp_at(index)) diff --git a/ethcore/src/test_helpers/test_client.rs b/ethcore/src/test_helpers/test_client.rs index c0808f8dcc7..b547fa30194 100644 --- a/ethcore/src/test_helpers/test_client.rs +++ b/ethcore/src/test_helpers/test_client.rs @@ -737,9 +737,12 @@ impl BlockChainClient for TestBlockChainClient { fn list_storage(&self, _id: BlockId, _account: &Address, _after: Option<&H256>, _count: Option) -> Option> { None } - fn transaction(&self, _id: TransactionId) -> Option { + fn block_transaction(&self, _id: TransactionId) -> Option { None // Simple default. } + fn queued_transaction(&self, _hash: H256) -> Option> { + None + } fn uncle(&self, _id: UncleId) -> Option { None // Simple default. diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index 6789a23bd54..7addfe46a3b 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -36,6 +36,7 @@ rlp = "0.4.5" snapshot = { path = "../snapshot" } trace-time = "0.1" triehash-ethereum = { version = "0.2", path = "../../util/triehash-ethereum" } +transaction-pool = "2" [dev-dependencies] env_logger = "0.5" diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 3cd53743e7b..6156d999bcc 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -26,14 +26,12 @@ use crate::{ sync_packet::{ PacketInfo, SyncPacket::{ - self, BlockBodiesPacket, BlockHeadersPacket, NewBlockHashesPacket, NewBlockPacket, - PrivateStatePacket, PrivateTransactionPacket, ReceiptsPacket, SignedPrivateTransactionPacket, - SnapshotDataPacket, SnapshotManifestPacket, StatusPacket, + self, *, } }, BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester, - SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, - PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4, + SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65, + MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4, } }; @@ -53,6 +51,7 @@ use common_types::{ verification::Unverified, snapshot::{ManifestData, RestorationStatus}, }; +use transaction_pool::VerifiedTransaction; /// The Chain Sync Handler: handles responses from peers @@ -70,6 +69,8 @@ impl SyncHandler { ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), + NewPooledTransactionHashesPacket => SyncHandler::on_peer_new_pooled_transactions(sync, io, peer, &rlp), + PooledTransactionsPacket => SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp), SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp), @@ -595,9 +596,11 @@ impl SyncHandler { difficulty, latest_hash, genesis, + unsent_pooled_hashes: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(io.chain().transactions_to_propagate().into_iter().map(|tx| *tx.hash()).collect()) } else { None }, asking: PeerAsking::Nothing, asking_blocks: Vec::new(), asking_hash: None, + asking_pooled_transactions: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(Vec::new()) } else { None }, asking_private_state: None, ask_time: Instant::now(), last_sent_transactions: Default::default(), @@ -656,7 +659,7 @@ impl SyncHandler { if false || (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_4.0)) - || (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_64.0)) + || (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_65.0)) { trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); return Err(DownloaderImportError::Invalid); @@ -703,6 +706,61 @@ impl SyncHandler { Ok(()) } + /// Called when peer sends us a set of new pooled transactions + pub fn on_peer_new_pooled_transactions(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> { + for item in tx_rlp { + let hash = item.as_val::().map_err(|_| DownloaderImportError::Invalid)?; + + if io.chain().queued_transaction(hash).is_none() { + let unfetched = sync.unfetched_pooled_transactions.entry(hash).or_insert_with(|| super::UnfetchedTransaction { + announcer: peer_id, + next_fetch: Instant::now(), + tries: 0, + }); + + // Only reset the budget if we hear from multiple sources + if unfetched.announcer != peer_id { + unfetched.next_fetch = Instant::now(); + unfetched.tries = 0; + } + } + } + + Ok(()) + } + + /// Called when peer sends us a list of pooled transactions + pub fn on_peer_pooled_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> { + let peer = match sync.peers.get(&peer_id).filter(|p| p.can_sync()) { + Some(peer) => peer, + None => { + trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id); + return Ok(()); + } + }; + + // TODO: actually check against asked hashes + let item_count = tx_rlp.item_count()?; + if let Some(p) = &peer.asking_pooled_transactions { + if item_count > p.len() { + trace!(target: "sync", "{} Peer sent us more transactions than was supposed to", peer_id); + return Err(DownloaderImportError::Invalid); + } + } else { + trace!(target: "sync", "{} Peer sent us pooled transactions but does not declare support for them", peer_id); + return Err(DownloaderImportError::Invalid); + } + trace!(target: "sync", "{:02} -> PooledTransactions ({} entries)", peer_id, item_count); + let mut transactions = Vec::with_capacity(item_count); + for i in 0 .. item_count { + let rlp = tx_rlp.at(i)?; + let tx = rlp.as_raw().to_vec(); + transactions.push(tx); + } + io.chain().queue_transactions(transactions, peer_id); + Ok(()) + } + /// Called when peer sends us signed private transaction packet fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index d7bce672c79..57e26e69b35 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -166,6 +166,8 @@ impl From for PacketProcessError { } } +/// Version 65 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count). +pub const ETH_PROTOCOL_VERSION_65: (u8, u8) = (65, 0x11); /// Version 64 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count). pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11); /// Version 63 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count). @@ -217,6 +219,7 @@ const STATUS_TIMEOUT: Duration = Duration::from_secs(10); const HEADERS_TIMEOUT: Duration = Duration::from_secs(15); const BODIES_TIMEOUT: Duration = Duration::from_secs(20); const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10); +const POOLED_TRANSACTIONS_TIMEOUT: Duration = Duration::from_secs(10); const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); /// Max time to wait for the Snapshot Manifest packet to arrive from a peer after it's being asked. const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); @@ -318,6 +321,7 @@ pub enum PeerAsking { BlockHeaders, BlockBodies, BlockReceipts, + PooledTransactions, SnapshotManifest, SnapshotData, PrivateState, @@ -352,6 +356,8 @@ pub struct PeerInfo { network_id: u64, /// Peer best block hash latest_hash: H256, + /// Unpropagated tx pool hashes + unsent_pooled_hashes: Option, /// Peer total difficulty if known difficulty: Option, /// Type of data currently being requested by us from a peer. @@ -360,6 +366,8 @@ pub struct PeerInfo { asking_blocks: Vec, /// Holds requested header hash if currently requesting block header by hash asking_hash: Option, + /// Holds requested transaction IDs + asking_pooled_transactions: Option>, /// Holds requested private state hash asking_private_state: Option, /// Holds requested snapshot chunk hash if any. @@ -669,6 +677,13 @@ enum PeerState { SameBlock } +#[derive(Clone, MallocSizeOf)] +struct UnfetchedTransaction { + announcer: PeerId, + next_fetch: Instant, + tries: usize, +} + /// Blockchain sync handler. /// See module documentation for more details. #[derive(MallocSizeOf)] @@ -708,6 +723,8 @@ pub struct ChainSync { sync_start_time: Option, /// Transactions propagation statistics transactions_stats: TransactionsStats, + /// Transactions whose hash has been announced, but that we have not fetched + unfetched_pooled_transactions: H256FastMap, /// Enable ancient block downloading download_old_blocks: bool, /// Shared private tx service. @@ -751,6 +768,7 @@ impl ChainSync { snapshot: Snapshot::new(), sync_start_time: None, transactions_stats: TransactionsStats::default(), + unfetched_pooled_transactions: Default::default(), private_tx_handler, warp_sync: config.warp_sync, status_sinks: Vec::new() @@ -764,7 +782,7 @@ impl ChainSync { let last_imported_number = self.new_blocks.last_imported_block_number(); SyncStatus { state: self.state.clone(), - protocol_version: ETH_PROTOCOL_VERSION_64.0, + protocol_version: ETH_PROTOCOL_VERSION_65.0, network_id: self.network_id, start_block_number: self.starting_block, last_imported_block_number: Some(last_imported_number), @@ -798,8 +816,17 @@ impl ChainSync { /// Updates the set of transactions recently sent to this peer to avoid spamming. pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) { - if let Some(peer_info) = self.peers.get_mut(&peer_id) { - peer_info.last_sent_transactions.extend(txs.iter().map(|tx| tx.hash())); + for (id, peer) in &mut self.peers { + let hashes = txs.iter().map(|tx| tx.hash()); + if *id == peer_id { + peer.last_sent_transactions.extend(hashes); + } else if let Some(s) = &mut peer.unsent_pooled_hashes { + s.extend(hashes); + } + } + + for tx in txs { + self.unfetched_pooled_transactions.remove(&tx.hash()); } } @@ -1149,6 +1176,48 @@ impl ChainSync { } } + // get the peer to give us at least some of announced but unfetched transactions + if !self.unfetched_pooled_transactions.is_empty() { + if let Some(s) = &mut self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions { + let now = Instant::now(); + + let mut new_asking_pooled_transactions = s.iter().copied().collect::>(); + let mut remaining_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone(); + for (hash, mut item) in self.unfetched_pooled_transactions.drain() { + if new_asking_pooled_transactions.len() >= 256 { + // can't request any more transactions + break; + } + + // if enough time has passed since last attempt... + if item.next_fetch < now { + // ...queue this hash for requesting + new_asking_pooled_transactions.insert(hash); + item.tries += 1; + + // if we just started asking for it, queue it to be asked later on again + if item.tries < 5 { + item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2); + remaining_unfetched_pooled_transactions.insert(hash, item); + } else { + // ...otherwise we assume this transaction does not exist and remove its hash from request queue + remaining_unfetched_pooled_transactions.remove(&hash); + } + } + } + + let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::>(); + SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions); + + self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions = Some(new_asking_pooled_transactions); + self.unfetched_pooled_transactions = remaining_unfetched_pooled_transactions; + + return; + } else { + trace!(target: "sync", "Skipping transaction fetch for peer {} as they don't support eth/65", peer_id); + } + } + // Only ask for old blocks if the peer has an equal or higher difficulty let equal_or_higher_difficulty = peer_difficulty.map_or(true, |pd| pd >= syncing_difficulty); @@ -1340,6 +1409,7 @@ impl ChainSync { PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT, PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT, PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT, + PeerAsking::PooledTransactions => elapsed > POOLED_TRANSACTIONS_TIMEOUT, PeerAsking::Nothing => false, PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT, PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT, @@ -1668,10 +1738,12 @@ pub mod tests { genesis: H256::zero(), network_id: 0, latest_hash: peer_latest_hash, + unsent_pooled_hashes: Some(Default::default()), difficulty: None, asking: PeerAsking::Nothing, asking_blocks: Vec::new(), asking_hash: None, + asking_pooled_transactions: Some(Vec::new()), asking_private_state: None, ask_time: Instant::now(), last_sent_transactions: Default::default(), diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 78bd9070fd6..230e4a30f73 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -29,12 +29,7 @@ use rand::RngCore; use rlp::{Encodable, RlpStream}; use common_types::{blockchain_info::BlockChainInfo, transaction::SignedTransaction, BlockNumber}; -use super::sync_packet::SyncPacket::{ - NewBlockHashesPacket, - TransactionsPacket, - NewBlockPacket, - ConsensusDataPacket, -}; +use super::sync_packet::SyncPacket::*; use super::{ random, @@ -98,11 +93,33 @@ impl SyncPropagator { /// propagates new transactions to all peers pub fn propagate_new_transactions bool>(sync: &mut ChainSync, io: &mut dyn SyncIo, mut should_continue: F) -> usize { + const NEW_POOLED_HASHES_LIMIT: usize = 4096; + // Early out if nobody to send to. if sync.peers.is_empty() { return 0; } + // propagate just hashes to newer peers + trace!(target: "sync", "Sending NewPooledTransactionsHashes to {:?}", sync.peers.keys()); + for (peer_id, peer) in &mut sync.peers { + let mut affected = false; + let mut packet = RlpStream::new(); + packet.begin_unbounded_list(); + if let Some(s) = &mut peer.unsent_pooled_hashes { + for item in s.drain().take(NEW_POOLED_HASHES_LIMIT) { + affected = true; + packet.append(&item); + } + } + + if affected { + packet.finalize_unbounded_list(); + + SyncPropagator::send_packet(io, *peer_id, NewPooledTransactionHashesPacket, packet.out()); + } + } + let transactions = io.chain().transactions_to_propagate(); if transactions.is_empty() { return 0; @@ -178,6 +195,11 @@ impl SyncPropagator { let peer_info = sync.peers.get_mut(&peer_id) .expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed"); + // We do not gossip full transactions to newer peers + if peer_info.protocol_version >= 65 { + continue; + } + // Send all transactions, if the peer doesn't know about anything if peer_info.last_sent_transactions.is_empty() { // update stats @@ -434,10 +456,12 @@ mod tests { genesis: H256::zero(), network_id: 0, latest_hash: client.block_hash_delta_minus(1), + unsent_pooled_hashes: Some(Default::default()), difficulty: None, asking: PeerAsking::Nothing, asking_blocks: Vec::new(), asking_hash: None, + asking_pooled_transactions: Some(Vec::new()), asking_private_state: None, ask_time: Instant::now(), last_sent_transactions: Default::default(), diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index 3b44a59e041..518127c752c 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -29,14 +29,7 @@ use rlp::RlpStream; use common_types::BlockNumber; use super::sync_packet::SyncPacket; -use super::sync_packet::SyncPacket::{ - GetBlockHeadersPacket, - GetBlockBodiesPacket, - GetReceiptsPacket, - GetSnapshotManifestPacket, - GetSnapshotDataPacket, - GetPrivateStatePacket, -}; +use super::sync_packet::SyncPacket::*; use super::{ BlockSet, @@ -87,6 +80,17 @@ impl SyncRequester { SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GetBlockHeadersPacket, rlp.out()); } + /// Request pooled transactions from a peer + pub fn request_pooled_transactions(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, hashes: &[H256]) { + trace!(target: "sync", "{} <- GetPooledTransactions: {:?}", peer_id, hashes); + let mut rlp = RlpStream::new_list(hashes.len()); + for h in hashes { + rlp.append(h); + } + + SyncRequester::send_request(sync, io, peer_id, PeerAsking::PooledTransactions, PooledTransactionsPacket, rlp.out()) + } + /// Find some headers or blocks to download from a peer. pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) { // find chunk data to download diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 95da5910bc6..56a510da235 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -29,25 +29,7 @@ use rlp::{Rlp, RlpStream}; use common_types::{ids::BlockId, BlockNumber}; use super::sync_packet::{PacketInfo, SyncPacket}; -use super::sync_packet::SyncPacket::{ - StatusPacket, - TransactionsPacket, - GetBlockHeadersPacket, - BlockHeadersPacket, - GetBlockBodiesPacket, - BlockBodiesPacket, - GetNodeDataPacket, - NodeDataPacket, - GetReceiptsPacket, - ReceiptsPacket, - GetSnapshotManifestPacket, - SnapshotManifestPacket, - GetSnapshotDataPacket, - SnapshotDataPacket, - ConsensusDataPacket, - GetPrivateStatePacket, - PrivateStatePacket, -}; +use super::sync_packet::SyncPacket::*; use super::{ ChainSync, @@ -74,6 +56,11 @@ impl SyncSupplier { if let Some(id) = SyncPacket::from_u8(packet_id) { let result = match id { + GetPooledTransactionsPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_pooled_transactions, + |e| format!("Error sending pooled transactions: {:?}", e)), + GetBlockBodiesPacket => SyncSupplier::return_rlp( io, &rlp, peer, SyncSupplier::return_block_bodies, @@ -265,6 +252,24 @@ impl SyncSupplier { Ok(Some((BlockHeadersPacket, rlp))) } + /// Respond to GetPooledTransactions request + fn return_pooled_transactions(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { + const LIMIT: usize = 256; + + let transactions = r.iter().take(LIMIT).filter_map(|v| { + v.as_val::().ok().and_then(|hash| io.chain().queued_transaction(hash)) + }).collect::>(); + + let added = transactions.len(); + let mut rlp = RlpStream::new_list(added); + for tx in transactions { + rlp.append(tx.signed()); + } + + trace!(target: "sync", "{} -> GetPooledTransactions: returned {} entries", peer_id, added); + Ok(Some((PooledTransactionsPacket, rlp))) + } + /// Respond to GetBlockBodies request fn return_block_bodies(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { let payload_soft_limit = io.payload_soft_limit(); diff --git a/ethcore/sync/src/chain/sync_packet.rs b/ethcore/sync/src/chain/sync_packet.rs index bba38d3548d..6001ad929fc 100644 --- a/ethcore/sync/src/chain/sync_packet.rs +++ b/ethcore/sync/src/chain/sync_packet.rs @@ -44,6 +44,9 @@ pub enum SyncPacket { GetBlockBodiesPacket = 0x05, BlockBodiesPacket = 0x06, NewBlockPacket = 0x07, + NewPooledTransactionHashesPacket = 0x08, + GetPooledTransactionsPacket = 0x09, + PooledTransactionsPacket = 0x0a, GetNodeDataPacket = 0x0d, NodeDataPacket = 0x0e, @@ -85,6 +88,9 @@ impl PacketInfo for SyncPacket { GetBlockBodiesPacket | BlockBodiesPacket | NewBlockPacket | + NewPooledTransactionHashesPacket | + GetPooledTransactionsPacket | + PooledTransactionsPacket | GetNodeDataPacket| NodeDataPacket | diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index eaadf2f3d95..25aba8082bc 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -25,7 +25,7 @@ use crate::{ PacketInfo, SyncPacket::{self, PrivateTransactionPacket, SignedPrivateTransactionPacket} }, - ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_64, PAR_PROTOCOL_VERSION_4 + ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_65, PAR_PROTOCOL_VERSION_4 }, private_tx::SimplePrivateTxHandler, sync_io::SyncIo, @@ -157,7 +157,7 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { } fn protocol_version(&self, protocol: &ProtocolId, _peer_id: PeerId) -> u8 { - if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_4.0 } else { ETH_PROTOCOL_VERSION_64.0 } + if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_4.0 } else { ETH_PROTOCOL_VERSION_65.0 } } fn is_expired(&self) -> bool { diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index fa28a7e4303..998054dbde7 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -309,7 +309,7 @@ impl EthClient where } fn transaction(&self, id: PendingTransactionId) -> Result> { - let client_transaction = |id| match self.client.transaction(id) { + let client_transaction = |id| match self.client.block_transaction(id) { Some(t) => Ok(Some(Transaction::from_localized(t))), None => Ok(None), };