Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/top-tail-income-representation.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Include PUF aggregate records and inject high-income PUF records into ExtendedCPS to improve top-tail income representation.
145 changes: 145 additions & 0 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

logger = logging.getLogger(__name__)

# Minimum AGI for PUF records to be injected into the ExtendedCPS.
# The comparison is strictly greater-than, so households at exactly
# this AGI are not injected.
AGI_INJECTION_THRESHOLD = 1_000_000

# CPS-only variables that should be QRF-imputed for the PUF clone half
# instead of naively duplicated from the CPS donor. These are
# income-correlated variables that exist only in the CPS; demographics,
Expand Down Expand Up @@ -446,8 +449,150 @@ def generate(self):

new_data = self._rename_imputed_to_inputs(new_data)
new_data = self._drop_formula_variables(new_data)
new_data = self._inject_high_income_puf_records(new_data)
self.save_dataset(new_data)

def _inject_high_income_puf_records(self, new_data):
    """Inject ultra-high-income PUF records into the extended CPS.

    The CPS severely under-represents the top of the income
    distribution ($5M+ AGI brackets have -95% to -99% calibration
    errors). This directly appends PUF records with AGI above
    AGI_INJECTION_THRESHOLD as additional households, giving the
    reweighter actual high-income observations.

    Args:
        new_data: Mapping of variable name -> {time_period: array}
            holding the extended CPS arrays; mutated in place.

    Returns:
        ``new_data`` with high-income PUF rows appended to every
        entity-level array (returned unchanged when no PUF household
        exceeds the threshold).
    """
    from policyengine_us import Microsimulation

    logger.info("Loading PUF for high-income record injection...")
    puf_sim = Microsimulation(dataset=self.puf)

    # Identify high-AGI households (strictly above the threshold).
    hh_agi = puf_sim.calculate(
        "adjusted_gross_income", map_to="household"
    ).values
    all_hh_ids = puf_sim.calculate("household_id").values
    high_mask_hh = hh_agi > AGI_INJECTION_THRESHOLD

    n_high_hh = int(high_mask_hh.sum())
    if n_high_hh == 0:
        logger.info("No high-income PUF households found")
        return new_data

    logger.info(
        "Injecting %d PUF households with AGI > $%d",
        n_high_hh,
        AGI_INJECTION_THRESHOLD,
    )

    # Load raw PUF arrays for entity mask construction
    puf_data = puf_sim.dataset.load_dataset()

    # Build entity-level masks. Use the masked ID array directly
    # instead of round-tripping through a Python set cast to float,
    # which could lose precision for large integer IDs.
    high_hh_id_arr = all_hh_ids[high_mask_hh]
    person_mask = np.isin(
        puf_data["person_household_id"],
        high_hh_id_arr,
    )
    # In PUF, household = tax_unit = spm_unit = family
    group_mask = np.isin(
        puf_data["household_id"],
        high_hh_id_arr,
    )
    # Marital units: find which belong to high-income persons
    high_marital_ids = np.unique(
        puf_data["person_marital_unit_id"][person_mask]
    )
    marital_mask = np.isin(
        puf_data["marital_unit_id"],
        high_marital_ids,
    )

    entity_masks = {
        "person": person_mask,
        "household": group_mask,
        "tax_unit": group_mask,
        "spm_unit": group_mask,
        "family": group_mask,
        "marital_unit": marital_mask,
    }

    logger.info(
        "High-income record counts: persons=%d, households=%d, marital_units=%d",
        person_mask.sum(),
        group_mask.sum(),
        marital_mask.sum(),
    )

    # Compute an ID offset larger than any existing ID so injected
    # records cannot collide with CPS-derived ones.
    id_offset = 0
    for key in new_data:
        if key.endswith("_id"):
            vals = new_data[key][self.time_period]
            # Guard len(vals): .max() on an empty array raises.
            if vals.dtype.kind in ("f", "i", "u") and len(vals):
                id_offset = max(id_offset, int(vals.max()))
    id_offset += 1_000_000

    tbs = puf_sim.tax_benefit_system

    # For each variable, extract high-income PUF values and append
    for variable in list(new_data.keys()):
        var_meta = tbs.variables.get(variable)
        if var_meta is None:
            continue

        entity = var_meta.entity.key
        if entity not in entity_masks:
            continue

        mask = entity_masks[entity]

        try:
            puf_values = puf_sim.calculate(variable).values
        except (KeyError, ValueError, RuntimeError) as e:
            logger.warning(
                "Could not calculate %s from PUF: %s",
                variable,
                e,
            )
            # BUGFIX: the fallback must be full entity length
            # (len(mask), not mask.sum()) so the mask below selects
            # the right number of rows. A short fallback would trip
            # the length check and skip the append, leaving this
            # variable's array shorter than its siblings.
            puf_values = np.zeros(len(mask))

        if len(puf_values) != len(mask):
            logger.warning(
                "Length mismatch for %s: values=%d, mask=%d",
                variable,
                len(puf_values),
                len(mask),
            )
            # Substitute zeros rather than skipping: every variable of
            # an entity must stay the same length after injection.
            puf_values = np.zeros(len(mask))

        puf_subset = puf_values[mask]

        # Offset IDs to avoid collisions
        if variable.endswith("_id") and puf_subset.dtype.kind in (
            "f",
            "i",
            "u",
        ):
            puf_subset = puf_subset + id_offset

        # Match dtypes to avoid object arrays that HDF5 can't save
        existing = new_data[variable][self.time_period]
        try:
            puf_subset = np.asarray(puf_subset).astype(existing.dtype)
        except (ValueError, TypeError):
            # Can't cast — pad with zeros/empty to keep lengths aligned
            logger.warning(
                "Padding %s with defaults: cannot cast PUF dtype %s to %s",
                variable,
                puf_subset.dtype,
                existing.dtype,
            )
            puf_subset = np.zeros(len(puf_subset), dtype=existing.dtype)

        new_data[variable][self.time_period] = np.concatenate(
            [existing, puf_subset]
        )

    # Release the simulation (holds the full PUF in memory) before
    # the caller saves the dataset.
    del puf_sim
    return new_data

@classmethod
def _rename_imputed_to_inputs(cls, data):
"""Rename QRF-imputed formula vars to their leaf inputs.
Expand Down
77 changes: 76 additions & 1 deletion policyengine_us_data/datasets/puf/puf.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,81 @@ def decode_age_dependent(age_range: int) -> int:
return rng.integers(low=lower, high=upper, endpoint=False)


# PUF columns used as QRF predictors when imputing MARS (filing
# status) for aggregate records. These are income/benefit fields plus
# XTOT (exemption count) — presumably standard IRS SOI PUF E-codes
# (e.g. E00200 wages, E02400 social security); verify against the SOI
# PUF codebook before renaming or extending.
MARS_IMPUTATION_PREDICTORS = [
    "E00200",
    "E00300",
    "E00600",
    "E01000",
    "E00900",
    "E26270",
    "E02400",
    "E01500",
    "XTOT",
]


def impute_aggregate_mars(puf: pd.DataFrame) -> pd.DataFrame:
    """Fill in MARS for PUF aggregate records via a QRF model.

    Aggregate records (MARS == 0) carry full income/deduction detail
    but lack demographics. Since MARS feeds the downstream demographic
    QRF in impute_missing_demographics(), we predict it here from the
    income profile of regular records. All remaining demographics
    (AGERANGE, GENDER, EARNSPLIT, AGEDP1-3) stay unset for
    impute_missing_demographics() to impute later.

    Args:
        puf: The raw PUF DataFrame; modified in place.

    Returns:
        The same DataFrame with MARS imputed for aggregate records.
    """
    from microimpute.models.qrf import QRF

    aggregate_rows = puf.MARS == 0
    if aggregate_rows.sum() == 0:
        # Nothing to impute.
        return puf

    regular = puf[puf.MARS != 0].copy()

    # Restrict to predictor columns actually present in this PUF vintage.
    feature_cols = [
        c for c in MARS_IMPUTATION_PREDICTORS if c in puf.columns
    ]

    # Fit on a capped, reproducible sample of regular records
    # (DataFrame.sample returns a copy).
    sample_size = min(10_000, len(regular))
    training = regular.sample(n=sample_size, random_state=42)
    training = training[feature_cols + ["MARS"]].fillna(0)

    model = QRF()
    fitted_model = model.fit(
        X_train=training,
        predictors=feature_cols,
        imputed_variables=["MARS"],
    )

    # Predict for the aggregate records and snap the continuous QRF
    # output to the nearest valid filing-status code (1..4).
    features = puf.loc[aggregate_rows, feature_cols].fillna(0)
    raw_predictions = fitted_model.predict(X_test=features)
    codes = raw_predictions["MARS"].values.round().astype(int)
    puf.loc[aggregate_rows, "MARS"] = np.clip(codes, 1, 4)

    # Joint filers need at least two exemptions.
    joint_rows = aggregate_rows & (puf.MARS == 2)
    puf.loc[joint_rows, "XTOT"] = puf.loc[joint_rows, "XTOT"].clip(
        lower=2
    )

    # DSI and EIC are predictors in the downstream demographic QRF:
    # ultra-high-income filers are never dependents and never get EITC.
    for flag_col in ("DSI", "EIC"):
        if flag_col in puf.columns:
            puf.loc[aggregate_rows, flag_col] = 0

    return puf


def preprocess_puf(puf: pd.DataFrame) -> pd.DataFrame:
# Add variable renames
puf.S006 = puf.S006 / 100
Expand Down Expand Up @@ -552,7 +627,7 @@ def generate(self):
self.save_dataset(arrays)
return

puf = puf[puf.MARS != 0] # Remove aggregate records
puf = impute_aggregate_mars(puf)

original_recid = puf.RECID.values.copy()
puf = preprocess_puf(puf)
Expand Down
Loading