From 3b9535b507ba15f7f16cbd9fba6e7d2135eb3c04 Mon Sep 17 00:00:00 2001
From: Alice Ryhl
Date: Fri, 22 Dec 2023 18:40:31 +0100
Subject: [PATCH 1/4] runtime: only mitigate false sharing for multi-threaded runtimes

---
 .../runtime/scheduler/multi_thread/worker.rs  |  11 ++
 .../scheduler/multi_thread_alt/worker.rs      |  11 ++
 tokio/src/runtime/task/core.rs                |  96 +-------------
 tokio/src/runtime/task/harness.rs             |   4 +-
 tokio/src/runtime/task/mod.rs                 |  12 ++
 tokio/src/runtime/task/raw.rs                 |   7 +-
 tokio/src/runtime/task/task_box.rs            | 124 ++++++++++++++++++
 tokio/src/util/cacheline.rs                   |  27 +++-
 8 files changed, 194 insertions(+), 98 deletions(-)
 create mode 100644 tokio/src/runtime/task/task_box.rs

diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 313e2ea68f7..d652f5416ec 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -1014,6 +1014,17 @@ impl task::Schedule for Arc<Handle> {
     fn yield_now(&self, task: Notified) {
         self.schedule_task(task, true);
     }
+
+    fn min_align(&self) -> usize {
+        use crate::util::cacheline::CachePadded;
+
+        // One for single-threaded runtime, otherwise use a high value to avoid
+        // false sharing.
+        match self.shared.remotes.len() {
+            1 => 1,
+            _ => std::mem::align_of::<CachePadded<()>>(),
+        }
+    }
 }
 
 impl Handle {
diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
index 8d16418a80b..4af2d65f8be 100644
--- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -1553,6 +1553,17 @@ impl task::Schedule for Arc<Handle> {
     fn yield_now(&self, task: Notified) {
         self.shared.schedule_task(task, true);
     }
+
+    fn min_align(&self) -> usize {
+        use crate::util::cacheline::CachePadded;
+
+        // One for single-threaded runtime, otherwise use a high value to avoid
+        // false sharing.
+        match self.shared.remotes.len() {
+            1 => 1,
+            _ => std::mem::align_of::<CachePadded<()>>(),
+        }
+    }
 }
 
 impl AsMut<Synced> for Synced {
diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs
index 1903a01aa41..b9c3ab6d220 100644
--- a/tokio/src/runtime/task/core.rs
+++ b/tokio/src/runtime/task/core.rs
@@ -14,7 +14,7 @@ use crate::loom::cell::UnsafeCell;
 use crate::runtime::context;
 use crate::runtime::task::raw::{self, Vtable};
 use crate::runtime::task::state::State;
-use crate::runtime::task::{Id, Schedule};
+use crate::runtime::task::{Id, Schedule, TaskBox};
 use crate::util::linked_list;
 
 use std::num::NonZeroU64;
@@ -30,87 +30,6 @@ use std::task::{Context, Poll, Waker};
 /// Any changes to the layout of this struct _must_ also be reflected in the
 /// const fns in raw.rs.
 ///
-// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
-// from crossbeam-utils/src/cache_padded.rs
-//
-// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
-// lines at a time, so we have to align to 128 bytes rather than 64.
-//
-// Sources:
-// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
-// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
-//
-// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
-//
-// Sources:
-// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
-//
-// powerpc64 has 128-byte cache line size.
-//
-// Sources:
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
-#[cfg_attr(
-    any(
-        target_arch = "x86_64",
-        target_arch = "aarch64",
-        target_arch = "powerpc64",
-    ),
-    repr(align(128))
-)]
-// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
-//
-// Sources:
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
-#[cfg_attr(
-    any(
-        target_arch = "arm",
-        target_arch = "mips",
-        target_arch = "mips64",
-        target_arch = "sparc",
-        target_arch = "hexagon",
-    ),
-    repr(align(32))
-)]
-// m68k has 16-byte cache line size.
-//
-// Sources:
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
-#[cfg_attr(target_arch = "m68k", repr(align(16)))]
-// s390x has 256-byte cache line size.
-//
-// Sources:
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
-#[cfg_attr(target_arch = "s390x", repr(align(256)))]
-// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
-//
-// Sources:
-// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
-//
-// All others are assumed to have 64-byte cache line size.
-#[cfg_attr(
-    not(any(
-        target_arch = "x86_64",
-        target_arch = "aarch64",
-        target_arch = "powerpc64",
-        target_arch = "arm",
-        target_arch = "mips",
-        target_arch = "mips64",
-        target_arch = "sparc",
-        target_arch = "hexagon",
-        target_arch = "m68k",
-        target_arch = "s390x",
-    )),
-    repr(align(64))
-)]
 #[repr(C)]
 pub(super) struct Cell<T: Future, S> {
     /// Hot task state data
@@ -205,7 +124,7 @@ pub(super) enum Stage<T: Future> {
 impl<T: Future, S: Schedule> Cell<T, S> {
     /// Allocates a new task cell, containing the header, trailer, and core
     /// structures.
-    pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
+    pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> TaskBox<T, S> {
         // Separated into a non-generic function to reduce LLVM codegen
         fn new_header(
             state: State,
@@ -225,22 +144,21 @@ impl<T: Future, S: Schedule> Cell<T, S> {
         #[cfg(all(tokio_unstable, feature = "tracing"))]
         let tracing_id = future.id();
         let vtable = raw::vtable::<T, S>();
-        let result = Box::new(Cell {
-            header: new_header(
+        let result = TaskBox::new(
+            new_header(
                 state,
                 vtable,
                 #[cfg(all(tokio_unstable, feature = "tracing"))]
                 tracing_id,
             ),
-            core: Core {
+            Core {
                 scheduler,
                 stage: CoreStage {
                     stage: UnsafeCell::new(Stage::Running(future)),
                 },
                 task_id,
             },
-            trailer: Trailer::new(),
-        });
+        );
 
         #[cfg(debug_assertions)]
         {
@@ -459,7 +377,7 @@ impl Header {
 }
 
 impl Trailer {
-    fn new() -> Self {
+    pub(super) fn new() -> Self {
         Trailer {
             waker: UnsafeCell::new(None),
             owned: linked_list::Pointers::new(),
diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs
index 8bfd57e6fbf..d029a45dd5d 100644
--- a/tokio/src/runtime/task/harness.rs
+++ b/tokio/src/runtime/task/harness.rs
@@ -2,7 +2,7 @@ use crate::future::Future;
 use crate::runtime::task::core::{Cell, Core, Header, Trailer};
 use crate::runtime::task::state::{Snapshot, State};
 use crate::runtime::task::waker::waker_ref;
-use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};
+use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task, TaskBox};
 
 use std::any::Any;
 use std::mem;
@@ -269,7 +269,7 @@ where
         // are allowed to be dangling after their last use, even if the
         // reference has not yet gone out of scope.
         unsafe {
-            drop(Box::from_raw(self.cell.as_ptr()));
+            drop(TaskBox::from_raw(self.cell));
         }
     }
diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs
index 0bd40cd875c..eeb589f75be 100644
--- a/tokio/src/runtime/task/mod.rs
+++ b/tokio/src/runtime/task/mod.rs
@@ -200,6 +200,9 @@ pub(crate) use self::raw::RawTask;
 mod state;
 use self::state::State;
 
+mod task_box;
+use self::task_box::TaskBox;
+
 mod waker;
 
 cfg_taskdump! {
@@ -272,6 +275,15 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
         self.schedule(task);
     }
 
+    /// The minimum alignment for tasks spawned on this runtime.
+    ///
+    /// This is used by the multi-threaded runtime to avoid false sharing.
+    ///
+    /// The same scheduler must always return the same value.
+    fn min_align(&self) -> usize {
+        1
+    }
+
     /// Polling the task resulted in a panic. Should the runtime shutdown?
     fn unhandled_panic(&self) {
         // By default, do nothing. This maintains the 1.0 behavior.
diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs
index 6699551f3ec..ea55bddddf4 100644
--- a/tokio/src/runtime/task/raw.rs
+++ b/tokio/src/runtime/task/raw.rs
@@ -1,6 +1,6 @@
 use crate::future::Future;
 use crate::runtime::task::core::{Core, Trailer};
-use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State};
+use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State, TaskBox};
 
 use std::ptr::NonNull;
 use std::task::{Poll, Waker};
@@ -162,10 +162,9 @@ impl RawTask {
         T: Future,
         S: Schedule,
     {
-        let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
-        let ptr = unsafe { NonNull::new_unchecked(ptr.cast()) };
+        let ptr = TaskBox::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
 
-        RawTask { ptr }
+        RawTask { ptr: ptr.cast() }
     }
 
     pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask {
diff --git a/tokio/src/runtime/task/task_box.rs b/tokio/src/runtime/task/task_box.rs
new file mode 100644
index 00000000000..408e73d1795
--- /dev/null
+++ b/tokio/src/runtime/task/task_box.rs
@@ -0,0 +1,124 @@
+//! Helper module for allocating and deallocating tasks.
+
+use crate::runtime::task::core::{Cell, Core, Header, Trailer};
+use crate::runtime::task::Schedule;
+
+use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
+use std::future::Future;
+use std::marker::PhantomData;
+use std::mem::{align_of, size_of, ManuallyDrop};
+use std::ptr::{drop_in_place, NonNull};
+
+fn layout_of<T: Future, S: Schedule>(scheduler: &S) -> Layout {
+    let size = std::mem::size_of::<Cell<T, S>>();
+    let mut align = std::mem::align_of::<Cell<T, S>>();
+    let min_align = scheduler.min_align();
+    if align < min_align {
+        align = min_align;
+    }
+    match Layout::from_size_align(size, align) {
+        Ok(layout) => layout,
+        Err(_) => panic!("Failed to build layout of type."),
+    }
+}
+
+/// A `Box<Cell<T, S>>` with an alignment of at least `s.min_align()`.
+pub(super) struct TaskBox<T: Future, S: Schedule> {
+    ptr: NonNull<Cell<T, S>>,
+    _phantom: PhantomData<Cell<T, S>>,
+}
+
+impl<T: Future, S: Schedule> TaskBox<T, S> {
+    /// Creates a new task allocation.
+    pub(super) fn new(header: Header, core: Core<T, S>) -> Self {
+        let layout = layout_of::<T, S>(&core.scheduler);
+
+        assert_eq!(size_of::<Cell<T, S>>(), layout.size());
+        assert_ne!(size_of::<Cell<T, S>>(), 0);
+        assert!(align_of::<Cell<T, S>>() <= layout.align());
+
+        // SAFETY: The size of `layout` is non-zero as checked above.
+        let ptr = unsafe { alloc(layout) } as *mut Cell<T, S>;
+
+        let ptr = match NonNull::new(ptr) {
+            Some(ptr) => ptr,
+            None => handle_alloc_error(layout),
+        };
+
+        // SAFETY: We just allocated memory with the same size and a compatible
+        // alignment for `Cell`.
+        unsafe {
+            ptr.as_ptr().write(Cell {
+                header,
+                core,
+                trailer: Trailer::new(),
+            });
+        };
+
+        Self {
+            ptr,
+            _phantom: PhantomData,
+        }
+    }
+
+    /// Convert this allocation into a raw pointer.
+    pub(super) fn into_raw(self) -> NonNull<Cell<T, S>> {
+        let me = ManuallyDrop::new(self);
+        me.ptr
+    }
+
+    /// Convert this allocation back into a `TaskBox`.
+    ///
+    /// # Safety
+    ///
+    /// The provided pointer must originate from a previous call to `into_raw`,
+    /// and the raw pointer must not be used again after this call.
+    pub(super) unsafe fn from_raw(ptr: NonNull<Cell<T, S>>) -> Self {
+        Self {
+            ptr,
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<T: Future, S: Schedule> std::ops::Deref for TaskBox<T, S> {
+    type Target = Cell<T, S>;
+
+    fn deref(&self) -> &Cell<T, S> {
+        // SAFETY: This box always points at a valid cell.
+        unsafe { &*self.ptr.as_ptr() }
+    }
+}
+
+impl<T: Future, S: Schedule> Drop for TaskBox<T, S> {
+    fn drop(&mut self) {
+        let ptr = self.ptr.as_ptr();
+
+        // SAFETY: The task is still valid, so we can dereference the pointer.
+        let layout = layout_of::<T, S>(unsafe { &(*ptr).core.scheduler });
+
+        // SAFETY: The pointer was allocated with this layout. (The return value
+        // of `min_align` doesn't change.)
+        let _drop_helper = DropHelper {
+            layout,
+            ptr: ptr as *mut u8,
+        };
+
+        // SAFETY: A task box contains a pointer to a valid cell, and we have
+        // not dropped the allocation yet.
+        unsafe { drop_in_place(self.ptr.as_ptr()) };
+    }
+}
+
+struct DropHelper {
+    ptr: *mut u8,
+    layout: Layout,
+}
+
+impl Drop for DropHelper {
+    #[inline]
+    fn drop(&mut self) {
+        // SAFETY: See `TaskBox::drop`.
+        unsafe { dealloc(self.ptr, self.layout) };
+    }
+}
diff --git a/tokio/src/util/cacheline.rs b/tokio/src/util/cacheline.rs
index b34004c1ac1..e29834a179c 100644
--- a/tokio/src/util/cacheline.rs
+++ b/tokio/src/util/cacheline.rs
@@ -3,6 +3,9 @@ use std::ops::{Deref, DerefMut};
 
 /// Pads and aligns a value to the length of a cache line.
 #[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
+// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
+// from crossbeam-utils/src/cache_padded.rs
+//
 // Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
 // lines at a time, so we have to align to 128 bytes rather than 64.
 //
@@ -27,27 +30,42 @@ use std::ops::{Deref, DerefMut};
     ),
     repr(align(128))
 )]
-// arm, mips and mips64 have 32-byte cache line size.
+// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
 //
 // Sources:
 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
 #[cfg_attr(
-    any(target_arch = "arm", target_arch = "mips", target_arch = "mips64",),
+    any(
+        target_arch = "arm",
+        target_arch = "mips",
+        target_arch = "mips64",
+        target_arch = "sparc",
+        target_arch = "hexagon",
+    ),
     repr(align(32))
 )]
+// m68k has 16-byte cache line size.
+//
+// Sources:
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
+#[cfg_attr(target_arch = "m68k", repr(align(16)))]
 // s390x has 256-byte cache line size.
 //
 // Sources:
 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
 #[cfg_attr(target_arch = "s390x", repr(align(256)))]
-// x86, riscv and wasm have 64-byte cache line size.
+// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
 //
 // Sources:
 // - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
 // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
 //
 // All others are assumed to have 64-byte cache line size.
@@ -59,6 +77,9 @@ use std::ops::{Deref, DerefMut};
         target_arch = "arm",
         target_arch = "mips",
         target_arch = "mips64",
+        target_arch = "sparc",
+        target_arch = "hexagon",
+        target_arch = "m68k",
         target_arch = "s390x",
     )),
     repr(align(64))

From 67e2d9d47210801d0d829d2c113a9c1ad7314fc6 Mon Sep 17 00:00:00 2001
From: Alice Ryhl
Date: Fri, 22 Dec 2023 18:53:30 +0100
Subject: [PATCH 2/4] Fix Future import

---
 tokio/src/runtime/task/task_box.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tokio/src/runtime/task/task_box.rs b/tokio/src/runtime/task/task_box.rs
index 408e73d1795..1814c2ed6a3 100644
--- a/tokio/src/runtime/task/task_box.rs
+++ b/tokio/src/runtime/task/task_box.rs
@@ -1,10 +1,10 @@
 //! Helper module for allocating and deallocating tasks.
 
+use crate::future::Future;
 use crate::runtime::task::core::{Cell, Core, Header, Trailer};
 use crate::runtime::task::Schedule;
 
 use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
-use std::future::Future;
 use std::marker::PhantomData;
 use std::mem::{align_of, size_of, ManuallyDrop};
 use std::ptr::{drop_in_place, NonNull};

From 7c3427888c6f73001f14c2563da20b92f8982d7b Mon Sep 17 00:00:00 2001
From: Alice Ryhl
Date: Fri, 22 Dec 2023 18:54:38 +0100
Subject: [PATCH 3/4] Mention power of two.

---
 tokio/src/runtime/task/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs
index eeb589f75be..04f0a828cc7 100644
--- a/tokio/src/runtime/task/mod.rs
+++ b/tokio/src/runtime/task/mod.rs
@@ -279,7 +279,7 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
     ///
     /// This is used by the multi-threaded runtime to avoid false sharing.
     ///
-    /// The same scheduler must always return the same value.
+    /// The same scheduler must always return the same value. It should be a power of two.
     fn min_align(&self) -> usize {
         1
     }

From 1c990eeca4a370e2a1d1ab933c10fe0f8bd6ce39 Mon Sep 17 00:00:00 2001
From: Alice Ryhl
Date: Fri, 22 Dec 2023 23:44:23 +0100
Subject: [PATCH 4/4] Fix clippy

---
 tokio/src/runtime/task/core.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs
index b9c3ab6d220..9656ad00667 100644
--- a/tokio/src/runtime/task/core.rs
+++ b/tokio/src/runtime/task/core.rs
@@ -124,6 +124,8 @@ impl<T: Future, S: Schedule> Cell<T, S> {
     /// Allocates a new task cell, containing the header, trailer, and core
     /// structures.
+    // Ignore. Return type is reasonable.
+    #[allow(clippy::new_ret_no_self)]
     pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> TaskBox<T, S> {
         // Separated into a non-generic function to reduce LLVM codegen
         fn new_header(
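
Addendum (not part of the patch series): the short Rust sketch below illustrates, in isolation, the layout-widening idea that `TaskBox::new` and `Schedule::min_align` implement above. The `Payload` type, the hard-coded 128-byte minimum alignment, and the `main` wrapper are illustrative assumptions only, not Tokio code.

    use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};

    // Illustrative stand-in for `Cell<T, S>`; not a Tokio type.
    struct Payload {
        hot: u64,
        cold: u64,
    }

    fn main() {
        // Assumed minimum alignment; the patch derives this value from
        // `align_of::<CachePadded<()>>()` on multi-threaded runtimes and uses 1
        // on single-threaded runtimes.
        let min_align = 128usize;

        // Widen the natural alignment of `Payload`, mirroring `layout_of` in
        // task_box.rs: keep the size, raise the alignment.
        let layout = Layout::new::<Payload>()
            .align_to(min_align)
            .expect("min_align must be a power of two");

        unsafe {
            let ptr = alloc(layout) as *mut Payload;
            if ptr.is_null() {
                handle_alloc_error(layout);
            }
            ptr.write(Payload { hot: 0, cold: 0 });

            // The widened alignment keeps each payload on its own 128-byte boundary.
            assert_eq!(ptr as usize % min_align, 0);

            // Free with the same widened layout, as `DropHelper` does.
            ptr.drop_in_place();
            dealloc(ptr as *mut u8, layout);
        }
    }

As in the patch, allocation and deallocation must agree on the widened layout; the series guarantees this by requiring that `min_align` always returns the same value for a given scheduler.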