"""Aggregate per-participant APOL1 statuses into counts per diploid group.

Reads the combined classifier TSV (expected columns: ``participant_id`` and
``apol1_status``), keeps the first *valid* status seen per participant, and
writes a TSV with one row per known diploid group (G0/G0 ... G2/G2) and its
absolute count.
"""
import argparse
import csv
from collections import Counter

# Canonical diploid groups reported in the output, higher-risk allele first.
GROUPS = ["G0/G0", "G1/G0", "G1/G1", "G2/G0", "G2/G1", "G2/G2"]

# Sort order used to canonicalize e.g. "G0/G2" -> "G2/G0"; also serves as the
# closed set of recognized alleles.
_ALLELE_ORDER = {"G2": 0, "G1": 1, "G0": 2}


def parse_args():
    """Parse the --input/--output command-line arguments."""
    parser = argparse.ArgumentParser(description="Count distinct APOL1 diploid classification groups")
    parser.add_argument("--input", required=True, help="Input TSV from combined classifier output")
    parser.add_argument("--output", required=True, help="Output TSV path")
    return parser.parse_args()


def normalize_status(apol1_status):
    """Return the canonical ``Gx/Gy`` form of *apol1_status*, or ``None``.

    A status is valid only when it has exactly two '/'-separated parts and
    both parts are recognized alleles (G0, G1, G2). Valid statuses are
    reordered so the higher-risk allele comes first ("G0/G2" -> "G2/G0").

    Fix vs. previous version: unknown alleles (e.g. "G3") are now rejected
    instead of producing a status that is counted but never reported.
    """
    status = (apol1_status or "").strip()
    if not status:
        return None
    parts = [p.strip() for p in status.split("/")]
    if len(parts) != 2 or any(p not in _ALLELE_ORDER for p in parts):
        return None
    parts.sort(key=_ALLELE_ORDER.__getitem__)
    return f"{parts[0]}/{parts[1]}"


def main():
    """Read the classifier TSV, count diploid groups, write the summary TSV."""
    args = parse_args()

    # First *valid* status wins per participant. Only normalized, non-None
    # statuses are stored; previously a malformed first row stored None and
    # shadowed a valid later row for the same participant, silently dropping
    # that participant from every count.
    participant_status = {}
    with open(args.input, "r", encoding="utf-8") as fh:
        reader = csv.DictReader(fh, delimiter="\t")
        for row in reader:
            pid = (row.get("participant_id") or "").strip()
            if not pid or pid in participant_status:
                continue
            normalized = normalize_status(row.get("apol1_status"))
            if normalized:
                participant_status[pid] = normalized

    counts = Counter(participant_status.values())

    with open(args.output, "w", encoding="utf-8", newline="") as out_fh:
        writer = csv.DictWriter(out_fh, fieldnames=["classification", "count"], delimiter="\t")
        writer.writeheader()
        # Emit every known group, including zero counts, in a stable order.
        for group in GROUPS:
            writer.writerow({"classification": group, "count": counts.get(group, 0)})


if __name__ == "__main__":
    main()
b/examples/apol1/apol1-classifier/flow.yaml index 0a6d451..2ed9ba7 100644 --- a/examples/apol1/apol1-classifier/flow.yaml +++ b/examples/apol1/apol1-classifier/flow.yaml @@ -17,6 +17,7 @@ spec: population_stats: File(result_APOL1_stats.tsv) classification_stats: File(result_APOL1_classification_stats.tsv) apol1_status: File(result_APOL1_status.tsv) + group_stats: File(result_APOL1_group_stats.tsv) store: counts_sql: kind: sql diff --git a/examples/apol1/apol1-classifier/module.yaml b/examples/apol1/apol1-classifier/module.yaml index 61381a7..60b109b 100644 --- a/examples/apol1/apol1-classifier/module.yaml +++ b/examples/apol1/apol1-classifier/module.yaml @@ -11,7 +11,7 @@ spec: kind: nextflow entrypoint: workflow.nf template: dynamic-nextflow - image: ghcr.io/openmined/bioscript:0.1.6 + image: ghcr.io/openmined/bioscript:0.1.7 inputs: - name: participants type: List[GenotypeRecord] @@ -46,9 +46,16 @@ spec: format: kind: tsv path: result_APOL1_status.tsv + - name: group_stats + type: File + description: Absolute counts per distinct APOL1 diploid classification group + format: + kind: tsv + path: result_APOL1_group_stats.tsv parameters: [] assets: - path: classify_apol1.py - path: aggregate_population_stats.py - path: aggregate_classification_stats.py - path: aggregate_apol1_status.py + - path: aggregate_apol1_group_stats.py diff --git a/examples/apol1/apol1-classifier/workflow.nf b/examples/apol1/apol1-classifier/workflow.nf index a6171c7..52e6ff6 100644 --- a/examples/apol1/apol1-classifier/workflow.nf +++ b/examples/apol1/apol1-classifier/workflow.nf @@ -51,11 +51,18 @@ workflow USER { aggregated ) + // Count distinct diploid classification groups (G0/G0, G1/G0, etc.) 
+ def group_stats_ch = aggregate_apol1_group_stats( + Channel.value(assetsDirPath), + aggregated + ) + emit: classification_result = aggregated population_stats = population_stats_ch classification_stats = classification_stats_ch apol1_status = apol1_status_ch + group_stats = group_stats_ch } process apol1_classifier { @@ -152,3 +159,22 @@ process aggregate_apol1_status { --output result_APOL1_status.tsv """ } + +process aggregate_apol1_group_stats { + container 'ghcr.io/openmined/bioscript:0.1.7' + publishDir params.results_dir, mode: 'copy', overwrite: true + + input: + path assets_dir + path aggregated_results + + output: + path "result_APOL1_group_stats.tsv" + + script: + """ + python3 "${assets_dir}/aggregate_apol1_group_stats.py" \ + --input "${aggregated_results}" \ + --output result_APOL1_group_stats.tsv + """ +} diff --git a/examples/apol1/apol1_dev.ipynb b/examples/apol1/apol1_dev.ipynb index 020840f..6c6a651 100644 --- a/examples/apol1/apol1_dev.ipynb +++ b/examples/apol1/apol1_dev.ipynb @@ -411,7 +411,7 @@ { "data": { "text/plain": [ - "BioVaultProject(name='apol1-classifier', author='madhava@openmined.org', workflow='workflow.nf', description='Classification of APOL1 genotypes (G0, G1, G2) for kidney disease risk assessment.', template=, version='0.1.1', assets=['classify_apol1.py', 'aggregate_population_stats.py', 'aggregate_classification_stats.py', 'aggregate_apol1_status.py'], parameters=[], inputs=[Input(name='participants', type='List[GenotypeRecord]', description='CSV/TSV with participant_id and genotype_file columns', format='csv', path=None, mapping={'participant_id': 'participant_id', 'genotype_file': 'genotype_file'}, cli_flag=None)], outputs=[Output(name='classification_result', type='File', description='APOL1 genotype classification', format='tsv', path='result_APOL1.tsv', cli_flag=None), Output(name='population_stats', type='File', description='APOL1 population allele statistics (aggregated)', format='tsv', path='result_APOL1_stats.tsv', 
cli_flag=None), Output(name='classification_stats', type='File', description='APOL1 status counts by allele class (G0/G1/G2) with hetero/homo split', format='tsv', path='result_APOL1_classification_stats.tsv', cli_flag=None), Output(name='apol1_status', type='File', description='Per-participant APOL1 status summary', format='tsv', path='result_APOL1_status.tsv', cli_flag=None)], processes=[ProcessDefinition(name='apol1_classifier', script='classify_apol1.py', container='ghcr.io/openmined/bioscript:0.1.6', kind='bioscript')], docker_image='ghcr.io/openmined/bioscript:0.1.6', docker_platform='linux/amd64')" + "BioVaultProject(name='apol1-classifier', author='madhava@openmined.org', workflow='workflow.nf', description='Classification of APOL1 genotypes (G0, G1, G2) for kidney disease risk assessment.', template=, version='0.1.1', assets=['classify_apol1.py', 'aggregate_population_stats.py', 'aggregate_classification_stats.py', 'aggregate_apol1_status.py', 'aggregate_apol1_group_stats.py'], parameters=[], inputs=[Input(name='participants', type='List[GenotypeRecord]', description='CSV/TSV with participant_id and genotype_file columns', format='csv', path=None, mapping={'participant_id': 'participant_id', 'genotype_file': 'genotype_file'}, cli_flag=None)], outputs=[Output(name='classification_result', type='File', description='APOL1 genotype classification', format='tsv', path='result_APOL1.tsv', cli_flag=None), Output(name='population_stats', type='File', description='APOL1 population allele statistics (aggregated)', format='tsv', path='result_APOL1_stats.tsv', cli_flag=None), Output(name='classification_stats', type='File', description='APOL1 status counts by allele class (G0/G1/G2) with hetero/homo split', format='tsv', path='result_APOL1_classification_stats.tsv', cli_flag=None), Output(name='apol1_status', type='File', description='Per-participant APOL1 status summary', format='tsv', path='result_APOL1_status.tsv', cli_flag=None), Output(name='group_stats', 
type='File', description='Absolute counts per distinct APOL1 diploid classification group', format='tsv', path='result_APOL1_group_stats.tsv', cli_flag=None)], processes=[ProcessDefinition(name='apol1_classifier', script='classify_apol1.py', container='ghcr.io/openmined/bioscript:0.1.7', kind='bioscript')], docker_image='ghcr.io/openmined/bioscript:0.1.7', docker_platform='linux/amd64')" ] }, "execution_count": 12, @@ -431,6 +431,7 @@ " 'apol1-classifier/assets/aggregate_population_stats.py',\n", " 'apol1-classifier/assets/aggregate_classification_stats.py',\n", " 'apol1-classifier/assets/aggregate_apol1_status.py',\n", + " 'apol1-classifier/assets/aggregate_apol1_group_stats.py',\n", " ],\n", " inputs=[\n", " {\n", @@ -473,6 +474,13 @@ " 'format': 'tsv',\n", " 'path': 'result_APOL1_status.tsv',\n", " },\n", + " {\n", + " 'name': 'group_stats',\n", + " 'type': 'File',\n", + " 'description': 'Absolute counts per distinct APOL1 diploid classification group',\n", + " 'format': 'tsv',\n", + " 'path': 'result_APOL1_group_stats.tsv',\n", + " },\n", " ],\n", " version=\"0.1.1\",\n", " description=\"Classification of APOL1 genotypes (G0, G1, G2) for kidney disease risk assessment.\",\n", @@ -488,7 +496,7 @@ { "data": { "text/plain": [ - "4227" + "4944" ] }, "execution_count": 13, @@ -508,12 +516,12 @@ "if 'aggregate_population_stats' not in text:\n", " text = text.replace(\n", " \" def aggregated = aggregate_results(\\n per_participant_results.collect()\\n )\\n\\n emit:\\n classification_result = aggregated\\n}\",\n", - " \" def aggregated = aggregate_results(\\n per_participant_results.collect()\\n )\\n\\n // Aggregate population statistics\\n def population_stats_ch = aggregate_population_stats(\\n Channel.value(assetsDirPath),\\n aggregated\\n )\\n\\n // Aggregate APOL1 status statistics (G0/G1/G2 with hetero/homo counts)\\n def classification_stats_ch = aggregate_classification_stats(\\n Channel.value(assetsDirPath),\\n aggregated\\n )\\n\\n // Emit per-participant 
APOL1 status summary.\\n def apol1_status_ch = aggregate_apol1_status(\\n Channel.value(assetsDirPath),\\n aggregated\\n )\\n\\n emit:\\n classification_result = aggregated\\n population_stats = population_stats_ch\\n classification_stats = classification_stats_ch\\n apol1_status = apol1_status_ch\\n}\",\n", + " \" def aggregated = aggregate_results(\\n per_participant_results.collect()\\n )\\n\\n // Aggregate population statistics\\n def population_stats_ch = aggregate_population_stats(\\n Channel.value(assetsDirPath),\\n aggregated\\n )\\n\\n // Aggregate APOL1 status statistics (G0/G1/G2 with hetero/homo counts)\\n def classification_stats_ch = aggregate_classification_stats(\\n Channel.value(assetsDirPath),\\n aggregated\\n )\\n\\n // Emit per-participant APOL1 status summary.\\n def apol1_status_ch = aggregate_apol1_status(\\n Channel.value(assetsDirPath),\\n aggregated\\n )\\n\\n // Count distinct diploid classification groups (G0/G0, G1/G0, etc.)\\n def group_stats_ch = aggregate_apol1_group_stats(\\n Channel.value(assetsDirPath),\\n aggregated\\n )\\n\\n emit:\\n classification_result = aggregated\\n population_stats = population_stats_ch\\n classification_stats = classification_stats_ch\\n apol1_status = apol1_status_ch\\n group_stats = group_stats_ch\\n}\",\n", " )\n", "\n", "population_stats_block = '''\n", "process aggregate_population_stats {\n", - " container 'ghcr.io/openmined/bioscript:0.1.6'\n", + " container 'ghcr.io/openmined/bioscript:0.1.7'\n", " publishDir params.results_dir, mode: 'copy', overwrite: true\n", "\n", " input:\n", @@ -534,7 +542,7 @@ "\n", "classification_stats_block = '''\n", "process aggregate_classification_stats {\n", - " container 'ghcr.io/openmined/bioscript:0.1.6'\n", + " container 'ghcr.io/openmined/bioscript:0.1.7'\n", " publishDir params.results_dir, mode: 'copy', overwrite: true\n", "\n", " input:\n", @@ -555,7 +563,7 @@ "\n", "apol1_status_block = '''\n", "process aggregate_apol1_status {\n", - " container 
'ghcr.io/openmined/bioscript:0.1.6'\n", + " container 'ghcr.io/openmined/bioscript:0.1.7'\n", " publishDir params.results_dir, mode: 'copy', overwrite: true\n", "\n", " input:\n", @@ -574,6 +582,27 @@ "}\n", "'''\n", "\n", + "group_stats_block = '''\n", + "process aggregate_apol1_group_stats {\n", + " container 'ghcr.io/openmined/bioscript:0.1.7'\n", + " publishDir params.results_dir, mode: 'copy', overwrite: true\n", + "\n", + " input:\n", + " path assets_dir\n", + " path aggregated_results\n", + "\n", + " output:\n", + " path \"result_APOL1_group_stats.tsv\"\n", + "\n", + " script:\n", + " \"\"\"\n", + " python3 \"${assets_dir}/aggregate_apol1_group_stats.py\" \\\\\n", + " --input \"${aggregated_results}\" \\\\\n", + " --output result_APOL1_group_stats.tsv\n", + " \"\"\"\n", + "}\n", + "'''\n", + "\n", "if 'process aggregate_population_stats' not in text:\n", " text += population_stats_block\n", "\n", @@ -583,6 +612,9 @@ "if 'process aggregate_apol1_status' not in text:\n", " text += apol1_status_block\n", "\n", + "if 'process aggregate_apol1_group_stats' not in text:\n", + " text += group_stats_block\n", + "\n", "workflow_path.write_text(text, encoding='utf-8')" ] }, @@ -594,7 +626,7 @@ { "data": { "text/plain": [ - "BioVaultPipeline(name='apol1-classifier', inputs={'samplesheet': 'List[GenotypeRecord]'}, steps=[PipelineStep(step_id='apol1', uses='./', with_args={'participants': 'inputs.samplesheet'}, publish={'classification_result': 'File(result_APOL1.tsv)', 'population_stats': 'File(result_APOL1_stats.tsv)', 'classification_stats': 'File(result_APOL1_classification_stats.tsv)', 'apol1_status': 'File(result_APOL1_status.tsv)'}, store={'counts_sql': SQLStore(source='classification_result', table_name='apol1_{run_id}', destination='SQL()', participant_column='participant_id', key_column='participant_id')})], version='0.1.1')" + "BioVaultPipeline(name='apol1-classifier', inputs={'samplesheet': 'List[GenotypeRecord]'}, steps=[PipelineStep(step_id='apol1', 
uses='./', with_args={'participants': 'inputs.samplesheet'}, publish={'classification_result': 'File(result_APOL1.tsv)', 'population_stats': 'File(result_APOL1_stats.tsv)', 'classification_stats': 'File(result_APOL1_classification_stats.tsv)', 'apol1_status': 'File(result_APOL1_status.tsv)', 'group_stats': 'File(result_APOL1_group_stats.tsv)'}, store={'counts_sql': SQLStore(source='classification_result', table_name='apol1_{run_id}', destination='SQL()', participant_column='participant_id', key_column='participant_id')})], version='0.1.1')" ] }, "execution_count": 14, @@ -623,6 +655,7 @@ " 'population_stats': 'File(result_APOL1_stats.tsv)',\n", " 'classification_stats': 'File(result_APOL1_classification_stats.tsv)',\n", " 'apol1_status': 'File(result_APOL1_status.tsv)',\n", + " 'group_stats': 'File(result_APOL1_group_stats.tsv)',\n", " },\n", " store={\n", " 'counts_sql': SQLStore(\n", @@ -693,7 +726,7 @@ "[bioscript] Provided path absolute? False\n", "[bioscript] Resolved SNP path: /Users/madhavajay/dev/biovault-desktop/workspace8/bioscript/examples/apol1/apol1_test_g1.tsv\n", "[bioscript] Resolved exists? 
True\n", - "[bioscript] CWD contents: .DS_Store, .ipynb_checkpoints, .pytest_cache, README.md, __pycache__, apol1-classifier, apol1_decodeme.csv, apol1_dev.ipynb, apol1_headerless.txt, apol1_myheritage.csv, apol1_test_g1.tsv, classify_apol1.py, genotype_files, process_samplesheet.sh, result_APOL1_DECODE.tsv, result_APOL1_HEADERLESS.tsv, result_APOL1_MYHERITAGE.tsv, result_APOL1_TEST_APOL1.tsv, results.tsv, samplesheet.csv, test_snps.txt, test_snps_p002.txt, test_snps_p003.txt\n", + "[bioscript] CWD contents: .DS_Store, .ipynb_checkpoints, .pytest_cache, README.md, __pycache__, apol1-classifier, apol1_decodeme.csv, apol1_dev.ipynb, apol1_headerless.txt, apol1_myheritage.csv, apol1_test_g1.tsv, classify_apol1.py, genotype_files, process_samplesheet.sh, result_APOL1_DECODE.tsv, result_APOL1_HEADERLESS.tsv, result_APOL1_MYHERITAGE.tsv, result_APOL1_P001.tsv, result_APOL1_P002.tsv, result_APOL1_P003.tsv, result_APOL1_TEST_APOL1.tsv, results.tsv, samplesheet.csv, test_snps.txt, test_snps_p002.txt, test_snps_p003.txt\n", "[bioscript] Using resolved SNP path: /Users/madhavajay/dev/biovault-desktop/workspace8/bioscript/examples/apol1/apol1_test_g1.tsv\n", "participant_id=TEST_APOL1\n", "APOL1_count=3\n", @@ -790,7 +823,7 @@ "[bioscript] Provided path absolute? False\n", "[bioscript] Resolved SNP path: /Users/madhavajay/dev/biovault-desktop/workspace8/bioscript/examples/apol1/apol1_decodeme.csv\n", "[bioscript] Resolved exists? 
True\n", - "[bioscript] CWD contents: .DS_Store, .ipynb_checkpoints, .pytest_cache, README.md, __pycache__, apol1-classifier, apol1_decodeme.csv, apol1_dev.ipynb, apol1_headerless.txt, apol1_myheritage.csv, apol1_test_g1.tsv, classify_apol1.py, genotype_files, process_samplesheet.sh, result_APOL1_DECODE.tsv, result_APOL1_HEADERLESS.tsv, result_APOL1_MYHERITAGE.tsv, result_APOL1_TEST_APOL1.tsv, results.tsv, samplesheet.csv, test_snps.txt, test_snps_p002.txt, test_snps_p003.txt\n", + "[bioscript] CWD contents: .DS_Store, .ipynb_checkpoints, .pytest_cache, README.md, __pycache__, apol1-classifier, apol1_decodeme.csv, apol1_dev.ipynb, apol1_headerless.txt, apol1_myheritage.csv, apol1_test_g1.tsv, classify_apol1.py, genotype_files, process_samplesheet.sh, result_APOL1_DECODE.tsv, result_APOL1_HEADERLESS.tsv, result_APOL1_MYHERITAGE.tsv, result_APOL1_P001.tsv, result_APOL1_P002.tsv, result_APOL1_P003.tsv, result_APOL1_TEST_APOL1.tsv, results.tsv, samplesheet.csv, test_snps.txt, test_snps_p002.txt, test_snps_p003.txt\n", "[bioscript] Using resolved SNP path: /Users/madhavajay/dev/biovault-desktop/workspace8/bioscript/examples/apol1/apol1_decodeme.csv\n", "participant_id=DECODE\n", "APOL1_count=3\n", @@ -886,7 +919,7 @@ "[bioscript] Provided path absolute? False\n", "[bioscript] Resolved SNP path: /Users/madhavajay/dev/biovault-desktop/workspace8/bioscript/examples/apol1/apol1_myheritage.csv\n", "[bioscript] Resolved exists? 
True\n", - "[bioscript] CWD contents: .DS_Store, .ipynb_checkpoints, .pytest_cache, README.md, __pycache__, apol1-classifier, apol1_decodeme.csv, apol1_dev.ipynb, apol1_headerless.txt, apol1_myheritage.csv, apol1_test_g1.tsv, classify_apol1.py, genotype_files, process_samplesheet.sh, result_APOL1_DECODE.tsv, result_APOL1_HEADERLESS.tsv, result_APOL1_MYHERITAGE.tsv, result_APOL1_TEST_APOL1.tsv, results.tsv, samplesheet.csv, test_snps.txt, test_snps_p002.txt, test_snps_p003.txt\n", + "[bioscript] CWD contents: .DS_Store, .ipynb_checkpoints, .pytest_cache, README.md, __pycache__, apol1-classifier, apol1_decodeme.csv, apol1_dev.ipynb, apol1_headerless.txt, apol1_myheritage.csv, apol1_test_g1.tsv, classify_apol1.py, genotype_files, process_samplesheet.sh, result_APOL1_DECODE.tsv, result_APOL1_HEADERLESS.tsv, result_APOL1_MYHERITAGE.tsv, result_APOL1_P001.tsv, result_APOL1_P002.tsv, result_APOL1_P003.tsv, result_APOL1_TEST_APOL1.tsv, results.tsv, samplesheet.csv, test_snps.txt, test_snps_p002.txt, test_snps_p003.txt\n", "[bioscript] Using resolved SNP path: /Users/madhavajay/dev/biovault-desktop/workspace8/bioscript/examples/apol1/apol1_myheritage.csv\n", "participant_id=MYHERITAGE\n", "APOL1_count=3\n", @@ -980,7 +1013,7 @@ "[bioscript] Provided path absolute? False\n", "[bioscript] Resolved SNP path: /Users/madhavajay/dev/biovault-desktop/workspace8/bioscript/examples/apol1/apol1_headerless.txt\n", "[bioscript] Resolved exists? 
True\n", - "[bioscript] CWD contents: .DS_Store, .ipynb_checkpoints, .pytest_cache, README.md, __pycache__, apol1-classifier, apol1_decodeme.csv, apol1_dev.ipynb, apol1_headerless.txt, apol1_myheritage.csv, apol1_test_g1.tsv, classify_apol1.py, genotype_files, process_samplesheet.sh, result_APOL1_DECODE.tsv, result_APOL1_HEADERLESS.tsv, result_APOL1_MYHERITAGE.tsv, result_APOL1_TEST_APOL1.tsv, results.tsv, samplesheet.csv, test_snps.txt, test_snps_p002.txt, test_snps_p003.txt\n", + "[bioscript] CWD contents: .DS_Store, .ipynb_checkpoints, .pytest_cache, README.md, __pycache__, apol1-classifier, apol1_decodeme.csv, apol1_dev.ipynb, apol1_headerless.txt, apol1_myheritage.csv, apol1_test_g1.tsv, classify_apol1.py, genotype_files, process_samplesheet.sh, result_APOL1_DECODE.tsv, result_APOL1_HEADERLESS.tsv, result_APOL1_MYHERITAGE.tsv, result_APOL1_P001.tsv, result_APOL1_P002.tsv, result_APOL1_P003.tsv, result_APOL1_TEST_APOL1.tsv, results.tsv, samplesheet.csv, test_snps.txt, test_snps_p002.txt, test_snps_p003.txt\n", "[bioscript] Using resolved SNP path: /Users/madhavajay/dev/biovault-desktop/workspace8/bioscript/examples/apol1/apol1_headerless.txt\n", "participant_id=HEADERLESS\n", "APOL1_count=3\n",