Skip to content

Commit

Permalink
feat(memory): cherry-pick #19372 and #19059 to branch release-2.1 (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Jan 2, 2025
1 parent de6318b commit 3a91f87
Show file tree
Hide file tree
Showing 29 changed files with 832 additions and 631 deletions.
5 changes: 3 additions & 2 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,15 @@ mod test {
let raw_opts = "
--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/
--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/
--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/
--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368
--prometheus-listener-addr=127.0.0.1:1234
--config-path=src/config/test.toml
";
let actual = StandaloneOpts::parse_from(raw_opts.lines());
let opts = StandaloneOpts {
compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/".into()),
meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/".into()),
frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/".into()),
frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368".into() ),
compactor_opts: None,
prometheus_listener_addr: Some("127.0.0.1:1234".into()),
config_path: Some("src/config/test.toml".into()),
Expand Down Expand Up @@ -508,6 +508,7 @@ mod test {
metrics_level: None,
enable_barrier_read: None,
temp_secret_file_dir: "./frontend/secrets/",
frontend_total_memory_bytes: 34359738368,
},
),
compactor_opts: None,
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub async fn compute_node_serve(
compactor_context,
hummock_meta_client.clone(),
storage.sstable_object_id_manager().clone(),
storage.filter_key_extractor_manager().clone(),
storage.compaction_catalog_manager_ref().clone(),
);
sub_tasks.push((handle, shutdown_sender));
}
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub mod session;
mod stream_fragmenter;
use risingwave_common::config::{MetricLevel, OverrideConfig};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
pub use stream_fragmenter::build_graph;
mod utils;
Expand Down Expand Up @@ -160,6 +161,10 @@ pub struct FrontendOpts {
default_value = "./secrets"
)]
pub temp_secret_file_dir: String,

/// Total available memory for the frontend node in bytes. Used by both computing and storage.
#[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())]
pub frontend_total_memory_bytes: usize,
}

impl risingwave_common::opts::Opts for FrontendOpts {
Expand Down Expand Up @@ -221,3 +226,7 @@ pub fn start(
.unwrap()
})
}

pub fn default_frontend_total_memory_bytes() -> usize {
system_memory_available_bytes()
}
4 changes: 2 additions & 2 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::types::DataType;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::cluster_limit;
use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::{cluster_limit, resource_util};
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
Expand Down Expand Up @@ -444,7 +444,7 @@ impl FrontendEnv {
.map_err(|err| anyhow!(err))?;
}

let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
let total_memory_bytes = opts.frontend_total_memory_bytes;
let heap_profiler =
HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
// Run a background heap profiler
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ pub async fn setup_compute_env_with_metric(
compactor_streams_change_tx,
)
.await;

let fake_host_address = HostAddress {
host: "127.0.0.1".to_string(),
port,
Expand Down
28 changes: 22 additions & 6 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -32,6 +33,7 @@ use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::PbTableSchema;
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::compactor::compactor_runner::compact_and_build_sst;
use risingwave_storage::hummock::compactor::{
ConcatSstableIterator, DummyCompactionFilter, TaskConfig, TaskProgress,
Expand Down Expand Up @@ -133,8 +135,13 @@ async fn build_table(
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt);
let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
);
let value = b"1234567890123456789";
let mut full_key = test_key_of(0, epoch, TableId::new(0));
let table_key_len = full_key.user_key.table_key.len();
Expand Down Expand Up @@ -177,8 +184,14 @@ async fn build_table_2(
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt);

let table_id_to_vnode = HashMap::from_iter(vec![(table_id, VirtualNode::COUNT_FOR_TEST)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
);
let mut full_key = test_key_of(0, epoch, TableId::new(table_id));
let table_key_len = full_key.user_key.table_key.len();

Expand Down Expand Up @@ -273,8 +286,11 @@ async fn compact<I: HummockIterator<Direction = Forward>>(
bloom_false_positive: 0.001,
..Default::default()
};
let mut builder =
CapacitySplitTableBuilder::for_test(LocalTableBuilderFactory::new(32, sstable_store, opt));
let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
let mut builder = CapacitySplitTableBuilder::for_test(
LocalTableBuilderFactory::new(32, sstable_store, opt),
compaction_catalog_agent_ref,
);

let task_config = task_config.unwrap_or_else(|| TaskConfig {
key_range: KeyRange::inf(),
Expand Down
16 changes: 15 additions & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::env;
use std::ops::Range;
use std::sync::atomic::AtomicU64;
Expand All @@ -24,11 +25,13 @@ use foyer::{Engine, HybridCacheBuilder};
use rand::random;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_object_store::object::{
InMemObjectStore, ObjectStore, ObjectStoreImpl, S3ObjectStore,
};
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::iterator::{ConcatIterator, ConcatIteratorInner, HummockIterator};
use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use risingwave_storage::hummock::value::HummockValue;
Expand Down Expand Up @@ -83,7 +86,11 @@ impl<F: SstableWriterFactory> TableBuilderFactory for LocalTableBuilderFactory<F
.create_sst_writer(id, writer_options)
.await
.unwrap();
let builder = SstableBuilder::for_test(id, writer, self.options.clone());
let table_id_to_vnode = HashMap::from_iter(vec![(
TableId::default().into(),
VirtualNode::COUNT_FOR_TEST,
)]);
let builder = SstableBuilder::for_test(id, writer, self.options.clone(), table_id_to_vnode);

Ok(builder)
}
Expand Down Expand Up @@ -192,6 +199,8 @@ fn bench_builder(

let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);

let mut group = c.benchmark_group("bench_multi_builder");
group
.sample_size(SAMPLE_COUNT)
Expand All @@ -205,6 +214,7 @@ fn bench_builder(
StreamingSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
})
});
Expand All @@ -217,6 +227,7 @@ fn bench_builder(
BatchSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
})
});
Expand Down Expand Up @@ -249,13 +260,16 @@ fn bench_table_scan(c: &mut Criterion) {
let object_store = Arc::new(ObjectStoreImpl::InMem(store));
let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);

let ssts = runtime.block_on(async {
build_tables(CapacitySplitTableBuilder::for_test(
LocalTableBuilderFactory::new(
1,
BatchSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
.await
});
Expand Down
29 changes: 9 additions & 20 deletions src/storage/compactor/src/compactor_observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::ObserverState;
use risingwave_pb::catalog::Table;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::SubscribeResponse;
use risingwave_storage::filter_key_extractor::{
FilterKeyExtractorImpl, FilterKeyExtractorManagerRef,
};
use risingwave_storage::compaction_catalog_manager::CompactionCatalogManagerRef;

pub struct CompactorObserverNode {
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
compaction_catalog_manager: CompactionCatalogManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
version: u64,
}
Expand Down Expand Up @@ -83,36 +78,30 @@ impl ObserverState for CompactorObserverNode {

impl CompactorObserverNode {
pub fn new(
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
compaction_catalog_manager: CompactionCatalogManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
) -> Self {
Self {
filter_key_extractor_manager,
compaction_catalog_manager,
system_params_manager,
version: 0,
}
}

fn handle_catalog_snapshot(&mut self, tables: Vec<Table>) {
let all_filter_key_extractors: HashMap<u32, Arc<FilterKeyExtractorImpl>> = tables
.iter()
.map(|t| (t.id, Arc::new(FilterKeyExtractorImpl::from_table(t))))
.collect();
self.filter_key_extractor_manager
.sync(all_filter_key_extractors);
self.compaction_catalog_manager
.sync(tables.into_iter().map(|t| (t.id, t)).collect());
}

fn handle_catalog_notification(&mut self, operation: Operation, table_catalog: Table) {
match operation {
Operation::Add | Operation::Update => {
self.filter_key_extractor_manager.update(
table_catalog.id,
Arc::new(FilterKeyExtractorImpl::from_table(&table_catalog)),
);
self.compaction_catalog_manager
.update(table_catalog.id, table_catalog);
}

Operation::Delete => {
self.filter_key_extractor_manager.remove(table_catalog.id);
self.compaction_catalog_manager.remove(table_catalog.id);
}

_ => panic!("receive an unsupported notify {:?}", operation),
Expand Down
9 changes: 9 additions & 0 deletions src/storage/compactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::config::{
AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig,
};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;

use crate::server::{compactor_serve, shared_compactor_serve};
Expand Down Expand Up @@ -92,6 +93,10 @@ pub struct CompactorOpts {

#[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
pub proxy_rpc_endpoint: String,

/// Total available memory for the frontend node in bytes. Used by compactor.
#[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())]
pub compactor_total_memory_bytes: usize,
}

impl risingwave_common::opts::Opts for CompactorOpts {
Expand Down Expand Up @@ -143,3 +148,7 @@ pub fn start(
}),
}
}

pub fn default_compactor_total_memory_bytes() -> usize {
system_memory_available_bytes()
}
Loading

0 comments on commit 3a91f87

Please sign in to comment.