Skip to content
Merged
4 changes: 2 additions & 2 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions packages/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions packages/windmill/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ async-trait = "0.1"
clap = { version = "4.4", features = ["derive"] } # Added for generate_logs binary
dotenv = "0.15"
chrono = "0.4"
lazy_static = "1.4"
async_once = "0.2"
uuid = { version = "1.5", features = ["v4", "fast-rng"] }
toml = "0.8" # Updated for generate_logs binary, was 0.6.0
quick-error = "2.0"
Expand Down Expand Up @@ -129,3 +127,6 @@ rustls = { version = "0.23.32", features = ["ring"] }

[dev-dependencies]
dhat = "0.3"

[lints.rust]
unsafe_code = "forbid"
50 changes: 21 additions & 29 deletions packages/windmill/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@
//
// SPDX-License-Identifier: AGPL-3.0-only

extern crate lazy_static;
use lazy_static::lazy_static;

use anyhow::Context;
use anyhow::{anyhow, Result};
use celery::Celery;
use clap::Parser;
use dotenv::dotenv;
use sequent_core::util::init_log::init_log;
use std::collections::HashMap;
use std::sync::LazyLock;
use tokio::runtime::Builder;
use tracing::{event, Level};
use windmill::services::celery_app::*;
use windmill::services::celery_app::{self as celery_cfg, Queue};
use windmill::services::probe::{setup_probe, AppName};
use windmill::services::tasks_semaphore::init_semaphore;

Expand All @@ -27,16 +25,7 @@ fn get_queue_name(queue: Queue) -> String {
queue.queue_name(&slug)
}

lazy_static! {
static ref BEAT_QUEUE_NAME: String = get_queue_name(Queue::Beat);
static ref SHORT_QUEUE_NAME: String = get_queue_name(Queue::Short);
static ref ELECTORAL_LOG_BEAT_QUEUE_NAME: String = get_queue_name(Queue::ElectoralLogBeat);
static ref COMMUNICATION_QUEUE_NAME: String = get_queue_name(Queue::Communication);
static ref TALLY_QUEUE_NAME: String = get_queue_name(Queue::Tally);
static ref REPORTS_QUEUE_NAME: String = get_queue_name(Queue::Reports);
static ref IMPORT_EXPORT_QUEUE_NAME: String = get_queue_name(Queue::ImportExport);
static ref ELECTORAL_LOG_BATCH_QUEUE_NAME: String = get_queue_name(Queue::ElectoralLogBatch);
}
static BEAT_QUEUE_NAME: LazyLock<String> = LazyLock::new(|| get_queue_name(Queue::Beat));

#[derive(Debug, Parser, Clone)]
#[command(name = "windmill", about = "Windmill task queue prosumer.")]
Expand Down Expand Up @@ -88,13 +77,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

let opt = CeleryOpt::parse();

let cpus = read_worker_threads(&opt);
set_worker_threads(cpus);
let worker_threads = read_worker_threads(&opt);
celery_cfg::set_worker_threads(worker_threads);

// 1) Build a custom runtime
let rt = Builder::new_multi_thread()
.enable_all()
.worker_threads(cpus)
.worker_threads(worker_threads)
.thread_stack_size(8 * 1024 * 1024)
.build()?;

Expand All @@ -108,8 +97,8 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
init_log(true);
setup_probe(AppName::WINDMILL).await;

let cpus = get_worker_threads();
init_semaphore(cpus);
let cpus = celery_cfg::get_worker_threads();
init_semaphore(cpus).with_context(|| "failed to init semaphore")?;
let slug = std::env::var("ENV_SLUG").with_context(|| "missing env var ENV_SLUG")?;

match opt.clone() {
Expand All @@ -122,12 +111,15 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
heartbeat,
..
} => {
set_prefetch_count(prefetch_count);
set_acks_late(acks_late);
set_task_max_retries(task_max_retries);
set_broker_connection_max_retries(broker_connection_max_retries);
set_heartbeat(heartbeat);
let celery_app = get_celery_app().await;
celery_cfg::set_config(celery_cfg::CeleryConfig {
prefetch_count,
acks_late,
task_max_retries,
broker_connection_max_retries,
heartbeat_secs: heartbeat,
});

let celery_app = celery_cfg::get_celery_app().await;
celery_app.display_pretty().await;
let queues: Vec<String> = queues_input
.iter()
Expand All @@ -145,14 +137,14 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
if !duplicates.is_empty() {
return Err(anyhow!("Found duplicate queues: {:?}", duplicates));
}
set_queues(queues.clone());
set_is_app_active(true);
celery_cfg::set_queues(queues.clone());
celery_cfg::set_is_app_active(true);
celery_app.consume_from(&vec_str[..]).await?;
set_is_app_active(false);
celery_cfg::set_is_app_active(false);
celery_app.close().await?;
}
CeleryOpt::Produce => {
let celery_app = get_celery_app().await;
let celery_app = celery_cfg::get_celery_app().await;
event!(Level::INFO, "No new tasks to produce");
celery_app.close().await?;
}
Expand Down
3 changes: 0 additions & 3 deletions packages/windmill/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
//
// SPDX-License-Identifier: AGPL-3.0-only
#![recursion_limit = "256"]
#[macro_use]
extern crate lazy_static;

#[macro_use]
extern crate quick_error;

Expand Down
Loading
Loading