Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK-6722 - Variant Walker to enable user defined variant analysis #2522

Open
wants to merge 65 commits into
base: release-3.x.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
4b8dad2
storage: Add variant-walker tool #TASK-6722
j-coll Oct 8, 2024
9ea00eb
storage: Add STDERR to exception thrown. Fix max_bytes_per_map. #TASK…
j-coll Oct 10, 2024
7558a26
storage: Add satus details when throwing exceptions. #TASK-6722
j-coll Oct 10, 2024
bc7c6ae
storage: Fix walker output file name #TASK-6722
j-coll Oct 11, 2024
ab4dff5
storage: Properly configure task java heap #TASK-6722
j-coll Oct 15, 2024
7af8020
storage: Run docker image prune on cleanup. #TASK-6722
j-coll Oct 16, 2024
c5375ea
storage: Ensure walker output is sorted. #TASK-6722
j-coll Oct 24, 2024
663c03a
storage: Extract walker STDERR file from MR execution. #TASK-6722
j-coll Oct 25, 2024
154befa
storage: Do not write multiple headers. #TASK-6722
j-coll Oct 25, 2024
85aac6d
storage: Fix NoSuchMethodError creating StopWatch. #TASK-6722
j-coll Oct 25, 2024
697b08b
storage: Ensure stderr file is moved from scratch dir. #TASK-6722
j-coll Oct 25, 2024
356567e
storage: Fix stderr sorting. #TASK-6722
j-coll Oct 25, 2024
6253da3
storage: Write `\n` after the json header #TASK-6722
j-coll Oct 25, 2024
5789628
storage: Do not interrupt header with empty records. #TASK-6722
j-coll Oct 29, 2024
4ff0655
storage: Add a custom Partitioner to ensure sorted data with multiple…
j-coll Oct 29, 2024
8268266
storage: Fix partitioner. #TASK-6722
j-coll Oct 29, 2024
4147d01
storage: Restart process when changing chromosome to ensure correct s…
j-coll Oct 29, 2024
7fd439a
storage: Fix GenomeHellper generateBootPreSplits. #TASK-6722
j-coll Oct 29, 2024
e6128b0
storage: Do not interrupt header with empty lines while concat. #TASK…
j-coll Oct 30, 2024
100fecf
storage: Replace ImmutableBytesWritable with VariantLocusKey as map o…
j-coll Oct 31, 2024
0df69dc
storage: Use VariantLocusKey and VariantLocusPartitioner in VariantEx…
j-coll Oct 31, 2024
f6fd3d4
storage: Fix VariantLocusKey serialization. #TASK-6722
j-coll Nov 1, 2024
fa3c9f2
storage: Fix "Request body si too large" #TASK-6722
j-coll Nov 4, 2024
b528c03
analysis: Do not try to close twice the same ERM. #TASK-6722
j-coll Nov 4, 2024
96e5679
storage: Do not use flush on outputstream. HADOOP-16548 #TASK-6722
j-coll Nov 7, 2024
bcd8185
storage: Add VariantExporterDirectMultipleOutputsMapper to ensure sor…
j-coll Nov 7, 2024
c4c3d3b
storage: Do not use reduce step on variant-walker. #TASK-6722
j-coll Nov 7, 2024
0100097
storage: Fix VariantRecordWriter bytes_written counter. #TASK-6722
j-coll Nov 7, 2024
b52ca27
storage: Reduce number of intermediate mapper files. #TASK-6722
j-coll Nov 8, 2024
ad3521e
storage: Use SNAPPY as intermediate compression algorithm. #TASK-6722
j-coll Nov 8, 2024
ab50d6e
storage: Disable flush on AbfsOutputStream. HADOOP-16548 #TASK-6722
j-coll Nov 11, 2024
212f8ce
storage: Centralize variantMapperJob initialitation. #TASK-6722
j-coll Nov 11, 2024
2a39303
storage: Fix NoClassDefFoundError tephra. #TASK-7194 #TASK-6722
j-coll Nov 12, 2024
ae26598
storage: Fix NPE exporting from sampleindex. #TASK-6722
j-coll Nov 12, 2024
b000ec7
storage: Ensure variant-exports are sorted even from Phoenix. #TASK-6722
j-coll Nov 18, 2024
0a741d5
storage: Use HDFS to store intermediate MapReduce files. Concat local…
j-coll Nov 25, 2024
cd50a3c
storage: Improve MapReduceOutputFile concatMrOutputToLocal. #TASK-6722
j-coll Nov 25, 2024
d430391
storage: Increase mapreduce.task.timeout to 30min #TASK-6722
j-coll Nov 25, 2024
e35ee83
storage: Fix temporary mapreduce outdir. #TASK-6722
j-coll Nov 25, 2024
0c48603
storage: Do not double copy hdfs files #TASK-6722
j-coll Nov 26, 2024
ccf7438
storage: Use reducer to concat binary files #TASK-6722
j-coll Nov 26, 2024
f87686e
storage: Do not fail vairant-walker if no output is produced. #TASK-6722
j-coll Nov 27, 2024
a389e10
storage: Split PhoenixInputSplits into smaller splits. #TASK-6722
j-coll Nov 27, 2024
f453090
storage: Improve log message. #TASK-6722
j-coll Nov 27, 2024
47535c1
storage: Add HadoopVariantWalkerTest. #TASK-6722
j-coll Nov 28, 2024
003e467
storage: Rename some variant-walker params. Add descriptions #TASK-6722
j-coll Nov 28, 2024
48e1592
storage: Fix NPE running SampleVariantStats #TASK-6722
j-coll Nov 28, 2024
1d86756
storage: Fix CustomPhoenixInputFormat generateSplit for first and las…
j-coll Nov 29, 2024
5141031
analysis: Fix NPE at relatedness tool. #TASK-6722
j-coll Nov 29, 2024
c48ce0a
Merge branch 'release-3.x.x' into TASK-6722
j-coll Nov 29, 2024
f2bc782
cicd: Upload tests logs as artifacts. Reduce action log size. #TASK-6722
j-coll Nov 29, 2024
dd684aa
storage: Fix NPE at CohortVariantStatsDriver. #TASK-6722
j-coll Nov 29, 2024
9795c6a
cicd: Fix NPE. #TASK-6722
j-coll Nov 29, 2024
923651c
storage: Fix AIOOBE SampleVariantStatsDriver #TASK-6722
j-coll Nov 29, 2024
90010ac
storage: Do not produce a .crc checksum file copying from hdfs. #TASK…
j-coll Nov 29, 2024
9f326d9
storage: Improve docker process failure. Do not close the stdin twice…
j-coll Nov 29, 2024
627e56a
storage: Fix AIOOBE SampleVariantStatsDriver #TASK-6722
j-coll Nov 29, 2024
98ce6f8
storage: Do not produce a .crc checksum file copying from hdfs. #TASK…
j-coll Nov 29, 2024
14c07d9
analysis: Do not use the scratchDir as intermediate folder for export…
j-coll Nov 29, 2024
050c1ee
storage: Improve collections usage in SampleVariantStatsDriver. #TASK…
j-coll Nov 29, 2024
a0c2a5f
analysis: Fix VariantAnalysisTest. #TASK-6722
j-coll Dec 2, 2024
3853c63
app: Regenerate cli. #TASK-6722
j-coll Dec 3, 2024
eb61609
storage: Fix junit tests. #TASK-6722
j-coll Dec 3, 2024
54acc28
cicd: Increase "Publish Test Report on GitHub" memory #TASK-6722
j-coll Dec 4, 2024
4e96492
core: Fix NumberFormatException from IOUtils. #TASK-6722
j-coll Dec 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions .github/workflows/test-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,28 @@ jobs:
with:
mongodb-version: 6.0
mongodb-replica-set: rs-test
- name: Maven build
- name: Maven build (skip tests)
run: mvn -B clean install -DskipTests -P ${{ inputs.hadoop }} -Dcheckstyle.skip ${{ inputs.mvn_opts }}
- name: Build Junit log file name
id: BuildJunitLogFileName
run: |
MODULE=$(basename ${{ (inputs.module == '' || inputs.module == 'all') && 'opencga' || inputs.module }} )
if [[ -z "$MODULE" ]]; then
MODULE="opencga"
fi
TAGS=$(echo ${{ inputs.test_profile }} | sed -e 's/run\([^,]*\)Tests/\1/g' | tr ',' '_' | tr '[:upper:]' '[:lower:]' )
echo "TESTS_LOG_FILE_NAME=junit_${{ inputs.hadoop }}_${TAGS}_${MODULE}.log" >> $GITHUB_OUTPUT
- name: Run Junit tests
run: mvn -B verify surefire-report:report --fail-never -Dsurefire.testFailureIgnore=true -f ${{ (inputs.module == '' || inputs.module == 'all') && '.' || inputs.module }} -P ${{ inputs.hadoop }},${{ inputs.test_profile }} -Dcheckstyle.skip ${{ inputs.mvn_opts }}
run: mvn -B verify surefire-report:report --fail-never -Dsurefire.testFailureIgnore=true -f ${{ (inputs.module == '' || inputs.module == 'all') && '.' || inputs.module }} -P ${{ inputs.hadoop }},${{ inputs.test_profile }} -Dcheckstyle.skip ${{ inputs.mvn_opts }} |& tee ${{ steps.BuildJunitLogFileName.outputs.TESTS_LOG_FILE_NAME }} |& grep -P '^\[[^\]]*(INFO|WARNING|ERROR)' --colour=never --line-buffered
- name: Upload Junit test logs
uses: actions/upload-artifact@v4
with:
name: ${{ steps.BuildJunitLogFileName.outputs.TESTS_LOG_FILE_NAME }}
path: ${{ steps.BuildJunitLogFileName.outputs.TESTS_LOG_FILE_NAME }}
- name: Publish Test Report on GitHub
uses: scacap/action-surefire-report@v1
env:
NODE_OPTIONS: '--max_old_space_size=4096'
NODE_OPTIONS: '--max_old_space_size=6144'
## Skip cancelled()
## https://docs.github.com/en/actions/learn-github-actions/expressions#cancelled
if: success() || failure()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opencb.opencga.analysis.variant.relatedness.RelatednessAnalysis;
import org.opencb.opencga.analysis.wrappers.plink.PlinkWrapperAnalysisExecutor;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.core.config.Analysis;
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.models.family.Family;
import org.opencb.opencga.core.models.individual.Individual;
Expand Down Expand Up @@ -117,7 +118,7 @@ public static RelatednessReport compute(String study, Family family, List<String
} catch (IOException e) {
throw new ToolException("Something wrong happened when copying files during the relatedness analysis execution");
}
File outFile = runIBD(FILTERED_BASENAME, freqPath, outDir);
File outFile = runIBD(FILTERED_BASENAME, freqPath, outDir, storageManager.getCatalogManager().getConfiguration().getAnalysis());

if (!outFile.exists()) {
throw new ToolException("Something wrong happened executing relatedness analysis");
Expand Down Expand Up @@ -298,7 +299,7 @@ public static void filterFamilyTpedFile(Path tPedPath, Path tPedFilteredPath, Pa
bw.close();
}

private static File runIBD(String basename, Path freqPath, Path outDir) throws ToolException {
private static File runIBD(String basename, Path freqPath, Path outDir, Analysis analysisConf) throws ToolException {
// Input bindings
List<AbstractMap.SimpleEntry<String, String>> inputBindings = new ArrayList<>();
inputBindings.add(new AbstractMap.SimpleEntry<>(freqPath.getParent().toString(), "/input"));
Expand All @@ -311,8 +312,8 @@ private static File runIBD(String basename, Path freqPath, Path outDir) throws T
String plinkParams = "plink1.9 --tfile /output/" + basename + " --genome rel-check --read-freq /input/" + FREQ_FILENAME
+ " --out /output/" + basename;
try {
PlinkWrapperAnalysisExecutor plinkExecutor = new PlinkWrapperAnalysisExecutor();
DockerUtils.run(plinkExecutor.getDockerImageName(), inputBindings, outputBinding, plinkParams, null);
String dockerImageName = PlinkWrapperAnalysisExecutor.getDockerImageName(analysisConf);
DockerUtils.run(dockerImageName, inputBindings, outputBinding, plinkParams, null);
} catch (IOException e) {
throw new ToolException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,17 +192,11 @@ public final ExecutionResult start() throws ToolException {
if (!erm.isClosed()) {
String message = "Unexpected system shutdown. Job killed by the system.";
privateLogger.error(message);
if (exception == null) {
exception = new RuntimeException(message);
}
try {
if (scratchDir != null) {
deleteScratchDirectory();
}
if (exception == null) {
exception = new RuntimeException(message);
}
logException(exception);
ExecutionResult result = erm.close(exception);
privateLogger.info("------- Tool '" + getId() + "' executed in "
+ TimeUtils.durationToString(result.getEnd().getTime() - result.getStart().getTime()) + " -------");
close(exception);
} catch (ToolException e) {
privateLogger.error("Error closing ExecutionResult", e);
}
Expand Down Expand Up @@ -271,13 +265,25 @@ public final ExecutionResult start() throws ToolException {
}
throw e;
} finally {
// If the shutdown hook has been executed, the ExecutionResultManager is already closed
if (!erm.isClosed()) {
result = close(exception);
} else {
result = erm.read();
}
}
return result;
}

private ExecutionResult close(Throwable exception) throws ToolException {
if (scratchDir != null) {
deleteScratchDirectory();
stopMemoryMonitor();
result = erm.close(exception);
logException(exception);
privateLogger.info("------- Tool '" + getId() + "' executed in "
+ TimeUtils.durationToString(result.getEnd().getTime() - result.getStart().getTime()) + " -------");
}
logException(exception);
stopMemoryMonitor();
ExecutionResult result = erm.close(exception);
privateLogger.info("------- Tool '" + getId() + "' executed in "
+ TimeUtils.durationToString(result.getEnd().getTime() - result.getStart().getTime()) + " -------");
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.opencga.analysis.tools.OpenCgaTool;
import org.opencb.opencga.catalog.io.IOManager;
import org.opencb.opencga.core.common.UriUtils;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.variant.VariantExportParams;
import org.opencb.opencga.core.tools.annotations.Tool;
import org.opencb.opencga.core.tools.annotations.ToolParams;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;

import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -70,9 +66,8 @@ protected List<String> getSteps() {

@Override
protected void run() throws Exception {
List<URI> uris = new ArrayList<>(2);
step(ID, () -> {
Path outDir = getScratchDir();
Path outDir = getOutDir();
String outputFile = StringUtils.isEmpty(toolParams.getOutputFileName())
? outDir.toString()
: outDir.resolve(toolParams.getOutputFileName()).toString();
Expand All @@ -81,17 +76,9 @@ protected void run() throws Exception {
for (VariantQueryParam param : VariantQueryParam.values()) {
queryOptions.remove(param.key());
}
uris.addAll(variantStorageManager.exportData(outputFile,
variantStorageManager.exportData(outputFile,
outputFormat,
toolParams.getVariantsFile(), query, queryOptions, token));
});
step("move-files", () -> {
IOManager ioManager = catalogManager.getIoManagerFactory().get(uris.get(0));
for (URI uri : uris) {
String fileName = UriUtils.fileName(uri);
logger.info("Moving file -- " + fileName);
ioManager.move(uri, getOutDir().resolve(fileName).toUri());
}
toolParams.getVariantsFile(), query, queryOptions, token);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2015-2020 OpenCB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.opencb.opencga.analysis.variant;

import org.apache.solr.common.StringUtils;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.opencga.analysis.tools.OpenCgaTool;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.variant.VariantWalkerParams;
import org.opencb.opencga.core.tools.annotations.Tool;
import org.opencb.opencga.core.tools.annotations.ToolParams;
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

@Tool(id = VariantWalkerTool.ID, description = VariantWalkerTool.DESCRIPTION,
scope = Tool.Scope.PROJECT, resource = Enums.Resource.VARIANT)
public class VariantWalkerTool extends OpenCgaTool {
public static final String ID = "variant-walk";
public static final String DESCRIPTION = "Filter and walk variants from the variant storage to produce a file";

@ToolParams
protected VariantWalkerParams toolParams = new VariantWalkerParams();

private VariantWriterFactory.VariantOutputFormat format;

@Override
protected void check() throws Exception {
super.check();

if (StringUtils.isEmpty(toolParams.getInputFormat())) {
toolParams.setInputFormat(VariantWriterFactory.VariantOutputFormat.VCF.toString());
}

format = VariantWriterFactory.toOutputFormat(toolParams.getInputFormat(), toolParams.getOutputFileName());
if (format.isBinary()) {
throw new IllegalArgumentException("Binary format not supported for VariantWalkerTool");
}
if (!format.isPlain()) {
format = format.inPlain();
}

if (StringUtils.isEmpty(toolParams.getOutputFileName())) {
toolParams.setOutputFileName("output.txt.gz");
} else if (!toolParams.getOutputFileName().endsWith(".gz")) {
toolParams.setOutputFileName(toolParams.getOutputFileName() + ".gz");
}
}

@Override
protected List<String> getSteps() {
return Arrays.asList(ID, "move-files");
}

@Override
protected void run() throws Exception {
step(ID, () -> {
Path outDir = getOutDir();
String outputFile = outDir.resolve(toolParams.getOutputFileName()).toString();
Query query = toolParams.toQuery();
QueryOptions queryOptions = new QueryOptions().append(QueryOptions.INCLUDE, toolParams.getInclude())
.append(QueryOptions.EXCLUDE, toolParams.getExclude());
variantStorageManager.walkData(outputFile,
format, query, queryOptions, toolParams.getDockerImage(), toolParams.getCommandLine(), token);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opencb.commons.datastore.solr.SolrManager;
import org.opencb.opencga.analysis.StorageManager;
import org.opencb.opencga.analysis.variant.VariantExportTool;
import org.opencb.opencga.analysis.variant.VariantWalkerTool;
import org.opencb.opencga.analysis.variant.manager.operations.*;
import org.opencb.opencga.analysis.variant.metadata.CatalogStorageMetadataSynchronizer;
import org.opencb.opencga.analysis.variant.metadata.CatalogVariantMetadataFactory;
Expand Down Expand Up @@ -97,6 +98,7 @@

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -187,6 +189,37 @@ public List<URI> exportData(String outputFile, VariantOutputFormat outputFormat,
});
}

/**
* Exports the result of the given query and the associated metadata.
*
* @param outputFile Optional output file. If null or empty, will print into the Standard output. Won't export any metadata.
* @param format Variant Output format.
* @param query Query with the variants to export
* @param queryOptions Query options
* @param dockerImage Docker image to use
* @param commandLine Command line to use
* @param token User's session id
* @throws CatalogException if there is any error with Catalog
* @throws StorageEngineException If there is any error exporting variants
* @return generated files
*/
public List<URI> walkData(String outputFile, VariantOutputFormat format,
Query query, QueryOptions queryOptions, String dockerImage, String commandLine, String token)
throws CatalogException, StorageEngineException {
String anyStudy = catalogUtils.getAnyStudy(query, token);
return secureAnalysis(VariantWalkerTool.ID, anyStudy, queryOptions, token, engine -> {
Query finalQuery = catalogUtils.parseQuery(query, queryOptions, engine.getCellBaseUtils(), token);
checkSamplesPermissions(finalQuery, queryOptions, token);
URI outputUri;
try {
outputUri = UriUtils.createUri(outputFile);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
return engine.walkData(outputUri, format, finalQuery, queryOptions, dockerImage, commandLine);
});
}

// --------------------------//
// Data Operation methods //
// --------------------------//
Expand Down Expand Up @@ -435,8 +468,9 @@ private CatalogStorageMetadataSynchronizer getSynchronizer(VariantStorageEngine
return synchronizer;
}

public DataResult<Trio> familyIndexBySamples(String study, Collection<String> samples, ObjectMap params, String token)
public DataResult<Trio> familyIndexBySamples(String inputStudy, Collection<String> samples, ObjectMap params, String token)
throws CatalogException, StorageEngineException {
String study = getStudyFqn(inputStudy, token);
return secureOperation(VariantFamilyIndexOperationTool.ID, study, params, token, engine -> {
Collection<String> thisSamples = samples;
boolean allSamples;
Expand Down Expand Up @@ -506,6 +540,8 @@ public boolean hasVariantSetup(String studyStr, String token) throws CatalogExce

public ObjectMap configureProject(String projectStr, ObjectMap params, String token) throws CatalogException, StorageEngineException {
return secureOperationByProject("configure", projectStr, params, token, engine -> {
validateNewConfiguration(engine, params, token);

DataStore dataStore = getDataStoreByProjectId(projectStr, token);

dataStore.getOptions().putAll(params);
Expand All @@ -517,6 +553,7 @@ public ObjectMap configureProject(String projectStr, ObjectMap params, String to

public ObjectMap configureStudy(String studyStr, ObjectMap params, String token) throws CatalogException, StorageEngineException {
return secureOperation("configure", studyStr, params, token, engine -> {
validateNewConfiguration(engine, params, token);
Study study = catalogManager.getStudyManager()
.get(studyStr,
new QueryOptions(INCLUDE, StudyDBAdaptor.QueryParams.INTERNAL_CONFIGURATION_VARIANT_ENGINE_OPTIONS.key()),
Expand All @@ -540,6 +577,15 @@ public ObjectMap configureStudy(String studyStr, ObjectMap params, String token)
});
}

private void validateNewConfiguration(VariantStorageEngine engine, ObjectMap params, String token)
throws StorageEngineException, CatalogException {
if (catalogManager.getAuthorizationManager().isOpencgaAdministrator(catalogManager.getUserManager().validateToken(token))) {
logger.info("Skip configuration validation. User is an admin.");
return;
}
engine.validateNewConfiguration(params);
}

/**
* Modify SampleIndex configuration. Automatically submit a job to rebuild the sample index.
*
Expand Down
Loading