
Using sharded locks instead of a global lock for Timers #6534

Merged: 33 commits merged into tokio-rs:master on May 22, 2024

Conversation

wathenjiang (Contributor)

Motivation

As part of addressing #6504, this PR implements a sharded approach to improve timeout and sleep performance in specific scenarios.

Under high concurrency, when a large number of timeouts or sleeps are registered with the Timer, contention on its global lock becomes severe. Sharding the lock significantly reduces this performance problem.

Solution

  • Thread-local random number generation is used to pick a shard (a simplified sketch follows this list).
  • Because the wheel data structure is a multi-layered linked list, the number of shards is kept equal to the number of workers, so that creating a runtime does not take noticeably longer and the serial acquisition of the shard locks in the park method does not take too much time.
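
To illustrate the shape of the change, here is a simplified sketch (the type and function names are assumptions for illustration, not the PR's actual data structures): each shard owns its own mutex-protected wheel, and registering a timer only takes that shard's lock.

use std::sync::Mutex;

// Hypothetical stand-in for the timer wheel owned by one shard.
struct Wheel {}

// One lock per shard instead of a single global lock.
struct ShardedTimers {
    shards: Box<[Mutex<Wheel>]>,
}

impl ShardedTimers {
    // The shard count is chosen to match the number of worker threads.
    fn new(workers: usize) -> Self {
        let shards = (0..workers).map(|_| Mutex::new(Wheel {})).collect();
        Self { shards }
    }

    // Registering a timer contends only on one shard's lock. The shard id can be
    // a worker index or a thread-local random number.
    fn insert(&self, shard_id: usize) {
        let _wheel = self.shards[shard_id % self.shards.len()].lock().unwrap();
        // ... insert the timer entry into this shard's wheel ...
    }
}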

@github-actions github-actions bot added the R-loom-time-driver Run loom time driver tests on this PR label May 4, 2024
wathenjiang (Contributor Author) commented May 4, 2024:

The following code is used for the benchmark:

use std::{future::pending, time::Duration};
use tokio::time::timeout;

fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(8)
        .build()
        .unwrap();

    let _r = runtime.block_on(async {
        let mut handles = Vec::with_capacity(1024);
        for _ in 0..1024 {
            handles.push(tokio::spawn(async move {
                loop {
                    let h = timeout(Duration::from_millis(10), never_ready_job(1));
                    let _r = h.await;
                }
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    });
}

// A job that never becomes ready.
async fn never_ready_job(n: u64) -> u64 {
    pending::<()>().await;
    n * 2
}

The flamegraph of the master branch:

[flamegraph image: master branch]

The flamegraph of this PR:

[flamegraph image: this PR]

The above shows a significant reduction in lock contention:

  • parking_lot::raw_mutex::RawMutex::lock_slow dropped from 6.03% to 0.79% in poll
  • parking_lot::raw_mutex::RawMutex::lock_slow dropped from 6.32% to 1.24% in clear_entry

@Darksonn Darksonn added A-tokio Area: The main tokio crate M-time Module: tokio/time labels May 4, 2024
Comment on lines 501 to 502

let shard_id =
    super::rand::thread_rng_n(self.driver.driver().time().inner.get_shard_size());
Contributor:
I'm wondering whether we could use the current worker id, when this is called from a runtime thread. What do you think?

Contributor Author (wathenjiang):

It's worth a try. This reflects the difference between sleeps and tasks: the timer wheel is more like an mpsc, while the OwnedTasks I optimized before is mpmc. That is to say, for lock contention on sleeps, the most severe operation is insert, while clear sees much less contention. Therefore, making the create operation thread-local is worth trying for sleeps.

However, I still need to point out that this only helps when sleeps and timeouts are created on a worker thread. When they are created on a thread outside the worker threads, the effect is minimal. And if there are enough shards, the time spent on locking is already small enough, such as the 0.79% in the benchmark above.

Contributor Author (wathenjiang):

I added the thread-local logic. As expected, creating a sleep on a worker thread now has almost no lock contention.

Reply:

> I added the thread-local logic. As expected, creating a sleep on a worker thread now has almost no lock contention.

I assume the same would be true for creating a timeout on a worker thread? That it would be an almost lock-free operation?

Contributor Author (wathenjiang):

Yes, it is almost lock-free.

Comment on lines 449 to 453

// Used by `TimerEntry`.
pub(crate) fn thread_rng_n(n: u32) -> u32 {
    thread_local! {
        static THREAD_RNG: Cell<FastRand> = Cell::new(FastRand::new());
    }
Contributor:

Please do not create new thread locals. Some platforms have small limits on the number of thread locals that each process can have.

There should already be a random number generator in the runtime context somewhere.

Contributor Author (wathenjiang):

I used context::thread_rng_n instead. But this introduces unexpected additional requirements on the time feature.

If we are not allowed to access context::thread_rng_n from the time feature, then another option may be std::thread::current().id().as_u64(). However, the latter is unstable. Maybe hashing the thread id could address this problem.
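
As an illustration of the thread-id-hash idea (a sketch, not code from the PR; the function name and the use of DefaultHasher are assumptions), ThreadId implements Hash on stable Rust, so no unstable API or extra thread local is needed:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Derive a shard index by hashing the current thread's id.
fn shard_from_thread_id(shard_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    std::thread::current().id().hash(&mut hasher);
    hasher.finish() % shard_count
}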

next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
// Finds out the min expiration time to park.
let mut next_wake: Option<u64> = None;
for id in 0..rt_handle.time().inner.get_shard_size() {
FrankReh commented May 4, 2024:

Just a question. What does it mean for the system if park_internal comes up with a new expiration time that isn't the minimum? I don't know whether other threads can be running while this function is called, but if one can, couldn't it set a new, smaller timer on a shard that the loop has already checked? I would have the same question about the earlier version, so this is not an important question. Thanks.

Contributor Author (wathenjiang):

It's a good question.

When the park_internal method executes, other worker threads may be running or sleeping on a condvar. The concern is whether, if a timer with a smaller expiration time is registered, the runtime can still process it in a timely manner.

Therefore, when timers are registered, we execute unpark.unpark() if necessary to wake up a worker thread. Please see

unpark.unpark();

Because this is a multi-threading issue, both this PR and the master branch have unnecessary wake-ups. That is to say, even if the current minimum expiration time has changed, a worker thread may still park on the driver with the older, larger expiration time computed during its last check. In that case, the worker thread will be quickly unparked by unpark.unpark().

This is similar to a spurious wakeup, but the cost is not high.
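
A schematic sketch of this wake-on-earlier-deadline behavior (simplified, not the PR's actual code; the function and parameter names are assumptions):

use std::sync::atomic::{AtomicU64, Ordering};

// When a new timer is registered, compare its deadline with the wake-up time the
// parked worker last promised. If the new deadline is earlier, unpark the worker
// so it can recompute its park timeout. An occasional unnecessary unpark is cheap.
fn on_timer_registered(new_deadline: u64, next_wake: &AtomicU64, unpark: impl Fn()) {
    let promised = next_wake.load(Ordering::Relaxed);
    // In this sketch, 0 means "no wake-up currently promised".
    if promised == 0 || new_deadline < promised {
        unpark();
    }
}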

Contributor Author (wathenjiang):

If there are other running worker threads:

  • If necessary, registering a timer will attempt to execute unpark.unpark().
  • Alternatively, the worker thread that is responsible for polling the driver does not park with a timeout; if there is no event in the driver, it will immediately return from park_internal.

Reply:

Thanks for pointing me to reregister. So any time a task creates a timer entry that happens to create a new earliest time for the timer wheel to fire, the task is rescheduled to be woken up immediately. This PR doesn't change that fact.

And this design favors helping systems that are busy. The busier the system, the more unlikely a new timer entry will be the earliest, as there would likely be other tasks already waiting for similar timeout events that had been created earlier. Only when the system isn't busy is it likely a new entry will often be the wheel's next entry and that's just when a little inefficiency doesn't make any difference.

@github-actions github-actions bot added the R-loom-multi-thread Run loom multi-thread tests on this PR label May 5, 2024
@github-actions github-actions bot added the R-loom-multi-thread-alt Run loom multi-thread alt tests on this PR label May 5, 2024
drop(lock);

waker_list.wake_all();
next_wake_up
FrankReh commented May 5, 2024:

Now I'm confused that waker_list.wake_all is potentially called multiple times before self.inner.set_next_wake is called rather than afterwards, but I admit I haven't quite caught up on the implications in the multi-threaded case. Never mind. This makes sense to me now; of course the temporary waker_list would be flushed.

@wathenjiang wathenjiang requested a review from Darksonn May 15, 2024 02:16
Darksonn (Contributor) left a comment:

Overall LGTM.

Comment on lines 505 to 520

// Gets the shard id. If current thread is a worker thread, we use its worker index as a shard id.
// Otherwise, we use a random number generator to obtain the shard id.
cfg_rt! {
    fn get_shard_id(&self) -> u32 {
        let shard_size = self.driver.driver().time().inner.get_shard_size();
        let id = context::with_scheduler(|ctx| match ctx {
            Some(scheduler::Context::CurrentThread(_ctx)) => 0,
            #[cfg(feature = "rt-multi-thread")]
            Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32,
            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
            Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32,
            _ => context::thread_rng_n(shard_size),
        });
        id % shard_size
    }
}
Darksonn (Contributor):

This sounds like a getter for a value that already exists, but it is actually the logic for figuring out which shard to use for a new timer. Can we rename it, and perhaps move it to a stand-alone function instead of a method on TimerEntry? Otherwise I am worried that someone will call it and expect it to return the same value as what the TimerShared is using.

Contributor Author (wathenjiang):

Thanks for this CR. I will make it a stand-alone function rather than a method of TimerEntry, and rename it to generate_shard_id.
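
For reference, a sketch of the stand-alone shape being discussed, adapted from the snippet quoted above (the exact signature and location in the merged code may differ):

// Generates the shard id for a new timer entry. If the current thread is a
// runtime worker, its worker index is used; otherwise a random shard is chosen.
cfg_rt! {
    fn generate_shard_id(shard_size: u32) -> u32 {
        let id = context::with_scheduler(|ctx| match ctx {
            Some(scheduler::Context::CurrentThread(_)) => 0,
            #[cfg(feature = "rt-multi-thread")]
            Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32,
            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
            Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32,
            _ => context::thread_rng_n(shard_size),
        });
        id % shard_size
    }
}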

tokio/src/runtime/time/entry.rs (outdated, resolved)
Comment on lines 96 to 97
/// The earliest time at which we promise to wake up without unparking.
next_wake: AtomicU64,
Darksonn (Contributor):

Perhaps it would result in clearer code to wrap this in an AtomicOptionNonZeroU64 utility type? Or at least we should simplify NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()).

Contributor Author (wathenjiang):

Yes, it should be simplified. I have added a new helper type called exactly AtomicOptionNonZeroU64.

Compared to unwrap_or_else and unwrap, I prefer match here.

}

fn store(&self, val: Option<u64>) {
    self.0.store(Self::turn(val), Ordering::Relaxed);
Darksonn (Contributor):

This stores Some(1) if passed Some(0). Can you add a comment that explains why?

Actually, it might be clearer to change this to Option<NonZeroU64> and move the call to turn to where you call this method.

Contributor Author (wathenjiang):

I added a helper function next_wake_time instead, along with some explanation of it.
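
For illustration, a minimal sketch of how such a helper type and the next_wake_time mapping could fit together (an assumption about their shape, not necessarily the merged code): the stored value 0 encodes None, and a real expiration time of 0 is bumped to 1 so it stays representable; waking up slightly early is harmless.

use std::num::NonZeroU64;
use std::sync::atomic::{AtomicU64, Ordering};

// Sketch of an atomic Option<NonZeroU64>: the value 0 represents None.
struct AtomicOptionNonZeroU64(AtomicU64);

impl AtomicOptionNonZeroU64 {
    fn store(&self, val: Option<NonZeroU64>) {
        self.0.store(val.map_or(0, NonZeroU64::get), Ordering::Relaxed);
    }

    fn load(&self) -> Option<NonZeroU64> {
        NonZeroU64::new(self.0.load(Ordering::Relaxed))
    }
}

// Maps a raw expiration time into the stored form. An expiration time of 0 would
// collide with the "no next wake" encoding, so it is mapped to 1 instead.
fn next_wake_time(expiration_time: Option<u64>) -> Option<NonZeroU64> {
    expiration_time.map(|t| NonZeroU64::new(t).unwrap_or(NonZeroU64::new(1).unwrap()))
}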

Darksonn (Contributor) left a comment:

Looks good to me. Thanks.

tokio/src/runtime/scheduler/multi_thread_alt/worker.rs (outdated, resolved)
@Darksonn Darksonn enabled auto-merge (squash) May 22, 2024 11:31
@Darksonn Darksonn merged commit 1914e1e into tokio-rs:master May 22, 2024
83 checks passed
Labels
A-tokio (Area: The main tokio crate) · M-time (Module: tokio/time) · R-loom-multi-thread (Run loom multi-thread tests on this PR) · R-loom-multi-thread-alt (Run loom multi-thread alt tests on this PR) · R-loom-time-driver (Run loom time driver tests on this PR)
3 participants