From cce3a1945376116b2bd1bb472a760a11b80d4535 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Wed, 11 Mar 2026 09:38:46 +0100 Subject: [PATCH 1/9] feat(scaler): add StackableScaler CRD, state machine, and scaling hooks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the StackableScaler custom resource and a generic reconciler that drives a multi-stage scaling state machine (Idle → PreScaling → Scaling → PostScaling → Idle). Operators implement the ScalingHooks trait to plug in product-specific logic (e.g. data offload before scale-down). Key components: - CRD types with serde/JsonSchema support for status subresource - Hook trait with pre_scale, post_scale, and on_failure callbacks - Reconciler that advances stages, patches status, and handles failures - JobTracker for coordinating async hook operations - ScalingContext helpers for direction detection and ordinal computation Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 22 +- Cargo.toml | 2 +- crates/stackable-operator/src/crd/mod.rs | 1 + .../src/crd/scaler/hooks.rs | 273 ++++++++ .../src/crd/scaler/job_tracker.rs | 161 +++++ .../stackable-operator/src/crd/scaler/mod.rs | 322 ++++++++++ .../src/crd/scaler/reconciler.rs | 590 ++++++++++++++++++ 7 files changed, 1359 insertions(+), 12 deletions(-) create mode 100644 crates/stackable-operator/src/crd/scaler/hooks.rs create mode 100644 crates/stackable-operator/src/crd/scaler/job_tracker.rs create mode 100644 crates/stackable-operator/src/crd/scaler/mod.rs create mode 100644 crates/stackable-operator/src/crd/scaler/reconciler.rs diff --git a/Cargo.lock b/Cargo.lock index 67f360b32..c2d13ffc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -746,7 +746,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -1426,7 +1426,7 @@ 
dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -1532,7 +1532,7 @@ dependencies = [ [[package]] name = "kube" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "k8s-openapi", "kube-client", @@ -1544,7 +1544,7 @@ dependencies = [ [[package]] name = "kube-client" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "base64", "bytes", @@ -1578,7 +1578,7 @@ dependencies = [ [[package]] name = "kube-core" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "derive_more", "form_urlencoded", @@ -1596,7 +1596,7 @@ dependencies = [ [[package]] name = "kube-derive" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "darling", "proc-macro2", @@ -1609,7 +1609,7 @@ dependencies = [ [[package]] name = "kube-runtime" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = 
"git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "ahash", "async-broadcast", @@ -1740,7 +1740,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2475,7 +2475,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3188,7 +3188,7 @@ dependencies = [ "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3888,7 +3888,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9ca62620b..f70930b8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ k8s-openapi = { version = "0.27.0", default-features = false, features = ["schem # We use rustls instead of openssl for easier portability, e.g. 
so that we can build stackablectl without the need to vendor (build from source) openssl # We use ring instead of aws-lc-rs, as this currently fails to build in "make run-dev" # We need a few schema fixes in kube, that went into main, but are not released yet -kube = { git = "https://github.com/kube-rs/kube-rs", rev = "fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5", version = "=3.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "admission", "rustls-tls", "ring"] } +kube = { git = "https://github.com/kube-rs/kube-rs", rev = "1320643f8ce7f8189e03496ff1329d678d76224c", version = "=3.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "admission", "rustls-tls", "ring"] } opentelemetry = "0.31.0" opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } opentelemetry-appender-tracing = "0.31.0" diff --git a/crates/stackable-operator/src/crd/mod.rs b/crates/stackable-operator/src/crd/mod.rs index 3beb69aa8..a07d6b632 100644 --- a/crates/stackable-operator/src/crd/mod.rs +++ b/crates/stackable-operator/src/crd/mod.rs @@ -8,6 +8,7 @@ pub mod authentication; pub mod git_sync; pub mod listener; pub mod s3; +pub mod scaler; /// A reference to a product cluster (for example, a `ZookeeperCluster`) /// diff --git a/crates/stackable-operator/src/crd/scaler/hooks.rs b/crates/stackable-operator/src/crd/scaler/hooks.rs new file mode 100644 index 000000000..b7881cfb0 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/hooks.rs @@ -0,0 +1,273 @@ +//! Scaling lifecycle hooks for [`StackableScaler`](super::StackableScaler). +//! +//! Operators implement [`ScalingHooks`] to run custom logic at each stage of a scaling +//! operation. Hook methods are called by [`reconcile_scaler`](super::reconcile_scaler) +//! during the appropriate state machine stage. + +use std::future::Future; + +use crate::client::Client; + +use super::FailedStage; + +/// Whether the replica change is an increase or decrease. 
+#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ScalingDirection { + /// Replica count is increasing (or unchanged). + Up, + /// Replica count is decreasing. + Down, +} + +impl ScalingDirection { + /// Derive the direction from current and desired replica counts. + /// Equal counts are treated as Up (no-op -- hooks still call Done immediately). + /// + /// # Parameters + /// + /// - `current`: The replica count in `status.replicas`. + /// - `desired`: The target replica count from `spec.replicas`. + pub fn from_replicas(current: i32, desired: i32) -> Self { + if desired >= current { + Self::Up + } else { + Self::Down + } + } +} + +/// Context passed to hook implementations on each reconcile invocation. +#[derive(Clone, Copy)] +pub struct ScalingContext<'a> { + /// Kubernetes client for API calls. + pub client: &'a Client, + /// Namespace of the StackableScaler (and its cluster). + pub namespace: &'a str, + /// Name of the role group being scaled (e.g. `"default"`). + pub role_group_name: &'a str, + /// The replica count before the current scaling operation started. + /// During `PreScaling` this equals `status.replicas`. During `Scaling` + /// and `PostScaling` it reflects the frozen `status.previous_replicas`, + /// so direction and ordinal calculations remain correct even after + /// `status.replicas` has been updated to the target value. + pub current_replicas: i32, + /// The replica count the operator is working towards. + pub desired_replicas: i32, + /// Whether this is a scale-up or scale-down — derived by operator-rs, + /// so the operator does not need to compare replica counts itself. + pub direction: ScalingDirection, +} + +impl ScalingContext<'_> { + /// Returns the StatefulSet pod ordinals that are being removed in a scale-down. + /// + /// For scale-up or no-op, returns an empty range. + /// For scale-down, returns `desired_replicas..current_replicas` — the ordinals + /// of pods that will be terminated once the StatefulSet is scaled. 
+ pub fn removed_ordinals(&self) -> std::ops::Range<i32> { + if self.direction == ScalingDirection::Down { + self.desired_replicas..self.current_replicas + } else { + 0..0 + } + } + + /// Returns the StatefulSet pod ordinals that are being added in a scale-up. + /// + /// For scale-down or no-op, returns an empty range. + /// For scale-up, returns `current_replicas..desired_replicas`. + pub fn added_ordinals(&self) -> std::ops::Range<i32> { + if self.direction == ScalingDirection::Up && self.desired_replicas > self.current_replicas { + self.current_replicas..self.desired_replicas + } else { + 0..0 + } + } + + /// Whether this is a scale-down operation. + pub fn is_scale_down(&self) -> bool { + self.direction == ScalingDirection::Down + } + + /// Whether this is a scale-up operation (not a no-op where current == desired). + pub fn is_scale_up(&self) -> bool { + self.direction == ScalingDirection::Up && self.desired_replicas > self.current_replicas + } +} + +/// Return value from a hook invocation. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum HookOutcome { + /// Hook completed successfully — advance state machine to next stage. + Done, + /// Hook is still running — operator-rs will requeue and re-call on next reconcile. + InProgress, +} + +/// Condition information returned from `reconcile_scaler` for propagation to the cluster CR. +/// +/// The operator MUST apply this condition to its cluster resource status on every call +/// to `reconcile_scaler`. Using the return type enforces this at the call site. +#[derive(Debug)] +pub enum ScalingCondition { + /// No scaling in progress, or just completed successfully. + Healthy, + /// Scaling is actively in progress. + Progressing { + /// Human-readable name of the current [`ScalerStage`](super::ScalerStage). + stage: String, + }, + /// Scaling failed -- include details in the cluster CR condition message. + Failed { + /// Which stage failed. + stage: FailedStage, + /// The error message from the hook.
+ reason: String, + }, +} + +/// Result returned from `reconcile_scaler`. The operator MUST propagate `scaling_condition` +/// to the cluster CR status conditions on every reconcile call. +pub struct ScalingResult { + /// The `Action` to return from the operator's reconcile function for this role group. + pub action: kube::runtime::controller::Action, + /// Condition to merge into the cluster CR's `status.conditions`. + pub scaling_condition: ScalingCondition, +} + +/// Trait implemented by each product operator to provide scaling lifecycle hooks. +/// +/// All methods have default implementations that return `Done` immediately, so operators +/// only need to override the specific hooks they use. +/// +/// # Example +/// +/// ```rust,ignore +/// impl ScalingHooks for MyProductScalingHooks { +/// type Error = MyError; +/// +/// async fn pre_scale(&self, ctx: &ScalingContext<'_>) -> Result<HookOutcome, Self::Error> { +/// match ctx.direction { +/// ScalingDirection::Down => { +/// JobTracker::start_or_check(ctx.client, self.build_offload_job(ctx), ctx.namespace).await +/// } +/// ScalingDirection::Up => Ok(HookOutcome::Done), +/// } +/// } +/// } +/// ``` +pub trait ScalingHooks { + /// Error type returned by hook methods. + type Error: std::error::Error + Send + Sync + 'static; + + /// Called during the `PreScaling` stage on each reconcile. + /// Return `Done` to advance to `Scaling`, `InProgress` to requeue and re-check. + /// Return `Err` to transition to `Failed`. + fn pre_scale( + &self, + ctx: &ScalingContext<'_>, + ) -> impl Future<Output = Result<HookOutcome, Self::Error>> + Send { + let _ = ctx; + async { Ok(HookOutcome::Done) } + } + + /// Called during the `PostScaling` stage on each reconcile. + /// Return `Done` to return to `Idle`, `InProgress` to requeue and re-check. + /// Return `Err` to transition to `Failed`. + fn post_scale( + &self, + ctx: &ScalingContext<'_>, + ) -> impl Future<Output = Result<HookOutcome, Self::Error>> + Send { + let _ = ctx; + async { Ok(HookOutcome::Done) } + } + + /// Called when the state machine enters `Failed`.
Best-effort cleanup. + /// + /// Errors returned here are logged at `warn` level but do not prevent the + /// `Failed` state from being written. + fn on_failure( + &self, + ctx: &ScalingContext<'_>, + failed_stage: &FailedStage, + ) -> impl Future<Output = Result<(), Self::Error>> + Send { + let _ = (ctx, failed_stage); + async { Ok(()) } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn direction_scale_up() { + assert_eq!(ScalingDirection::from_replicas(3, 5), ScalingDirection::Up); + } + + #[test] + fn direction_scale_down() { + assert_eq!( + ScalingDirection::from_replicas(5, 3), + ScalingDirection::Down + ); + } + + #[test] + fn direction_equal_is_up() { + assert_eq!(ScalingDirection::from_replicas(3, 3), ScalingDirection::Up); + } + + // Helper methods are tested indirectly via ScalingDirection since they + // only depend on `direction`, `current_replicas`, and `desired_replicas`. + // ScalingContext requires a &Client reference, so we test the logic + // through the direction + replica helpers independently.
+ + #[test] + fn removed_ordinals_scale_down() { + let dir = ScalingDirection::Down; + let (current, desired) = (5, 3); + let range = if dir == ScalingDirection::Down { + desired..current + } else { + 0..0 + }; + assert_eq!(range.collect::<Vec<i32>>(), vec![3, 4]); + } + + #[test] + fn removed_ordinals_scale_up_is_empty() { + let dir = ScalingDirection::from_replicas(3, 5); + let (current, desired) = (3, 5); + let range = if dir == ScalingDirection::Down { + desired..current + } else { + 0..0 + }; + assert!(range.collect::<Vec<i32>>().is_empty()); + } + + #[test] + fn added_ordinals_scale_up() { + let dir = ScalingDirection::from_replicas(3, 5); + let (current, desired) = (3, 5); + let range = if dir == ScalingDirection::Up && desired > current { + current..desired + } else { + 0..0 + }; + assert_eq!(range.collect::<Vec<i32>>(), vec![3, 4]); + } + + #[test] + fn added_ordinals_equal_is_empty() { + let dir = ScalingDirection::from_replicas(3, 3); + let (current, desired) = (3, 3); + let range = if dir == ScalingDirection::Up && desired > current { + current..desired + } else { + 0..0 + }; + assert!(range.collect::<Vec<i32>>().is_empty()); + } +} diff --git a/crates/stackable-operator/src/crd/scaler/job_tracker.rs b/crates/stackable-operator/src/crd/scaler/job_tracker.rs new file mode 100644 index 000000000..e142d1367 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/job_tracker.rs @@ -0,0 +1,161 @@ +//! Kubernetes Job lifecycle management for scaling hooks. +//! +//! [`JobTracker`] provides an idempotent mechanism for running a Kubernetes `Job` as part +//! of a scaling hook. It uses server-side apply to create the Job if absent, then checks +//! completion status on each reconcile. +//! +//! [`job_name`] produces deterministic, DNS-safe Job names so the tracker can find the +//! same Job across reconcile calls without external state.
+ +use k8s_openapi::api::batch::v1::Job; +use kube::ResourceExt; +use snafu::{ResultExt, Snafu}; +use tracing::{debug, info, warn}; + +use crate::{client::Client, crd::scaler::hooks::HookOutcome}; + +/// Derive a stable, DNS-safe Job name from a scaler name and a stage label. +/// +/// The name is deterministic so `JobTracker` can find the same Job on requeue +/// without storing state externally. +/// +/// # Parameters +/// +/// - `scaler_name`: Name of the [`StackableScaler`](super::StackableScaler) resource. +/// - `stage`: A short label like `"pre-scale"` or `"post-scale"`. +/// +/// # Returns +/// +/// A DNS-safe string of at most 63 characters with no trailing hyphens. +pub fn job_name(scaler_name: &str, stage: &str) -> String { + let raw = format!("{scaler_name}-{stage}"); + // Kubernetes resource names must be <= 63 chars and valid DNS labels. + // Inputs are always ASCII (Kubernetes names), so byte-level truncation is safe. + let truncated = &raw[..raw.len().min(63)]; + truncated.trim_end_matches('-').to_string() +} + +/// Errors from managing a scaling hook [`Job`]. +#[derive(Debug, Snafu)] +pub enum JobTrackerError { + /// The server-side apply patch to create or update the Job failed. + #[snafu(display("failed to apply Job '{job_name}'"))] + ApplyJob { + #[snafu(source(from(crate::client::Error, Box::new)))] + source: Box<crate::client::Error>, + /// The Kubernetes name of the Job that could not be applied. + job_name: String, + }, + /// The Job completed with one or more failed attempts. + #[snafu(display("Job '{job_name}' failed: {message}"))] + JobFailed { + /// The Kubernetes name of the failed Job. + job_name: String, + /// Diagnostic message including the failure count and a `kubectl logs` hint. + message: String, + }, +} + +/// Manages the lifecycle of a Kubernetes [`Job`] used as a scaling hook. +/// +/// Stateless -- the Job name is derived deterministically via [`job_name`], +/// so no persistent state is needed between reconciles.
+pub struct JobTracker; + +impl JobTracker { + /// Ensures the Job exists (creates if absent via server-side apply), then checks completion. + /// + /// Returns: + /// - `Ok(HookOutcome::Done)` — Job succeeded; Job is deleted as cleanup. + /// - `Ok(HookOutcome::InProgress)` — Job is still running; requeue and re-call. + /// - `Err(JobTrackerError::JobFailed)` -- Job failed; caller should transition to `Failed`. + /// + /// # Parameters + /// + /// - `client`: Kubernetes client for server-side apply, get, and delete operations. + /// - `job`: The fully-constructed [`Job`] manifest. Its `.metadata.name` is used to + /// track it across reconcile calls -- use [`job_name`] to generate a stable name. + /// - `namespace`: The namespace in which to manage the Job. + pub async fn start_or_check( + client: &Client, + job: Job, + namespace: &str, + ) -> Result<HookOutcome, JobTrackerError> { + let name = job.name_any(); + + debug!(job = %name, namespace, "Applying hook Job (server-side apply)"); + + // Apply (server-side apply — idempotent; no-op if Job already exists). + // The response contains the full updated resource, so no separate GET is needed.
+ let current: Job = client + .apply_patch("stackable-operator", &job, &job) + .await + .context(ApplyJobSnafu { + job_name: name.clone(), + })?; + + let status = current.status.as_ref(); + let succeeded = status.and_then(|s| s.succeeded).unwrap_or(0); + let failed = status.and_then(|s| s.failed).unwrap_or(0); + + if succeeded > 0 { + info!(job = %name, namespace, "Hook Job completed successfully, cleaning up"); + // Best-effort cleanup — log errors but don't fail + if let Err(e) = client.delete(&current).await { + warn!(job = %name, namespace, error = %e, "Failed to clean up completed Job — it will be retried on next reconcile"); + } + return Ok(HookOutcome::Done); + } + + if failed > 0 { + return Err(JobTrackerError::JobFailed { + job_name: name.clone(), + message: format!( + "{failed} attempt(s) failed — check pod logs with: kubectl logs -l job-name={name} -n {namespace}" + ), + }); + } + + debug!(job = %name, namespace, "Hook Job still running"); + Ok(HookOutcome::InProgress) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn job_name_is_stable_for_same_inputs() { + let name1 = job_name("my-scaler", "pre-scale"); + let name2 = job_name("my-scaler", "pre-scale"); + assert_eq!(name1, name2); + } + + #[test] + fn job_name_differs_for_different_stages() { + let pre = job_name("my-scaler", "pre-scale"); + let post = job_name("my-scaler", "post-scale"); + assert_ne!(pre, post); + } + + #[test] + fn job_name_max_63_chars() { + let long_name = "a".repeat(60); + let name = job_name(&long_name, "pre-scale"); + assert!(name.len() <= 63, "name too long: {}", name.len()); + } + + #[test] + fn job_name_no_trailing_hyphen() { + // If truncation lands on a hyphen, it should be stripped + let name = job_name("a".repeat(62).as_str(), "-x"); + assert!(!name.ends_with('-')); + } + + #[test] + fn job_name_format() { + let name = job_name("my-cluster-default", "pre-scale"); + assert_eq!(name, "my-cluster-default-pre-scale"); + } +} diff --git
a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs new file mode 100644 index 000000000..5a33d6dd0 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/mod.rs @@ -0,0 +1,322 @@ +//! Stackable scaler CRD and reconciliation framework. +//! +//! This module provides [`StackableScaler`], a Kubernetes custom resource that exposes a +//! `/scale` subresource so that a `HorizontalPodAutoscaler` can manage replica counts for +//! Stackable cluster role groups instead of targeting a `StatefulSet` directly. +//! +//! # State machine +//! +//! A [`StackableScaler`] progresses through stages tracked in [`ScalerStage`]: +//! +//! ```text +//! Idle → PreScaling → Scaling → PostScaling → Idle +//! ↘ ↘ ↘ +//! Failed +//! ``` +//! +//! Operators provide lifecycle hooks via the [`ScalingHooks`] trait and call +//! [`reconcile_scaler`] on every reconcile loop iteration for the relevant role group. + +use std::borrow::Cow; + +use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +pub mod hooks; +pub mod job_tracker; +pub mod reconciler; + +/// A type-erased cluster reference used in StackableScaler. +/// Does not carry apiVersion — CRD versioning handles conversions. +#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UnknownClusterRef { + /// The Kubernetes kind of the target cluster resource (e.g. "NifiCluster"). + pub kind: String, + /// The name of the target cluster resource within the same namespace. + pub name: String, +} + +/// Which stage of a scaling operation failed. +#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum FailedStage { + /// The [`ScalingHooks::pre_scale`] hook returned an error. + PreScaling, + /// The StatefulSet failed to reach the desired replica count. 
+ Scaling, + /// The [`ScalingHooks::post_scale`] hook returned an error. + PostScaling, +} + +/// The current stage of the scaling state machine. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(tag = "stage", content = "details", rename_all = "camelCase")] +pub enum ScalerStage { + /// No scaling operation is in progress. + Idle, + /// Running the [`ScalingHooks::pre_scale`] hook (e.g. data offload). + PreScaling, + /// Waiting for the StatefulSet to converge to the new replica count. + Scaling, + /// Running the [`ScalingHooks::post_scale`] hook (e.g. cluster rebalance). + PostScaling, + /// A hook returned an error. The scaler stays here until manually reset. + Failed { + /// Which stage produced the error. + #[serde(rename = "failedAt")] + failed_at: FailedStage, + /// Human-readable error message from the hook. + reason: String, + }, +} + +/// Formats the stage name for logging and status messages. +/// +/// The `Failed` variant only includes the failed stage, not the full reason string, +/// to keep log messages concise. +impl std::fmt::Display for ScalerStage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Idle => write!(f, "Idle"), + Self::PreScaling => write!(f, "PreScaling"), + Self::Scaling => write!(f, "Scaling"), + Self::PostScaling => write!(f, "PostScaling"), + Self::Failed { failed_at, .. } => write!(f, "Failed({failed_at:?})"), + } + } +} + +/// Manual [`JsonSchema`] implementation because `schemars` does not support the +/// `#[serde(tag = "stage", content = "details")]` internally-tagged representation +/// used by this enum. 
+impl JsonSchema for ScalerStage { + fn schema_name() -> Cow<'static, str> { + "ScalerStage".into() + } + + fn json_schema(generator: &mut schemars::generate::SchemaGenerator) -> schemars::Schema { + schemars::json_schema!({ + "type": "object", + "required": ["stage"], + "properties": { + "stage": { + "type": "string", + "enum": ["idle", "preScaling", "scaling", "postScaling", "failed"] + }, + "details": { + "type": "object", + "properties": { + "failedAt": generator.subschema_for::<FailedStage>(), + "reason": { "type": "string" } + } + } + } + }) + } +} + +/// The current state of the scaler, including when it last changed. +#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ScalerState { + /// The current stage of the scaler state machine. + pub stage: ScalerStage, + /// When this stage was entered. + pub last_transition_time: Time, +} + +/// Status of a StackableScaler. +#[derive(Clone, Debug, Default, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct StackableScalerStatus { + /// The replica count currently targeted by the managed StatefulSet. Exposed via the `/scale` subresource for HPA consumption. + pub replicas: i32, + /// Label selector string for HPA pod counting. Written at `.status.selector`. + #[serde(skip_serializing_if = "Option::is_none")] + pub selector: Option<String>, + /// The target replica count for the in-progress scaling operation. `None` when idle. + #[serde(skip_serializing_if = "Option::is_none")] + pub desired_replicas: Option<i32>, + /// The replica count when the current scaling operation started. `None` when idle. + /// Used to derive [`ScalingDirection`] correctly across all stages, because + /// `status.replicas` is overwritten to the target value during the `Scaling` stage. + #[serde(skip_serializing_if = "Option::is_none")] + pub previous_replicas: Option<i32>, + /// The current state machine stage and its transition timestamp.
+ #[serde(skip_serializing_if = "Option::is_none")] + pub current_state: Option<ScalerState>, +} + +/// A StackableScaler exposes a /scale subresource so that a Kubernetes +/// HorizontalPodAutoscaler can target it instead of a StatefulSet directly. +#[derive(Clone, CustomResource, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[kube( + group = "autoscaling.stackable.tech", + version = "v1alpha1", + kind = "StackableScaler", + namespaced, + status = "StackableScalerStatus", + scale = r#"{"specReplicasPath":".spec.replicas","statusReplicasPath":".status.replicas","labelSelectorPath":".status.selector"}"# +)] +#[serde(rename_all = "camelCase")] +pub struct StackableScalerSpec { + /// Desired replica count. Written by the HPA via the /scale subresource. + /// Only takes effect when the referenced roleGroup has `replicas: 0`. + pub replicas: i32, + /// Reference to the Stackable cluster resource this scaler manages. + pub cluster_ref: UnknownClusterRef, + /// The role within the cluster (e.g. `nodes`). + pub role: String, + /// The role group within the role (e.g. `default`). + pub role_group: String, +} + +/// Resolve the replica count for a StatefulSet, taking an optional [`StackableScaler`] into account. +/// +/// A scaler is only effective when `role_group_replicas` is `Some(0)` — this is the platform +/// convention that signals "externally managed replicas". In all other cases the role group +/// value is used unchanged. +/// +/// Always call this instead of reading `role_group.replicas` directly when building a StatefulSet, +/// to ensure scaler-managed role groups are handled consistently. +/// +/// # Parameters +/// +/// - `role_group_replicas`: The replica count from the role group config. `Some(0)` signals +/// externally-managed replicas (the scaler's value is used). Any other value is returned unchanged. +/// - `scaler`: The [`StackableScaler`] for this role group, if one exists. Only consulted +/// when `role_group_replicas` is `Some(0)`.
+ +/// +/// # Returns +/// +/// The effective replica count, or `None` if the scaler has no status yet. +pub fn resolve_replicas( + role_group_replicas: Option<i32>, + scaler: Option<&StackableScaler>, +) -> Option<i32> { + match (role_group_replicas, scaler) { + (Some(0), Some(s)) => s.status.as_ref().map(|st| st.replicas), + (replicas, _) => replicas, + } +} + +pub use hooks::{ + HookOutcome, ScalingCondition, ScalingContext, ScalingDirection, ScalingHooks, ScalingResult, +}; +pub use job_tracker::{JobTracker, JobTrackerError, job_name}; +pub use reconciler::{Error as ReconcilerError, reconcile_scaler}; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn scaler_stage_idle_serializes() { + let stage = ScalerStage::Idle; + let json = serde_json::to_value(&stage).unwrap(); + assert_eq!(json["stage"], "idle"); + } + + #[test] + fn scaler_stage_failed_serializes() { + let stage = ScalerStage::Failed { + failed_at: FailedStage::PreScaling, + reason: "timeout".to_string(), + }; + let json = serde_json::to_value(&stage).unwrap(); + assert_eq!(json["stage"], "failed"); + assert_eq!(json["details"]["failedAt"], "preScaling"); + assert_eq!(json["details"]["reason"], "timeout"); + } + + #[test] + fn spec_round_trips() { + let spec = StackableScalerSpec { + replicas: 3, + cluster_ref: UnknownClusterRef { + kind: "NifiCluster".to_string(), + name: "my-nifi".to_string(), + }, + role: "nodes".to_string(), + role_group: "default".to_string(), + }; + let json = serde_json::to_string(&spec).unwrap(); + let back: StackableScalerSpec = serde_json::from_str(&json).unwrap(); + assert_eq!(spec, back); + } + + #[test] + fn resolve_replicas_no_scaler_uses_role_group() { + assert_eq!(resolve_replicas(Some(3), None), Some(3)); + } + + #[test] + fn resolve_replicas_none_role_group_no_scaler() { + assert_eq!(resolve_replicas(None, None), None); + } + + #[test] + fn resolve_replicas_zero_with_scaler_uses_status() { + let mut scaler = StackableScaler::new( + "test", + StackableScalerSpec {
replicas: 5, + cluster_ref: UnknownClusterRef { + kind: "NifiCluster".into(), + name: "n".into(), + }, + role: "nodes".into(), + role_group: "default".into(), + }, + ); + scaler.status = Some(StackableScalerStatus { + replicas: 3, + ..Default::default() + }); + assert_eq!(resolve_replicas(Some(0), Some(&scaler)), Some(3)); + } + + #[test] + fn resolve_replicas_nonzero_with_scaler_ignores_scaler() { + // role_group.replicas != 0 → scaler is not active (validation webhook should prevent this, + // but we defensively fall back to the role group value) + let mut scaler = StackableScaler::new( + "test", + StackableScalerSpec { + replicas: 5, + cluster_ref: UnknownClusterRef { + kind: "NifiCluster".into(), + name: "n".into(), + }, + role: "nodes".into(), + role_group: "default".into(), + }, + ); + scaler.status = Some(StackableScalerStatus { + replicas: 4, + ..Default::default() + }); + assert_eq!(resolve_replicas(Some(3), Some(&scaler)), Some(3)); + } + + #[test] + fn resolve_replicas_zero_scaler_no_status_returns_none() { + // Scaler exists but has no status yet (just created) → return None (don't set replicas) + let scaler = StackableScaler::new( + "test", + StackableScalerSpec { + replicas: 5, + cluster_ref: UnknownClusterRef { + kind: "NifiCluster".into(), + name: "n".into(), + }, + role: "nodes".into(), + role_group: "default".into(), + }, + ); + assert_eq!(resolve_replicas(Some(0), Some(&scaler)), None); + } +} diff --git a/crates/stackable-operator/src/crd/scaler/reconciler.rs b/crates/stackable-operator/src/crd/scaler/reconciler.rs new file mode 100644 index 000000000..b704f53a1 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/reconciler.rs @@ -0,0 +1,590 @@ +//! Reconciler for [`StackableScaler`](super::StackableScaler) resources. +//! +//! The public entry point is [`reconcile_scaler`]. Operators call this on every reconcile +//! for a role group, and it drives the [`ScalerStage`](super::ScalerStage) state machine, +//! 
invokes [`ScalingHooks`] at the appropriate stages, and patches the scaler's status. + +use std::time::Duration; + +use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; +use k8s_openapi::jiff::Timestamp; +use kube::runtime::controller::Action; +use snafu::{OptionExt, ResultExt, Snafu}; +use tracing::{debug, info, warn}; + +use crate::client::Client; +use crate::crd::scaler::hooks::{ + HookOutcome, ScalingCondition, ScalingContext, ScalingDirection, ScalingHooks, ScalingResult, +}; +use crate::crd::scaler::{ + FailedStage, ScalerStage, ScalerState, StackableScaler, StackableScalerStatus, +}; + +/// Requeue interval when a hook returns [`HookOutcome::InProgress`]. +const REQUEUE_HOOK_IN_PROGRESS: Duration = Duration::from_secs(10); +/// Requeue interval while waiting for the StatefulSet to converge. +const REQUEUE_SCALING: Duration = Duration::from_secs(5); + +/// Errors returned by [`reconcile_scaler`]. +#[derive(Debug, Snafu)] +pub enum Error { + /// The Kubernetes status patch for the [`StackableScaler`] failed. + #[snafu(display("failed to patch StackableScaler status"))] + PatchStatus { + #[snafu(source(from(crate::client::Error, Box::new)))] + source: Box, + }, + /// The [`StackableScaler`] is missing `.metadata.namespace`. + #[snafu(display("StackableScaler object is missing namespace"))] + ObjectHasNoNamespace, +} + +/// Compute the next state machine step from the current stage and hook/stability inputs. +/// +/// Hook outcomes are passed as closures so this function stays synchronous and +/// unit-testable without async infrastructure. +/// +/// # Parameters +/// +/// - `current`: The current [`ScalerStage`]. +/// - `current_replicas`: The replica count in `status.replicas`. +/// - `desired_replicas`: The target from `spec.replicas`. +/// - `pre_outcome`: Result of the `PreScaling` hook. Only called when in `PreScaling`. +/// - `post_outcome`: Result of the `PostScaling` hook. Only called when in `PostScaling`. 
+/// - `statefulset_stable`: Whether the StatefulSet has converged. Only relevant in `Scaling`. +fn next_stage( + current: &ScalerStage, + current_replicas: i32, + desired_replicas: i32, + pre_outcome: impl FnOnce() -> HookOutcome, + post_outcome: impl FnOnce() -> HookOutcome, + statefulset_stable: bool, +) -> NextStage { + match current { + ScalerStage::Idle => { + if current_replicas != desired_replicas { + NextStage::Transition(ScalerStage::PreScaling) + } else { + NextStage::NoChange + } + } + ScalerStage::PreScaling => match pre_outcome() { + HookOutcome::Done => NextStage::Transition(ScalerStage::Scaling), + HookOutcome::InProgress => NextStage::Requeue, + }, + ScalerStage::Scaling => { + if statefulset_stable { + NextStage::Transition(ScalerStage::PostScaling) + } else { + NextStage::Requeue + } + } + ScalerStage::PostScaling => match post_outcome() { + HookOutcome::Done => NextStage::Transition(ScalerStage::Idle), + HookOutcome::InProgress => NextStage::Requeue, + }, + ScalerStage::Failed { .. } => NextStage::NoChange, + } +} + +/// The outcome of [`next_stage`]: what the reconciler should do. +#[derive(Debug, Eq, PartialEq)] +enum NextStage { + /// Nothing to do; wait for an external watch event. + NoChange, + /// Current stage is not yet complete; requeue after a short interval. + Requeue, + /// Move to the given stage and patch the scaler status. + Transition(ScalerStage), +} + +/// Reconcile a [`StackableScaler`], advancing its state machine and invoking hooks. +/// +/// Call this from your operator's reconcile function for every role group that has a +/// corresponding [`StackableScaler`]. The returned [`ScalingCondition`] MUST be applied +/// to the cluster CR's `status.conditions`. +/// +/// # Parameters +/// +/// - `scaler`: The [`StackableScaler`] resource. Must have `.metadata.namespace` set. +/// - `hooks`: The operator's [`ScalingHooks`] implementation. +/// - `client`: Kubernetes client for status patches and hook API calls. 
+/// - `statefulset_stable`: `true` when the managed StatefulSet has reached its target +/// replica count and all pods are ready. +/// - `selector`: Pod label selector string for this role group (e.g. +/// `"app=myproduct,roleGroup=default"`). Written into `status.selector` for HPA +/// pod counting. Must be stable across reconcile calls. +/// +/// # Errors +/// +/// Returns [`Error::PatchStatus`] if the status patch fails, or +/// [`Error::ObjectHasNoNamespace`] if the scaler has no namespace. +pub async fn reconcile_scaler( + scaler: &StackableScaler, + hooks: &H, + client: &Client, + statefulset_stable: bool, + selector: &str, +) -> Result +where + H: ScalingHooks, +{ + let default_status = StackableScalerStatus::default(); + let status = scaler.status.as_ref().unwrap_or(&default_status); + let current_stage = status + .current_state + .as_ref() + .map(|s| s.stage.clone()) + .unwrap_or(ScalerStage::Idle); + let current_replicas = status.replicas; + let desired_replicas = scaler.spec.replicas; + let namespace = scaler + .metadata + .namespace + .as_deref() + .context(ObjectHasNoNamespaceSnafu)?; + + debug!( + scaler = scaler.metadata.name.as_deref().unwrap_or(""), + %current_stage, + current_replicas, + desired_replicas, + statefulset_stable, + "Reconciling StackableScaler" + ); + + // When a scaling operation is in progress, use the frozen previous_replicas + // to derive direction. status.replicas is overwritten during the Scaling stage + // and would always yield Up for the remainder of the operation. 
+ let direction_base = status.previous_replicas.unwrap_or(current_replicas); + let ctx = ScalingContext { + client, + namespace, + role_group_name: &scaler.spec.role_group, + current_replicas: direction_base, + desired_replicas, + direction: ScalingDirection::from_replicas(direction_base, desired_replicas), + }; + + // Run the hook for the current stage if applicable, catching errors for Failed transition + let pre_result = if matches!(current_stage, ScalerStage::PreScaling) { + Some(hooks.pre_scale(&ctx).await) + } else { + None + }; + + let post_result = if matches!(current_stage, ScalerStage::PostScaling) { + Some(hooks.post_scale(&ctx).await) + } else { + None + }; + + // Handle hook errors → Failed transition + if let Some(Err(e)) = &pre_result { + return handle_hook_failure( + e, + FailedStage::PreScaling, + hooks, + &ctx, + scaler, + selector, + status, + ) + .await; + } + + if let Some(Err(e)) = &post_result { + return handle_hook_failure( + e, + FailedStage::PostScaling, + hooks, + &ctx, + scaler, + selector, + status, + ) + .await; + } + + let pre_outcome = pre_result.and_then(|r| r.ok()).unwrap_or(HookOutcome::Done); + let post_outcome = post_result + .and_then(|r| r.ok()) + .unwrap_or(HookOutcome::Done); + + let next = next_stage( + ¤t_stage, + current_replicas, + desired_replicas, + || pre_outcome.clone(), + || post_outcome.clone(), + statefulset_stable, + ); + + match next { + NextStage::NoChange => { + debug!( + scaler = scaler.metadata.name.as_deref().unwrap_or(""), + %current_stage, + "No stage change needed, awaiting external changes" + ); + Ok(ScalingResult { + action: Action::await_change(), + scaling_condition: ScalingCondition::Healthy, + }) + } + NextStage::Requeue => { + let interval = if matches!(current_stage, ScalerStage::Scaling) { + REQUEUE_SCALING + } else { + REQUEUE_HOOK_IN_PROGRESS + }; + debug!( + scaler = scaler.metadata.name.as_deref().unwrap_or(""), + %current_stage, + requeue_after_secs = interval.as_secs(), + "Requeuing, 
waiting for progress in current stage" + ); + Ok(ScalingResult { + action: Action::requeue(interval), + scaling_condition: ScalingCondition::Progressing { + stage: format!("{current_stage}"), + }, + }) + } + NextStage::Transition(new_stage) => { + info!( + scaler = scaler.metadata.name.as_deref().unwrap_or(""), + %current_stage, + %new_stage, + current_replicas, + desired_replicas, + "StackableScaler transitioning stage" + ); + // When transitioning to Scaling, update status.replicas to desired + // (this is what gets propagated to the StatefulSet) + // When completing PostScaling → Idle, clear desiredReplicas + let new_replicas = if matches!(new_stage, ScalerStage::Scaling) { + desired_replicas + } else { + current_replicas + }; + let new_desired = match new_stage { + ScalerStage::Idle => None, + ScalerStage::PreScaling => Some(desired_replicas), + _ => status.desired_replicas, + }; + let new_previous = match new_stage { + ScalerStage::PreScaling => Some(current_replicas), + ScalerStage::Idle => None, + _ => status.previous_replicas, + }; + let condition = match &new_stage { + ScalerStage::Idle => ScalingCondition::Healthy, + s => ScalingCondition::Progressing { + stage: format!("{s}"), + }, + }; + let new_status = + make_status(selector, new_stage, new_replicas, new_desired, new_previous); + patch_status(client, scaler, new_status) + .await + .context(PatchStatusSnafu)?; + Ok(ScalingResult { + action: Action::requeue(REQUEUE_SCALING), + scaling_condition: condition, + }) + } + } +} + +/// Transition the scaler to the `Failed` state after a hook error. +/// +/// Patches the scaler status to `Failed` first, then calls [`ScalingHooks::on_failure`] +/// for best-effort cleanup. Writing the status before the cleanup hook guarantees that +/// a re-entrant reconcile sees `Failed` and will not invoke `on_failure` a second time. 
+/// +/// If the cleanup hook itself fails, the status reason is updated to include the +/// cleanup error so it is visible via `kubectl describe` and the cluster CR condition. +/// +/// # Parameters +/// +/// - `error`: The hook error that caused the failure. +/// - `failed_stage`: Which stage (`PreScaling` or `PostScaling`) produced the error. +/// - `hooks`: The operator's [`ScalingHooks`] implementation, used to call `on_failure`. +/// - `ctx`: The [`ScalingContext`] for the current reconcile, forwarded to `on_failure`. +/// Also provides the Kubernetes client for patching the scaler status. +/// - `scaler`: The [`StackableScaler`] resource being reconciled. +/// - `selector`: Pod label selector string written into `status.selector`. +/// - `status`: The current scaler status, preserved in the `Failed` status so manual +/// recovery knows the original replica counts. +async fn handle_hook_failure( + error: &H::Error, + failed_stage: FailedStage, + hooks: &H, + ctx: &ScalingContext<'_>, + scaler: &StackableScaler, + selector: &str, + status: &StackableScalerStatus, +) -> Result { + let scaler_name = scaler.metadata.name.as_deref().unwrap_or(""); + let hook_reason = error.to_string(); + warn!( + scaler = scaler_name, + failed_at = ?failed_stage, + error = %hook_reason, + "StackableScaler hook failed, entering Failed state" + ); + + // Write Failed status BEFORE calling on_failure so that a subsequent reconcile + // sees the Failed stage and won't re-invoke on_failure. + let new_status = make_status( + selector, + ScalerStage::Failed { + failed_at: failed_stage.clone(), + reason: hook_reason.clone(), + }, + status.replicas, + status.desired_replicas, + status.previous_replicas, + ); + patch_status(ctx.client, scaler, new_status) + .await + .context(PatchStatusSnafu)?; + + // Run cleanup hook. If it fails, update the status reason so the failure is + // visible via `kubectl describe` / the cluster CR condition. 
+ let final_reason = if let Err(on_failure_err) = hooks.on_failure(ctx, &failed_stage).await { + let reason_with_cleanup = format!( + "{hook_reason} (cleanup also failed: {on_failure_err})" + ); + warn!( + scaler = scaler_name, + error = %on_failure_err, + failed_at = ?failed_stage, + "on_failure hook returned an error, updating status" + ); + let updated_status = make_status( + selector, + ScalerStage::Failed { + failed_at: failed_stage.clone(), + reason: reason_with_cleanup.clone(), + }, + status.replicas, + status.desired_replicas, + status.previous_replicas, + ); + // Best-effort update — if this patch also fails, the original Failed reason + // is already persisted and the warn log captures the cleanup error. + if let Err(patch_err) = patch_status(ctx.client, scaler, updated_status).await { + warn!( + scaler = scaler_name, + error = %patch_err, + "Failed to update status with cleanup error" + ); + } + reason_with_cleanup + } else { + hook_reason + }; + + Ok(ScalingResult { + action: Action::await_change(), + scaling_condition: ScalingCondition::Failed { + stage: failed_stage, + reason: final_reason, + }, + }) +} + +/// Construct a [`StackableScalerStatus`] with the given values and `last_transition_time` of now. +/// +/// # Parameters +/// +/// - `selector`: Pod label selector string for HPA pod counting. +/// - `stage`: The new [`ScalerStage`] to record in the status. +/// - `replicas`: The replica count to write into `status.replicas`. +/// - `desired_replicas`: The in-flight target, or `None` when returning to `Idle`. +/// - `previous_replicas`: The replica count before the scaling operation started, +/// or `None` when returning to `Idle`. 
+fn make_status( + selector: &str, + stage: ScalerStage, + replicas: i32, + desired_replicas: Option, + previous_replicas: Option, +) -> StackableScalerStatus { + StackableScalerStatus { + replicas, + selector: Some(selector.to_string()), + desired_replicas, + previous_replicas, + current_state: Some(ScalerState { + stage, + last_transition_time: Time(Timestamp::now()), + }), + } +} + +/// Apply a server-side status patch to the [`StackableScaler`]. +/// +/// # Parameters +/// +/// - `client`: Kubernetes client for the status patch operation. +/// - `scaler`: The [`StackableScaler`] resource whose status to update. +/// - `status`: The new [`StackableScalerStatus`] to apply. +async fn patch_status( + client: &Client, + scaler: &StackableScaler, + status: StackableScalerStatus, +) -> Result<(), crate::client::Error> { + client + .apply_patch_status("stackable-operator", scaler, &status) + .await + .map(|_| ()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::crd::scaler::hooks::HookOutcome; + use crate::crd::scaler::{FailedStage, ScalerStage}; + + #[test] + fn idle_transitions_to_prescaling_when_replicas_differ() { + assert_eq!( + next_stage( + &ScalerStage::Idle, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::Transition(ScalerStage::PreScaling) + ); + } + + #[test] + fn idle_stays_idle_when_replicas_match() { + assert_eq!( + next_stage( + &ScalerStage::Idle, + 3, + 3, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::NoChange + ); + } + + #[test] + fn prescaling_advances_when_hook_done() { + assert_eq!( + next_stage( + &ScalerStage::PreScaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::Transition(ScalerStage::Scaling) + ); + } + + #[test] + fn prescaling_requeues_when_hook_in_progress() { + assert_eq!( + next_stage( + &ScalerStage::PreScaling, + 3, + 5, + || HookOutcome::InProgress, + || HookOutcome::Done, + false + ), + NextStage::Requeue + ); + } + + 
#[test] + fn scaling_advances_when_statefulset_stable() { + assert_eq!( + next_stage( + &ScalerStage::Scaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + true + ), + NextStage::Transition(ScalerStage::PostScaling) + ); + } + + #[test] + fn scaling_requeues_when_not_stable() { + assert_eq!( + next_stage( + &ScalerStage::Scaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::Requeue + ); + } + + #[test] + fn postscaling_returns_to_idle_when_hook_done() { + assert_eq!( + next_stage( + &ScalerStage::PostScaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + true + ), + NextStage::Transition(ScalerStage::Idle) + ); + } + + #[test] + fn postscaling_requeues_when_hook_in_progress() { + assert_eq!( + next_stage( + &ScalerStage::PostScaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::InProgress, + true + ), + NextStage::Requeue + ); + } + + #[test] + fn failed_stays_failed() { + let failed = ScalerStage::Failed { + failed_at: FailedStage::PreScaling, + reason: "err".to_string(), + }; + assert_eq!( + next_stage( + &failed, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::NoChange + ); + } +} From f0fa70bcb76b6de91160aa24ac583880cc2552db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Wed, 11 Mar 2026 10:35:35 +0100 Subject: [PATCH 2/9] Add ScalerStage::is_scaling_in_progress() to centralize active-stage knowledge The admission webhook in commons-operator was pattern-matching on ScalerStage variants to determine whether a scaling operation blocks HPA writes. This duplicated the "which stages are active" logic, creating a maintenance risk: adding a new stage to the state machine would require updating the webhook's match arm in a separate crate. Move this knowledge into a single method on ScalerStage so both the reconciler and the webhook can query it without enumerating variants. 
Co-Authored-By: Claude Opus 4.6 --- .../stackable-operator/src/crd/scaler/mod.rs | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs index 5a33d6dd0..8714693e5 100644 --- a/crates/stackable-operator/src/crd/scaler/mod.rs +++ b/crates/stackable-operator/src/crd/scaler/mod.rs @@ -73,6 +73,17 @@ pub enum ScalerStage { }, } +impl ScalerStage { + /// Returns `true` when a scaling operation is actively running + /// (`PreScaling`, `Scaling`, or `PostScaling`). + /// + /// `Idle` and `Failed` are not considered active — the HPA is + /// free to write `spec.replicas` in those states. + pub fn is_scaling_in_progress(&self) -> bool { + matches!(self, Self::PreScaling | Self::Scaling | Self::PostScaling) + } +} + /// Formats the stage name for logging and status messages. /// /// The `Failed` variant only includes the failed stage, not the full reason string, @@ -213,6 +224,25 @@ pub use reconciler::{Error as ReconcilerError, reconcile_scaler}; mod tests { use super::*; + #[test] + fn is_scaling_in_progress_true_for_active_stages() { + assert!(ScalerStage::PreScaling.is_scaling_in_progress()); + assert!(ScalerStage::Scaling.is_scaling_in_progress()); + assert!(ScalerStage::PostScaling.is_scaling_in_progress()); + } + + #[test] + fn is_scaling_in_progress_false_for_idle_and_failed() { + assert!(!ScalerStage::Idle.is_scaling_in_progress()); + assert!( + !ScalerStage::Failed { + failed_at: FailedStage::PreScaling, + reason: "err".to_string(), + } + .is_scaling_in_progress() + ); + } + #[test] fn scaler_stage_idle_serializes() { let stage = ScalerStage::Idle; From bafaeae3467ff3ef5eb20e5057a875de17184cd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Wed, 11 Mar 2026 11:05:09 +0100 Subject: [PATCH 3/9] Implement retry annotation to recover StackableScaler from Failed state The design doc (ADR Decision 9) specified that Failed is a terminal 
trap state with annotation-based recovery, but the implementation was missing. When the operator sees `autoscaling.stackable.tech/retry: "true"` on a StackableScaler in the Failed stage, it now: 1. Strips the annotation via merge patch 2. Resets status.currentState.stage to Idle 3. Clears desired_replicas and previous_replicas 4. Requeues so the next reconcile can start a fresh scaling attempt Usage: kubectl annotate stackablescaler autoscaling.stackable.tech/retry=true Co-Authored-By: Claude Opus 4.6 --- .../stackable-operator/src/crd/scaler/mod.rs | 13 +++- .../src/crd/scaler/reconciler.rs | 69 +++++++++++++++++-- 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs index 8714693e5..fd8192e69 100644 --- a/crates/stackable-operator/src/crd/scaler/mod.rs +++ b/crates/stackable-operator/src/crd/scaler/mod.rs @@ -28,6 +28,16 @@ pub mod hooks; pub mod job_tracker; pub mod reconciler; +/// Annotation key that triggers recovery from the [`ScalerStage::Failed`] state. +/// +/// When the operator sees this annotation with value `"true"` on a [`StackableScaler`], +/// it strips the annotation and resets the stage to [`ScalerStage::Idle`]. +/// +/// ```bash +/// kubectl annotate stackablescaler autoscaling.stackable.tech/retry=true +/// ``` +pub const RETRY_ANNOTATION: &str = "autoscaling.stackable.tech/retry"; + /// A type-erased cluster reference used in StackableScaler. /// Does not carry apiVersion — CRD versioning handles conversions. #[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] @@ -63,7 +73,8 @@ pub enum ScalerStage { Scaling, /// Running the [`ScalingHooks::post_scale`] hook (e.g. cluster rebalance). PostScaling, - /// A hook returned an error. The scaler stays here until manually reset. + /// A hook returned an error. 
The scaler stays here until the user applies the + /// [`RETRY_ANNOTATION`] to trigger a reset to [`Idle`](Self::Idle). Failed { /// Which stage produced the error. #[serde(rename = "failedAt")] diff --git a/crates/stackable-operator/src/crd/scaler/reconciler.rs b/crates/stackable-operator/src/crd/scaler/reconciler.rs index b704f53a1..2307f36ea 100644 --- a/crates/stackable-operator/src/crd/scaler/reconciler.rs +++ b/crates/stackable-operator/src/crd/scaler/reconciler.rs @@ -17,7 +17,8 @@ use crate::crd::scaler::hooks::{ HookOutcome, ScalingCondition, ScalingContext, ScalingDirection, ScalingHooks, ScalingResult, }; use crate::crd::scaler::{ - FailedStage, ScalerStage, ScalerState, StackableScaler, StackableScalerStatus, + FailedStage, RETRY_ANNOTATION, ScalerStage, ScalerState, StackableScaler, + StackableScalerStatus, }; /// Requeue interval when a hook returns [`HookOutcome::InProgress`]. @@ -34,6 +35,12 @@ pub enum Error { #[snafu(source(from(crate::client::Error, Box::new)))] source: Box, }, + /// Removing the retry annotation from the [`StackableScaler`] failed. + #[snafu(display("failed to remove retry annotation from StackableScaler"))] + RemoveRetryAnnotation { + #[snafu(source(from(crate::client::Error, Box::new)))] + source: Box, + }, /// The [`StackableScaler`] is missing `.metadata.namespace`. #[snafu(display("StackableScaler object is missing namespace"))] ObjectHasNoNamespace, @@ -144,8 +151,10 @@ where .as_deref() .context(ObjectHasNoNamespaceSnafu)?; + let scaler_name = scaler.metadata.name.as_deref().unwrap_or(""); + debug!( - scaler = scaler.metadata.name.as_deref().unwrap_or(""), + scaler = scaler_name, %current_stage, current_replicas, desired_replicas, @@ -153,6 +162,56 @@ where "Reconciling StackableScaler" ); + // Recovery from Failed: if the user has set the retry annotation, strip it + // and reset to Idle so the next reconcile starts a fresh scaling attempt. + if matches!(current_stage, ScalerStage::Failed { .. 
}) { + let has_retry = scaler + .metadata + .annotations + .as_ref() + .and_then(|a| a.get(RETRY_ANNOTATION)) + .is_some_and(|v| v == "true"); + + if has_retry { + info!( + scaler = scaler_name, + "Retry annotation found on Failed scaler, resetting to Idle" + ); + + // Strip the annotation via merge patch (setting to null removes it) + client + .merge_patch( + scaler, + serde_json::json!({ + "metadata": { + "annotations": { + RETRY_ANNOTATION: null + } + } + }), + ) + .await + .context(RemoveRetryAnnotationSnafu)?; + + // Reset status to Idle + let idle_status = make_status( + selector, + ScalerStage::Idle, + current_replicas, + None, + None, + ); + patch_status(client, scaler, idle_status) + .await + .context(PatchStatusSnafu)?; + + return Ok(ScalingResult { + action: Action::requeue(REQUEUE_SCALING), + scaling_condition: ScalingCondition::Healthy, + }); + } + } + // When a scaling operation is in progress, use the frozen previous_replicas // to derive direction. status.replicas is overwritten during the Scaling stage // and would always yield Up for the remainder of the operation. 
@@ -223,7 +282,7 @@ where
     match next {
         NextStage::NoChange => {
             debug!(
-                scaler = scaler.metadata.name.as_deref().unwrap_or(""),
+                scaler = scaler_name,
                 %current_stage,
                 "No stage change needed, awaiting external changes"
             );
@@ -239,7 +298,7 @@ where
                 REQUEUE_HOOK_IN_PROGRESS
             };
             debug!(
-                scaler = scaler.metadata.name.as_deref().unwrap_or(""),
+                scaler = scaler_name,
                 %current_stage,
                 requeue_after_secs = interval.as_secs(),
                 "Requeuing, waiting for progress in current stage"
@@ -253,7 +312,7 @@ where
         }
         NextStage::Transition(new_stage) => {
             info!(
-                scaler = scaler.metadata.name.as_deref().unwrap_or(""),
+                scaler = scaler_name,
                 %current_stage,
                 %new_stage,
                 current_replicas,

From c321d733bd7fe23608d3226c2ce9fffcf7ce3c38 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6nke=20Liebau?=
Date: Thu, 12 Mar 2026 15:24:06 +0100
Subject: [PATCH 4/9] Migrate StackableScaler CRD to the versioned macro and
 add scale subresource support

---
 .../stackable-operator/src/crd/scaler/mod.rs  | 102 ++++++++++--------
 .../src/crd/scaler/reconciler.rs              |  17 +--
 .../src/attrs/container.rs                    |  21 +++-
 .../src/codegen/container/struct/mod.rs       |  16 ++-
 4 files changed, 99 insertions(+), 57 deletions(-)

diff --git a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs
index fd8192e69..2c9db580e 100644
--- a/crates/stackable-operator/src/crd/scaler/mod.rs
+++ b/crates/stackable-operator/src/crd/scaler/mod.rs
@@ -24,6 +24,8 @@ use kube::CustomResource;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 
+use crate::versioned::versioned;
+
 pub mod hooks;
 pub mod job_tracker;
 pub mod reconciler;
@@ -38,17 +40,6 @@ pub mod reconciler;
 /// ```
 pub const RETRY_ANNOTATION: &str = "autoscaling.stackable.tech/retry";
 
-/// A type-erased cluster reference used in StackableScaler.
-/// Does not carry apiVersion — CRD versioning handles conversions.
-#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct UnknownClusterRef { - /// The Kubernetes kind of the target cluster resource (e.g. "NifiCluster"). - pub kind: String, - /// The name of the target cluster resource within the same namespace. - pub name: String, -} - /// Which stage of a scaling operation failed. #[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] @@ -172,31 +163,56 @@ pub struct StackableScalerStatus { pub current_state: Option, } -/// A StackableScaler exposes a /scale subresource so that a Kubernetes -/// HorizontalPodAutoscaler can target it instead of a StatefulSet directly. -#[derive(Clone, CustomResource, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -#[kube( - group = "autoscaling.stackable.tech", - version = "v1alpha1", - kind = "StackableScaler", - namespaced, - status = "StackableScalerStatus", - scale = r#"{"specReplicasPath":".spec.replicas","statusReplicasPath":".status.replicas","labelSelectorPath":".status.selector"}"# -)] +/// A type-erased cluster reference used in StackableScaler. +/// Does not carry apiVersion — CRD versioning handles conversions. +#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] -pub struct StackableScalerSpec { - /// Desired replica count. Written by the HPA via the /scale subresource. - /// Only takes effect when the referenced roleGroup has `replicas: 0`. - pub replicas: i32, - /// Reference to the Stackable cluster resource this scaler manages. - pub cluster_ref: UnknownClusterRef, - /// The role within the cluster (e.g. `nodes`). - pub role: String, - /// The role group within the role (e.g. `default`). - pub role_group: String, +pub struct UnknownClusterRef { + /// The Kubernetes kind of the target cluster resource (e.g. "NifiCluster"). 
+ pub kind: String, + /// The name of the target cluster resource within the same namespace. + pub name: String, +} + +#[versioned( + version(name = "v1alpha1"), + crates( + kube_core = "kube::core", + k8s_openapi = "k8s_openapi", + schemars = "schemars", + ) +)] +pub mod versioned { + /// A StackableScaler exposes a /scale subresource so that a Kubernetes + /// HorizontalPodAutoscaler can target it instead of a StatefulSet directly. + #[versioned(crd( + group = "autoscaling.stackable.tech", + kind = "StackableScaler", + namespaced, + status = "StackableScalerStatus", + scale( + spec_replicas_path = ".spec.replicas", + status_replicas_path = ".status.replicas", + label_selector_path = ".status.selector" + ) + ))] + #[derive(Clone, CustomResource, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct StackableScalerSpec { + /// Desired replica count. Written by the HPA via the /scale subresource. + /// Only takes effect when the referenced roleGroup has `replicas: 0`. + pub replicas: i32, + /// Reference to the Stackable cluster resource this scaler manages. + pub cluster_ref: UnknownClusterRef, + /// The role within the cluster (e.g. `nodes`). + pub role: String, + /// The role group within the role (e.g. `default`). + pub role_group: String, + } } -/// Resolve the replica count for a StatefulSet, taking an optional [`StackableScaler`] into account. +/// Resolve the replica count for a StatefulSet, taking an optional +/// [`v1alpha1::StackableScaler`] into account. /// /// A scaler is only effective when `role_group_replicas` is `Some(0)` — this is the platform /// convention that signals "externally managed replicas". In all other cases the role group @@ -209,7 +225,7 @@ pub struct StackableScalerSpec { /// /// - `role_group_replicas`: The replica count from the role group config. `Some(0)` signals /// externally-managed replicas (the scaler's value is used). Any other value is returned unchanged. 
-/// - `scaler`: The [`StackableScaler`] for this role group, if one exists. Only consulted +/// - `scaler`: The [`v1alpha1::StackableScaler`] for this role group, if one exists. Only consulted /// when `role_group_replicas` is `Some(0)`. /// /// # Returns @@ -217,7 +233,7 @@ pub struct StackableScalerSpec { /// The effective replica count, or `None` if the scaler has no status yet. pub fn resolve_replicas( role_group_replicas: Option, - scaler: Option<&StackableScaler>, + scaler: Option<&v1alpha1::StackableScaler>, ) -> Option { match (role_group_replicas, scaler) { (Some(0), Some(s)) => s.status.as_ref().map(|st| st.replicas), @@ -275,7 +291,7 @@ mod tests { #[test] fn spec_round_trips() { - let spec = StackableScalerSpec { + let spec = v1alpha1::StackableScalerSpec { replicas: 3, cluster_ref: UnknownClusterRef { kind: "NifiCluster".to_string(), @@ -285,7 +301,7 @@ mod tests { role_group: "default".to_string(), }; let json = serde_json::to_string(&spec).unwrap(); - let back: StackableScalerSpec = serde_json::from_str(&json).unwrap(); + let back: v1alpha1::StackableScalerSpec = serde_json::from_str(&json).unwrap(); assert_eq!(spec, back); } @@ -301,9 +317,9 @@ mod tests { #[test] fn resolve_replicas_zero_with_scaler_uses_status() { - let mut scaler = StackableScaler::new( + let mut scaler = v1alpha1::StackableScaler::new( "test", - StackableScalerSpec { + v1alpha1::StackableScalerSpec { replicas: 5, cluster_ref: UnknownClusterRef { kind: "NifiCluster".into(), @@ -324,9 +340,9 @@ mod tests { fn resolve_replicas_nonzero_with_scaler_ignores_scaler() { // role_group.replicas != 0 → scaler is not active (validation webhook should prevent this, // but we defensively fall back to the role group value) - let mut scaler = StackableScaler::new( + let mut scaler = v1alpha1::StackableScaler::new( "test", - StackableScalerSpec { + v1alpha1::StackableScalerSpec { replicas: 5, cluster_ref: UnknownClusterRef { kind: "NifiCluster".into(), @@ -346,9 +362,9 @@ mod tests { #[test] fn 
resolve_replicas_zero_scaler_no_status_returns_none() { // Scaler exists but has no status yet (just created) → return None (don't set replicas) - let scaler = StackableScaler::new( + let scaler = v1alpha1::StackableScaler::new( "test", - StackableScalerSpec { + v1alpha1::StackableScalerSpec { replicas: 5, cluster_ref: UnknownClusterRef { kind: "NifiCluster".into(), diff --git a/crates/stackable-operator/src/crd/scaler/reconciler.rs b/crates/stackable-operator/src/crd/scaler/reconciler.rs index 2307f36ea..8e1b2b949 100644 --- a/crates/stackable-operator/src/crd/scaler/reconciler.rs +++ b/crates/stackable-operator/src/crd/scaler/reconciler.rs @@ -17,8 +17,8 @@ use crate::crd::scaler::hooks::{ HookOutcome, ScalingCondition, ScalingContext, ScalingDirection, ScalingHooks, ScalingResult, }; use crate::crd::scaler::{ - FailedStage, RETRY_ANNOTATION, ScalerStage, ScalerState, StackableScaler, - StackableScalerStatus, + FailedStage, RETRY_ANNOTATION, ScalerStage, ScalerState, StackableScalerStatus, + v1alpha1::StackableScaler, }; /// Requeue interval when a hook returns [`HookOutcome::InProgress`]. @@ -194,13 +194,8 @@ where .context(RemoveRetryAnnotationSnafu)?; // Reset status to Idle - let idle_status = make_status( - selector, - ScalerStage::Idle, - current_replicas, - None, - None, - ); + let idle_status = + make_status(selector, ScalerStage::Idle, current_replicas, None, None); patch_status(client, scaler, idle_status) .await .context(PatchStatusSnafu)?; @@ -413,9 +408,7 @@ async fn handle_hook_failure( // Run cleanup hook. If it fails, update the status reason so the failure is // visible via `kubectl describe` / the cluster CR condition. 
let final_reason = if let Err(on_failure_err) = hooks.on_failure(ctx, &failed_stage).await { - let reason_with_cleanup = format!( - "{hook_reason} (cleanup also failed: {on_failure_err})" - ); + let reason_with_cleanup = format!("{hook_reason} (cleanup also failed: {on_failure_err})"); warn!( scaler = scaler_name, error = %on_failure_err, diff --git a/crates/stackable-versioned-macros/src/attrs/container.rs b/crates/stackable-versioned-macros/src/attrs/container.rs index d902dd1cd..284282c59 100644 --- a/crates/stackable-versioned-macros/src/attrs/container.rs +++ b/crates/stackable-versioned-macros/src/attrs/container.rs @@ -33,6 +33,24 @@ pub struct ContainerSkipArguments { pub try_convert: Flag, } +/// Scale subresource configuration for a CRD. +/// +/// Mirrors the fields of `k8s_openapi::CustomResourceSubresourceScale`. Passed through +/// to the `#[kube(scale(...))]` attribute. +/// +/// See the [Kubernetes documentation](https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource) +/// for details on the scale subresource. +#[derive(Clone, Debug, FromMeta)] +pub struct Scale { + /// JSON path to the replica count in the custom resource's spec (e.g. `.spec.replicas`). + pub spec_replicas_path: String, + /// JSON path to the replica count in the custom resource's status (e.g. `.status.replicas`). + pub status_replicas_path: String, + /// JSON path to the label selector in the custom resource's status (e.g. `.status.selector`). + #[darling(default)] + pub label_selector_path: Option, +} + /// This struct contains supported CRD arguments. /// /// The arguments are passed through to the `#[kube]` attribute. More details can be found in the @@ -50,6 +68,7 @@ pub struct ContainerSkipArguments { /// cluster scoped resource. /// - `crates`: Override specific crates. /// - `status`: Set the specified struct as the status subresource. +/// - `scale`: Configure the scale subresource for HPA integration. 
/// - `shortname`: Set a shortname for the CR object. This can be specified multiple /// times. /// - `skip`: Controls skipping parts of the generation. @@ -64,7 +83,7 @@ pub struct StructCrdArguments { pub status: Option, // derive // schema - // scale + pub scale: Option, // printcolumn #[darling(multiple, rename = "shortname")] pub shortnames: Vec, diff --git a/crates/stackable-versioned-macros/src/codegen/container/struct/mod.rs b/crates/stackable-versioned-macros/src/codegen/container/struct/mod.rs index 021f523fa..c022c8289 100644 --- a/crates/stackable-versioned-macros/src/codegen/container/struct/mod.rs +++ b/crates/stackable-versioned-macros/src/codegen/container/struct/mod.rs @@ -279,6 +279,20 @@ impl Struct { .map(|s| quote! { , shortname = #s }) .collect(); + let scale = spec_gen_ctx.kubernetes_arguments.scale.as_ref().map(|s| { + let spec_replicas_path = &s.spec_replicas_path; + let status_replicas_path = &s.status_replicas_path; + let label_selector_path = s + .label_selector_path + .as_ref() + .map(|p| quote! { , label_selector_path = #p }); + quote! { , scale( + spec_replicas_path = #spec_replicas_path, + status_replicas_path = #status_replicas_path + #label_selector_path + )} + }); + Some(quote! { // The end-developer needs to derive CustomResource and JsonSchema. // This is because we don't know if they want to use a re-exported or renamed import. 
@@ -286,7 +300,7 @@ impl Struct { // These must be comma separated (except the last) as they always exist: group = #group, version = #version, kind = #kind // These fields are optional, and therefore the token stream must prefix each with a comma: - #singular #plural #namespaced #crates #status #shortnames + #singular #plural #namespaced #crates #status #shortnames #scale )] }) } From 4c87c48f149059b5c87552185550521e61b835d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 19 Mar 2026 16:44:57 +0100 Subject: [PATCH 5/9] feat(scaler): add ReplicasConfig enum for role group replica management Introduces ReplicasConfig with Fixed, Hpa, Auto, and ExternallyScaled variants to replace the simple `replicas: Option` on role groups. Includes custom Deserialize impl (bare integer, string, tagged object), validation via snafu, JsonSchema, and comprehensive tests. Co-Authored-By: Claude Opus 4.6 --- .../stackable-operator/src/crd/scaler/mod.rs | 4 + .../src/crd/scaler/replicas_config.rs | 325 ++++++++++++++++++ 2 files changed, 329 insertions(+) create mode 100644 crates/stackable-operator/src/crd/scaler/replicas_config.rs diff --git a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs index 2c9db580e..dfc262a41 100644 --- a/crates/stackable-operator/src/crd/scaler/mod.rs +++ b/crates/stackable-operator/src/crd/scaler/mod.rs @@ -29,6 +29,7 @@ use crate::versioned::versioned; pub mod hooks; pub mod job_tracker; pub mod reconciler; +mod replicas_config; /// Annotation key that triggers recovery from the [`ScalerStage::Failed`] state. 
/// @@ -246,6 +247,9 @@ pub use hooks::{ }; pub use job_tracker::{JobTracker, JobTrackerError, job_name}; pub use reconciler::{Error as ReconcilerError, reconcile_scaler}; +pub use replicas_config::{ + AutoConfig, HpaConfig, ReplicasConfig, ValidationError as ReplicasValidationError, +}; #[cfg(test)] mod tests { diff --git a/crates/stackable-operator/src/crd/scaler/replicas_config.rs b/crates/stackable-operator/src/crd/scaler/replicas_config.rs new file mode 100644 index 000000000..81cf57eb0 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/replicas_config.rs @@ -0,0 +1,325 @@ +//! Replica configuration for Stackable role groups. +//! +//! [`ReplicasConfig`] replaces the simple `replicas: Option` field on role groups, +//! allowing operators to express fixed counts, HPA-managed scaling, Stackable auto-scaling, +//! or externally-managed replicas in a single enum. + +use std::borrow::Cow; + +use k8s_openapi::api::autoscaling::v2::HorizontalPodAutoscalerSpec; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use snafu::Snafu; + +/// Errors returned by [`ReplicasConfig::validate`]. +#[derive(Debug, Snafu)] +pub enum ValidationError { + /// A `Fixed(0)` replica count is not allowed. + #[snafu(display("fixed replica count must be at least 1, got 0"))] + FixedZero, + + /// The `min_replicas` field in [`AutoConfig`] must be at least 1. + #[snafu(display("auto min_replicas must be at least 1, got {min}"))] + AutoMinZero { + /// The invalid minimum replica count. + min: u16, + }, + + /// The `max_replicas` must be greater than or equal to `min_replicas`. + #[snafu(display("auto max_replicas ({max}) must be >= min_replicas ({min})"))] + AutoMaxLessThanMin { + /// The minimum replica count. + min: u16, + /// The maximum replica count that was less than `min`. + max: u16, + }, +} + +/// Configuration for a Kubernetes `HorizontalPodAutoscaler` that manages the role group. 
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct HpaConfig { + /// The HPA spec to apply. The `scaleTargetRef` and `minReplicas` fields are managed + /// by the operator and will be overwritten. + pub spec: HorizontalPodAutoscalerSpec, +} + +/// Configuration for Stackable-managed auto-scaling. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct AutoConfig { + /// Minimum number of replicas the auto-scaler may scale down to. + pub min_replicas: u16, + /// Maximum number of replicas the auto-scaler may scale up to. + pub max_replicas: u16, +} + +/// How replicas are managed for a role group. +/// +/// This enum supports multiple input formats for ergonomic YAML/JSON authoring: +/// +/// - A bare integer (e.g. `3`) is parsed as `Fixed(3)`. +/// - The string `"externallyScaled"` is parsed as `ExternallyScaled`. +/// - An object with a discriminant key (`fixed`, `hpa`, or `auto`) selects the +/// corresponding variant. +/// +/// # Validation +/// +/// After deserialization, call [`validate`](ReplicasConfig::validate) to enforce +/// business rules (e.g. `Fixed(0)` is not allowed). +#[derive(Clone, Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum ReplicasConfig { + /// A fixed number of replicas managed by the operator. + Fixed(u16), + /// Replicas managed by a Kubernetes `HorizontalPodAutoscaler`. + Hpa(Box), + /// Replicas managed by the Stackable auto-scaler. + Auto(AutoConfig), + /// Replicas managed by an external system. The operator creates a + /// [`StackableScaler`](super::v1alpha1::StackableScaler) that exposes + /// a `/scale` subresource for external controllers to target. + ExternallyScaled, +} + +impl Default for ReplicasConfig { + fn default() -> Self { + Self::Fixed(1) + } +} + +impl ReplicasConfig { + /// Validates business rules for this configuration. 
+ /// + /// # Errors + /// + /// Returns [`ValidationError::FixedZero`] if the variant is `Fixed(0)`. + /// Returns [`ValidationError::AutoMinZero`] if `min_replicas` is 0. + /// Returns [`ValidationError::AutoMaxLessThanMin`] if `max_replicas < min_replicas`. + pub fn validate(&self) -> Result<(), ValidationError> { + match self { + Self::Fixed(0) => FixedZeroSnafu.fail(), + Self::Auto(cfg) if cfg.min_replicas == 0 => AutoMinZeroSnafu { + min: cfg.min_replicas, + } + .fail(), + Self::Auto(cfg) if cfg.max_replicas < cfg.min_replicas => AutoMaxLessThanMinSnafu { + min: cfg.min_replicas, + max: cfg.max_replicas, + } + .fail(), + _ => Ok(()), + } + } +} + +impl JsonSchema for ReplicasConfig { + fn schema_name() -> Cow<'static, str> { + "ReplicasConfig".into() + } + + fn json_schema(generator: &mut schemars::generate::SchemaGenerator) -> schemars::Schema { + schemars::json_schema!({ + "description": "Replica configuration for a role group.", + "oneOf": [ + { + "description": "Fixed replica count (bare integer).", + "type": "integer", + "minimum": 1 + }, + { + "description": "Externally managed replicas.", + "type": "string", + "const": "externallyScaled" + }, + { + "description": "Fixed replica count (object form).", + "type": "object", + "required": ["fixed"], + "properties": { + "fixed": { "type": "integer", "minimum": 1 } + }, + "additionalProperties": false + }, + { + "description": "HPA-managed replicas.", + "type": "object", + "required": ["hpa"], + "properties": { + "hpa": generator.subschema_for::() + }, + "additionalProperties": false + }, + { + "description": "Stackable auto-scaling.", + "type": "object", + "required": ["auto"], + "properties": { + "auto": generator.subschema_for::() + }, + "additionalProperties": false + } + ] + }) + } +} + +impl<'de> Deserialize<'de> for ReplicasConfig { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + use serde::de::{self, MapAccess, Visitor}; + + struct ReplicasConfigVisitor; + + 
impl<'de> Visitor<'de> for ReplicasConfigVisitor { + type Value = ReplicasConfig; + + fn expecting(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str( + "an integer, the string \"externallyScaled\", \ + or an object with key \"fixed\", \"hpa\", or \"auto\"", + ) + } + + fn visit_u64(self, value: u64) -> Result { + let value = u16::try_from(value) + .map_err(|_| de::Error::custom("integer out of u16 range"))?; + Ok(ReplicasConfig::Fixed(value)) + } + + fn visit_i64(self, value: i64) -> Result { + let value = u16::try_from(value) + .map_err(|_| de::Error::custom("integer out of u16 range"))?; + Ok(ReplicasConfig::Fixed(value)) + } + + fn visit_str(self, value: &str) -> Result { + match value { + "externallyScaled" => Ok(ReplicasConfig::ExternallyScaled), + other => Err(de::Error::unknown_variant(other, &["externallyScaled"])), + } + } + + fn visit_map>(self, mut map: A) -> Result { + let key: String = map + .next_key()? + .ok_or_else(|| de::Error::custom("expected a non-empty object"))?; + + let result = match key.as_str() { + "fixed" => { + let value: u16 = map.next_value()?; + ReplicasConfig::Fixed(value) + } + "hpa" => { + let value: HpaConfig = map.next_value()?; + ReplicasConfig::Hpa(Box::new(value)) + } + "auto" => { + let value: AutoConfig = map.next_value()?; + ReplicasConfig::Auto(value) + } + other => { + return Err(de::Error::unknown_field(other, &["fixed", "hpa", "auto"])); + } + }; + + // Drain remaining keys to ensure no extra fields. 
+ if map.next_key::()?.is_some() { + return Err(de::Error::custom( + "expected exactly one key (\"fixed\", \"hpa\", or \"auto\")", + )); + } + + Ok(result) + } + } + + deserializer.deserialize_any(ReplicasConfigVisitor) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deserialize_fixed_from_integer() { + let config: ReplicasConfig = serde_json::from_str("3").expect("should parse integer"); + assert_eq!(config, ReplicasConfig::Fixed(3)); + } + + #[test] + fn deserialize_fixed_from_object() { + let config: ReplicasConfig = + serde_json::from_str(r#"{"fixed": 5}"#).expect("should parse fixed object"); + assert_eq!(config, ReplicasConfig::Fixed(5)); + } + + #[test] + fn deserialize_externally_scaled() { + let config: ReplicasConfig = + serde_json::from_str(r#""externallyScaled""#).expect("should parse string variant"); + assert_eq!(config, ReplicasConfig::ExternallyScaled); + } + + #[test] + fn deserialize_hpa() { + let config: ReplicasConfig = + serde_json::from_str(r#"{"hpa": {"spec": {"maxReplicas": 10}}}"#) + .expect("should parse hpa object"); + assert!(matches!(config, ReplicasConfig::Hpa(..))); + } + + #[test] + fn deserialize_auto() { + let config: ReplicasConfig = + serde_json::from_str(r#"{"auto": {"minReplicas": 2, "maxReplicas": 10}}"#) + .expect("should parse auto object"); + assert_eq!( + config, + ReplicasConfig::Auto(AutoConfig { + min_replicas: 2, + max_replicas: 10, + }) + ); + } + + #[test] + fn fixed_zero_is_invalid() { + let config = ReplicasConfig::Fixed(0); + let result = config.validate(); + assert!(matches!(result, Err(ValidationError::FixedZero))); + } + + #[test] + fn auto_min_zero_is_invalid() { + let config = ReplicasConfig::Auto(AutoConfig { + min_replicas: 0, + max_replicas: 5, + }); + let result = config.validate(); + assert!(matches!(result, Err(ValidationError::AutoMinZero { .. 
}))); + } + + #[test] + fn auto_max_less_than_min_is_invalid() { + let config = ReplicasConfig::Auto(AutoConfig { + min_replicas: 5, + max_replicas: 2, + }); + let result = config.validate(); + assert!(matches!( + result, + Err(ValidationError::AutoMaxLessThanMin { .. }) + )); + } + + #[test] + fn option_none_defaults_to_fixed_1() { + let config: Option = + serde_json::from_str("null").expect("should parse null"); + assert_eq!(config.unwrap_or_default(), ReplicasConfig::Fixed(1)); + } +} From bd3b25ccd9464b27229d055a93a48449e8bf5df1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 19 Mar 2026 16:57:23 +0100 Subject: [PATCH 6/9] refactor(scaler): remove identity fields from StackableScalerSpec Remove cluster_ref, role, and role_group from StackableScalerSpec since identity is now conveyed via owner references and labels set by callers. Also remove the UnknownClusterRef struct which was only used by the spec. The reconcile_scaler() function now accepts role_group_name as an explicit parameter instead of reading it from the spec. Co-Authored-By: Claude Opus 4.6 --- .../stackable-operator/src/crd/scaler/mod.rs | 69 ++----------------- .../src/crd/scaler/reconciler.rs | 5 +- 2 files changed, 11 insertions(+), 63 deletions(-) diff --git a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs index dfc262a41..74ff1de3a 100644 --- a/crates/stackable-operator/src/crd/scaler/mod.rs +++ b/crates/stackable-operator/src/crd/scaler/mod.rs @@ -164,17 +164,6 @@ pub struct StackableScalerStatus { pub current_state: Option, } -/// A type-erased cluster reference used in StackableScaler. -/// Does not carry apiVersion — CRD versioning handles conversions. -#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct UnknownClusterRef { - /// The Kubernetes kind of the target cluster resource (e.g. "NifiCluster"). 
- pub kind: String, - /// The name of the target cluster resource within the same namespace. - pub name: String, -} - #[versioned( version(name = "v1alpha1"), crates( @@ -203,12 +192,6 @@ pub mod versioned { /// Desired replica count. Written by the HPA via the /scale subresource. /// Only takes effect when the referenced roleGroup has `replicas: 0`. pub replicas: i32, - /// Reference to the Stackable cluster resource this scaler manages. - pub cluster_ref: UnknownClusterRef, - /// The role within the cluster (e.g. `nodes`). - pub role: String, - /// The role group within the role (e.g. `default`). - pub role_group: String, } } @@ -295,15 +278,7 @@ mod tests { #[test] fn spec_round_trips() { - let spec = v1alpha1::StackableScalerSpec { - replicas: 3, - cluster_ref: UnknownClusterRef { - kind: "NifiCluster".to_string(), - name: "my-nifi".to_string(), - }, - role: "nodes".to_string(), - role_group: "default".to_string(), - }; + let spec = v1alpha1::StackableScalerSpec { replicas: 3 }; let json = serde_json::to_string(&spec).unwrap(); let back: v1alpha1::StackableScalerSpec = serde_json::from_str(&json).unwrap(); assert_eq!(spec, back); @@ -321,18 +296,8 @@ mod tests { #[test] fn resolve_replicas_zero_with_scaler_uses_status() { - let mut scaler = v1alpha1::StackableScaler::new( - "test", - v1alpha1::StackableScalerSpec { - replicas: 5, - cluster_ref: UnknownClusterRef { - kind: "NifiCluster".into(), - name: "n".into(), - }, - role: "nodes".into(), - role_group: "default".into(), - }, - ); + let mut scaler = + v1alpha1::StackableScaler::new("test", v1alpha1::StackableScalerSpec { replicas: 5 }); scaler.status = Some(StackableScalerStatus { replicas: 3, ..Default::default() @@ -344,18 +309,8 @@ mod tests { fn resolve_replicas_nonzero_with_scaler_ignores_scaler() { // role_group.replicas != 0 → scaler is not active (validation webhook should prevent this, // but we defensively fall back to the role group value) - let mut scaler = v1alpha1::StackableScaler::new( - 
"test", - v1alpha1::StackableScalerSpec { - replicas: 5, - cluster_ref: UnknownClusterRef { - kind: "NifiCluster".into(), - name: "n".into(), - }, - role: "nodes".into(), - role_group: "default".into(), - }, - ); + let mut scaler = + v1alpha1::StackableScaler::new("test", v1alpha1::StackableScalerSpec { replicas: 5 }); scaler.status = Some(StackableScalerStatus { replicas: 4, ..Default::default() @@ -366,18 +321,8 @@ mod tests { #[test] fn resolve_replicas_zero_scaler_no_status_returns_none() { // Scaler exists but has no status yet (just created) → return None (don't set replicas) - let scaler = v1alpha1::StackableScaler::new( - "test", - v1alpha1::StackableScalerSpec { - replicas: 5, - cluster_ref: UnknownClusterRef { - kind: "NifiCluster".into(), - name: "n".into(), - }, - role: "nodes".into(), - role_group: "default".into(), - }, - ); + let scaler = + v1alpha1::StackableScaler::new("test", v1alpha1::StackableScalerSpec { replicas: 5 }); assert_eq!(resolve_replicas(Some(0), Some(&scaler)), None); } } diff --git a/crates/stackable-operator/src/crd/scaler/reconciler.rs b/crates/stackable-operator/src/crd/scaler/reconciler.rs index 8e1b2b949..c6f699c42 100644 --- a/crates/stackable-operator/src/crd/scaler/reconciler.rs +++ b/crates/stackable-operator/src/crd/scaler/reconciler.rs @@ -121,6 +121,8 @@ enum NextStage { /// - `selector`: Pod label selector string for this role group (e.g. /// `"app=myproduct,roleGroup=default"`). Written into `status.selector` for HPA /// pod counting. Must be stable across reconcile calls. +/// - `role_group_name`: The name of the role group being scaled (e.g. `"default"`). +/// Passed to hooks via [`ScalingContext`]. 
/// /// # Errors /// @@ -132,6 +134,7 @@ pub async fn reconcile_scaler( client: &Client, statefulset_stable: bool, selector: &str, + role_group_name: &str, ) -> Result where H: ScalingHooks, @@ -214,7 +217,7 @@ where let ctx = ScalingContext { client, namespace, - role_group_name: &scaler.spec.role_group, + role_group_name, current_replicas: direction_base, desired_replicas, direction: ScalingDirection::from_replicas(direction_base, desired_replicas), From 76787f0cc6f8869ea997f31cc2161789142ee864 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 19 Mar 2026 17:06:42 +0100 Subject: [PATCH 7/9] feat: implement ClusterResource for StackableScaler and HorizontalPodAutoscaler Enable StackableScaler and HPA to be managed through ClusterResources.add(), providing label validation and orphan cleanup. Adds DeepMerge implementations for StackableScaler and its status types, and registers both resource types in delete_orphaned_resources(). Co-Authored-By: Claude Opus 4.6 --- .../src/cluster_resources.rs | 5 +++ .../src/crd/scaler/cluster_resource_impl.rs | 35 +++++++++++++++++++ .../stackable-operator/src/crd/scaler/mod.rs | 1 + 3 files changed, 41 insertions(+) create mode 100644 crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs diff --git a/crates/stackable-operator/src/cluster_resources.rs b/crates/stackable-operator/src/cluster_resources.rs index b8ece1137..82a0b7d7c 100644 --- a/crates/stackable-operator/src/cluster_resources.rs +++ b/crates/stackable-operator/src/cluster_resources.rs @@ -13,6 +13,7 @@ use k8s_openapi::{ apps::v1::{ DaemonSet, DaemonSetSpec, Deployment, DeploymentSpec, StatefulSet, StatefulSetSpec, }, + autoscaling::v2::HorizontalPodAutoscaler, batch::v1::Job, core::v1::{ ConfigMap, ObjectReference, PodSpec, PodTemplateSpec, Secret, Service, ServiceAccount, @@ -221,7 +222,9 @@ impl ClusterResource for Service {} impl ClusterResource for ServiceAccount {} impl ClusterResource for RoleBinding {} impl ClusterResource 
for PodDisruptionBudget {} +impl ClusterResource for HorizontalPodAutoscaler {} impl ClusterResource for listener::v1alpha1::Listener {} +impl ClusterResource for crate::crd::scaler::v1alpha1::StackableScaler {} impl ClusterResource for Job { fn pod_spec(&self) -> Option<&PodSpec> { @@ -681,6 +684,8 @@ impl<'a> ClusterResources<'a> { self.delete_orphaned_resources_of_kind::(client), self.delete_orphaned_resources_of_kind::(client), self.delete_orphaned_resources_of_kind::(client), + self.delete_orphaned_resources_of_kind::(client), + self.delete_orphaned_resources_of_kind::(client), )?; Ok(()) diff --git a/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs b/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs new file mode 100644 index 000000000..4daa0df7e --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs @@ -0,0 +1,35 @@ +use k8s_openapi::DeepMerge; + +use super::{ScalerState, ScalerStage, StackableScalerStatus, v1alpha1::StackableScaler}; + +impl DeepMerge for StackableScaler { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.metadata, other.metadata); + DeepMerge::merge_from(&mut self.spec.replicas, other.spec.replicas); + DeepMerge::merge_from(&mut self.status, other.status); + } +} + +impl DeepMerge for StackableScalerStatus { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.replicas, other.replicas); + DeepMerge::merge_from(&mut self.selector, other.selector); + DeepMerge::merge_from(&mut self.desired_replicas, other.desired_replicas); + DeepMerge::merge_from(&mut self.previous_replicas, other.previous_replicas); + DeepMerge::merge_from(&mut self.current_state, other.current_state); + } +} + +impl DeepMerge for ScalerState { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.stage, other.stage); + // `Time` does not implement `DeepMerge`, so we replace directly. 
+ self.last_transition_time = other.last_transition_time; + } +} + +impl DeepMerge for ScalerStage { + fn merge_from(&mut self, other: Self) { + *self = other; + } +} diff --git a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs index 74ff1de3a..5e3f9af60 100644 --- a/crates/stackable-operator/src/crd/scaler/mod.rs +++ b/crates/stackable-operator/src/crd/scaler/mod.rs @@ -26,6 +26,7 @@ use serde::{Deserialize, Serialize}; use crate::versioned::versioned; +mod cluster_resource_impl; pub mod hooks; pub mod job_tracker; pub mod reconciler; From cbaa2a68fc279cdf736b3af45524c2f2c12fc00b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 19 Mar 2026 18:04:50 +0100 Subject: [PATCH 8/9] feat(scaler): add build_scaler() helper for constructing StackableScaler objects Provides a shared helper that product operators use to construct StackableScaler resources with the required labels (name, instance, managed-by, component, role-group) and owner reference, ensuring ClusterResources.add() validation passes consistently. Co-Authored-By: Claude Opus 4.6 --- .../src/crd/scaler/builder.rs | 240 ++++++++++++++++++ .../src/crd/scaler/cluster_resource_impl.rs | 2 +- .../stackable-operator/src/crd/scaler/mod.rs | 2 + 3 files changed, 243 insertions(+), 1 deletion(-) create mode 100644 crates/stackable-operator/src/crd/scaler/builder.rs diff --git a/crates/stackable-operator/src/crd/scaler/builder.rs b/crates/stackable-operator/src/crd/scaler/builder.rs new file mode 100644 index 000000000..4b4e90d39 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/builder.rs @@ -0,0 +1,240 @@ +//! Builder helper for constructing [`StackableScaler`] objects with proper metadata. +//! +//! Product operators use [`build_scaler`] to create a `StackableScaler` for each +//! auto-scaled role group, ensuring that all required labels are set so that +//! 
[`ClusterResources::add`](crate::cluster_resources::ClusterResources) validation passes. + +use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; +use snafu::{ResultExt, Snafu}; + +use crate::{ + builder::meta::ObjectMetaBuilder, + kvp::{Label, LabelError, Labels, consts::K8S_APP_MANAGED_BY_KEY}, +}; + +use super::v1alpha1::{StackableScaler, StackableScalerSpec}; + +/// Error returned by [`build_scaler`]. +#[derive(Debug, Snafu)] +pub enum BuildScalerError { + /// A label value failed validation. + #[snafu(display("failed to construct label for scaler"))] + Label { source: LabelError }, + + /// The metadata builder failed (e.g. missing owner reference fields). + #[snafu(display("failed to build ObjectMeta for scaler"))] + ObjectMeta { source: crate::builder::meta::Error }, +} + +/// Constructs a [`StackableScaler`] with the required labels and owner reference. +/// +/// The generated scaler name follows the convention +/// `{cluster_name}-{role}-{role_group}-scaler`. +/// +/// # Labels +/// +/// The following `app.kubernetes.io` labels are set: +/// +/// | Key | Value | +/// |-----|-------| +/// | `app.kubernetes.io/name` | `app_name` | +/// | `app.kubernetes.io/instance` | `cluster_name` | +/// | `app.kubernetes.io/managed-by` | `managed_by` | +/// | `app.kubernetes.io/component` | `role` | +/// | `app.kubernetes.io/role-group` | `role_group` | +/// +/// # Errors +/// +/// Returns [`BuildScalerError::Label`] if any label value is invalid. +/// Returns [`BuildScalerError::ObjectMeta`] if the owner reference cannot be set. +// `clippy::too_many_arguments` suppressed: these parameters correspond 1:1 to the +// distinct Kubernetes metadata fields required on a StackableScaler. Grouping them +// into a struct would just push the field list one level deeper without reducing +// cognitive load, since callers already have each value as a separate variable. 
+#[allow(clippy::too_many_arguments)] +pub fn build_scaler( + cluster_name: &str, + app_name: &str, + namespace: &str, + role: &str, + role_group: &str, + initial_replicas: i32, + owner_ref: &OwnerReference, + managed_by: &str, +) -> Result { + let scaler_name = format!("{cluster_name}-{role}-{role_group}-scaler"); + + // Build the label set: name + instance + component + role-group + managed-by + let mut labels = Labels::common(app_name, cluster_name).context(LabelSnafu)?; + labels.insert(Label::component(role).context(LabelSnafu)?); + labels.insert(Label::role_group(role_group).context(LabelSnafu)?); + labels.insert(Label::try_from((K8S_APP_MANAGED_BY_KEY, managed_by)).context(LabelSnafu)?); + + let metadata = ObjectMetaBuilder::new() + .name(&scaler_name) + .namespace(namespace) + .ownerreference(owner_ref.clone()) + .with_labels(labels) + .build(); + + Ok(StackableScaler { + metadata, + spec: StackableScalerSpec { + replicas: initial_replicas, + }, + status: None, + }) +} + +#[cfg(test)] +mod tests { + use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; + + use super::*; + + fn test_owner_ref() -> OwnerReference { + OwnerReference { + api_version: "nifi.stackable.tech/v1alpha1".to_string(), + kind: "NifiCluster".to_string(), + name: "my-nifi".to_string(), + uid: "abc-123".to_string(), + controller: Some(true), + block_owner_deletion: Some(true), + } + } + + #[test] + fn build_scaler_sets_replicas() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "default", + "nodes", + "default", + 3, + &owner_ref, + "nifi-operator", + ) + .expect("build_scaler should succeed"); + + assert_eq!(scaler.spec.replicas, 3); + } + + #[test] + fn build_scaler_sets_owner_reference() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "default", + "nodes", + "default", + 1, + &owner_ref, + "nifi-operator", + ) + .expect("build_scaler should succeed"); + + let refs = scaler + .metadata + 
.owner_references + .as_ref() + .expect("owner_references should be set"); + assert_eq!(refs.len(), 1); + assert_eq!(refs[0].name, "my-nifi"); + assert_eq!(refs[0].kind, "NifiCluster"); + assert_eq!(refs[0].uid, "abc-123"); + } + + #[test] + fn build_scaler_sets_required_labels() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "default", + "nodes", + "default", + 1, + &owner_ref, + "nifi-operator", + ) + .expect("build_scaler should succeed"); + + let labels = scaler + .metadata + .labels + .as_ref() + .expect("labels should be set"); + + assert_eq!( + labels.get("app.kubernetes.io/name"), + Some(&"nifi".to_string()), + "app.kubernetes.io/name should be the app_name" + ); + assert_eq!( + labels.get("app.kubernetes.io/instance"), + Some(&"my-nifi".to_string()), + "app.kubernetes.io/instance should be the cluster_name" + ); + assert_eq!( + labels.get("app.kubernetes.io/managed-by"), + Some(&"nifi-operator".to_string()), + "app.kubernetes.io/managed-by should be managed_by" + ); + assert_eq!( + labels.get("app.kubernetes.io/component"), + Some(&"nodes".to_string()), + "app.kubernetes.io/component should be the role" + ); + assert_eq!( + labels.get("app.kubernetes.io/role-group"), + Some(&"default".to_string()), + "app.kubernetes.io/role-group should be the role_group" + ); + } + + #[test] + fn build_scaler_generates_correct_name() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "production", + "nodes", + "workers", + 5, + &owner_ref, + "nifi-operator", + ) + .expect("build_scaler should succeed"); + + assert_eq!( + scaler.metadata.name.as_deref(), + Some("my-nifi-nodes-workers-scaler") + ); + assert_eq!(scaler.metadata.namespace.as_deref(), Some("production")); + } + + #[test] + fn build_scaler_status_is_none() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "default", + "nodes", + "default", + 1, + &owner_ref, + "nifi-operator", + ) + 
.expect("build_scaler should succeed"); + + assert!( + scaler.status.is_none(), + "status should be None on a newly built scaler" + ); + } +} diff --git a/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs b/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs index 4daa0df7e..5feb67d3f 100644 --- a/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs +++ b/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs @@ -1,6 +1,6 @@ use k8s_openapi::DeepMerge; -use super::{ScalerState, ScalerStage, StackableScalerStatus, v1alpha1::StackableScaler}; +use super::{ScalerStage, ScalerState, StackableScalerStatus, v1alpha1::StackableScaler}; impl DeepMerge for StackableScaler { fn merge_from(&mut self, other: Self) { diff --git a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs index 5e3f9af60..e1b8ebe13 100644 --- a/crates/stackable-operator/src/crd/scaler/mod.rs +++ b/crates/stackable-operator/src/crd/scaler/mod.rs @@ -26,6 +26,7 @@ use serde::{Deserialize, Serialize}; use crate::versioned::versioned; +mod builder; mod cluster_resource_impl; pub mod hooks; pub mod job_tracker; @@ -226,6 +227,7 @@ pub fn resolve_replicas( } } +pub use builder::{BuildScalerError, build_scaler}; pub use hooks::{ HookOutcome, ScalingCondition, ScalingContext, ScalingDirection, ScalingHooks, ScalingResult, }; From 5a46dec88c7e7bb74b58727a54cc96a806d0051a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 19 Mar 2026 18:12:42 +0100 Subject: [PATCH 9/9] feat(scaler): add HPA builder and status initialization helpers Add three public helpers to the scaler module: - `scale_target_ref()`: builds a CrossVersionObjectReference pointing at a StackableScaler, for use as an HPA's scaleTargetRef. 
- `build_hpa_from_user_spec()`: constructs a HorizontalPodAutoscaler from a user-provided spec, overwriting scaleTargetRef to target the correct StackableScaler and applying the standard 5-label set. - `initialize_scaler_status()`: patches a freshly created scaler's status subresource with the current replica count and Idle stage, preventing the scale-to-zero edge case on first reconcile. Also makes BuildScalerError's snafu context selectors pub(super) so sibling modules can reuse them via .context(). Co-Authored-By: Claude Opus 4.6 --- .../src/crd/scaler/builder.rs | 3 +- .../src/crd/scaler/hpa_builder.rs | 338 ++++++++++++++++++ .../stackable-operator/src/crd/scaler/mod.rs | 4 + 3 files changed, 344 insertions(+), 1 deletion(-) create mode 100644 crates/stackable-operator/src/crd/scaler/hpa_builder.rs diff --git a/crates/stackable-operator/src/crd/scaler/builder.rs b/crates/stackable-operator/src/crd/scaler/builder.rs index 4b4e90d39..268646a0d 100644 --- a/crates/stackable-operator/src/crd/scaler/builder.rs +++ b/crates/stackable-operator/src/crd/scaler/builder.rs @@ -14,8 +14,9 @@ use crate::{ use super::v1alpha1::{StackableScaler, StackableScalerSpec}; -/// Error returned by [`build_scaler`]. +/// Error returned by [`build_scaler`] and [`build_hpa_from_user_spec`](super::build_hpa_from_user_spec). #[derive(Debug, Snafu)] +#[snafu(visibility(pub(super)))] pub enum BuildScalerError { /// A label value failed validation. #[snafu(display("failed to construct label for scaler"))] diff --git a/crates/stackable-operator/src/crd/scaler/hpa_builder.rs b/crates/stackable-operator/src/crd/scaler/hpa_builder.rs new file mode 100644 index 000000000..ee6004905 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/hpa_builder.rs @@ -0,0 +1,338 @@ +//! Builder helpers for constructing `HorizontalPodAutoscaler` objects and initializing +//! [`StackableScaler`] status. +//! +//! Product operators use [`build_hpa_from_user_spec`] to create an HPA whose +//! 
`scaleTargetRef` always points at the correct [`StackableScaler`], and
+//! [`initialize_scaler_status`] to seed the scaler's status subresource so that
+//! the first reconcile does not see `replicas: 0` and trigger an unintended scale-to-zero.
+
+use k8s_openapi::api::autoscaling::v2::{
+    CrossVersionObjectReference, HorizontalPodAutoscaler, HorizontalPodAutoscalerSpec,
+};
+use k8s_openapi::apimachinery::pkg::apis::meta::v1::{OwnerReference, Time};
+use k8s_openapi::jiff::Timestamp;
+use snafu::{ResultExt, Snafu};
+
+use crate::builder::meta::ObjectMetaBuilder;
+use crate::client::Client;
+use crate::kvp::{Label, Labels, consts::K8S_APP_MANAGED_BY_KEY};
+
+use super::builder::BuildScalerError;
+use super::v1alpha1::StackableScaler;
+use super::{ScalerStage, ScalerState, StackableScalerStatus};
+
+/// Errors returned by [`initialize_scaler_status`].
+#[derive(Debug, Snafu)]
+pub enum InitializeStatusError {
+    /// The Kubernetes status patch for the [`StackableScaler`] failed.
+    #[snafu(display("failed to patch initial StackableScaler status"))]
+    PatchStatus {
+        #[snafu(source(from(crate::client::Error, Box::new)))]
+        source: Box<crate::client::Error>,
+    },
+}
+
+/// Build a [`CrossVersionObjectReference`] that points at a [`StackableScaler`].
+///
+/// The returned reference is suitable for use as the `scaleTargetRef` of a
+/// `HorizontalPodAutoscaler`.
+///
+/// # Parameters
+///
+/// - `scaler_name`: The `metadata.name` of the target [`StackableScaler`].
+/// - `group`: The API group (e.g. `"autoscaling.stackable.tech"`).
+/// - `version`: The API version (e.g. `"v1alpha1"`).
+pub fn scale_target_ref( + scaler_name: &str, + group: &str, + version: &str, +) -> CrossVersionObjectReference { + CrossVersionObjectReference { + kind: "StackableScaler".to_string(), + name: scaler_name.to_string(), + api_version: Some(format!("{group}/{version}")), + } +} + +/// Build a [`HorizontalPodAutoscaler`] from a user-provided spec, overwriting +/// `scaleTargetRef` so it always points at the correct [`StackableScaler`]. +/// +/// The generated HPA name follows the convention +/// `{cluster_name}-{role}-{role_group}-hpa`. +/// +/// # Labels +/// +/// The same five `app.kubernetes.io` labels used by [`build_scaler`](super::build_scaler) +/// are applied: +/// +/// | Key | Value | +/// |-----|-------| +/// | `app.kubernetes.io/name` | `app_name` | +/// | `app.kubernetes.io/instance` | `cluster_name` | +/// | `app.kubernetes.io/managed-by` | `managed_by` | +/// | `app.kubernetes.io/component` | `role` | +/// | `app.kubernetes.io/role-group` | `role_group` | +/// +/// # Errors +/// +/// Returns [`BuildScalerError::Label`] if any label value is invalid. +/// Returns [`BuildScalerError::ObjectMeta`] if the owner reference cannot be set. +// `clippy::too_many_arguments` suppressed: these parameters correspond 1:1 to the +// distinct Kubernetes metadata fields required on an HPA. Grouping them into a struct +// would just push the field list one level deeper without reducing cognitive load, +// since callers already have each value as a separate variable. 
+#[allow(clippy::too_many_arguments)]
+pub fn build_hpa_from_user_spec(
+    user_spec: &HorizontalPodAutoscalerSpec,
+    target_ref: &CrossVersionObjectReference,
+    cluster_name: &str,
+    app_name: &str,
+    namespace: &str,
+    role: &str,
+    role_group: &str,
+    owner_ref: &OwnerReference,
+    managed_by: &str,
+) -> Result<HorizontalPodAutoscaler, BuildScalerError> {
+    let hpa_name = format!("{cluster_name}-{role}-{role_group}-hpa");
+
+    let mut labels = Labels::common(app_name, cluster_name).context(super::builder::LabelSnafu)?;
+    labels.insert(Label::component(role).context(super::builder::LabelSnafu)?);
+    labels.insert(Label::role_group(role_group).context(super::builder::LabelSnafu)?);
+    labels.insert(
+        Label::try_from((K8S_APP_MANAGED_BY_KEY, managed_by))
+            .context(super::builder::LabelSnafu)?,
+    );
+
+    let metadata = ObjectMetaBuilder::new()
+        .name(&hpa_name)
+        .namespace(namespace)
+        .ownerreference(owner_ref.clone())
+        .with_labels(labels)
+        .build();
+
+    let mut spec = user_spec.clone();
+    spec.scale_target_ref = target_ref.clone();
+
+    Ok(HorizontalPodAutoscaler {
+        metadata,
+        spec: Some(spec),
+        status: None,
+    })
+}
+
+/// Patch a freshly created [`StackableScaler`]'s status to prevent scale-to-zero.
+///
+/// When a scaler is first created it has no status. Without this initialization,
+/// reading `status.replicas` would yield `0`, causing the StatefulSet to scale down
+/// to zero pods. This function seeds the status with the current replica count and
+/// an `Idle` stage so that the first reconcile sees the correct baseline.
+///
+/// # Parameters
+///
+/// - `client`: Kubernetes client for the status patch operation.
+/// - `scaler`: The freshly created [`StackableScaler`] resource.
+/// - `current_replicas`: The current replica count of the managed StatefulSet.
+/// - `selector`: Pod label selector string for HPA pod counting (e.g.
+///   `"app=myproduct,roleGroup=default"`).
+///
+/// # Errors
+///
+/// Returns [`InitializeStatusError::PatchStatus`] if the Kubernetes status patch fails.
+pub async fn initialize_scaler_status( + client: &Client, + scaler: &StackableScaler, + current_replicas: i32, + selector: &str, +) -> Result<(), InitializeStatusError> { + let status = StackableScalerStatus { + replicas: current_replicas, + selector: Some(selector.to_string()), + desired_replicas: Some(current_replicas), + previous_replicas: None, + current_state: Some(ScalerState { + stage: ScalerStage::Idle, + last_transition_time: Time(Timestamp::now()), + }), + }; + + client + .apply_patch_status("stackable-operator", scaler, &status) + .await + .map(|_| ()) + .context(PatchStatusSnafu) +} + +#[cfg(test)] +mod tests { + use k8s_openapi::api::autoscaling::v2::{ + CrossVersionObjectReference, HorizontalPodAutoscalerSpec, + }; + use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; + + use super::*; + + fn test_owner_ref() -> OwnerReference { + OwnerReference { + api_version: "nifi.stackable.tech/v1alpha1".to_string(), + kind: "NifiCluster".to_string(), + name: "my-nifi".to_string(), + uid: "abc-123".to_string(), + controller: Some(true), + block_owner_deletion: Some(true), + } + } + + fn test_target_ref() -> CrossVersionObjectReference { + scale_target_ref( + "my-nifi-nodes-default-scaler", + "autoscaling.stackable.tech", + "v1alpha1", + ) + } + + #[test] + fn scale_target_ref_points_to_scaler() { + let target = scale_target_ref( + "my-nifi-nodes-default-scaler", + "autoscaling.stackable.tech", + "v1alpha1", + ); + + assert_eq!(target.kind, "StackableScaler"); + assert_eq!(target.name, "my-nifi-nodes-default-scaler"); + assert_eq!( + target.api_version.as_deref(), + Some("autoscaling.stackable.tech/v1alpha1") + ); + } + + #[test] + fn build_hpa_overwrites_scale_target_ref() { + let wrong_ref = CrossVersionObjectReference { + kind: "Deployment".to_string(), + name: "wrong-target".to_string(), + api_version: Some("apps/v1".to_string()), + }; + let user_spec = HorizontalPodAutoscalerSpec { + max_replicas: 10, + scale_target_ref: wrong_ref, + 
..Default::default() + }; + let correct_ref = test_target_ref(); + + let hpa = build_hpa_from_user_spec( + &user_spec, + &correct_ref, + "my-nifi", + "nifi", + "default", + "nodes", + "default", + &test_owner_ref(), + "nifi-operator", + ) + .expect("build_hpa_from_user_spec should succeed"); + + let spec = hpa.spec.expect("spec should be set"); + assert_eq!(spec.scale_target_ref.kind, "StackableScaler"); + assert_eq!(spec.scale_target_ref.name, "my-nifi-nodes-default-scaler"); + assert_eq!( + spec.scale_target_ref.api_version.as_deref(), + Some("autoscaling.stackable.tech/v1alpha1") + ); + // Original max_replicas should be preserved. + assert_eq!(spec.max_replicas, 10); + } + + #[test] + fn build_hpa_sets_required_labels() { + let user_spec = HorizontalPodAutoscalerSpec { + max_replicas: 5, + scale_target_ref: CrossVersionObjectReference { + kind: "Deployment".to_string(), + name: "placeholder".to_string(), + api_version: None, + }, + ..Default::default() + }; + let target_ref = test_target_ref(); + + let hpa = build_hpa_from_user_spec( + &user_spec, + &target_ref, + "my-nifi", + "nifi", + "production", + "nodes", + "workers", + &test_owner_ref(), + "nifi-operator", + ) + .expect("build_hpa_from_user_spec should succeed"); + + let labels = hpa.metadata.labels.as_ref().expect("labels should be set"); + + assert_eq!( + labels.get("app.kubernetes.io/name"), + Some(&"nifi".to_string()), + "app.kubernetes.io/name should be the app_name" + ); + assert_eq!( + labels.get("app.kubernetes.io/instance"), + Some(&"my-nifi".to_string()), + "app.kubernetes.io/instance should be the cluster_name" + ); + assert_eq!( + labels.get("app.kubernetes.io/managed-by"), + Some(&"nifi-operator".to_string()), + "app.kubernetes.io/managed-by should be managed_by" + ); + assert_eq!( + labels.get("app.kubernetes.io/component"), + Some(&"nodes".to_string()), + "app.kubernetes.io/component should be the role" + ); + assert_eq!( + labels.get("app.kubernetes.io/role-group"), + 
Some(&"workers".to_string()), + "app.kubernetes.io/role-group should be the role_group" + ); + } + + #[test] + fn build_hpa_generates_correct_name() { + let user_spec = HorizontalPodAutoscalerSpec { + max_replicas: 5, + scale_target_ref: CrossVersionObjectReference { + kind: "Deployment".to_string(), + name: "placeholder".to_string(), + api_version: None, + }, + ..Default::default() + }; + let target_ref = test_target_ref(); + + let hpa = build_hpa_from_user_spec( + &user_spec, + &target_ref, + "my-nifi", + "nifi", + "production", + "nodes", + "workers", + &test_owner_ref(), + "nifi-operator", + ) + .expect("build_hpa_from_user_spec should succeed"); + + assert_eq!( + hpa.metadata.name.as_deref(), + Some("my-nifi-nodes-workers-hpa") + ); + assert_eq!(hpa.metadata.namespace.as_deref(), Some("production")); + } + + // `initialize_scaler_status` requires a running Kubernetes cluster and is not + // unit-testable. It is tested indirectly via the scaler reconciler integration + // tests and the NiFi operator end-to-end tests. +} diff --git a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs index e1b8ebe13..578b97483 100644 --- a/crates/stackable-operator/src/crd/scaler/mod.rs +++ b/crates/stackable-operator/src/crd/scaler/mod.rs @@ -29,6 +29,7 @@ use crate::versioned::versioned; mod builder; mod cluster_resource_impl; pub mod hooks; +mod hpa_builder; pub mod job_tracker; pub mod reconciler; mod replicas_config; @@ -231,6 +232,9 @@ pub use builder::{BuildScalerError, build_scaler}; pub use hooks::{ HookOutcome, ScalingCondition, ScalingContext, ScalingDirection, ScalingHooks, ScalingResult, }; +pub use hpa_builder::{ + InitializeStatusError, build_hpa_from_user_spec, initialize_scaler_status, scale_target_ref, +}; pub use job_tracker::{JobTracker, JobTrackerError, job_name}; pub use reconciler::{Error as ReconcilerError, reconcile_scaler}; pub use replicas_config::{