diff --git a/changelog.d/top-tail-income-representation.added.md b/changelog.d/top-tail-income-representation.added.md new file mode 100644 index 00000000..e7b5bea5 --- /dev/null +++ b/changelog.d/top-tail-income-representation.added.md @@ -0,0 +1 @@ +Include PUF aggregate records and inject high-income PUF records into ExtendedCPS to improve top-tail income representation. diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index 040098c1..8fb6a82a 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -300,6 +300,81 @@ def decode_age_dependent(age_range: int) -> int: return rng.integers(low=lower, high=upper, endpoint=False) +MARS_IMPUTATION_PREDICTORS = [ + "E00200", + "E00300", + "E00600", + "E01000", + "E00900", + "E26270", + "E02400", + "E01500", + "XTOT", +] + + +def impute_aggregate_mars(puf: pd.DataFrame) -> pd.DataFrame: + """Impute MARS for aggregate records using QRF on income variables. + + PUF aggregate records (MARS=0) have complete income/deduction data + but no demographics. MARS is needed as a predictor for the + downstream demographic QRF in impute_missing_demographics(). + + We train a QRF on regular records' income profiles to predict + MARS, then impute it for the aggregate records. All other + demographics (AGERANGE, GENDER, EARNSPLIT, AGEDP1-3) are left + for impute_missing_demographics() to handle. + """ + from microimpute.models.qrf import QRF + + agg_mask = puf.MARS == 0 + n_agg = agg_mask.sum() + if n_agg == 0: + return puf + + reg = puf[puf.MARS != 0].copy() + + # Use available income variables as predictors for MARS + predictors = [c for c in MARS_IMPUTATION_PREDICTORS if c in puf.columns] + + # Train on a sample of regular records (sample() returns a copy) + train = reg.sample(n=min(10_000, len(reg)), random_state=42)[ + predictors + ["MARS"] + ].fillna(0) + + qrf = QRF() + fitted = qrf.fit( + X_train=train, + predictors=predictors, + imputed_variables=["MARS"], + ) + + # Predict MARS for aggregate records + agg_data = puf.loc[agg_mask, predictors].fillna(0) + predicted = fitted.predict(X_test=agg_data) + # QRF outputs continuous values; round to nearest valid MARS + mars_imputed = predicted["MARS"].values.round().astype(int) + mars_imputed = np.clip(mars_imputed, 1, 4) + puf.loc[agg_mask, "MARS"] = mars_imputed + + # Adjust XTOT for joint filers (need at least 2 exemptions) + joint_agg = agg_mask & (puf.MARS == 2) + puf.loc[joint_agg, "XTOT"] = puf.loc[joint_agg, "XTOT"].clip(lower=2) + + # DSI and EIC are predictors in the downstream demographic QRF: + # ultra-high-income filers are never dependents and never get EITC + if "DSI" in puf.columns: + puf.loc[agg_mask, "DSI"] = 0 + if "EIC" in puf.columns: + puf.loc[agg_mask, "EIC"] = 0 + + # AGERANGE, GENDER, EARNSPLIT, AGEDP1-3 are left unset — + # impute_missing_demographics() handles them via QRF using + # [E00200, MARS, DSI, EIC, XTOT] as predictors. + + return puf + + def preprocess_puf(puf: pd.DataFrame) -> pd.DataFrame: # Add variable renames puf.S006 = puf.S006 / 100 @@ -552,7 +627,7 @@ def generate(self): self.save_dataset(arrays) return - puf = puf[puf.MARS != 0] # Remove aggregate records + puf = impute_aggregate_mars(puf) original_recid = puf.RECID.values.copy() puf = preprocess_puf(puf)