Skip to content

Commit

Permalink
Using a runtime with no lifo slot.
Browse files Browse the repository at this point in the history
As spotted by @PSeitz, the unstealable lifo-slot is causing the
doc processor and the indexer to run on the same thread.

For the moment this is disabled by default and only apply to the
indexer blocking runtime.
  • Loading branch information
fulmicoton committed May 16, 2024
1 parent 4e88883 commit cdcb05b
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 18 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ COPY --from=ui-builder /quickwit/quickwit-ui/build /quickwit/quickwit-ui/build

WORKDIR /quickwit

ENV RUSTFLAGS="--cfg tokio_unstable"
RUN echo "Building workspace with feature(s) '$CARGO_FEATURES' and profile '$CARGO_PROFILE'" \
&& RUSTFLAGS="--cfg tokio_unstable" \
cargo build \
Expand Down
2 changes: 2 additions & 0 deletions quickwit/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
8 changes: 7 additions & 1 deletion quickwit/quickwit-common/src/runtimes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,13 @@ impl Default for RuntimesConfig {
fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
let mut runtimes = HashMap::with_capacity(2);

let blocking_runtime = tokio::runtime::Builder::new_multi_thread()
let disable_lifo_slot: bool = crate::get_from_env("QW_DISABLE_TOKIO_LIFO_SLOT", false);

let mut blocking_runtime_builder = tokio::runtime::Builder::new_multi_thread();
if disable_lifo_slot {
blocking_runtime_builder.disable_lifo_slot();
}
let blocking_runtime = blocking_runtime_builder
.worker_threads(config.num_threads_blocking)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
Expand Down
15 changes: 4 additions & 11 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,10 @@ pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn {
}

fn get_metastore_client_max_concurrency() -> usize {
std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok()
.and_then(|metastore_client_max_concurrency_str| {
if let Ok(metastore_client_max_concurrency) = metastore_client_max_concurrency_str.parse::<usize>() {
info!("overriding max concurrent metastore requests to {metastore_client_max_concurrency}");
Some(metastore_client_max_concurrency)
} else {
error!("failed to parse environment variable `{METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY}={metastore_client_max_concurrency_str}`");
None
}
})
.unwrap_or(DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY)
quickwit_common::get_from_env(
METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY,
DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY,
)
}

static CP_GRPC_CLIENT_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::ops::Range;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{env, fmt, io};
use std::{fmt, io};

use anyhow::{anyhow, Context as AnyhhowContext};
use async_trait::async_trait;
Expand Down Expand Up @@ -59,11 +59,7 @@ use crate::{
/// Semaphore to limit the number of concurent requests to the object store. Some object stores
/// (R2, SeaweedFs...) return errors when too many concurrent requests are emitted.
static REQUEST_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
let num_permits: usize = env::var("QW_S3_MAX_CONCURRENCY")
.as_deref()
.unwrap_or("10000")
.parse()
.expect("QW_S3_MAX_CONCURRENCY value should be a number.");
let num_permits: usize = quickwit_common::get_from_env("QW_S3_MAX_CONCURRENCY", 10_000usize);
Semaphore::new(num_permits)
});

Expand Down

0 comments on commit cdcb05b

Please sign in to comment.