Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions policyengine_us_data/calibration/publish_local_area.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from policyengine_us_data.utils.takeup import (
SIMPLE_TAKEUP_VARS,
apply_block_takeup_to_arrays,
any_person_flag_by_entity,
)

CHECKPOINT_FILE = Path("completed_states.txt")
Expand Down Expand Up @@ -526,6 +527,19 @@ def build_h5(
}
hh_state_fips = clone_geo["state_fips"].astype(np.int32)
original_hh_ids = household_ids[active_hh].astype(np.int64)
reported_anchors = {}
if "reported_has_marketplace_health_coverage_at_interview" in data:
reported_anchors["takes_up_aca_if_eligible"] = any_person_flag_by_entity(
data["person_tax_unit_id"][time_period],
data["tax_unit_id"][time_period],
data["reported_has_marketplace_health_coverage_at_interview"][
time_period
],
)
if "reported_has_means_tested_health_coverage_at_interview" in data:
reported_anchors["takes_up_medicaid_if_eligible"] = data[
"reported_has_means_tested_health_coverage_at_interview"
][time_period].astype(bool)

takeup_results = apply_block_takeup_to_arrays(
hh_blocks=active_blocks,
Expand All @@ -535,6 +549,7 @@ def build_h5(
entity_counts=entity_counts,
time_period=time_period,
takeup_filter=takeup_filter,
reported_anchors=reported_anchors,
)
for var_name, bools in takeup_results.items():
data[var_name] = {time_period: bools}
Expand Down
36 changes: 36 additions & 0 deletions policyengine_us_data/calibration/unified_matrix_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

import h5py
import numpy as np
import pandas as pd
from scipy import sparse
Expand Down Expand Up @@ -532,6 +533,7 @@ def _process_single_clone(
entity_hh_idx_map = sd.get("entity_hh_idx_map", {})
entity_to_person_idx = sd.get("entity_to_person_idx", {})
precomputed_rates = sd.get("precomputed_rates", {})
reported_takeup_anchors = sd.get("reported_takeup_anchors", {})

# Slice geography for this clone
clone_states = geo_states[col_start:col_end]
Expand Down Expand Up @@ -594,6 +596,7 @@ def _process_single_clone(
precomputed_rates[info["rate_key"]],
ent_blocks,
ent_hh_ids,
reported_mask=reported_takeup_anchors.get(takeup_var),
)

ent_values = (ent_eligible * ent_takeup).astype(np.float32)
Expand Down Expand Up @@ -1876,6 +1879,7 @@ def build_matrix(
from policyengine_us_data.utils.takeup import (
TAKEUP_AFFECTED_TARGETS,
compute_block_takeup_for_entities,
any_person_flag_by_entity,
)
from policyengine_us_data.parameters import (
load_take_up_rate,
Expand Down Expand Up @@ -1904,6 +1908,35 @@ def build_matrix(
"person": person_hh_indices,
}

reported_takeup_anchors = {}
with h5py.File(self.dataset_path, "r") as f:
period_key = str(self.time_period)
if (
"reported_has_marketplace_health_coverage_at_interview" in f
and period_key
in f["reported_has_marketplace_health_coverage_at_interview"]
):
person_marketplace = f[
"reported_has_marketplace_health_coverage_at_interview"
][period_key][...].astype(bool)
person_tax_unit_ids = f["person_tax_unit_id"][period_key][...]
tax_unit_ids = f["tax_unit_id"][period_key][...]
reported_takeup_anchors["takes_up_aca_if_eligible"] = (
any_person_flag_by_entity(
person_tax_unit_ids,
tax_unit_ids,
person_marketplace,
)
)
if (
"reported_has_means_tested_health_coverage_at_interview" in f
and period_key
in f["reported_has_means_tested_health_coverage_at_interview"]
):
reported_takeup_anchors["takes_up_medicaid_if_eligible"] = f[
"reported_has_means_tested_health_coverage_at_interview"
][period_key][...].astype(bool)

entity_to_person_idx = {}
for entity_level in ("spm_unit", "tax_unit"):
ent_ids = sim.calculate(
Expand Down Expand Up @@ -1940,6 +1973,7 @@ def build_matrix(
self.household_ids = household_ids
self.precomputed_rates = precomputed_rates
self.affected_target_info = affected_target_info
self.reported_takeup_anchors = reported_takeup_anchors

# 5d. Clone loop
from pathlib import Path
Expand Down Expand Up @@ -1987,6 +2021,7 @@ def build_matrix(
shared_data["entity_hh_idx_map"] = entity_hh_idx_map
shared_data["entity_to_person_idx"] = entity_to_person_idx
shared_data["precomputed_rates"] = precomputed_rates
shared_data["reported_takeup_anchors"] = reported_takeup_anchors

logger.info(
"Starting parallel clone processing: %d clones, %d workers",
Expand Down Expand Up @@ -2124,6 +2159,7 @@ def build_matrix(
precomputed_rates[info["rate_key"]],
ent_blocks,
ent_hh_ids,
reported_mask=reported_takeup_anchors.get(takeup_var),
)

ent_values = (ent_eligible * ent_takeup).astype(np.float32)
Expand Down
18 changes: 17 additions & 1 deletion policyengine_us_data/datasets/cps/census_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,24 @@ class CensusCPS_2018(CensusCPS):
"A_AGE",
"A_SEX",
"PEDISEYE",
"NOW_COV",
"NOW_DIR",
"NOW_MRK",
"NOW_MRKS",
"NOW_MRKUN",
"NOW_NONM",
"NOW_PRIV",
"NOW_PUB",
"NOW_GRP",
"NOW_CAID",
"NOW_MCAID",
"NOW_PCHIP",
"NOW_OTHMT",
"NOW_MCARE",
"NOW_MIL",
"NOW_CHAMPVA",
"NOW_VACARE",
"NOW_IHSFLG",
"WSAL_VAL",
"INT_VAL",
"SEMP_VAL",
Expand Down Expand Up @@ -294,7 +311,6 @@ class CensusCPS_2018(CensusCPS):
"PMED_VAL",
"PEMCPREM",
"PRCITSHP",
"NOW_GRP",
"POCCU2",
"PEINUSYR",
"MCARE",
Expand Down
76 changes: 71 additions & 5 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,31 @@
import logging
from policyengine_us_data.parameters import load_take_up_rate
from policyengine_us_data.utils.randomness import seeded_rng
from policyengine_us_data.utils.takeup import (
any_person_flag_by_entity,
assign_takeup_with_reported_anchors,
)


# Output variable name -> person-record column holding the matching
# point-in-time ("NOW_*") coverage flag. Consumers iterate this mapping
# and set each output variable to ``person[column] == 1``, so insertion
# order is kept stable.
# NOTE: per this mapping, the "medicaid" variable reads NOW_CAID while the
# broader "means_tested" variable reads NOW_MCAID; the two are easy to mix up.
CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP = {
    "reported_has_direct_purchase_health_coverage_at_interview": (
        "NOW_DIR"
    ),
    "reported_has_marketplace_health_coverage_at_interview": (
        "NOW_MRK"
    ),
    "reported_has_subsidized_marketplace_health_coverage_at_interview": (
        "NOW_MRKS"
    ),
    "reported_has_unsubsidized_marketplace_health_coverage_at_interview": (
        "NOW_MRKUN"
    ),
    "reported_has_non_marketplace_direct_purchase_health_coverage_at_interview": (
        "NOW_NONM"
    ),
    "reported_has_employer_sponsored_health_coverage_at_interview": (
        "NOW_GRP"
    ),
    "reported_has_medicare_health_coverage_at_interview": (
        "NOW_MCARE"
    ),
    "reported_has_medicaid_health_coverage_at_interview": (
        "NOW_CAID"
    ),
    "reported_has_means_tested_health_coverage_at_interview": (
        "NOW_MCAID"
    ),
    "reported_has_chip_health_coverage_at_interview": (
        "NOW_PCHIP"
    ),
    "reported_has_other_means_tested_health_coverage_at_interview": (
        "NOW_OTHMT"
    ),
    "reported_has_tricare_health_coverage_at_interview": (
        "NOW_MIL"
    ),
    "reported_has_champva_health_coverage_at_interview": (
        "NOW_CHAMPVA"
    ),
    "reported_has_va_health_coverage_at_interview": (
        "NOW_VACARE"
    ),
    "reported_has_indian_health_service_coverage_at_interview": (
        "NOW_IHSFLG"
    ),
}


class CPS(Dataset):
Expand Down Expand Up @@ -241,7 +266,16 @@ def add_takeup(self):

# ACA
rng = seeded_rng("takes_up_aca_if_eligible")
data["takes_up_aca_if_eligible"] = rng.random(n_tax_units) < aca_rate
reported_marketplace_by_tax_unit = any_person_flag_by_entity(
data["person_tax_unit_id"],
data["tax_unit_id"],
data["reported_has_marketplace_health_coverage_at_interview"],
)
data["takes_up_aca_if_eligible"] = assign_takeup_with_reported_anchors(
rng.random(n_tax_units),
aca_rate,
reported_mask=reported_marketplace_by_tax_unit,
)

# Medicaid: state-specific rates
state_codes = baseline.calculate("state_code_str").values
Expand All @@ -253,8 +287,11 @@ def add_takeup(self):
[medicaid_rates_by_state.get(s, 0.93) for s in person_states]
)
rng = seeded_rng("takes_up_medicaid_if_eligible")
data["takes_up_medicaid_if_eligible"] = (
rng.random(n_persons) < medicaid_rate_by_person
data["takes_up_medicaid_if_eligible"] = assign_takeup_with_reported_anchors(
rng.random(n_persons),
medicaid_rate_by_person,
reported_mask=data["reported_has_means_tested_health_coverage_at_interview"],
group_keys=person_states,
)

# Head Start
Expand Down Expand Up @@ -462,9 +499,38 @@ def children_per_parent(col: str) -> pd.DataFrame:
)
cps["own_children_in_household"] = tmp.children.fillna(0)

cps["has_marketplace_health_coverage"] = person.NOW_MRK == 1
for variable, cps_column in CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP.items():
cps[variable] = person[cps_column] == 1

cps["reported_has_private_health_coverage_at_interview"] = person.NOW_PRIV == 1
cps["reported_has_public_health_coverage_at_interview"] = person.NOW_PUB == 1
cps["reported_is_insured_at_interview"] = person.NOW_COV == 1
cps["reported_is_uninsured_at_interview"] = person.NOW_COV != 1

cps["has_esi"] = person.NOW_GRP == 1
coverage_families = np.column_stack(
[
cps["reported_has_employer_sponsored_health_coverage_at_interview"],
cps["reported_has_marketplace_health_coverage_at_interview"],
cps[
"reported_has_non_marketplace_direct_purchase_health_coverage_at_interview"
],
cps["reported_has_medicare_health_coverage_at_interview"],
cps["reported_has_means_tested_health_coverage_at_interview"],
cps["reported_has_tricare_health_coverage_at_interview"],
cps["reported_has_champva_health_coverage_at_interview"],
cps["reported_has_va_health_coverage_at_interview"],
cps["reported_has_indian_health_service_coverage_at_interview"],
]
)
cps["reported_has_multiple_health_coverage_at_interview"] = (
coverage_families.sum(axis=1) > 1
)

# Legacy aliases retained for compatibility until rules-side names catch up.
cps["has_marketplace_health_coverage"] = cps[
"reported_has_marketplace_health_coverage_at_interview"
]
cps["has_esi"] = cps["reported_has_employer_sponsored_health_coverage_at_interview"]

cps["cps_race"] = person.PRDTRACE
cps["is_hispanic"] = person.PRDTHSP != 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,22 @@ def test_different_blocks_different_result(self):
differs = any(not np.array_equal(r1[v], r2[v]) for v in r1)
assert differs

def test_reported_anchors_feed_through_for_aca(self):
    """Check that ``reported_anchors`` reaches the ACA take-up draw.

    Only the first entity is flagged as reported, and the call uses a
    precomputed ACA rate of 0.25. The resulting
    ``takes_up_aca_if_eligible`` array is expected to match the anchor
    pattern exactly.
    """
    block_args = self._make_arrays(4, 1, 1, 1)
    aca_var = "takes_up_aca_if_eligible"
    # A fresh array is passed as the anchor; the expected value is kept as
    # a separate list so the comparison never aliases the input.
    anchors = {aca_var: np.array([True, False, False, False])}

    result = apply_block_takeup_to_arrays(
        *block_args,
        time_period=2024,
        takeup_filter=[aca_var],
        precomputed_rates={"aca": 0.25},
        reported_anchors=anchors,
    )

    np.testing.assert_array_equal(
        result[aca_var],
        [True, False, False, False],
    )


class TestResolveRate:
"""Verify _resolve_rate handles scalar and dict rates."""
Expand Down
67 changes: 67 additions & 0 deletions policyengine_us_data/tests/test_datasets/test_cps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,72 @@
import pytest
import numpy as np
import pandas as pd


def test_add_personal_variables_maps_current_health_coverage_flags():
    """Check the NOW_* coverage columns map onto the reported_* variables.

    The fixture has two person records. The first has marketplace (and
    direct-purchase) coverage flags set; the second has employer and
    means-tested flags set with ``NOW_COV`` equal to 0. The assertions cover
    the direct mappings, the derived uninsured / multiple-coverage flags,
    and the two legacy aliases.
    """
    from policyengine_us_data.datasets.cps.cps import add_personal_variables

    # Columns are grouped for readability only; merge order below keeps the
    # same column order as a single flat dict would have.
    demographic_columns = {
        "A_AGE": [30, 45],
        "A_SEX": [2, 1],
        "PEDISEYE": [0, 1],
        "PEDISDRS": [0, 0],
        "PEDISEAR": [0, 0],
        "PEDISOUT": [0, 0],
        "PEDISPHY": [0, 0],
        "PEDISREM": [0, 0],
        "PEPAR1": [0, 0],
        "PEPAR2": [0, 0],
        "PH_SEQ": [1, 1],
        "A_LINENO": [1, 2],
    }
    coverage_columns = {
        "NOW_COV": [1, 0],
        "NOW_DIR": [1, 0],
        "NOW_MRK": [1, 0],
        "NOW_MRKS": [1, 0],
        "NOW_MRKUN": [0, 0],
        "NOW_NONM": [0, 0],
        "NOW_PRIV": [1, 0],
        "NOW_PUB": [0, 1],
        "NOW_GRP": [0, 1],
        "NOW_CAID": [0, 1],
        "NOW_MCAID": [0, 1],
        "NOW_PCHIP": [0, 0],
        "NOW_OTHMT": [0, 0],
        "NOW_MCARE": [0, 0],
        "NOW_MIL": [0, 0],
        "NOW_CHAMPVA": [0, 0],
        "NOW_VACARE": [0, 0],
        "NOW_IHSFLG": [0, 0],
    }
    remaining_columns = {
        "PRDTRACE": [1, 2],
        "PRDTHSP": [0, 1],
        "A_MARITL": [1, 4],
        "A_HSCOL": [0, 2],
        "POCCU2": [39, 52],
    }
    person = pd.DataFrame(
        {**demographic_columns, **coverage_columns, **remaining_columns}
    )
    cps = {}

    add_personal_variables(cps, person)

    check = np.testing.assert_array_equal

    # Direct NOW_* -> reported_* mappings.
    check(
        cps["reported_has_marketplace_health_coverage_at_interview"],
        [True, False],
    )
    check(
        cps["reported_has_means_tested_health_coverage_at_interview"],
        [False, True],
    )
    # Derived flags.
    check(cps["reported_is_uninsured_at_interview"], [False, True])
    check(
        cps["reported_has_multiple_health_coverage_at_interview"],
        [False, True],
    )
    # Legacy aliases.
    check(cps["has_marketplace_health_coverage"], [True, False])
    check(cps["has_esi"], [False, True])


def test_cps_has_auto_loan_interest():
Expand Down
Loading
Loading