From c19f6a9ad87c9dd384fb9bb244575181f9d0b263 Mon Sep 17 00:00:00 2001 From: pfurio Date: Tue, 16 Jul 2024 10:05:26 +0200 Subject: [PATCH 001/106] core: first nextflow implementation, #TASK-6445 --- .../analysis/tools/NextFlowExecutor.java | 300 ++++++++++++++++++ .../opencga/analysis/tools/OpenCgaTool.java | 5 + .../src/main/resources/nextflow.config | 6 + .../analysis/tools/NextFlowExecutorTest.java | 32 ++ .../catalog/db/api/NextFlowDBAdaptor.java | 68 ++++ .../db/mongodb/NextFlowMongoDBAdaptor.java | 259 +++++++++++++++ .../mongodb/converters/NextFlowConverter.java | 10 + .../NextFlowCatalogMongoDBIterator.java | 47 +++ opencga-client/src/main/R/R/Admin-methods.R | 2 +- opencga-client/src/main/R/R/Study-methods.R | 2 +- opencga-client/src/main/javascript/Admin.js | 2 +- opencga-client/src/main/javascript/Study.js | 2 +- .../pyopencga/rest_clients/admin_client.py | 3 +- .../pyopencga/rest_clients/study_client.py | 2 +- .../opencga/core/models/common/Enums.java | 1 + .../core/models/nextflow/NextFlow.java | 54 ++++ .../models/nextflow/NextFlowRunParams.java | 46 +++ .../tools/result/ExecutionResultManager.java | 4 + 18 files changed, 839 insertions(+), 6 deletions(-) create mode 100644 opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/NextFlowExecutor.java create mode 100644 opencga-analysis/src/main/resources/nextflow.config create mode 100644 opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/NextFlowExecutorTest.java create mode 100644 opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/NextFlowDBAdaptor.java create mode 100644 opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/NextFlowMongoDBAdaptor.java create mode 100644 opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/NextFlowConverter.java create mode 100644 opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/NextFlowCatalogMongoDBIterator.java create mode 100644 opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlow.java create mode 100644 opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlowRunParams.java diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/NextFlowExecutor.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/NextFlowExecutor.java new file mode 100644 index 00000000000..c3398c82c3a --- /dev/null +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/NextFlowExecutor.java @@ -0,0 +1,300 @@ +package org.opencb.opencga.analysis.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.time.StopWatch; +import org.opencb.commons.datastore.core.ObjectMap; +import org.opencb.commons.utils.DockerUtils; +import org.opencb.opencga.core.common.JacksonUtils; +import org.opencb.opencga.core.common.TimeUtils; +import org.opencb.opencga.core.exceptions.ToolException; +import org.opencb.opencga.core.models.common.Enums; +import org.opencb.opencga.core.models.nextflow.NextFlowRunParams; +import org.opencb.opencga.core.tools.annotations.Tool; +import org.opencb.opencga.core.tools.annotations.ToolParams; +import org.opencb.opencga.core.tools.result.Status; +import org.opencb.opencga.core.tools.result.ToolStep; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UncheckedIOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.stream.Stream; + +import static org.opencb.opencga.analysis.wrappers.executors.DockerWrapperAnalysisExecutor.DOCKER_INPUT_PATH; +import static org.opencb.opencga.analysis.wrappers.executors.DockerWrapperAnalysisExecutor.DOCKER_OUTPUT_PATH; + +@Tool(id = NextFlowExecutor.ID, resource = Enums.Resource.NEXTFLOW, description = NextFlowExecutor.DESCRIPTION) +public class NextFlowExecutor extends OpenCgaTool { + + public final static String ID = "nextflow-run"; + public static final String DESCRIPTION = "Execute a Nextflow analysis."; + + public final static String DOCKER_IMAGE = "nextflow/nextflow"; + + @ToolParams + protected NextFlowRunParams toolParams = new NextFlowRunParams(); + + private String script; + + private final static Logger logger = LoggerFactory.getLogger(NextFlowExecutor.class); + + @Override + protected void check() throws Exception { + super.check(); + + if (toolParams.getId() == null) { + throw new IllegalArgumentException("Missing Nextflow ID"); + } + + // TODO: Change and look for Nextflow script + this.script = "params.str = 'Hello world!'\n" + + "\n" + + "process splitLetters {\n" + + " output:\n" + + " path 'chunk_*'\n" + + "\n" + + " \"\"\"\n" + + " printf '${params.str}' | split -b 6 - chunk_\n" + + " \"\"\"\n" + + "}\n" + + "\n" + + "process convertToUpper {\n" + + " input:\n" + + " path x\n" + + "\n" + + " output:\n" + + " stdout\n" + + "\n" + + " \"\"\"\n" + + " cat $x | tr '[a-z]' '[A-Z]'\n" + + " \"\"\"\n" + + "}\n" + + "\n" + + "process sleep {\n" + + " input:\n" + + " val x\n" + + "\n" + + " \"\"\"\n" + + " sleep 1\n" + + " \"\"\"\n" + + "}\n" + + "\n" + + "workflow {\n" + + " splitLetters | flatten | convertToUpper | view { it.trim() } | sleep\n" + + "}"; + } + + @Override + protected void run() throws Exception { + // Write main script file + Files.write(getOutDir().resolve("pipeline.nf"), script.getBytes()); + + // Write nextflow.config file + URL nextflowConfig = getClass().getResource("/nextflow.config"); + if (nextflowConfig != null) { + Files.copy(nextflowConfig.openStream(), getOutDir().resolve("nextflow.config")); + } else { + throw new RuntimeException("Can't fetch nextflow.config file"); + } + + // Execute docker image + String workingDirectory = getOutDir().toAbsolutePath().toString(); + List> inputBindings = new ArrayList<>(); + inputBindings.add(new AbstractMap.SimpleEntry<>(workingDirectory, DOCKER_INPUT_PATH)); + AbstractMap.SimpleEntry outputBinding = new AbstractMap.SimpleEntry<>(workingDirectory, DOCKER_OUTPUT_PATH); + + // TODO: Copy nextflow.config and pipeline.nf to DOCKER_INPUT_PATH + + StringBuilder stringBuilder = new StringBuilder() + .append("nextflow run ").append(DOCKER_OUTPUT_PATH).append("/pipeline.nf") + .append(" --work-dir ").append(DOCKER_OUTPUT_PATH) + .append(" -with-report ").append(DOCKER_OUTPUT_PATH).append("/report.html"); + + StopWatch stopWatch = StopWatch.createStarted(); + Map dockerParams = new HashMap<>(); + dockerParams.put("user", "root"); + String cmdline = DockerUtils.run(DOCKER_IMAGE, inputBindings, outputBinding, stringBuilder.toString(), dockerParams); + logger.info("Docker command line: " + cmdline); + logger.info("Execution time: " + TimeUtils.durationToString(stopWatch)); + + // Delete input files + Files.delete(getOutDir().resolve("pipeline.nf")); + Files.delete(getOutDir().resolve("nextflow.config")); + + processTraceFile(); + } + + private void processTraceFile() { + List steps = new LinkedList<>(); + // Read tabular file + Path traceFile = getOutDir().resolve("trace.txt"); + if (Files.exists(traceFile)) { + try (Stream lines = Files.lines(traceFile)) { + // Read line one by one + lines.forEach(line -> { + if (line.startsWith("task_id")) { + return; + } + Trace trace = new Trace(line); + ToolStep toolStep = trace.toToolStep(); + steps.add(toolStep); + System.out.println(trace); + }); + } catch (Exception e) { + logger.error("Error reading trace file: " + traceFile, e); + } + } + + try { + setManualSteps(steps); + } catch (ToolException e) { + throw new RuntimeException(e); + } + } + + private static class Trace { + private String taskId; + private String hash; + private String name; + private String status; + private String start; + private String complete; + private String cpu; + private String peak_vmem; + + public Trace() { + } + + public Trace(String traceLine) { + String[] split = traceLine.split("\t"); + this.taskId = split[0]; + this.hash = split[1]; + this.name = split[2]; + this.status = split[3]; + this.start = split[4]; + this.complete = split[5]; + this.cpu = split[6]; + this.peak_vmem = split[7]; + } + + public Trace(String taskId, String hash, String name, String status, String start, String complete, String cpu, String peak_vmem) { + this.taskId = taskId; + this.hash = hash; + this.name = name; + this.status = status; + this.start = start; + this.complete = complete; + this.cpu = cpu; + this.peak_vmem = peak_vmem; + } + + public ToolStep toToolStep() { + Date startDate = toDate(start); + Date completeDate = toDate(complete); + return new ToolStep(taskId, startDate, completeDate, + status.equals("COMPLETED") ? Status.Type.DONE : Status.Type.ERROR, toObjectMap()); + } + + public String getTaskId() { + return taskId; + } + + public Trace setTaskId(String taskId) { + this.taskId = taskId; + return this; + } + + public String getHash() { + return hash; + } + + public Trace setHash(String hash) { + this.hash = hash; + return this; + } + + public String getName() { + return name; + } + + public Trace setName(String name) { + this.name = name; + return this; + } + + public String getStatus() { + return status; + } + + public Trace setStatus(String status) { + this.status = status; + return this; + } + + public String getStart() { + return start; + } + + public Trace setStart(String start) { + this.start = start; + return this; + } + + public String getComplete() { + return complete; + } + + public Trace setComplete(String complete) { + this.complete = complete; + return this; + } + + public String getCpu() { + return cpu; + } + + public Trace setCpu(String cpu) { + this.cpu = cpu; + return this; + } + + public String getPeak_vmem() { + return peak_vmem; + } + + public Trace setPeak_vmem(String peak_vmem) { + this.peak_vmem = peak_vmem; + return this; + } + + public ObjectMap toObjectMap() { + ObjectMapper objectMapper = JacksonUtils.getDefaultObjectMapper(); + try { + return new ObjectMap(objectMapper.writeValueAsString(this)); + } catch (JsonProcessingException e) { + throw new UncheckedIOException(e); + } + } + + } + + static Date toDate(String dateStr) { + String format = "yyyy-MM-dd HH:mm:ss.SSS"; + SimpleDateFormat sdf = new SimpleDateFormat(format); + + Date date = null; + try { + date = sdf.parse(dateStr); + } catch (ParseException e) { + logger.warn(e.getMessage()); + } + return date; + } + +} diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/OpenCgaTool.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/OpenCgaTool.java index 5a88635e45c..75720aee064 100644 --- a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/OpenCgaTool.java +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/OpenCgaTool.java @@ -43,6 +43,7 @@ import org.opencb.opencga.core.tools.result.ExecutionResult; import org.opencb.opencga.core.tools.result.ExecutionResultManager; import org.opencb.opencga.core.tools.result.ExecutorInfo; +import org.opencb.opencga.core.tools.result.ToolStep; import org.opencb.opencga.storage.core.StorageEngineFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -482,6 +483,10 @@ protected final void step(String stepId, StepRunnable step) throws ToolException } } + protected final void setManualSteps(List steps) throws ToolException { + erm.setManualSteps(steps); + } + protected final boolean checkStep(String stepId) throws ToolException { return erm.checkStep(stepId); } diff --git a/opencga-analysis/src/main/resources/nextflow.config b/opencga-analysis/src/main/resources/nextflow.config new file mode 100644 index 00000000000..fd91b7d3811 --- /dev/null +++ b/opencga-analysis/src/main/resources/nextflow.config @@ -0,0 +1,6 @@ +trace { + enabled = true + file = '/data/output/trace.txt' + overwrite = true + fields = 'task_id,hash,name,status,start,complete,%cpu,peak_vmem' +} \ No newline at end of file diff --git a/opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/NextFlowExecutorTest.java b/opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/NextFlowExecutorTest.java new file mode 100644 index 00000000000..bcc66443b93 --- /dev/null +++ b/opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/NextFlowExecutorTest.java @@ -0,0 +1,32 @@ +package org.opencb.opencga.analysis.tools; + +import org.apache.commons.lang3.time.StopWatch; +import org.junit.ClassRule; +import org.junit.Test; +import org.opencb.opencga.analysis.variant.OpenCGATestExternalResource; +import org.opencb.opencga.core.exceptions.ToolException; +import org.opencb.opencga.core.models.nextflow.NextFlowRunParams; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +public class NextFlowExecutorTest { + + @ClassRule + public static OpenCGATestExternalResource opencga = new OpenCGATestExternalResource(false); + + @Test + public void myTest() throws ToolException, IOException { + Path outDir = Paths.get(opencga.createTmpOutdir("_nextflow")); + + StopWatch stopWatch = StopWatch.createStarted(); + NextFlowExecutor nextFlowExecutorTest = new NextFlowExecutor(); + NextFlowRunParams runParams = new NextFlowRunParams("myId", 2); + nextFlowExecutorTest.setUp(opencga.getOpencgaHome().toString(), runParams.toObjectMap(), outDir, opencga.getAdminToken()); + nextFlowExecutorTest.start(); + System.out.println(stopWatch.getTime(TimeUnit.MILLISECONDS)); + } + +} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/NextFlowDBAdaptor.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/NextFlowDBAdaptor.java new file mode 100644 index 00000000000..d3ee6a73bfc --- /dev/null +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/NextFlowDBAdaptor.java @@ -0,0 +1,68 @@ +package org.opencb.opencga.catalog.db.api; + +import org.apache.commons.collections4.map.LinkedMap; +import org.opencb.commons.datastore.core.QueryParam; +import org.opencb.opencga.core.models.nextflow.NextFlow; + +import java.util.Map; + +import static org.opencb.commons.datastore.core.QueryParam.Type.*; + +public interface NextFlowDBAdaptor extends CoreDBAdaptor { + + enum QueryParams implements QueryParam { + ID("id", TEXT, ""), + UID("uid", LONG, ""), + UUID("uuid", TEXT, ""), + RELEASE("release", INTEGER, ""), // Release where the sample was created + SNAPSHOT("snapshot", INTEGER, ""), // Last version of sample at release = snapshot + VERSION("version", INTEGER, ""), // Version of the sample + CREATION_DATE("creationDate", DATE, ""), + MODIFICATION_DATE("modificationDate", DATE, ""), + STUDY_UID("studyUid", INTEGER_ARRAY, ""), + STUDY("study", INTEGER_ARRAY, ""); // Alias to studyId in the database. Only for the webservices. + + private static Map map; + + static { + map = new LinkedMap(); + for (QueryParams params : QueryParams.values()) { + map.put(params.key(), params); + } + } + + private final String key; + private Type type; + private String description; + + QueryParams(String key, Type type, String description) { + this.key = key; + this.type = type; + this.description = description; + } + + public static Map getMap() { + return map; + } + + public static QueryParams getParam(String key) { + return map.get(key); + } + + @Override + public String key() { + return key; + } + + @Override + public Type type() { + return type; + } + + @Override + public String description() { + return description; + } + } + +} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/NextFlowMongoDBAdaptor.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/NextFlowMongoDBAdaptor.java new file mode 100644 index 00000000000..401f0f28872 --- /dev/null +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/NextFlowMongoDBAdaptor.java @@ -0,0 +1,259 @@ +package org.opencb.opencga.catalog.db.mongodb; + +import com.mongodb.client.ClientSession; +import org.apache.commons.lang3.time.StopWatch; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.opencb.commons.datastore.core.ObjectMap; +import org.opencb.commons.datastore.core.Query; +import org.opencb.commons.datastore.core.QueryOptions; +import org.opencb.commons.datastore.mongodb.MongoDBCollection; +import org.opencb.commons.datastore.mongodb.MongoDBIterator; +import org.opencb.opencga.catalog.db.api.DBIterator; +import org.opencb.opencga.catalog.db.api.NextFlowDBAdaptor; +import org.opencb.opencga.catalog.db.mongodb.converters.NextFlowConverter; +import org.opencb.opencga.catalog.db.mongodb.iterators.NextFlowCatalogMongoDBIterator; +import org.opencb.opencga.catalog.exceptions.CatalogAuthorizationException; +import org.opencb.opencga.catalog.exceptions.CatalogDBException; +import org.opencb.opencga.catalog.exceptions.CatalogParameterException; +import org.opencb.opencga.core.config.Configuration; +import org.opencb.opencga.core.models.nextflow.NextFlow; +import org.opencb.opencga.core.response.OpenCGAResult; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.opencb.opencga.catalog.db.mongodb.MongoDBUtils.fixAclProjection; + +public class NextFlowMongoDBAdaptor extends CatalogMongoDBAdaptor implements NextFlowDBAdaptor { + + private final MongoDBCollection nextflowCollection; + private final MongoDBCollection archiveNextflowCollection; + private final MongoDBCollection deleteNextflowCollection; + private final SnapshotVersionedMongoDBAdaptor versionedMongoDBAdaptor; + private final NextFlowConverter nextflowConverter; + + public NextFlowMongoDBAdaptor(MongoDBCollection nextflowCollection, MongoDBCollection archiveNextflowCollection, + MongoDBCollection deleteNextflowCollection, Configuration configuration, + OrganizationMongoDBAdaptorFactory dbAdaptorFactory) { + super(configuration, LoggerFactory.getLogger(NextFlowMongoDBAdaptor.class)); + this.dbAdaptorFactory = dbAdaptorFactory; + this.nextflowCollection = nextflowCollection; + this.archiveNextflowCollection = archiveNextflowCollection; + this.deleteNextflowCollection = deleteNextflowCollection; + this.versionedMongoDBAdaptor = new SnapshotVersionedMongoDBAdaptor(nextflowCollection, archiveNextflowCollection, + deleteNextflowCollection); + this.nextflowConverter = new NextFlowConverter(); + } + + @Override + public OpenCGAResult get(long studyUid, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + long startTime = startQuery(); + try (DBIterator dbIterator = iterator(studyUid, query, options, user)) { + return endQuery(startTime, dbIterator); + } + } + + @Override + public OpenCGAResult get(Query query, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + long startTime = startQuery(); + try (DBIterator dbIterator = iterator(query, options)) { + return endQuery(startTime, dbIterator); + } + } + + @Override + public OpenCGAResult nativeGet(long studyUid, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + long startTime = startQuery(); + try (DBIterator dbIterator = nativeIterator(studyUid, query, options, user)) { + return endQuery(startTime, dbIterator); + } + } + + @Override + OpenCGAResult nativeGet(Query query, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + long startTime = startQuery(); + try (DBIterator dbIterator = nativeIterator(query, options)) { + return endQuery(startTime, dbIterator); + } + } + + @Override + public DBIterator iterator(long studyUid, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + query.put(PRIVATE_STUDY_UID, studyUid); + MongoDBIterator mongoCursor = getMongoCursor(null, query, options, user); + return new NextFlowCatalogMongoDBIterator<>(mongoCursor, null, nextflowConverter, null, studyUid, user, options); + } + + @Override + public DBIterator iterator(Query query, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + MongoDBIterator mongoCursor = getMongoCursor(null, query, options, null); + return new NextFlowCatalogMongoDBIterator<>(mongoCursor, null, nextflowConverter, null, options); + } + + @Override + public DBIterator nativeIterator(Query query, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions(); + queryOptions.put(NATIVE_QUERY, true); + MongoDBIterator mongoCursor = getMongoCursor(null, query, queryOptions, null); + return new NextFlowCatalogMongoDBIterator(mongoCursor, null, null, null, options); + } + + @Override + public DBIterator nativeIterator(long studyUid, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions(); + queryOptions.put(NATIVE_QUERY, true); + + query.put(PRIVATE_STUDY_UID, studyUid); + MongoDBIterator mongoCursor = getMongoCursor(null, query, queryOptions, user); + return new NextFlowCatalogMongoDBIterator<>(mongoCursor, null, null, null, studyUid, user, options); + } + + @Override + public OpenCGAResult count(Query query, String user) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Bson bson = parseQuery(query, user); + return new OpenCGAResult<>(nextflowCollection.count(null, bson)); + } + + @Override + public OpenCGAResult count(Query query) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Bson bson = parseQuery(query); + return new OpenCGAResult<>(nextflowCollection.count(null, bson)); + } + + @Override + public OpenCGAResult groupBy(Query query, List fields, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + return null; + } + + @Override + public OpenCGAResult groupBy(Query query, String field, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Bson bsonQuery = parseQuery(query); + return groupBy(nextflowCollection, bsonQuery, field, NextFlowDBAdaptor.QueryParams.ID.key(), options); + } + + @Override + public OpenCGAResult groupBy(Query query, List fields, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Bson bsonQuery = parseQuery(query); + return groupBy(nextflowCollection, bsonQuery, fields, NextFlowDBAdaptor.QueryParams.ID.key(), options); + } + + @Override + public OpenCGAResult distinct(long studyUid, String field, Query query, String userId) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Query finalQuery = query != null ? new Query(query) : new Query(); + finalQuery.put(NextFlowDBAdaptor.QueryParams.STUDY_UID.key(), studyUid); + Bson bson = parseQuery(finalQuery, userId); + return new OpenCGAResult<>(nextflowCollection.distinct(field, bson)); + } + + @Override + public OpenCGAResult distinct(long studyUid, List fields, Query query, String userId) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + StopWatch stopWatch = StopWatch.createStarted(); + Query finalQuery = query != null ? new Query(query) : new Query(); + finalQuery.put(NextFlowDBAdaptor.QueryParams.STUDY_UID.key(), studyUid); + Bson bson = parseQuery(finalQuery, userId); + + Set results = new LinkedHashSet<>(); + for (String field : fields) { + results.addAll(nextflowCollection.distinct(field, bson, String.class).getResults()); + } + + return new OpenCGAResult<>((int) stopWatch.getTime(TimeUnit.MILLISECONDS), Collections.emptyList(), results.size(), + new ArrayList<>(results), -1); + } + + @Override + public OpenCGAResult stats(Query query) { + return null; + } + + @Override + public OpenCGAResult update(long id, ObjectMap parameters, QueryOptions queryOptions) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public OpenCGAResult update(Query query, ObjectMap parameters, QueryOptions queryOptions) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public OpenCGAResult delete(NextFlow id) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public OpenCGAResult delete(Query query) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public OpenCGAResult restore(long id, QueryOptions queryOptions) throws CatalogDBException { + return null; + } + + @Override + public OpenCGAResult restore(Query query, QueryOptions queryOptions) throws CatalogDBException { + return null; + } + + @Override + public OpenCGAResult rank(Query query, String field, int numResults, boolean asc) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public void forEach(Query query, Consumer action, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + + } + + + private MongoDBIterator getMongoCursor(ClientSession clientSession, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Query finalQuery = new Query(query); + + QueryOptions qOptions; + if (options != null) { + qOptions = new QueryOptions(options); + } else { + qOptions = new QueryOptions(); + } + fixAclProjection(qOptions); + + Bson bson = parseQuery(finalQuery, user); + MongoDBCollection collection = getQueryCollection(finalQuery, nextflowCollection, archiveNextflowCollection, + deleteNextflowCollection); + logger.debug("Nextflow query: {}", bson.toBsonDocument()); + return collection.iterator(clientSession, bson, null, null, qOptions); + } + + + private Bson parseQuery(Query query) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return parseQuery(query, null); + } + + private Bson parseQuery(Query query, String user) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } +} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/NextFlowConverter.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/NextFlowConverter.java new file mode 100644 index 00000000000..6abbee7e1fd --- /dev/null +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/NextFlowConverter.java @@ -0,0 +1,10 @@ +package org.opencb.opencga.catalog.db.mongodb.converters; + +import org.opencb.opencga.core.models.nextflow.NextFlow; + +public class NextFlowConverter extends OpenCgaMongoConverter { + + public NextFlowConverter() { + super(NextFlow.class); + } +} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/NextFlowCatalogMongoDBIterator.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/NextFlowCatalogMongoDBIterator.java new file mode 100644 index 00000000000..05e279f4965 --- /dev/null +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/NextFlowCatalogMongoDBIterator.java @@ -0,0 +1,47 @@ +package org.opencb.opencga.catalog.db.mongodb.iterators; + +import com.mongodb.client.ClientSession; +import org.bson.Document; +import org.opencb.commons.datastore.core.QueryOptions; +import org.opencb.commons.datastore.mongodb.GenericDocumentComplexConverter; +import org.opencb.commons.datastore.mongodb.MongoDBIterator; +import org.opencb.opencga.catalog.db.mongodb.OrganizationMongoDBAdaptorFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.Queue; + +public class NextFlowCatalogMongoDBIterator extends CatalogMongoDBIterator { + + private long studyUid; + private String user; + + private QueryOptions options; + private Queue nextflowListBuffer; + + private Logger logger; + + private static final int BUFFER_SIZE = 100; + private static final String UID_VERSION_SEP = "___"; + + public NextFlowCatalogMongoDBIterator(MongoDBIterator mongoCursor, ClientSession clientSession, + GenericDocumentComplexConverter converter, + OrganizationMongoDBAdaptorFactory dbAdaptorFactory, QueryOptions options) { + this(mongoCursor, clientSession, converter, dbAdaptorFactory, 0, null, options); + } + + public NextFlowCatalogMongoDBIterator(MongoDBIterator mongoCursor, ClientSession clientSession, + GenericDocumentComplexConverter converter, OrganizationMongoDBAdaptorFactory dbAdaptorFactory, + long studyUid, String user, QueryOptions options) { + super(mongoCursor, clientSession, converter, null); + + this.user = user; + this.studyUid = studyUid; + + this.options = options; + this.nextflowListBuffer = new LinkedList<>(); + this.logger = LoggerFactory.getLogger(NextFlowCatalogMongoDBIterator.class); + } + +} diff --git a/opencga-client/src/main/R/R/Admin-methods.R b/opencga-client/src/main/R/R/Admin-methods.R index d63a1ad3478..5bfb1b049c2 100644 --- a/opencga-client/src/main/R/R/Admin-methods.R +++ b/opencga-client/src/main/R/R/Admin-methods.R @@ -43,7 +43,7 @@ setMethod("adminClient", "OpencgaR", function(OpencgaR, user, endpointName, para #' @param count Count the number of elements matching the group. #' @param limit Maximum number of documents (groups) to be returned. #' @param fields Comma separated list of fields by which to group by. - #' @param entity Entity to be grouped by. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA FUNCTIONAL'] + #' @param entity Entity to be grouped by. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL'] #' @param action Action performed. #' @param before Object before update. #' @param after Object after update. diff --git a/opencga-client/src/main/R/R/Study-methods.R b/opencga-client/src/main/R/R/Study-methods.R index ddbc75ca374..ba022ea666e 100644 --- a/opencga-client/src/main/R/R/Study-methods.R +++ b/opencga-client/src/main/R/R/Study-methods.R @@ -118,7 +118,7 @@ setMethod("studyClient", "OpencgaR", function(OpencgaR, group, id, members, stud #' @param operationId Audit operation UUID. #' @param userId User ID. #' @param action Action performed by the user. - #' @param resource Resource involved. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA FUNCTIONAL'] + #' @param resource Resource involved. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL'] #' @param resourceId Resource ID. #' @param resourceUuid resource UUID. #' @param status Filter by status. Allowed values: ['SUCCESS ERROR'] diff --git a/opencga-client/src/main/javascript/Admin.js b/opencga-client/src/main/javascript/Admin.js index 3f6204d4165..5538e904dfc 100644 --- a/opencga-client/src/main/javascript/Admin.js +++ b/opencga-client/src/main/javascript/Admin.js @@ -37,7 +37,7 @@ export default class Admin extends OpenCGAParentClass { /** Group by operation * @param {String} fields - Comma separated list of fields by which to group by. * @param {"AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS - * INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA FUNCTIONAL"} entity - Entity to be grouped by. + * INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL"} entity - Entity to be grouped by. * @param {Object} [params] - The Object containing the following optional parameters: * @param {Boolean} [params.count] - Count the number of elements matching the group. * @param {Number} [params.limit = "50"] - Maximum number of documents (groups) to be returned. The default value is 50. diff --git a/opencga-client/src/main/javascript/Study.js b/opencga-client/src/main/javascript/Study.js index 9e013fef0dc..2c2b6abf793 100644 --- a/opencga-client/src/main/javascript/Study.js +++ b/opencga-client/src/main/javascript/Study.js @@ -119,7 +119,7 @@ export default class Study extends OpenCGAParentClass { * @param {String} [params.userId] - User ID. * @param {String} [params.action] - Action performed by the user. * @param {"AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS - * INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA FUNCTIONAL"} [params.resource] - Resource involved. + * INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL"} [params.resource] - Resource involved. * @param {String} [params.resourceId] - Resource ID. * @param {String} [params.resourceUuid] - resource UUID. * @param {"SUCCESS ERROR"} [params.status] - Filter by status. diff --git a/opencga-client/src/main/python/pyopencga/rest_clients/admin_client.py b/opencga-client/src/main/python/pyopencga/rest_clients/admin_client.py index 7311fa7e5dd..9702e5d6463 100644 --- a/opencga-client/src/main/python/pyopencga/rest_clients/admin_client.py +++ b/opencga-client/src/main/python/pyopencga/rest_clients/admin_client.py @@ -27,7 +27,8 @@ def group_by_audit(self, fields, entity, **options): :param str entity: Entity to be grouped by. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION - VARIANT ALIGNMENT CLINICAL EXPRESSION RGA FUNCTIONAL'] (REQUIRED) + VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL'] + (REQUIRED) :param str fields: Comma separated list of fields by which to group by. (REQUIRED) :param bool count: Count the number of elements matching the group. diff --git a/opencga-client/src/main/python/pyopencga/rest_clients/study_client.py b/opencga-client/src/main/python/pyopencga/rest_clients/study_client.py index 93e70823d52..639f20c6b53 100644 --- a/opencga-client/src/main/python/pyopencga/rest_clients/study_client.py +++ b/opencga-client/src/main/python/pyopencga/rest_clients/study_client.py @@ -139,7 +139,7 @@ def search_audit(self, study, **options): :param str resource: Resource involved. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT - ALIGNMENT CLINICAL EXPRESSION RGA FUNCTIONAL'] + ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL'] :param str resource_id: Resource ID. :param str resource_uuid: resource UUID. :param str status: Filter by status. Allowed values: ['SUCCESS ERROR'] diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/common/Enums.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/common/Enums.java index 2777222715b..04981030ebc 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/models/common/Enums.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/models/common/Enums.java @@ -79,6 +79,7 @@ public enum Resource { CLINICAL, EXPRESSION, RGA, + NEXTFLOW, FUNCTIONAL; public List getFullPermissionList() { diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlow.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlow.java new file mode 100644 index 00000000000..af4a527ef1d --- /dev/null +++ b/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlow.java @@ -0,0 +1,54 @@ +package org.opencb.opencga.core.models.nextflow; + +public class NextFlow { + + private String id; + private int version; + private String script; + + public NextFlow() { + } + + public NextFlow(String id, int version, String script) { + this.id = id; + this.version = version; + this.script = script; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("NextFlow{"); + sb.append("id='").append(id).append('\''); + sb.append(", version=").append(version); + sb.append(", script='").append(script).append('\''); + sb.append('}'); + return sb.toString(); + } + + public String getId() { + return id; + } + + public NextFlow setId(String id) { + this.id = id; + return this; + } + + public int getVersion() { + return version; + } + + public NextFlow setVersion(int version) { + this.version = version; + return this; + } + + public String getScript() { + return script; + } + + public NextFlow setScript(String script) { + this.script = script; + return this; + } +} diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlowRunParams.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlowRunParams.java new file mode 100644 index 00000000000..062863b3916 --- /dev/null +++ b/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlowRunParams.java @@ -0,0 +1,46 @@ +package org.opencb.opencga.core.models.nextflow; + +import org.opencb.opencga.core.tools.ToolParams; + +public class NextFlowRunParams extends ToolParams { + + public static final String DESCRIPTION = "NextFlow run parameters"; + + private String id; + private Integer version; + + public NextFlowRunParams() { + } + + public NextFlowRunParams(String id, Integer version) { + this.id = id; + this.version = version; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("NextFlowRunParams{"); + sb.append("id='").append(id).append('\''); + sb.append(", version=").append(version); + sb.append('}'); + return sb.toString(); + } + + public String getId() { + return id; + } + + public NextFlowRunParams setId(String id) { + this.id = id; + return this; + } + + public Integer getVersion() { + return version; + } + + public NextFlowRunParams setVersion(Integer version) { + this.version = version; + return this; + } +} diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/tools/result/ExecutionResultManager.java b/opencga-core/src/main/java/org/opencb/opencga/core/tools/result/ExecutionResultManager.java index b1e716bd41f..aa84dcd3631 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/tools/result/ExecutionResultManager.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/tools/result/ExecutionResultManager.java @@ -254,6 +254,10 @@ public void addStepAttribute(String key, Object value) throws ToolException { }); } + public void setManualSteps(List steps) throws ToolException { + updateResult(result -> result.setSteps(steps)); + } + public void errorStep() throws ToolException { updateResult(result -> getStep(result, result.getStatus().getStep()) .setStatus(Status.Type.ERROR).setEnd(now())); From 621bef21eb2b4482d3ca04545c0f2d8ac9ec7b0d Mon Sep 17 00:00:00 2001 From: pfurio Date: Thu, 18 Jul 2024 14:30:10 +0200 Subject: [PATCH 002/106] catalog: implement workflow manager and dbadaptor, #TASK-6445 --- .../analysis/tools/NextFlowExecutor.java | 166 +++++--- .../src/main/resources/nextflow.config | 2 +- .../analysis/tools/NextFlowExecutorTest.java | 32 -- .../analysis/tools/WorkflowExecutorTest.java | 86 ++++ .../variant/OpenCGATestExternalResource.java | 21 +- .../opencga/catalog/db/DBAdaptorFactory.java | 2 + ...wDBAdaptor.java => WorkflowDBAdaptor.java} | 15 +- .../db/mongodb/MongoDBAdaptorFactory.java | 5 + .../db/mongodb/NextFlowMongoDBAdaptor.java | 259 ------------ .../OrganizationMongoDBAdaptorFactory.java | 19 + .../SnapshotVersionedMongoDBAdaptor.java | 3 + .../db/mongodb/WorkflowMongoDBAdaptor.java | 386 ++++++++++++++++++ .../mongodb/converters/NextFlowConverter.java | 10 - .../mongodb/converters/WorkflowConverter.java | 10 + ...va => WorkflowCatalogMongoDBIterator.java} | 12 +- .../catalog/managers/AbstractManager.java | 13 +- .../catalog/managers/CatalogManager.java | 7 + .../catalog/managers/WorkflowManager.java | 220 ++++++++++ .../opencga/catalog/utils/UuidUtils.java | 3 +- .../CatalogManagerExternalResource.java | 21 + opencga-client/src/main/R/R/Admin-methods.R | 2 +- opencga-client/src/main/R/R/Study-methods.R | 2 +- opencga-client/src/main/javascript/Admin.js | 2 +- opencga-client/src/main/javascript/Study.js | 2 +- .../pyopencga/rest_clients/admin_client.py | 2 +- .../pyopencga/rest_clients/study_client.py | 2 +- .../opencga/core/models/common/Enums.java | 2 +- .../core/models/nextflow/NextFlow.java | 54 --- .../core/models/nextflow/Workflow.java | 165 ++++++++ 29 files changed, 1062 insertions(+), 463 deletions(-) delete mode 100644 opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/NextFlowExecutorTest.java create mode 100644 opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/WorkflowExecutorTest.java rename opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/{NextFlowDBAdaptor.java => WorkflowDBAdaptor.java} (71%) delete mode 100644 opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/NextFlowMongoDBAdaptor.java create mode 100644 opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/WorkflowMongoDBAdaptor.java delete mode 100644 opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/NextFlowConverter.java create mode 100644 opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/WorkflowConverter.java rename opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/{NextFlowCatalogMongoDBIterator.java => WorkflowCatalogMongoDBIterator.java} (75%) create mode 100644 opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/WorkflowManager.java delete mode 100644 opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlow.java create mode 100644 opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/Workflow.java diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/NextFlowExecutor.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/NextFlowExecutor.java index c3398c82c3a..667356af04e 100644 --- a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/NextFlowExecutor.java +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/tools/NextFlowExecutor.java @@ -2,14 +2,18 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.StopWatch; import org.opencb.commons.datastore.core.ObjectMap; -import org.opencb.commons.utils.DockerUtils; +import org.opencb.commons.datastore.core.QueryOptions; import org.opencb.opencga.core.common.JacksonUtils; import org.opencb.opencga.core.common.TimeUtils; import org.opencb.opencga.core.exceptions.ToolException; import org.opencb.opencga.core.models.common.Enums; import org.opencb.opencga.core.models.nextflow.NextFlowRunParams; +import org.opencb.opencga.core.models.nextflow.Workflow; +import org.opencb.opencga.core.response.OpenCGAResult; import org.opencb.opencga.core.tools.annotations.Tool; import org.opencb.opencga.core.tools.annotations.ToolParams; import org.opencb.opencga.core.tools.result.Status; @@ -17,30 +21,34 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; import java.io.UncheckedIOException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.Arrays; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; import java.util.stream.Stream; -import static org.opencb.opencga.analysis.wrappers.executors.DockerWrapperAnalysisExecutor.DOCKER_INPUT_PATH; -import static org.opencb.opencga.analysis.wrappers.executors.DockerWrapperAnalysisExecutor.DOCKER_OUTPUT_PATH; - -@Tool(id = NextFlowExecutor.ID, resource = Enums.Resource.NEXTFLOW, description = NextFlowExecutor.DESCRIPTION) +@Tool(id = NextFlowExecutor.ID, resource = Enums.Resource.WORKFLOW, description = NextFlowExecutor.DESCRIPTION) public class NextFlowExecutor extends OpenCgaTool { public final static String ID = "nextflow-run"; public static final String DESCRIPTION = "Execute a Nextflow analysis."; - public final static String DOCKER_IMAGE = "nextflow/nextflow"; - @ToolParams protected NextFlowRunParams toolParams = new NextFlowRunParams(); - private String script; + private Workflow workflow; + + private Thread thread; + private final int monitorThreadPeriod = 5000; private final static Logger logger = LoggerFactory.getLogger(NextFlowExecutor.class); @@ -52,48 +60,23 @@ protected void check() throws Exception { throw new IllegalArgumentException("Missing Nextflow ID"); } - // TODO: Change and look for Nextflow script - this.script = "params.str = 'Hello world!'\n" + - "\n" + - "process splitLetters {\n" + - " output:\n" + - " path 'chunk_*'\n" + - "\n" + - " \"\"\"\n" + - " printf '${params.str}' | split -b 6 - chunk_\n" + - " \"\"\"\n" + - "}\n" + - "\n" + - "process convertToUpper {\n" + - " input:\n" + - " path x\n" + - "\n" + - " output:\n" + - " stdout\n" + - "\n" + - " \"\"\"\n" + - " cat $x | tr '[a-z]' '[A-Z]'\n" + - " \"\"\"\n" + - "}\n" + - "\n" + - "process sleep {\n" + - " input:\n" + - " val x\n" + - "\n" + - " \"\"\"\n" + - " sleep 1\n" + - " \"\"\"\n" + - "}\n" + - "\n" + - "workflow {\n" + - " splitLetters | flatten | convertToUpper | view { it.trim() } | sleep\n" + - "}"; + OpenCGAResult result = catalogManager.getWorkflowManager().get(toolParams.getId(), QueryOptions.empty(), token); + if (result.getNumResults() == 0) { + throw new ToolException("Workflow '" + toolParams.getId() + "' not found"); + } + workflow = result.first(); + + if (workflow == null) { + throw new ToolException("Workflow '" + toolParams.getId() + "' is null"); + } } @Override protected void run() throws Exception { - // Write main script file - Files.write(getOutDir().resolve("pipeline.nf"), script.getBytes()); + for (Workflow.Script script : workflow.getScripts()) { + // Write script files + Files.write(getOutDir().resolve(script.getId()), script.getContent().getBytes()); + } // Write nextflow.config file URL nextflowConfig = getClass().getResource("/nextflow.config"); @@ -105,31 +88,81 @@ protected void run() throws Exception { // Execute docker image String workingDirectory = getOutDir().toAbsolutePath().toString(); - List> inputBindings = new ArrayList<>(); - inputBindings.add(new AbstractMap.SimpleEntry<>(workingDirectory, DOCKER_INPUT_PATH)); - AbstractMap.SimpleEntry outputBinding = new AbstractMap.SimpleEntry<>(workingDirectory, DOCKER_OUTPUT_PATH); - - // TODO: Copy nextflow.config and pipeline.nf to DOCKER_INPUT_PATH StringBuilder stringBuilder = new StringBuilder() - .append("nextflow run ").append(DOCKER_OUTPUT_PATH).append("/pipeline.nf") - .append(" --work-dir ").append(DOCKER_OUTPUT_PATH) - .append(" -with-report ").append(DOCKER_OUTPUT_PATH).append("/report.html"); + .append("nextflow -c ").append(workingDirectory).append("/nextflow.config") + .append(" ").append(workflow.getCommandLine()) +// .append(" run nextflow-io/rnaseq-nf -with-docker") +// .append(" run ").append(workingDirectory).append("/pipeline.nf") + .append(" -with-report ").append(workingDirectory).append("/report.html"); + List cliArgs = Arrays.asList(StringUtils.split(stringBuilder.toString(), " ")); + + startTraceFileMonitor(); StopWatch stopWatch = StopWatch.createStarted(); - Map dockerParams = new HashMap<>(); - dockerParams.put("user", "root"); - String cmdline = DockerUtils.run(DOCKER_IMAGE, inputBindings, outputBinding, stringBuilder.toString(), dockerParams); - logger.info("Docker command line: " + cmdline); + + // Execute nextflow binary + ProcessBuilder processBuilder = new ProcessBuilder(cliArgs); + // Establish the working directory of the process + processBuilder.directory(getOutDir().toFile()); + logger.info("Executing: {}", stringBuilder); + Process p; + try { + p = processBuilder.start(); + BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); + BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream())); + String line; + while ((line = input.readLine()) != null) { + logger.info("{}", line); + } + while ((line = error.readLine()) != null) { + logger.error("{} ", line); + } + p.waitFor(); + input.close(); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("Error executing cli: " + e.getMessage(), e); + } + logger.info("Execution time: " + TimeUtils.durationToString(stopWatch)); // Delete input files - Files.delete(getOutDir().resolve("pipeline.nf")); + for (Workflow.Script script : workflow.getScripts()) { + Files.delete(getOutDir().resolve(script.getId())); + } Files.delete(getOutDir().resolve("nextflow.config")); + endTraceFileMonitor(); + } + + @Override + protected void onShutdown() { + super.onShutdown(); + endTraceFileMonitor(); + } + + protected void endTraceFileMonitor() { + if (thread != null) { + thread.interrupt(); + } processTraceFile(); } + private void startTraceFileMonitor() { + thread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(monitorThreadPeriod); + } catch (InterruptedException e) { + return; + } + + processTraceFile(); + } + }); + thread.start(); + } + private void processTraceFile() { List steps = new LinkedList<>(); // Read tabular file @@ -144,17 +177,17 @@ private void processTraceFile() { Trace trace = new Trace(line); ToolStep toolStep = trace.toToolStep(); steps.add(toolStep); - System.out.println(trace); }); } catch (Exception e) { logger.error("Error reading trace file: " + traceFile, e); } } - - try { - setManualSteps(steps); - } catch (ToolException e) { - throw new RuntimeException(e); + if (CollectionUtils.isNotEmpty(steps)) { + try { + setManualSteps(steps); + } catch (ToolException e) { + logger.error("Error writing nextflow steps to ExecutionResult", e); + } } } @@ -281,7 +314,6 @@ public ObjectMap toObjectMap() { throw new UncheckedIOException(e); } } - } static Date toDate(String dateStr) { diff --git a/opencga-analysis/src/main/resources/nextflow.config b/opencga-analysis/src/main/resources/nextflow.config index fd91b7d3811..aa0b82ae0ee 100644 --- a/opencga-analysis/src/main/resources/nextflow.config +++ b/opencga-analysis/src/main/resources/nextflow.config @@ -1,6 +1,6 @@ trace { enabled = true - file = '/data/output/trace.txt' + file = 'trace.txt' overwrite = true fields = 'task_id,hash,name,status,start,complete,%cpu,peak_vmem' } \ No newline at end of file diff --git a/opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/NextFlowExecutorTest.java b/opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/NextFlowExecutorTest.java deleted file mode 100644 index bcc66443b93..00000000000 --- a/opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/NextFlowExecutorTest.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.opencb.opencga.analysis.tools; - -import org.apache.commons.lang3.time.StopWatch; -import org.junit.ClassRule; -import org.junit.Test; -import org.opencb.opencga.analysis.variant.OpenCGATestExternalResource; -import org.opencb.opencga.core.exceptions.ToolException; -import org.opencb.opencga.core.models.nextflow.NextFlowRunParams; - -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.concurrent.TimeUnit; - -public class NextFlowExecutorTest { - - @ClassRule - public static OpenCGATestExternalResource opencga = new OpenCGATestExternalResource(false); - - @Test - public void myTest() throws ToolException, IOException { - Path outDir = Paths.get(opencga.createTmpOutdir("_nextflow")); - - StopWatch stopWatch = StopWatch.createStarted(); - NextFlowExecutor nextFlowExecutorTest = new NextFlowExecutor(); - NextFlowRunParams runParams = new NextFlowRunParams("myId", 2); - nextFlowExecutorTest.setUp(opencga.getOpencgaHome().toString(), runParams.toObjectMap(), outDir, opencga.getAdminToken()); - nextFlowExecutorTest.start(); - System.out.println(stopWatch.getTime(TimeUnit.MILLISECONDS)); - } - -} diff --git a/opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/WorkflowExecutorTest.java b/opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/WorkflowExecutorTest.java new file mode 100644 index 00000000000..a54543cfc88 --- /dev/null +++ b/opencga-analysis/src/test/java/org/opencb/opencga/analysis/tools/WorkflowExecutorTest.java @@ -0,0 +1,86 @@ +package org.opencb.opencga.analysis.tools; + +import org.apache.commons.lang3.time.StopWatch; +import org.junit.Test; +import org.opencb.commons.datastore.core.QueryOptions; +import org.opencb.opencga.analysis.StorageManager; +import org.opencb.opencga.catalog.exceptions.CatalogException; +import org.opencb.opencga.catalog.managers.AbstractManagerTest; +import org.opencb.opencga.core.config.storage.StorageConfiguration; +import org.opencb.opencga.core.exceptions.ToolException; +import org.opencb.opencga.core.models.nextflow.NextFlowRunParams; +import org.opencb.opencga.core.models.nextflow.Workflow; +import org.opencb.opencga.storage.core.StorageEngineFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +public class WorkflowExecutorTest extends AbstractManagerTest { + + @Test + public void myTest() throws ToolException, CatalogException, IOException { + InputStream inputStream = StorageManager.class.getClassLoader().getResourceAsStream("storage-configuration.yml"); + StorageConfiguration storageConfiguration = StorageConfiguration.load(inputStream, "yml"); + + Workflow workflow = getDummyWorkflow(); + catalogManager.getWorkflowManager().create(workflow, QueryOptions.empty(), ownerToken); + + Path outDir = Paths.get(catalogManagerResource.createTmpOutdir("_nextflow")); + + StopWatch stopWatch = StopWatch.createStarted(); + NextFlowExecutor nextFlowExecutorTest = new NextFlowExecutor(); + NextFlowRunParams runParams = new NextFlowRunParams(workflow.getId(), 1); + nextFlowExecutorTest.setUp(catalogManagerResource.getOpencgaHome().toString(), catalogManager, + StorageEngineFactory.get(storageConfiguration), runParams.toObjectMap(), outDir, "", false, ownerToken); +// nextFlowExecutorTest.setUp(catalogManagerResource.getOpencgaHome().toString(), runParams.toObjectMap(), outDir, ownerToken); + nextFlowExecutorTest.start(); + System.out.println(stopWatch.getTime(TimeUnit.MILLISECONDS)); + } + + private Workflow getDummyWorkflow() { + String scriptContent = "params.str = 'Hello world!'\n" + + "\n" + + "process splitLetters {\n" + + " output:\n" + + " path 'chunk_*'\n" + + "\n" + + " \"\"\"\n" + + " printf '${params.str}' | split -b 6 - chunk_\n" + + " \"\"\"\n" + + "}\n" + + "\n" + + "process convertToUpper {\n" + + " input:\n" + + " path x\n" + + "\n" + + " output:\n" + + " stdout\n" + + "\n" + + " \"\"\"\n" + + " cat $x | tr '[a-z]' '[A-Z]'\n" + + " \"\"\"\n" + + "}\n" + + "\n" + + "process sleep {\n" + + " input:\n" + + " val x\n" + + "\n" + + " \"\"\"\n" + + " sleep 6\n" + + " \"\"\"\n" + + "}\n" + + "\n" + + "workflow {\n" + + " splitLetters | flatten | convertToUpper | view { it.trim() } | sleep\n" + + "}"; + return new Workflow() + .setId("workflow") + .setCommandLine("run pipeline.nf") + .setType(Workflow.Type.NEXTFLOW) + .setScripts(Collections.singletonList(new Workflow.Script("pipeline.nf", scriptContent))); + } +} diff --git a/opencga-analysis/src/test/java/org/opencb/opencga/analysis/variant/OpenCGATestExternalResource.java b/opencga-analysis/src/test/java/org/opencb/opencga/analysis/variant/OpenCGATestExternalResource.java index 5c5956d7cf7..dbc8479305a 100644 --- a/opencga-analysis/src/test/java/org/opencb/opencga/analysis/variant/OpenCGATestExternalResource.java +++ b/opencga-analysis/src/test/java/org/opencb/opencga/analysis/variant/OpenCGATestExternalResource.java @@ -46,11 +46,8 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.nio.file.StandardCopyOption; -import java.text.SimpleDateFormat; import java.util.Arrays; -import java.util.Date; import java.util.List; import java.util.Map; @@ -334,27 +331,15 @@ public void clearStorageDB() { } public String createTmpOutdir(String studyId, String suffix, String sessionId) throws CatalogException, IOException { - return createTmpOutdir(suffix); + return catalogManagerExternalResource.createTmpOutdir(suffix); } public String createTmpOutdir() throws IOException { - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - // stackTrace[0] = "Thread.currentThread" - // stackTrace[1] = "newOutputUri" - // stackTrace[2] = caller method - String testName = stackTrace[2].getMethodName(); - return createTmpOutdir(testName); + return catalogManagerExternalResource.createTmpOutdir(); } public String createTmpOutdir(String suffix) throws IOException { - if (suffix.endsWith("_")) { - suffix = suffix.substring(0, suffix.length() - 1); - } - String folder = "I_tmp_" + new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss.SSS").format(new Date()) + suffix; - Path tmpOutDir = Paths.get(getCatalogManager().getConfiguration().getJobDir()).resolve(folder); - Files.createDirectories(tmpOutDir); - return tmpOutDir.toString(); -// return getCatalogManager().getJobManager().createJobOutDir(studyId, "I_tmp_" + date + sufix, sessionId).toString(); + return catalogManagerExternalResource.createTmpOutdir(suffix); } // private class StorageLocalExecutorManager extends LocalExecutorManager { diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/DBAdaptorFactory.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/DBAdaptorFactory.java index 5f3367d4d1c..607712bd8b0 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/DBAdaptorFactory.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/DBAdaptorFactory.java @@ -117,4 +117,6 @@ OpenCGAResult createOrganization(Organization organization, QueryO ClinicalAnalysisDBAdaptor getClinicalAnalysisDBAdaptor(String organization) throws CatalogDBException; InterpretationDBAdaptor getInterpretationDBAdaptor(String organization) throws CatalogDBException; + + WorkflowDBAdaptor getWorkflowDBAdaptor(String organization) throws CatalogDBException; } diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/NextFlowDBAdaptor.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/WorkflowDBAdaptor.java similarity index 71% rename from opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/NextFlowDBAdaptor.java rename to opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/WorkflowDBAdaptor.java index d3ee6a73bfc..cc9ebc9e96a 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/NextFlowDBAdaptor.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/api/WorkflowDBAdaptor.java @@ -1,19 +1,30 @@ package org.opencb.opencga.catalog.db.api; import org.apache.commons.collections4.map.LinkedMap; +import org.opencb.commons.datastore.core.QueryOptions; import org.opencb.commons.datastore.core.QueryParam; -import org.opencb.opencga.core.models.nextflow.NextFlow; +import org.opencb.opencga.catalog.exceptions.CatalogAuthorizationException; +import org.opencb.opencga.catalog.exceptions.CatalogDBException; +import org.opencb.opencga.catalog.exceptions.CatalogParameterException; +import org.opencb.opencga.core.models.nextflow.Workflow; +import org.opencb.opencga.core.response.OpenCGAResult; import java.util.Map; import static org.opencb.commons.datastore.core.QueryParam.Type.*; -public interface NextFlowDBAdaptor extends CoreDBAdaptor { +public interface WorkflowDBAdaptor extends CoreDBAdaptor { + + OpenCGAResult insert(Workflow workflow, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException; enum QueryParams implements QueryParam { ID("id", TEXT, ""), UID("uid", LONG, ""), UUID("uuid", TEXT, ""), + TYPE("type", TEXT, ""), + COMMAND_LINE("commandLine", TEXT, ""), + SCRIPTS("scripts", OBJECT, ""), RELEASE("release", INTEGER, ""), // Release where the sample was created SNAPSHOT("snapshot", INTEGER, ""), // Last version of sample at release = snapshot VERSION("version", INTEGER, ""), // Version of the sample diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/MongoDBAdaptorFactory.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/MongoDBAdaptorFactory.java index 621c537cddd..9cb82011a1b 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/MongoDBAdaptorFactory.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/MongoDBAdaptorFactory.java @@ -400,6 +400,11 @@ public InterpretationDBAdaptor getInterpretationDBAdaptor(String organizationId) return getOrganizationMongoDBAdaptorFactory(organizationId).getInterpretationDBAdaptor(); } + @Override + public WorkflowDBAdaptor getWorkflowDBAdaptor(String organization) throws CatalogDBException { + return getOrganizationMongoDBAdaptorFactory(organization).getWorkflowDBAdaptor(); + } + public MongoDataStoreManager getMongoManager() { return mongoManager; } diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/NextFlowMongoDBAdaptor.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/NextFlowMongoDBAdaptor.java deleted file mode 100644 index 401f0f28872..00000000000 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/NextFlowMongoDBAdaptor.java +++ /dev/null @@ -1,259 +0,0 @@ -package org.opencb.opencga.catalog.db.mongodb; - -import com.mongodb.client.ClientSession; -import org.apache.commons.lang3.time.StopWatch; -import org.bson.Document; -import org.bson.conversions.Bson; -import org.opencb.commons.datastore.core.ObjectMap; -import org.opencb.commons.datastore.core.Query; -import org.opencb.commons.datastore.core.QueryOptions; -import org.opencb.commons.datastore.mongodb.MongoDBCollection; -import org.opencb.commons.datastore.mongodb.MongoDBIterator; -import org.opencb.opencga.catalog.db.api.DBIterator; -import org.opencb.opencga.catalog.db.api.NextFlowDBAdaptor; -import org.opencb.opencga.catalog.db.mongodb.converters.NextFlowConverter; -import org.opencb.opencga.catalog.db.mongodb.iterators.NextFlowCatalogMongoDBIterator; -import org.opencb.opencga.catalog.exceptions.CatalogAuthorizationException; -import org.opencb.opencga.catalog.exceptions.CatalogDBException; -import org.opencb.opencga.catalog.exceptions.CatalogParameterException; -import org.opencb.opencga.core.config.Configuration; -import org.opencb.opencga.core.models.nextflow.NextFlow; -import org.opencb.opencga.core.response.OpenCGAResult; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - -import static org.opencb.opencga.catalog.db.mongodb.MongoDBUtils.fixAclProjection; - -public class NextFlowMongoDBAdaptor extends CatalogMongoDBAdaptor implements NextFlowDBAdaptor { - - private final MongoDBCollection nextflowCollection; - private final MongoDBCollection archiveNextflowCollection; - private final MongoDBCollection deleteNextflowCollection; - private final SnapshotVersionedMongoDBAdaptor versionedMongoDBAdaptor; - private final NextFlowConverter nextflowConverter; - - public NextFlowMongoDBAdaptor(MongoDBCollection nextflowCollection, MongoDBCollection archiveNextflowCollection, - MongoDBCollection deleteNextflowCollection, Configuration configuration, - OrganizationMongoDBAdaptorFactory dbAdaptorFactory) { - super(configuration, LoggerFactory.getLogger(NextFlowMongoDBAdaptor.class)); - this.dbAdaptorFactory = dbAdaptorFactory; - this.nextflowCollection = nextflowCollection; - this.archiveNextflowCollection = archiveNextflowCollection; - this.deleteNextflowCollection = deleteNextflowCollection; - this.versionedMongoDBAdaptor = new SnapshotVersionedMongoDBAdaptor(nextflowCollection, archiveNextflowCollection, - deleteNextflowCollection); - this.nextflowConverter = new NextFlowConverter(); - } - - @Override - public OpenCGAResult get(long studyUid, Query query, QueryOptions options, String user) - throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { - long startTime = startQuery(); - try (DBIterator dbIterator = iterator(studyUid, query, options, user)) { - return endQuery(startTime, dbIterator); - } - } - - @Override - public OpenCGAResult get(Query query, QueryOptions options) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - long startTime = startQuery(); - try (DBIterator dbIterator = iterator(query, options)) { - return endQuery(startTime, dbIterator); - } - } - - @Override - public OpenCGAResult nativeGet(long studyUid, Query query, QueryOptions options, String user) - throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { - long startTime = startQuery(); - try (DBIterator dbIterator = nativeIterator(studyUid, query, options, user)) { - return endQuery(startTime, dbIterator); - } - } - - @Override - OpenCGAResult nativeGet(Query query, QueryOptions options) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - long startTime = startQuery(); - try (DBIterator dbIterator = nativeIterator(query, options)) { - return endQuery(startTime, dbIterator); - } - } - - @Override - public DBIterator iterator(long studyUid, Query query, QueryOptions options, String user) - throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { - query.put(PRIVATE_STUDY_UID, studyUid); - MongoDBIterator mongoCursor = getMongoCursor(null, query, options, user); - return new NextFlowCatalogMongoDBIterator<>(mongoCursor, null, nextflowConverter, null, studyUid, user, options); - } - - @Override - public DBIterator iterator(Query query, QueryOptions options) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - MongoDBIterator mongoCursor = getMongoCursor(null, query, options, null); - return new NextFlowCatalogMongoDBIterator<>(mongoCursor, null, nextflowConverter, null, options); - } - - @Override - public DBIterator nativeIterator(Query query, QueryOptions options) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions(); - queryOptions.put(NATIVE_QUERY, true); - MongoDBIterator mongoCursor = getMongoCursor(null, query, queryOptions, null); - return new NextFlowCatalogMongoDBIterator(mongoCursor, null, null, null, options); - } - - @Override - public DBIterator nativeIterator(long studyUid, Query query, QueryOptions options, String user) - throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { - QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions(); - queryOptions.put(NATIVE_QUERY, true); - - query.put(PRIVATE_STUDY_UID, studyUid); - MongoDBIterator mongoCursor = getMongoCursor(null, query, queryOptions, user); - return new NextFlowCatalogMongoDBIterator<>(mongoCursor, null, null, null, studyUid, user, options); - } - - @Override - public OpenCGAResult count(Query query, String user) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - Bson bson = parseQuery(query, user); - return new OpenCGAResult<>(nextflowCollection.count(null, bson)); - } - - @Override - public OpenCGAResult count(Query query) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - Bson bson = parseQuery(query); - return new OpenCGAResult<>(nextflowCollection.count(null, bson)); - } - - @Override - public OpenCGAResult groupBy(Query query, List fields, QueryOptions options, String user) - throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { - return null; - } - - @Override - public OpenCGAResult groupBy(Query query, String field, QueryOptions options) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - Bson bsonQuery = parseQuery(query); - return groupBy(nextflowCollection, bsonQuery, field, NextFlowDBAdaptor.QueryParams.ID.key(), options); - } - - @Override - public OpenCGAResult groupBy(Query query, List fields, QueryOptions options) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - Bson bsonQuery = parseQuery(query); - return groupBy(nextflowCollection, bsonQuery, fields, NextFlowDBAdaptor.QueryParams.ID.key(), options); - } - - @Override - public OpenCGAResult distinct(long studyUid, String field, Query query, String userId) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - Query finalQuery = query != null ? new Query(query) : new Query(); - finalQuery.put(NextFlowDBAdaptor.QueryParams.STUDY_UID.key(), studyUid); - Bson bson = parseQuery(finalQuery, userId); - return new OpenCGAResult<>(nextflowCollection.distinct(field, bson)); - } - - @Override - public OpenCGAResult distinct(long studyUid, List fields, Query query, String userId) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - StopWatch stopWatch = StopWatch.createStarted(); - Query finalQuery = query != null ? new Query(query) : new Query(); - finalQuery.put(NextFlowDBAdaptor.QueryParams.STUDY_UID.key(), studyUid); - Bson bson = parseQuery(finalQuery, userId); - - Set results = new LinkedHashSet<>(); - for (String field : fields) { - results.addAll(nextflowCollection.distinct(field, bson, String.class).getResults()); - } - - return new OpenCGAResult<>((int) stopWatch.getTime(TimeUnit.MILLISECONDS), Collections.emptyList(), results.size(), - new ArrayList<>(results), -1); - } - - @Override - public OpenCGAResult stats(Query query) { - return null; - } - - @Override - public OpenCGAResult update(long id, ObjectMap parameters, QueryOptions queryOptions) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - return null; - } - - @Override - public OpenCGAResult update(Query query, ObjectMap parameters, QueryOptions queryOptions) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - return null; - } - - @Override - public OpenCGAResult delete(NextFlow id) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - return null; - } - - @Override - public OpenCGAResult delete(Query query) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - return null; - } - - @Override - public OpenCGAResult restore(long id, QueryOptions queryOptions) throws CatalogDBException { - return null; - } - - @Override - public OpenCGAResult restore(Query query, QueryOptions queryOptions) throws CatalogDBException { - return null; - } - - @Override - public OpenCGAResult rank(Query query, String field, int numResults, boolean asc) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - return null; - } - - @Override - public void forEach(Query query, Consumer action, QueryOptions options) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - - } - - - private MongoDBIterator getMongoCursor(ClientSession clientSession, Query query, QueryOptions options, String user) - throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - Query finalQuery = new Query(query); - - QueryOptions qOptions; - if (options != null) { - qOptions = new QueryOptions(options); - } else { - qOptions = new QueryOptions(); - } - fixAclProjection(qOptions); - - Bson bson = parseQuery(finalQuery, user); - MongoDBCollection collection = getQueryCollection(finalQuery, nextflowCollection, archiveNextflowCollection, - deleteNextflowCollection); - logger.debug("Nextflow query: {}", bson.toBsonDocument()); - return collection.iterator(clientSession, bson, null, null, qOptions); - } - - - private Bson parseQuery(Query query) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - return parseQuery(query, null); - } - - private Bson parseQuery(Query query, String user) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - return null; - } -} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/OrganizationMongoDBAdaptorFactory.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/OrganizationMongoDBAdaptorFactory.java index b1c1c6704ec..591825374fe 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/OrganizationMongoDBAdaptorFactory.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/OrganizationMongoDBAdaptorFactory.java @@ -42,6 +42,7 @@ public class OrganizationMongoDBAdaptorFactory { public static final String PANEL_COLLECTION = "panel"; public static final String CLINICAL_ANALYSIS_COLLECTION = "clinical"; public static final String INTERPRETATION_COLLECTION = "interpretation"; + public static final String WORKFLOW_COLLECTION = "workflow"; public static final String NOTE_ARCHIVE_COLLECTION = "note_archive"; public static final String SAMPLE_ARCHIVE_COLLECTION = "sample_archive"; @@ -49,6 +50,7 @@ public class OrganizationMongoDBAdaptorFactory { public static final String FAMILY_ARCHIVE_COLLECTION = "family_archive"; public static final String PANEL_ARCHIVE_COLLECTION = "panel_archive"; public static final String INTERPRETATION_ARCHIVE_COLLECTION = "interpretation_archive"; + public static final String WORKFLOW_ARCHIVE_COLLECTION = "workflow_archive"; public static final String DELETED_NOTE_COLLECTION = "note_deleted"; public static final String DELETED_USER_COLLECTION = "user_deleted"; @@ -63,6 +65,7 @@ public class OrganizationMongoDBAdaptorFactory { public static final String DELETED_PANEL_COLLECTION = "panel_deleted"; public static final String DELETED_CLINICAL_ANALYSIS_COLLECTION = "clinical_deleted"; public static final String DELETED_INTERPRETATION_COLLECTION = "interpretation_deleted"; + public static final String DELETED_WORKFLOW_COLLECTION = "workflow_deleted"; public static final String METADATA_COLLECTION = "metadata"; public static final String MIGRATION_COLLECTION = "migration"; @@ -83,6 +86,7 @@ public class OrganizationMongoDBAdaptorFactory { FAMILY_COLLECTION, CLINICAL_ANALYSIS_COLLECTION, INTERPRETATION_COLLECTION, + WORKFLOW_COLLECTION, NOTE_ARCHIVE_COLLECTION, SAMPLE_ARCHIVE_COLLECTION, @@ -90,6 +94,7 @@ public class OrganizationMongoDBAdaptorFactory { FAMILY_ARCHIVE_COLLECTION, PANEL_ARCHIVE_COLLECTION, INTERPRETATION_ARCHIVE_COLLECTION, + WORKFLOW_ARCHIVE_COLLECTION, DELETED_NOTE_COLLECTION, DELETED_USER_COLLECTION, @@ -104,6 +109,7 @@ public class OrganizationMongoDBAdaptorFactory { DELETED_FAMILY_COLLECTION, DELETED_CLINICAL_ANALYSIS_COLLECTION, DELETED_INTERPRETATION_COLLECTION, + DELETED_WORKFLOW_COLLECTION, MIGRATION_COLLECTION, // FIXME metadata collection is unused @@ -131,6 +137,7 @@ public class OrganizationMongoDBAdaptorFactory { private final PanelMongoDBAdaptor panelDBAdaptor; private final ClinicalAnalysisMongoDBAdaptor clinicalDBAdaptor; private final InterpretationMongoDBAdaptor interpretationDBAdaptor; + private final WorkflowMongoDBAdaptor workflowDBAdaptor; private final AuditMongoDBAdaptor auditDBAdaptor; // private final MetaMongoDBAdaptor metaDBAdaptor; private final MigrationMongoDBAdaptor migrationDBAdaptor; @@ -170,6 +177,7 @@ public OrganizationMongoDBAdaptorFactory(String organizationId, MongoDataStoreMa MongoDBCollection familyCollection = mongoDataStore.getCollection(FAMILY_COLLECTION); MongoDBCollection clinicalCollection = mongoDataStore.getCollection(CLINICAL_ANALYSIS_COLLECTION); MongoDBCollection interpretationCollection = mongoDataStore.getCollection(INTERPRETATION_COLLECTION); + MongoDBCollection workflowCollection = mongoDataStore.getCollection(WORKFLOW_COLLECTION); MongoDBCollection notesArchivedCollection = mongoDataStore.getCollection(NOTE_ARCHIVE_COLLECTION); MongoDBCollection sampleArchivedCollection = mongoDataStore.getCollection(SAMPLE_ARCHIVE_COLLECTION); @@ -177,6 +185,7 @@ public OrganizationMongoDBAdaptorFactory(String organizationId, MongoDataStoreMa MongoDBCollection familyArchivedCollection = mongoDataStore.getCollection(FAMILY_ARCHIVE_COLLECTION); MongoDBCollection panelArchivedCollection = mongoDataStore.getCollection(PANEL_ARCHIVE_COLLECTION); MongoDBCollection interpretationArchivedCollection = mongoDataStore.getCollection(INTERPRETATION_ARCHIVE_COLLECTION); + MongoDBCollection workflowArchivedCollection = mongoDataStore.getCollection(WORKFLOW_ARCHIVE_COLLECTION); MongoDBCollection deletedNotesCollection = mongoDataStore.getCollection(DELETED_NOTE_COLLECTION); MongoDBCollection deletedUserCollection = mongoDataStore.getCollection(DELETED_USER_COLLECTION); @@ -191,6 +200,7 @@ public OrganizationMongoDBAdaptorFactory(String organizationId, MongoDataStoreMa MongoDBCollection deletedFamilyCollection = mongoDataStore.getCollection(DELETED_FAMILY_COLLECTION); MongoDBCollection deletedClinicalCollection = mongoDataStore.getCollection(DELETED_CLINICAL_ANALYSIS_COLLECTION); MongoDBCollection deletedInterpretationCollection = mongoDataStore.getCollection(DELETED_INTERPRETATION_COLLECTION); + MongoDBCollection deletedWorkflowCollection = mongoDataStore.getCollection(DELETED_WORKFLOW_COLLECTION); MongoDBCollection auditCollection = mongoDataStore.getCollection(AUDIT_COLLECTION); @@ -214,6 +224,8 @@ public OrganizationMongoDBAdaptorFactory(String organizationId, MongoDataStoreMa clinicalDBAdaptor = new ClinicalAnalysisMongoDBAdaptor(clinicalCollection, deletedClinicalCollection, configuration, this); interpretationDBAdaptor = new InterpretationMongoDBAdaptor(interpretationCollection, interpretationArchivedCollection, deletedInterpretationCollection, configuration, this); + workflowDBAdaptor = new WorkflowMongoDBAdaptor(workflowCollection, workflowArchivedCollection, deletedWorkflowCollection, + configuration, this); // metaDBAdaptor = new MetaMongoDBAdaptor(metaCollection, configuration, this); migrationDBAdaptor = new MigrationMongoDBAdaptor(migrationCollection, configuration, this); auditDBAdaptor = new AuditMongoDBAdaptor(auditCollection, configuration); @@ -237,6 +249,7 @@ public OrganizationMongoDBAdaptorFactory(String organizationId, MongoDataStoreMa mongoDBCollectionMap.put(FAMILY_COLLECTION, familyCollection); mongoDBCollectionMap.put(CLINICAL_ANALYSIS_COLLECTION, clinicalCollection); mongoDBCollectionMap.put(INTERPRETATION_COLLECTION, interpretationCollection); + mongoDBCollectionMap.put(WORKFLOW_COLLECTION, workflowCollection); mongoDBCollectionMap.put(NOTE_ARCHIVE_COLLECTION, notesArchivedCollection); mongoDBCollectionMap.put(SAMPLE_ARCHIVE_COLLECTION, sampleArchivedCollection); @@ -244,6 +257,7 @@ public OrganizationMongoDBAdaptorFactory(String organizationId, MongoDataStoreMa mongoDBCollectionMap.put(FAMILY_ARCHIVE_COLLECTION, familyArchivedCollection); mongoDBCollectionMap.put(PANEL_ARCHIVE_COLLECTION, panelArchivedCollection); mongoDBCollectionMap.put(INTERPRETATION_ARCHIVE_COLLECTION, interpretationArchivedCollection); + mongoDBCollectionMap.put(WORKFLOW_ARCHIVE_COLLECTION, workflowArchivedCollection); mongoDBCollectionMap.put(DELETED_NOTE_COLLECTION, deletedNotesCollection); mongoDBCollectionMap.put(DELETED_USER_COLLECTION, deletedUserCollection); @@ -257,6 +271,7 @@ public OrganizationMongoDBAdaptorFactory(String organizationId, MongoDataStoreMa mongoDBCollectionMap.put(DELETED_FAMILY_COLLECTION, deletedFamilyCollection); mongoDBCollectionMap.put(DELETED_CLINICAL_ANALYSIS_COLLECTION, deletedClinicalCollection); mongoDBCollectionMap.put(DELETED_INTERPRETATION_COLLECTION, deletedInterpretationCollection); + mongoDBCollectionMap.put(DELETED_WORKFLOW_COLLECTION, deletedWorkflowCollection); mongoDBCollectionMap.put(AUDIT_COLLECTION, auditCollection); } @@ -478,6 +493,10 @@ public InterpretationMongoDBAdaptor getInterpretationDBAdaptor() { return interpretationDBAdaptor; } + public WorkflowMongoDBAdaptor getWorkflowDBAdaptor() { + return workflowDBAdaptor; + } + public AuthorizationMongoDBAdaptor getAuthorizationDBAdaptor() { return authorizationMongoDBAdaptor; } diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/SnapshotVersionedMongoDBAdaptor.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/SnapshotVersionedMongoDBAdaptor.java index a86ebb9cc36..1c782686bf7 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/SnapshotVersionedMongoDBAdaptor.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/SnapshotVersionedMongoDBAdaptor.java @@ -138,6 +138,9 @@ DBIterator iterator(ClientSession session, Query query, QueryOptions options) protected void insert(ClientSession session, Document document) { String uuid = getClientSessionUuid(session); + document.put(VERSION, 1); + document.put(LAST_OF_VERSION, true); + document.put(LAST_OF_RELEASE, true); document.put(PRIVATE_TRANSACTION_ID, uuid); collection.insert(session, document, QueryOptions.empty()); archiveCollection.insert(session, document, QueryOptions.empty()); diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/WorkflowMongoDBAdaptor.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/WorkflowMongoDBAdaptor.java new file mode 100644 index 00000000000..809550306c0 --- /dev/null +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/WorkflowMongoDBAdaptor.java @@ -0,0 +1,386 @@ +package org.opencb.opencga.catalog.db.mongodb; + +import com.mongodb.client.ClientSession; +import com.mongodb.client.model.Filters; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.opencb.commons.datastore.core.DataResult; +import org.opencb.commons.datastore.core.ObjectMap; +import org.opencb.commons.datastore.core.Query; +import org.opencb.commons.datastore.core.QueryOptions; +import org.opencb.commons.datastore.mongodb.MongoDBCollection; +import org.opencb.commons.datastore.mongodb.MongoDBIterator; +import org.opencb.opencga.catalog.db.api.DBIterator; +import org.opencb.opencga.catalog.db.api.WorkflowDBAdaptor; +import org.opencb.opencga.catalog.db.mongodb.converters.WorkflowConverter; +import org.opencb.opencga.catalog.db.mongodb.iterators.WorkflowCatalogMongoDBIterator; +import org.opencb.opencga.catalog.exceptions.CatalogAuthorizationException; +import org.opencb.opencga.catalog.exceptions.CatalogDBException; +import org.opencb.opencga.catalog.exceptions.CatalogParameterException; +import org.opencb.opencga.catalog.utils.Constants; +import org.opencb.opencga.core.common.TimeUtils; +import org.opencb.opencga.core.config.Configuration; +import org.opencb.opencga.core.models.nextflow.Workflow; +import org.opencb.opencga.core.response.OpenCGAResult; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.opencb.opencga.catalog.db.mongodb.MongoDBUtils.fixAclProjection; + +public class WorkflowMongoDBAdaptor extends CatalogMongoDBAdaptor implements WorkflowDBAdaptor { + + private final MongoDBCollection workflowCollection; + private final MongoDBCollection archiveWorkflowCollection; + private final MongoDBCollection deleteWorkflowCollection; + private final SnapshotVersionedMongoDBAdaptor versionedMongoDBAdaptor; + private final WorkflowConverter workflowConverter; + + public WorkflowMongoDBAdaptor(MongoDBCollection workflowCollection, MongoDBCollection archiveWorkflowCollection, + MongoDBCollection deleteWorkflowCollection, Configuration configuration, + OrganizationMongoDBAdaptorFactory dbAdaptorFactory) { + super(configuration, LoggerFactory.getLogger(WorkflowMongoDBAdaptor.class)); + this.dbAdaptorFactory = dbAdaptorFactory; + this.workflowCollection = workflowCollection; + this.archiveWorkflowCollection = archiveWorkflowCollection; + this.deleteWorkflowCollection = deleteWorkflowCollection; + this.versionedMongoDBAdaptor = new SnapshotVersionedMongoDBAdaptor(workflowCollection, archiveWorkflowCollection, + deleteWorkflowCollection); + this.workflowConverter = new WorkflowConverter(); + } + + Workflow insert(ClientSession clientSession, Workflow workflow) throws CatalogDBException { + if (StringUtils.isEmpty(workflow.getId())) { + throw new CatalogDBException("Missing workflow id"); + } + + // Check the workflow does not exist + Bson bson = Filters.eq(QueryParams.ID.key(), workflow.getId()); + DataResult count = workflowCollection.count(clientSession, bson); + if (count.getNumMatches() > 0) { + throw new CatalogDBException("Workflow { id: '" + workflow.getId() + "'} already exists."); + } + + long uid = getNewUid(clientSession); + workflow.setUid(uid); + + Document workflowObject = workflowConverter.convertToStorageType(workflow); + + workflowObject.put(PRIVATE_CREATION_DATE, + StringUtils.isNotEmpty(workflow.getCreationDate()) ? TimeUtils.toDate(workflow.getCreationDate()) : TimeUtils.getDate()); + workflowObject.put(PRIVATE_MODIFICATION_DATE, StringUtils.isNotEmpty(workflow.getModificationDate()) + ? TimeUtils.toDate(workflow.getModificationDate()) : TimeUtils.getDate()); + + logger.debug("Inserting workflow '{}' ({})...", workflow.getId(), workflow.getUid()); + versionedMongoDBAdaptor.insert(clientSession, workflowObject); + logger.debug("Workflow '{}' successfully inserted", workflow.getId()); + + return workflow; + } + + @Override + public OpenCGAResult insert(Workflow workflow, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return runTransaction(clientSession -> { + long tmpStartTime = startQuery(); + logger.debug("Starting workflow insert transaction for workflow id '{}'", workflow.getId()); + + insert(clientSession, workflow); + return endWrite(tmpStartTime, 1, 1, 0, 0, null); + }, e -> logger.error("Could not create workflow {}: {}", workflow.getId(), e.getMessage())); + } + + @Override + public OpenCGAResult get(long studyUid, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + long startTime = startQuery(); + try (DBIterator dbIterator = iterator(studyUid, query, options, user)) { + return endQuery(startTime, dbIterator); + } + } + + @Override + public OpenCGAResult get(Query query, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + long startTime = startQuery(); + try (DBIterator dbIterator = iterator(query, options)) { + return endQuery(startTime, dbIterator); + } + } + + @Override + public OpenCGAResult nativeGet(long studyUid, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + long startTime = startQuery(); + try (DBIterator dbIterator = nativeIterator(studyUid, query, options, user)) { + return endQuery(startTime, dbIterator); + } + } + + @Override + OpenCGAResult nativeGet(Query query, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + long startTime = startQuery(); + try (DBIterator dbIterator = nativeIterator(query, options)) { + return endQuery(startTime, dbIterator); + } + } + + @Override + public DBIterator iterator(long studyUid, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + query.put(PRIVATE_STUDY_UID, studyUid); + MongoDBIterator mongoCursor = getMongoCursor(null, query, options, user); + return new WorkflowCatalogMongoDBIterator<>(mongoCursor, null, workflowConverter, null, studyUid, user, options); + } + + @Override + public DBIterator iterator(Query query, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + MongoDBIterator mongoCursor = getMongoCursor(null, query, options, null); + return new WorkflowCatalogMongoDBIterator<>(mongoCursor, null, workflowConverter, null, options); + } + + @Override + public DBIterator nativeIterator(Query query, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions(); + queryOptions.put(NATIVE_QUERY, true); + MongoDBIterator mongoCursor = getMongoCursor(null, query, queryOptions, null); + return new WorkflowCatalogMongoDBIterator(mongoCursor, null, null, null, options); + } + + @Override + public DBIterator nativeIterator(long studyUid, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions(); + queryOptions.put(NATIVE_QUERY, true); + + query.put(PRIVATE_STUDY_UID, studyUid); + MongoDBIterator mongoCursor = getMongoCursor(null, query, queryOptions, user); + return new WorkflowCatalogMongoDBIterator<>(mongoCursor, null, null, null, studyUid, user, options); + } + + @Override + public OpenCGAResult count(Query query, String user) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Bson bson = parseQuery(query, user); + return new OpenCGAResult<>(workflowCollection.count(null, bson)); + } + + @Override + public OpenCGAResult count(Query query) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Bson bson = parseQuery(query); + return new OpenCGAResult<>(workflowCollection.count(null, bson)); + } + + @Override + public OpenCGAResult groupBy(Query query, List fields, QueryOptions options, String user) + throws CatalogDBException, CatalogAuthorizationException, CatalogParameterException { + return null; + } + + @Override + public OpenCGAResult groupBy(Query query, String field, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Bson bsonQuery = parseQuery(query); + return groupBy(workflowCollection, bsonQuery, field, WorkflowDBAdaptor.QueryParams.ID.key(), options); + } + + @Override + public OpenCGAResult groupBy(Query query, List fields, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Bson bsonQuery = parseQuery(query); + return groupBy(workflowCollection, bsonQuery, fields, WorkflowDBAdaptor.QueryParams.ID.key(), options); + } + + @Override + public OpenCGAResult distinct(long studyUid, String field, Query query, String userId) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Query finalQuery = query != null ? new Query(query) : new Query(); + finalQuery.put(WorkflowDBAdaptor.QueryParams.STUDY_UID.key(), studyUid); + Bson bson = parseQuery(finalQuery, userId); + return new OpenCGAResult<>(workflowCollection.distinct(field, bson)); + } + + @Override + public OpenCGAResult distinct(long studyUid, List fields, Query query, String userId) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + StopWatch stopWatch = StopWatch.createStarted(); + Query finalQuery = query != null ? new Query(query) : new Query(); + finalQuery.put(WorkflowDBAdaptor.QueryParams.STUDY_UID.key(), studyUid); + Bson bson = parseQuery(finalQuery, userId); + + Set results = new LinkedHashSet<>(); + for (String field : fields) { + results.addAll(workflowCollection.distinct(field, bson, String.class).getResults()); + } + + return new OpenCGAResult<>((int) stopWatch.getTime(TimeUnit.MILLISECONDS), Collections.emptyList(), results.size(), + new ArrayList<>(results), -1); + } + + @Override + public OpenCGAResult stats(Query query) { + return null; + } + + @Override + public OpenCGAResult update(long id, ObjectMap parameters, QueryOptions queryOptions) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public OpenCGAResult update(Query query, ObjectMap parameters, QueryOptions queryOptions) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public OpenCGAResult delete(Workflow id) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public OpenCGAResult delete(Query query) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public OpenCGAResult restore(long id, QueryOptions queryOptions) throws CatalogDBException { + return null; + } + + @Override + public OpenCGAResult restore(Query query, QueryOptions queryOptions) throws CatalogDBException { + return null; + } + + @Override + public OpenCGAResult rank(Query query, String field, int numResults, boolean asc) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return null; + } + + @Override + public void forEach(Query query, Consumer action, QueryOptions options) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + + } + + + private MongoDBIterator getMongoCursor(ClientSession clientSession, Query query, QueryOptions options, String user) + throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + Query finalQuery = new Query(query); + + QueryOptions qOptions; + if (options != null) { + qOptions = new QueryOptions(options); + } else { + qOptions = new QueryOptions(); + } + fixAclProjection(qOptions); + + Bson bson = parseQuery(finalQuery, user); + MongoDBCollection collection = getQueryCollection(finalQuery, workflowCollection, archiveWorkflowCollection, + deleteWorkflowCollection); + logger.debug("Nextflow query: {}", bson.toBsonDocument()); + return collection.iterator(clientSession, bson, null, null, qOptions); + } + + + private Bson parseQuery(Query query) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + return parseQuery(query, null); + } + + private Bson parseQuery(Query query, String user) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { + List andBsonList = new ArrayList<>(); + + Query queryCopy = new Query(query); +// queryCopy.remove(WorkflowDBAdaptor.QueryParams.DELETED.key()); + + if ("all".equalsIgnoreCase(queryCopy.getString(WorkflowDBAdaptor.QueryParams.VERSION.key()))) { + queryCopy.put(Constants.ALL_VERSIONS, true); + queryCopy.remove(WorkflowDBAdaptor.QueryParams.VERSION.key()); + } + + boolean uidVersionQueryFlag = versionedMongoDBAdaptor.generateUidVersionQuery(queryCopy, andBsonList); + + for (Map.Entry entry : queryCopy.entrySet()) { + String key = entry.getKey().split("\\.")[0]; + WorkflowDBAdaptor.QueryParams queryParam = WorkflowDBAdaptor.QueryParams.getParam(entry.getKey()) != null + ? WorkflowDBAdaptor.QueryParams.getParam(entry.getKey()) + : WorkflowDBAdaptor.QueryParams.getParam(key); + if (queryParam == null) { + if (Constants.ALL_VERSIONS.equals(entry.getKey())) { + continue; + } + throw new CatalogDBException("Unexpected parameter " + entry.getKey() + ". The parameter does not exist or cannot be " + + "queried for."); + } + try { + switch (queryParam) { + case UID: + addAutoOrQuery(PRIVATE_UID, queryParam.key(), queryCopy, queryParam.type(), andBsonList); + break; +// case SNAPSHOT: +// addAutoOrQuery(RELEASE_FROM_VERSION, queryParam.key(), queryCopy, queryParam.type(), andBsonList); +// break; + case CREATION_DATE: + addAutoOrQuery(PRIVATE_CREATION_DATE, queryParam.key(), queryCopy, queryParam.type(), andBsonList); + break; + case MODIFICATION_DATE: + addAutoOrQuery(PRIVATE_MODIFICATION_DATE, queryParam.key(), query, queryParam.type(), andBsonList); + break; +// case STATUS: +// case STATUS_ID: +// addAutoOrQuery(WorkflowDBAdaptor.QueryParams.STATUS_ID.key(), queryParam.key(), query, +// WorkflowDBAdaptor.QueryParams.STATUS_ID.type(), andBsonList); +// break; +// case INTERNAL_STATUS: +// case INTERNAL_STATUS_ID: +// // Convert the status to a positive status +// query.put(queryParam.key(), InternalStatus.getPositiveStatus(InternalStatus.STATUS_LIST, +// query.getString(queryParam.key()))); +// addAutoOrQuery(WorkflowDBAdaptor.QueryParams.INTERNAL_STATUS_ID.key(), queryParam.key(), query, +// WorkflowDBAdaptor.QueryParams.INTERNAL_STATUS_ID.type(), andBsonList); +// break; + case ID: + case UUID: + case RELEASE: + case VERSION: + addAutoOrQuery(queryParam.key(), queryParam.key(), queryCopy, queryParam.type(), andBsonList); + break; + default: + throw new CatalogDBException("Cannot query by parameter " + queryParam.key()); + } + } catch (Exception e) { + if (e instanceof CatalogDBException) { + throw e; + } else { + throw new CatalogDBException("Error parsing query : " + queryCopy.toJson(), e); + } + } + } + + // If the user doesn't look for a concrete version... + if (!uidVersionQueryFlag && !queryCopy.getBoolean(Constants.ALL_VERSIONS) + && !queryCopy.containsKey(WorkflowDBAdaptor.QueryParams.VERSION.key()) + && queryCopy.containsKey(WorkflowDBAdaptor.QueryParams.SNAPSHOT.key())) { + // If the user looks for anything from some release, we will try to find the latest from the release (snapshot) + andBsonList.add(Filters.eq(LAST_OF_RELEASE, true)); + } + + if (andBsonList.size() > 0) { + return Filters.and(andBsonList); + } else { + return new Document(); + } + } +} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/NextFlowConverter.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/NextFlowConverter.java deleted file mode 100644 index 6abbee7e1fd..00000000000 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/NextFlowConverter.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.opencb.opencga.catalog.db.mongodb.converters; - -import org.opencb.opencga.core.models.nextflow.NextFlow; - -public class NextFlowConverter extends OpenCgaMongoConverter { - - public NextFlowConverter() { - super(NextFlow.class); - } -} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/WorkflowConverter.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/WorkflowConverter.java new file mode 100644 index 00000000000..589220cacaa --- /dev/null +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/converters/WorkflowConverter.java @@ -0,0 +1,10 @@ +package org.opencb.opencga.catalog.db.mongodb.converters; + +import org.opencb.opencga.core.models.nextflow.Workflow; + +public class WorkflowConverter extends OpenCgaMongoConverter { + + public WorkflowConverter() { + super(Workflow.class); + } +} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/NextFlowCatalogMongoDBIterator.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/WorkflowCatalogMongoDBIterator.java similarity index 75% rename from opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/NextFlowCatalogMongoDBIterator.java rename to opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/WorkflowCatalogMongoDBIterator.java index 05e279f4965..bef72d87cd5 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/NextFlowCatalogMongoDBIterator.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/iterators/WorkflowCatalogMongoDBIterator.java @@ -12,7 +12,7 @@ import java.util.LinkedList; import java.util.Queue; -public class NextFlowCatalogMongoDBIterator extends CatalogMongoDBIterator { +public class WorkflowCatalogMongoDBIterator extends CatalogMongoDBIterator { private long studyUid; private String user; @@ -25,13 +25,13 @@ public class NextFlowCatalogMongoDBIterator extends CatalogMongoDBIterator private static final int BUFFER_SIZE = 100; private static final String UID_VERSION_SEP = "___"; - public NextFlowCatalogMongoDBIterator(MongoDBIterator mongoCursor, ClientSession clientSession, - GenericDocumentComplexConverter converter, - OrganizationMongoDBAdaptorFactory dbAdaptorFactory, QueryOptions options) { + public WorkflowCatalogMongoDBIterator(MongoDBIterator mongoCursor, ClientSession clientSession, + GenericDocumentComplexConverter converter, + OrganizationMongoDBAdaptorFactory dbAdaptorFactory, QueryOptions options) { this(mongoCursor, clientSession, converter, dbAdaptorFactory, 0, null, options); } - public NextFlowCatalogMongoDBIterator(MongoDBIterator mongoCursor, ClientSession clientSession, + public WorkflowCatalogMongoDBIterator(MongoDBIterator mongoCursor, ClientSession clientSession, GenericDocumentComplexConverter converter, OrganizationMongoDBAdaptorFactory dbAdaptorFactory, long studyUid, String user, QueryOptions options) { super(mongoCursor, clientSession, converter, null); @@ -41,7 +41,7 @@ public NextFlowCatalogMongoDBIterator(MongoDBIterator mongoCursor, Cli this.options = options; this.nextflowListBuffer = new LinkedList<>(); - this.logger = LoggerFactory.getLogger(NextFlowCatalogMongoDBIterator.class); + this.logger = LoggerFactory.getLogger(WorkflowCatalogMongoDBIterator.class); } } diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/AbstractManager.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/AbstractManager.java index 3d3611c5414..26ebc950048 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/AbstractManager.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/AbstractManager.java @@ -30,7 +30,6 @@ import org.opencb.opencga.catalog.models.InternalGetDataResult; import org.opencb.opencga.core.api.ParamConstants; import org.opencb.opencga.core.config.Configuration; -import org.opencb.opencga.core.models.IPrivateStudyUid; import org.opencb.opencga.core.models.study.Group; import org.opencb.opencga.core.response.OpenCGAResult; import org.slf4j.Logger; @@ -137,6 +136,10 @@ protected InterpretationDBAdaptor getInterpretationDBAdaptor(String organization return catalogDBAdaptorFactory.getInterpretationDBAdaptor(organization); } + protected WorkflowDBAdaptor getWorkflowDBAdaptor(String organization) throws CatalogDBException { + return catalogDBAdaptorFactory.getWorkflowDBAdaptor(organization); + } + protected void fixQueryObject(Query query) { changeQueryId(query, ParamConstants.INTERNAL_STATUS_PARAM, "internal.status"); } @@ -173,9 +176,8 @@ protected void changeQueryId(Query query, String currentKey, String newKey) { * @return the OpenCGAResult with the proper order of results. * @throws CatalogException In case of inconsistencies found. */ - InternalGetDataResult keepOriginalOrder(List entries, Function getId, - OpenCGAResult queryResult, boolean silent, - boolean keepAllVersions) throws CatalogException { + InternalGetDataResult keepOriginalOrder(List entries, Function getId, OpenCGAResult queryResult, + boolean silent, boolean keepAllVersions) throws CatalogException { InternalGetDataResult internalGetDataResult = new InternalGetDataResult<>(queryResult); Map> resultMap = new HashMap<>(); @@ -260,8 +262,7 @@ static QueryOptions keepFieldsInQueryOptions(QueryOptions options, List * @param Generic entry (Sample, File, Cohort...) * @return a list containing the entries that are in {@code originalEntries} that are not in {@code finalEntries}. */ - List getMissingFields(List originalEntries, List finalEntries, - Function getId) { + List getMissingFields(List originalEntries, List finalEntries, Function getId) { Set entrySet = new HashSet<>(); for (T finalEntry : finalEntries) { entrySet.add(getId.apply(finalEntry)); diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/CatalogManager.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/CatalogManager.java index 1409b1e9d4c..9f7b11d0dd3 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/CatalogManager.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/CatalogManager.java @@ -89,6 +89,7 @@ public class CatalogManager implements AutoCloseable { private ClinicalAnalysisManager clinicalAnalysisManager; private InterpretationManager interpretationManager; private PanelManager panelManager; + private WorkflowManager workflowManager; private AuditManager auditManager; private AuthorizationManager authorizationManager; @@ -164,6 +165,8 @@ private void configureManagers(Configuration configuration) throws CatalogExcept clinicalAnalysisManager = new ClinicalAnalysisManager(authorizationManager, auditManager, this, catalogDBAdaptorFactory, configuration); interpretationManager = new InterpretationManager(authorizationManager, auditManager, this, catalogDBAdaptorFactory, configuration); + workflowManager = new WorkflowManager(authorizationManager, auditManager, this, catalogDBAdaptorFactory, ioManagerFactory, + catalogIOManager, configuration); } private void initializeAdmin(Configuration configuration) throws CatalogDBException { @@ -451,4 +454,8 @@ public AuditManager getAuditManager() { public MigrationManager getMigrationManager() { return migrationManager; } + + public WorkflowManager getWorkflowManager() { + return workflowManager; + } } diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/WorkflowManager.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/WorkflowManager.java new file mode 100644 index 00000000000..c9d588bd0b0 --- /dev/null +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/WorkflowManager.java @@ -0,0 +1,220 @@ +package org.opencb.opencga.catalog.managers; + +import org.opencb.commons.datastore.core.Event; +import org.opencb.commons.datastore.core.ObjectMap; +import org.opencb.commons.datastore.core.Query; +import org.opencb.commons.datastore.core.QueryOptions; +import org.opencb.commons.utils.ListUtils; +import org.opencb.opencga.catalog.auth.authorization.AuthorizationManager; +import org.opencb.opencga.catalog.db.DBAdaptorFactory; +import org.opencb.opencga.catalog.db.api.WorkflowDBAdaptor; +import org.opencb.opencga.catalog.exceptions.CatalogException; +import org.opencb.opencga.catalog.exceptions.CatalogParameterException; +import org.opencb.opencga.catalog.io.CatalogIOManager; +import org.opencb.opencga.catalog.io.IOManagerFactory; +import org.opencb.opencga.catalog.models.InternalGetDataResult; +import org.opencb.opencga.catalog.utils.Constants; +import org.opencb.opencga.catalog.utils.ParamUtils; +import org.opencb.opencga.catalog.utils.UuidUtils; +import org.opencb.opencga.core.api.ParamConstants; +import org.opencb.opencga.core.config.Configuration; +import org.opencb.opencga.core.models.JwtPayload; +import org.opencb.opencga.core.models.audit.AuditRecord; +import org.opencb.opencga.core.models.common.Enums; +import org.opencb.opencga.core.models.nextflow.Workflow; +import org.opencb.opencga.core.response.OpenCGAResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class WorkflowManager extends AbstractManager { + + private final CatalogIOManager catalogIOManager; + private final IOManagerFactory ioManagerFactory; + + private final Logger logger; + + WorkflowManager(AuthorizationManager authorizationManager, AuditManager auditManager, CatalogManager catalogManager, + DBAdaptorFactory catalogDBAdaptorFactory, IOManagerFactory ioManagerFactory, CatalogIOManager catalogIOManager, + Configuration configuration) { + super(authorizationManager, auditManager, catalogManager, catalogDBAdaptorFactory, configuration); + + this.catalogIOManager = catalogIOManager; + this.ioManagerFactory = ioManagerFactory; + this.logger = LoggerFactory.getLogger(WorkflowManager.class); + } + + + private InternalGetDataResult internalGet(String organizationId, List workflowList, Query query, QueryOptions options, + String userId, boolean ignoreException) throws CatalogException { + if (ListUtils.isEmpty(workflowList)) { + throw new CatalogException("Missing workflow entries."); + } + List uniqueList = ListUtils.unique(workflowList); + + QueryOptions queryOptions = new QueryOptions(ParamUtils.defaultObject(options, QueryOptions::new)); + Query queryCopy = query == null ? new Query() : new Query(query); + + boolean versioned = queryCopy.getBoolean(Constants.ALL_VERSIONS) + || queryCopy.containsKey(WorkflowDBAdaptor.QueryParams.VERSION.key()); + if (versioned && uniqueList.size() > 1) { + throw new CatalogException("Only one workflow allowed when requesting multiple versions"); + } + + WorkflowDBAdaptor.QueryParams idQueryParam = getFieldFilter(uniqueList); + queryCopy.put(idQueryParam.key(), uniqueList); + + // Ensure the field by which we are querying for will be kept in the results + queryOptions = keepFieldInQueryOptions(queryOptions, idQueryParam.key()); + + OpenCGAResult workflowResult = getWorkflowDBAdaptor(organizationId).get(queryCopy, queryOptions); + + Function workflowStringFunction = Workflow::getId; + if (idQueryParam.equals(WorkflowDBAdaptor.QueryParams.UUID)) { + workflowStringFunction = Workflow::getUuid; + } + + if (ignoreException || workflowResult.getNumResults() >= uniqueList.size()) { + return keepOriginalOrder(uniqueList, workflowStringFunction, workflowResult, ignoreException, versioned); + } else { + throw CatalogException.notFound("workflows", getMissingFields(uniqueList, workflowResult.getResults(), workflowStringFunction)); + } + } + + private WorkflowDBAdaptor.QueryParams getFieldFilter(List idList) throws CatalogException { + WorkflowDBAdaptor.QueryParams idQueryParam = null; + for (String entry : idList) { + WorkflowDBAdaptor.QueryParams param = WorkflowDBAdaptor.QueryParams.ID; + if (UuidUtils.isOpenCgaUuid(entry)) { + param = WorkflowDBAdaptor.QueryParams.UUID; + } + if (idQueryParam == null) { + idQueryParam = param; + } + if (idQueryParam != param) { + throw new CatalogException("Found uuids and ids in the same query. Please, choose one or do two different queries."); + } + } + return idQueryParam; + } + + public OpenCGAResult create(Workflow workflow, QueryOptions options, String token) throws CatalogException { + options = ParamUtils.defaultObject(options, QueryOptions::new); + + JwtPayload tokenPayload = catalogManager.getUserManager().validateToken(token); + ObjectMap auditParams = new ObjectMap() + .append("workflow", workflow) + .append("options", options) + .append("token", token); + + String organizationId = tokenPayload.getOrganization(); + String userId = tokenPayload.getUserId(organizationId); + try { + // 1. Check permissions + authorizationManager.checkIsAtLeastOrganizationOwnerOrAdmin(organizationId, userId); + + // 2. Validate the workflow parameters + validateNewWorkflow(workflow); + + // 3. We insert the workflow + OpenCGAResult insert = getWorkflowDBAdaptor(organizationId).insert(workflow, options); + if (options.getBoolean(ParamConstants.INCLUDE_RESULT_PARAM)) { + // Fetch created workflow + Query query = new Query() + .append(WorkflowDBAdaptor.QueryParams.UID.key(), workflow.getUid()); + OpenCGAResult result = getWorkflowDBAdaptor(organizationId).get(query, options); + insert.setResults(result.getResults()); + } + auditManager.auditCreate(organizationId, userId, Enums.Resource.WORKFLOW, workflow.getId(), workflow.getUuid(), "", "", + auditParams, new AuditRecord.Status(AuditRecord.Status.Result.SUCCESS)); + return insert; + } catch (CatalogException e) { + auditManager.auditCreate(organizationId, userId, Enums.Resource.WORKFLOW, workflow.getId(), "", "", "", auditParams, + new AuditRecord.Status(AuditRecord.Status.Result.ERROR, e.getError())); + throw e; + } + } + + private void validateNewWorkflow(Workflow workflow) throws CatalogParameterException { + ParamUtils.checkIdentifier(workflow.getId(), WorkflowDBAdaptor.QueryParams.ID.key()); + ParamUtils.checkObj(workflow.getType(), WorkflowDBAdaptor.QueryParams.TYPE.key()); + ParamUtils.checkParameter(workflow.getCommandLine(), WorkflowDBAdaptor.QueryParams.COMMAND_LINE.key()); + ParamUtils.checkNotEmptyArray(workflow.getScripts(), WorkflowDBAdaptor.QueryParams.SCRIPTS.key()); + for (Workflow.Script script : workflow.getScripts()) { + ParamUtils.checkIdentifier(script.getId(), WorkflowDBAdaptor.QueryParams.SCRIPTS.key() + ".id"); + ParamUtils.checkParameter(script.getContent(), WorkflowDBAdaptor.QueryParams.SCRIPTS.key() + ".content"); + } + workflow.setUuid(UuidUtils.generateOpenCgaUuid(UuidUtils.Entity.WORKFLOW)); + workflow.setVersion(1); + workflow.setCreationDate(ParamUtils.checkDateOrGetCurrentDate(workflow.getCreationDate(), + WorkflowDBAdaptor.QueryParams.CREATION_DATE.key())); + workflow.setModificationDate(ParamUtils.checkDateOrGetCurrentDate(workflow.getModificationDate(), + WorkflowDBAdaptor.QueryParams.MODIFICATION_DATE.key())); + } + + public OpenCGAResult get(String workflow, QueryOptions queryOptions, String token) throws CatalogException { + return get(Collections.singletonList(workflow), queryOptions, false, token); + } + + public OpenCGAResult get(List workflowList, QueryOptions queryOptions, boolean ignoreException, String token) + throws CatalogException { + JwtPayload tokenPayload = catalogManager.getUserManager().validateToken(token); + String organizationId = tokenPayload.getOrganization(); + String userId = tokenPayload.getUserId(); + + ObjectMap auditParams = new ObjectMap() + .append("workflowList", workflowList) + .append("queryOptions", queryOptions) + .append("ignoreException", ignoreException) + .append("token", token); + String operationUuid = UuidUtils.generateOpenCgaUuid(UuidUtils.Entity.AUDIT); + auditManager.initAuditBatch(operationUuid); + QueryOptions options = queryOptions != null ? new QueryOptions(queryOptions) : new QueryOptions(); + try { + OpenCGAResult result = OpenCGAResult.empty(Workflow.class); + options.remove(QueryOptions.LIMIT); + InternalGetDataResult responseResult = internalGet(organizationId, workflowList, null, options, userId, + ignoreException); + + Map.Missing> missingMap = new HashMap<>(); + if (responseResult.getMissing() != null) { + missingMap = responseResult.getMissing().stream() + .collect(Collectors.toMap(InternalGetDataResult.Missing::getId, Function.identity())); + } + + List> versionedResults = responseResult.getVersionedResults(); + for (int i = 0; i < versionedResults.size(); i++) { + String entryId = workflowList.get(i); + if (versionedResults.get(i).isEmpty()) { + Event event = new Event(Event.Type.ERROR, entryId, missingMap.get(entryId).getErrorMsg()); + // Missing + result.getEvents().add(event); + } else { + int size = versionedResults.get(i).size(); + result.append(new OpenCGAResult<>(0, Collections.emptyList(), size, versionedResults.get(i), size)); + + Workflow entry = versionedResults.get(i).get(0); + auditManager.auditInfo(organizationId, operationUuid, userId, Enums.Resource.WORKFLOW, entry.getId(), entry.getUuid(), + "", "", auditParams, new AuditRecord.Status(AuditRecord.Status.Result.SUCCESS)); + } + } + + return result; + } catch (CatalogException e) { + for (String entryId : workflowList) { + auditManager.auditInfo(organizationId, operationUuid, userId, Enums.Resource.WORKFLOW, entryId, "", "", "", + auditParams, new AuditRecord.Status(AuditRecord.Status.Result.ERROR, e.getError())); + } + throw e; + } finally { + auditManager.finishAuditBatch(organizationId, operationUuid); + } + } + +} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/utils/UuidUtils.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/utils/UuidUtils.java index 8fcab95c4d2..ad0b1e4b0d1 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/utils/UuidUtils.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/utils/UuidUtils.java @@ -50,7 +50,8 @@ public enum Entity { PANEL(10), INTERPRETATION(11), ORGANIZATION(12), - NOTES(13); + NOTES(13), + WORKFLOW(14); private final int mask; diff --git a/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerExternalResource.java b/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerExternalResource.java index e8259db2c4b..5080c54f424 100644 --- a/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerExternalResource.java +++ b/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerExternalResource.java @@ -39,6 +39,8 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; +import java.text.SimpleDateFormat; +import java.util.Date; /** * Created on 05/05/16 @@ -179,4 +181,23 @@ public URI getResourceUri(String resourceName, String targetName) throws IOExcep return resourcePath.toUri(); } + public String createTmpOutdir() throws IOException { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + // stackTrace[0] = "Thread.currentThread" + // stackTrace[1] = "newOutputUri" + // stackTrace[2] = caller method + String testName = stackTrace[2].getMethodName(); + return createTmpOutdir(testName); + } + + public String createTmpOutdir(String suffix) throws IOException { + if (suffix.endsWith("_")) { + suffix = suffix.substring(0, suffix.length() - 1); + } + String folder = "I_tmp_" + new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss.SSS").format(new Date()) + suffix; + Path tmpOutDir = Paths.get(getCatalogManager().getConfiguration().getJobDir()).resolve(folder); + Files.createDirectories(tmpOutDir); + return tmpOutDir.toString(); + } + } diff --git a/opencga-client/src/main/R/R/Admin-methods.R b/opencga-client/src/main/R/R/Admin-methods.R index 5bfb1b049c2..b44096f424d 100644 --- a/opencga-client/src/main/R/R/Admin-methods.R +++ b/opencga-client/src/main/R/R/Admin-methods.R @@ -43,7 +43,7 @@ setMethod("adminClient", "OpencgaR", function(OpencgaR, user, endpointName, para #' @param count Count the number of elements matching the group. #' @param limit Maximum number of documents (groups) to be returned. #' @param fields Comma separated list of fields by which to group by. - #' @param entity Entity to be grouped by. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL'] + #' @param entity Entity to be grouped by. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA WORKFLOW FUNCTIONAL'] #' @param action Action performed. #' @param before Object before update. #' @param after Object after update. diff --git a/opencga-client/src/main/R/R/Study-methods.R b/opencga-client/src/main/R/R/Study-methods.R index ba022ea666e..ba5ba51c6c3 100644 --- a/opencga-client/src/main/R/R/Study-methods.R +++ b/opencga-client/src/main/R/R/Study-methods.R @@ -118,7 +118,7 @@ setMethod("studyClient", "OpencgaR", function(OpencgaR, group, id, members, stud #' @param operationId Audit operation UUID. #' @param userId User ID. #' @param action Action performed by the user. - #' @param resource Resource involved. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL'] + #' @param resource Resource involved. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA WORKFLOW FUNCTIONAL'] #' @param resourceId Resource ID. #' @param resourceUuid resource UUID. #' @param status Filter by status. Allowed values: ['SUCCESS ERROR'] diff --git a/opencga-client/src/main/javascript/Admin.js b/opencga-client/src/main/javascript/Admin.js index 5538e904dfc..b096169fe0e 100644 --- a/opencga-client/src/main/javascript/Admin.js +++ b/opencga-client/src/main/javascript/Admin.js @@ -37,7 +37,7 @@ export default class Admin extends OpenCGAParentClass { /** Group by operation * @param {String} fields - Comma separated list of fields by which to group by. * @param {"AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS - * INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL"} entity - Entity to be grouped by. + * INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA WORKFLOW FUNCTIONAL"} entity - Entity to be grouped by. * @param {Object} [params] - The Object containing the following optional parameters: * @param {Boolean} [params.count] - Count the number of elements matching the group. * @param {Number} [params.limit = "50"] - Maximum number of documents (groups) to be returned. The default value is 50. diff --git a/opencga-client/src/main/javascript/Study.js b/opencga-client/src/main/javascript/Study.js index 2c2b6abf793..d6eead290d4 100644 --- a/opencga-client/src/main/javascript/Study.js +++ b/opencga-client/src/main/javascript/Study.js @@ -119,7 +119,7 @@ export default class Study extends OpenCGAParentClass { * @param {String} [params.userId] - User ID. * @param {String} [params.action] - Action performed by the user. * @param {"AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS - * INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL"} [params.resource] - Resource involved. + * INTERPRETATION VARIANT ALIGNMENT CLINICAL EXPRESSION RGA WORKFLOW FUNCTIONAL"} [params.resource] - Resource involved. * @param {String} [params.resourceId] - Resource ID. * @param {String} [params.resourceUuid] - resource UUID. * @param {"SUCCESS ERROR"} [params.status] - Filter by status. diff --git a/opencga-client/src/main/python/pyopencga/rest_clients/admin_client.py b/opencga-client/src/main/python/pyopencga/rest_clients/admin_client.py index 9702e5d6463..7169a5a04c8 100644 --- a/opencga-client/src/main/python/pyopencga/rest_clients/admin_client.py +++ b/opencga-client/src/main/python/pyopencga/rest_clients/admin_client.py @@ -27,7 +27,7 @@ def group_by_audit(self, fields, entity, **options): :param str entity: Entity to be grouped by. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION - VARIANT ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL'] + VARIANT ALIGNMENT CLINICAL EXPRESSION RGA WORKFLOW FUNCTIONAL'] (REQUIRED) :param str fields: Comma separated list of fields by which to group by. (REQUIRED) diff --git a/opencga-client/src/main/python/pyopencga/rest_clients/study_client.py b/opencga-client/src/main/python/pyopencga/rest_clients/study_client.py index 639f20c6b53..b399d3b0529 100644 --- a/opencga-client/src/main/python/pyopencga/rest_clients/study_client.py +++ b/opencga-client/src/main/python/pyopencga/rest_clients/study_client.py @@ -139,7 +139,7 @@ def search_audit(self, study, **options): :param str resource: Resource involved. Allowed values: ['AUDIT NOTE ORGANIZATION USER PROJECT STUDY FILE SAMPLE JOB INDIVIDUAL COHORT DISEASE_PANEL FAMILY CLINICAL_ANALYSIS INTERPRETATION VARIANT - ALIGNMENT CLINICAL EXPRESSION RGA NEXTFLOW FUNCTIONAL'] + ALIGNMENT CLINICAL EXPRESSION RGA WORKFLOW FUNCTIONAL'] :param str resource_id: Resource ID. :param str resource_uuid: resource UUID. :param str status: Filter by status. Allowed values: ['SUCCESS ERROR'] diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/common/Enums.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/common/Enums.java index 04981030ebc..b82f7669af2 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/models/common/Enums.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/models/common/Enums.java @@ -79,7 +79,7 @@ public enum Resource { CLINICAL, EXPRESSION, RGA, - NEXTFLOW, + WORKFLOW, FUNCTIONAL; public List getFullPermissionList() { diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlow.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlow.java deleted file mode 100644 index af4a527ef1d..00000000000 --- a/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/NextFlow.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.opencb.opencga.core.models.nextflow; - -public class NextFlow { - - private String id; - private int version; - private String script; - - public NextFlow() { - } - - public NextFlow(String id, int version, String script) { - this.id = id; - this.version = version; - this.script = script; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder("NextFlow{"); - sb.append("id='").append(id).append('\''); - sb.append(", version=").append(version); - sb.append(", script='").append(script).append('\''); - sb.append('}'); - return sb.toString(); - } - - public String getId() { - return id; - } - - public NextFlow setId(String id) { - this.id = id; - return this; - } - - public int getVersion() { - return version; - } - - public NextFlow setVersion(int version) { - this.version = version; - return this; - } - - public String getScript() { - return script; - } - - public NextFlow setScript(String script) { - this.script = script; - return this; - } -} diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/Workflow.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/Workflow.java new file mode 100644 index 00000000000..726cfeebe00 --- /dev/null +++ b/opencga-core/src/main/java/org/opencb/opencga/core/models/nextflow/Workflow.java @@ -0,0 +1,165 @@ +package org.opencb.opencga.core.models.nextflow; + +import org.opencb.opencga.core.models.PrivateFields; + +import java.util.List; + +public class Workflow extends PrivateFields { + + private String id; + private String uuid; + private int version; + private Type type; + private String commandLine; + private List