From 6e02c5cb258b784b00d5769c08cc5ddacc258bed Mon Sep 17 00:00:00 2001 From: Pierre Barre Date: Mon, 9 Dec 2024 09:53:11 +0100 Subject: [PATCH] Make `NUM_MERGE_THREADS` configurable (#2535) * Make `NUM_MERGE_THREADS` configurable * Remove unused import * Reword comment src/index/index.rs Co-authored-by: PSeitz --------- Co-authored-by: PSeitz --- src/index/index.rs | 24 +++++++++++++++++++++++- src/indexer/index_writer.rs | 13 +++++++++++-- src/indexer/segment_updater.rs | 5 ++--- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/index/index.rs b/src/index/index.rs index 052bc4f920..94882951ed 100644 --- a/src/index/index.rs +++ b/src/index/index.rs @@ -24,6 +24,8 @@ use crate::schema::{Field, FieldType, Schema}; use crate::tokenizer::{TextAnalyzer, TokenizerManager}; use crate::SegmentReader; +const DEFAULT_NUM_MERGE_THREADS: usize = 4; + fn load_metas( directory: &dyn Directory, inventory: &SegmentMetaInventory, @@ -538,10 +540,11 @@ impl Index { /// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`. /// If the memory arena per thread is too small or too big, returns /// `TantivyError::InvalidArgument` - pub fn writer_with_num_threads( + pub fn writer_with_num_threads_and_num_merge_threads( &self, num_threads: usize, overall_memory_budget_in_bytes: usize, + num_merge_threads: usize, ) -> crate::Result> { let directory_lock = self .directory @@ -563,6 +566,25 @@ impl Index { num_threads, memory_arena_in_bytes_per_thread, directory_lock, + num_merge_threads, + ) + } + + /// Creates a multithreaded writer with 4 merge threads. + /// + /// # Errors + /// If the lockfile already exists, returns `Error::FileAlreadyExists`. + /// If the memory arena per thread is too small or too big, returns + /// `TantivyError::InvalidArgument` + pub fn writer_with_num_threads( + &self, + num_threads: usize, + overall_memory_budget_in_bytes: usize, + ) -> crate::Result> { + self.writer_with_num_threads_and_num_merge_threads( + num_threads, + overall_memory_budget_in_bytes, + DEFAULT_NUM_MERGE_THREADS, ) } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 6719afe9f9..c7c8ba740e 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -71,6 +71,7 @@ pub struct IndexWriter { worker_id: usize, num_threads: usize, + num_merge_threads: usize, delete_queue: DeleteQueue, @@ -268,6 +269,7 @@ impl IndexWriter { num_threads: usize, memory_budget_in_bytes_per_thread: usize, directory_lock: DirectoryLock, + num_merge_threads: usize, ) -> crate::Result { if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN { let err_msg = format!( @@ -291,8 +293,12 @@ impl IndexWriter { let stamper = Stamper::new(current_opstamp); - let segment_updater = - SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?; + let segment_updater = SegmentUpdater::create( + index.clone(), + stamper.clone(), + &delete_queue.cursor(), + num_merge_threads, + )?; let mut index_writer = Self { _directory_lock: Some(directory_lock), @@ -307,6 +313,8 @@ impl IndexWriter { workers_join_handle: vec![], num_threads, + num_merge_threads, + delete_queue, committed_opstamp: current_opstamp, @@ -558,6 +566,7 @@ impl IndexWriter { self.num_threads, self.memory_budget_in_bytes_per_thread, directory_lock, + self.num_merge_threads, )?; // the current `self` is dropped right away because of this call. diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index ba987c18da..b2fa1d648f 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -25,8 +25,6 @@ use crate::indexer::{ }; use crate::{FutureResult, Opstamp}; -const NUM_MERGE_THREADS: usize = 4; - /// Save the index meta file. /// This operation is atomic: /// Either @@ -273,6 +271,7 @@ impl SegmentUpdater { index: Index, stamper: Stamper, delete_cursor: &DeleteCursor, + num_merge_threads: usize, ) -> crate::Result { let segments = index.searchable_segment_metas()?; let segment_manager = SegmentManager::from_segments(segments, delete_cursor); @@ -287,7 +286,7 @@ impl SegmentUpdater { })?; let merge_thread_pool = ThreadPoolBuilder::new() .thread_name(|i| format!("merge_thread_{i}")) - .num_threads(NUM_MERGE_THREADS) + .num_threads(num_merge_threads) .build() .map_err(|_| { crate::TantivyError::SystemError(