Skip to content

Commit

Permalink
Add grouped operations (#493)
Browse files Browse the repository at this point in the history
* [WIP] added UserOperation enum, added IndexWriter.run, and added MultiStamp

* removed MultiStamp in favor of std::ops::Range

* changed IndexWriter::run to return u64, Stamper::stamps to return a Range, added tests, and added docs

* changed delete_cursor skipping to use first operation's opstamp vice last. change index_writer test to use 1 thread

* added test for order batch of operations

* added a test comment
  • Loading branch information
elbow-jason authored and fulmicoton committed Feb 13, 2019
1 parent 45e62d4 commit e14701e
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 12 deletions.
172 changes: 160 additions & 12 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::operation::AddOperation;
use super::operation::{AddOperation, UserOperation};
use super::segment_updater::SegmentUpdater;
use super::PreparedCommit;
use bit_set::BitSet;
Expand Down Expand Up @@ -26,6 +26,7 @@ use schema::Document;
use schema::IndexRecordOption;
use schema::Term;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
Expand All @@ -43,8 +44,8 @@ pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
// reaches `PIPELINE_MAX_SIZE_IN_DOCS`
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;

type DocumentSender = channel::Sender<AddOperation>;
type DocumentReceiver = channel::Receiver<AddOperation>;
type DocumentSender = channel::Sender<Vec<AddOperation>>;
type DocumentReceiver = channel::Receiver<Vec<AddOperation>>;

/// Split the thread memory budget into
/// - the heap size
Expand Down Expand Up @@ -266,19 +267,19 @@ fn index_documents(
memory_budget: usize,
segment: &Segment,
generation: usize,
document_iterator: &mut impl Iterator<Item = AddOperation>,
document_iterator: &mut Iterator<Item = Vec<AddOperation>>,
segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor,
) -> Result<bool> {
let schema = segment.schema();
let segment_id = segment.id();
let table_size = initial_table_size(memory_budget);
let mut segment_writer = SegmentWriter::for_segment(table_size, segment.clone(), &schema)?;
for doc in document_iterator {
segment_writer.add_document(doc, &schema)?;

for documents in document_iterator {
for doc in documents {
segment_writer.add_document(doc, &schema)?;
}
let mem_usage = segment_writer.mem_usage();

if mem_usage >= memory_budget - MARGIN_IN_BYTES {
info!(
"Buffer limit reached, flushing segment with maxdoc={}.",
Expand Down Expand Up @@ -409,8 +410,12 @@ impl IndexWriter {
// this is a valid guarantee as the
// peeked document now belongs to
// our local iterator.
if let Some(operation) = document_iterator.peek() {
delete_cursor.skip_to(operation.opstamp);
if let Some(operations) = document_iterator.peek() {
if let Some(first) = operations.first() {
delete_cursor.skip_to(first.opstamp);
} else {
return Ok(());
}
} else {
// No more documents.
// Happens when there is a commit, or if the `IndexWriter`
Expand Down Expand Up @@ -643,25 +648,168 @@ impl IndexWriter {
pub fn add_document(&mut self, document: Document) -> u64 {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation { opstamp, document };
let send_result = self.document_sender.send(add_operation);
let send_result = self.document_sender.send(vec![add_operation]);
if let Err(e) = send_result {
panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
}
opstamp
}

/// Gets a range of stamps from the stamper and "pops" the last stamp
/// from the range returning a tuple of the last optstamp and the popped
/// range.
///
/// The total number of stamps generated by this method is `count + 1`;
/// each operation gets a stamp from the `stamps` iterator and `last_opstamp`
/// is for the batch itself.
fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range<u64>) {
let Range { start, end } = self.stamper.stamps(count + 1u64);
let last_opstamp = end - 1;
let stamps = Range {
start: start,
end: last_opstamp,
};
(last_opstamp, stamps)
}

/// Runs a group of document operations ensuring that the operations are
/// assigned contigous u64 opstamps and that add operations of the same
/// group are flushed into the same segment.
///
/// If the indexing pipeline is full, this call may block.
///
/// Each operation of the given `user_operations` will receive an in-order,
/// contiguous u64 opstamp. The entire batch itself is also given an
/// opstamp that is 1 greater than the last given operation. This
/// `batch_opstamp` is the return value of `run`. An empty group of
/// `user_operations`, an empty `Vec<UserOperation>`, still receives
/// a valid opstamp even though no changes were _actually_ made to the index.
///
/// Like adds and deletes (see `IndexWriter.add_document` and
/// `IndexWriter.delete_term`), the changes made by calling `run` will be
/// visible to readers only after calling `commit()`.
pub fn run(&mut self, user_operations: Vec<UserOperation>) -> u64 {
let count = user_operations.len() as u64;
if count == 0 {
return self.stamper.stamp();
}
let (batch_opstamp, stamps) = self.get_batch_opstamps(count);

let mut adds: Vec<AddOperation> = Vec::new();

for (user_op, opstamp) in user_operations.into_iter().zip(stamps) {
match user_op {
UserOperation::Delete(term) => {
let delete_operation = DeleteOperation {
opstamp: opstamp,
term: term,
};
self.delete_queue.push(delete_operation);
}
UserOperation::Add(doc) => {
let add_operation = AddOperation {
opstamp: opstamp,
document: doc,
};
adds.push(add_operation);
}
}
}
let send_result = self.document_sender.send(adds);
if let Err(e) = send_result {
panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
};

batch_opstamp
}
}

#[cfg(test)]
mod tests {

use super::super::operation::UserOperation;
use super::initial_table_size;
use directory::error::LockError;
use error::*;
use indexer::NoMergePolicy;
use schema::{self, Document};
use schema::{self, Document, IndexRecordOption};
use query::{TermQuery};
use collector::TopDocs;
use Index;
use Term;

#[test]
fn test_operations_group() {
// an operations group with 2 items should cause 3 opstamps 0, 1, and 2.
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let operations = vec![
UserOperation::Add(doc!(text_field=>"a")),
UserOperation::Add(doc!(text_field=>"b")),
];
let batch_opstamp1 = index_writer.run(operations);
assert_eq!(batch_opstamp1, 2u64);
}

#[test]
fn test_ordered_batched_operations() {
// * one delete for `doc!(field=>"a")`
// * one add for `doc!(field=>"a")`
// * one add for `doc!(field=>"b")`
// * one delete for `doc!(field=>"b")`
// after commit there is one doc with "a" and 0 doc with "b"
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let a_term = Term::from_field_text(text_field, "a");
let b_term = Term::from_field_text(text_field, "b");
let operations = vec![
UserOperation::Delete(a_term),
UserOperation::Add(doc!(text_field=>"a")),
UserOperation::Add(doc!(text_field=>"b")),
UserOperation::Delete(b_term),
];

index_writer.run(operations);
index_writer.commit().expect("failed to commit");
index.load_searchers().expect("failed to load searchers");

let a_term = Term::from_field_text(text_field, "a");
let b_term = Term::from_field_text(text_field, "b");

let a_query = TermQuery::new(a_term, IndexRecordOption::Basic);
let b_query = TermQuery::new(b_term, IndexRecordOption::Basic);

let searcher = index.searcher();

let a_docs = searcher
.search(&a_query, &TopDocs::with_limit(1))
.expect("search for a failed");

let b_docs = searcher
.search(&b_query, &TopDocs::with_limit(1))
.expect("search for b failed");

assert_eq!(a_docs.len(), 1);
assert_eq!(b_docs.len(), 0);
}

#[test]
fn test_empty_operations_group() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer(3_000_000).unwrap();
let operations1 = vec![];
let batch_opstamp1 = index_writer.run(operations1);
assert_eq!(batch_opstamp1, 0u64);
let operations2 = vec![];
let batch_opstamp2 = index_writer.run(operations2);
assert_eq!(batch_opstamp2, 1u64);
}

#[test]
fn test_lockfile_stops_duplicates() {
let schema_builder = schema::Schema::builder();
Expand Down
7 changes: 7 additions & 0 deletions src/indexer/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,10 @@ pub struct AddOperation {
pub opstamp: u64,
pub document: Document,
}

/// UserOperation is an enum type that encapsulates other operation types.
#[derive(Eq, PartialEq, Debug)]
pub enum UserOperation {
Add(Document),
Delete(Term),
}
13 changes: 13 additions & 0 deletions src/indexer/stamper.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::ops::Range;
use std::sync::atomic::Ordering;
use std::sync::Arc;

Expand Down Expand Up @@ -60,6 +61,16 @@ impl Stamper {
pub fn stamp(&self) -> u64 {
self.0.fetch_add(1u64, Ordering::SeqCst) as u64
}

/// Given a desired count `n`, `stamps` returns an iterator that
/// will supply `n` number of u64 stamps.
pub fn stamps(&self, n: u64) -> Range<u64> {
let start = self.0.fetch_add(n, Ordering::SeqCst);
Range {
start: start,
end: start + n,
}
}
}

#[cfg(test)]
Expand All @@ -78,5 +89,7 @@ mod test {

assert_eq!(stamper.stamp(), 10u64);
assert_eq!(stamper_clone.stamp(), 11u64);
assert_eq!(stamper.stamps(3u64), (12..15));
assert_eq!(stamper.stamp(), 15u64);
}
}

0 comments on commit e14701e

Please sign in to comment.