Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to using skani cli #48

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ needletail = "0.5.*"
bird_tool_utils-man = "0.4.0"
lazy_static = "1.4.0"
ansi_term = "0.12"
skani = "0.1.1"
# skani = "0.1.1"
# skani = { path = "../skani" }
concurrent-queue = "2"

Expand Down
1 change: 1 addition & 0 deletions galah.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ channels:
dependencies:
- dashing >= 0.4.0
- fastani >= 1.3
- skani >= 0.1.1
- extern
1 change: 1 addition & 0 deletions src/cluster_argument_parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,7 @@ pub fn generate_galah_clusterer<'a>(
)
)
}),
threads,
}),
_ => panic!("Programming error"),
},
Expand Down
2 changes: 2 additions & 0 deletions src/clusterer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ mod tests {
&crate::skani::SkaniPreclusterer {
threshold: 90.0,
min_aligned_threshold: 0.2,
threads: 1,
},
&crate::skani::SkaniClusterer {
threshold: 99.0,
Expand All @@ -650,6 +651,7 @@ mod tests {
&crate::skani::SkaniPreclusterer {
threshold: 90.0,
min_aligned_threshold: 0.2,
threads: 1,
},
&crate::skani::SkaniClusterer {
threshold: 99.0,
Expand Down
252 changes: 142 additions & 110 deletions src/skani.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use std;
use std::io::BufReader;
use std::io::Write;

use crate::sorted_pair_genome_distance_cache::SortedPairGenomeDistanceCache;
use crate::ClusterDistanceFinder;
use crate::PreclusterDistanceFinder;

use rayon::prelude::*;

use skani::chain;
use skani::file_io;
use skani::params::*;
use skani::screen;

use concurrent_queue::ConcurrentQueue;
use bird_tool_utils::command::finish_command_safely;
use tempfile;

pub struct SkaniPreclusterer {
pub threshold: f32,
pub min_aligned_threshold: f32,
pub threads: u16,
}

impl PreclusterDistanceFinder for SkaniPreclusterer {
Expand All @@ -22,6 +21,7 @@ impl PreclusterDistanceFinder for SkaniPreclusterer {
genome_fasta_paths,
self.threshold,
self.min_aligned_threshold,
self.threads,
)
}

Expand All @@ -34,6 +34,7 @@ fn precluster_skani(
genome_fasta_paths: &[&str],
threshold: f32,
min_aligned_threshold: f32,
threads: u16,
) -> SortedPairGenomeDistanceCache {
if threshold < 85.0 {
panic!(
Expand All @@ -42,74 +43,94 @@ fn precluster_skani(
);
}

let (command_params, sketch_params) = default_params(Mode::Dist, min_aligned_threshold);

debug!("Sketching genomes with skani ..");
let fasta_strings = genome_fasta_paths
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>();
// Note that sketches is now shuffled!
let sketches = &file_io::fastx_to_sketches(&fasta_strings, &sketch_params, true);

// Right now implemented by parallel collection into a queue, and then
// reprocessed into a BTreeMap. Could potentially be made more efficient by
// direct collection into a concurrent BTreeMap, but eh for now.
let queue = ConcurrentQueue::unbounded();

// Screen genomes before ANI calculation
let kmer_to_sketch = screen::kmer_to_sketch_from_refs(sketches);

info!("Calculating ANI from skani sketches ..");
sketches.par_iter().enumerate().for_each(|(i, ref_sketch)| {
let ref_sketch_i = &sketches[i];
let screened_refs = screen::screen_refs(
0.80,
&kmer_to_sketch,
ref_sketch_i,
&sketch_params,
sketches,
);
debug!(
"{} has {} refs passing screening.",
ref_sketch_i.file_name,
screened_refs.len()
);
// Create a tempfile to list all the fasta file paths
let mut tf = tempfile::Builder::new()
.prefix("galah-input-genomes")
.suffix(".txt")
.tempfile()
.expect("Failed to open temporary file to run skani");

for fasta in genome_fasta_paths {
writeln!(tf, "{}", fasta)
.expect("Failed to write genome fasta paths to tempfile for skani");
}

// --sparse only outputs non-zero entries in an edge-list output
// Ref_file Query_file ANI Align_fraction_ref Align_fraction_query Ref_name Query_name
info!("Running skani to get distances ..");
let mut cmd = std::process::Command::new("skani");
cmd.arg("triangle")
.arg("-t")
.arg(&format!("{}", threads))
.arg("--sparse")
.arg("--min-af")
.arg(&format!("{}", min_aligned_threshold * 100.0))
.arg("-l")
.arg(tf.path().to_str().unwrap())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
debug!("Running skani command: {:?}", &cmd);

// Parse the distances
let mut process = cmd
.spawn()
.unwrap_or_else(|_| panic!("Failed to spawn {}", "skani"));
let stdout = process.stdout.as_mut().unwrap();
let stdout_reader = BufReader::new(stdout);

screened_refs.into_par_iter().for_each(|j| {
if j > i {
let map_params = chain::map_params_from_sketch(ref_sketch, false, &command_params);
let ani_result = chain::chain_seeds(ref_sketch, &sketches[j], map_params);
let ani = ani_result.ani * 100.;
let ref_name = ref_sketch.file_name.clone();
let query_name = sketches[j].file_name.clone();
let mut rdr = csv::ReaderBuilder::new()
.delimiter(b'\t')
.has_headers(true)
.from_reader(stdout_reader);

debug!("Pushing ANI result for {} and {}", ref_name, query_name);
let ref_index = genome_fasta_paths
let mut distances = SortedPairGenomeDistanceCache::new();

// Order is likely not conserved, so need to keep track
for record_res in rdr.records() {
match record_res {
Ok(record) => {
debug!("Found skani record {:?}", record);

// Get index of genomes within genome_fasta_paths
let genome_id1 = genome_fasta_paths
.iter()
.position(|&r| r == ref_name)
.unwrap();
let query_index = genome_fasta_paths
.position(|&x| x == &record[0])
.unwrap_or_else(|| {
panic!(
"Failed to find genome fasta path in genome_fasta_paths: {}",
&record[0]
)
});
let genome_id2 = genome_fasta_paths
.iter()
.position(|&r| r == query_name)
.unwrap();
.position(|&x| x == &record[1])
.unwrap_or_else(|| {
panic!(
"Failed to find genome fasta path in genome_fasta_paths: {}",
&record[1]
)
});

let ani: f32 = record[2].parse().unwrap_or_else(|_| {
panic!("Failed to convert skani ANI to float value: {}", &record[2])
});
trace!("Found ANI {}", ani);
if ani >= threshold {
queue
.push((ref_index, query_index, ani))
.expect("Failed to push to queue during preclustering");
trace!("Accepting ANI since it passed threshold");
distances.insert((genome_id1, genome_id2), Some(ani))
}
}
});
});
debug!("Converting ANI results into sparse cache ..");

let mut to_return = SortedPairGenomeDistanceCache::new();
while let Ok((i, j, ani)) = queue.pop() {
to_return.insert((i, j), Some(ani));
Err(e) => {
error!("Error parsing skani output: {}", e);
std::process::exit(1);
}
}
}
debug!("Finished skani.");
finish_command_safely(process, "skani");
debug!("Found skani distances: {:#?}", distances);
info!("Finished skani triangle.");

to_return
distances
}

pub struct SkaniClusterer {
Expand All @@ -135,52 +156,61 @@ impl ClusterDistanceFinder for SkaniClusterer {
}
}

fn default_params(mode: Mode, min_aligned_frac: f32) -> (CommandParams, SketchParams) {
let cmd_params = CommandParams {
screen: true,
screen_val: 0.00,
mode,
out_file_name: "".to_string(),
ref_files: vec![],
query_files: vec![],
refs_are_sketch: false,
queries_are_sketch: false,
robust: false,
median: false,
sparse: false,
full_matrix: false,
max_results: 10000000, // for Triange usize::MAX,
individual_contig_q: false,
individual_contig_r: false,
min_aligned_frac: min_aligned_frac as f64,
keep_refs: false,
est_ci: false,
learned_ani: true,
learned_ani_cmd: false,
detailed_out: false,
// distance: false,
// rescue_small: true,
};

let m = 1000;
let c = 125;
let k = 15;
let sketch_params = SketchParams::new(m, c, k, false, false);
(cmd_params, sketch_params)
}
pub fn calculate_skani(fasta1: &str, fasta2: &str, min_aligned_threshold: f32) -> f32 {
// --sparse only outputs non-zero entries in an edge-list output
// Ref_file Query_file ANI Align_fraction_ref Align_fraction_query Ref_name Query_name
info!("Running skani to get distances ..");
let mut cmd = std::process::Command::new("skani");
cmd.arg("dist")
.arg("--min-af")
.arg(&format!("{}", min_aligned_threshold * 100.0))
.arg("-q")
.arg(fasta1)
.arg("-r")
.arg(fasta2)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
debug!("Running skani command: {:?}", &cmd);

// Parse the distances
let mut process = cmd
.spawn()
.unwrap_or_else(|_| panic!("Failed to spawn {}", "skani"));
let stdout = process.stdout.as_mut().unwrap();
let stdout_reader = BufReader::new(stdout);

let mut rdr = csv::ReaderBuilder::new()
.delimiter(b'\t')
.has_headers(true)
.from_reader(stdout_reader);

pub fn calculate_skani(fasta1: &str, fasta2: &str, min_aligned_frac: f32) -> f32 {
//Vector of Strings
let refs = vec![fasta1.to_string()];
let queries = vec![fasta2.to_string()];
let mut num_records = 0;
let mut to_return = 0.0;

let (command_params, sketch_params) = default_params(Mode::Dist, min_aligned_frac);
let ref_sketch = &file_io::fastx_to_sketches(&refs, &sketch_params, true)[0];
let query_sketch = &file_io::fastx_to_sketches(&queries, &sketch_params, true)[0];
let map_params = chain::map_params_from_sketch(ref_sketch, false, &command_params);
let ani_result = chain::chain_seeds(ref_sketch, query_sketch, map_params);
for record_res in rdr.records() {
match record_res {
Ok(record) => {
if num_records > 0 {
error!("Unexpectedly found >1 result from skani");
std::process::exit(1);
}

assert!(record.len() == 7);
to_return = record[2].parse().unwrap_or_else(|_| {
panic!("Failed to convert skani ANI to float value: {}", &record[2])
});
num_records += 1;
}
Err(e) => {
error!("Error parsing skani output: {}", e);
std::process::exit(1);
}
}
}

ani_result.ani * 100.0
debug!("skani of {} against {} was {:?}", fasta1, fasta2, to_return);
finish_command_safely(process, "skani");
to_return
}

#[cfg(test)]
Expand All @@ -206,6 +236,7 @@ mod tests {
],
80.0,
0.2,
1,
);
}

Expand All @@ -221,6 +252,7 @@ mod tests {
],
95.0,
0.2,
1,
);
}
}
Loading