Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc-v2: Implement archive_unstable_storageDiff #5997

Merged
merged 60 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
e17a555
archive/api: Add stroage diff method
lexnv Oct 9, 2024
4101ee6
storage/events: Add storage events and test serialization
lexnv Oct 9, 2024
d781d67
archive: Add initial implementation for storage diff
lexnv Oct 9, 2024
60e1a93
archive: Handle modified and delete cases
lexnv Oct 9, 2024
63da031
archive/events: Derive more impl for diff items
lexnv Oct 9, 2024
25b9e5a
archive: Deduplicate input keys
lexnv Oct 9, 2024
52cfaf7
archive: Move code to different module
lexnv Oct 9, 2024
ee6bed7
archive: Fetch value / hash or both
lexnv Oct 9, 2024
8948e74
events: Add subscription events for ArchiveStorageDiffEvent
lexnv Nov 4, 2024
fbfb0dc
archive: Make archive_storageDiff a subscription
lexnv Nov 4, 2024
f7b33ed
archive: Modify storageDiff to leverage subscription backpressure
lexnv Nov 4, 2024
f47db36
archive: Deduplicate items to a separate function
lexnv Nov 5, 2024
97546a4
archive/tests: Check deduplication
lexnv Nov 5, 2024
baa3dd0
archive: Fix deduplication shortest key
lexnv Nov 5, 2024
4683f6b
archive/tests: Check complex deduplication
lexnv Nov 5, 2024
36f59d2
archive: Simplify logic of trie iteration
lexnv Nov 5, 2024
a084f47
archive: Improve documentation
lexnv Nov 5, 2024
77cac50
archive/events: Add derive(Eq) to archive storage diff events
lexnv Nov 5, 2024
e2324a4
archive/tests: Check query for main trie under prefix
lexnv Nov 5, 2024
4b05913
archive: Add trace and debug logs for storage diff
lexnv Nov 5, 2024
83cb9b4
archive: Send StorageDiffDone for partial trie queries
lexnv Nov 5, 2024
85c810e
archive/tests: Check no changes between blocks
lexnv Nov 5, 2024
152d13b
archive/tests: Ensure added key
lexnv Nov 5, 2024
4f549bf
archive/tests: Extend test with interleaved values and hashes
lexnv Nov 5, 2024
d208068
rpc-v2: Use indexmap crate
lexnv Nov 5, 2024
369a4d2
archive: Move prevHash as last parameter to archive diff
lexnv Nov 5, 2024
a02cc84
archive: Preserve order of elements with indexMap
lexnv Nov 5, 2024
637b446
archive/tests: Check deleted keys
lexnv Nov 5, 2024
76efb18
events: Add common wrappers for construction
lexnv Nov 5, 2024
ca1fa20
archive: Propagate errors via events
lexnv Nov 5, 2024
03eefd7
archive: Check invalid parameters and events
lexnv Nov 5, 2024
9a1e792
archive: Fix clippy
lexnv Nov 5, 2024
dbbfce1
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 5, 2024
971026d
Add prdoc
lexnv Nov 5, 2024
2880058
Merge branch 'master' into lexnv/storage-diff
lexnv Nov 6, 2024
b774855
archive/storage: Optimize space used for saved keys
lexnv Nov 6, 2024
838504b
Merge remote-tracking branch 'origin/lexnv/storage-diff' into lexnv/s…
lexnv Nov 6, 2024
60abd9b
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 6, 2024
dc5f001
archive: Propagate all queries to handle_trie_queries
lexnv Nov 11, 2024
cdc4dc5
archive: Simplify process_events
lexnv Nov 11, 2024
de7cf79
archive: Add missing line
lexnv Nov 11, 2024
199b46d
archive: Refactor with FetchedStorage enum
lexnv Nov 11, 2024
3f4a4b8
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 11, 2024
f4ace06
archive: Refactor for optimal iteration cycles
lexnv Nov 11, 2024
55489c7
archive: Adjust code comments
lexnv Nov 12, 2024
4f94890
archive: Rename starts_with to belongs_to_query
lexnv Nov 12, 2024
a7e54a5
Update prdoc
lexnv Nov 12, 2024
5341227
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 12, 2024
751d8ff
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 26, 2024
3c2bb7c
archive: Remove variable
lexnv Nov 26, 2024
8f8c0c0
Update cargo lock
lexnv Nov 26, 2024
07a459c
archive: Mode deduplicate_storage_diff_items to inenr methods
lexnv Nov 26, 2024
a151bf3
archive/tests: Ensure other storage entries are not provided
lexnv Nov 26, 2024
a575b4a
archive/diff: Implement lexicographic_diff
lexnv Nov 27, 2024
caa75cc
archive/tests: Check lexicographic diff
lexnv Nov 27, 2024
7ff0bdb
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 27, 2024
c6e40a0
archive: Remove commented code and add documentation
lexnv Nov 27, 2024
a07b5c4
Update substrate/client/rpc-spec-v2/src/archive/api.rs
lexnv Nov 27, 2024
ae00327
archive: Remove unneeded lifetime
lexnv Nov 27, 2024
fd48434
Merge remote-tracking branch 'origin/lexnv/storage-diff' into lexnv/s…
lexnv Nov 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions prdoc/pr_5997.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Implement archive_unstable_storageDiff method

doc:
- audience: Node Dev
description: |
This PR implements the `archive_unstable_storageDiff` rpc-v2 method.
Developers can use this method to fetch the storage differences
between two blocks. This is useful for oracles and archive nodes.
For more details see: https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/archive_unstable_storageDiff.md.

crates:
- name: sc-rpc-spec-v2
bump: major
- name: sc-service
bump: patch
1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ log = { workspace = true, default-features = true }
futures-util = { workspace = true }
rand = { workspace = true, default-features = true }
schnellru = { workspace = true }
itertools = { workspace = true }

[dev-dependencies]
async-trait = { workspace = true }
Expand Down
22 changes: 21 additions & 1 deletion substrate/client/rpc-spec-v2/src/archive/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
//! API trait of the archive methods.

use crate::{
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
},
MethodResult,
};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
Expand Down Expand Up @@ -104,4 +107,21 @@ pub trait ArchiveApi<Hash> {
items: Vec<PaginatedStorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult>;

/// Returns the storage difference between two blocks.
///
/// # Unstable
///
/// This method is unstable and can change in minor or patch releases.
#[subscription(
name = "archive_unstable_storageDiff" => "archive_unstable_storageDiffEvent",
unsubscribe = "archive_unstable_storageDiff_stopStorageDiff",
item = ArchiveStorageDiffEvent,
)]
fn archive_unstable_storage_diff(
&self,
hash: Hash,
items: Vec<ArchiveStorageDiffItem<String>>,
previous_hash: Option<Hash>,
);
}
89 changes: 84 additions & 5 deletions substrate/client/rpc-spec-v2/src/archive/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,29 @@
//! API implementation for `archive`.

use crate::{
archive::{error::Error as ArchiveError, ArchiveApiServer},
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
hex_string, MethodResult,
archive::{
archive_storage::{ArchiveStorage, ArchiveStorageDiff},
error::Error as ArchiveError,
ArchiveApiServer,
},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
},
hex_string, MethodResult, SubscriptionTaskExecutor,
};

use codec::Encode;
use jsonrpsee::core::{async_trait, RpcResult};
use futures::FutureExt;
use jsonrpsee::{
core::{async_trait, RpcResult},
PendingSubscriptionSink,
};
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sc_rpc::utils::Subscription;
use sp_api::{CallApiAt, CallContext};
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
Expand All @@ -41,7 +53,9 @@ use sp_runtime::{
};
use std::{collections::HashSet, marker::PhantomData, sync::Arc};

use super::archive_storage::ArchiveStorage;
use tokio::sync::mpsc;

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";

/// The configuration of [`Archive`].
pub struct ArchiveConfig {
Expand All @@ -64,6 +78,12 @@ const MAX_DESCENDANT_RESPONSES: usize = 5;
/// `MAX_DESCENDANT_RESPONSES`.
const MAX_QUERIED_ITEMS: usize = 8;

/// The buffer capacity for each storage query.
///
/// This is small because the underlying JSON-RPC server has
/// its down buffer capacity per connection as well.
const STORAGE_QUERY_BUF: usize = 16;

impl Default for ArchiveConfig {
fn default() -> Self {
Self {
Expand All @@ -79,6 +99,8 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
client: Arc<Client>,
/// Backend of the chain.
backend: Arc<BE>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items the `archive_storage` can return for a descendant query before
Expand All @@ -96,12 +118,14 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
client: Arc<Client>,
backend: Arc<BE>,
genesis_hash: GenesisHash,
executor: SubscriptionTaskExecutor,
config: ArchiveConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend,
executor,
genesis_hash,
storage_max_descendant_responses: config.max_descendant_responses,
storage_max_queried_items: config.max_queried_items,
Expand Down Expand Up @@ -278,4 +302,59 @@ where

Ok(storage_client.handle_query(hash, items, child_trie))
}

fn archive_unstable_storage_diff(
&self,
pending: PendingSubscriptionSink,
hash: Block::Hash,
items: Vec<ArchiveStorageDiffItem<String>>,
previous_hash: Option<Block::Hash>,
) {
let storage_client = ArchiveStorageDiff::new(self.client.clone());
let client = self.client.clone();

log::trace!(target: LOG_TARGET, "Storage diff subscription started");

let fut = async move {
let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };

let previous_hash = if let Some(previous_hash) = previous_hash {
previous_hash
} else {
let Ok(Some(current_header)) = client.header(hash) else {
let message = format!("Block header is not present: {hash}");
let _ = sink.send(&ArchiveStorageDiffEvent::err(message)).await;
return
};
*current_header.parent_hash()
};

let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
let storage_fut =
storage_client.handle_trie_queries(hash, items, previous_hash, tx.clone());

// We don't care about the return value of this join:
// - process_events might encounter an error (if the client disconnected)
// - storage_fut might encounter an error while processing a trie queries and
// the error is propagated via the sink.
let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
}

/// Sends all the events to the sink.
async fn process_events(rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, sink: &mut Subscription) {
while let Some(event) = rx.recv().await {
if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
} else if event.is_err() {
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
}

if sink.send(&event).await.is_err() {
return
}
}
}
Loading
Loading