Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/windmill/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,18 @@ dhat = "0.3"
[package.metadata.component.target.dependencies]
"docs:plugin" = { path = "packages/sequent-core/src/wit/plugin_interface.wit" }

[lints.rustdoc]
missing_crate_level_docs = "deny"


[lints.rust]
missing_docs = "deny"


[lints.clippy]
missing_docs_in_private_items = "deny"
missing_errors_doc = "deny"
missing_panics_doc = "deny"
arithmetic_side_effects = "deny"
complexity = "deny"
style = "deny"
Expand Down
10 changes: 8 additions & 2 deletions packages/windmill/external-bin/generate_logs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: 2025 Sequent Tech Inc <legal@sequentech.io>
//
// SPDX-License-Identifier: AGPL-3.0-only

//! Command-line tool to generate a CSV report of activity logs from immudb.
use anyhow::{Context, Result};
use base64::engine::general_purpose;
use base64::Engine;
Expand Down Expand Up @@ -43,15 +43,21 @@ struct Cli {
}

#[derive(Deserialize, Debug)]
/// Configuration for the generate_logs tool.
struct Config {
/// Immudb URL
immudb_url: String,
/// Immudb username
immudb_user: String,
/// Immudb password
immudb_password: String,
elections: HashMap<String, String>, // election_id -> election_name (for CSV filename)
/// Election ID -> Election Name (for CSV filename)
elections: HashMap<String, String>,
}

// --- Helper Functions ---

/// Sanitizes a filename by replacing non-alphanumeric characters with underscores.
fn sanitize_filename(name: &str) -> String {
name.chars()
.map(|c| match c {
Expand Down
7 changes: 7 additions & 0 deletions packages/windmill/src/bin/beat.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(non_upper_case_globals)]
#![recursion_limit = "256"]
//! Celery Beat process for Windmill: registers periodic tasks and publishes them to RabbitMQ.
// SPDX-FileCopyrightText: 2025 Sequent Tech Inc <legal@sequentech.io>
//
// SPDX-License-Identifier: AGPL-3.0-only
Expand All @@ -18,19 +19,25 @@ use windmill::tasks::review_boards::review_boards;
use windmill::tasks::scheduled_events::scheduled_events;
use windmill::tasks::scheduled_reports::scheduled_reports;

/// Beat tick intervals for periodic tasks (all values are in seconds).
#[derive(Debug, Parser)]
#[command(name = "beat", about = "Windmill's periodic task scheduler.")]
struct CeleryOpt {
/// Interval between `review_boards` dispatches.
#[arg(short = 'r', long, default_value = "15")]
review_boards_interval: u64,
/// Interval between `scheduled_events` dispatches.
#[arg(short = 's', long, default_value = "10")]
schedule_events_interval: u64,
/// Interval between `scheduled_reports` dispatches.
#[arg(short = 'c', long, default_value = "60")]
schedule_reports_interval: u64,
/// Interval between `electoral_log_batch_dispatcher` dispatches.
#[arg(short = 'e', long, default_value = "5")]
electoral_log_interval: u64,
}

/// Starts the beat scheduler: loads env, wires periodic tasks, and blocks until shutdown.
#[tokio::main]
async fn main() -> Result<()> {
dotenv().ok();
Expand Down
20 changes: 20 additions & 0 deletions packages/windmill/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(non_upper_case_globals)]
#![recursion_limit = "256"]
//! Celery worker binary for Windmill: runs the Celery app as a queue consumer or in produce-only mode.
// SPDX-FileCopyrightText: 2025 Sequent Tech Inc <legal@sequentech.io>
//
// SPDX-License-Identifier: AGPL-3.0-only
Expand All @@ -20,6 +21,11 @@ use windmill::services::celery_app::*;
use windmill::services::probe::{setup_probe, AppName};
use windmill::services::tasks_semaphore::init_semaphore;

/// Returns the AMQP queue name for `queue` prefixed with `ENV_SLUG`.
///
/// # Panics
///
/// Panics if `ENV_SLUG` is not set in the environment.
fn get_queue_name(queue: Queue) -> String {
let slug = std::env::var("ENV_SLUG")
.with_context(|| "missing env var ENV_SLUG")
Expand All @@ -38,28 +44,39 @@ lazy_static! {
static ref ELECTORAL_LOG_BATCH_QUEUE_NAME: String = get_queue_name(Queue::ElectoralLogBatch);
}

/// Celery options for the Windmill Celery worker process.
#[derive(Debug, Parser, Clone)]
#[command(name = "windmill", about = "Windmill task queue prosumer.")]
enum CeleryOpt {
/// Consume tasks from one or more AMQP queues.
Consume {
/// Queue names to bind.
#[arg(short, long, num_args(1..), default_values_t = vec![BEAT_QUEUE_NAME.clone()])]
queues: Vec<String>,
/// Maximum unacknowledged messages per consumer.
#[arg(short, long, default_value = "100")]
prefetch_count: u16,
/// When true, acknowledgements are sent after the task body returns.
#[arg(short, long)]
acks_late: bool,
/// Default retry cap Celery applies before marking a task failed.
#[arg(short, long, default_value = "4")]
task_max_retries: u32,
/// Retries when establishing the broker connection before exiting.
#[arg(short, long, default_value = "5")]
broker_connection_max_retries: u32,
/// Broker heartbeat interval in seconds.
#[arg(short = 'H', long, default_value = "10")]
heartbeat: u16,
/// Tokio worker thread count for the runtime (defaults to logical CPUs).
#[arg(short, long)]
worker_threads: Option<usize>,
},
/// Connect to the broker, log readiness, and exit without consuming.
Produce,
}

/// Finds duplicates in a vector of strings.
fn find_duplicates(input: Vec<&str>) -> Vec<&str> {
let mut occurrences = HashMap::new();
let mut duplicates = Vec::new();
Expand All @@ -77,6 +94,7 @@ fn find_duplicates(input: Vec<&str>) -> Vec<&str> {
duplicates
}

/// Resolves the async runtime worker thread count from `Consume` options or CPU count.
fn read_worker_threads(opt: &CeleryOpt) -> usize {
match opt.clone() {
CeleryOpt::Consume { worker_threads, .. } => worker_threads,
Expand All @@ -85,6 +103,7 @@ fn read_worker_threads(opt: &CeleryOpt) -> usize {
.unwrap_or(num_cpus::get())
}

/// Entry point: builds the multi-thread runtime and runs async worker.
fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();

Expand All @@ -106,6 +125,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}

/// Runs the Celery app.
async fn async_main(opt: CeleryOpt) -> Result<()> {
init_log(true);
setup_probe(AppName::WINDMILL).await;
Expand Down
3 changes: 3 additions & 0 deletions packages/windmill/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// SPDX-FileCopyrightText: 2025 Sequent Tech Inc <legal@sequentech.io>
//
// SPDX-License-Identifier: AGPL-3.0-only
//! Windmill executes background work for the Sequent platform: Celery tasks,
//! Hasura-backed Postgres access, Keycloak and vault integration, reports,
//! imports/exports, and WASM-backed plugins.
#![allow(clippy::too_many_arguments)]
#![recursion_limit = "256"]
#[macro_use]
Expand Down
64 changes: 59 additions & 5 deletions packages/windmill/src/postgres/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,28 @@ use crate::{
};

#[derive(Clone, Debug)]
/// Enrollment filters
pub struct EnrollmentFilters {
/// Current lifecycle or workflow status.
pub status: ApplicationStatus,
/// How the applicant was or will be verified.
pub verification_type: Option<ApplicationType>,
}
use anyhow::{anyhow, Context, Result};
use deadpool_postgres::Transaction;
use sequent_core::types::hasura::core::Application;
use serde_json::Value;
use tokio_postgres::row::Row;
// use tokio_postgres::types::ToSql;
use chrono::DateTime;
use chrono::Local;
use deadpool_postgres::Transaction;
use sequent_core::services::uuid_validation::parse_uuid_v4;
use sequent_core::types::hasura::core::Application;
use serde::Serialize;
use serde_json::json;
use serde_json::Value;
use tokio_postgres::row::Row;
use tokio_postgres::types::{Json, ToSql};
use tracing::{event, instrument, Level};
use uuid::Uuid;

/// Row representing Application wrapper
pub struct ApplicationWrapper(pub Application);

impl TryFrom<Row> for ApplicationWrapper {
Expand All @@ -53,6 +56,12 @@ impl TryFrom<Row> for ApplicationWrapper {
}))
}
}
/// Get permission label for a given post from the database.
///
/// # Errors
///
/// Returns an error if SQL preparation or execution fails,
/// if UUID or other parsing fails, or if row mapping is inconsistent.

#[instrument(err, skip(hasura_transaction))]
pub async fn get_permission_label_from_post(
Expand Down Expand Up @@ -105,6 +114,11 @@ pub async fn get_permission_label_from_post(

Ok((permission_label, area_id))
}
/// Insert application into the database.
///
/// # Errors
///
/// Returns an error if SQL preparation or execution fails, if UUID or other parsing fails, or if row mapping is inconsistent.

#[instrument(err, skip_all)]
pub async fn insert_application(
Expand Down Expand Up @@ -174,6 +188,12 @@ pub async fn insert_application(

Ok(())
}
/// Updates application status and returns the updated row when applicable.
///
/// # Errors
///
/// Returns an error if SQL preparation or execution fails,
/// if UUID or other parsing fails, or if row mapping is inconsistent.

#[instrument(err, skip_all)]
pub async fn update_application_status(
Expand Down Expand Up @@ -279,6 +299,17 @@ pub async fn update_application_status(

Ok(application)
}
/// Get applications for a given area, tenant and election event from the database.
///
/// # Errors
///
/// Returns an error if SQL preparation or execution fails,
/// if UUID or other parsing fails, or if row mapping is inconsistent.
///
/// # Panics
///
/// Panics only if internal SQL placeholder arithmetic overflows,
/// which is not expected in production-sized filters.

#[instrument(err, skip_all)]
pub async fn get_applications(
Expand Down Expand Up @@ -377,6 +408,17 @@ pub async fn get_applications(

Ok((results, last_offset))
}
/// Counts applications based on filters.
///
/// # Errors
///
/// Returns an error if SQL preparation or execution fails,
/// if UUID or other parsing fails, or if row mapping is inconsistent.
///
/// # Panics
///
/// Panics only if internal SQL placeholder arithmetic overflows,
/// which is not expected in production-sized filters.

#[instrument(err, skip_all)]
pub async fn count_applications(
Expand Down Expand Up @@ -488,6 +530,12 @@ pub async fn count_applications(

Ok(count)
}
/// Get applications for a given election from the database.
///
/// # Errors
///
/// Returns an error if SQL preparation or execution fails,
/// if UUID or other parsing fails, or if row mapping is inconsistent.

#[instrument(err, skip_all)]
pub async fn get_applications_by_election(
Expand Down Expand Up @@ -529,6 +577,12 @@ pub async fn get_applications_by_election(

Ok(results)
}
/// Insert applications into the database.
///
/// # Errors
///
/// Returns an error if SQL preparation or execution fails,
/// if UUID or other parsing fails, or if row mapping is inconsistent.

#[instrument(err, skip_all)]
pub async fn insert_applications(
Expand Down
Loading
Loading