Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions cli/src/slack-active-pill.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { test, describe, before, after } from "node:test";
import { strict as assert } from "node:assert";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Point kanbanHome() at a tmp dir BEFORE importing the module under test so the
// persistence files land somewhere disposable per run.
let tmpHome: string;

before(() => {
tmpHome = mkdtempSync(join(tmpdir(), "kanban-active-pill-"));
process.env.KANBAN_CODE_HOME = tmpHome;
});
after(() => {
rmSync(tmpHome, { recursive: true, force: true });
delete process.env.KANBAN_CODE_HOME;
});

describe("active-pill persistence", () => {
test("write -> read round-trips the pill state", async () => {
const { writeActivePill, readActivePill } = await import("./slack/active-pill.js");
writeActivePill("agent-a", {
channelId: "C123",
threadTs: "1730000000.000100",
label: "is working…",
lastSetMs: 1730000000000,
});
const got = readActivePill("agent-a");
assert.deepEqual(got, {
channelId: "C123",
threadTs: "1730000000.000100",
label: "is working…",
lastSetMs: 1730000000000,
});
});

test("read returns undefined for an agent that has never had a pill", async () => {
const { readActivePill } = await import("./slack/active-pill.js");
assert.equal(readActivePill("never-existed"), undefined);
});

test("clear removes the file so a later read sees nothing", async () => {
const { writeActivePill, readActivePill, clearActivePill } = await import("./slack/active-pill.js");
writeActivePill("agent-b", {
channelId: "C2",
threadTs: "1.2",
label: "is working…",
lastSetMs: 1,
});
assert.notEqual(readActivePill("agent-b"), undefined);
clearActivePill("agent-b");
assert.equal(readActivePill("agent-b"), undefined);
});

test("clear on a missing file is a no-op (does not throw)", async () => {
const { clearActivePill } = await import("./slack/active-pill.js");
assert.doesNotThrow(() => clearActivePill("never-existed-also"));
});

test("read tolerates a corrupted file (returns undefined)", async () => {
const { writeFileSync, mkdirSync } = await import("node:fs");
const { join } = await import("node:path");
const dir = join(tmpHome, "active-pills");
mkdirSync(dir, { recursive: true });
writeFileSync(join(dir, "agent-corrupt"), "not json");
const { readActivePill } = await import("./slack/active-pill.js");
assert.equal(readActivePill("agent-corrupt"), undefined);
});

test("read rejects partial records (missing required fields)", async () => {
const { writeFileSync, mkdirSync } = await import("node:fs");
const { join } = await import("node:path");
const dir = join(tmpHome, "active-pills");
mkdirSync(dir, { recursive: true });
writeFileSync(
join(dir, "agent-partial"),
JSON.stringify({ channelId: "C", threadTs: "1.0" }), // missing label + lastSetMs
);
const { readActivePill } = await import("./slack/active-pill.js");
assert.equal(readActivePill("agent-partial"), undefined);
});
});
66 changes: 66 additions & 0 deletions cli/src/slack/active-pill.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { mkdirSync, writeFileSync, readFileSync, renameSync, unlinkSync } from "node:fs";
import { dirname, join } from "node:path";
import { kanbanHome } from "../paths.js";

/// On-disk record of the "is working…" pill the bridge has set on an
/// agent's current channel-root anchor. The bridge keeps an in-memory
/// `active` map already; this file mirrors that map so a bridge restart
/// (config-sync triggers one on every applied bundle) doesn't drop the
/// pill until the agent's NEXT text post relights it. An agent that
/// goes long on tools with no intermediate text — common when it's
/// debugging or planning — would otherwise have its channel look dead
/// for the rest of the turn after a restart.
///
/// Same pattern as thread-root: atomic write (tmp + rename) per slug,
/// stored under ~/.kanban-code/active-pills/<slug>. We keep the record
/// even after a pill is cleared on a non-terminal turn-end so the
/// restart path always has SOMETHING to re-light to.

export interface PersistedPill {
channelId: string;
threadTs: string;
label: string;
/// Wall-clock when the pill was last set/refreshed. Used by the
/// restore path: if a pill is much older than Slack's own idle
/// behaviour suggests an agent is still working (say, 10 minutes
/// without any refresh), we don't blindly re-light it on restart —
/// the agent probably finished and Slack has already cleared it.
lastSetMs: number;
}

function pillPath(slug: string): string {
return join(kanbanHome(), "active-pills", slug);
}

export function writeActivePill(slug: string, pill: PersistedPill): void {
if (!slug) return;
const path = pillPath(slug);
mkdirSync(dirname(path), { recursive: true });
const tmp = `${path}.tmp`;
writeFileSync(tmp, JSON.stringify(pill));
renameSync(tmp, path);
}

export function readActivePill(slug: string): PersistedPill | undefined {
try {
const raw = readFileSync(pillPath(slug), "utf-8");
const parsed = JSON.parse(raw) as Partial<PersistedPill>;
if (!parsed.channelId || !parsed.threadTs || !parsed.label) return undefined;
if (typeof parsed.lastSetMs !== "number") return undefined;
return parsed as PersistedPill;
} catch {
return undefined;
}
}

/// Best-effort delete. Used when the bridge explicitly clears a pill
/// (e.g. a terminal text post) so the next restart doesn't restore
/// something that was deliberately turned off.
export function clearActivePill(slug: string): void {
if (!slug) return;
try {
unlinkSync(pillPath(slug));
} catch {
/* file may not exist — that's fine */
}
}
44 changes: 42 additions & 2 deletions cli/src/slack/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { Runtime } from "../agents/runtime.js";
import { recordAnnounceSuppress } from "./announce-suppress.js";
import { WORKING_PILL_LABEL } from "./announce.js";
import { writeThreadRoot, readThreadRoot } from "./thread-root.js";
import { writeActivePill, readActivePill, clearActivePill } from "./active-pill.js";
import { downloadSlackFile, formatPromptWithAttachments, DownloadedFile, sweepInbox, DEFAULT_RETENTION_DAYS } from "./inbox.js";
import { parsePicker, Picker } from "./picker.js";
import { findSessionJsonl, findCodexRollout, pasteTmuxPrompt, captureTmuxPane, sendTmuxKey } from "../data.js";
Expand Down Expand Up @@ -166,6 +167,42 @@ export async function runSlackBridge(opts: BridgeOptions): Promise<void> {
interface ActivePill { channelId: string; threadTs: string; label: string; lastSetMs: number; }
const active = new Map<string /* slug */, ActivePill>();

/// Restore pills from disk on startup so a bridge restart (config-sync
/// triggers one on every bundle apply, sometimes several per hour while
/// we iterate) doesn't drop the pill until the agent's next text post.
/// We only restore pills that were set recently — `MAX_RESTORE_AGE_MS`
/// older and the agent has likely already finished its turn, Slack's
/// own idle TTL would have cleared the visual pill, and re-lighting
/// would falsely advertise active work.
const MAX_RESTORE_AGE_MS = 10 * 60_000;
for (const a of agents) {
const pill = readActivePill(a.slug);
if (!pill) continue;
if (Date.now() - pill.lastSetMs > MAX_RESTORE_AGE_MS) {
clearActivePill(a.slug);
continue;
}
active.set(a.slug, pill);
// Re-light immediately rather than waiting for the next refresh tick,
// so the channel shows "is working…" within seconds of bridge start
// (the gap that prompted this whole change). Best-effort; the
// refresh loop will retry every cycle anyway.
try {
await client.setStatus(pill.channelId, pill.threadTs, pill.label);
} catch (e) {
console.error(`setStatus restore for ${a.slug} failed:`, e);
}
}

function setActivePill(slug: string, pill: ActivePill): void {
active.set(slug, pill);
writeActivePill(slug, pill);
}
function dropActivePill(slug: string): void {
active.delete(slug);
clearActivePill(slug);
}

// Per-agent buffer of tool/thinking posts that have NOT yet been sent. We
// hold them until the next text post arrives (or a picker pops), then drain
// them all into the previous text's thread as a single message. This keeps
Expand Down Expand Up @@ -240,7 +277,7 @@ export async function runSlackBridge(opts: BridgeOptions): Promise<void> {
}
const ts = await client.post(t.channelId, post.text);
if (ts) writeThreadRoot(t.slug, ts);
active.delete(t.slug);
dropActivePill(t.slug);
// `terminal: true` posts (currently only the codex out-of-credits
// sentinel) are the final word of the turn — no more work coming.
// Skip the WORKING pill entirely so the channel doesn't show a
Expand All @@ -249,7 +286,7 @@ export async function runSlackBridge(opts: BridgeOptions): Promise<void> {
if (ts && !post.terminal) {
try {
await client.setStatus(t.channelId, ts, WORKING_LABEL);
active.set(t.slug, { channelId: t.channelId, threadTs: ts, label: WORKING_LABEL, lastSetMs: Date.now() });
setActivePill(t.slug, { channelId: t.channelId, threadTs: ts, label: WORKING_LABEL, lastSetMs: Date.now() });
} catch (e) {
console.error(`setStatus (text) for ${t.slug} failed:`, e);
}
Expand Down Expand Up @@ -279,6 +316,9 @@ export async function runSlackBridge(opts: BridgeOptions): Promise<void> {
try {
await client.setStatus(pill.channelId, pill.threadTs, pill.label);
pill.lastSetMs = now;
// Persist the refresh so a restart right after this tick keeps
// the pill fresh (and within the MAX_RESTORE_AGE_MS window).
writeActivePill(slug, pill);
} catch (e) {
console.error(`setStatus refresh for ${slug} failed:`, e);
}
Expand Down