Skip to content

Commit

Permalink
Make NUM_MERGE_THREADS configurable (#2535)
Browse files Browse the repository at this point in the history
* Make `NUM_MERGE_THREADS` configurable

* Remove unused import

* Reword comment src/index/index.rs

Co-authored-by: PSeitz <[email protected]>

---------

Co-authored-by: PSeitz <[email protected]>
  • Loading branch information
Barre and PSeitz authored Dec 9, 2024
1 parent 876a579 commit 6e02c5c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
24 changes: 23 additions & 1 deletion src/index/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use crate::schema::{Field, FieldType, Schema};
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::SegmentReader;

/// Number of merge threads requested when the caller does not choose one
/// explicitly, i.e. when going through `Index::writer_with_num_threads`.
const DEFAULT_NUM_MERGE_THREADS: usize = 4;

fn load_metas(
directory: &dyn Directory,
inventory: &SegmentMetaInventory,
Expand Down Expand Up @@ -538,10 +540,11 @@ impl Index {
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_num_threads<D: Document>(
pub fn writer_with_num_threads_and_num_merge_threads<D: Document>(
&self,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
num_merge_threads: usize,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
Expand All @@ -563,6 +566,25 @@ impl Index {
num_threads,
memory_arena_in_bytes_per_thread,
directory_lock,
num_merge_threads,
)
}

/// Creates a multithreaded writer using the default number of merge threads
/// (`DEFAULT_NUM_MERGE_THREADS`, currently 4).
///
/// This is a convenience wrapper: `num_threads` and
/// `overall_memory_budget_in_bytes` are forwarded unchanged to
/// `writer_with_num_threads_and_num_merge_threads`, which should be called
/// directly when a different number of merge threads is needed.
///
/// # Errors
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_num_threads<D: Document>(
&self,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
// Delegate to the fully parameterized constructor, pinning only the
// merge-thread count to the crate default.
self.writer_with_num_threads_and_num_merge_threads(
num_threads,
overall_memory_budget_in_bytes,
DEFAULT_NUM_MERGE_THREADS,
)
}

Expand Down
13 changes: 11 additions & 2 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
worker_id: usize,

num_threads: usize,
num_merge_threads: usize,

delete_queue: DeleteQueue,

Expand Down Expand Up @@ -268,6 +269,7 @@ impl<D: Document> IndexWriter<D> {
num_threads: usize,
memory_budget_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
num_merge_threads: usize,
) -> crate::Result<Self> {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
Expand All @@ -291,8 +293,12 @@ impl<D: Document> IndexWriter<D> {

let stamper = Stamper::new(current_opstamp);

let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let segment_updater = SegmentUpdater::create(
index.clone(),
stamper.clone(),
&delete_queue.cursor(),
num_merge_threads,
)?;

let mut index_writer = Self {
_directory_lock: Some(directory_lock),
Expand All @@ -307,6 +313,8 @@ impl<D: Document> IndexWriter<D> {
workers_join_handle: vec![],
num_threads,

num_merge_threads,

delete_queue,

committed_opstamp: current_opstamp,
Expand Down Expand Up @@ -558,6 +566,7 @@ impl<D: Document> IndexWriter<D> {
self.num_threads,
self.memory_budget_in_bytes_per_thread,
directory_lock,
self.num_merge_threads,
)?;

// the current `self` is dropped right away because of this call.
Expand Down
5 changes: 2 additions & 3 deletions src/indexer/segment_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ use crate::indexer::{
};
use crate::{FutureResult, Opstamp};

const NUM_MERGE_THREADS: usize = 4;

/// Save the index meta file.
/// This operation is atomic:
/// Either
Expand Down Expand Up @@ -273,6 +271,7 @@ impl SegmentUpdater {
index: Index,
stamper: Stamper,
delete_cursor: &DeleteCursor,
num_merge_threads: usize,
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
Expand All @@ -287,7 +286,7 @@ impl SegmentUpdater {
})?;
let merge_thread_pool = ThreadPoolBuilder::new()
.thread_name(|i| format!("merge_thread_{i}"))
.num_threads(NUM_MERGE_THREADS)
.num_threads(num_merge_threads)
.build()
.map_err(|_| {
crate::TantivyError::SystemError(
Expand Down

0 comments on commit 6e02c5c

Please sign in to comment.