blockstore: Reduce intermediate collects from multi_get() (#4312)
These methods create several unnecessary intermediate vectors. Instead,
just operate on the iterators directly to avoid the extra waste
steviez authored Jan 8, 2025
1 parent a1844d1 commit 4d5daab
Showing 2 changed files with 91 additions and 84 deletions.
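As a rough illustration of the pattern this commit applies (a minimal standalone sketch, not the blockstore code): rather than collecting results into an intermediate Vec inside the getter and iterating it again at the call site, the getter returns the iterator and the caller drives it, collecting at most once.

```rust
// Old shape: allocate an intermediate Vec inside the getter even if the caller
// immediately re-iterates or re-collects it.
fn squares_collected(values: &[u64]) -> Vec<u64> {
    values.iter().map(|v| v * v).collect()
}

// New shape: no allocation here; the work happens as the caller consumes items.
fn squares_lazy(values: &[u64]) -> impl Iterator<Item = u64> + '_ {
    values.iter().map(|v| v * v)
}

fn main() {
    let values = [1u64, 2, 3];
    assert_eq!(squares_collected(&values), vec![1, 4, 9]);
    // The caller can fuse further work onto the iterator without an extra Vec.
    let total: u64 = squares_lazy(&values).sum();
    assert_eq!(total, 14);
}
```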
71 changes: 32 additions & 39 deletions ledger/src/blockstore.rs
@@ -3741,19 +3741,15 @@ impl Blockstore {
let Some((_, all_ranges_end_index)) = completed_ranges.last().copied() else {
return Ok(vec![]);
};
let keys =
(all_ranges_start_index..=all_ranges_end_index).map(|index| (slot, u64::from(index)));
let keys = self.data_shred_cf.multi_get_keys(
(all_ranges_start_index..=all_ranges_end_index).map(|index| (slot, u64::from(index))),
);
let data_shred_iterator = self.data_shred_cf.multi_get_bytes(&keys);

let data_shreds: Result<Vec<Option<Vec<u8>>>> = self
.data_shred_cf
.multi_get_bytes(keys)
.into_iter()
.collect();
let data_shreds = data_shreds?;
let data_shreds: Result<Vec<Shred>> = data_shreds
.into_iter()
let data_shreds: Vec<Shred> = data_shred_iterator
.enumerate()
.map(|(idx, shred_bytes)| {
let shred_bytes = shred_bytes?;
if shred_bytes.is_none() {
if let Some(slot_meta) = slot_meta {
if slot > self.lowest_cleanup_slot() {
@@ -3769,20 +3765,19 @@ impl Blockstore {
);
}
}
return Err(BlockstoreError::InvalidShredData(Box::new(
bincode::ErrorKind::Custom(format!(
"Missing shred for slot {slot}, index {idx}"
)),
)));
return Err(BlockstoreError::MissingShred(
slot,
u64::try_from(idx).unwrap(),
));
}

Shred::new_from_serialized_shred(shred_bytes.unwrap()).map_err(|err| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("Could not reconstruct shred from shred payload: {err:?}"),
)))
})
})
.collect();
let data_shreds = data_shreds?;
.collect::<Result<Vec<_>>>()?;

completed_ranges
.into_iter()
@@ -3891,12 +3886,13 @@ impl Blockstore {
is_retransmitter_signed: false,
});
};
let keys = (start_index..=last_shred_index).map(|index| (slot, index));
let keys = self
.data_shred_cf
.multi_get_keys((start_index..=last_shred_index).map(|index| (slot, index)));

let deduped_shred_checks: Vec<(Hash, bool)> = self
.data_shred_cf
.multi_get_bytes(keys)
.into_iter()
.multi_get_bytes(&keys)
.enumerate()
.map(|(offset, shred_bytes)| {
let shred_bytes = shred_bytes.ok().flatten().ok_or_else(|| {
@@ -3937,20 +3933,18 @@ impl Blockstore {
/// Returns a mapping from each element of `slots` to a list of that
/// element's children slots.
pub fn get_slots_since(&self, slots: &[Slot]) -> Result<HashMap<Slot, Vec<Slot>>> {
let slot_metas: Result<Vec<Option<SlotMeta>>> = self
.meta_cf
.multi_get(slots.iter().copied())
.into_iter()
.collect();
let slot_metas = slot_metas?;

let result: HashMap<Slot, Vec<Slot>> = slots
.iter()
.zip(slot_metas)
.filter_map(|(slot, meta)| meta.map(|meta| (*slot, meta.next_slots.to_vec())))
.collect();
let keys = self.meta_cf.multi_get_keys(slots.iter().copied());
let slot_metas = self.meta_cf.multi_get(&keys);

let mut slots_since: HashMap<Slot, Vec<Slot>> = HashMap::with_capacity(slots.len());
for meta in slot_metas.into_iter() {
let meta = meta?;
if let Some(meta) = meta {
slots_since.insert(meta.slot, meta.next_slots);
}
}

Ok(result)
Ok(slots_since)
}

pub fn is_root(&self, slot: Slot) -> bool {
@@ -5694,12 +5688,11 @@ pub mod tests {
.expect("Expected meta object to exist");
assert_eq!(result, meta);
}
let mut keys: Vec<u64> = vec![0; TEST_PUT_ENTRY_COUNT];
for (i, key) in keys.iter_mut().enumerate().take(TEST_PUT_ENTRY_COUNT) {
*key = u64::try_from(i).unwrap();
}
let values = blockstore.meta_cf.multi_get(keys);
for (i, value) in values.iter().enumerate().take(TEST_PUT_ENTRY_COUNT) {
let keys = blockstore
.meta_cf
.multi_get_keys(0..TEST_PUT_ENTRY_COUNT as Slot);
let values = blockstore.meta_cf.multi_get(&keys);
for (i, value) in values.enumerate().take(TEST_PUT_ENTRY_COUNT) {
let k = u64::try_from(i).unwrap();
assert_eq!(
value.as_ref().unwrap().as_ref().unwrap(),
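At the call sites above, the shape of the change is to fold the per-item check and deserialization into a single collect over the iterator, short-circuiting on the first error. A standalone sketch of that Result-collecting idiom, with a hypothetical parse() helper standing in for the shred handling:

```rust
// Hypothetical per-item step: reject a missing value, then parse the rest.
fn parse(raw: Option<&str>) -> Result<u64, String> {
    raw.ok_or_else(|| "missing value".to_string())?
        .parse::<u64>()
        .map_err(|e| e.to_string())
}

fn main() -> Result<(), String> {
    let fetched = [Some("1"), Some("2"), Some("3")];

    // One streamed pass: the first error (missing or unparsable value)
    // short-circuits the whole collect, so there is no intermediate Vec of
    // raw values followed by a second map/collect.
    let parsed = fetched
        .iter()
        .copied()
        .map(parse)
        .collect::<Result<Vec<_>, String>>()?;
    assert_eq!(parsed, vec![1, 2, 3]);
    Ok(())
}
```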
104 changes: 59 additions & 45 deletions ledger/src/blockstore_db.rs
@@ -1451,33 +1451,45 @@
result
}

pub(crate) fn multi_get_bytes<I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>>>
/// Create a key type suitable for use with multi_get_bytes() and
/// multi_get(). Those functions return iterators, so the keys must be
/// created with a separate function in order to live long enough
pub(crate) fn multi_get_keys<I>(&self, keys: I) -> Vec<Vec<u8>>
where
I: IntoIterator<Item = C::Index>,
{
let keys: Vec<_> = keys.into_iter().map(C::key).collect();
{
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status,
);
let result = self
.backend
.multi_get_cf(self.handle(), &keys)
.map(|out| Ok(out?.as_deref().map(<[u8]>::to_vec)))
.collect::<Vec<Result<Option<_>>>>();
if let Some(op_start_instant) = is_perf_enabled {
// use multi-get instead
report_rocksdb_read_perf(
C::NAME,
PERF_METRIC_OP_NAME_MULTI_GET,
&op_start_instant.elapsed(),
&self.column_options,
);
}
keys.into_iter().map(C::key).collect()
}

result
pub(crate) fn multi_get_bytes<'a, I, E>(
&'a self,
keys: I,
) -> impl Iterator<Item = Result<Option<Vec<u8>>>> + 'a
where
I: IntoIterator<Item = &'a E> + 'a,
E: AsRef<[u8]> + 'a + ?Sized,
{
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status,
);

let result = self
.backend
.multi_get_cf(self.handle(), keys)
.map(|out| Ok(out?.as_deref().map(<[u8]>::to_vec)));

if let Some(op_start_instant) = is_perf_enabled {
// use multi-get instead
report_rocksdb_read_perf(
C::NAME,
PERF_METRIC_OP_NAME_MULTI_GET,
&op_start_instant.elapsed(),
&self.column_options,
);
}

result
}

pub fn iter(
@@ -1634,33 +1646,35 @@ impl<C, const K: usize> LedgerColumn<C, K>
where
C: TypedColumn + ColumnName,
{
pub(crate) fn multi_get<I>(&self, keys: I) -> Vec<Result<Option<C::Type>>>
pub(crate) fn multi_get<'a, I, E>(
&'a self,
keys: I,
) -> impl Iterator<Item = Result<Option<C::Type>>> + 'a
where
I: IntoIterator<Item = C::Index>,
I: IntoIterator<Item = &'a E> + 'a,
E: AsRef<[u8]> + 'a + ?Sized,
{
let keys: Vec<_> = keys.into_iter().map(C::key).collect();
{
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status,
);
let result = self
.backend
.multi_get_cf(self.handle(), &keys)
.map(|out| Ok(out?.as_deref().map(deserialize).transpose()?))
.collect::<Vec<Result<Option<_>>>>();
if let Some(op_start_instant) = is_perf_enabled {
// use multi-get instead
report_rocksdb_read_perf(
C::NAME,
PERF_METRIC_OP_NAME_MULTI_GET,
&op_start_instant.elapsed(),
&self.column_options,
);
}
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status,
);

let result = self
.backend
.multi_get_cf(self.handle(), keys)
.map(|out| Ok(out?.as_deref().map(deserialize).transpose()?));

result
if let Some(op_start_instant) = is_perf_enabled {
// use multi-get instead
report_rocksdb_read_perf(
C::NAME,
PERF_METRIC_OP_NAME_MULTI_GET,
&op_start_instant.elapsed(),
&self.column_options,
);
}

result
}

pub fn get(&self, index: C::Index) -> Result<Option<C::Type>> {
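The doc comment on multi_get_keys() above explains that multi_get_bytes() and multi_get() now return iterators that borrow the serialized keys, so the keys have to be built first and bound where they outlive the iterator. A minimal sketch of that borrowing pattern, using a hypothetical KvStore in place of the real LedgerColumn/RocksDB types:

```rust
use std::collections::HashMap;

struct KvStore {
    inner: HashMap<Vec<u8>, Vec<u8>>,
}

impl KvStore {
    // Analogue of multi_get_keys(): eagerly serialize indexes into owned key bytes.
    fn make_keys<I: IntoIterator<Item = u64>>(&self, indexes: I) -> Vec<Vec<u8>> {
        indexes.into_iter().map(|i| i.to_be_bytes().to_vec()).collect()
    }

    // Analogue of multi_get(): a lazy iterator that borrows both `self` and `keys`.
    fn multi_get<'a>(
        &'a self,
        keys: &'a [Vec<u8>],
    ) -> impl Iterator<Item = Option<&'a [u8]>> + 'a {
        keys.iter().map(move |k| self.inner.get(k).map(Vec::as_slice))
    }
}

fn main() {
    let mut store = KvStore { inner: HashMap::new() };
    store.inner.insert(7u64.to_be_bytes().to_vec(), b"seven".to_vec());

    // The keys live in their own binding so they outlive the borrowed iterator.
    let keys = store.make_keys(0..10);
    let found = store.multi_get(&keys).flatten().count();
    assert_eq!(found, 1);
}
```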
