Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/705.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use Census work-and-childcare capping inputs for clone-half SPM childcare expenses instead of donor capping shares.
1 change: 1 addition & 0 deletions policyengine_us_data/datasets/cps/census_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ class CensusCPS_2018(CensusCPS):
"A_FNLWGT",
"A_LINENO",
"A_SPOUSE",
"PERRP",
"A_AGE",
"A_SEX",
"PEDISEYE",
Expand Down
22 changes: 22 additions & 0 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@
),
}

# Census CPS ASEC 2024 technical documentation, PERRP:
# https://www2.census.gov/programs-surveys/cps/techdocs/cpsmar24.pdf
#
# Lookup of the PERRP (relationship to reference person) codes that denote an
# unmarried partner of the household head. Keys are the integer PERRP codes and
# values are the corresponding Census labels. Membership of a person's PERRP
# value in the keys is what sets `is_unmarried_partner_of_household_head`; the
# labels are carried for readability.
PERRP_UNMARRIED_PARTNER_OF_HOUSEHOLD_HEAD_CODES = {
43: "Opposite Sex Unmarried Partner with Relatives",
44: "Opposite Sex Unmarried Partner without Relatives",
46: "Same Sex Unmarried Partner with Relatives",
47: "Same Sex Unmarried Partner without Relatives",
}


class CPS(Dataset):
name = "cps"
Expand Down Expand Up @@ -572,6 +581,14 @@ def children_per_parent(col: str) -> pd.DataFrame:

cps["is_surviving_spouse"] = person.A_MARITL == 4
cps["is_separated"] = person.A_MARITL == 6
perrp = (
person.PERRP
if "PERRP" in person
else pd.Series(0, index=person.index, dtype=np.int16)
)
cps["is_unmarried_partner_of_household_head"] = perrp.isin(
PERRP_UNMARRIED_PARTNER_OF_HOUSEHOLD_HEAD_CODES.keys()
)
# High school or college/university enrollment status.
cps["is_full_time_college_student"] = person.A_HSCOL == 2

Expand All @@ -582,6 +599,10 @@ def children_per_parent(col: str) -> pd.DataFrame:
add_overtime_occupation(cps, person)


def derive_weeks_worked(weeks_worked: Series | np.ndarray) -> Series | np.ndarray:
return np.clip(weeks_worked, 0, 52)


def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int):
"""Add income variables.

Expand All @@ -607,6 +628,7 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int):

cps["weekly_hours_worked"] = person.HRSWK
cps["hours_worked_last_week"] = person.A_HRS1
cps["weeks_worked"] = derive_weeks_worked(person.WKSWORK)

cps["taxable_interest_income"] = person.INT_VAL * (p["taxable_interest_fraction"])
cps["tax_exempt_interest_income"] = person.INT_VAL * (
Expand Down
137 changes: 0 additions & 137 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,125 +584,6 @@ def reconcile_ss_subcomponents(predictions, total_ss):
}


def derive_clone_capped_childcare_expenses(
    donor_pre_subsidy: np.ndarray,
    donor_capped: np.ndarray,
    clone_pre_subsidy: np.ndarray,
    clone_person_data: pd.DataFrame,
    clone_spm_unit_ids: np.ndarray,
) -> np.ndarray:
    """Derive clone-half capped childcare from clone inputs.

    The CPS provides both pre-subsidy childcare and the SPM-specific
    capped childcare deduction. For the clone half, we impute only the
    pre-subsidy amount, then deterministically rebuild the capped amount
    instead of letting a second QRF predict it independently.

    We preserve the donor's observed capping share while also respecting
    the clone's own earnings cap. This keeps the clone-half value
    consistent with pre-subsidy childcare and avoids impossible outputs
    such as capped childcare exceeding pre-subsidy childcare.

    Args:
        donor_pre_subsidy: Donor pre-subsidy childcare, one value per SPM unit.
        donor_capped: Donor capped childcare, aligned with
            ``donor_pre_subsidy``.
        clone_pre_subsidy: Clone pre-subsidy childcare, aligned with the
            donor arrays.
        clone_person_data: Person-level frame with columns ``spm_unit_id``,
            ``age``, ``is_parent_proxy`` and ``earnings``.
        clone_spm_unit_ids: SPM unit ids giving the order of the output.

    Returns:
        One capped childcare amount per entry of ``clone_spm_unit_ids``: the
        smaller of (clone pre-subsidy floored at zero times the donor capping
        share, clipped to [0, 1]) and the unit's earnings cap. The earnings
        cap is the lower earnings (floored at zero) of the unit's two oldest
        parent proxies, the single proxy's earnings when there is only one,
        and zero when the unit has none.
    """

    donor_pre_subsidy = np.asarray(donor_pre_subsidy, dtype=float)
    donor_capped = np.asarray(donor_capped, dtype=float)
    clone_pre_subsidy = np.asarray(clone_pre_subsidy, dtype=float)
    clone_spm_unit_ids = np.asarray(clone_spm_unit_ids)

    # Units with no positive donor pre-subsidy amount keep a share of zero
    # (the `out` buffer) rather than dividing by zero.
    donor_cap_share = np.divide(
        donor_capped,
        donor_pre_subsidy,
        out=np.zeros_like(donor_capped, dtype=float),
        where=donor_pre_subsidy > 0,
    )
    donor_cap_share = np.clip(donor_cap_share, 0.0, 1.0)
    capped_from_share = np.maximum(clone_pre_subsidy, 0.0) * donor_cap_share

    # Default: no parent proxies anywhere means every unit's cap is zero.
    earnings_cap = np.zeros(len(clone_spm_unit_ids), dtype=float)
    if not clone_person_data.empty:
        eligible = clone_person_data["is_parent_proxy"].astype(bool)
        parent_rows = clone_person_data.loc[
            eligible, ["spm_unit_id", "age", "earnings"]
        ]
        if not parent_rows.empty:
            # Rank parents oldest-first within each unit; ties keep row order.
            age_rank = parent_rows.groupby("spm_unit_id")["age"].rank(
                method="first", ascending=False
            )
            floored = parent_rows.assign(
                earnings=parent_rows["earnings"].clip(lower=0.0)
            )
            # At most two rows per unit survive the rank filter, so the group
            # minimum is the lone parent's earnings or the lower of the two.
            # This replaces a per-unit Python callback with one vectorized
            # reduction. Note the minimum skips NaN earnings.
            earnings_cap_by_unit = (
                floored.loc[age_rank <= 2].groupby("spm_unit_id")["earnings"].min()
            )
            earnings_cap = earnings_cap_by_unit.reindex(
                clone_spm_unit_ids, fill_value=0.0
            ).to_numpy(dtype=float)

    return np.minimum(capped_from_share, earnings_cap)


def _rebuild_clone_capped_childcare_expenses(
    data: dict,
    time_period: int,
    cps_sim,
) -> np.ndarray:
    """Recompute capped childcare for the clone half once stage 2 has run.

    Each array in ``data`` is split at its midpoint: the leading half is
    passed on as the donor values and the trailing half as the clone values.
    Ages and tax-unit roles come from ``cps_sim`` and must have exactly one
    row per person in a half.

    Raises:
        ValueError: If ``cps_sim`` returns a role frame whose length differs
            from the person count of one half.
    """

    def _values(variable):
        return data[variable][time_period]

    person_half = len(_values("person_id")) // 2
    spm_half = len(_values("spm_unit_id")) // 2

    roles = cps_sim.calculate_dataframe(
        ["age", "is_tax_unit_head", "is_tax_unit_spouse"]
    )
    observed = len(roles)
    if observed != person_half:
        raise ValueError(
            "Unexpected person role frame length while rebuilding clone childcare "
            f"expenses: got {observed}, expected {person_half}"
        )

    # Person-level inputs for the clone half; tax-unit heads and spouses
    # stand in for parents.
    clone_unit_of_person = _values("person_spm_unit_id")[person_half:]
    head_or_spouse = (
        roles["is_tax_unit_head"].values | roles["is_tax_unit_spouse"].values
    )
    wage_income = _values("employment_income")[person_half:]
    business_income = _values("self_employment_income")[person_half:]
    clone_people = pd.DataFrame(
        {
            "spm_unit_id": clone_unit_of_person,
            "age": roles["age"].values,
            "is_parent_proxy": head_or_spouse,
            "earnings": wage_income + business_income,
        }
    )

    pre_subsidy = _values("spm_unit_pre_subsidy_childcare_expenses")
    capped = _values("spm_unit_capped_work_childcare_expenses")

    return derive_clone_capped_childcare_expenses(
        donor_pre_subsidy=pre_subsidy[:spm_half],
        donor_capped=capped[:spm_half],
        clone_pre_subsidy=pre_subsidy[spm_half:],
        clone_person_data=clone_people,
        clone_spm_unit_ids=_values("spm_unit_id")[spm_half:],
    )


def _apply_post_processing(predictions, X_test, time_period, data):
"""Apply retirement constraints and SS reconciliation."""
ret_cols = [c for c in predictions.columns if c in _RETIREMENT_VARS]
Expand Down Expand Up @@ -807,24 +688,6 @@ def _splice_cps_only_predictions(
new_values = np.concatenate([cps_half, pred_values])
data[var] = {time_period: new_values}

if (
"spm_unit_capped_work_childcare_expenses" in data
and "spm_unit_pre_subsidy_childcare_expenses" in data
):
n_half = entity_half_lengths.get(
"spm_unit",
len(data["spm_unit_capped_work_childcare_expenses"][time_period]) // 2,
)
cps_half = data["spm_unit_capped_work_childcare_expenses"][time_period][:n_half]
clone_half = _rebuild_clone_capped_childcare_expenses(
data=data,
time_period=time_period,
cps_sim=cps_sim,
)
data["spm_unit_capped_work_childcare_expenses"] = {
time_period: np.concatenate([cps_half, clone_half])
}

del cps_sim
return data

Expand Down
77 changes: 1 addition & 76 deletions tests/unit/test_extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
_derive_overtime_occupation_inputs,
_impute_clone_cps_features,
apply_retirement_constraints,
derive_clone_capped_childcare_expenses,
reconcile_ss_subcomponents,
)
from policyengine_us_data.datasets.cps.tipped_occupation import (
Expand Down Expand Up @@ -132,86 +131,12 @@ def test_pension_income_not_in_cps_only(self):
)

def test_capped_childcare_not_in_cps_only(self):
    """Capped childcare should not be independently QRF-imputed."""
    # A single docstring: a second stacked string literal would be a dead
    # expression statement and leave `__doc__` describing something else.
    assert "spm_unit_capped_work_childcare_expenses" not in set(
        CPS_ONLY_IMPUTED_VARIABLES
    )


class TestCloneChildcareDerivation:
    """Deterministic derivation of clone-half capped childcare."""

    @staticmethod
    def _people(rows):
        """Build the person-level frame from (unit, age, proxy, earnings) rows."""
        return pd.DataFrame(
            rows,
            columns=["spm_unit_id", "age", "is_parent_proxy", "earnings"],
        )

    def test_caps_at_pre_subsidy_and_clone_earnings(self):
        # Donor shares are 0.4, 1.0 and 0.0, giving share-based amounts of
        # 4800, 5000 and 0; the earnings caps are 3000, 0 and 2000.
        people = self._people(
            [
                (1, 40, True, 9000.0),
                (1, 38, True, 3000.0),
                (2, 35, True, 1500.0),
                (2, 33, True, 0.0),
                (3, 29, True, 2000.0),
            ]
        )

        capped = derive_clone_capped_childcare_expenses(
            donor_pre_subsidy=np.array([10000.0, 4000.0, 6000.0]),
            donor_capped=np.array([4000.0, 4000.0, 0.0]),
            clone_pre_subsidy=np.array([12000.0, 5000.0, 3000.0]),
            clone_person_data=people,
            clone_spm_unit_ids=np.array([1, 2, 3]),
        )

        np.testing.assert_allclose(capped, np.array([3000.0, 0.0, 0.0]))

    def test_uses_single_parent_earnings_cap_for_single_proxy_units(self):
        # With one parent proxy, that person's earnings are the cap.
        people = self._people([(10, 31, True, 2500.0)])

        capped = derive_clone_capped_childcare_expenses(
            donor_pre_subsidy=np.array([4000.0]),
            donor_capped=np.array([4000.0]),
            clone_pre_subsidy=np.array([6000.0]),
            clone_person_data=people,
            clone_spm_unit_ids=np.array([10]),
        )

        np.testing.assert_allclose(capped, np.array([2500.0]))

    def test_falls_back_to_zero_without_parent_proxies(self):
        # No parent proxies in the unit means an earnings cap of zero.
        people = self._people(
            [
                (20, 12, False, 0.0),
                (20, 9, False, 0.0),
            ]
        )

        capped = derive_clone_capped_childcare_expenses(
            donor_pre_subsidy=np.array([3000.0]),
            donor_capped=np.array([2000.0]),
            clone_pre_subsidy=np.array([3000.0]),
            clone_person_data=people,
            clone_spm_unit_ids=np.array([20]),
        )

        np.testing.assert_allclose(capped, np.array([0.0]))


class TestRetirementConstraints:
"""Post-processing retirement constraints enforce IRS caps."""

Expand Down
70 changes: 70 additions & 0 deletions tests/unit/test_reference_partner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Tests for reference-person partner extraction from CPS ASEC.

The public CPS ASEC relationship-to-reference-person variable PERRP identifies
unmarried partners of the household head/reference person. We carry that
through so the SPM childcare cap can distinguish the reference person's partner
from unrelated adults in the same SPM unit.
"""

import numpy as np
import pandas as pd

from policyengine_us_data.datasets.cps.census_cps import PERSON_COLUMNS
from policyengine_us_data.datasets.cps.cps import (
PERRP_UNMARRIED_PARTNER_OF_HOUSEHOLD_HEAD_CODES,
add_personal_variables,
)


def _person_frame(**columns):
    """Return a person frame with every ``PERSON_COLUMNS`` entry zero-filled.

    The row count is taken from the first keyword argument; the supplied
    columns then replace (or extend) the zero-filled defaults.
    """
    first_column = next(iter(columns.values()))
    row_count = len(first_column)
    zero_filled = {name: np.zeros(row_count, dtype=int) for name in PERSON_COLUMNS}
    return pd.DataFrame({**zero_filled, **columns})


class TestReferencePartner:
    """Checks that PERRP partner codes reach the CPS person variables."""

    def test_census_cps_loads_perrp(self):
        assert "PERRP" in PERSON_COLUMNS

    def test_unmarried_partner_perrp_code_table_matches_census_labels(self):
        expected_labels = {
            43: "Opposite Sex Unmarried Partner with Relatives",
            44: "Opposite Sex Unmarried Partner without Relatives",
            46: "Same Sex Unmarried Partner with Relatives",
            47: "Same Sex Unmarried Partner without Relatives",
        }
        assert PERRP_UNMARRIED_PARTNER_OF_HOUSEHOLD_HEAD_CODES == expected_labels

    def test_cps_maps_unmarried_partner_from_perrp(self):
        # Codes 43, 44, 46 and 47 are partner codes; 40, 45 and 48 are not.
        perrp_codes = np.array([40, 43, 44, 45, 46, 47, 48])
        expected_flags = np.array([False, True, True, False, True, True, False])
        person = _person_frame(
            PH_SEQ=np.arange(1, 8),
            A_LINENO=np.ones(7),
            A_AGE=np.full(7, 35),
            PERRP=perrp_codes,
        )

        cps = {}
        add_personal_variables(cps, person)

        np.testing.assert_array_equal(
            cps["is_unmarried_partner_of_household_head"],
            expected_flags,
        )

    def test_missing_perrp_defaults_to_false(self):
        # Dropping the column exercises the fallback for inputs without PERRP.
        complete = _person_frame(
            PH_SEQ=np.arange(1, 4),
            A_LINENO=np.ones(3),
            A_AGE=np.full(3, 35),
        )
        person = complete.drop(columns=["PERRP"])

        cps = {}
        add_personal_variables(cps, person)

        np.testing.assert_array_equal(
            cps["is_unmarried_partner_of_household_head"],
            np.array([False, False, False]),
        )
Loading
Loading