Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions crates/taskito-core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,20 @@ macro_rules! impl_storage {
namespace,
)
}
fn get_setting(&self, key: &str) -> $crate::error::Result<Option<String>> {
self.get_setting(key)
}
fn set_setting(&self, key: &str, value: &str) -> $crate::error::Result<()> {
self.set_setting(key, value)
}
fn delete_setting(&self, key: &str) -> $crate::error::Result<bool> {
self.delete_setting(key)
}
fn list_settings(
&self,
) -> $crate::error::Result<std::collections::HashMap<String, String>> {
self.list_settings()
}
}
};
}
Expand Down Expand Up @@ -881,4 +895,16 @@ impl Storage for StorageBackend {
namespace
)
}
fn get_setting(&self, key: &str) -> Result<Option<String>> {
delegate!(self, get_setting, key)
}
fn set_setting(&self, key: &str, value: &str) -> Result<()> {
delegate!(self, set_setting, key, value)
}
fn delete_setting(&self, key: &str) -> Result<bool> {
delegate!(self, delete_setting, key)
}
fn list_settings(&self) -> Result<std::collections::HashMap<String, String>> {
delegate!(self, list_settings)
}
}
16 changes: 13 additions & 3 deletions crates/taskito-core/src/storage/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use diesel::prelude::*;
use serde::{Deserialize, Serialize};

use super::schema::{
archived_jobs, circuit_breakers, dead_letter, distributed_locks, execution_claims,
job_dependencies, job_errors, jobs, periodic_tasks, queue_state, rate_limits, replay_history,
task_logs, task_metrics, workers,
archived_jobs, circuit_breakers, dashboard_settings, dead_letter, distributed_locks,
execution_claims, job_dependencies, job_errors, jobs, periodic_tasks, queue_state, rate_limits,
replay_history, task_logs, task_metrics, workers,
};

/// A row in the `jobs` table (for SELECT queries).
Expand Down Expand Up @@ -340,6 +340,16 @@ pub struct QueueStateRow {
pub paused_at: Option<i64>,
}

// ── Dashboard Settings ──────────────────────────────────────────

#[derive(Queryable, Selectable, Insertable, AsChangeset, Debug, Clone)]
#[diesel(table_name = dashboard_settings)]
pub struct DashboardSettingRow {
pub key: String,
pub value: String,
pub updated_at: i64,
}

// ── Distributed Locks ───────────────────────────────────────────

#[derive(Queryable, Selectable, QueryableByName, Debug, Clone)]
Expand Down
56 changes: 56 additions & 0 deletions crates/taskito-core/src/storage/postgres/dashboard_settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::collections::HashMap;

use diesel::prelude::*;

use super::super::models::DashboardSettingRow;
use super::super::schema::dashboard_settings;
use super::PostgresStorage;
use crate::error::Result;
use crate::job::now_millis;

impl PostgresStorage {
pub fn get_setting(&self, key: &str) -> Result<Option<String>> {
let mut conn = self.conn()?;
let row: Option<DashboardSettingRow> = dashboard_settings::table
.filter(dashboard_settings::key.eq(key))
.first::<DashboardSettingRow>(&mut conn)
.optional()?;
Ok(row.map(|r| r.value))
}

pub fn set_setting(&self, key: &str, value: &str) -> Result<()> {
let mut conn = self.conn()?;
let now = now_millis();
let row = DashboardSettingRow {
key: key.to_string(),
value: value.to_string(),
updated_at: now,
};
diesel::insert_into(dashboard_settings::table)
.values(&row)
.on_conflict(dashboard_settings::key)
.do_update()
.set((
dashboard_settings::value.eq(value),
dashboard_settings::updated_at.eq(now),
))
.execute(&mut conn)?;
Ok(())
}

pub fn delete_setting(&self, key: &str) -> Result<bool> {
let mut conn = self.conn()?;
let deleted =
diesel::delete(dashboard_settings::table.filter(dashboard_settings::key.eq(key)))
.execute(&mut conn)?;
Ok(deleted > 0)
}

pub fn list_settings(&self) -> Result<HashMap<String, String>> {
let mut conn = self.conn()?;
let rows: Vec<DashboardSettingRow> = dashboard_settings::table
.select(DashboardSettingRow::as_select())
.load(&mut conn)?;
Ok(rows.into_iter().map(|r| (r.key, r.value)).collect())
}
}
11 changes: 11 additions & 0 deletions crates/taskito-core/src/storage/postgres/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod archival;
mod circuit_breakers;
mod dashboard_settings;
mod dead_letter;
mod jobs;
mod locks;
Expand Down Expand Up @@ -491,6 +492,16 @@ impl PostgresStorage {
)
.execute(&mut conn)?;

// ── Dashboard Settings ───────────────────────────
diesel::sql_query(
"CREATE TABLE IF NOT EXISTS dashboard_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at BIGINT NOT NULL
)",
)
.execute(&mut conn)?;

Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::collections::HashMap;

use redis::Commands;

use super::{map_err, RedisStorage};
use crate::error::Result;

/// Redis key for the dashboard settings hash. All keys are stored under
/// a single hash so atomic ``HGETALL`` returns the full snapshot.
fn settings_key(storage: &RedisStorage) -> String {
storage.key(&["dashboard", "settings"])
}

impl RedisStorage {
pub fn get_setting(&self, key: &str) -> Result<Option<String>> {
let mut conn = self.conn()?;
let value: Option<String> = conn.hget(settings_key(self), key).map_err(map_err)?;
Ok(value)
}

pub fn set_setting(&self, key: &str, value: &str) -> Result<()> {
let mut conn = self.conn()?;
conn.hset::<_, _, _, ()>(settings_key(self), key, value)
.map_err(map_err)?;
Ok(())
}

pub fn delete_setting(&self, key: &str) -> Result<bool> {
let mut conn = self.conn()?;
let removed: i64 = conn.hdel(settings_key(self), key).map_err(map_err)?;
Ok(removed > 0)
}

pub fn list_settings(&self) -> Result<HashMap<String, String>> {
let mut conn = self.conn()?;
let map: HashMap<String, String> = conn.hgetall(settings_key(self)).map_err(map_err)?;
Ok(map)
}
}
1 change: 1 addition & 0 deletions crates/taskito-core/src/storage/redis_backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod archival;
mod circuit_breakers;
mod dashboard_settings;
mod dead_letter;
mod jobs;
mod locks;
Expand Down
8 changes: 8 additions & 0 deletions crates/taskito-core/src/storage/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,12 @@ diesel::table! {
}
}

diesel::table! {
dashboard_settings (key) {
key -> Text,
value -> Text,
updated_at -> BigInt,
}
}

diesel::allow_tables_to_appear_in_same_query!(jobs, job_dependencies);
49 changes: 49 additions & 0 deletions crates/taskito-core/src/storage/sqlite/dashboard_settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::collections::HashMap;

use diesel::prelude::*;

use super::super::models::DashboardSettingRow;
use super::super::schema::dashboard_settings;
use super::SqliteStorage;
use crate::error::Result;
use crate::job::now_millis;

impl SqliteStorage {
pub fn get_setting(&self, key: &str) -> Result<Option<String>> {
let mut conn = self.conn()?;
let row: Option<DashboardSettingRow> = dashboard_settings::table
.filter(dashboard_settings::key.eq(key))
.first::<DashboardSettingRow>(&mut conn)
.optional()?;
Ok(row.map(|r| r.value))
}

pub fn set_setting(&self, key: &str, value: &str) -> Result<()> {
let mut conn = self.conn()?;
let row = DashboardSettingRow {
key: key.to_string(),
value: value.to_string(),
updated_at: now_millis(),
};
diesel::replace_into(dashboard_settings::table)
.values(&row)
.execute(&mut conn)?;
Ok(())
}

pub fn delete_setting(&self, key: &str) -> Result<bool> {
let mut conn = self.conn()?;
let deleted =
diesel::delete(dashboard_settings::table.filter(dashboard_settings::key.eq(key)))
.execute(&mut conn)?;
Ok(deleted > 0)
}

pub fn list_settings(&self) -> Result<HashMap<String, String>> {
let mut conn = self.conn()?;
let rows: Vec<DashboardSettingRow> = dashboard_settings::table
.select(DashboardSettingRow::as_select())
.load(&mut conn)?;
Ok(rows.into_iter().map(|r| (r.key, r.value)).collect())
}
}
11 changes: 11 additions & 0 deletions crates/taskito-core/src/storage/sqlite/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod archival;
mod circuit_breakers;
mod dashboard_settings;
mod dead_letter;
mod jobs;
mod locks;
Expand Down Expand Up @@ -480,6 +481,16 @@ impl SqliteStorage {
)
.execute(&mut conn)?;

// ── Dashboard Settings ───────────────────────────
diesel::sql_query(
"CREATE TABLE IF NOT EXISTS dashboard_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at INTEGER NOT NULL
)",
)
.execute(&mut conn)?;

Ok(())
}
}
Expand Down
56 changes: 56 additions & 0 deletions crates/taskito-core/src/storage/sqlite/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,3 +408,59 @@ fn test_enqueue_rejects_missing_dependency() {
let result = storage.enqueue(dep_job);
assert!(result.is_err());
}

#[test]
fn test_setting_get_returns_none_when_unset() {
let storage = test_storage();
assert_eq!(storage.get_setting("missing").unwrap(), None);
}

#[test]
fn test_setting_set_and_get() {
let storage = test_storage();
storage.set_setting("dashboard.title", "My Queue").unwrap();
assert_eq!(
storage.get_setting("dashboard.title").unwrap(),
Some("My Queue".to_string())
);
}

#[test]
fn test_setting_set_overwrites() {
let storage = test_storage();
storage.set_setting("k", "v1").unwrap();
storage.set_setting("k", "v2").unwrap();
assert_eq!(storage.get_setting("k").unwrap(), Some("v2".to_string()));
}

#[test]
fn test_setting_delete() {
let storage = test_storage();
storage.set_setting("k", "v").unwrap();
assert!(storage.delete_setting("k").unwrap());
assert_eq!(storage.get_setting("k").unwrap(), None);
// Deleting non-existent returns false.
assert!(!storage.delete_setting("k").unwrap());
}

#[test]
fn test_setting_list_returns_all() {
let storage = test_storage();
storage.set_setting("a", "1").unwrap();
storage.set_setting("b", "2").unwrap();
let all = storage.list_settings().unwrap();
assert_eq!(all.len(), 2);
assert_eq!(all.get("a"), Some(&"1".to_string()));
assert_eq!(all.get("b"), Some(&"2".to_string()));
}

#[test]
fn test_setting_preserves_unicode_and_json() {
let storage = test_storage();
let payload = r#"{"label":"Grafana ⏱️","url":"https://grafana.example/dash"}"#;
storage.set_setting("dashboard.links.0", payload).unwrap();
assert_eq!(
storage.get_setting("dashboard.links.0").unwrap(),
Some(payload.to_string())
);
}
11 changes: 11 additions & 0 deletions crates/taskito-core/src/storage/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,15 @@ pub trait Storage: Send + Sync + Clone {
offset: i64,
namespace: Option<&str>,
) -> Result<Vec<Job>>;

// ── Dashboard settings (key/value store) ─────────────────────

/// Fetch a single setting value by key, or ``None`` if unset.
fn get_setting(&self, key: &str) -> Result<Option<String>>;
/// Insert or update a setting.
fn set_setting(&self, key: &str, value: &str) -> Result<()>;
/// Delete a setting. Returns ``true`` if a row was removed.
fn delete_setting(&self, key: &str) -> Result<bool>;
/// Return all settings as a key→value map.
fn list_settings(&self) -> Result<std::collections::HashMap<String, String>>;
}
Loading
Loading