Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions packages/windmill/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,32 @@ dhat = "0.3"

[lints.rustdoc]
missing_crate_level_docs = "deny"

broken_intra_doc_links = "deny"

[lints.rust]
missing_docs = "deny"

unsafe_code = "forbid"
private_interfaces = "warn"
private_bounds = "warn"
unnameable_types = "warn"
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)'] }

[lints.clippy]
missing_docs_in_private_items = "deny"
missing_errors_doc = "deny"
missing_panics_doc = "deny"
doc_markdown = "deny"
unwrap_used = "deny"
panic = "deny"
shadow_unrelated = "deny"
print_stdout = "deny"
print_stderr = "deny"
indexing_slicing = "deny"
missing_const_for_fn = "deny"
future_not_send = "deny"
arithmetic_side_effects = "deny"
suspicious = "deny"
complexity = "deny"
style = "deny"
perf = "deny"
pedantic = "deny"
21 changes: 9 additions & 12 deletions packages/windmill/external-bin/generate_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct Cli {
}

#[derive(Deserialize, Debug)]
/// Configuration for the generate_logs tool.
/// Configuration for the `generate_logs` tool.
struct Config {
/// Immudb URL
immudb_url: String,
Expand All @@ -67,7 +67,7 @@ fn sanitize_filename(name: &str) -> String {
.collect()
}

/// Constructs the immudb board name from tenant_id and election_event_id.
/// Constructs the immudb board name from `tenant_id` and `election_event_id`.
/// Replicates logic from `packages/windmill/src/services/protocol_manager.rs`.
fn get_event_board_name(tenant_id: &str, election_event_id: &str) -> String {
let tenant: String = tenant_id
Expand All @@ -76,7 +76,7 @@ fn get_event_board_name(tenant_id: &str, election_event_id: &str) -> String {
.filter(|&c| c != '-')
.take(17)
.collect();
format!("tenant{}event{}", tenant, election_event_id)
format!("tenant{tenant}event{election_event_id}")
.chars()
.filter(|&c| c != '-')
.collect()
Expand All @@ -100,6 +100,7 @@ async fn connect_immudb(config: &Config) -> Result<Client> {
}

#[tokio::main]
#[allow(clippy::too_many_lines)]
async fn main() -> Result<()> {
// Initialize tracing subscriber
// Default to `info` level for this crate if RUST_LOG is not set.
Expand Down Expand Up @@ -155,7 +156,7 @@ async fn main() -> Result<()> {
client
.open_session(&board_name)
.await
.with_context(|| format!("Failed to open session to board: {}", board_name))?;
.with_context(|| format!("Failed to open session to board: {board_name}"))?;
info!(%board_name, "Successfully opened session to board.");

let mut total_rows_fetched: i32 = 0;
Expand Down Expand Up @@ -237,17 +238,13 @@ async fn main() -> Result<()> {
Some(id) => config
.elections
.get(id)
.map(|s| s.as_str())
.unwrap_or(id)
.to_string(),
.map_or(id.clone(), String::to_string),
None => "general_logs".to_string(),
};
let sanitized_stem = sanitize_filename(&filename_stem_key);

if !csv_writers.contains_key(&sanitized_stem) {
let csv_path = cli
.output_folder_path
.join(format!("{}.csv", sanitized_stem));
let csv_path = cli.output_folder_path.join(format!("{sanitized_stem}.csv"));
info!(file_path = %csv_path.display(), election_id_key = %filename_stem_key, "Creating new CSV file.");
let file = File::create(&csv_path).with_context(|| {
format!("Failed to create CSV file: {}", csv_path.display())
Expand Down Expand Up @@ -279,10 +276,10 @@ async fn main() -> Result<()> {
}
info!("Finished processing all batches from Immudb stream.");

for (filename_stem, writer) in csv_writers.iter_mut() {
for (filename_stem, writer) in &mut csv_writers {
writer
.flush()
.with_context(|| format!("Failed to flush CSV writer for {}", filename_stem))?;
.with_context(|| format!("Failed to flush CSV writer for {filename_stem}"))?;
info!(
filename_stem,
count = activity_log_written_counts.get(filename_stem).unwrap_or(&0),
Expand Down
38 changes: 19 additions & 19 deletions packages/windmill/src/bin/beat.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(non_upper_case_globals)]
#![recursion_limit = "256"]
//! Celery Beat process for Windmill: registers periodic tasks and publishes them to RabbitMQ.
//! Celery Beat process for Windmill: registers periodic tasks and publishes them to `RabbitMQ`.
// SPDX-FileCopyrightText: 2025 Sequent Tech Inc <legal@sequentech.io>
//
// SPDX-License-Identifier: AGPL-3.0-only
Expand All @@ -23,18 +23,18 @@ use windmill::tasks::scheduled_reports::scheduled_reports;
#[derive(Debug, Parser)]
#[command(name = "beat", about = "Windmill's periodic task scheduler.")]
struct CeleryOpt {
/// Interval between `review_boards` dispatches.
#[arg(short = 'r', long, default_value = "15")]
review_boards_interval: u64,
/// Interval between `scheduled_events` dispatches.
#[arg(short = 's', long, default_value = "10")]
schedule_events_interval: u64,
/// Interval between `scheduled_reports` dispatches.
#[arg(short = 'c', long, default_value = "60")]
schedule_reports_interval: u64,
/// Interval between `electoral_log_batch_dispatcher` dispatches.
#[arg(short = 'e', long, default_value = "5")]
electoral_log_interval: u64,
/// Interval between `review_boards` dispatches (seconds).
#[arg(short = 'r', long = "review-boards-interval", default_value = "15")]
review_boards: u64,
/// Interval between `scheduled_events` dispatches (seconds).
#[arg(short = 's', long = "schedule-events-interval", default_value = "10")]
schedule_events: u64,
/// Interval between `scheduled_reports` dispatches (seconds).
#[arg(short = 'c', long = "schedule-reports-interval", default_value = "60")]
schedule_reports: u64,
/// Interval between `electoral_log_batch_dispatcher` dispatches (seconds).
#[arg(short = 'e', long = "electoral-log-interval", default_value = "5")]
electoral_log: u64,
}

/// Starts the beat scheduler: loads env, wires periodic tasks, and blocks until shutdown.
Expand All @@ -50,22 +50,22 @@ async fn main() -> Result<()> {
tasks = [
review_boards::NAME => {
review_boards,
schedule = DeltaSchedule::new(Duration::from_secs(CeleryOpt::parse().review_boards_interval)),
schedule = DeltaSchedule::new(Duration::from_secs(CeleryOpt::parse().review_boards)),
args = (),
},
scheduled_events::NAME => {
scheduled_events,
schedule = DeltaSchedule::new(Duration::from_secs(CeleryOpt::parse().schedule_events_interval)),
args = (CeleryOpt::parse().schedule_events_interval),
schedule = DeltaSchedule::new(Duration::from_secs(CeleryOpt::parse().schedule_events)),
args = (CeleryOpt::parse().schedule_events),
},
scheduled_reports::NAME => {
scheduled_reports,
schedule = DeltaSchedule::new(Duration::from_secs(CeleryOpt::parse().schedule_reports_interval)),
args = (CeleryOpt::parse().schedule_events_interval),
schedule = DeltaSchedule::new(Duration::from_secs(CeleryOpt::parse().schedule_reports)),
args = (CeleryOpt::parse().schedule_events),
},
electoral_log_batch_dispatcher::NAME => {
electoral_log_batch_dispatcher,
schedule = DeltaSchedule::new(Duration::from_secs(CeleryOpt::parse().electoral_log_interval)),
schedule = DeltaSchedule::new(Duration::from_secs(CeleryOpt::parse().electoral_log)),
args = (),
},
],
Expand Down
47 changes: 27 additions & 20 deletions packages/windmill/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(non_upper_case_globals)]
#![recursion_limit = "256"]
#![allow(clippy::non_std_lazy_statics)]
//! Celery worker binary for Windmill: runs the Celery app as a queue consumer or in produce-only mode.
// SPDX-FileCopyrightText: 2025 Sequent Tech Inc <legal@sequentech.io>
//
Expand All @@ -17,7 +18,11 @@ use sequent_core::util::init_log::init_log;
use std::collections::HashMap;
use tokio::runtime::Builder;
use tracing::{event, Level};
use windmill::services::celery_app::*;
use windmill::services::celery_app::{
get_celery_app, get_worker_threads, set_acks_late, set_broker_connection_max_retries,
set_heartbeat, set_is_app_active, set_prefetch_count, set_queues, set_task_max_retries,
set_worker_threads, Queue,
};
use windmill::services::probe::{setup_probe, AppName};
use windmill::services::tasks_semaphore::init_semaphore;

Expand All @@ -26,22 +31,21 @@ use windmill::services::tasks_semaphore::init_semaphore;
/// # Panics
///
/// Panics if `ENV_SLUG` is not set in the environment.
fn get_queue_name(queue: Queue) -> String {
let slug = std::env::var("ENV_SLUG")
.with_context(|| "missing env var ENV_SLUG")
.unwrap();
fn get_queue_name(queue: &Queue) -> String {
let slug =
std::env::var("ENV_SLUG").expect("ENV_SLUG must be set before resolving AMQP queue names");
queue.queue_name(&slug)
}

lazy_static! {
static ref BEAT_QUEUE_NAME: String = get_queue_name(Queue::Beat);
static ref SHORT_QUEUE_NAME: String = get_queue_name(Queue::Short);
static ref ELECTORAL_LOG_BEAT_QUEUE_NAME: String = get_queue_name(Queue::ElectoralLogBeat);
static ref COMMUNICATION_QUEUE_NAME: String = get_queue_name(Queue::Communication);
static ref TALLY_QUEUE_NAME: String = get_queue_name(Queue::Tally);
static ref REPORTS_QUEUE_NAME: String = get_queue_name(Queue::Reports);
static ref IMPORT_EXPORT_QUEUE_NAME: String = get_queue_name(Queue::ImportExport);
static ref ELECTORAL_LOG_BATCH_QUEUE_NAME: String = get_queue_name(Queue::ElectoralLogBatch);
static ref BEAT_QUEUE_NAME: String = get_queue_name(&Queue::Beat);
static ref SHORT_QUEUE_NAME: String = get_queue_name(&Queue::Short);
static ref ELECTORAL_LOG_BEAT_QUEUE_NAME: String = get_queue_name(&Queue::ElectoralLogBeat);
static ref COMMUNICATION_QUEUE_NAME: String = get_queue_name(&Queue::Communication);
static ref TALLY_QUEUE_NAME: String = get_queue_name(&Queue::Tally);
static ref REPORTS_QUEUE_NAME: String = get_queue_name(&Queue::Reports);
static ref IMPORT_EXPORT_QUEUE_NAME: String = get_queue_name(&Queue::ImportExport);
static ref ELECTORAL_LOG_BATCH_QUEUE_NAME: String = get_queue_name(&Queue::ElectoralLogBatch);
}

/// Celery options for the Windmill Celery worker process.
Expand Down Expand Up @@ -76,11 +80,11 @@ enum CeleryOpt {
Produce,
}

/// Finds duplicates in a vector of strings.
fn find_duplicates(input: Vec<&str>) -> Vec<&str> {
/// Finds duplicates in a slice of queue name strings.
fn find_duplicates<'a>(input: &'a [&'a str]) -> Vec<&'a str> {
let mut occurrences = HashMap::new();
let mut duplicates = Vec::new();
for &item in &input {
for &item in input {
let count: &mut i32 = occurrences.entry(item).or_insert(0);
*count = (*count)
.checked_add(1)
Expand Down Expand Up @@ -126,6 +130,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}

/// Runs the Celery app.
///
/// `celery`'s broker delivery stream is not `Send`; the worker runs on a single runtime thread.
#[allow(clippy::future_not_send)]
async fn async_main(opt: CeleryOpt) -> Result<()> {
init_log(true);
setup_probe(AppName::WINDMILL).await;
Expand Down Expand Up @@ -157,15 +164,15 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
if queue_name.starts_with(&slug) {
queue_name.clone()
} else {
format!("{}_{}", slug, queue_name)
format!("{slug}_{queue_name}")
}
})
.collect();

let vec_str: Vec<&str> = queues.iter().map(AsRef::as_ref).collect();
let duplicates = find_duplicates(vec_str.clone());
let duplicates = find_duplicates(&vec_str);
if !duplicates.is_empty() {
return Err(anyhow!("Found duplicate queues: {:?}", duplicates));
return Err(anyhow!("Found duplicate queues: {duplicates:?}"));
}
set_queues(queues.clone());
set_is_app_active(true);
Expand All @@ -178,6 +185,6 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
event!(Level::INFO, "No new tasks to produce");
celery_app.close().await?;
}
};
}
Ok(())
}
Loading
Loading