Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport/rpc-v2: Refactor archive_storage method into subscription (#6483) #6710

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions substrate/client/rpc-spec-v2/src/archive/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

use crate::{
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery,
},
MethodResult,
};
Expand Down Expand Up @@ -100,13 +99,17 @@ pub trait ArchiveApi<Hash> {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "archive_unstable_storage", blocking)]
#[subscription(
name = "archive_unstable_storage" => "archive_unstable_storageEvent",
unsubscribe = "archive_unstable_stopStorage",
item = ArchiveStorageEvent,
)]
fn archive_unstable_storage(
&self,
hash: Hash,
items: Vec<PaginatedStorageQuery<String>>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult>;
);

/// Returns the storage difference between two blocks.
///
Expand Down
202 changes: 107 additions & 95 deletions substrate/client/rpc-spec-v2/src/archive/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

use crate::{
archive::{
archive_storage::{ArchiveStorage, ArchiveStorageDiff},
error::Error as ArchiveError,
ArchiveApiServer,
archive_storage::ArchiveStorageDiff, error::Error as ArchiveError, ArchiveApiServer,
},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
common::{
events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery,
},
storage::{QueryResult, StorageSubscriptionClient},
},
hex_string, MethodResult, SubscriptionTaskExecutor,
};
Expand Down Expand Up @@ -57,42 +57,12 @@ use tokio::sync::mpsc;

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";

/// The configuration of [`Archive`].
pub struct ArchiveConfig {
/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
pub max_descendant_responses: usize,
/// The maximum number of queried items allowed for the `archive_storage` at a time.
pub max_queried_items: usize,
}

/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
///
/// Note: this is identical to the `chainHead` value.
const MAX_DESCENDANT_RESPONSES: usize = 5;

/// The maximum number of queried items allowed for the `archive_storage` at a time.
///
/// Note: A queried item can also be a descendant query which can return up to
/// `MAX_DESCENDANT_RESPONSES`.
const MAX_QUERIED_ITEMS: usize = 8;

/// The buffer capacity for each storage query.
///
/// This is small because the underlying JSON-RPC server has
/// its down buffer capacity per connection as well.
const STORAGE_QUERY_BUF: usize = 16;

impl Default for ArchiveConfig {
fn default() -> Self {
Self {
max_descendant_responses: MAX_DESCENDANT_RESPONSES,
max_queried_items: MAX_QUERIED_ITEMS,
}
}
}

/// An API for archive RPC calls.
pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
Expand All @@ -103,11 +73,6 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
executor: SubscriptionTaskExecutor,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
storage_max_descendant_responses: usize,
/// The maximum number of queried items allowed for the `archive_storage` at a time.
storage_max_queried_items: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
Expand All @@ -119,18 +84,9 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
backend: Arc<BE>,
genesis_hash: GenesisHash,
executor: SubscriptionTaskExecutor,
config: ArchiveConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend,
executor,
genesis_hash,
storage_max_descendant_responses: config.max_descendant_responses,
storage_max_queried_items: config.max_queried_items,
_phantom: PhantomData,
}
Self { client, backend, executor, genesis_hash, _phantom: PhantomData }
}
}

Expand Down Expand Up @@ -260,47 +216,53 @@ where

fn archive_unstable_storage(
&self,
pending: PendingSubscriptionSink,
hash: Block::Hash,
items: Vec<PaginatedStorageQuery<String>>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult> {
let items = items
.into_iter()
.map(|query| {
let key = StorageKey(parse_hex_param(query.key)?);
let pagination_start_key = query
.pagination_start_key
.map(|key| parse_hex_param(key).map(|key| StorageKey(key)))
.transpose()?;

// Paginated start key is only supported
if pagination_start_key.is_some() && !query.query_type.is_descendant_query() {
return Err(ArchiveError::InvalidParam(
"Pagination start key is only supported for descendants queries"
.to_string(),
))
}
) {
let mut storage_client =
StorageSubscriptionClient::<Client, Block, BE>::new(self.client.clone());

let fut = async move {
let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };

Ok(PaginatedStorageQuery {
key,
query_type: query.query_type,
pagination_start_key,
let items = match items
.into_iter()
.map(|query| {
let key = StorageKey(parse_hex_param(query.key)?);
Ok(StorageQuery { key, query_type: query.query_type })
})
})
.collect::<Result<Vec<_>, ArchiveError>>()?;
.collect::<Result<Vec<_>, ArchiveError>>()
{
Ok(items) => items,
Err(error) => {
let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
return
},
};

let child_trie = child_trie
.map(|child_trie| parse_hex_param(child_trie))
.transpose()?
.map(ChildInfo::new_default_from_vec);
let child_trie = child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose();
let child_trie = match child_trie {
Ok(child_trie) => child_trie.map(ChildInfo::new_default_from_vec),
Err(error) => {
let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
return
},
};

let storage_client = ArchiveStorage::new(
self.client.clone(),
self.storage_max_descendant_responses,
self.storage_max_queried_items,
);
let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
let storage_fut = storage_client.generate_events(hash, items, child_trie, tx);

Ok(storage_client.handle_query(hash, items, child_trie))
// We don't care about the return value of this join:
// - process_events might encounter an error (if the client disconnected)
// - storage_fut might encounter an error while processing a trie queries and
// the error is propagated via the sink.
let _ = futures::future::join(storage_fut, process_storage_events(&mut rx, &mut sink))
.await;
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}

fn archive_unstable_storage_diff(
Expand Down Expand Up @@ -337,24 +299,74 @@ where
// - process_events might encounter an error (if the client disconnected)
// - storage_fut might encounter an error while processing a trie queries and
// the error is propagated via the sink.
let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await;
let _ =
futures::future::join(storage_fut, process_storage_diff_events(&mut rx, &mut sink))
.await;
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
}

/// Sends all the events to the sink.
async fn process_events(rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, sink: &mut Subscription) {
while let Some(event) = rx.recv().await {
if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
} else if event.is_err() {
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
/// Sends all the events of the storage_diff method to the sink.
async fn process_storage_diff_events(
rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>,
sink: &mut Subscription,
) {
loop {
tokio::select! {
_ = sink.closed() => {
return
},

maybe_event = rx.recv() => {
let Some(event) = maybe_event else {
break;
};

if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
} else if event.is_err() {
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
}

if sink.send(&event).await.is_err() {
return
}
}
}
}
}

/// Sends all the events of the storage method to the sink.
async fn process_storage_events(rx: &mut mpsc::Receiver<QueryResult>, sink: &mut Subscription) {
loop {
tokio::select! {
_ = sink.closed() => {
break
}

maybe_storage = rx.recv() => {
let Some(event) = maybe_storage else {
break;
};

match event {
Ok(None) => continue,

Ok(Some(event)) =>
if sink.send(&ArchiveStorageEvent::result(event)).await.is_err() {
return
},

if sink.send(&event).await.is_err() {
return
Err(error) => {
let _ = sink.send(&ArchiveStorageEvent::err(error)).await;
return
}
}
}
}
}

let _ = sink.send(&ArchiveStorageEvent::StorageDone).await;
}
Loading
Loading