Skip to content

Commit

Permalink
compact set mostly done
Browse files Browse the repository at this point in the history
  • Loading branch information
tiptenbrink committed Jul 29, 2024
1 parent 311f433 commit 9bd7731
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 38 deletions.
33 changes: 18 additions & 15 deletions tiauth-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,25 @@ tiauth-core = { path = ".", features = ["test", "action", "app"] }
name = "deser"
harness = false

[[bin]]
name = "experiment"
required-features = ["test"]
# [[bin]]
# name = "experiment"
# required-features = ["test"]

[[bin]]
name = "experiment_big"
required-features = ["test"]
# [[bin]]
# name = "experiment_big"
# required-features = ["test"]

[[bin]]
name = "experiment_ses"
required-features = ["test"]
# [[bin]]
# name = "experiment_ses"
# required-features = ["test"]

[[bin]]
name = "multi_db"
required-features = ["test"]
# [[bin]]
# name = "multi_db"
# required-features = ["test"]

[[bin]]
name = "shared_counter"
required-features = ["test"]
# [[bin]]
# name = "shared_counter"
# required-features = ["test"]

# [profile.release]
# debug = true
192 changes: 170 additions & 22 deletions tiauth-core/src/mergerange.rs → tiauth-core/src/compactset.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,50 @@
use std::{cmp::Ordering, collections::VecDeque};
use std::{cmp::Ordering, collections::VecDeque, ops::DerefMut, path::Display, sync::{Arc, Mutex}, time::Instant};

#[derive(Clone)]
/// A data structure that tracks whether it has already seen a u64 value with as little space as possible.
/// Probabilistic data structures (like a Bloom filter) need 10+ bits per element if you want a decent error rate, but
/// our values are in a small(ish) and predictable range. It's designed to use less space than a bit arrray in the case
/// that values are mostly checked in order. If certain values have very large deviations (i.e. they arrive much earlier
/// or later than expected), a bit array might perform better space wise (and it certainly will be better time wise).
/// Until real-world data can be gathered, it will be difficult to evaluate. Currently, the structure is hidden behind
/// a Mutex to allow access from multiple threads.
pub struct CompactSet {
ranges: Arc<Mutex<VecDeque<Range<64>>>>
}

impl CompactSet {
pub fn new() -> Self {
let mut ranges = VecDeque::new();
ranges.push_back(Range::new(1));

Self {
ranges: Arc::new(Mutex::new(ranges))
}
}

pub fn num_exists(&self, num: u64, expires: u64, time: Option<u64>) -> bool {
let mut ranges = self.ranges.lock().unwrap();

let exists = add_num(ranges.deref_mut(), num, expires);

// Some basic experimentation showed that by doing it every four provides the best space/time tradeoff
if time.is_some() && num % 4 == 0 {
check_expired(ranges.deref_mut(), time.unwrap());
}

exists
}
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Expiry {
At(u64),
Unknown
}

#[derive(Debug, Clone)]
#[derive(Clone)]
struct Range<const N: usize> {
members: Option<[u64; N]>,
members: Option<[u8; N]>,
min: u64,
max: u64,
expires: Expiry
Expand All @@ -17,7 +53,7 @@ struct Range<const N: usize> {
impl<const N: usize> Range<N> {
fn new(min: u64) -> Self {
Self {
members: Some([0u64; N]),
members: Some([0u8; N]),
min,
max: min+(N as u64)-1,
expires: Expiry::Unknown
Expand All @@ -35,17 +71,21 @@ impl<const N: usize> Range<N> {
}

fn add_num(&self, num: u64) -> Option<Self> {
// To conserve space, we represent numbers using the min as offset, because the array won't have a size
// that can't be represented with a single byte
// 0 is a special value, it represents the "unfilled" value, therefore 0 can never be used
let num_small = (num - self.min + 1) as u8;
self.members.and_then(|mut members| {
for i in 0..N {
let val = members[i];
if val == num {
if val == num_small {
return None
} else if val == 0 {
let members = if i == N - 1 {
// In this case we are the last index, so it's full now
None
} else {
members[i] = num;
members[i] = num_small;
Some(members)
};

Expand All @@ -63,6 +103,14 @@ impl<const N: usize> Range<N> {
}
}

impl<const N: usize> std::fmt::Debug for Range<N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{{}-{};{:?};exp={:?}}}", self.min, self.max, self.members.map(|a| a.map(|v| {
if v != 0 { (v as u64) + self.min - 1 } else { 0 }
})), self.expires)
}
}

// impl<const N: usize> PartialOrd for Range<N> {
// fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
// match self.expires {
Expand All @@ -88,11 +136,8 @@ fn add_num_ranges<const N: usize>(ranges: &mut VecDeque<Range<N>>, num: u64) ->
Err(u) => u
};

if n_i == 0 {
println!("{:?}", ranges);
}

let target_i = n_i-1;

let initial_max = {
ranges[target_i].max
};
Expand Down Expand Up @@ -151,7 +196,6 @@ fn add_num_ranges<const N: usize>(ranges: &mut VecDeque<Range<N>>, num: u64) ->
}
}
ranges.truncate(ranges.len()-2);
ranges.shrink_to_fit();
target_i-1
} else if let Some((prev_min, prev_expires)) = prev_min_exp {
ranges[target_i-1] = Range::full(prev_min, target_range.max, prev_expires);
Expand All @@ -162,7 +206,6 @@ fn add_num_ranges<const N: usize>(ranges: &mut VecDeque<Range<N>>, num: u64) ->
}
}
ranges.truncate(ranges.len()-1);
ranges.shrink_to_fit();
target_i-1
} else if let Some(next_max) = next_max {
ranges[target_i] = Range::full(target_range.min, next_max, target_range.expires);
Expand All @@ -173,7 +216,6 @@ fn add_num_ranges<const N: usize>(ranges: &mut VecDeque<Range<N>>, num: u64) ->
}
}
ranges.truncate(ranges.len()-1);
ranges.shrink_to_fit();
target_i
} else {
target_i
Expand All @@ -182,6 +224,13 @@ fn add_num_ranges<const N: usize>(ranges: &mut VecDeque<Range<N>>, num: u64) ->
(target_i, false)
}

// The majority of the time is spent in Range::add_num and binary_search (based on flamegraph)
/// The basic idea is that the data structure consists of a dynamic number of fixed-size ranges. It's inspired
/// by a binary tree map, but is actually stored in a contiguous, sorted vector. When a range is full, it can
/// join adjacent arrays that are also full to reduce the total size. We also track expiry times. When a
/// new value is received in a higher range with an expiry time, we know that all values in earlier ranges
/// must have lesser or equal expiry times. This allows us to mark them as full even if we never receive a
/// value, but they become expired.
fn add_num<const N: usize>(ranges: &mut VecDeque<Range<N>>, num: u64, expires: u64) -> bool {
let (target_i, contains) = add_num_ranges(ranges, num);

Expand All @@ -194,6 +243,23 @@ fn add_num<const N: usize>(ranges: &mut VecDeque<Range<N>>, num: u64, expires: u
contains
}

// fn exp_lin_search<const N: usize>(ranges: &VecDeque<Range<N>>, time: u64) -> usize {
// for i in 0..ranges.len() {
// let range = &ranges[i];
// match range.expires {
// Expiry::At(at) => {
// if at > time {
// return i
// }
// },
// Expiry::Unknown => return i,
// }
// }

// ranges.len()
// }

// Linear search is not faster, but doesn't seem much slower either, but for better worst-case we'll use binary search
fn expired_search<const N: usize>(ranges: &VecDeque<Range<N>>, time: u64) -> usize {
match ranges.binary_search_by(|r| {
match r.expires {
Expand All @@ -209,7 +275,9 @@ fn expired_search<const N: usize>(ranges: &VecDeque<Range<N>>, time: u64) -> usi
}
}

// Basically all time is spent in the binary search
fn check_expired<const N: usize>(ranges: &mut VecDeque<Range<N>>, time: u64) {
//let exp_i = exp_lin_search(ranges, time);
let exp_i = expired_search(ranges, time);
let first_min = ranges[0].min;

Expand All @@ -223,6 +291,7 @@ fn check_expired<const N: usize>(ranges: &mut VecDeque<Range<N>>, time: u64) {
ranges.rotate_left(exp_i);
ranges.truncate(ranges.len()-exp_i);
ranges.push_front(new_range);
ranges.shrink_to_fit();
}

// fn check_expired<const N: usize>(ranges: VecDeque<Range<N>>, time: u64) -> VecDeque<Range<N>> {
Expand All @@ -244,6 +313,31 @@ fn check_expired<const N: usize>(ranges: &mut VecDeque<Range<N>>, time: u64) {
// new_vec
// }

// struct BitHole {
// arr: VecDeque<u8>,
// offset: usize,
// first_hole: usize
// }

// const BIT_MASK: [u8; 8] = [128, 64, 32, 16, 8, 4, 2, 1];

// impl BitHole {
// fn add_num(&mut self, num: usize) -> bool {
// let b = (num-self.offset)/8;
// let b_i = num - b*8;
// while self.arr.len() < b {
// self.arr.push_back(0);
// }

// let exists = self.arr[b] & BIT_MASK[b_i] != 0;
// if !exists {
// self.arr[b] += BIT_MASK[b_i];
// }

// exists
// }
// }

#[cfg(test)]
mod test {
use std::{collections::{BTreeMap, BTreeSet, HashMap, HashSet}, time::Instant};
Expand All @@ -261,18 +355,20 @@ mod test {
for i in 1..10 {
let mut range = Range::<RANGE_SIZE>::new(i*(RANGE_SIZE as u64));
if i < 7 {
range.expires = Expiry::At(i*100);
range.expires = Expiry::At((i as u64)*100);
}
ranges.push_back(range)
}

println!("{:?}", ranges);
let exists = add_num(&mut ranges, 8, 150);
println!("{:?}", ranges);
assert!(!exists);
assert_eq!(ranges[3].members, Some([8, 0]));
assert_eq!(ranges[3].members, Some([1, 0]));
assert_eq!(ranges[2].expires, Expiry::At(150));
let exists = add_num(&mut ranges, 8, 150);
assert!(exists);
assert_eq!(ranges[3].members, Some([8, 0]));
assert_eq!(ranges[3].members, Some([1, 0]));

let exists = add_num(&mut ranges, 9, 125);
assert!(!exists);
Expand Down Expand Up @@ -343,12 +439,15 @@ mod test {

// When max_distance is 50 (size 50k), it's must more efficient than a bit array
// When max distance is 2.5k (size 50k), it's less efficient
// Same holds for much larger size
// For larger size it's much more efficient except for very significant shuffling
#[test]
fn routine() {
let mut rng = thread_rng();

let size = 500000;
// For better benchmark do 500k and 50 amount
// There it can reach 20M ops/secs
// `cargo test --package tiauth-core --lib --release --all-features -- compactset::test::routine --exact --show-output`
let size = 5000;
let amnt = 10;
let ops = amnt*size;
let mut add_time = 0f64;
Expand All @@ -363,7 +462,7 @@ mod test {
values.push((i, expiry));
}

lightly_shuffle(&mut values, size/1000);
lightly_shuffle(&mut values, size/100);

let mut ranges: VecDeque<Range<ROUTINE_SIZE>> = VecDeque::new();
ranges.push_back(Range::new(1));
Expand All @@ -373,14 +472,15 @@ mod test {
let mut time = 0;

for (i, expires) in values {
let around: i32 = rng.gen_range(-900..900);
//println!("sp: {}", ranges.len());
let around: i32 = rng.gen_range(-900..100);
time = time.max(0.max((expires as i32)+around) as u64);
let now = Instant::now();
add_num(&mut ranges, i as u64, expires);
add_time += now.elapsed().as_secs_f64();
max_space = max_space.max(ranges.len());
let now_again = Instant::now();
if i % 5 == 0 {
if i % 4 == 0 {
check_expired(&mut ranges, time);
}
check_time += now_again.elapsed().as_secs_f64();
Expand All @@ -393,6 +493,54 @@ mod test {
let max_space: usize = maxes.into_iter().sum::<usize>()/amnt;

println!("add: {} ops/s.\ncheck: {} ops/s.", (ops as f64)/add_time, (ops as f64)/check_time);
println!("avg max space: {}", (max_space)*(std::mem::size_of::<Range<ROUTINE_SIZE>>()));
let mem_size = std::mem::size_of::<Range<ROUTINE_SIZE>>();
println!("avg max space: {}; mem_size: {}", (max_space)*mem_size, mem_size);
println!("total {} ops/s.", (ops as f64)/(add_time+check_time));
}

#[test]
fn routine_compactset() {
let mut rng = thread_rng();

// For better benchmark do 500k and 50 amount
// There it can reach 20M ops/secs
// `cargo test --package tiauth-core --lib --release --all-features -- compactset::test::routine --exact --show-output`
let size = 500000;
let amnt = 50;
let ops = amnt*size;
let mut add_time = 0f64;
let mut check_time = 0f64;

for _ in 0..amnt {
let mut values = Vec::new();
let mut expiry: u64 = 0;
for i in 1..size {
expiry += rng.gen_range(0..1000);
values.push((i, expiry));
}

lightly_shuffle(&mut values, size/100);

let mut ranges: VecDeque<Range<ROUTINE_SIZE>> = VecDeque::new();
ranges.push_back(Range::new(1));

let mut compact_set = CompactSet::new();

compact_set.ranges = Arc::new(Mutex::new(ranges));

let mut time = 0;

for (i, expires) in values {
//println!("sp: {}", ranges.len());
let around: i32 = rng.gen_range(-900..100);
time = time.max(0.max((expires as i32)+around) as u64);
let now = Instant::now();
compact_set.num_exists((i as u64), expires, Some(time));
add_time += now.elapsed().as_secs_f64();
}
}


println!("total: {} ops/s.", (ops as f64)/add_time);
}
}
Loading

0 comments on commit 9bd7731

Please sign in to comment.