Skip to content

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed May 16, 2024
1 parent 9684afd commit 0c1b96b
Showing 1 changed file with 26 additions and 10 deletions.
36 changes: 26 additions & 10 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,9 +860,25 @@ pub async fn multi_leaf_search(

for leaf_search_request_ref in leaf_search_request.leaf_requests.into_iter() {
let index_uri = quickwit_common::uri::Uri::from_str(
&leaf_search_request.index_uris[leaf_search_request_ref.index_uri_ord as usize],
&leaf_search_request
.index_uris
.get(leaf_search_request_ref.index_uri_ord as usize)
.ok_or_else(|| {
SearchError::Internal(format!(
"Received incorrect request, index_uri_ord out of bounds: {}",
leaf_search_request_ref.index_uri_ord
))
})?,
)?;
let doc_mapper = doc_mappers[leaf_search_request_ref.doc_mapper_ord as usize].clone();
let doc_mapper = doc_mappers
.get(leaf_search_request_ref.doc_mapper_ord as usize)
.ok_or_else(|| {
SearchError::Internal(format!(
"Received incorrect request, doc_mapper_ord out of bounds: {}",
leaf_search_request_ref.doc_mapper_ord
))
})?
.clone();

let leaf_request_future = tokio::spawn(
resolve_storage_and_leaf_search(
Expand Down Expand Up @@ -897,35 +913,34 @@ pub async fn multi_leaf_search(
}
}

crate::run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into))
crate::search_thread_pool()
.run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into))
.instrument(info_span!("incremental_merge_finalize"))
.await
.context("failed to merge split search responses")?
}

/// Resolves storage and calls leaf_search
pub async fn resolve_storage_and_leaf_search(
async fn resolve_storage_and_leaf_search(
searcher_context: Arc<SearcherContext>,
search_request: Arc<SearchRequest>,
index_uri: quickwit_common::uri::Uri,
storage_resolver: StorageResolver,
splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<dyn DocMapper>,
aggregations_limits: AggregationLimits,
) -> Result<LeafSearchResponse, SearchError> {
) -> crate::Result<LeafSearchResponse> {
let storage = storage_resolver.resolve(&index_uri).await?;

let resp = leaf_search(
leaf_search(
searcher_context.clone(),
search_request.clone(),
storage.clone(),
splits,
doc_mapper,
aggregations_limits,
)
.await?;

Ok(resp)
.await
}

/// `leaf` step of search.
Expand Down Expand Up @@ -1021,7 +1036,8 @@ pub async fn leaf_search(
}
}

let result = crate::run_cpu_intensive(|| incremental_merge_collector.finalize())
let result = crate::search_thread_pool()
.run_cpu_intensive(|| incremental_merge_collector.finalize())
.instrument(info_span!("incremental_merge_intermediate"))
.await
.context("failed to merge split search responses")??;
Expand Down

0 comments on commit 0c1b96b

Please sign in to comment.