Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
53428dc
fix(dashboard): hide auth/secret keys from settings API
pratyush618 May 30, 2026
6bd7787
fix(dashboard): enforce admin role on mutating routes
pratyush618 May 30, 2026
e547db8
fix(dashboard): Secure cookies and token-gate metrics endpoints
pratyush618 May 30, 2026
a40b68f
fix(dashboard): clear admin password from environ after read
pratyush618 May 30, 2026
2dedc4b
fix(webhooks): re-validate resolved IP at delivery and block redirects
pratyush618 May 30, 2026
58f8732
test(webhooks): allow private host for local delivery tests
pratyush618 May 30, 2026
cd83c6d
feat(serializers): add opt-in SignedSerializer (HMAC integrity)
pratyush618 May 30, 2026
1181945
fix(proxies): harden recipe signing key and warn when unsigned
pratyush618 May 30, 2026
5a14567
fix(proxies): close file allowlist prefix and symlink bypass
pratyush618 May 30, 2026
2a5c7bf
fix(interception): restrict type_path reconstruction to safe kinds
pratyush618 May 30, 2026
70cc81e
test(security): cover SignedSerializer, file allowlist, type_path guards
pratyush618 May 30, 2026
3477d79
fix(locks): mask foreign lock owner token in info()
pratyush618 May 30, 2026
8b5620d
fix(scaler): bind to localhost by default
pratyush618 May 30, 2026
e045a1a
fix(idempotency): use NUL domain separator in auto key
pratyush618 May 30, 2026
d0aa59b
fix(queue): cap serialized payload size at enqueue
pratyush618 May 30, 2026
c0203cb
fix(worker): drop exception repr from failure log
pratyush618 May 30, 2026
762137c
fix(dashboard): summarize tracebacks in job API responses
pratyush618 May 30, 2026
4531706
fix(scheduler): skip dispatch when claim errors
pratyush618 May 30, 2026
798bef2
fix(prefork): recover from poisoned slot mutex
pratyush618 May 30, 2026
fc9c71e
fix(workflows): cap fan-out child count
pratyush618 May 30, 2026
c4ea93e
fix(postgres): quote schema identifier in DDL
pratyush618 May 30, 2026
8bb4ee9
fix(redis): set native lock TTL and atomic reap
pratyush618 May 30, 2026
0bf3f63
docs(security): add security guide and SignedSerializer
pratyush618 May 30, 2026
c350c0f
test(dashboard): assert settings API hides auth keys
pratyush618 May 30, 2026
890b7cc
docs(serializers): document SignedSerializer
pratyush618 May 30, 2026
68b1435
Revert "fix(worker): drop exception repr from failure log"
pratyush618 May 30, 2026
ce0c786
docs(dashboard): document role enforcement, Secure cookies, metrics t…
pratyush618 May 30, 2026
8b1fc02
docs(webhooks): note delivery-time SSRF revalidation and no redirects
pratyush618 May 30, 2026
a4730b8
docs(locking): note owner_id masking via info()
pratyush618 May 30, 2026
557e525
fix(proxies): emit unsigned-recipe warning via else branch
pratyush618 May 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions crates/taskito-core/src/scheduler/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,21 @@ impl Scheduler {
Ok(true)
}

/// Try to claim exactly-once execution. Returns `Ok(true)` if the claim
/// was taken (or recoverably failed and the caller should still attempt
/// dispatch), `Ok(false)` if the job was already claimed by another
/// scheduler.
/// Try to claim exactly-once execution. Returns `Ok(true)` only if the
/// claim was actually taken; `Ok(false)` if it was already claimed by
/// another scheduler **or** the claim attempt errored.
fn claim_for_dispatch(&self, job: &Job) -> Result<bool> {
match self.storage.claim_execution(&job.id, SCHEDULER_CLAIM_OWNER) {
Ok(true) => Ok(true),
Ok(false) => Ok(false),
Err(e) => {
// Don't drop the job on a transient claim error — proceed and
// let the worker handle the duplicate execution defensively.
warn!("claim_execution error for job {}: {e}", job.id);
Ok(true)
// Treat a claim error as "not claimed" and skip this tick. The
// job stays Running and the stale-reaper will requeue it. The
// previous behaviour (dispatch anyway) caused duplicate
// execution when several schedulers hit a transient storage
// error at once, since no claim row actually guarded the job.
warn!("claim_execution error for job {}; skipping: {e}", job.id);
Ok(false)
}
}
}
Expand Down
24 changes: 19 additions & 5 deletions crates/taskito-core/src/storage/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ fn validate_schema_name(schema: &str) -> Result<()> {
Ok(())
}

/// Quote a SQL identifier for safe interpolation into DDL / `SET` statements.
///
/// Postgres can't bind identifiers as parameters, so the name is wrapped in
/// double quotes and any embedded `"` is doubled. `validate_schema_name`
/// already restricts the schema to `[A-Za-z0-9_]`; quoting makes the
/// structural safety explicit rather than relying solely on the validator.
///
/// The name is folded to ASCII lowercase before quoting. A quoted identifier
/// is case-sensitive, whereas an unquoted one is folded to lowercase by
/// Postgres. Folding here keeps a configured schema such as `MySchema`
/// resolving to `myschema` — the schema an unquoted interpolation would have
/// created — instead of silently pointing at a new, empty schema.
fn pg_quote_ident(name: &str) -> String {
    let folded = name.to_ascii_lowercase();
    format!("\"{}\"", folded.replace('"', "\"\""))
}

/// PostgreSQL-backed storage for the task queue, using Diesel ORM.
#[derive(Clone)]
pub struct PostgresStorage {
Expand Down Expand Up @@ -117,18 +125,24 @@ impl PostgresStorage {

pub fn conn(&self) -> Result<diesel::r2d2::PooledConnection<ConnectionManager<PgConnection>>> {
let mut conn = self.pool.get()?;
diesel::sql_query(format!("SET search_path TO {}", self.schema))
.execute(&mut conn)
.map_err(crate::error::QueueError::Storage)?;
diesel::sql_query(format!(
"SET search_path TO {}",
pg_quote_ident(&self.schema)
))
.execute(&mut conn)
.map_err(crate::error::QueueError::Storage)?;
Ok(conn)
}

fn run_migrations(&self) -> Result<()> {
let mut conn = self.conn()?;

// Postgres-only: ensure the target schema exists before any DDL runs.
diesel::sql_query(format!("CREATE SCHEMA IF NOT EXISTS {}", self.schema))
.execute(&mut conn)?;
diesel::sql_query(format!(
"CREATE SCHEMA IF NOT EXISTS {}",
pg_quote_ident(&self.schema)
))
.execute(&mut conn)?;

for sql in common_migrations::create_tables(&common_migrations::POSTGRES) {
diesel::sql_query(&sql).execute(&mut conn)?;
Expand Down
25 changes: 18 additions & 7 deletions crates/taskito-core/src/storage/redis_backend/locks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const EXTEND_LOCK_SCRIPT: &str = r#"
local current = redis.call('HGET', key, 'owner_id')
if current == owner then
redis.call('HSET', key, 'expires_at', new_expires)
redis.call('PEXPIREAT', key, tonumber(new_expires))
return 1
end
return 0
Expand All @@ -43,9 +44,23 @@ const ACQUIRE_LOCK_SCRIPT: &str = r#"
end
redis.call('HSET', key, 'lock_name', KEYS[2], 'owner_id', owner,
'acquired_at', acquired_at, 'expires_at', expires_at)
redis.call('PEXPIREAT', key, tonumber(expires_at))
return 1
"#;

/// Lua script: delete a lock only if it is still expired at delete time.
///
/// `KEYS[1]` is the lock's hash key and `ARGV[1]` is the caller's current
/// time, compared numerically against the hash's `expires_at` field. Returns
/// `1` when the key was deleted and `0` when the field is absent or the lock
/// has not expired yet.
///
/// Re-checking inside the script closes the TOCTOU window where the
/// SCAN-driven reaper would HGET an expired lock, another client re-acquires
/// it, and the reaper then DELs the now-valid lock.
const REAP_LOCK_SCRIPT: &str = r#"
local exp = redis.call('HGET', KEYS[1], 'expires_at')
if exp and tonumber(exp) <= tonumber(ARGV[1]) then
redis.call('DEL', KEYS[1])
return 1
end
return 0
"#;

impl RedisStorage {
pub fn acquire_lock(&self, lock_name: &str, owner_id: &str, ttl_ms: i64) -> Result<bool> {
let mut conn = self.conn()?;
Expand Down Expand Up @@ -139,14 +154,10 @@ impl RedisStorage {
.query(&mut conn)
.map_err(map_err)?;

let reap = redis::Script::new(REAP_LOCK_SCRIPT);
for key in keys {
let expires_at: Option<i64> = conn.hget(&key, "expires_at").map_err(map_err)?;
if let Some(exp) = expires_at {
if exp <= now {
conn.del::<_, ()>(&key).map_err(map_err)?;
count += 1;
}
}
let deleted: i64 = reap.key(&key).arg(now).invoke(&mut conn).map_err(map_err)?;
count += deleted as u64;
}

cursor = next_cursor;
Expand Down
23 changes: 18 additions & 5 deletions crates/taskito-python/src/prefork/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,19 @@
//! reader (child finished normally) and the watchdog (child exceeded its
//! deadline).

use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Instant;

/// Unwrap the value carried by a [`PoisonError`], ignoring the poison flag.
///
/// Meant to be passed to `unwrap_or_else` on a `Mutex::lock()` result, so
/// that a thread which panicked while holding a slot lock does not make every
/// later slot operation panic as well. The recovered guard is used as-is; the
/// slot it protects holds a plain `Option<ActiveJob>`.
fn recover_poison<T>(poisoned: PoisonError<T>) -> T {
    PoisonError::into_inner(poisoned)
}

/// Metadata about a job currently being executed by a child process.
#[derive(Clone)]
pub struct ActiveJob {
Expand All @@ -35,12 +45,15 @@ pub fn new_slots(n: usize) -> SlotState {
/// Atomically install `job` in slot `idx`, returning any previous occupant
/// (which would only happen on a programming error — children are sequential).
pub fn set(slots: &SlotState, idx: usize, job: ActiveJob) -> Option<ActiveJob> {
slots[idx].lock().expect("slot mutex poisoned").replace(job)
slots[idx]
.lock()
.unwrap_or_else(recover_poison)
.replace(job)
}

/// Atomically take whatever is in slot `idx`, leaving it empty.
pub fn take(slots: &SlotState, idx: usize) -> Option<ActiveJob> {
slots[idx].lock().expect("slot mutex poisoned").take()
slots[idx].lock().unwrap_or_else(recover_poison).take()
}

/// Atomically take the slot only if its deadline has passed at `now`.
Expand All @@ -49,7 +62,7 @@ pub fn take(slots: &SlotState, idx: usize) -> Option<ActiveJob> {
/// race-free: if a result arrived between the watchdog's scan and its
/// take, the reader will have cleared the slot first and we return `None`.
pub fn take_if_expired(slots: &SlotState, idx: usize, now: Instant) -> Option<ActiveJob> {
let mut guard = slots[idx].lock().expect("slot mutex poisoned");
let mut guard = slots[idx].lock().unwrap_or_else(recover_poison);
let expired = guard
.as_ref()
.and_then(|j| j.deadline)
Expand All @@ -68,7 +81,7 @@ pub fn take_if_expired(slots: &SlotState, idx: usize, now: Instant) -> Option<Ac
/// the scan never serialises with dispatch or completion.
pub fn find_by_job_id(slots: &SlotState, job_id: &str) -> Option<usize> {
for (idx, slot) in slots.iter().enumerate() {
let guard = slot.lock().expect("slot mutex poisoned");
let guard = slot.lock().unwrap_or_else(recover_poison);
if guard.as_ref().is_some_and(|j| j.job_id == job_id) {
return Some(idx);
}
Expand Down
11 changes: 11 additions & 0 deletions crates/taskito-python/src/py_queue/workflow_ops/fan_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ use taskito_workflows::{WorkflowNode, WorkflowNodeStatus, WorkflowStorage};
use crate::py_queue::workflow_ops::{build_metadata_json, workflow_storage};
use crate::py_queue::PyQueue;

/// Maximum number of children a single fan-out may expand into. Guards against
/// a task returning an enormous list and flooding storage + memory in one
/// transaction.
///
/// The bound is inclusive: a fan-out of exactly `MAX_FAN_OUT` children is
/// accepted, and anything larger is rejected with a Python `ValueError`.
const MAX_FAN_OUT: usize = 10_000;

#[pymethods]
impl PyQueue {
/// Expand a fan-out node into N child nodes + jobs.
Expand Down Expand Up @@ -42,6 +47,12 @@ impl PyQueue {
"child_names and child_payloads must have the same length",
));
}
if child_names.len() > MAX_FAN_OUT {
return Err(PyValueError::new_err(format!(
"fan-out of {} children exceeds the limit of {MAX_FAN_OUT}",
child_names.len()
)));
}

let wf_storage = workflow_storage(self)?;
let run_id_owned = run_id.to_string();
Expand Down
25 changes: 20 additions & 5 deletions docs/content/docs/guides/dashboard/authentication.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ token on every state-changing request.
baseline). No third-party crypto dependency.
- **Sessions** are stored server-side under
`auth:session:<random_token>` with a 24-hour TTL. The token rides in
an `HttpOnly` + `SameSite=Strict` cookie named `taskito_session`.
an `HttpOnly` + `SameSite=Strict` + `Secure` cookie named
`taskito_session` (the `Secure` flag means HTTPS-only; for local HTTP
dev use `serve_dashboard(secure_cookies=False)` or
`taskito dashboard --insecure-cookies`).
- **CSRF** uses the double-submit pattern: a non-HttpOnly cookie named
`taskito_csrf` carries a per-session token that the SPA reads and
echoes back via the `X-CSRF-Token` header on POST/PUT/DELETE. The
Expand Down Expand Up @@ -100,7 +103,9 @@ All routes live under `/api/auth/`:

Every other route under `/api/` is auth-gated. Public exceptions:
`/health`, `/readiness`, `/metrics` (Prometheus), and the static SPA
assets.
assets. Set `TASKITO_DASHBOARD_METRICS_TOKEN` to additionally require an
`Authorization: Bearer <token>` header on `/metrics` and `/readiness`
(`/health` stays open for liveness probes).

## Headless requests

Expand Down Expand Up @@ -154,10 +159,20 @@ password auth — see [SSO (OAuth & OIDC)](/guides/dashboard/sso).
Operators can mix-and-match providers or run an OAuth-only deployment
by setting `TASKITO_DASHBOARD_PASSWORD_AUTH_ENABLED=false`.

## Roles

Two roles exist, and the role is **enforced server-side**:

- **admin** — full access. Every state-changing API route (cancel/replay
jobs, purge dead-letters, pause/resume queues, manage webhooks, edit
settings, manage users, edit task/queue overrides) requires this role.
- **viewer** — read-only access to dashboards and job data, plus their own
logout and change-password. Any other mutating route returns `403 forbidden`.

The first user is always an admin; additional users can be created with
either role.

## Limitations

- **One role** today (`admin`). Read-only viewers and per-route
permissions are planned; the column already exists on the user
record.
- **Password rotation** has an endpoint but no UI yet — invoke
`POST /api/auth/change-password` directly.
7 changes: 5 additions & 2 deletions docs/content/docs/guides/extensibility/events-webhooks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,11 @@ your business logic.

### SSRF guard

Outbound webhook URLs are validated before the manager queues any
delivery. The guard rejects:
Outbound webhook URLs are validated both when a webhook is registered and
again **at delivery time**, so a hostname that is rebound to an internal
address after registration (DNS rebinding) is still refused on the actual
request. Redirects are not followed, so a `30x` response can't bounce
delivery to an internal host either. The guard rejects:

- Non-`http` / `https` schemes
- `localhost`, `*.local`, `*.internal`, `*.intranet`, `*.lan`,
Expand Down
34 changes: 34 additions & 0 deletions docs/content/docs/guides/extensibility/serializers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,40 @@ from taskito.serializers import EncryptedSerializer, MsgPackSerializer
queue = Queue(serializer=EncryptedSerializer(key=key, inner=MsgPackSerializer()))
```

### SignedSerializer

HMAC-SHA256 integrity tag for task arguments and results. Unlike
`EncryptedSerializer`, it does **not** hide the payload — it authenticates it.
A worker refuses to deserialize any bytes that were not produced with the
shared key, so an attacker who can write to the queue's storage (the SQLite
file, a Postgres table, or Redis) cannot smuggle in a forged payload.

```python
import os
from taskito.serializers import SignedSerializer, SmartSerializer

key = os.urandom(32) # generate once, store it securely, and load the same key in every producer and worker
queue = Queue(serializer=SignedSerializer(SmartSerializer(), key))
```

<Callout type="warn" title="Why this matters">
The default serializer can execute code on load (cloudpickle handles
lambdas and arbitrary objects). Without signing, anyone able to write to
the backing store can achieve remote code execution on every worker that
dequeues a crafted job. `SignedSerializer` closes that path. The key must
be at least 32 bytes of CSPRNG output and identical on producers and
workers.
</Callout>

For both confidentiality **and** integrity, wrap the `EncryptedSerializer` in a `SignedSerializer`, so the signature covers the ciphertext:

```python
from taskito.serializers import EncryptedSerializer, SignedSerializer, SmartSerializer

inner = EncryptedSerializer(SmartSerializer(), key=enc_key)
queue = Queue(serializer=SignedSerializer(inner, sign_key))
```

## When to use each

| | CloudpickleSerializer | JsonSerializer | MsgPackSerializer | EncryptedSerializer |
Expand Down
1 change: 1 addition & 0 deletions docs/content/docs/guides/operations/meta.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"testing",
"job-management",
"troubleshooting",
"security",
"deployment",
"autoscaler",
"keda",
Expand Down
Loading
Loading