Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using a runtime with no lifo slot. #4771

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quickwit/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
8 changes: 7 additions & 1 deletion quickwit/quickwit-common/src/runtimes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,13 @@ impl Default for RuntimesConfig {
fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
let mut runtimes = HashMap::with_capacity(2);

let blocking_runtime = tokio::runtime::Builder::new_multi_thread()
let disable_lifo_slot: bool = crate::get_from_env("QW_DISABLE_TOKIO_LIFO_SLOT", false);

let mut blocking_runtime_builder = tokio::runtime::Builder::new_multi_thread();
if disable_lifo_slot {
blocking_runtime_builder.disable_lifo_slot();
}
let blocking_runtime = blocking_runtime_builder
.worker_threads(config.num_threads_blocking)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
Expand Down
15 changes: 4 additions & 11 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,10 @@ pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn {
}

fn get_metastore_client_max_concurrency() -> usize {
std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok()
.and_then(|metastore_client_max_concurrency_str| {
if let Ok(metastore_client_max_concurrency) = metastore_client_max_concurrency_str.parse::<usize>() {
info!("overriding max concurrent metastore requests to {metastore_client_max_concurrency}");
Some(metastore_client_max_concurrency)
} else {
error!("failed to parse environment variable `{METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY}={metastore_client_max_concurrency_str}`");
None
}
})
.unwrap_or(DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY)
quickwit_common::get_from_env(
METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY,
DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY,
)
}

static CP_GRPC_CLIENT_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::ops::Range;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{env, fmt, io};
use std::{fmt, io};

use anyhow::{anyhow, Context as AnyhhowContext};
use async_trait::async_trait;
Expand Down Expand Up @@ -59,11 +59,7 @@ use crate::{
/// Semaphore to limit the number of concurent requests to the object store. Some object stores
/// (R2, SeaweedFs...) return errors when too many concurrent requests are emitted.
static REQUEST_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
let num_permits: usize = env::var("QW_S3_MAX_CONCURRENCY")
.as_deref()
.unwrap_or("10000")
.parse()
.expect("QW_S3_MAX_CONCURRENCY value should be a number.");
let num_permits: usize = quickwit_common::get_from_env("QW_S3_MAX_CONCURRENCY", 10_000usize);
Semaphore::new(num_permits)
});

Expand Down
Loading