Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK-7142 - OOME running calculate_pedigree_graph #2532

Open
wants to merge 7 commits into
base: release-2.12.x
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
import org.opencb.commons.exec.Command;
import org.opencb.commons.utils.DockerUtils;
import org.opencb.opencga.analysis.tools.OpenCgaToolScopeStudy;
import org.opencb.opencga.catalog.db.api.DBIterator;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.managers.FamilyManager;
import org.opencb.opencga.catalog.utils.PedigreeGraphUtils;
import org.opencb.opencga.core.api.ParamConstants;
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.family.Family;
import org.opencb.opencga.core.response.OpenCGAResult;
import org.opencb.opencga.core.tools.annotations.Tool;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@Tool(id = PedigreeGraphInitAnalysis.ID, resource = Enums.Resource.FAMILY)
public class PedigreeGraphInitAnalysis extends OpenCgaToolScopeStudy {
Expand Down Expand Up @@ -68,30 +68,27 @@ protected void run() throws ToolException {

step(getId(), () -> {
// Get all families from that study
QueryOptions queryOptions = new QueryOptions(QueryOptions.INCLUDE, Arrays.asList("id", "members", "pedigreeGraph"));
OpenCGAResult<Family> results = catalogManager.getFamilyManager().search(study, new Query(), queryOptions, token);

// Get families to update by filtering
List<Family> familiesToUpdate = results.getResults().stream().filter(family -> PedigreeGraphUtils.hasMinTwoGenerations(family)
&& (family.getPedigreeGraph() == null || StringUtils.isEmpty(family.getPedigreeGraph().getBase64())))
.collect(Collectors.toList());

// Update those families
for (Family family: familiesToUpdate) {
try {
logger.info("Updating pedigree graph for family '{}'", family.getId());
catalogManager.getFamilyManager().update(study, family.getId(), null,
new QueryOptions(ParamConstants.FAMILY_UPDATE_PEDIGREEE_GRAPH_PARAM, true), token);
String msg = "Updated pedigree graph for family '" + family.getId() + "'";
logger.info(msg);
addInfo(msg);
} catch (CatalogException e) {
String msg = "Something wrong happened when updating pedigree graph for family '" + family.getId() + "'. Error: "
+ e.getMessage();
logger.info(msg);
addWarning(msg);
try (DBIterator<Family> iterator = catalogManager.getFamilyManager().iterator(study, new Query(), FamilyManager.INCLUDE_FAMILY_FOR_PEDIGREE, token)) {
while (iterator.hasNext()) {
Family family = iterator.next();
if (PedigreeGraphUtils.hasMinTwoGenerations(family)
&& (family.getPedigreeGraph() == null || StringUtils.isEmpty(family.getPedigreeGraph().getBase64()))) {
try {
logger.info("Updating pedigree graph for family '{}'", family.getId());
catalogManager.getFamilyManager().update(study, family.getId(), null,
new QueryOptions(ParamConstants.FAMILY_UPDATE_PEDIGREEE_GRAPH_PARAM, true), token);
String msg = "Updated pedigree graph for family '" + family.getId() + "'";
logger.info(msg);
addInfo(msg);
} catch (CatalogException e) {
String msg = "Something wrong happened when updating pedigree graph for family '" + family.getId() + "'.";
logger.warn(msg, e);
addWarning(msg + " Error: " + e.getMessage());
}
}
}
}

logger.info("Finished updating pedigree graph for families of the study '{}'", study);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.opencga.analysis.family.PedigreeGraphInitAnalysis;
import org.opencb.opencga.catalog.db.api.DBIterator;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.managers.FamilyManager;
import org.opencb.opencga.catalog.migration.Migration;
import org.opencb.opencga.catalog.migration.MigrationRun;
import org.opencb.opencga.catalog.migration.MigrationTool;
Expand Down Expand Up @@ -35,15 +37,15 @@ protected void run() throws Exception {
MigrationRun migrationRun = getMigrationRun();

// Map study FQN -> jobs already registered in this migration run
Map<String, Job> jobs = new HashMap<>();
Map<String, List<Job>> jobsMap = new HashMap<>();
for (JobReferenceParam jobReference : migrationRun.getJobs()) {
Job job = catalogManager.getJobManager().get(jobReference.getStudyId(), jobReference.getId(), new QueryOptions(), token)
.first();
logger.info("Reading already executed job '{}' for study '{}' with status '{}'",
job.getId(),
job.getStudy().getId(),
job.getInternal().getStatus().getId());
jobs.put(job.getStudy().getId(), job);
jobsMap.computeIfAbsent(job.getStudy().getId(), k -> new ArrayList<>()).add(job);
}

Set<String> studies = new LinkedHashSet<>(getStudies());
Expand All @@ -56,20 +58,38 @@ protected void run() throws Exception {
StringUtils.join(studies, ", "));

for (String study : studies) {
Job job = jobs.get(study);
if (job != null) {
String status = job.getInternal().getStatus().getId();
if (status.equals(Enums.ExecutionStatus.DONE)) {
// Skip this study. Already migrated
logger.info("Study {} already migrated", study);
continue;
} else if (status.equals(Enums.ExecutionStatus.ERROR) || status.equals(Enums.ExecutionStatus.ABORTED)) {
logger.info("Retry migration job for study {}", study);
} else {
logger.info("Job {} for migrating study {} in status {}. Wait for completion", job.getId(), study, status);
continue;
List<Job> jobs = jobsMap.get(study);
boolean migrated = false;
boolean running = false;
List<Job> errorJobs = new ArrayList<>();
if (jobs != null) {
for (Job job : jobs) {
String status = job.getInternal().getStatus().getId();
if (status.equals(Enums.ExecutionStatus.DONE)) {
migrated = true;
} else if (status.equals(Enums.ExecutionStatus.ERROR) || status.equals(Enums.ExecutionStatus.ABORTED)) {
logger.info("Retry migration job for study {}", study);
errorJobs.add(job);
} else {
running = true;
logger.info("Job {} for migrating study {} in status {}. Wait for completion", job.getId(), study, status);
}
}
getMigrationRun().removeJob(job);
}
if (running) {
// Skip this study. There is a job running
logger.info("Study {} has a job running. Skip", study);
continue;
}
// Remove error jobs if any
for (Job errorJob : errorJobs) {
logger.info("Remove error job {} for study {}", errorJob.getId(), study);
getMigrationRun().removeJob(errorJob);
}
if (migrated) {
// Skip this study. Already migrated
logger.info("Study {} already migrated", study);
continue;
}

logger.info("Adding new job to migrate/initialize pedigree graph for study {}", study);
Expand All @@ -84,16 +104,18 @@ protected void run() throws Exception {
public List<String> getStudies() throws CatalogException {
Set<String> studies = new LinkedHashSet<>();
QueryOptions projectOptions = new QueryOptions(QueryOptions.INCLUDE, Arrays.asList("id", "studies"));
QueryOptions familyOptions = new QueryOptions(QueryOptions.INCLUDE, Arrays.asList("id", "members", "pedigreeGraph"));
for (Project project : catalogManager.getProjectManager().search(new Query(), projectOptions, token).getResults()) {
if (CollectionUtils.isNotEmpty(project.getStudies())) {
for (Study study : project.getStudies()) {
String id = study.getFqn();
for (Family family : catalogManager.getFamilyManager().search(id, new Query(), familyOptions, token).getResults()) {
if (PedigreeGraphUtils.hasMinTwoGenerations(family)
&& (family.getPedigreeGraph() == null || StringUtils.isEmpty(family.getPedigreeGraph().getBase64()))) {
studies.add(id);
break;
try (DBIterator<Family> iterator = catalogManager.getFamilyManager().iterator(id, new Query(), FamilyManager.INCLUDE_FAMILY_FOR_PEDIGREE, token)) {
while (iterator.hasNext()) {
Family family = iterator.next();
if (PedigreeGraphUtils.hasMinTwoGenerations(family)
&& (family.getPedigreeGraph() == null || StringUtils.isEmpty(family.getPedigreeGraph().getBase64()))) {
studies.add(id);
break;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opencb.opencga.catalog.exceptions.CatalogDBException;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.exceptions.CatalogParameterException;
import org.opencb.opencga.catalog.managers.FamilyManager;
import org.opencb.opencga.catalog.managers.IndividualManager;
import org.opencb.opencga.catalog.utils.Constants;
import org.opencb.opencga.catalog.utils.ParamUtils;
Expand Down Expand Up @@ -543,7 +544,7 @@ private PedigreeGraph computePedigreeGraph(ClientSession clientSession, Family f
Query query = new Query()
.append(QueryParams.UID.key(), family.getUid())
.append(QueryParams.STUDY_UID.key(), family.getStudyUid());
Family tmpFamily = get(clientSession, query, QueryOptions.empty()).first();
Family tmpFamily = get(clientSession, query, FamilyManager.INCLUDE_FAMILY_FOR_PEDIGREE).first();

try {
return PedigreeGraphUtils.getPedigreeGraph(tmpFamily,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,8 @@ private MongoDBIterator<Document> getMongoCursor(ClientSession clientSession, Qu
qOptions = new QueryOptions();
}

qOptions = removeInnerProjections(qOptions, QueryParams.SAMPLES.key());
qOptions = removeInnerProjections(qOptions, Arrays.asList(QueryParams.SAMPLES.key(), QueryParams.FATHER.key(),
QueryParams.MOTHER.key()));
qOptions = removeAnnotationProjectionOptions(qOptions);

// FIXME we should be able to remove this now safely
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,42 +558,70 @@ protected QueryOptions extractNestedOptions(QueryOptions options, String key) {
* @return new QueryOptions after removing the inner projectionKey projections.
*/
protected QueryOptions removeInnerProjections(QueryOptions options, String projectionKey) {
// Single-key convenience form: wrap the key in a one-element list and delegate to the List<String> overload.
return removeInnerProjections(options, Collections.singletonList(projectionKey));
}

/**
* Removes any other entity projections made. This method should be called by any entity containing inner entities:
* Family -> Individual; Individual -> Sample; File -> Sample; Cohort -> Sample
*
* @param options current query options object.
* @param projectionKeyList Projection keys to be removed from the query options.
* @return new QueryOptions after removing the inner projectionKey projections.
*/
protected QueryOptions removeInnerProjections(QueryOptions options, List<String> projectionKeyList) {
QueryOptions queryOptions = ParamUtils.defaultObject(options, QueryOptions::new);

if (queryOptions.containsKey(QueryOptions.INCLUDE)) {
List<String> includeList = queryOptions.getAsStringList(QueryOptions.INCLUDE);
List<String> newInclude = new ArrayList<>(includeList.size());
boolean projectionKeyExcluded = false;
Map<String, Boolean> projectionKeyExcluded = new HashMap<>(projectionKeyList.size());
for (String projectionKey : projectionKeyList) {
projectionKeyExcluded.put(projectionKey, false);
}

for (String include : includeList) {
if (!include.startsWith(projectionKey + ".")) {
boolean excluded = false;
for (String projectionKey : projectionKeyList) {
if (include.startsWith(projectionKey + ".")) {
projectionKeyExcluded.put(projectionKey, true);
excluded = true;
break;
}
}
if (!excluded) {
newInclude.add(include);
} else {
projectionKeyExcluded = true;
}
}
if (newInclude.isEmpty()) {
queryOptions.put(QueryOptions.INCLUDE, Arrays.asList(ID, projectionKey));
List<String> tmpInclude = new ArrayList<>(projectionKeyList.size() + 1);
tmpInclude.addAll(projectionKeyList);
tmpInclude.add(ID);
queryOptions.put(QueryOptions.INCLUDE, tmpInclude);
} else {
if (projectionKeyExcluded) {
newInclude.add(projectionKey);
for (Map.Entry<String, Boolean> entry : projectionKeyExcluded.entrySet()) {
if (entry.getValue()) {
newInclude.add(entry.getKey());
}
}
queryOptions.put(QueryOptions.INCLUDE, newInclude);
}
}
if (queryOptions.containsKey(QueryOptions.EXCLUDE)) {
List<String> excludeList = queryOptions.getAsStringList(QueryOptions.EXCLUDE);
List<String> newExclude = new ArrayList<>(excludeList.size());
for (String exclude : excludeList) {
if (!exclude.startsWith(projectionKey + ".")) {
newExclude.add(exclude);
}
}
if (newExclude.isEmpty()) {
queryOptions.remove(QueryOptions.EXCLUDE);
} else {
queryOptions.put(QueryOptions.EXCLUDE, newExclude);
}
}
// TODO: This code seems unnecessary. We should remove it if we don't find any issue.
// if (queryOptions.containsKey(QueryOptions.EXCLUDE)) {
// List<String> excludeList = queryOptions.getAsStringList(QueryOptions.EXCLUDE);
// List<String> newExclude = new ArrayList<>(excludeList.size());
// for (String exclude : excludeList) {
// if (!exclude.startsWith(projectionKey + ".")) {
// newExclude.add(exclude);
// }
// }
// if (newExclude.isEmpty()) {
// queryOptions.remove(QueryOptions.EXCLUDE);
// } else {
// queryOptions.put(QueryOptions.EXCLUDE, newExclude);
// }
// }

return queryOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ public class FamilyManager extends AnnotationSetManager<Family> {
FamilyDBAdaptor.QueryParams.ID.key(), FamilyDBAdaptor.QueryParams.UID.key(), FamilyDBAdaptor.QueryParams.UUID.key(),
FamilyDBAdaptor.QueryParams.VERSION.key(), FamilyDBAdaptor.QueryParams.STUDY_UID.key(),
FamilyDBAdaptor.QueryParams.MEMBERS.key()));
public static final QueryOptions INCLUDE_FAMILY_FOR_PEDIGREE = keepFieldsInQueryOptions(INCLUDE_FAMILY_IDS, Arrays.asList(
FamilyDBAdaptor.QueryParams.PEDIGREE_GRAPH.key(), FamilyDBAdaptor.QueryParams.DISORDERS.key(),
FamilyDBAdaptor.QueryParams.MEMBERS.key() + "." + IndividualDBAdaptor.QueryParams.ID.key(),
FamilyDBAdaptor.QueryParams.MEMBERS.key() + "." + IndividualDBAdaptor.QueryParams.SEX.key(),
FamilyDBAdaptor.QueryParams.MEMBERS.key() + "." + IndividualDBAdaptor.QueryParams.DISORDERS.key(),
FamilyDBAdaptor.QueryParams.MEMBERS.key() + "." + IndividualDBAdaptor.QueryParams.LIFE_STATUS.key(),
FamilyDBAdaptor.QueryParams.MEMBERS.key() + "." + IndividualDBAdaptor.QueryParams.FATHER.key()
+ "." + IndividualDBAdaptor.QueryParams.ID.key(),
FamilyDBAdaptor.QueryParams.MEMBERS.key() + "." + IndividualDBAdaptor.QueryParams.MOTHER.key()
+ "." + IndividualDBAdaptor.QueryParams.ID.key()));
protected static Logger logger = LoggerFactory.getLogger(FamilyManager.class);
private final String defaultFacet = "creationYear>>creationMonth;status;phenotypes;expectedSize;numMembers[0..20]:2";
private UserManager userManager;
Expand Down
Loading