Skip to content

Commit

Permalink
pageserver: do vectored read on each dio-aligned section once (#8763)
Browse files Browse the repository at this point in the history
Part of #8130, closes #8719.

## Problem

Currently, vectored blob io only coalesce blocks if they are immediately
adjacent to each other. When we switch to Direct IO, we need a way to
coalesce blobs that are within the dio-aligned boundary but has gap
between them.

## Summary of changes

- Introduces a `VectoredReadCoalesceMode` for `VectoredReadPlanner` and
`StreamingVectoredReadPlanner` which has two modes:
  - `AdjacentOnly` (current implementation)
  - `Chunked(<alignment requirement>)`
- New `ChunkedVectorBuilder` that considers batching `dio-align`-sized
read, the start and end of the vectored read will respect
`stx_dio_offset_align` / `stx_dio_mem_align` (`vectored_read.start` and
`vectored_read.blobs_at.first().start_offset` will be two different
value).
- Since we break the assumption that blobs within single `VectoredRead`
are next to each other (implicit end offset), we start to store blob end
offsets in the `VectoredRead`.
- Adapted existing tests to run in both `VectoredReadCoalesceMode`.
- The io alignment can also be live configured at runtime.

Signed-off-by: Yuchen Liang <[email protected]>
  • Loading branch information
yliang412 committed Aug 28, 2024
1 parent 4355dba commit 714594c
Show file tree
Hide file tree
Showing 16 changed files with 480 additions and 52 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/_build-and-test-locally.yml
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ jobs:
${cov_prefix} cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
for io_buffer_alignment in 0 1 512 ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT=$io_buffer_alignment ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
done
done
# Run separate tests for real S3
Expand Down
8 changes: 6 additions & 2 deletions pageserver/benches/bench_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{criterion_group, criterion_main, Criterion};
use pageserver::{
config::PageServerConf,
config::{defaults::DEFAULT_IO_BUFFER_ALIGNMENT, PageServerConf},
context::{DownloadBehavior, RequestContext},
l0_flush::{L0FlushConfig, L0FlushGlobalState},
page_cache,
Expand Down Expand Up @@ -164,7 +164,11 @@ fn criterion_benchmark(c: &mut Criterion) {
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(16384, virtual_file::io_engine_for_bench());
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
DEFAULT_IO_BUFFER_ALIGNMENT,
);
page_cache::init(conf.page_cache_size);

{
Expand Down
10 changes: 10 additions & 0 deletions pageserver/client/src/mgmt_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,16 @@ impl Client {
.map_err(Error::ReceiveBody)
}

/// Configs io buffer alignment at runtime.
pub async fn put_io_alignment(&self, align: usize) -> Result<()> {
let uri = format!("{}/v1/io_alignment", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, align)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}

pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
self.get(uri)
Expand Down
7 changes: 6 additions & 1 deletion pageserver/ctl/src/layer_map_analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
Expand Down Expand Up @@ -144,7 +145,11 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);

let mut total_delta_layers = 0usize;
Expand Down
9 changes: 7 additions & 2 deletions pageserver/ctl/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
use clap::Subcommand;
use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
Expand Down Expand Up @@ -59,7 +60,7 @@ pub(crate) enum LayerCmd {

async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, 1);
page_cache::init(100);
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
Expand Down Expand Up @@ -189,7 +190,11 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::page_cache::init(100);

let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
Expand Down
7 changes: 6 additions & 1 deletion pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use clap::{Parser, Subcommand};
use index_part::IndexPartCmd;
use layers::LayerCmd;
use pageserver::{
config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
Expand Down Expand Up @@ -205,7 +206,11 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {

async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await
Expand Down
9 changes: 9 additions & 0 deletions pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ pub(crate) struct Args {
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,

/// Before starting the benchmark, live-reconfigure the pageserver to use specified alignment for io buffers.
#[clap(long)]
set_io_alignment: Option<usize>,

targets: Option<Vec<TenantTimelineId>>,
}

Expand Down Expand Up @@ -124,6 +129,10 @@ async fn main_impl(
mgmt_api_client.put_io_engine(engine_str).await?;
}

if let Some(align) = args.set_io_alignment {
mgmt_api_client.put_io_alignment(align).await?;
}

// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,
Expand Down
7 changes: 6 additions & 1 deletion pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");

let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
Expand All @@ -136,7 +137,11 @@ fn main() -> anyhow::Result<()> {
let scenario = failpoint_support::init();

// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
virtual_file::init(
conf.max_file_descriptors,
conf.virtual_file_io_engine,
conf.io_buffer_alignment,
);
page_cache::init(conf.page_cache_size);

start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
Expand Down
18 changes: 18 additions & 0 deletions pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pub mod defaults {

pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;

pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 0;

///
/// Default built-in configuration file.
///
Expand Down Expand Up @@ -289,6 +291,8 @@ pub struct PageServerConf {

/// Direct IO settings
pub virtual_file_direct_io: virtual_file::DirectIoMode,

pub io_buffer_alignment: usize,
}

/// We do not want to store this in a PageServerConf because the latter may be logged
Expand Down Expand Up @@ -393,6 +397,8 @@ struct PageServerConfigBuilder {
compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,

virtual_file_direct_io: BuilderValue<virtual_file::DirectIoMode>,

io_buffer_alignment: BuilderValue<usize>,
}

impl PageServerConfigBuilder {
Expand Down Expand Up @@ -481,6 +487,7 @@ impl PageServerConfigBuilder {
l0_flush: Set(L0FlushConfig::default()),
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
virtual_file_direct_io: Set(virtual_file::DirectIoMode::default()),
io_buffer_alignment: Set(DEFAULT_IO_BUFFER_ALIGNMENT),
}
}
}
Expand Down Expand Up @@ -660,6 +667,10 @@ impl PageServerConfigBuilder {
self.virtual_file_direct_io = BuilderValue::Set(value);
}

pub fn io_buffer_alignment(&mut self, value: usize) {
self.io_buffer_alignment = BuilderValue::Set(value);
}

pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();

Expand Down Expand Up @@ -716,6 +727,7 @@ impl PageServerConfigBuilder {
l0_flush,
compact_level0_phase1_value_access,
virtual_file_direct_io,
io_buffer_alignment,
}
CUSTOM LOGIC
{
Expand Down Expand Up @@ -985,6 +997,9 @@ impl PageServerConf {
"virtual_file_direct_io" => {
builder.virtual_file_direct_io(utils::toml_edit_ext::deserialize_item(item).context("virtual_file_direct_io")?)
}
"io_buffer_alignment" => {
builder.io_buffer_alignment(parse_toml_u64("io_buffer_alignment", item)? as usize)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
Expand Down Expand Up @@ -1068,6 +1083,7 @@ impl PageServerConf {
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
virtual_file_direct_io: virtual_file::DirectIoMode::default(),
io_buffer_alignment: defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
}
}
}
Expand Down Expand Up @@ -1308,6 +1324,7 @@ background_task_maximum_delay = '334 s'
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
virtual_file_direct_io: virtual_file::DirectIoMode::default(),
io_buffer_alignment: defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
},
"Correct defaults should be used when no config values are provided"
);
Expand Down Expand Up @@ -1381,6 +1398,7 @@ background_task_maximum_delay = '334 s'
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
virtual_file_direct_io: virtual_file::DirectIoMode::default(),
io_buffer_alignment: defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
},
"Should be able to parse all basic config values correctly"
);
Expand Down
17 changes: 17 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2325,6 +2325,20 @@ async fn put_io_engine_handler(
json_response(StatusCode::OK, ())
}

async fn put_io_alignment_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let align: usize = json_request(&mut r).await?;
crate::virtual_file::set_io_buffer_alignment(align).map_err(|align| {
ApiError::PreconditionFailed(
format!("Requested io alignment ({align}) is not a power of two").into(),
)
})?;
json_response(StatusCode::OK, ())
}

/// Polled by control plane.
///
/// See [`crate::utilization`].
Expand Down Expand Up @@ -3012,6 +3026,9 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_alignment", |r| {
api_handler(r, put_io_alignment_handler)
})
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),
Expand Down
6 changes: 4 additions & 2 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
VectoredReadCoalesceMode, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
Expand Down Expand Up @@ -1207,6 +1207,7 @@ impl DeltaLayerInner {
let mut prev: Option<(Key, Lsn, BlobRef)> = None;

let mut read_builder: Option<VectoredReadBuilder> = None;
let read_mode = VectoredReadCoalesceMode::get();

let max_read_size = self
.max_vectored_read_bytes
Expand Down Expand Up @@ -1255,6 +1256,7 @@ impl DeltaLayerInner {
offsets.end.pos(),
meta,
max_read_size,
read_mode,
))
}
} else {
Expand Down Expand Up @@ -2283,7 +2285,7 @@ pub(crate) mod test {
.await
.unwrap();
let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
for max_read_size in [1, 1024] {
for max_read_size in [1, 2048] {
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
// Test if the batch size is correctly determined
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ mod test {
.await
.unwrap();
let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
for max_read_size in [1, 1024] {
for max_read_size in [1, 2048] {
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
// Test if the batch size is correctly determined
Expand Down
Loading

0 comments on commit 714594c

Please sign in to comment.