Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 39 additions & 193 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
<div align="center">

# taskito

A Rust-powered task queue for Python. No broker required — just SQLite or Postgres.

[![PyPI version](https://img.shields.io/pypi/v/taskito.svg)](https://pypi.org/project/taskito/)
[![Python versions](https://img.shields.io/pypi/pyversions/taskito.svg)](https://pypi.org/project/taskito/)
[![License](https://img.shields.io/pypi/l/taskito.svg)](https://github.com/ByteVeda/taskito/blob/master/LICENSE)

A Rust-powered task queue for Python. No broker required — just SQLite or Postgres.
</div>

```
pip install taskito # SQLite (default)
Expand All @@ -25,13 +29,13 @@ def add(a: int, b: int) -> int:
return a + b
```

**2. Start a worker** in one terminal:
**2. Start a worker:**

```bash
taskito worker --app tasks:queue
```

**3. Enqueue jobs** from another terminal or script:
**3. Enqueue jobs:**

```python
from tasks import add
Expand All @@ -42,213 +46,61 @@ print(job.result(timeout=10)) # 5

## Why taskito?

Most Python task queues require a separate broker (Redis, RabbitMQ) even for single-machine workloads. taskito embeds everything — storage, scheduling, and worker management — into a single `pip install` with no external dependencies beyond Python itself. For distributed setups, an optional Postgres backend enables multi-machine workers with the same API.
Most Python task queues need a separate broker (Redis, RabbitMQ) even for single-machine
workloads. taskito embeds storage, scheduling, and worker management into one `pip install`
with no external services. An optional Postgres backend adds multi-machine workers with the
same API.

The heavy lifting runs in Rust: a Tokio async scheduler, OS thread worker pool with `tokio::sync::mpsc` channels, and Diesel ORM over SQLite in WAL mode. Python's GIL is only held during task execution. For CPU-bound workloads, run with `--pool prefork` to spawn child processes with independent GILs and get true parallel speedup — see the [prefork guide](https://docs.byteveda.org/taskito/docs/guides/advanced-execution/prefork).
The engine runs in Rust — a Tokio async scheduler, an OS-thread worker pool, and Diesel over
SQLite in WAL mode. The GIL is held only during task execution; run `--pool prefork` for true
parallelism on CPU-bound work.

## Features

- **Priority queues** — higher priority jobs run first
- **Retry with exponential backoff** — automatic retries with jitter
- **Dead letter queue** — inspect and replay failed jobs
- **Rate limiting** — token bucket with `"100/m"` syntax
- **Task dependencies** — `depends_on` for DAG workflows with cascade cancel
- **Task workflows** — `chain`, `group`, `chord` primitives
- **Periodic tasks** — cron scheduling with seconds granularity
- **Progress tracking** — report and read progress from inside tasks
- **Job cancellation** — cancel pending or running jobs
- **Unique tasks** — deduplicate active jobs by key
- **Batch enqueue** — `task.map()` for high-throughput bulk inserts
- **Named queues** — route tasks to isolated queues
- **Hooks** — before/after/success/failure middleware
- **Per-task middleware** — `TaskMiddleware` with `before`/`after`/`on_retry` hooks
- **Pluggable serializers** — `CloudpickleSerializer` (default), `JsonSerializer`, or custom
- **Cancel running tasks** — cooperative cancellation with `check_cancelled()`
- **Soft timeouts** — `check_timeout()` inside tasks
- **Worker heartbeat** — monitor worker health via `queue.workers()`
- **Job expiration** — `expires` parameter for time-sensitive jobs
- **Exception filtering** — `retry_on` / `dont_retry_on` for selective retries
- **OpenTelemetry** — optional tracing integration via `pip install taskito[otel]`
- **Async support** — `await job.aresult()`, `await queue.astats()`
- **Web dashboard** — `taskito dashboard --app myapp:queue` serves a built-in monitoring UI
- **FastAPI integration** — `TaskitoRouter` for instant REST API over the queue
- **Postgres backend** — optional multi-machine storage via PostgreSQL
- **Events system** — subscribe to `JOB_COMPLETED`, `JOB_FAILED`, etc. with `queue.on_event()`
- **Webhooks** — `queue.add_webhook(url, events, secret)` with HMAC-SHA256 signing
- **Job archival** — `queue.archive(older_than=86400)`, `queue.list_archived()`
- **Queue pause/resume** — `queue.pause()`, `queue.resume()`, `queue.paused_queues()`
- **Circuit breakers** — `circuit_breaker={"threshold": 5, "window": 60, "cooldown": 300}`
- **Structured logging** — `current_job.log("msg", level=LogLevel.INFO, extra={...})`
- **CLI** — `taskito worker`, `taskito info --watch`, `taskito dashboard`

## Integrations

Install optional extras to unlock additional integrations:

| Extra | Install | What you get |
|-------|---------|--------------|
| **Flask** | `pip install taskito[flask]` | `Taskito(app)` extension, `flask taskito worker` CLI |
| **FastAPI** | `pip install taskito[fastapi]` | `TaskitoRouter` for instant REST API over the queue |
| **Django** | `pip install taskito[django]` | Admin integration, management commands |
| **OpenTelemetry** | `pip install taskito[otel]` | Distributed tracing with span-per-task |
| **Prometheus** | `pip install taskito[prometheus]` | `PrometheusMiddleware`, queue depth gauges, `/metrics` server |
| **Sentry** | `pip install taskito[sentry]` | `SentryMiddleware` with auto error capture and task tags |
| **Encryption** | `pip install taskito[encryption]` | `EncryptedSerializer` for at-rest payload encryption |
| **MsgPack** | `pip install taskito[msgpack]` | `MsgpackSerializer` for compact binary serialization |
| **Postgres** | `pip install taskito[postgres]` | Multi-machine workers via PostgreSQL backend |
| **Redis** | `pip install taskito[redis]` | Redis storage backend |
- **Reliability** — retries with exponential backoff, dead letter queue with replay, circuit breakers, exception filtering
- **Scheduling** — priorities, rate limiting, periodic (cron) tasks, delayed execution, job expiration
- **Workflows** — `chain` / `group` / `chord`, task dependencies with cascade cancel
- **Control** — cooperative cancellation, soft timeouts, unique/idempotent tasks, queue pause/resume
- **Observability** — web dashboard, events, HMAC-signed webhooks, structured logging, worker heartbeats
- **Extensibility** — pluggable serializers, per-task middleware, async API, Postgres/Redis backends

## Examples

### Retry with Backoff

```python
@queue.task(max_retries=5, retry_backoff=2.0)
def fetch_url(url: str) -> str:
return requests.get(url).text
```

### Priority Queues

```python
urgent_report.apply_async(args=[data], priority=10)
bulk_report.delay(data) # default priority 0
```

### Rate Limiting

```python
@queue.task(rate_limit="100/m")
def call_api(endpoint: str) -> dict:
return requests.get(endpoint).json()
```

### Task Dependencies

```python
download = fetch_file.delay("data.csv")
parsed = parse_file.apply_async(
args=["data.csv"],
depends_on=[download.id],
)
# parsed waits until download completes; if download fails, parsed is cancelled
```

### Workflows

```python
from taskito import chain, group, chord

# Sequential pipeline — each step receives the previous result
chain(fetch.s(url), parse.s(), store.s()).apply()

# Parallel fan-out
group(process.s(chunk) for chunk in chunks).apply()

# Parallel + callback when all complete
# Parallel fan-out, then a callback once all complete
chord([download.s(u) for u in urls], merge.s()).apply()
```

### Periodic Tasks

```python
@queue.periodic(cron="0 0 */6 * * *")
def cleanup_temp_files():
...
```

### Progress Tracking

```python
from taskito import current_job

@queue.task()
def train_model(epochs: int):
for i in range(epochs):
...
current_job.update_progress(int((i + 1) / epochs * 100))
```

### Hooks

```python
@queue.on_failure
def alert_on_failure(task_name, args, kwargs, error):
slack.post(f"Task {task_name} failed: {error}")
```

### Exception Filtering

```python
@queue.task(
max_retries=5,
retry_on=[ConnectionError, TimeoutError],
dont_retry_on=[ValueError],
)
def fetch_data(url: str) -> dict:
return requests.get(url).json()
```

### Per-Task Middleware

```python
from taskito import TaskMiddleware

class TimingMiddleware(TaskMiddleware):
def before(self, ctx):
ctx._start = time.time()

def after(self, ctx, result, error):
elapsed = time.time() - ctx._start
print(f"{ctx.task_name} took {elapsed:.2f}s")

@queue.task(middleware=[TimingMiddleware()])
def process(data):
...
```

### Delayed Scheduling

```python
# Run 30 minutes from now
reminder.apply_async(args=[user_id, msg], delay=1800)
```

### Unique Tasks

```python
report.apply_async(args=[user_id], unique_key=f"report:{user_id}")
# Second enqueue with same key is silently deduplicated while first is active
```

### FastAPI Integration

```python
from fastapi import FastAPI
from taskito.contrib.fastapi import TaskitoRouter

app = FastAPI()
app.include_router(TaskitoRouter(queue), prefix="/tasks")
# GET /tasks/stats, GET /tasks/jobs/{id}, GET /tasks/jobs/{id}/progress (SSE), ...
@queue.task(max_retries=5, retry_backoff=2.0, rate_limit="100/m")
def fetch_url(url: str) -> str:
return requests.get(url).text
```

### Batch Enqueue

```python
jobs = send_email.map([("alice@x.com",), ("bob@x.com",), ("carol@x.com",)])
```
More examples — dependencies, progress tracking, middleware, FastAPI — in the
**[docs](https://docs.byteveda.org/taskito)**.

### Async Support
## Integrations

```python
job = expensive_task.delay(data)
result = await job.aresult(timeout=30)
stats = await queue.astats()
```
| Extra | Install | What you get |
|-------|---------|--------------|
| **Flask** | `pip install taskito[flask]` | `Taskito(app)` extension, `flask taskito worker` CLI |
| **FastAPI** | `pip install taskito[fastapi]` | `TaskitoRouter` for instant REST API over the queue |
| **Django** | `pip install taskito[django]` | Admin integration, management commands |
| **OpenTelemetry** | `pip install taskito[otel]` | Distributed tracing with span-per-task |
| **Prometheus** | `pip install taskito[prometheus]` | `PrometheusMiddleware`, queue depth gauges, `/metrics` server |
| **Sentry** | `pip install taskito[sentry]` | `SentryMiddleware` with auto error capture and task tags |
| **Postgres** | `pip install taskito[postgres]` | Multi-machine workers via PostgreSQL backend |
| **Redis** | `pip install taskito[redis]` | Redis storage backend |

## Testing

taskito includes a built-in test mode — no worker needed:
Built-in test mode — no worker needed:

```python
def test_add():
Expand All @@ -259,10 +111,7 @@ def test_add():

## Documentation

Full documentation with guides, API reference, architecture diagrams, and examples:

**[Read the docs →](https://docs.byteveda.org/taskito)**

**[Read the docs →](https://docs.byteveda.org/taskito)** — guides, API reference, and architecture.
Coming from Celery? See the **[Migration Guide](https://docs.byteveda.org/taskito/docs/guides/operations/migration)**.

## Comparison
Expand All @@ -275,12 +124,9 @@ Coming from Celery? See the **[Migration Guide](https://docs.byteveda.org/taskit
| Rate limiting | **Yes** | Yes | No | Yes | No |
| Dead letter queue | **Yes** | No | Yes | No | No |
| Task dependencies | **Yes** | No | No | No | No |
| Task chaining | **Yes** | Yes | No | Yes | No |
| Built-in dashboard | **Yes** | No | No | No | No |
| FastAPI integration | **Yes** | No | No | No | No |
| Per-task middleware | **Yes** | No | No | Yes | No |
| Cancel running tasks | **Yes** | Yes | No | No | No |
| Custom serializers | **Yes** | Yes | No | No | No |
| Postgres backend | **Yes** | Yes | No | No | No |
| Setup | **`pip install`** | Broker + backend | Redis | Broker | Redis |

Expand Down
2 changes: 1 addition & 1 deletion crates/taskito-async/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-async"
version = "0.15.0"
version = "0.15.1"
edition = "2021"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/taskito-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-core"
version = "0.15.0"
version = "0.15.1"
edition = "2021"

[features]
Expand Down
2 changes: 1 addition & 1 deletion crates/taskito-python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-python"
version = "0.15.0"
version = "0.15.1"
edition = "2021"

[features]
Expand Down
2 changes: 1 addition & 1 deletion crates/taskito-workflows/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-workflows"
version = "0.15.0"
version = "0.15.1"
edition = "2021"

[features]
Expand Down
17 changes: 17 additions & 0 deletions docs/content/docs/more/changelog.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ description: "Release history for taskito — every notable change, fix, and fea

All notable changes to taskito are documented here.

## 0.15.1

### Fixed

- **Mobile navigation drawer closes on navigation.** The drawer previously
stayed open after tapping a link; it now closes on every navigation,
including re-selecting the current route.
- **Mobile menu shows the full navigation.** The Tasks and Webhooks links were
missing from the mobile menu. The sidebar and mobile menu now share a single
navigation definition so the two can no longer drift.
- **Job logs tab virtualized.** The job detail logs view renders only the
visible rows, keeping jobs with hundreds of log lines smooth to scroll.
- **Resources table loading state matches its layout.** The placeholder now
uses the table-shaped skeleton instead of a single opaque block.

---

## 0.15.0

See the [Upgrading to 0.15](/guides/operations/upgrading-0.15) guide for
Expand Down
2 changes: 1 addition & 1 deletion py_src/taskito/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,4 @@

__version__ = _get_version("taskito")
except PackageNotFoundError:
__version__ = "0.15.0"
__version__ = "0.15.1"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "maturin"

[project]
name = "taskito"
version = "0.15.0"
version = "0.15.1"
description = "Rust-powered task queue for Python. No broker required."
requires-python = ">=3.10"
license = { file = "LICENSE" }
Expand Down
Loading
Loading