Add DedicatedExecutor to FlightSQL Server #247

Merged

Conversation

@matthewmturner (Collaborator) commented Nov 13, 2024

Adds a dedicated executor for running CPU-bound work on the FlightSQL server.

There is interest from the DataFusion community in this; it was already on our roadmap, and I think the DFT FlightSQL server is a great place to have a reference implementation.

Initial inspiration and context can be found here.

Most of the initial implementation was copied from here with some tweaks for our current setup. In particular we don't have metrics yet in the FlightSQL server implementation (but it is on the roadmap) - I expect to do a follow-on where metrics are integrated.

@matthewmturner linked an issue Nov 13, 2024 that may be closed by this pull request
Comment on lines 121 to 123
// also register the IO runtime for the current thread, since it might be used as well (esp. for the
// current thread RT)
register_io_runtime(io_handle.clone());
Collaborator (Author):

It's not clear to me what this means in practice. Maybe it will become more obvious once I start plugging this into the rest of the app.

Contributor:

It basically means that the IO (e.g. for object store) will be done on the "current" tokio runtime (aka the implicit one that is created by #[tokio::main]).
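
A minimal sketch of the registration pattern being described, assuming a thread-local Option<Handle> and illustrative function names (the PR's actual dedicated_executor module may differ):

use std::cell::RefCell;
use std::future::Future;

use tokio::runtime::Handle;

thread_local! {
    // Each worker thread remembers which runtime should run its IO.
    static IO_RUNTIME: RefCell<Option<Handle>> = RefCell::new(None);
}

fn register_io_runtime(handle: Option<Handle>) {
    IO_RUNTIME.with(|h| *h.borrow_mut() = handle);
}

// Run `fut` on the registered IO runtime and wait for its result from the
// calling (CPU) runtime.
async fn spawn_io<Fut>(fut: Fut) -> Fut::Output
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    let handle = IO_RUNTIME
        .with(|h| h.borrow().clone())
        .expect("no IO runtime registered for this thread");
    handle.spawn(fut).await.expect("IO task panicked")
}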

Collaborator (Author):

That part I was clear on - I meant more under what circumstances it's used in actual app code. No real concern right now - just memorializing my thoughts as I work on this.

Comment on lines 126 to 129
let runtime = runtime_builder
.on_thread_start(move || register_io_runtime(io_handle.clone()))
.build()
.expect("Creating tokio runtime");
Collaborator (Author):

I think we want to allow configuring the number of threads - but I guess that could be done by the caller since this takes a tokio::runtime::Builder.

Comment on lines 143 to 145
if tx_handle.send(Handle::current()).is_err() {
return;
}
Collaborator (Author):

Maybe we want a log or something here

Contributor:

I think typically when a send handle (tx) fails to send it means the receiving side has hung up (no one is there to get the message), which can happen during normal shutdown
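
As a tiny illustration of that point (a generic oneshot channel, not the PR's actual tx_handle):

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();
    drop(rx); // the receiving side "hangs up", e.g. during shutdown
    if tx.send(42).is_err() {
        // Nothing is listening any more; returning quietly is usually fine.
        println!("receiver gone, treating as normal shutdown");
    }
}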

@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

pub mod dedicated_executor;
Collaborator (Author):

I will probably end up including this in the flightsql feature.

@alamb (Contributor) left a comment:

Very cool @matthewmturner -- thank you

cc @crepererum and @tustvold

/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
/// [`completed_shutdown`](Self::completed_shutdown) (for which it owns the sender side).
struct State {
Contributor:

We might be able to use a tokio JoinSet https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html instead of this now (I think this code predates JoinSet)
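
For reference, a rough sketch of what JoinSet-based bookkeeping could look like (illustrative only, not a drop-in replacement for the State struct here):

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut tasks: JoinSet<u64> = JoinSet::new();
    for i in 0..4u64 {
        tasks.spawn(async move { i * i });
    }
    // On shutdown, drain the set so every spawned task has completed
    // (or call tasks.shutdown().await to abort whatever is still running).
    while let Some(result) = tasks.join_next().await {
        println!("task finished: {}", result.expect("task panicked"));
    }
}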

// specific language governing permissions and limitations
// under the License.

use std::{
Contributor:

This is great -- thank you @matthewmturner

Once we get this sorted out I definitely think we should contemplate merging it back upstream in DataFusion (with documentation). I can totally help with that

Collaborator (Author):

Sounds good

@matthewmturner changed the title from "Start setting up dedicated executor" to "Add dedicated executor for FlightSQL Server" Nov 13, 2024
@alamb (Contributor) commented Nov 14, 2024

BTW I wrote up a bunch of backstory about why a separate executor is needed in apache/datafusion#13423

I hope to get an example up soon (that will show why this DedicatedExecutor is much nicer)

@matthewmturner (Collaborator, Author) commented:

@alamb thanks for that - it looks great.

I'm just plugging along getting this integrated into the FlightSQL server here.

My first objective is to get all the pipes working and all the CPU-bound work executed by the current implementation of the dedicated executor.

After that I will probably look into updating the implementation to use the JoinSet like you mentioned, or improve the observability story with tokio-metrics and/or tokio-console.

@matthewmturner changed the title from "Add dedicated executor for FlightSQL Server" to "Add DedicatedExecutor to FlightSQL Server" Nov 15, 2024
Comment on lines +144 to +154
pub async fn statement_to_logical_plan(&self, statement: Statement) -> Result<LogicalPlan> {
let ctx = self.session_ctx.clone();
let task = async move { ctx.state().statement_to_plan(statement).await };
if let Some(executor) = &self.executor {
let job = executor.spawn(task).map_err(|e| eyre::eyre!(e));
let job_res = job.await?;
job_res.map_err(|e| eyre!(e))
} else {
task.await.map_err(|e| eyre!(e))
}
}
Collaborator (Author):

Using dedicated executor here for logical planning

Comment on lines +156 to +173
/// Executes the provided `LogicalPlan` returning a `SendableRecordBatchStream`. Uses the [`DedicatedExecutor`] if it is available.
pub async fn execute_logical_plan(
&self,
logical_plan: LogicalPlan,
) -> Result<SendableRecordBatchStream> {
let ctx = self.session_ctx.clone();
let task = async move {
let df = ctx.execute_logical_plan(logical_plan).await?;
df.execute_stream().await
};
if let Some(executor) = &self.executor {
let job = executor.spawn(task).map_err(|e| eyre!(e));
let job_res = job.await?;
job_res.map_err(|e| eyre!(e))
} else {
task.await.map_err(|e| eyre!(e))
}
}
Collaborator (Author):

Using dedicated executor here for stream execution

.await
{

let logical_plan = self.execution.statement_to_logical_plan(statement).await;
@matthewmturner (Collaborator, Author) commented Nov 18, 2024:

FlightSQL server calls the method that uses the dedicated executor here

Comment on lines -165 to 152
match self
let stream = self
.execution
.session_ctx()
.execute_logical_plan(plan)
.await
Collaborator (Author):

FlightSQL server calls the method that uses the dedicated executor here

src/config.rs Outdated
Comment on lines 215 to 217
fn default_dedicated_executor_threads_percent() -> f64 {
0.75
}
Collaborator (Author):

By default we give the dedicated executor 75% of available CPU threads - since this is in the config it is configurable

Comment on lines 166 to 175
let cpus = num_cpus::get();
let cpu_threads =
(config.dedicated_executor_threads_percent * cpus as f64) as usize;

let mut runtime_builder = runtime_builder;
let runtime = runtime_builder
.worker_threads(cpu_threads)
.on_thread_start(move || register_io_runtime(io_handle.clone()))
.build()
.expect("Creating tokio runtime");
Collaborator (Author):

Set CPU worker threads here

src/main.rs Outdated
Comment on lines 41 to 46
let main_threads = if state.config.execution.dedicated_executor_enabled {
// Just for IO
(cpus as f64 * (1.0 - state.config.execution.dedicated_executor_threads_percent)) as usize
} else {
cpus
};
Collaborator (Author):

set main / io worker threads here

Collaborator (Author):

Maybe I could just build the threads logic into the config (i.e. have fields for main / dedicated executor threads that are computed at config load time).

Contributor:

FWIW we found it is OK to slightly overcommit the CPUs (like if you have 8 CPUs total, it is often fine to have 8 CPUs in the dedicated worker pool plus 1 CPU for the main runtime).

For most workloads, leaving a single CPU to process network requests is totally fine (e.g. with 8 CPUs total, using 7 in the dedicated executor is fine, since that still leaves 1 entire CPU for the network side while the dedicated executor does most of the real work).
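
To make the numbers concrete, a hypothetical helper (not part of this PR) contrasting the strict percentage split with the slight overcommit described above:

fn thread_split(cpus: usize, overcommit: bool) -> (usize, usize) {
    if overcommit {
        // Dedicated executor gets every CPU; the main runtime keeps one
        // thread for network/IO handling (total = cpus + 1).
        (cpus, 1)
    } else {
        // Strict 75% / 25% split, as in the current config default.
        let dedicated = (cpus as f64 * 0.75) as usize;
        (dedicated, cpus - dedicated)
    }
}

fn main() {
    println!("strict:     {:?}", thread_split(8, false)); // (6, 2)
    println!("overcommit: {:?}", thread_split(8, true));  // (8, 1)
}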

Collaborator (Author):

That makes a lot of sense

Collaborator (Author):

Updated to this

@matthewmturner requested a review from alamb November 18, 2024 14:56
@matthewmturner (Collaborator, Author) commented:

@alamb if you get the chance to review this again that would be great, I think it's getting close. I plan to do a once-over and add some more comments in the next day and hopefully merge after that.

@matthewmturner marked this pull request as ready for review November 18, 2024 15:07
@alamb (Contributor) commented Nov 18, 2024

Will try and check it out tomorrow morning

@alamb (Contributor) left a comment:

Thank you @matthewmturner -- I think this looks very cool. It is also inspiring me to port this stuff upstream for the "different threadpool example"

ddl_path: Option<PathBuf>,
/// Dedicated executor for running CPU intensive work
executor: Option<DedicatedExecutor>,
Contributor:

it might make the code cleaner if you always created a DedicatedExecutor rather than only with the FlightSQL server (a possible future enhancement)

@matthewmturner (Collaborator, Author) commented Nov 19, 2024:

I was going back and forth on this. I ended up going with this approach so there was still a "default" datafusion experience.

That being said, this is meant to be an opinionated datafusion implementation so it does probably make sense to just have one approach.

I also had in mind doing some benchmarks under different loads - and the baseline would be without the dedicated executor so the current setup would be useful for that.

For the time being I will live with this code smell until I figure out next steps, but I do think I will end up getting rid of this.

///
/// # Panic
/// Needs an IO runtime [registered](register_io_runtime).
pub async fn spawn_io<Fut>(fut: Fut) -> Fut::Output
Contributor:

One thing I don't see in this PR is actually calling spawn_io - if this isn't called, then during an ExecutionPlan that is run on the dedicated executor, the IO will be done on the DedicatedExecutor's threadpool.

Here is where influxdb3 calls this:

https://github.com/influxdata/influxdb3_core/blob/6fcbb004232738d55655f32f4ad2385523d10696/iox_query/src/physical_optimizer/cached_parquet_data.rs#L140

I think a better idea is likely to create some sort of ObjectStore wrapper that forwards all network calls to the IO runtime. I'll try and work up some version of that in the DataFusion example.

Collaborator (Author):

Thank you for this reference, very helpful

Collaborator (Author):

@alamb I understand IOx heavily uses parquet files, but based on what I see that means IO for any other types of files would run on the DedicatedExecutor - correct? I think I will need to come up with a more general-purpose solution, as the idea with dft is to be general purpose / work with all DataFusion-supported file types (CSV, JSON, Arrow IPC, etc) - perhaps I will create a wrapping ObjectStore where all calls to the underlying ObjectStore use spawn_io.

Collaborator (Author):

I am going to add this ObjectStore wrapper in a follow-on PR. Then I think this wrapper ObjectStore could be used anywhere an ObjectStore is needed - like the ParquetFileReaderFactory.
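
A minimal sketch of what such a wrapper might look like (the IoObjectStore name and io_handle field are hypothetical; only one forwarded call is shown, and a real version would implement the full ObjectStore trait the same way, or route through spawn_io):

use std::sync::Arc;

use object_store::{path::Path, GetResult, ObjectStore, Result};
use tokio::runtime::Handle;

#[derive(Debug)]
struct IoObjectStore {
    inner: Arc<dyn ObjectStore>,
    io_handle: Handle, // handle to the runtime that should own network IO
}

impl IoObjectStore {
    // Forward a single call onto the IO runtime; every other ObjectStore
    // method would be wrapped the same way.
    async fn get(&self, location: &Path) -> Result<GetResult> {
        let inner = Arc::clone(&self.inner);
        let location = location.clone();
        self.io_handle
            .spawn(async move { inner.get(&location).await })
            .await
            .expect("IO runtime task panicked or was cancelled")
    }
}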

Contributor:

Awesome -- thank you -- I think your idea is 👌 very good

@alamb (Contributor) left a comment:

Let's do it!

@matthewmturner merged commit e8c8e20 into datafusion-contrib:main Nov 19, 2024
7 checks passed
Successfully merging this pull request may close these issues.

Separate runtime for IO / CPU bound tasks