Skip to content

Commit

Permalink
Add DedicatedExecutor to FlightSQL Server (#247)
Browse files Browse the repository at this point in the history
Add's a dedicated executor for running CPU bound work on the FlightSQL
server.

There is interest from the [DataFusion
community](apache/datafusion#13274 (comment))
for this, it was already on our
[roadmap](#197)
and I think the DFT FlightSQL server is a great place to have a
reference implementation.

Initial inspiration and context can be found
[here](https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/).

Most of the initial implementation was copied from
[here](https://github.com/influxdata/influxdb3_core/blob/6fcbb004232738d55655f32f4ad2385523d10696/executor/src/lib.rs)
with some tweaks for our current setup. In particular we dont have
metrics yet in the FlightSQL server implementation (but it is on the
[roadmap](#210))
- I expect to do a follow on where metrics are integrated.
  • Loading branch information
matthewmturner authored Nov 19, 2024
1 parent c1973b1 commit e8c8e20
Show file tree
Hide file tree
Showing 13 changed files with 1,100 additions and 80 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ http-body = {version = "0.4.5" }
itertools = "0.13.0"
lazy_static = "1.4.0"
log = "0.4.22"
num_cpus = "1.16.0"
object_store = { version = "0.10.2", features = ["aws"], optional = true }
parking_lot = "0.12.3"
pin-project-lite = {version = "0.2.14" }
prost = "0.12.3"
ratatui = "0.28.0"
Expand Down
2 changes: 0 additions & 2 deletions f.csv

This file was deleted.

2 changes: 1 addition & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub struct DftArgs {
#[clap(short = 'n', help = "Set the number of benchmark iterations to run")]
pub benchmark_iterations: Option<usize>,

#[cfg(feature = "flightsql")]
#[cfg(any(feature = "flightsql", feature = "experimental-flightsql-server"))]
#[clap(long, help = "Set the host and port to be used for FlightSQL")]
pub flightsql_host: Option<String>,
}
Expand Down
18 changes: 18 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ pub struct ExecutionConfig {
pub tui_batch_size: usize,
#[serde(default = "default_flightsql_server_batch_size")]
pub flightsql_server_batch_size: usize,
#[serde(default = "default_dedicated_executor_enabled")]
pub dedicated_executor_enabled: bool,
#[serde(default = "default_dedicated_executor_threads")]
pub dedicated_executor_threads: usize,
}

fn default_ddl_path() -> Option<PathBuf> {
Expand Down Expand Up @@ -204,6 +208,18 @@ fn default_flightsql_server_batch_size() -> usize {
8092
}

fn default_dedicated_executor_enabled() -> bool {
false
}

fn default_dedicated_executor_threads() -> usize {
// By default we slightly over provision CPUs. For example, if you have N CPUs available we
// have N CPUs for the [`DedicatedExecutor`] and 1 for the main / IO runtime.
//
// Ref: https://github.com/datafusion-contrib/datafusion-dft/pull/247#discussion_r1848270250
num_cpus::get()
}

impl Default for ExecutionConfig {
fn default() -> Self {
Self {
Expand All @@ -213,6 +229,8 @@ impl Default for ExecutionConfig {
cli_batch_size: default_cli_batch_size(),
tui_batch_size: default_tui_batch_size(),
flightsql_server_batch_size: default_flightsql_server_batch_size(),
dedicated_executor_enabled: default_dedicated_executor_enabled(),
dedicated_executor_threads: default_dedicated_executor_threads(),
}
}
}
Expand Down
Loading

0 comments on commit e8c8e20

Please sign in to comment.