Skip to content

Commit

Permalink
Purge obsolete mixture prior codes
Browse files Browse the repository at this point in the history
  • Loading branch information
gaow committed Oct 11, 2023
1 parent cf3c4f6 commit 12af9c8
Show file tree
Hide file tree
Showing 2 changed files with 335 additions and 655 deletions.
335 changes: 335 additions & 0 deletions multivariate-fine-mapping/mash_data_preprocessing.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
},
"source": [
"# Data munggling for multi-variant summary stats"
]
},
{
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
},
"source": [
"## Minimal working example"
]
},
{
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
},
"source": [
"To see the input requirements and output data formats, please [download a minimal working example here](https://drive.google.com/file/d/1838xUOQuWTszQ0WJGXNiJMszY05cw3RS/view?usp=sharing), and run the following:\n",
"\n",
"### Merge univariate results\n",
"\n",
"```\n",
"sos run mixture_prior.ipynb merge \\\n",
" --analysis-units <FIXME> \\\n",
" --plink-sumstats <FIXME> \\\n",
" --name gtex_mixture\n",
"```\n",
"\n",
"### Select and merge univariate effects\n",
"\n",
"```\n",
"m=/path/to/data\n",
"cd $m && ls *.rds | sed 's/\\.rds//g' > analysis_units.txt && cd -\n",
"sos run mixture_prior.ipynb extract_effects \\\n",
" --analysis-units $m/analysis_units.txt \\\n",
" --datadir $m --name `basename $m`\n",
"```\n",
"\n",
"Notice that for production use, each `sos run` command should be submitted to the cluster as a job."
]
},
{
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
},
"source": [
"## Global parameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"kernel": "SoS"
},
"outputs": [],
"source": [
"[global]\n",
"import os\n",
"# Work directory & output directory\n",
"parameter: cwd = path('./output')\n",
"# The filename prefix for output data\n",
"parameter: name = str\n",
"# Mixture prior components to compute\n",
"parameter: mixture_components = ['flash', 'flash_nonneg', 'pca', 'canonical']\n",
"# Number of analysis units to process per job\n",
"parameter: job_size = 1\n",
"# Residual correlation file; path('.') means \"not provided\"\n",
"parameter: resid_cor = path(\".\")\n",
"fail_if(not (resid_cor.is_file() or resid_cor == path('.')), msg = f'Cannot find ``{resid_cor}``')"
]
},
{
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
},
"source": [
"## Merge PLINK univariate association summary statistic to RDS format"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"kernel": "SoS"
},
"outputs": [],
"source": [
"[merge]\n",
"# File listing per-condition summary statistic locations, one per row, under column `#molc_pheno`\n",
"parameter: molecular_pheno = path\n",
"# Analysis units file. For RDS files it can be generated by `ls *.rds | sed 's/\\.rds//g' > analysis_units.txt`\n",
"parameter: analysis_units = path\n",
"regions = [x.strip().split() for x in open(analysis_units).readlines() if x.strip() and not x.strip().startswith('#')]\n",
"input: molecular_pheno, for_each = \"regions\"\n",
"output: f'{cwd:a}/RDS/{_regions[0]}'\n",
"\n",
"task: trunk_workers = 1, trunk_size = job_size, walltime = '4h', mem = '6G', tags = f'{step_name}_{_output:bn}' \n",
"\n",
"R: expand = \"$[ ]\", stderr = f'{_output}.stderr', stdout = f'{_output}.stdout'\n",
"    library(\"dplyr\")\n",
"    library(\"tibble\")\n",
"    library(\"purrr\")\n",
"    library(\"readr\")\n",
"    molecular_pheno = read_delim($[molecular_pheno:r], delim = \"\\t\")\n",
"    # Append this analysis unit's file name to each condition's directory prefix\n",
"    molecular_pheno = molecular_pheno%>%mutate(dir = map_chr(`#molc_pheno`,~paste(c(`.x`,\"$[_regions[0]]\"),collapse = \"\")))\n",
"    n = nrow(molecular_pheno)\n",
"    # For every condition read rds and extract the bhat and sbhat.\n",
"    genos = tibble( i = 1:n)\n",
"    genos = genos%>%mutate(bhat = map(i, ~readRDS(molecular_pheno[[.x,2]])$bhat%>%as.data.frame%>%rownames_to_column),\n",
"                           sbhat = map(i, ~readRDS(molecular_pheno[[.x,2]])$sbhat%>%as.data.frame%>%rownames_to_column))\n",
"    \n",
"    # Join first two conditions by variant id (the `rowname` column)\n",
"    genos_join_bhat = full_join((genos%>%pull(bhat))[[1]],(genos%>%pull(bhat))[[2]],by = \"rowname\")\n",
"    genos_join_sbhat = full_join((genos%>%pull(sbhat))[[1]],(genos%>%pull(sbhat))[[2]],by = \"rowname\")\n",
"    \n",
"    # If there are more conditions, join the rest\n",
"    if(n > 2){\n",
"        for(j in 3:n){\n",
"            genos_join_bhat = full_join(genos_join_bhat,(genos%>%pull(bhat))[[j]],by = \"rowname\")\n",
"            genos_join_sbhat = full_join(genos_join_sbhat,(genos%>%pull(sbhat))[[j]],by = \"rowname\")\n",
"        }\n",
"    }\n",
"    \n",
"    # Drop the variant id column and convert to matrix once, after all joins are done.\n",
"    # (Previously this happened only inside the n > 2 loop: for n == 2 the `rowname`\n",
"    # column survived and broke the colnames assignment below, and for n > 3 the\n",
"    # second loop iteration called full_join on a matrix, which errors.)\n",
"    genos_join_bhat = genos_join_bhat%>%select(-rowname)%>%as.matrix\n",
"    genos_join_sbhat = genos_join_sbhat%>%select(-rowname)%>%as.matrix\n",
"    \n",
"    # Derive a condition name from each input path (3rd-from-last `/` component)\n",
"    name = molecular_pheno%>%mutate(name = map(`#molc_pheno`, ~read.table(text = .x,sep = \"/\")),\n",
"                                    name = map_chr(name, ~.x[,ncol(.x)-2]%>%as.character) )%>%pull(name)\n",
"    colnames(genos_join_bhat) = name\n",
"    colnames(genos_join_sbhat) = name\n",
"    \n",
"    \n",
"    # save the rds file\n",
"    saveRDS(file = \"$[_output]\", list(bhat=genos_join_bhat, sbhat=genos_join_sbhat))"
]
},
{
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
},
"source": [
"## Get top, random and null effects per analysis unit"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"kernel": "SoS"
},
"outputs": [],
"source": [
"# extract data for MASH from summary stats\n",
"[extract_effects_1]\n",
"# Name of the sub-list inside each RDS file holding the stats; \"\" = top level\n",
"parameter: table_name = \"\"\n",
"# Element names for effect sizes and their standard errors inside each RDS file\n",
"parameter: bhat = \"bhat\"\n",
"parameter: sbhat = \"sbhat\"\n",
"# Expected number of conditions (columns); 0 disables this sanity check\n",
"parameter: expected_ncondition = 0\n",
"# Directory containing the per-unit RDS input files\n",
"parameter: datadir = path\n",
"# RNG seed so the random/null effect sampling below is reproducible\n",
"parameter: seed = 999\n",
"# Number of \"random\" rows to sample per analysis unit\n",
"parameter: n_random = 4\n",
"# Number of \"null\" rows (max |z| < 2) to sample per analysis unit\n",
"parameter: n_null = 4\n",
"# If True, save z-scores only; otherwise also save bhat/sbhat matrices\n",
"parameter: z_only = True\n",
"# Analysis units file. For RDS files it can be generated by `ls *.rds | sed 's/\\.rds//g' > analysis_units.txt`\n",
"parameter: analysis_units = path\n",
"# handle N = per_chunk data-set in one job\n",
"parameter: per_chunk = 1000\n",
"regions = [x.strip().split() for x in open(analysis_units).readlines() if x.strip() and not x.strip().startswith('#')]\n",
"input: [f'{datadir}/{x[0]}.rds' for x in regions], group_by = per_chunk\n",
"output: f\"{cwd}/{name}/cache/{name}_{_index+1}.rds\"\n",
"task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'\n",
"R: expand = \"${ }\"\n",
"    set.seed(${seed})\n",
"    # Return the (row, col) index of the largest element of a matrix\n",
"    matxMax <- function(mtx) {\n",
"      return(arrayInd(which.max(mtx), dim(mtx)))\n",
"    }\n",
"    # Strip row names from every matrix in a list so rbind-ing across units is clean\n",
"    remove_rownames = function(x) {\n",
"        for (name in names(x)) rownames(x[[name]]) = NULL\n",
"        return(x)\n",
"    }\n",
"    # Replace NaN effects with 0 and NaN/Inf standard errors with a large value\n",
"    # (1E3) so those entries carry essentially no weight downstream\n",
"    handle_nan_etc = function(x) {\n",
"      x$bhat[which(is.nan(x$bhat))] = 0\n",
"      x$sbhat[which(is.nan(x$sbhat) | is.infinite(x$sbhat))] = 1E3\n",
"      return(x)\n",
"    }\n",
"    # From one unit's bhat/sbhat matrices pick: the single strongest row (max |z|),\n",
"    # n_random rows excluding the strongest, and n_null rows whose max |z| < 2\n",
"    extract_one_data = function(dat, n_random, n_null, infile) {\n",
"        if (is.null(dat)) return(NULL)\n",
"        z = abs(dat$${bhat}/dat$${sbhat})\n",
"        max_idx = matxMax(z)\n",
"        # strong effect samples\n",
"        strong = list(bhat = dat$${bhat}[max_idx[1],,drop=F], sbhat = dat$${sbhat}[max_idx[1],,drop=F])\n",
"        # random samples excluding the top one\n",
"        # NOTE(review): assumes each unit has >1 row; with a single row `2:nrow(z)` is 2:1 — TODO confirm\n",
"        if (max_idx[1] == 1) {\n",
"          sample_idx = 2:nrow(z)\n",
"        } else if (max_idx[1] == nrow(z)) {\n",
"          sample_idx = 1:(max_idx[1]-1)\n",
"        } else {\n",
"          sample_idx = c(1:(max_idx[1]-1), (max_idx[1]+1):nrow(z))\n",
"        }\n",
"        random_idx = sample(sample_idx, min(n_random, length(sample_idx)), replace = F)\n",
"        random = list(bhat = dat$${bhat}[random_idx,,drop=F], sbhat = dat$${sbhat}[random_idx,,drop=F])\n",
"        # null samples defined as |z| < 2\n",
"        null.id = which(apply(abs(z), 1, max) < 2)\n",
"        if (length(null.id) == 0) {\n",
"          warning(paste(\"Null data is empty for input file\", infile))\n",
"          null = list()\n",
"        } else {\n",
"          null_idx = sample(null.id, min(n_null, length(null.id)), replace = F)\n",
"          null = list(bhat = dat$${bhat}[null_idx,,drop=F], sbhat = dat$${sbhat}[null_idx,,drop=F])\n",
"        }\n",
"        dat = (list(random = remove_rownames(random), null = remove_rownames(null), strong = remove_rownames(strong)))\n",
"        dat$random = handle_nan_etc(dat$random)\n",
"        dat$null = handle_nan_etc(dat$null)\n",
"        dat$strong = handle_nan_etc(dat$strong)\n",
"        return(dat)\n",
"    }\n",
"    reformat_data = function(dat, z_only = TRUE) {\n",
"      # make output consistent in format with \n",
"      # https://github.com/stephenslab/gtexresults/blob/master/workflows/mashr_flashr_workflow.ipynb \n",
"      res = list(random.z = dat$random$bhat/dat$random$sbhat, \n",
"                 strong.z = dat$strong$bhat/dat$strong$sbhat, \n",
"                 null.z = dat$null$bhat/dat$null$sbhat)\n",
"      if (!z_only) {\n",
"          res = c(res, list(random.b = dat$random$bhat,\n",
"                            strong.b = dat$strong$bhat,\n",
"                            null.b = dat$null$bhat,\n",
"                            null.s = dat$null$sbhat,\n",
"                            random.s = dat$random$sbhat,\n",
"                            strong.s = dat$strong$sbhat))\n",
"      }\n",
"      return(res)\n",
"    }\n",
"    # Append one unit's extracted matrices onto the accumulated result, entry by entry\n",
"    merge_data = function(res, one_data) {\n",
"      if (length(res) == 0) {\n",
"          return(one_data)\n",
"      } else if (is.null(one_data)) {\n",
"          return(res)\n",
"      } else {\n",
"          for (d in names(one_data)) {\n",
"              if (is.null(one_data[[d]])) {\n",
"                  next\n",
"              } else {\n",
"                  res[[d]] = rbind(res[[d]], one_data[[d]])\n",
"              }\n",
"          }\n",
"          return(res)\n",
"      }\n",
"    }\n",
"    res = list()\n",
"    for (f in c(${_input:r,})) {\n",
"      # If cannot read the input for some reason then we just skip it, assuming we have other enough data-sets to use.\n",
"      dat = tryCatch(readRDS(f), error = function(e) return(NULL))${(\"$\"+table_name) if table_name != \"\" else \"\"}\n",
"      if (is.null(dat)) {\n",
"          message(paste(\"Skip loading file\", f, \"due to load failure.\"))\n",
"          next\n",
"      }\n",
"      if (${expected_ncondition} > 0 && (ncol(dat$${bhat}) != ${expected_ncondition} || ncol(dat$${sbhat}) != ${expected_ncondition})) {\n",
"          message(paste(\"Skip loading file\", f, \"because it has\", ncol(dat$${bhat}), \"columns different from required\", ${expected_ncondition}))\n",
"          next\n",
"      }\n",
"      res = merge_data(res, reformat_data(extract_one_data(dat, ${n_random}, ${n_null}, f), ${\"TRUE\" if z_only else \"FALSE\"}))\n",
"    }\n",
"    saveRDS(res, ${_output:r})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"kernel": "SoS"
},
"outputs": [],
"source": [
"[extract_effects_2]\n",
"input: group_by = \"all\"\n",
"output: f\"{cwd}/{name}.rds\"\n",
"task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '16G', cores = 1, tags = f'{_output:bn}'\n",
"R: expand = \"${ }\"\n",
"    # Stack the rows of each dataset in `addition` under the matching entry of `accum`\n",
"    merge_data = function(accum, addition) {\n",
"      if (length(accum) == 0) return(addition)\n",
"      for (key in names(addition)) {\n",
"          accum[[key]] = rbind(accum[[key]], addition[[key]])\n",
"      }\n",
"      return(accum)\n",
"    }\n",
"    # Combine the per-chunk caches written by extract_effects_1\n",
"    dat = list()\n",
"    for (f in c(${_input:r,})) {\n",
"      dat = merge_data(dat, readRDS(f))\n",
"    }\n",
"    # compute empirical covariance XtX of the strong z-scores (NA treated as 0)\n",
"    Z = as.matrix(dat$strong.z)\n",
"    Z[is.na(Z)] = 0\n",
"    dat$XtX = crossprod(Z) / nrow(Z)\n",
"    saveRDS(dat, ${_output:r})"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "SoS",
"language": "sos",
"name": "sos"
},
"language_info": {
"codemirror_mode": "sos",
"file_extension": ".sos",
"mimetype": "text/x-sos",
"name": "sos",
"nbconvert_exporter": "sos_notebook.converter.SoS_Exporter",
"pygments_lexer": "sos"
},
"sos": {
"kernels": [
[
"R"
],
[
"SoS"
]
],
"version": "0.22.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Loading

0 comments on commit 12af9c8

Please sign in to comment.