Skip to content

Commit

Permalink
csv+serde based writing, new SumNotEmpty to mimic bedtools map's sum.
Browse files Browse the repository at this point in the history
 - New `SerializableDatumType` and `DatumType::into_serializable()` that
   wraps in a runtime serialization config, e.g. for precision, missing
   values, etc.

 - Removed trait `TsvSerialize` and `to_tsv()` methods.
 - New special `Operation::SumNotEmpty` that returns a `DatumType::NoValue`
   to mimic bedtools map sum's behavior. This was to simplify testing and
   some users may want this behavior.
 - Renamed `deserialize_bed_missing` to `bed_missing` for ergonomics.
 - New parse capacity, reusing buffer (didn't seem to speed up much?).
 - Much better testing against bedtools map.
 - New testing utility functions: `head_file()` for looking at temp
   files, and assert macros for floats and option floats.
 - New test case against bedtools map with multiple operations (before,
   only had unchecked benchmark! yikes)
 - Added a header to the `example.bed` BED file to mess stuff up :)
  • Loading branch information
vsbuffalo committed Feb 29, 2024
1 parent de235dc commit cd05f1f
Show file tree
Hide file tree
Showing 15 changed files with 413 additions and 339 deletions.
79 changes: 44 additions & 35 deletions src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
//! Command functions that implement each of the `granges` subcommands.

use std::path::PathBuf;
use std::{path::PathBuf, io, fs::File};

use csv::WriterBuilder;

use crate::{
data::{operations::Operation, DatumType},
data::{operations::Operation, SerializableDatumType},
io::{
parsers::{Bed5Iterator, GenomicRangesParser},
tsv::BED_TSV,
OutputStream,
},
prelude::*,
ranges::{operations::adjust_range, GenomicRangeEmptyRecord, GenomicRangeRecord},
ranges::{operations::adjust_range, GenomicRangeRecordEmpty, GenomicRangeRecord},
reporting::{CommandOutput, Report},
test_utilities::{random_granges, random_granges_mock_bed5},
traits::TsvSerialize,
Position, PositionOffset,
};

Expand Down Expand Up @@ -57,15 +57,18 @@ pub fn granges_adjust(
) -> Result<CommandOutput<()>, GRangesError> {
let genome = read_seqlens(seqlens)?;

// Setup Output stream -- header is None for now (TODO).
let output_stream = output.map_or(OutputStream::new_stdout(None), |file| {
OutputStream::new(file, None)
});
let mut writer = output_stream.writer()?;
let writer_boxed: Box<dyn io::Write> = match output {
Some(path) => Box::new(File::create(path)?),
None => Box::new(io::stdout()),
};

let mut writer = WriterBuilder::new()
.delimiter(b'\t')
.has_headers(false)
.from_writer(writer_boxed);

// For reporting stuff to the user.
let mut report = Report::new();

let mut skipped_ranges = 0;

if !sort {
Expand All @@ -84,16 +87,16 @@ pub fn granges_adjust(
let possibly_adjusted_range = adjust_range(range, -both, both, length);

if let Some(range_adjusted) = possibly_adjusted_range {
writeln!(writer, "{}", &range_adjusted.to_tsv(&BED_TSV))?;
writer.serialize(range_adjusted)?;
} else {
skipped_ranges += 1;
}

if skipped_ranges > 0 {
report.add_issue(format!(
"{} ranges were removed because their widths after adjustment were ≤ 0",
skipped_ranges
))
"{} ranges were removed because their widths after adjustment were ≤ 0",
skipped_ranges
))
}
}
} else {
Expand Down Expand Up @@ -158,7 +161,7 @@ pub fn granges_filter(
right_path: &PathBuf,
output: Option<&PathBuf>,
skip_missing: bool,
) -> Result<CommandOutput<()>, GRangesError> {
) -> Result<CommandOutput<()>, GRangesError> {
let genome = read_seqlens(seqlens)?;
let seqnames: Vec<String> = genome.keys().cloned().collect();

Expand Down Expand Up @@ -197,7 +200,7 @@ pub fn granges_filter(
right_gr = GRanges::from_iter(
right.try_unwrap_data().retain_seqnames(&seqnames),
&genome,
)?;
)?;
} else {
left_gr = GRangesEmpty::from_iter(left, &genome)?;
right_gr = GRanges::from_iter(right.try_unwrap_data(), &genome)?;
Expand Down Expand Up @@ -240,7 +243,7 @@ pub fn granges_filter(
right_gr = GRanges::from_iter(
right.try_unwrap_data().retain_seqnames(&seqnames),
&genome,
)?;
)?;
} else {
left_gr = GRanges::from_iter(left.try_unwrap_data(), &genome)?;
right_gr = GRanges::from_iter(right.try_unwrap_data(), &genome)?;
Expand Down Expand Up @@ -289,7 +292,7 @@ pub fn granges_flank(
output: Option<&PathBuf>,
skip_missing: bool,
mode: ProcessingMode,
) -> Result<CommandOutput<()>, GRangesError> {
) -> Result<CommandOutput<()>, GRangesError> {
let genome = read_seqlens(seqlens)?;
let seqnames: Vec<String> = genome.keys().cloned().collect();
let ranges_iter = GenomicRangesFile::parsing_iterator(bedfile)?;
Expand Down Expand Up @@ -326,11 +329,15 @@ pub fn granges_flank(
}
},
ProcessingMode::Streaming => {
// Setup Output stream -- header is None for now (TODO).
let output_stream = output.map_or(OutputStream::new_stdout(None), |file| {
OutputStream::new(file, None)
});
let mut writer = output_stream.writer()?;
let writer_boxed: Box<dyn io::Write> = match output {
Some(path) => Box::new(File::create(path)?),
None => Box::new(io::stdout()),
};

let mut writer = WriterBuilder::new()
.delimiter(b'\t')
.has_headers(false)
.from_writer(writer_boxed);

match ranges_iter {
// FIXME: code redundancy. But too early now to design traits, etc.
Expand All @@ -346,7 +353,7 @@ pub fn granges_flank(
let flanking_ranges = range
.flanking_ranges::<GenomicRangeRecord<String>>(left, right, length);
for flanking_range in flanking_ranges {
writeln!(writer, "{}", &flanking_range.to_tsv(&BED_TSV))?;
writer.serialize(flanking_range)?;
}
}
} else {
Expand All @@ -358,9 +365,9 @@ pub fn granges_flank(
.ok_or(GRangesError::MissingSequence(seqname.to_string()))?;

let flanking_ranges = range
.flanking_ranges::<GenomicRangeEmptyRecord>(left, right, length);
.flanking_ranges::<GenomicRangeRecordEmpty>(left, right, length);
for flanking_range in flanking_ranges {
writeln!(writer, "{}", &flanking_range.to_tsv(&BED_TSV))?;
writer.serialize(flanking_range)?;
}
}
}
Expand All @@ -380,7 +387,7 @@ pub fn granges_flank(
let flanking_ranges = range
.flanking_ranges::<GenomicRangeRecord<String>>(left, right, length);
for flanking_range in flanking_ranges {
writeln!(writer, "{}", &flanking_range.to_tsv(&BED_TSV))?;
writer.serialize(flanking_range)?;
}
}
} else {
Expand All @@ -392,9 +399,9 @@ pub fn granges_flank(
.ok_or(GRangesError::MissingSequence(seqname.to_string()))?;

let flanking_ranges = range
.flanking_ranges::<GenomicRangeEmptyRecord>(left, right, length);
.flanking_ranges::<GenomicRangeRecordEmpty>(left, right, length);
for flanking_range in flanking_ranges {
writeln!(writer, "{}", &flanking_range.to_tsv(&BED_TSV))?;
writer.serialize(flanking_range)?;
}
}
}
Expand All @@ -417,7 +424,7 @@ pub fn granges_map(
operations: Vec<Operation>,
output: Option<&PathBuf>,
skip_missing: bool,
) -> Result<CommandOutput<()>, GRangesError> {
) -> Result<CommandOutput<()>, GRangesError> {
let genome = read_seqlens(seqlens)?;
let seqnames: Vec<String> = genome.keys().cloned().collect();
let report = Report::new();
Expand Down Expand Up @@ -470,8 +477,10 @@ pub fn granges_map(
// Run all operations on the scores.
operations
.iter()
.map(|operation| operation.run(&mut overlap_scores))
.collect::<Vec<DatumType>>()
.map(|operation| {
operation.run(&mut overlap_scores).into_serializable(&BED_TSV)
})
.collect::<Vec<SerializableDatumType>>()
})?;

result_gr.write_to_tsv(output, &BED_TSV)?;
Expand All @@ -486,7 +495,7 @@ pub fn granges_windows(
step: Option<Position>,
chop: bool,
output: Option<impl Into<PathBuf>>,
) -> Result<CommandOutput<()>, GRangesError> {
) -> Result<CommandOutput<()>, GRangesError> {
let genome = read_seqlens(seqlens)?;
GRangesEmpty::from_windows(&genome, width, step, chop)?.write_to_tsv(output, &BED_TSV)?;
let report = Report::new();
Expand All @@ -500,7 +509,7 @@ pub fn granges_random_bed(
output: Option<impl Into<PathBuf>>,
sort: bool,
bed5: bool,
) -> Result<CommandOutput<()>, GRangesError> {
) -> Result<CommandOutput<()>, GRangesError> {
// get the genome info
let genome = read_seqlens(seqlens)?;

Expand Down
46 changes: 42 additions & 4 deletions src/data/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
//! Data container implementations.
//!

use crate::traits::{DataContainer, IntoDatumType};
use crate::{
io::TsvConfig,
traits::{DataContainer, IntoDatumType},
};
use serde::ser::Serializer;
use serde::Serialize;

#[cfg(feature = "ndarray")]
pub mod ndarray;
Expand All @@ -12,9 +17,9 @@ impl<U> DataContainer for Vec<U> {}
impl DataContainer for () {}

/// These are core supported data types stored in an `enum`, to
/// unify the types that come out of standard operations like
/// `select()`.
#[derive(Debug, Clone)]
/// unify the types that come out of standard operations of
/// heterogeneous output types.
#[derive(Debug, Clone, Serialize)]
pub enum DatumType {
Float32(f32),
Float64(f64),
Expand All @@ -26,6 +31,39 @@ pub enum DatumType {
NoValue,
}

impl DatumType {
    /// Wrap this datum together with a runtime TSV serialization
    /// configuration (e.g. the string used for missing values), producing
    /// a value that implements `Serialize` with that config applied.
    pub fn into_serializable(self, config: &TsvConfig) -> SerializableDatumType {
        SerializableDatumType { datum: self, config }
    }
}

/// A [`DatumType`] paired with a borrowed runtime TSV serialization
/// configuration, so formatting choices (e.g. the missing-value string)
/// can be applied at serialization time.
#[derive(Debug, Clone)]
pub struct SerializableDatumType<'a> {
    /// The underlying datum to serialize.
    pub datum: DatumType,
    /// Borrowed serialization settings; `'a` ties this wrapper to the
    /// config's lifetime.
    pub config: &'a TsvConfig,
}

impl<'a> Serialize for SerializableDatumType<'a> {
    /// Serialize the wrapped datum as a string, substituting the
    /// configured missing-value string for `DatumType::NoValue`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Render the datum to its textual form first, then emit it with a
        // single `serialize_str` call.
        let rendered = match &self.datum {
            DatumType::NoValue => self.config.no_value_string.clone(),
            DatumType::String(value) => value.clone(),
            DatumType::Float32(value) => value.to_string(),
            DatumType::Float64(value) => value.to_string(),
            DatumType::Integer32(value) => value.to_string(),
            DatumType::Integer64(value) => value.to_string(),
            DatumType::Unsigned32(value) => value.to_string(),
            DatumType::Unsigned64(value) => value.to_string(),
        };
        serializer.serialize_str(&rendered)
    }
}

impl IntoDatumType for f64 {
fn into_data_type(self) -> DatumType {
DatumType::Float64(self)
Expand Down
14 changes: 2 additions & 12 deletions src/data/ndarray.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

use crate::error::GRangesError;
use crate::granges::GRanges;
use crate::io::TsvConfig;
use crate::traits::{DataContainer, IndexedDataContainer, RangeContainer, TsvSerialize};
use ndarray::{Array1, Array2, ArrayBase, ArrayView1, Dim, OwnedRepr};
use crate::traits::{DataContainer, IndexedDataContainer, RangeContainer};
use ndarray::{Array1, Array2, ArrayView1};

impl<T> DataContainer for Array1<T> {}
impl<T> DataContainer for Array2<T> {}
Expand Down Expand Up @@ -210,15 +209,6 @@ where
}
}

impl<T: TsvSerialize> TsvSerialize for ArrayBase<OwnedRepr<T>, Dim<[usize; 1]>> {
fn to_tsv(&self, config: &TsvConfig) -> String {
self.iter()
.map(|x| x.to_tsv(config))
.collect::<Vec<_>>()
.join("\t")
}
}

#[cfg(test)]
mod tests {
use crate::test_utilities::granges_test_case_01;
Expand Down
15 changes: 15 additions & 0 deletions src/data/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,19 @@ pub fn median<F: Float + Sum>(numbers: &mut [F]) -> Option<F> {
/// The (subset of) standard `bedtools map` operations.
#[derive(Clone, Debug, ValueEnum)]
pub enum Operation {
    /// Calculate the sum of all values (a set of zero elements has sum 0.0).
    Sum,
    /// Calculate the sum of all values, but a set of zero elements yields a
    /// missing value rather than 0.0 (mimics `bedtools map`'s `sum`).
    SumNotEmpty,
    /// Calculate the minimum of values.
    Min,
    /// Calculate the maximum of values.
    Max,
    /// Calculate the mean of values.
    Mean,
    /// Calculate the median of values.
    Median,
    /// Concatenate all values into a string separated by commas.
    Collapse,
}

Expand All @@ -53,6 +61,13 @@ impl Operation {
let sum: T = data.iter().copied().sum();
sum.into_data_type()
}
Operation::SumNotEmpty => {
if data.is_empty() {
return DatumType::NoValue;
}
let sum: T = data.iter().copied().sum();
sum.into_data_type()
}
Operation::Min => {
let min = data
.iter()
Expand Down
Loading

0 comments on commit cd05f1f

Please sign in to comment.