Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions packages/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/sequent-core/src/ballot_codec/multi_ballot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,7 @@ mod tests {
presentation: None,
created_at: None,
annotations: None,
tie_breaking_policy: None,
}
}

Expand Down
5 changes: 3 additions & 2 deletions packages/windmill/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ async-trait = "0.1"
clap = { version = "4.4", features = ["derive"] } # Added for generate_logs binary
dotenv = "0.15"
chrono = "0.4"
lazy_static = "1.4"
async_once = "0.2"
uuid = { version = "1.5", features = ["v4", "fast-rng"] }
toml = "0.8" # Updated for generate_logs binary, was 0.6.0
quick-error = "2.0"
Expand Down Expand Up @@ -142,3 +140,6 @@ dhat = "0.3"

[package.metadata.component.target.dependencies]
"docs:plugin" = { path = "packages/sequent-core/src/wit/plugin_interface.wit" }

[lints.rust]
unsafe_code = "forbid"
50 changes: 21 additions & 29 deletions packages/windmill/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@
//
// SPDX-License-Identifier: AGPL-3.0-only

extern crate lazy_static;
use lazy_static::lazy_static;

use anyhow::Context;
use anyhow::{anyhow, Result};
use celery::Celery;
use clap::Parser;
use dotenv::dotenv;
use sequent_core::util::init_log::init_log;
use std::collections::HashMap;
use std::sync::LazyLock;
use tokio::runtime::Builder;
use tracing::{event, Level};
use windmill::services::celery_app::*;
use windmill::services::celery_app::{self as celery_cfg, Queue};
use windmill::services::probe::{setup_probe, AppName};
use windmill::services::tasks_semaphore::init_semaphore;

Expand All @@ -27,16 +25,7 @@ fn get_queue_name(queue: Queue) -> String {
queue.queue_name(&slug)
}

lazy_static! {
static ref BEAT_QUEUE_NAME: String = get_queue_name(Queue::Beat);
static ref SHORT_QUEUE_NAME: String = get_queue_name(Queue::Short);
static ref ELECTORAL_LOG_BEAT_QUEUE_NAME: String = get_queue_name(Queue::ElectoralLogBeat);
static ref COMMUNICATION_QUEUE_NAME: String = get_queue_name(Queue::Communication);
static ref TALLY_QUEUE_NAME: String = get_queue_name(Queue::Tally);
static ref REPORTS_QUEUE_NAME: String = get_queue_name(Queue::Reports);
static ref IMPORT_EXPORT_QUEUE_NAME: String = get_queue_name(Queue::ImportExport);
static ref ELECTORAL_LOG_BATCH_QUEUE_NAME: String = get_queue_name(Queue::ElectoralLogBatch);
}
static BEAT_QUEUE_NAME: LazyLock<String> = LazyLock::new(|| get_queue_name(Queue::Beat));

#[derive(Debug, Parser, Clone)]
#[command(name = "windmill", about = "Windmill task queue prosumer.")]
Expand Down Expand Up @@ -88,13 +77,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

let opt = CeleryOpt::parse();

let cpus = read_worker_threads(&opt);
set_worker_threads(cpus);
let worker_threads = read_worker_threads(&opt);
celery_cfg::set_worker_threads(worker_threads);

// 1) Build a custom runtime
let rt = Builder::new_multi_thread()
.enable_all()
.worker_threads(cpus)
.worker_threads(worker_threads)
.thread_stack_size(8 * 1024 * 1024)
.build()?;

Expand All @@ -108,8 +97,8 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
init_log(true);
setup_probe(AppName::WINDMILL).await;

let cpus = get_worker_threads();
init_semaphore(cpus);
let cpus = celery_cfg::get_worker_threads();
init_semaphore(cpus).with_context(|| "failed to init semaphore")?;
let slug = std::env::var("ENV_SLUG").with_context(|| "missing env var ENV_SLUG")?;

match opt.clone() {
Expand All @@ -122,12 +111,15 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
heartbeat,
..
} => {
set_prefetch_count(prefetch_count);
set_acks_late(acks_late);
set_task_max_retries(task_max_retries);
set_broker_connection_max_retries(broker_connection_max_retries);
set_heartbeat(heartbeat);
let celery_app = get_celery_app().await;
celery_cfg::set_config(celery_cfg::CeleryConfig {
prefetch_count,
acks_late,
task_max_retries,
broker_connection_max_retries,
heartbeat_secs: heartbeat,
});

let celery_app = celery_cfg::get_celery_app().await;
celery_app.display_pretty().await;
let queues: Vec<String> = queues_input
.iter()
Expand All @@ -145,14 +137,14 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
if !duplicates.is_empty() {
return Err(anyhow!("Found duplicate queues: {:?}", duplicates));
}
set_queues(queues.clone());
set_is_app_active(true);
celery_cfg::set_queues(queues.clone());
celery_cfg::set_is_app_active(true);
celery_app.consume_from(&vec_str[..]).await?;
set_is_app_active(false);
celery_cfg::set_is_app_active(false);
celery_app.close().await?;
}
CeleryOpt::Produce => {
let celery_app = get_celery_app().await;
let celery_app = celery_cfg::get_celery_app().await;
event!(Level::INFO, "No new tasks to produce");
celery_app.close().await?;
}
Expand Down
3 changes: 0 additions & 3 deletions packages/windmill/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
//
// SPDX-License-Identifier: AGPL-3.0-only
#![recursion_limit = "256"]
#[macro_use]
extern crate lazy_static;

#[macro_use]
extern crate quick_error;

Expand Down
Loading
Loading