Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/sequential-ss-shares.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use sequential share-of-remainder imputation for SS sub-components.
187 changes: 135 additions & 52 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@
"traditional_ira_contributions",
"roth_ira_contributions",
"self_employed_pension_contributions",
# Social Security sub-components
"social_security_retirement",
"social_security_disability",
"social_security_dependents",
"social_security_survivors",
# Social Security sub-components are handled separately via
# sequential share-of-remainder imputation -- see
# _impute_ss_subcomponents_sequential().
# Transfer income
"unemployment_compensation",
"tanf_reported",
Expand Down Expand Up @@ -149,8 +147,13 @@ def _impute_cps_only_variables(
)

# Load original (non-doubled) CPS for training data.
# Include SS sub-component columns for sequential imputation
# even though they're not in CPS_ONLY_IMPUTED_VARIABLES.
ss_train_cols = [v for v in SS_SUBCOMPONENT_SEQUENCE if v not in valid_outputs]
cps_sim = Microsimulation(dataset=dataset_path)
X_train = cps_sim.calculate_dataframe(all_predictors + valid_outputs)
X_train = cps_sim.calculate_dataframe(
all_predictors + valid_outputs + ss_train_cols
)

available_outputs = [col for col in valid_outputs if col in X_train.columns]
missing_outputs = [col for col in valid_outputs if col not in X_train.columns]
Expand Down Expand Up @@ -198,9 +201,32 @@ def _impute_cps_only_variables(
for var in missing_outputs:
predictions[var] = 0

# Apply domain constraints to retirement and SS variables.
# Apply domain constraints to retirement variables.
predictions = _apply_post_processing(predictions, X_test, time_period, data)

# SS sub-components: sequential share-of-remainder imputation.
# Each stage predicts "share of what's left", so the four
# components sum to total social_security by construction.
ss_vars_in_train = [v for v in SS_SUBCOMPONENT_SEQUENCE if v in X_train.columns]
if len(ss_vars_in_train) == len(SS_SUBCOMPONENT_SEQUENCE):
n_half = len(data["person_id"][time_period]) // 2
total_ss = data["social_security"][time_period][n_half:]
logger.info("Imputing SS sub-components via sequential shares")
ss_predictions = _impute_ss_subcomponents_sequential(
X_train=X_train,
X_test=X_test,
total_ss=total_ss,
predictors=all_predictors,
)
for col in SS_SUBCOMPONENT_SEQUENCE:
predictions[col] = ss_predictions[col].values
else:
logger.warning(
"SS sub-component vars missing from CPS training data, "
"skipping sequential imputation: %s",
set(SS_SUBCOMPONENT_SEQUENCE) - set(ss_vars_in_train),
)

logger.info(
"Stage-2 CPS-only imputation took %.2fs total",
time.time() - total_start,
Expand Down Expand Up @@ -259,38 +285,6 @@ def apply_retirement_constraints(predictions, X_test, time_period):
return result


def reconcile_ss_subcomponents(predictions, total_ss):
    """Normalize Social Security sub-components to sum to total.

    Negative predictions are floored at zero, then each row is scaled
    so its components sum to that record's total. Rows with a positive
    total but all-zero predictions get an equal split across the
    components; rows with no total get all zeros.

    Args:
        predictions: DataFrame with one column per SS sub-component
            (retirement, disability, dependents, survivors).
        total_ss: numpy array of total social_security per record.

    Returns:
        DataFrame with reconciled dollar values, same columns.
    """
    n_components = predictions.shape[1]
    clipped = np.clip(predictions.values, 0, None)
    component_sums = clipped.sum(axis=1)

    has_total = total_ss > 0
    has_components = component_sums > 0

    shares = np.zeros_like(clipped)
    proportional = has_total & has_components
    shares[proportional] = (
        clipped[proportional] / component_sums[proportional][:, np.newaxis]
    )
    # Positive total but nothing predicted: distribute equally.
    shares[has_total & ~has_components] = 1.0 / n_components

    reconciled = shares * total_ss[:, np.newaxis]
    reconciled[~has_total] = 0.0
    return pd.DataFrame(reconciled, columns=predictions.columns)


_RETIREMENT_VARS = {
"traditional_401k_contributions",
"roth_401k_contributions",
Expand All @@ -299,16 +293,112 @@ def reconcile_ss_subcomponents(predictions, total_ss):
"self_employed_pension_contributions",
}

_SS_SUBCOMPONENT_VARS = {
# Ordered largest-to-smallest so early predictions carry
# the most signal and the smallest component absorbs rounding.
SS_SUBCOMPONENT_SEQUENCE = [
"social_security_retirement",
"social_security_survivors",
"social_security_disability",
"social_security_dependents",
"social_security_survivors",
}
]


def _impute_ss_subcomponents_sequential(X_train, X_test, total_ss, predictors):
    """Impute SS sub-components via sequential share-of-remainder.

    Instead of predicting all four dollar amounts and normalizing,
    we predict *shares of the remaining total* sequentially, in
    ``SS_SUBCOMPONENT_SEQUENCE`` order (retirement, survivors,
    disability, with dependents taking the remainder):

    1. retirement_share = retirement / total (predict in [0,1])
       retirement = retirement_share * total
    2. survivors_share = survivors / (total - retirement)
       survivors = survivors_share * remaining
    3. disability_share = disability / (total - retirement - survivors)
       disability = disability_share * remaining
    4. dependents = remaining (whatever is left)

    This guarantees the four components sum to ``total_ss`` by
    construction with no post-hoc normalization needed.

    Args:
        X_train: CPS training data with predictors + raw SS
            sub-component columns (all of ``SS_SUBCOMPONENT_SEQUENCE``
            must be present).
        X_test: PUF clone test data with predictors.
        total_ss: 1-D array of total social_security per PUF clone.
        predictors: list of predictor column names.

    Returns:
        DataFrame with one column per SS sub-component (dollar values),
        indexed like ``X_test``.
    """
    # Imported lazily; QRF is a quantile-regression-forest imputer from
    # the project's microimpute dependency.
    from microimpute.models.qrf import QRF

    n = len(X_test)
    # Dollar results per component, default 0 for every test record.
    results = {var: np.zeros(n) for var in SS_SUBCOMPONENT_SEQUENCE}
    has_ss = total_ss > 0

    # No SS recipients at all: skip model fitting entirely.
    if not has_ss.any():
        return pd.DataFrame(results, index=X_test.index)

    # Running "amount left to allocate" for train (observed components)
    # and test (the totals we are distributing).
    remaining_train = X_train[SS_SUBCOMPONENT_SEQUENCE].sum(axis=1).values
    remaining_test = total_ss.copy()

    # Augment predictors with running remaining-total so each
    # stage conditions on what's left.
    X_train_aug = X_train[predictors].copy()
    X_test_aug = X_test[predictors].copy()

    # One QRF per component except the last, which is the remainder.
    for i, var in enumerate(SS_SUBCOMPONENT_SEQUENCE[:-1]):
        share_col = f"_share_{var}"

        # Compute training shares: var / remaining, clipped to [0, 1].
        # Guard against divide-by-zero; rows with no remaining get
        # share 0.
        # NOTE(review): training rows with zero total SS also get
        # share 0 and are kept in the training set — confirm this
        # doesn't bias predicted shares toward zero.
        raw_train = X_train[var].values
        safe_remaining = np.where(remaining_train > 0, remaining_train, 1.0)
        train_share = np.clip(raw_train / safe_remaining, 0, 1)
        X_train_aug[share_col] = train_share
        # Overwritten each stage with the stage-specific remainder.
        X_train_aug["_ss_remaining"] = remaining_train

        X_test_aug["_ss_remaining"] = remaining_test

        qrf = QRF(
            log_level="WARNING",
            memory_efficient=True,
            max_train_samples=5000,
        )
        preds = qrf.fit_predict(
            X_train=X_train_aug[predictors + ["_ss_remaining", share_col]],
            X_test=X_test_aug[predictors + ["_ss_remaining"]],
            predictors=predictors + ["_ss_remaining"],
            imputed_variables=[share_col],
            n_jobs=1,
        )

        # Predicted share of the remaining total, forced into [0, 1]
        # so dollar amounts never exceed what's left.
        share = np.clip(preds[share_col].values, 0, 1)
        dollar = share * remaining_test
        results[var] = np.where(has_ss, dollar, 0)

        # Update remaining totals for next stage.
        remaining_train = remaining_train - raw_train
        remaining_train = np.maximum(remaining_train, 0)
        remaining_test = remaining_test - dollar
        remaining_test = np.maximum(remaining_test, 0)

    # Last component is the remainder — no QRF needed. This is what
    # makes the components sum to total_ss exactly.
    last_var = SS_SUBCOMPONENT_SEQUENCE[-1]
    results[last_var] = np.where(has_ss, remaining_test, 0)

    # Log aggregate share of total SS captured by each component
    # (over SS recipients only), in SS_SUBCOMPONENT_SEQUENCE order.
    logger.info(
        "SS sequential imputation: shares %.1f%% / %.1f%% / %.1f%% / %.1f%%",
        *(
            np.sum(results[v][has_ss]) / np.sum(total_ss[has_ss]) * 100
            for v in SS_SUBCOMPONENT_SEQUENCE
        ),
    )

    return pd.DataFrame(results, index=X_test.index)


def _apply_post_processing(predictions, X_test, time_period, data):
"""Apply retirement constraints and SS reconciliation."""
"""Apply retirement constraints (SS handled separately)."""
ret_cols = [c for c in predictions.columns if c in _RETIREMENT_VARS]
if ret_cols:
constrained = apply_retirement_constraints(
Expand All @@ -317,14 +407,6 @@ def _apply_post_processing(predictions, X_test, time_period, data):
for col in ret_cols:
predictions[col] = constrained[col]

ss_cols = [c for c in predictions.columns if c in _SS_SUBCOMPONENT_VARS]
if ss_cols:
n_half = len(data["person_id"][time_period]) // 2
total_ss = data["social_security"][time_period][n_half:]
reconciled = reconcile_ss_subcomponents(predictions[ss_cols], total_ss)
for col in ss_cols:
predictions[col] = reconciled[col]

return predictions


Expand Down Expand Up @@ -363,7 +445,8 @@ def _splice_cps_only_predictions(
if id_var in data:
entity_half_lengths[entity_key] = len(data[id_var][time_period]) // 2

for var in CPS_ONLY_IMPUTED_VARIABLES:
splice_vars = list(CPS_ONLY_IMPUTED_VARIABLES) + list(SS_SUBCOMPONENT_SEQUENCE)
for var in splice_vars:
if var not in data or var not in predictions.columns:
continue

Expand Down
Loading
Loading