Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions examples/apol1/apol1-classifier/assets/aggregate_apol1_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import argparse
import csv


def parse_args():
    """Build and evaluate the command-line interface for this script."""
    cli = argparse.ArgumentParser(description="Aggregate APOL1 participant statuses")
    for flag, description in (
        ("--input", "Input TSV from combined classifier output"),
        ("--output", "Output TSV path"),
    ):
        cli.add_argument(flag, required=True, help=description)
    return cli.parse_args()


def main():
    """Collapse the combined classifier TSV to one row per participant.

    The first row seen for each participant wins; rows with a blank
    participant_id are dropped. Writes a TSV with columns
    participant_id / original_file / apol1_status.
    """
    opts = parse_args()

    # participant_id -> output row; insertion order preserves first-seen order.
    seen = {}

    with open(opts.input, "r", encoding="utf-8") as in_fh:
        for record in csv.DictReader(in_fh, delimiter="\t"):
            pid = (record.get("participant_id") or "").strip()
            if not pid or pid in seen:
                continue
            seen[pid] = {
                "participant_id": pid,
                "original_file": (record.get("filename") or "").strip(),
                "apol1_status": (record.get("apol1_status") or "").strip(),
            }

    with open(opts.output, "w", encoding="utf-8", newline="") as out_fh:
        writer = csv.DictWriter(
            out_fh,
            fieldnames=["participant_id", "original_file", "apol1_status"],
            delimiter="\t",
        )
        writer.writeheader()
        writer.writerows(seen.values())


# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import argparse
import csv
from collections import defaultdict

STATUSES = ["G0", "G1", "G2"]


def parse_args():
    """Define and parse this script's command-line arguments."""
    cli = argparse.ArgumentParser(
        description="Aggregate APOL1 status statistics (G0/G1/G2 counts + hetero/homo)"
    )
    cli.add_argument("--input", required=True, help="Input TSV from combined classifier output")
    cli.add_argument("--output", required=True, help="Output TSV path")
    return cli.parse_args()


def parse_status(apol1_status):
    """Split an APOL1 status string like "G1/G2" into its two allele parts.

    Returns a two-element list of stripped allele strings, or an empty list
    when the input is blank/None or not exactly two "/"-separated fields.
    """
    text = (apol1_status or "").strip()
    if not text:
        return []
    pieces = [piece.strip() for piece in text.split("/")]
    return pieces if len(pieces) == 2 else []


def main():
    """Aggregate per-participant APOL1 calls into per-status statistics.

    Writes one row per status in STATUSES carrying its allele count, the
    total number of successful allele reads across all participants
    ("number"), zygosity tallies, and the allele frequency.
    """
    opts = parse_args()

    # One status per participant; the first row seen wins (all rows for a
    # participant are expected to agree).
    status_of = {}

    with open(opts.input, "r", encoding="utf-8") as in_fh:
        for record in csv.DictReader(in_fh, delimiter="\t"):
            pid = (record.get("participant_id") or "").strip()
            status = (record.get("apol1_status") or "").strip()
            if pid and status:
                status_of.setdefault(pid, status)

    tallies = defaultdict(lambda: {"count": 0, "hetero": 0, "homo": 0})
    reads_total = 0  # total successful allele reads across participants

    for status in status_of.values():
        callable_alleles = [a for a in parse_status(status) if a in STATUSES]

        reads_total += len(callable_alleles)
        for allele in callable_alleles:
            tallies[allele]["count"] += 1

        # Zygosity is only defined when both alleles are callable.
        if len(callable_alleles) == 2:
            first, second = callable_alleles
            if first == second:
                tallies[first]["homo"] += 1
            else:
                tallies[first]["hetero"] += 1
                tallies[second]["hetero"] += 1

    with open(opts.output, "w", encoding="utf-8", newline="") as out_fh:
        writer = csv.DictWriter(
            out_fh,
            fieldnames=["status", "count", "number", "hetero", "homo", "frequency"],
            delimiter="\t",
        )
        writer.writeheader()

        for status in STATUSES:
            count = tallies[status]["count"]
            freq = count / reads_total if reads_total else 0
            writer.writerow(
                {
                    "status": status,
                    "count": count,
                    "number": reads_total,
                    "hetero": tallies[status]["hetero"],
                    "homo": tallies[status]["homo"],
                    "frequency": f"{freq:.6f}",
                }
            )


# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
135 changes: 135 additions & 0 deletions examples/apol1/apol1-classifier/assets/aggregate_population_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import argparse
import csv
import re
from collections import defaultdict

def parse_args():
    """Define and parse the command-line arguments for this script."""
    cli = argparse.ArgumentParser(description="Aggregate population allele statistics")
    cli.add_argument("--input", required=True, help="Input TSV from combined classifier output")
    cli.add_argument("--output", required=True, help="Output TSV path")
    return cli.parse_args()

def locus_key_for_row(row):
    """Build a "chrom-pos-ref-alt" key for *row*, or None if any part is blank."""
    fields = [
        (row.get(column) or "").strip()
        for column in ("chromosome", "position", "ref", "alt")
    ]
    if not all(fields):
        return None
    return "-".join(fields)

def parse_alleles(genotype, alleles):
    """Split a genotype string into its constituent allele strings.

    Resolution order:
      1. "/" or "|" separated genotypes ("A/G", "A|G") are split on the
         separator, dropping empty pieces.
      2. If every known allele has the same length and the genotype is
         exactly two of them concatenated, split it evenly.
      3. A two-character genotype falls back to a per-character split
         (single-base calls like "AA" or "AG").
    Anything else comes back as a single-element list; blank or None input
    yields an empty list.
    """
    text = (genotype or "").strip()
    if not text:
        return []

    # Separator-delimited genotypes.
    if "/" in text or "|" in text:
        return [piece for piece in re.split(r"[\/|]", text) if piece]

    # Even split of a concatenation of two equal-length known alleles.
    if alleles:
        sizes = {len(candidate) for candidate in alleles if candidate}
        if len(sizes) == 1:
            size = sizes.pop()
            if size > 0 and len(text) == 2 * size:
                return [text[:size], text[size:]]

    # Fallback for single-base alleles like "AA" or "AG".
    if len(text) == 2:
        return [text[0], text[1]]

    return [text]

def main():
    """Aggregate per-allele population statistics from the classifier TSV.

    For every (locus, allele) pair this tallies:
      * allele_count  - number of called copies of the allele,
      * allele_number - 2 x the number of fully-called samples at the locus,
      * num_homo      - number of samples homozygous for the allele,
      * num_hetero    - number of heterozygous samples carrying the allele,
      * allele_freq   - allele_count / allele_number,
    plus the first non-empty rsid seen for the pair.
    """
    args = parse_args()

    # stats keyed by (locus_base, allele)
    stats = defaultdict(lambda: {
        "allele_count": 0,
        "num_homo": 0,
        "num_hetero": 0,
        "rsid": None,
    })
    locus_called_samples = defaultdict(int)

    with open(args.input, "r", encoding="utf-8") as fh:
        reader = csv.DictReader(fh, delimiter="\t")
        for row in reader:
            locus_key = locus_key_for_row(row)
            if locus_key is None:
                continue

            chrom = (row.get("chromosome") or "").strip()
            pos = (row.get("position") or "").strip()
            ref = (row.get("ref") or "").strip()
            alt = (row.get("alt") or "").strip()
            locus_base = f"{chrom}-{pos}"

            # Callable alleles at this locus: REF plus comma-separated ALTs.
            alleles = []
            if ref:
                alleles.append(ref)
            if alt:
                alleles.extend([a.strip() for a in alt.split(",") if a.strip()])

            genotype = (row.get("genotype") or "").strip()
            gt_alleles = parse_alleles(genotype, alleles)
            if not gt_alleles:
                continue

            # Keep only alleles that are callable for this locus.
            allowed = {a.upper() for a in alleles}
            callable_gt_alleles = [a.upper() for a in gt_alleles if a.upper() in allowed]

            # Require a full diploid call for denominator consistency
            # (allele_number = 2 * N called samples).
            if len(callable_gt_alleles) != 2:
                continue

            locus_called_samples[locus_base] += 1

            rsid = (row.get("rsid") or "").strip()
            first, second = callable_gt_alleles

            if first == second:
                # Homozygous sample: two allele copies, ONE homozygote.
                # (The previous per-copy loop incremented num_homo on both
                # iterations, double-counting every homozygous sample.)
                entry = stats[(locus_base, first)]
                entry["allele_count"] += 2
                entry["num_homo"] += 1
                if rsid and not entry["rsid"]:
                    entry["rsid"] = rsid
            else:
                # Heterozygous sample: one copy of each allele; each allele's
                # num_hetero counts this sample once.
                for allele in (first, second):
                    entry = stats[(locus_base, allele)]
                    entry["allele_count"] += 1
                    entry["num_hetero"] += 1
                    if rsid and not entry["rsid"]:
                        entry["rsid"] = rsid

    with open(args.output, "w", encoding="utf-8", newline="") as out_fh:
        fieldnames = [
            "locus_key",
            "allele_count",
            "allele_number",
            "num_homo",
            "num_hetero",
            "allele_freq",
            "rsid",
        ]
        writer = csv.DictWriter(out_fh, fieldnames=fieldnames, delimiter="\t")
        writer.writeheader()
        for (locus_base, allele) in sorted(stats.keys()):
            entry = stats[(locus_base, allele)]
            called = locus_called_samples.get(locus_base, 0)
            # Two chromosomes per fully-called diploid sample.
            allele_number = called * 2
            allele_freq = (entry["allele_count"] / allele_number) if allele_number else 0
            writer.writerow({
                "locus_key": f"{locus_base}-{allele}",
                "allele_count": entry["allele_count"],
                "allele_number": allele_number,
                "num_homo": entry["num_homo"],
                "num_hetero": entry["num_hetero"],
                "allele_freq": f"{allele_freq:.6f}",
                "rsid": entry["rsid"],
            })

# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
Loading
Loading