diff --git a/Cargo.lock b/Cargo.lock index 67f360b32..c2d13ffc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -746,7 +746,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -1426,7 +1426,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -1532,7 +1532,7 @@ dependencies = [ [[package]] name = "kube" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "k8s-openapi", "kube-client", @@ -1544,7 +1544,7 @@ dependencies = [ [[package]] name = "kube-client" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "base64", "bytes", @@ -1578,7 +1578,7 @@ dependencies = [ [[package]] name = "kube-core" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "derive_more", "form_urlencoded", @@ -1596,7 +1596,7 @@ dependencies = [ [[package]] name = "kube-derive" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = 
"git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "darling", "proc-macro2", @@ -1609,7 +1609,7 @@ dependencies = [ [[package]] name = "kube-runtime" version = "3.0.1" -source = "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5" +source = "git+https://github.com/kube-rs/kube-rs?rev=1320643f8ce7f8189e03496ff1329d678d76224c#1320643f8ce7f8189e03496ff1329d678d76224c" dependencies = [ "ahash", "async-broadcast", @@ -1740,7 +1740,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2475,7 +2475,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3188,7 +3188,7 @@ dependencies = [ "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3888,7 +3888,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9ca62620b..f70930b8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ k8s-openapi = { version = "0.27.0", default-features = false, features = ["schem # We use rustls instead of openssl for easier portability, e.g. 
so that we can build stackablectl without the need to vendor (build from source) openssl # We use ring instead of aws-lc-rs, as this currently fails to build in "make run-dev" # We need a few schema fixes in kube, that went into main, but are not released yet -kube = { git = "https://github.com/kube-rs/kube-rs", rev = "fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5", version = "=3.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "admission", "rustls-tls", "ring"] } +kube = { git = "https://github.com/kube-rs/kube-rs", rev = "1320643f8ce7f8189e03496ff1329d678d76224c", version = "=3.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "admission", "rustls-tls", "ring"] } opentelemetry = "0.31.0" opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } opentelemetry-appender-tracing = "0.31.0" diff --git a/crates/stackable-operator/src/cluster_resources.rs b/crates/stackable-operator/src/cluster_resources.rs index b8ece1137..82a0b7d7c 100644 --- a/crates/stackable-operator/src/cluster_resources.rs +++ b/crates/stackable-operator/src/cluster_resources.rs @@ -13,6 +13,7 @@ use k8s_openapi::{ apps::v1::{ DaemonSet, DaemonSetSpec, Deployment, DeploymentSpec, StatefulSet, StatefulSetSpec, }, + autoscaling::v2::HorizontalPodAutoscaler, batch::v1::Job, core::v1::{ ConfigMap, ObjectReference, PodSpec, PodTemplateSpec, Secret, Service, ServiceAccount, @@ -221,7 +222,9 @@ impl ClusterResource for Service {} impl ClusterResource for ServiceAccount {} impl ClusterResource for RoleBinding {} impl ClusterResource for PodDisruptionBudget {} +impl ClusterResource for HorizontalPodAutoscaler {} impl ClusterResource for listener::v1alpha1::Listener {} +impl ClusterResource for crate::crd::scaler::v1alpha1::StackableScaler {} impl ClusterResource for Job { fn pod_spec(&self) -> Option<&PodSpec> { @@ -681,6 +684,8 @@ impl<'a> ClusterResources<'a> { self.delete_orphaned_resources_of_kind::(client), 
self.delete_orphaned_resources_of_kind::(client), self.delete_orphaned_resources_of_kind::(client), + self.delete_orphaned_resources_of_kind::(client), + self.delete_orphaned_resources_of_kind::(client), )?; Ok(()) diff --git a/crates/stackable-operator/src/crd/mod.rs b/crates/stackable-operator/src/crd/mod.rs index 3beb69aa8..a07d6b632 100644 --- a/crates/stackable-operator/src/crd/mod.rs +++ b/crates/stackable-operator/src/crd/mod.rs @@ -8,6 +8,7 @@ pub mod authentication; pub mod git_sync; pub mod listener; pub mod s3; +pub mod scaler; /// A reference to a product cluster (for example, a `ZookeeperCluster`) /// diff --git a/crates/stackable-operator/src/crd/scaler/builder.rs b/crates/stackable-operator/src/crd/scaler/builder.rs new file mode 100644 index 000000000..268646a0d --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/builder.rs @@ -0,0 +1,241 @@ +//! Builder helper for constructing [`StackableScaler`] objects with proper metadata. +//! +//! Product operators use [`build_scaler`] to create a `StackableScaler` for each +//! auto-scaled role group, ensuring that all required labels are set so that +//! [`ClusterResources::add`](crate::cluster_resources::ClusterResources) validation passes. + +use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; +use snafu::{ResultExt, Snafu}; + +use crate::{ + builder::meta::ObjectMetaBuilder, + kvp::{Label, LabelError, Labels, consts::K8S_APP_MANAGED_BY_KEY}, +}; + +use super::v1alpha1::{StackableScaler, StackableScalerSpec}; + +/// Error returned by [`build_scaler`] and [`build_hpa_from_user_spec`](super::build_hpa_from_user_spec). +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(super)))] +pub enum BuildScalerError { + /// A label value failed validation. + #[snafu(display("failed to construct label for scaler"))] + Label { source: LabelError }, + + /// The metadata builder failed (e.g. missing owner reference fields). 
+ #[snafu(display("failed to build ObjectMeta for scaler"))] + ObjectMeta { source: crate::builder::meta::Error }, +} + +/// Constructs a [`StackableScaler`] with the required labels and owner reference. +/// +/// The generated scaler name follows the convention +/// `{cluster_name}-{role}-{role_group}-scaler`. +/// +/// # Labels +/// +/// The following `app.kubernetes.io` labels are set: +/// +/// | Key | Value | +/// |-----|-------| +/// | `app.kubernetes.io/name` | `app_name` | +/// | `app.kubernetes.io/instance` | `cluster_name` | +/// | `app.kubernetes.io/managed-by` | `managed_by` | +/// | `app.kubernetes.io/component` | `role` | +/// | `app.kubernetes.io/role-group` | `role_group` | +/// +/// # Errors +/// +/// Returns [`BuildScalerError::Label`] if any label value is invalid. +/// Returns [`BuildScalerError::ObjectMeta`] if the owner reference cannot be set. +// `clippy::too_many_arguments` suppressed: these parameters correspond 1:1 to the +// distinct Kubernetes metadata fields required on a StackableScaler. Grouping them +// into a struct would just push the field list one level deeper without reducing +// cognitive load, since callers already have each value as a separate variable. 
+#[allow(clippy::too_many_arguments)] +pub fn build_scaler( + cluster_name: &str, + app_name: &str, + namespace: &str, + role: &str, + role_group: &str, + initial_replicas: i32, + owner_ref: &OwnerReference, + managed_by: &str, +) -> Result { + let scaler_name = format!("{cluster_name}-{role}-{role_group}-scaler"); + + // Build the label set: name + instance + component + role-group + managed-by + let mut labels = Labels::common(app_name, cluster_name).context(LabelSnafu)?; + labels.insert(Label::component(role).context(LabelSnafu)?); + labels.insert(Label::role_group(role_group).context(LabelSnafu)?); + labels.insert(Label::try_from((K8S_APP_MANAGED_BY_KEY, managed_by)).context(LabelSnafu)?); + + let metadata = ObjectMetaBuilder::new() + .name(&scaler_name) + .namespace(namespace) + .ownerreference(owner_ref.clone()) + .with_labels(labels) + .build(); + + Ok(StackableScaler { + metadata, + spec: StackableScalerSpec { + replicas: initial_replicas, + }, + status: None, + }) +} + +#[cfg(test)] +mod tests { + use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; + + use super::*; + + fn test_owner_ref() -> OwnerReference { + OwnerReference { + api_version: "nifi.stackable.tech/v1alpha1".to_string(), + kind: "NifiCluster".to_string(), + name: "my-nifi".to_string(), + uid: "abc-123".to_string(), + controller: Some(true), + block_owner_deletion: Some(true), + } + } + + #[test] + fn build_scaler_sets_replicas() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "default", + "nodes", + "default", + 3, + &owner_ref, + "nifi-operator", + ) + .expect("build_scaler should succeed"); + + assert_eq!(scaler.spec.replicas, 3); + } + + #[test] + fn build_scaler_sets_owner_reference() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "default", + "nodes", + "default", + 1, + &owner_ref, + "nifi-operator", + ) + .expect("build_scaler should succeed"); + + let refs = scaler + .metadata + 
.owner_references + .as_ref() + .expect("owner_references should be set"); + assert_eq!(refs.len(), 1); + assert_eq!(refs[0].name, "my-nifi"); + assert_eq!(refs[0].kind, "NifiCluster"); + assert_eq!(refs[0].uid, "abc-123"); + } + + #[test] + fn build_scaler_sets_required_labels() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "default", + "nodes", + "default", + 1, + &owner_ref, + "nifi-operator", + ) + .expect("build_scaler should succeed"); + + let labels = scaler + .metadata + .labels + .as_ref() + .expect("labels should be set"); + + assert_eq!( + labels.get("app.kubernetes.io/name"), + Some(&"nifi".to_string()), + "app.kubernetes.io/name should be the app_name" + ); + assert_eq!( + labels.get("app.kubernetes.io/instance"), + Some(&"my-nifi".to_string()), + "app.kubernetes.io/instance should be the cluster_name" + ); + assert_eq!( + labels.get("app.kubernetes.io/managed-by"), + Some(&"nifi-operator".to_string()), + "app.kubernetes.io/managed-by should be managed_by" + ); + assert_eq!( + labels.get("app.kubernetes.io/component"), + Some(&"nodes".to_string()), + "app.kubernetes.io/component should be the role" + ); + assert_eq!( + labels.get("app.kubernetes.io/role-group"), + Some(&"default".to_string()), + "app.kubernetes.io/role-group should be the role_group" + ); + } + + #[test] + fn build_scaler_generates_correct_name() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "production", + "nodes", + "workers", + 5, + &owner_ref, + "nifi-operator", + ) + .expect("build_scaler should succeed"); + + assert_eq!( + scaler.metadata.name.as_deref(), + Some("my-nifi-nodes-workers-scaler") + ); + assert_eq!(scaler.metadata.namespace.as_deref(), Some("production")); + } + + #[test] + fn build_scaler_status_is_none() { + let owner_ref = test_owner_ref(); + let scaler = build_scaler( + "my-nifi", + "nifi", + "default", + "nodes", + "default", + 1, + &owner_ref, + "nifi-operator", + ) + 
.expect("build_scaler should succeed"); + + assert!( + scaler.status.is_none(), + "status should be None on a newly built scaler" + ); + } +} diff --git a/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs b/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs new file mode 100644 index 000000000..5feb67d3f --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/cluster_resource_impl.rs @@ -0,0 +1,35 @@ +use k8s_openapi::DeepMerge; + +use super::{ScalerStage, ScalerState, StackableScalerStatus, v1alpha1::StackableScaler}; + +impl DeepMerge for StackableScaler { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.metadata, other.metadata); + DeepMerge::merge_from(&mut self.spec.replicas, other.spec.replicas); + DeepMerge::merge_from(&mut self.status, other.status); + } +} + +impl DeepMerge for StackableScalerStatus { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.replicas, other.replicas); + DeepMerge::merge_from(&mut self.selector, other.selector); + DeepMerge::merge_from(&mut self.desired_replicas, other.desired_replicas); + DeepMerge::merge_from(&mut self.previous_replicas, other.previous_replicas); + DeepMerge::merge_from(&mut self.current_state, other.current_state); + } +} + +impl DeepMerge for ScalerState { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.stage, other.stage); + // `Time` does not implement `DeepMerge`, so we replace directly. + self.last_transition_time = other.last_transition_time; + } +} + +impl DeepMerge for ScalerStage { + fn merge_from(&mut self, other: Self) { + *self = other; + } +} diff --git a/crates/stackable-operator/src/crd/scaler/hooks.rs b/crates/stackable-operator/src/crd/scaler/hooks.rs new file mode 100644 index 000000000..b7881cfb0 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/hooks.rs @@ -0,0 +1,273 @@ +//! Scaling lifecycle hooks for [`StackableScaler`](super::StackableScaler). +//! +//! 
Operators implement [`ScalingHooks`] to run custom logic at each stage of a scaling +//! operation. Hook methods are called by [`reconcile_scaler`](super::reconcile_scaler) +//! during the appropriate state machine stage. + +use std::future::Future; + +use crate::client::Client; + +use super::FailedStage; + +/// Whether the replica change is an increase or decrease. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ScalingDirection { + /// Replica count is increasing (or unchanged). + Up, + /// Replica count is decreasing. + Down, +} + +impl ScalingDirection { + /// Derive the direction from current and desired replica counts. + /// Equal counts are treated as Up (no-op -- hooks still call Done immediately). + /// + /// # Parameters + /// + /// - `current`: The replica count in `status.replicas`. + /// - `desired`: The target replica count from `spec.replicas`. + pub fn from_replicas(current: i32, desired: i32) -> Self { + if desired >= current { + Self::Up + } else { + Self::Down + } + } +} + +/// Context passed to hook implementations on each reconcile invocation. +#[derive(Clone, Copy)] +pub struct ScalingContext<'a> { + /// Kubernetes client for API calls. + pub client: &'a Client, + /// Namespace of the StackableScaler (and its cluster). + pub namespace: &'a str, + /// Name of the role group being scaled (e.g. `"default"`). + pub role_group_name: &'a str, + /// The replica count before the current scaling operation started. + /// During `PreScaling` this equals `status.replicas`. During `Scaling` + /// and `PostScaling` it reflects the frozen `status.previous_replicas`, + /// so direction and ordinal calculations remain correct even after + /// `status.replicas` has been updated to the target value. + pub current_replicas: i32, + /// The replica count the operator is working towards. + pub desired_replicas: i32, + /// Whether this is a scale-up or scale-down — derived by operator-rs, + /// so the operator does not need to compare replica counts itself. 
+ pub direction: ScalingDirection, +} + +impl ScalingContext<'_> { + /// Returns the StatefulSet pod ordinals that are being removed in a scale-down. + /// + /// For scale-up or no-op, returns an empty range. + /// For scale-down, returns `desired_replicas..current_replicas` — the ordinals + /// of pods that will be terminated once the StatefulSet is scaled. + pub fn removed_ordinals(&self) -> std::ops::Range { + if self.direction == ScalingDirection::Down { + self.desired_replicas..self.current_replicas + } else { + 0..0 + } + } + + /// Returns the StatefulSet pod ordinals that are being added in a scale-up. + /// + /// For scale-down or no-op, returns an empty range. + /// For scale-up, returns `current_replicas..desired_replicas`. + pub fn added_ordinals(&self) -> std::ops::Range { + if self.direction == ScalingDirection::Up && self.desired_replicas > self.current_replicas { + self.current_replicas..self.desired_replicas + } else { + 0..0 + } + } + + /// Whether this is a scale-down operation. + pub fn is_scale_down(&self) -> bool { + self.direction == ScalingDirection::Down + } + + /// Whether this is a scale-up operation (not a no-op where current == desired). + pub fn is_scale_up(&self) -> bool { + self.direction == ScalingDirection::Up && self.desired_replicas > self.current_replicas + } +} + +/// Return value from a hook invocation. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum HookOutcome { + /// Hook completed successfully — advance state machine to next stage. + Done, + /// Hook is still running — operator-rs will requeue and re-call on next reconcile. + InProgress, +} + +/// Condition information returned from `reconcile_scaler` for propagation to the cluster CR. +/// +/// The operator MUST apply this condition to its cluster resource status on every call +/// to `reconcile_scaler`. Using the return type enforces this at the call site. +#[derive(Debug)] +pub enum ScalingCondition { + /// No scaling in progress, or just completed successfully. 
+ Healthy, + /// Scaling is actively in progress. + Progressing { + /// Human-readable name of the current [`ScalerStage`](super::ScalerStage). + stage: String, + }, + /// Scaling failed -- include details in the cluster CR condition message. + Failed { + /// Which stage failed. + stage: FailedStage, + /// The error message from the hook. + reason: String, + }, +} + +/// Result returned from `reconcile_scaler`. The operator MUST propagate `scaling_condition` +/// to the cluster CR status conditions on every reconcile call. +pub struct ScalingResult { + /// The `Action` to return from the operator's reconcile function for this role group. + pub action: kube::runtime::controller::Action, + /// Condition to merge into the cluster CR's `status.conditions`. + pub scaling_condition: ScalingCondition, +} + +/// Trait implemented by each product operator to provide scaling lifecycle hooks. +/// +/// All methods have default implementations that return `Done` immediately, so operators +/// only need to override the specific hooks they use. +/// +/// # Example +/// +/// ```rust,ignore +/// impl ScalingHooks for MyProductScalingHooks { +/// type Error = MyError; +/// +/// async fn pre_scale(&self, ctx: &ScalingContext<'_>) -> Result { +/// match ctx.direction { +/// ScalingDirection::Down => { +/// JobTracker::start_or_check(ctx.client, self.build_offload_job(ctx), ctx.namespace).await +/// } +/// ScalingDirection::Up => Ok(HookOutcome::Done), +/// } +/// } +/// } +/// ``` +pub trait ScalingHooks { + /// Error type returned by hook methods. + type Error: std::error::Error + Send + Sync + 'static; + + /// Called during the `PreScaling` stage on each reconcile. + /// Return `Done` to advance to `Scaling`, `InProgress` to requeue and re-check. + /// Return `Err` to transition to `Failed`. 
+ fn pre_scale( + &self, + ctx: &ScalingContext<'_>, + ) -> impl Future> + Send { + let _ = ctx; + async { Ok(HookOutcome::Done) } + } + + /// Called during the `PostScaling` stage on each reconcile. + /// Return `Done` to return to `Idle`, `InProgress` to requeue and re-check. + /// Return `Err` to transition to `Failed`. + fn post_scale( + &self, + ctx: &ScalingContext<'_>, + ) -> impl Future> + Send { + let _ = ctx; + async { Ok(HookOutcome::Done) } + } + + /// Called when the state machine enters `Failed`. Best-effort cleanup. + /// + /// Errors returned here are logged at `warn` level but do not prevent the + /// `Failed` state from being written. + fn on_failure( + &self, + ctx: &ScalingContext<'_>, + failed_stage: &FailedStage, + ) -> impl Future> + Send { + let _ = (ctx, failed_stage); + async { Ok(()) } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn direction_scale_up() { + assert_eq!(ScalingDirection::from_replicas(3, 5), ScalingDirection::Up); + } + + #[test] + fn direction_scale_down() { + assert_eq!( + ScalingDirection::from_replicas(5, 3), + ScalingDirection::Down + ); + } + + #[test] + fn direction_equal_is_up() { + assert_eq!(ScalingDirection::from_replicas(3, 3), ScalingDirection::Up); + } + + // Helper methods are tested indirectly via ScalingDirection since they + // only depend on `direction`, `current_replicas`, and `desired_replicas`. + // ScalingContext requires a &Client reference, so we test the logic + // through the direction + replica helpers independently. 
+ + #[test] + fn removed_ordinals_scale_down() { + let dir = ScalingDirection::Down; + let (current, desired) = (5, 3); + let range = if dir == ScalingDirection::Down { + desired..current + } else { + 0..0 + }; + assert_eq!(range.collect::>(), vec![3, 4]); + } + + #[test] + fn removed_ordinals_scale_up_is_empty() { + let dir = ScalingDirection::from_replicas(3, 5); + let (current, desired) = (3, 5); + let range = if dir == ScalingDirection::Down { + desired..current + } else { + 0..0 + }; + assert!(range.collect::>().is_empty()); + } + + #[test] + fn added_ordinals_scale_up() { + let dir = ScalingDirection::from_replicas(3, 5); + let (current, desired) = (3, 5); + let range = if dir == ScalingDirection::Up && desired > current { + current..desired + } else { + 0..0 + }; + assert_eq!(range.collect::>(), vec![3, 4]); + } + + #[test] + fn added_ordinals_equal_is_empty() { + let dir = ScalingDirection::from_replicas(3, 3); + let (current, desired) = (3, 3); + let range = if dir == ScalingDirection::Up && desired > current { + current..desired + } else { + 0..0 + }; + assert!(range.collect::>().is_empty()); + } +} diff --git a/crates/stackable-operator/src/crd/scaler/hpa_builder.rs b/crates/stackable-operator/src/crd/scaler/hpa_builder.rs new file mode 100644 index 000000000..ee6004905 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/hpa_builder.rs @@ -0,0 +1,338 @@ +//! Builder helpers for constructing `HorizontalPodAutoscaler` objects and initializing +//! [`StackableScaler`] status. +//! +//! Product operators use [`build_hpa_from_user_spec`] to create an HPA whose +//! `scaleTargetRef` always points at the correct [`StackableScaler`], and +//! [`initialize_scaler_status`] to seed the scaler's status subresource so that +//! the first reconcile does not see `replicas: 0` and trigger an unintended scale-to-zero. 
+ +use k8s_openapi::api::autoscaling::v2::{ + CrossVersionObjectReference, HorizontalPodAutoscaler, HorizontalPodAutoscalerSpec, +}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::{OwnerReference, Time}; +use k8s_openapi::jiff::Timestamp; +use snafu::{ResultExt, Snafu}; + +use crate::builder::meta::ObjectMetaBuilder; +use crate::client::Client; +use crate::kvp::{Label, Labels, consts::K8S_APP_MANAGED_BY_KEY}; + +use super::builder::BuildScalerError; +use super::v1alpha1::StackableScaler; +use super::{ScalerStage, ScalerState, StackableScalerStatus}; + +/// Errors returned by [`initialize_scaler_status`]. +#[derive(Debug, Snafu)] +pub enum InitializeStatusError { + /// The Kubernetes status patch for the [`StackableScaler`] failed. + #[snafu(display("failed to patch initial StackableScaler status"))] + PatchStatus { + #[snafu(source(from(crate::client::Error, Box::new)))] + source: Box, + }, +} + +/// Build a [`CrossVersionObjectReference`] that points at a [`StackableScaler`]. +/// +/// The returned reference is suitable for use as the `scaleTargetRef` of a +/// `HorizontalPodAutoscaler`. +/// +/// # Parameters +/// +/// - `scaler_name`: The `metadata.name` of the target [`StackableScaler`]. +/// - `group`: The API group (e.g. `"autoscaling.stackable.tech"`). +/// - `version`: The API version (e.g. `"v1alpha1"`). +pub fn scale_target_ref( + scaler_name: &str, + group: &str, + version: &str, +) -> CrossVersionObjectReference { + CrossVersionObjectReference { + kind: "StackableScaler".to_string(), + name: scaler_name.to_string(), + api_version: Some(format!("{group}/{version}")), + } +} + +/// Build a [`HorizontalPodAutoscaler`] from a user-provided spec, overwriting +/// `scaleTargetRef` so it always points at the correct [`StackableScaler`]. +/// +/// The generated HPA name follows the convention +/// `{cluster_name}-{role}-{role_group}-hpa`. 
+/// +/// # Labels +/// +/// The same five `app.kubernetes.io` labels used by [`build_scaler`](super::build_scaler) +/// are applied: +/// +/// | Key | Value | +/// |-----|-------| +/// | `app.kubernetes.io/name` | `app_name` | +/// | `app.kubernetes.io/instance` | `cluster_name` | +/// | `app.kubernetes.io/managed-by` | `managed_by` | +/// | `app.kubernetes.io/component` | `role` | +/// | `app.kubernetes.io/role-group` | `role_group` | +/// +/// # Errors +/// +/// Returns [`BuildScalerError::Label`] if any label value is invalid. +/// Returns [`BuildScalerError::ObjectMeta`] if the owner reference cannot be set. +// `clippy::too_many_arguments` suppressed: these parameters correspond 1:1 to the +// distinct Kubernetes metadata fields required on an HPA. Grouping them into a struct +// would just push the field list one level deeper without reducing cognitive load, +// since callers already have each value as a separate variable. +#[allow(clippy::too_many_arguments)] +pub fn build_hpa_from_user_spec( + user_spec: &HorizontalPodAutoscalerSpec, + target_ref: &CrossVersionObjectReference, + cluster_name: &str, + app_name: &str, + namespace: &str, + role: &str, + role_group: &str, + owner_ref: &OwnerReference, + managed_by: &str, +) -> Result { + let hpa_name = format!("{cluster_name}-{role}-{role_group}-hpa"); + + let mut labels = Labels::common(app_name, cluster_name).context(super::builder::LabelSnafu)?; + labels.insert(Label::component(role).context(super::builder::LabelSnafu)?); + labels.insert(Label::role_group(role_group).context(super::builder::LabelSnafu)?); + labels.insert( + Label::try_from((K8S_APP_MANAGED_BY_KEY, managed_by)) + .context(super::builder::LabelSnafu)?, + ); + + let metadata = ObjectMetaBuilder::new() + .name(&hpa_name) + .namespace(namespace) + .ownerreference(owner_ref.clone()) + .with_labels(labels) + .build(); + + let mut spec = user_spec.clone(); + spec.scale_target_ref = target_ref.clone(); + + Ok(HorizontalPodAutoscaler { + metadata, + 
spec: Some(spec), + status: None, + }) +} + +/// Patch a freshly created [`StackableScaler`]'s status to prevent scale-to-zero. +/// +/// When a scaler is first created it has no status. Without this initialization, +/// reading `status.replicas` would yield `0`, causing the StatefulSet to scale down +/// to zero pods. This function seeds the status with the current replica count and +/// an `Idle` stage so that the first reconcile sees the correct baseline. +/// +/// # Parameters +/// +/// - `client`: Kubernetes client for the status patch operation. +/// - `scaler`: The freshly created [`StackableScaler`] resource. +/// - `current_replicas`: The current replica count of the managed StatefulSet. +/// - `selector`: Pod label selector string for HPA pod counting (e.g. +/// `"app=myproduct,roleGroup=default"`). +/// +/// # Errors +/// +/// Returns [`InitializeStatusError::PatchStatus`] if the Kubernetes status patch fails. +pub async fn initialize_scaler_status( + client: &Client, + scaler: &StackableScaler, + current_replicas: i32, + selector: &str, +) -> Result<(), InitializeStatusError> { + let status = StackableScalerStatus { + replicas: current_replicas, + selector: Some(selector.to_string()), + desired_replicas: Some(current_replicas), + previous_replicas: None, + current_state: Some(ScalerState { + stage: ScalerStage::Idle, + last_transition_time: Time(Timestamp::now()), + }), + }; + + client + .apply_patch_status("stackable-operator", scaler, &status) + .await + .map(|_| ()) + .context(PatchStatusSnafu) +} + +#[cfg(test)] +mod tests { + use k8s_openapi::api::autoscaling::v2::{ + CrossVersionObjectReference, HorizontalPodAutoscalerSpec, + }; + use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; + + use super::*; + + fn test_owner_ref() -> OwnerReference { + OwnerReference { + api_version: "nifi.stackable.tech/v1alpha1".to_string(), + kind: "NifiCluster".to_string(), + name: "my-nifi".to_string(), + uid: "abc-123".to_string(), + controller: 
Some(true), + block_owner_deletion: Some(true), + } + } + + fn test_target_ref() -> CrossVersionObjectReference { + scale_target_ref( + "my-nifi-nodes-default-scaler", + "autoscaling.stackable.tech", + "v1alpha1", + ) + } + + #[test] + fn scale_target_ref_points_to_scaler() { + let target = scale_target_ref( + "my-nifi-nodes-default-scaler", + "autoscaling.stackable.tech", + "v1alpha1", + ); + + assert_eq!(target.kind, "StackableScaler"); + assert_eq!(target.name, "my-nifi-nodes-default-scaler"); + assert_eq!( + target.api_version.as_deref(), + Some("autoscaling.stackable.tech/v1alpha1") + ); + } + + #[test] + fn build_hpa_overwrites_scale_target_ref() { + let wrong_ref = CrossVersionObjectReference { + kind: "Deployment".to_string(), + name: "wrong-target".to_string(), + api_version: Some("apps/v1".to_string()), + }; + let user_spec = HorizontalPodAutoscalerSpec { + max_replicas: 10, + scale_target_ref: wrong_ref, + ..Default::default() + }; + let correct_ref = test_target_ref(); + + let hpa = build_hpa_from_user_spec( + &user_spec, + &correct_ref, + "my-nifi", + "nifi", + "default", + "nodes", + "default", + &test_owner_ref(), + "nifi-operator", + ) + .expect("build_hpa_from_user_spec should succeed"); + + let spec = hpa.spec.expect("spec should be set"); + assert_eq!(spec.scale_target_ref.kind, "StackableScaler"); + assert_eq!(spec.scale_target_ref.name, "my-nifi-nodes-default-scaler"); + assert_eq!( + spec.scale_target_ref.api_version.as_deref(), + Some("autoscaling.stackable.tech/v1alpha1") + ); + // Original max_replicas should be preserved. 
+ assert_eq!(spec.max_replicas, 10); + } + + #[test] + fn build_hpa_sets_required_labels() { + let user_spec = HorizontalPodAutoscalerSpec { + max_replicas: 5, + scale_target_ref: CrossVersionObjectReference { + kind: "Deployment".to_string(), + name: "placeholder".to_string(), + api_version: None, + }, + ..Default::default() + }; + let target_ref = test_target_ref(); + + let hpa = build_hpa_from_user_spec( + &user_spec, + &target_ref, + "my-nifi", + "nifi", + "production", + "nodes", + "workers", + &test_owner_ref(), + "nifi-operator", + ) + .expect("build_hpa_from_user_spec should succeed"); + + let labels = hpa.metadata.labels.as_ref().expect("labels should be set"); + + assert_eq!( + labels.get("app.kubernetes.io/name"), + Some(&"nifi".to_string()), + "app.kubernetes.io/name should be the app_name" + ); + assert_eq!( + labels.get("app.kubernetes.io/instance"), + Some(&"my-nifi".to_string()), + "app.kubernetes.io/instance should be the cluster_name" + ); + assert_eq!( + labels.get("app.kubernetes.io/managed-by"), + Some(&"nifi-operator".to_string()), + "app.kubernetes.io/managed-by should be managed_by" + ); + assert_eq!( + labels.get("app.kubernetes.io/component"), + Some(&"nodes".to_string()), + "app.kubernetes.io/component should be the role" + ); + assert_eq!( + labels.get("app.kubernetes.io/role-group"), + Some(&"workers".to_string()), + "app.kubernetes.io/role-group should be the role_group" + ); + } + + #[test] + fn build_hpa_generates_correct_name() { + let user_spec = HorizontalPodAutoscalerSpec { + max_replicas: 5, + scale_target_ref: CrossVersionObjectReference { + kind: "Deployment".to_string(), + name: "placeholder".to_string(), + api_version: None, + }, + ..Default::default() + }; + let target_ref = test_target_ref(); + + let hpa = build_hpa_from_user_spec( + &user_spec, + &target_ref, + "my-nifi", + "nifi", + "production", + "nodes", + "workers", + &test_owner_ref(), + "nifi-operator", + ) + .expect("build_hpa_from_user_spec should succeed"); + + 
assert_eq!( + hpa.metadata.name.as_deref(), + Some("my-nifi-nodes-workers-hpa") + ); + assert_eq!(hpa.metadata.namespace.as_deref(), Some("production")); + } + + // `initialize_scaler_status` requires a running Kubernetes cluster and is not + // unit-testable. It is tested indirectly via the scaler reconciler integration + // tests and the NiFi operator end-to-end tests. +} diff --git a/crates/stackable-operator/src/crd/scaler/job_tracker.rs b/crates/stackable-operator/src/crd/scaler/job_tracker.rs new file mode 100644 index 000000000..e142d1367 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/job_tracker.rs @@ -0,0 +1,161 @@ +//! Kubernetes Job lifecycle management for scaling hooks. +//! +//! [`JobTracker`] provides an idempotent mechanism for running a Kubernetes `Job` as part +//! of a scaling hook. It uses server-side apply to create the Job if absent, then checks +//! completion status on each reconcile. +//! +//! [`job_name`] produces deterministic, DNS-safe Job names so the tracker can find the +//! same Job across reconcile calls without external state. + +use k8s_openapi::api::batch::v1::Job; +use kube::ResourceExt; +use snafu::{ResultExt, Snafu}; +use tracing::{debug, info, warn}; + +use crate::{client::Client, crd::scaler::hooks::HookOutcome}; + +/// Derive a stable, DNS-safe Job name from a scaler name and a stage label. +/// +/// The name is deterministic so `JobTracker` can find the same Job on requeue +/// without storing state externally. +/// +/// # Parameters +/// +/// - `scaler_name`: Name of the [`StackableScaler`](super::StackableScaler) resource. +/// - `stage`: A short label like `"pre-scale"` or `"post-scale"`. +/// +/// # Returns +/// +/// A DNS-safe string of at most 63 characters with no trailing hyphens. +pub fn job_name(scaler_name: &str, stage: &str) -> String { + let raw = format!("{scaler_name}-{stage}"); + // Kubernetes resource names must be <= 63 chars and valid DNS labels. 
+ // Inputs are always ASCII (Kubernetes names), so byte-level truncation is safe. + let truncated = &raw[..raw.len().min(63)]; + truncated.trim_end_matches('-').to_string() +} + +/// Errors from managing a scaling hook [`Job`]. +#[derive(Debug, Snafu)] +pub enum JobTrackerError { + /// The server-side apply patch to create or update the Job failed. + #[snafu(display("failed to apply Job '{job_name}'"))] + ApplyJob { + #[snafu(source(from(crate::client::Error, Box::new)))] + source: Box, + /// The Kubernetes name of the Job that could not be applied. + job_name: String, + }, + /// The Job completed with one or more failed attempts. + #[snafu(display("Job '{job_name}' failed: {message}"))] + JobFailed { + /// The Kubernetes name of the failed Job. + job_name: String, + /// Diagnostic message including the failure count and a `kubectl logs` hint. + message: String, + }, +} + +/// Manages the lifecycle of a Kubernetes [`Job`] used as a scaling hook. +/// +/// Stateless -- the Job name is derived deterministically via [`job_name`], +/// so no persistent state is needed between reconciles. +pub struct JobTracker; + +impl JobTracker { + /// Ensures the Job exists (creates if absent via server-side apply), then checks completion. + /// + /// Returns: + /// - `Ok(HookOutcome::Done)` — Job succeeded; Job is deleted as cleanup. + /// - `Ok(HookOutcome::InProgress)` — Job is still running; requeue and re-call. + /// - `Err(JobTrackerError::JobFailed)` -- Job failed; caller should transition to `Failed`. + /// + /// # Parameters + /// + /// - `client`: Kubernetes client for server-side apply, get, and delete operations. + /// - `job`: The fully-constructed [`Job`] manifest. Its `.metadata.name` is used to + /// track it across reconcile calls -- use [`job_name`] to generate a stable name. + /// - `namespace`: The namespace in which to manage the Job. 
+ pub async fn start_or_check( + client: &Client, + job: Job, + namespace: &str, + ) -> Result { + let name = job.name_any(); + + debug!(job = %name, namespace, "Applying hook Job (server-side apply)"); + + // Apply (server-side apply — idempotent; no-op if Job already exists). + // The response contains the full updated resource, so no separate GET is needed. + let current: Job = client + .apply_patch("stackable-operator", &job, &job) + .await + .context(ApplyJobSnafu { + job_name: name.clone(), + })?; + + let status = current.status.as_ref(); + let succeeded = status.and_then(|s| s.succeeded).unwrap_or(0); + let failed = status.and_then(|s| s.failed).unwrap_or(0); + + if succeeded > 0 { + info!(job = %name, namespace, "Hook Job completed successfully, cleaning up"); + // Best-effort cleanup — log errors but don't fail + if let Err(e) = client.delete(¤t).await { + warn!(job = %name, namespace, error = %e, "Failed to clean up completed Job — it will be retried on next reconcile"); + } + return Ok(HookOutcome::Done); + } + + if failed > 0 { + return Err(JobTrackerError::JobFailed { + job_name: name.clone(), + message: format!( + "{failed} attempt(s) failed — check pod logs with: kubectl logs -l job-name={name} -n {namespace}" + ), + }); + } + + debug!(job = %name, namespace, "Hook Job still running"); + Ok(HookOutcome::InProgress) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn job_name_is_stable_for_same_inputs() { + let name1 = job_name("my-scaler", "pre-scale"); + let name2 = job_name("my-scaler", "pre-scale"); + assert_eq!(name1, name2); + } + + #[test] + fn job_name_differs_for_different_stages() { + let pre = job_name("my-scaler", "pre-scale"); + let post = job_name("my-scaler", "post-scale"); + assert_ne!(pre, post); + } + + #[test] + fn job_name_max_63_chars() { + let long_name = "a".repeat(60); + let name = job_name(&long_name, "pre-scale"); + assert!(name.len() <= 63, "name too long: {}", name.len()); + } + + #[test] + fn 
job_name_no_trailing_hyphen() { + // If truncation lands on a hyphen, it should be stripped + let name = job_name("a".repeat(62).as_str(), "-x"); + assert!(!name.ends_with('-')); + } + + #[test] + fn job_name_format() { + let name = job_name("my-cluster-default", "pre-scale"); + assert_eq!(name, "my-cluster-default-pre-scale"); + } +} diff --git a/crates/stackable-operator/src/crd/scaler/mod.rs b/crates/stackable-operator/src/crd/scaler/mod.rs new file mode 100644 index 000000000..578b97483 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/mod.rs @@ -0,0 +1,335 @@ +//! Stackable scaler CRD and reconciliation framework. +//! +//! This module provides [`StackableScaler`], a Kubernetes custom resource that exposes a +//! `/scale` subresource so that a `HorizontalPodAutoscaler` can manage replica counts for +//! Stackable cluster role groups instead of targeting a `StatefulSet` directly. +//! +//! # State machine +//! +//! A [`StackableScaler`] progresses through stages tracked in [`ScalerStage`]: +//! +//! ```text +//! Idle → PreScaling → Scaling → PostScaling → Idle +//! ↘ ↘ ↘ +//! Failed +//! ``` +//! +//! Operators provide lifecycle hooks via the [`ScalingHooks`] trait and call +//! [`reconcile_scaler`] on every reconcile loop iteration for the relevant role group. + +use std::borrow::Cow; + +use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::versioned::versioned; + +mod builder; +mod cluster_resource_impl; +pub mod hooks; +mod hpa_builder; +pub mod job_tracker; +pub mod reconciler; +mod replicas_config; + +/// Annotation key that triggers recovery from the [`ScalerStage::Failed`] state. +/// +/// When the operator sees this annotation with value `"true"` on a [`StackableScaler`], +/// it strips the annotation and resets the stage to [`ScalerStage::Idle`]. 
+/// +/// ```bash +/// kubectl annotate stackablescaler autoscaling.stackable.tech/retry=true +/// ``` +pub const RETRY_ANNOTATION: &str = "autoscaling.stackable.tech/retry"; + +/// Which stage of a scaling operation failed. +#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum FailedStage { + /// The [`ScalingHooks::pre_scale`] hook returned an error. + PreScaling, + /// The StatefulSet failed to reach the desired replica count. + Scaling, + /// The [`ScalingHooks::post_scale`] hook returned an error. + PostScaling, +} + +/// The current stage of the scaling state machine. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(tag = "stage", content = "details", rename_all = "camelCase")] +pub enum ScalerStage { + /// No scaling operation is in progress. + Idle, + /// Running the [`ScalingHooks::pre_scale`] hook (e.g. data offload). + PreScaling, + /// Waiting for the StatefulSet to converge to the new replica count. + Scaling, + /// Running the [`ScalingHooks::post_scale`] hook (e.g. cluster rebalance). + PostScaling, + /// A hook returned an error. The scaler stays here until the user applies the + /// [`RETRY_ANNOTATION`] to trigger a reset to [`Idle`](Self::Idle). + Failed { + /// Which stage produced the error. + #[serde(rename = "failedAt")] + failed_at: FailedStage, + /// Human-readable error message from the hook. + reason: String, + }, +} + +impl ScalerStage { + /// Returns `true` when a scaling operation is actively running + /// (`PreScaling`, `Scaling`, or `PostScaling`). + /// + /// `Idle` and `Failed` are not considered active — the HPA is + /// free to write `spec.replicas` in those states. + pub fn is_scaling_in_progress(&self) -> bool { + matches!(self, Self::PreScaling | Self::Scaling | Self::PostScaling) + } +} + +/// Formats the stage name for logging and status messages. 
+/// +/// The `Failed` variant only includes the failed stage, not the full reason string, +/// to keep log messages concise. +impl std::fmt::Display for ScalerStage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Idle => write!(f, "Idle"), + Self::PreScaling => write!(f, "PreScaling"), + Self::Scaling => write!(f, "Scaling"), + Self::PostScaling => write!(f, "PostScaling"), + Self::Failed { failed_at, .. } => write!(f, "Failed({failed_at:?})"), + } + } +} + +/// Manual [`JsonSchema`] implementation because `schemars` does not support the +/// `#[serde(tag = "stage", content = "details")]` internally-tagged representation +/// used by this enum. +impl JsonSchema for ScalerStage { + fn schema_name() -> Cow<'static, str> { + "ScalerStage".into() + } + + fn json_schema(generator: &mut schemars::generate::SchemaGenerator) -> schemars::Schema { + schemars::json_schema!({ + "type": "object", + "required": ["stage"], + "properties": { + "stage": { + "type": "string", + "enum": ["idle", "preScaling", "scaling", "postScaling", "failed"] + }, + "details": { + "type": "object", + "properties": { + "failedAt": generator.subschema_for::(), + "reason": { "type": "string" } + } + } + } + }) + } +} + +/// The current state of the scaler, including when it last changed. +#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ScalerState { + /// The current stage of the scaler state machine. + pub stage: ScalerStage, + /// When this stage was entered. + pub last_transition_time: Time, +} + +/// Status of a StackableScaler. +#[derive(Clone, Debug, Default, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct StackableScalerStatus { + /// The replica count currently targeted by the managed StatefulSet. Exposed via the `/scale` subresource for HPA consumption. 
+ pub replicas: i32, + /// Label selector string for HPA pod counting. Written at `.status.selector`. + #[serde(skip_serializing_if = "Option::is_none")] + pub selector: Option, + /// The target replica count for the in-progress scaling operation. `None` when idle. + #[serde(skip_serializing_if = "Option::is_none")] + pub desired_replicas: Option, + /// The replica count when the current scaling operation started. `None` when idle. + /// Used to derive [`ScalingDirection`] correctly across all stages, because + /// `status.replicas` is overwritten to the target value during the `Scaling` stage. + #[serde(skip_serializing_if = "Option::is_none")] + pub previous_replicas: Option, + /// The current state machine stage and its transition timestamp. + #[serde(skip_serializing_if = "Option::is_none")] + pub current_state: Option, +} + +#[versioned( + version(name = "v1alpha1"), + crates( + kube_core = "kube::core", + k8s_openapi = "k8s_openapi", + schemars = "schemars", + ) +)] +pub mod versioned { + /// A StackableScaler exposes a /scale subresource so that a Kubernetes + /// HorizontalPodAutoscaler can target it instead of a StatefulSet directly. + #[versioned(crd( + group = "autoscaling.stackable.tech", + kind = "StackableScaler", + namespaced, + status = "StackableScalerStatus", + scale( + spec_replicas_path = ".spec.replicas", + status_replicas_path = ".status.replicas", + label_selector_path = ".status.selector" + ) + ))] + #[derive(Clone, CustomResource, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct StackableScalerSpec { + /// Desired replica count. Written by the HPA via the /scale subresource. + /// Only takes effect when the referenced roleGroup has `replicas: 0`. + pub replicas: i32, + } +} + +/// Resolve the replica count for a StatefulSet, taking an optional +/// [`v1alpha1::StackableScaler`] into account. 
///
/// A scaler is only effective when `role_group_replicas` is `Some(0)` — this is the platform
/// convention that signals "externally managed replicas". In all other cases the role group
/// value is used unchanged.
///
/// Always call this instead of reading `role_group.replicas` directly when building a StatefulSet,
/// to ensure scaler-managed role groups are handled consistently.
///
/// # Parameters
///
/// - `role_group_replicas`: The replica count from the role group config. `Some(0)` signals
///   externally-managed replicas (the scaler's value is used). Any other value is returned unchanged.
/// - `scaler`: The [`v1alpha1::StackableScaler`] for this role group, if one exists. Only consulted
///   when `role_group_replicas` is `Some(0)`.
///
/// # Returns
///
/// The effective replica count, or `None` if the scaler has no status yet.
pub fn resolve_replicas(
    role_group_replicas: Option<i32>,
    scaler: Option<&v1alpha1::StackableScaler>,
) -> Option<i32> {
    match (role_group_replicas, scaler) {
        // Replica count of 0 + a scaler present → the scaler's status owns the count.
        (Some(0), Some(s)) => s.status.as_ref().map(|st| st.replicas),
        // Everything else: the role group value wins, scaler (if any) is ignored.
        (replicas, _) => replicas,
    }
}

pub use builder::{BuildScalerError, build_scaler};
pub use hooks::{
    HookOutcome, ScalingCondition, ScalingContext, ScalingDirection, ScalingHooks, ScalingResult,
};
pub use hpa_builder::{
    InitializeStatusError, build_hpa_from_user_spec, initialize_scaler_status, scale_target_ref,
};
pub use job_tracker::{JobTracker, JobTrackerError, job_name};
pub use reconciler::{Error as ReconcilerError, reconcile_scaler};
pub use replicas_config::{
    AutoConfig, HpaConfig, ReplicasConfig, ValidationError as ReplicasValidationError,
};

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn is_scaling_in_progress_true_for_active_stages() {
        assert!(ScalerStage::PreScaling.is_scaling_in_progress());
        assert!(ScalerStage::Scaling.is_scaling_in_progress());
        assert!(ScalerStage::PostScaling.is_scaling_in_progress());
    }

    #[test]
    fn is_scaling_in_progress_false_for_idle_and_failed() {
        assert!(!ScalerStage::Idle.is_scaling_in_progress());
        assert!(
            !ScalerStage::Failed {
                failed_at: FailedStage::PreScaling,
                reason: "err".to_string(),
            }
            .is_scaling_in_progress()
        );
    }

    #[test]
    fn scaler_stage_idle_serializes() {
        let stage = ScalerStage::Idle;
        let json = serde_json::to_value(&stage).unwrap();
        assert_eq!(json["stage"], "idle");
    }

    #[test]
    fn scaler_stage_failed_serializes() {
        let stage = ScalerStage::Failed {
            failed_at: FailedStage::PreScaling,
            reason: "timeout".to_string(),
        };
        let json = serde_json::to_value(&stage).unwrap();
        assert_eq!(json["stage"], "failed");
        assert_eq!(json["details"]["failedAt"], "preScaling");
        assert_eq!(json["details"]["reason"], "timeout");
    }

    #[test]
    fn spec_round_trips() {
        let spec = v1alpha1::StackableScalerSpec { replicas: 3 };
        let json = serde_json::to_string(&spec).unwrap();
        let back: v1alpha1::StackableScalerSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(spec, back);
    }

    #[test]
    fn resolve_replicas_no_scaler_uses_role_group() {
        assert_eq!(resolve_replicas(Some(3), None), Some(3));
    }

    #[test]
    fn resolve_replicas_none_role_group_no_scaler() {
        assert_eq!(resolve_replicas(None, None), None);
    }

    #[test]
    fn resolve_replicas_zero_with_scaler_uses_status() {
        let mut scaler =
            v1alpha1::StackableScaler::new("test", v1alpha1::StackableScalerSpec { replicas: 5 });
        scaler.status = Some(StackableScalerStatus {
            replicas: 3,
            ..Default::default()
        });
        assert_eq!(resolve_replicas(Some(0), Some(&scaler)), Some(3));
    }

    #[test]
    fn resolve_replicas_nonzero_with_scaler_ignores_scaler() {
        // role_group.replicas != 0 → scaler is not active (validation webhook should prevent this,
        // but we defensively fall back to the role group value)
        let mut scaler =
            v1alpha1::StackableScaler::new("test", v1alpha1::StackableScalerSpec { replicas: 5 });
        scaler.status =
Some(StackableScalerStatus { + replicas: 4, + ..Default::default() + }); + assert_eq!(resolve_replicas(Some(3), Some(&scaler)), Some(3)); + } + + #[test] + fn resolve_replicas_zero_scaler_no_status_returns_none() { + // Scaler exists but has no status yet (just created) → return None (don't set replicas) + let scaler = + v1alpha1::StackableScaler::new("test", v1alpha1::StackableScalerSpec { replicas: 5 }); + assert_eq!(resolve_replicas(Some(0), Some(&scaler)), None); + } +} diff --git a/crates/stackable-operator/src/crd/scaler/reconciler.rs b/crates/stackable-operator/src/crd/scaler/reconciler.rs new file mode 100644 index 000000000..c6f699c42 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/reconciler.rs @@ -0,0 +1,645 @@ +//! Reconciler for [`StackableScaler`](super::StackableScaler) resources. +//! +//! The public entry point is [`reconcile_scaler`]. Operators call this on every reconcile +//! for a role group, and it drives the [`ScalerStage`](super::ScalerStage) state machine, +//! invokes [`ScalingHooks`] at the appropriate stages, and patches the scaler's status. + +use std::time::Duration; + +use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; +use k8s_openapi::jiff::Timestamp; +use kube::runtime::controller::Action; +use snafu::{OptionExt, ResultExt, Snafu}; +use tracing::{debug, info, warn}; + +use crate::client::Client; +use crate::crd::scaler::hooks::{ + HookOutcome, ScalingCondition, ScalingContext, ScalingDirection, ScalingHooks, ScalingResult, +}; +use crate::crd::scaler::{ + FailedStage, RETRY_ANNOTATION, ScalerStage, ScalerState, StackableScalerStatus, + v1alpha1::StackableScaler, +}; + +/// Requeue interval when a hook returns [`HookOutcome::InProgress`]. +const REQUEUE_HOOK_IN_PROGRESS: Duration = Duration::from_secs(10); +/// Requeue interval while waiting for the StatefulSet to converge. +const REQUEUE_SCALING: Duration = Duration::from_secs(5); + +/// Errors returned by [`reconcile_scaler`]. 
+#[derive(Debug, Snafu)] +pub enum Error { + /// The Kubernetes status patch for the [`StackableScaler`] failed. + #[snafu(display("failed to patch StackableScaler status"))] + PatchStatus { + #[snafu(source(from(crate::client::Error, Box::new)))] + source: Box, + }, + /// Removing the retry annotation from the [`StackableScaler`] failed. + #[snafu(display("failed to remove retry annotation from StackableScaler"))] + RemoveRetryAnnotation { + #[snafu(source(from(crate::client::Error, Box::new)))] + source: Box, + }, + /// The [`StackableScaler`] is missing `.metadata.namespace`. + #[snafu(display("StackableScaler object is missing namespace"))] + ObjectHasNoNamespace, +} + +/// Compute the next state machine step from the current stage and hook/stability inputs. +/// +/// Hook outcomes are passed as closures so this function stays synchronous and +/// unit-testable without async infrastructure. +/// +/// # Parameters +/// +/// - `current`: The current [`ScalerStage`]. +/// - `current_replicas`: The replica count in `status.replicas`. +/// - `desired_replicas`: The target from `spec.replicas`. +/// - `pre_outcome`: Result of the `PreScaling` hook. Only called when in `PreScaling`. +/// - `post_outcome`: Result of the `PostScaling` hook. Only called when in `PostScaling`. +/// - `statefulset_stable`: Whether the StatefulSet has converged. Only relevant in `Scaling`. 
+fn next_stage( + current: &ScalerStage, + current_replicas: i32, + desired_replicas: i32, + pre_outcome: impl FnOnce() -> HookOutcome, + post_outcome: impl FnOnce() -> HookOutcome, + statefulset_stable: bool, +) -> NextStage { + match current { + ScalerStage::Idle => { + if current_replicas != desired_replicas { + NextStage::Transition(ScalerStage::PreScaling) + } else { + NextStage::NoChange + } + } + ScalerStage::PreScaling => match pre_outcome() { + HookOutcome::Done => NextStage::Transition(ScalerStage::Scaling), + HookOutcome::InProgress => NextStage::Requeue, + }, + ScalerStage::Scaling => { + if statefulset_stable { + NextStage::Transition(ScalerStage::PostScaling) + } else { + NextStage::Requeue + } + } + ScalerStage::PostScaling => match post_outcome() { + HookOutcome::Done => NextStage::Transition(ScalerStage::Idle), + HookOutcome::InProgress => NextStage::Requeue, + }, + ScalerStage::Failed { .. } => NextStage::NoChange, + } +} + +/// The outcome of [`next_stage`]: what the reconciler should do. +#[derive(Debug, Eq, PartialEq)] +enum NextStage { + /// Nothing to do; wait for an external watch event. + NoChange, + /// Current stage is not yet complete; requeue after a short interval. + Requeue, + /// Move to the given stage and patch the scaler status. + Transition(ScalerStage), +} + +/// Reconcile a [`StackableScaler`], advancing its state machine and invoking hooks. +/// +/// Call this from your operator's reconcile function for every role group that has a +/// corresponding [`StackableScaler`]. The returned [`ScalingCondition`] MUST be applied +/// to the cluster CR's `status.conditions`. +/// +/// # Parameters +/// +/// - `scaler`: The [`StackableScaler`] resource. Must have `.metadata.namespace` set. +/// - `hooks`: The operator's [`ScalingHooks`] implementation. +/// - `client`: Kubernetes client for status patches and hook API calls. 
+/// - `statefulset_stable`: `true` when the managed StatefulSet has reached its target +/// replica count and all pods are ready. +/// - `selector`: Pod label selector string for this role group (e.g. +/// `"app=myproduct,roleGroup=default"`). Written into `status.selector` for HPA +/// pod counting. Must be stable across reconcile calls. +/// - `role_group_name`: The name of the role group being scaled (e.g. `"default"`). +/// Passed to hooks via [`ScalingContext`]. +/// +/// # Errors +/// +/// Returns [`Error::PatchStatus`] if the status patch fails, or +/// [`Error::ObjectHasNoNamespace`] if the scaler has no namespace. +pub async fn reconcile_scaler( + scaler: &StackableScaler, + hooks: &H, + client: &Client, + statefulset_stable: bool, + selector: &str, + role_group_name: &str, +) -> Result +where + H: ScalingHooks, +{ + let default_status = StackableScalerStatus::default(); + let status = scaler.status.as_ref().unwrap_or(&default_status); + let current_stage = status + .current_state + .as_ref() + .map(|s| s.stage.clone()) + .unwrap_or(ScalerStage::Idle); + let current_replicas = status.replicas; + let desired_replicas = scaler.spec.replicas; + let namespace = scaler + .metadata + .namespace + .as_deref() + .context(ObjectHasNoNamespaceSnafu)?; + + let scaler_name = scaler.metadata.name.as_deref().unwrap_or(""); + + debug!( + scaler = scaler_name, + %current_stage, + current_replicas, + desired_replicas, + statefulset_stable, + "Reconciling StackableScaler" + ); + + // Recovery from Failed: if the user has set the retry annotation, strip it + // and reset to Idle so the next reconcile starts a fresh scaling attempt. + if matches!(current_stage, ScalerStage::Failed { .. 
}) { + let has_retry = scaler + .metadata + .annotations + .as_ref() + .and_then(|a| a.get(RETRY_ANNOTATION)) + .is_some_and(|v| v == "true"); + + if has_retry { + info!( + scaler = scaler_name, + "Retry annotation found on Failed scaler, resetting to Idle" + ); + + // Strip the annotation via merge patch (setting to null removes it) + client + .merge_patch( + scaler, + serde_json::json!({ + "metadata": { + "annotations": { + RETRY_ANNOTATION: null + } + } + }), + ) + .await + .context(RemoveRetryAnnotationSnafu)?; + + // Reset status to Idle + let idle_status = + make_status(selector, ScalerStage::Idle, current_replicas, None, None); + patch_status(client, scaler, idle_status) + .await + .context(PatchStatusSnafu)?; + + return Ok(ScalingResult { + action: Action::requeue(REQUEUE_SCALING), + scaling_condition: ScalingCondition::Healthy, + }); + } + } + + // When a scaling operation is in progress, use the frozen previous_replicas + // to derive direction. status.replicas is overwritten during the Scaling stage + // and would always yield Up for the remainder of the operation. 
+ let direction_base = status.previous_replicas.unwrap_or(current_replicas); + let ctx = ScalingContext { + client, + namespace, + role_group_name, + current_replicas: direction_base, + desired_replicas, + direction: ScalingDirection::from_replicas(direction_base, desired_replicas), + }; + + // Run the hook for the current stage if applicable, catching errors for Failed transition + let pre_result = if matches!(current_stage, ScalerStage::PreScaling) { + Some(hooks.pre_scale(&ctx).await) + } else { + None + }; + + let post_result = if matches!(current_stage, ScalerStage::PostScaling) { + Some(hooks.post_scale(&ctx).await) + } else { + None + }; + + // Handle hook errors → Failed transition + if let Some(Err(e)) = &pre_result { + return handle_hook_failure( + e, + FailedStage::PreScaling, + hooks, + &ctx, + scaler, + selector, + status, + ) + .await; + } + + if let Some(Err(e)) = &post_result { + return handle_hook_failure( + e, + FailedStage::PostScaling, + hooks, + &ctx, + scaler, + selector, + status, + ) + .await; + } + + let pre_outcome = pre_result.and_then(|r| r.ok()).unwrap_or(HookOutcome::Done); + let post_outcome = post_result + .and_then(|r| r.ok()) + .unwrap_or(HookOutcome::Done); + + let next = next_stage( + ¤t_stage, + current_replicas, + desired_replicas, + || pre_outcome.clone(), + || post_outcome.clone(), + statefulset_stable, + ); + + match next { + NextStage::NoChange => { + debug!( + scaler = scaler_name, + %current_stage, + "No stage change needed, awaiting external changes" + ); + Ok(ScalingResult { + action: Action::await_change(), + scaling_condition: ScalingCondition::Healthy, + }) + } + NextStage::Requeue => { + let interval = if matches!(current_stage, ScalerStage::Scaling) { + REQUEUE_SCALING + } else { + REQUEUE_HOOK_IN_PROGRESS + }; + debug!( + scaler = scaler_name, + %current_stage, + requeue_after_secs = interval.as_secs(), + "Requeuing, waiting for progress in current stage" + ); + Ok(ScalingResult { + action: 
Action::requeue(interval), + scaling_condition: ScalingCondition::Progressing { + stage: format!("{current_stage}"), + }, + }) + } + NextStage::Transition(new_stage) => { + info!( + scaler = scaler_name, + %current_stage, + %new_stage, + current_replicas, + desired_replicas, + "StackableScaler transitioning stage" + ); + // When transitioning to Scaling, update status.replicas to desired + // (this is what gets propagated to the StatefulSet) + // When completing PostScaling → Idle, clear desiredReplicas + let new_replicas = if matches!(new_stage, ScalerStage::Scaling) { + desired_replicas + } else { + current_replicas + }; + let new_desired = match new_stage { + ScalerStage::Idle => None, + ScalerStage::PreScaling => Some(desired_replicas), + _ => status.desired_replicas, + }; + let new_previous = match new_stage { + ScalerStage::PreScaling => Some(current_replicas), + ScalerStage::Idle => None, + _ => status.previous_replicas, + }; + let condition = match &new_stage { + ScalerStage::Idle => ScalingCondition::Healthy, + s => ScalingCondition::Progressing { + stage: format!("{s}"), + }, + }; + let new_status = + make_status(selector, new_stage, new_replicas, new_desired, new_previous); + patch_status(client, scaler, new_status) + .await + .context(PatchStatusSnafu)?; + Ok(ScalingResult { + action: Action::requeue(REQUEUE_SCALING), + scaling_condition: condition, + }) + } + } +} + +/// Transition the scaler to the `Failed` state after a hook error. +/// +/// Patches the scaler status to `Failed` first, then calls [`ScalingHooks::on_failure`] +/// for best-effort cleanup. Writing the status before the cleanup hook guarantees that +/// a re-entrant reconcile sees `Failed` and will not invoke `on_failure` a second time. +/// +/// If the cleanup hook itself fails, the status reason is updated to include the +/// cleanup error so it is visible via `kubectl describe` and the cluster CR condition. 
+/// +/// # Parameters +/// +/// - `error`: The hook error that caused the failure. +/// - `failed_stage`: Which stage (`PreScaling` or `PostScaling`) produced the error. +/// - `hooks`: The operator's [`ScalingHooks`] implementation, used to call `on_failure`. +/// - `ctx`: The [`ScalingContext`] for the current reconcile, forwarded to `on_failure`. +/// Also provides the Kubernetes client for patching the scaler status. +/// - `scaler`: The [`StackableScaler`] resource being reconciled. +/// - `selector`: Pod label selector string written into `status.selector`. +/// - `status`: The current scaler status, preserved in the `Failed` status so manual +/// recovery knows the original replica counts. +async fn handle_hook_failure( + error: &H::Error, + failed_stage: FailedStage, + hooks: &H, + ctx: &ScalingContext<'_>, + scaler: &StackableScaler, + selector: &str, + status: &StackableScalerStatus, +) -> Result { + let scaler_name = scaler.metadata.name.as_deref().unwrap_or(""); + let hook_reason = error.to_string(); + warn!( + scaler = scaler_name, + failed_at = ?failed_stage, + error = %hook_reason, + "StackableScaler hook failed, entering Failed state" + ); + + // Write Failed status BEFORE calling on_failure so that a subsequent reconcile + // sees the Failed stage and won't re-invoke on_failure. + let new_status = make_status( + selector, + ScalerStage::Failed { + failed_at: failed_stage.clone(), + reason: hook_reason.clone(), + }, + status.replicas, + status.desired_replicas, + status.previous_replicas, + ); + patch_status(ctx.client, scaler, new_status) + .await + .context(PatchStatusSnafu)?; + + // Run cleanup hook. If it fails, update the status reason so the failure is + // visible via `kubectl describe` / the cluster CR condition. 
+ let final_reason = if let Err(on_failure_err) = hooks.on_failure(ctx, &failed_stage).await { + let reason_with_cleanup = format!("{hook_reason} (cleanup also failed: {on_failure_err})"); + warn!( + scaler = scaler_name, + error = %on_failure_err, + failed_at = ?failed_stage, + "on_failure hook returned an error, updating status" + ); + let updated_status = make_status( + selector, + ScalerStage::Failed { + failed_at: failed_stage.clone(), + reason: reason_with_cleanup.clone(), + }, + status.replicas, + status.desired_replicas, + status.previous_replicas, + ); + // Best-effort update — if this patch also fails, the original Failed reason + // is already persisted and the warn log captures the cleanup error. + if let Err(patch_err) = patch_status(ctx.client, scaler, updated_status).await { + warn!( + scaler = scaler_name, + error = %patch_err, + "Failed to update status with cleanup error" + ); + } + reason_with_cleanup + } else { + hook_reason + }; + + Ok(ScalingResult { + action: Action::await_change(), + scaling_condition: ScalingCondition::Failed { + stage: failed_stage, + reason: final_reason, + }, + }) +} + +/// Construct a [`StackableScalerStatus`] with the given values and `last_transition_time` of now. +/// +/// # Parameters +/// +/// - `selector`: Pod label selector string for HPA pod counting. +/// - `stage`: The new [`ScalerStage`] to record in the status. +/// - `replicas`: The replica count to write into `status.replicas`. +/// - `desired_replicas`: The in-flight target, or `None` when returning to `Idle`. +/// - `previous_replicas`: The replica count before the scaling operation started, +/// or `None` when returning to `Idle`. 
+fn make_status( + selector: &str, + stage: ScalerStage, + replicas: i32, + desired_replicas: Option, + previous_replicas: Option, +) -> StackableScalerStatus { + StackableScalerStatus { + replicas, + selector: Some(selector.to_string()), + desired_replicas, + previous_replicas, + current_state: Some(ScalerState { + stage, + last_transition_time: Time(Timestamp::now()), + }), + } +} + +/// Apply a server-side status patch to the [`StackableScaler`]. +/// +/// # Parameters +/// +/// - `client`: Kubernetes client for the status patch operation. +/// - `scaler`: The [`StackableScaler`] resource whose status to update. +/// - `status`: The new [`StackableScalerStatus`] to apply. +async fn patch_status( + client: &Client, + scaler: &StackableScaler, + status: StackableScalerStatus, +) -> Result<(), crate::client::Error> { + client + .apply_patch_status("stackable-operator", scaler, &status) + .await + .map(|_| ()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::crd::scaler::hooks::HookOutcome; + use crate::crd::scaler::{FailedStage, ScalerStage}; + + #[test] + fn idle_transitions_to_prescaling_when_replicas_differ() { + assert_eq!( + next_stage( + &ScalerStage::Idle, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::Transition(ScalerStage::PreScaling) + ); + } + + #[test] + fn idle_stays_idle_when_replicas_match() { + assert_eq!( + next_stage( + &ScalerStage::Idle, + 3, + 3, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::NoChange + ); + } + + #[test] + fn prescaling_advances_when_hook_done() { + assert_eq!( + next_stage( + &ScalerStage::PreScaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::Transition(ScalerStage::Scaling) + ); + } + + #[test] + fn prescaling_requeues_when_hook_in_progress() { + assert_eq!( + next_stage( + &ScalerStage::PreScaling, + 3, + 5, + || HookOutcome::InProgress, + || HookOutcome::Done, + false + ), + NextStage::Requeue + ); + } + + 
#[test] + fn scaling_advances_when_statefulset_stable() { + assert_eq!( + next_stage( + &ScalerStage::Scaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + true + ), + NextStage::Transition(ScalerStage::PostScaling) + ); + } + + #[test] + fn scaling_requeues_when_not_stable() { + assert_eq!( + next_stage( + &ScalerStage::Scaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::Requeue + ); + } + + #[test] + fn postscaling_returns_to_idle_when_hook_done() { + assert_eq!( + next_stage( + &ScalerStage::PostScaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + true + ), + NextStage::Transition(ScalerStage::Idle) + ); + } + + #[test] + fn postscaling_requeues_when_hook_in_progress() { + assert_eq!( + next_stage( + &ScalerStage::PostScaling, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::InProgress, + true + ), + NextStage::Requeue + ); + } + + #[test] + fn failed_stays_failed() { + let failed = ScalerStage::Failed { + failed_at: FailedStage::PreScaling, + reason: "err".to_string(), + }; + assert_eq!( + next_stage( + &failed, + 3, + 5, + || HookOutcome::Done, + || HookOutcome::Done, + false + ), + NextStage::NoChange + ); + } +} diff --git a/crates/stackable-operator/src/crd/scaler/replicas_config.rs b/crates/stackable-operator/src/crd/scaler/replicas_config.rs new file mode 100644 index 000000000..81cf57eb0 --- /dev/null +++ b/crates/stackable-operator/src/crd/scaler/replicas_config.rs @@ -0,0 +1,325 @@ +//! Replica configuration for Stackable role groups. +//! +//! [`ReplicasConfig`] replaces the simple `replicas: Option` field on role groups, +//! allowing operators to express fixed counts, HPA-managed scaling, Stackable auto-scaling, +//! or externally-managed replicas in a single enum. 
+ +use std::borrow::Cow; + +use k8s_openapi::api::autoscaling::v2::HorizontalPodAutoscalerSpec; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use snafu::Snafu; + +/// Errors returned by [`ReplicasConfig::validate`]. +#[derive(Debug, Snafu)] +pub enum ValidationError { + /// A `Fixed(0)` replica count is not allowed. + #[snafu(display("fixed replica count must be at least 1, got 0"))] + FixedZero, + + /// The `min_replicas` field in [`AutoConfig`] must be at least 1. + #[snafu(display("auto min_replicas must be at least 1, got {min}"))] + AutoMinZero { + /// The invalid minimum replica count. + min: u16, + }, + + /// The `max_replicas` must be greater than or equal to `min_replicas`. + #[snafu(display("auto max_replicas ({max}) must be >= min_replicas ({min})"))] + AutoMaxLessThanMin { + /// The minimum replica count. + min: u16, + /// The maximum replica count that was less than `min`. + max: u16, + }, +} + +/// Configuration for a Kubernetes `HorizontalPodAutoscaler` that manages the role group. +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct HpaConfig { + /// The HPA spec to apply. The `scaleTargetRef` and `minReplicas` fields are managed + /// by the operator and will be overwritten. + pub spec: HorizontalPodAutoscalerSpec, +} + +/// Configuration for Stackable-managed auto-scaling. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct AutoConfig { + /// Minimum number of replicas the auto-scaler may scale down to. + pub min_replicas: u16, + /// Maximum number of replicas the auto-scaler may scale up to. + pub max_replicas: u16, +} + +/// How replicas are managed for a role group. +/// +/// This enum supports multiple input formats for ergonomic YAML/JSON authoring: +/// +/// - A bare integer (e.g. `3`) is parsed as `Fixed(3)`. +/// - The string `"externallyScaled"` is parsed as `ExternallyScaled`. 
+/// - An object with a discriminant key (`fixed`, `hpa`, or `auto`) selects the +/// corresponding variant. +/// +/// # Validation +/// +/// After deserialization, call [`validate`](ReplicasConfig::validate) to enforce +/// business rules (e.g. `Fixed(0)` is not allowed). +#[derive(Clone, Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum ReplicasConfig { + /// A fixed number of replicas managed by the operator. + Fixed(u16), + /// Replicas managed by a Kubernetes `HorizontalPodAutoscaler`. + Hpa(Box), + /// Replicas managed by the Stackable auto-scaler. + Auto(AutoConfig), + /// Replicas managed by an external system. The operator creates a + /// [`StackableScaler`](super::v1alpha1::StackableScaler) that exposes + /// a `/scale` subresource for external controllers to target. + ExternallyScaled, +} + +impl Default for ReplicasConfig { + fn default() -> Self { + Self::Fixed(1) + } +} + +impl ReplicasConfig { + /// Validates business rules for this configuration. + /// + /// # Errors + /// + /// Returns [`ValidationError::FixedZero`] if the variant is `Fixed(0)`. + /// Returns [`ValidationError::AutoMinZero`] if `min_replicas` is 0. + /// Returns [`ValidationError::AutoMaxLessThanMin`] if `max_replicas < min_replicas`. 
+ pub fn validate(&self) -> Result<(), ValidationError> { + match self { + Self::Fixed(0) => FixedZeroSnafu.fail(), + Self::Auto(cfg) if cfg.min_replicas == 0 => AutoMinZeroSnafu { + min: cfg.min_replicas, + } + .fail(), + Self::Auto(cfg) if cfg.max_replicas < cfg.min_replicas => AutoMaxLessThanMinSnafu { + min: cfg.min_replicas, + max: cfg.max_replicas, + } + .fail(), + _ => Ok(()), + } + } +} + +impl JsonSchema for ReplicasConfig { + fn schema_name() -> Cow<'static, str> { + "ReplicasConfig".into() + } + + fn json_schema(generator: &mut schemars::generate::SchemaGenerator) -> schemars::Schema { + schemars::json_schema!({ + "description": "Replica configuration for a role group.", + "oneOf": [ + { + "description": "Fixed replica count (bare integer).", + "type": "integer", + "minimum": 1 + }, + { + "description": "Externally managed replicas.", + "type": "string", + "const": "externallyScaled" + }, + { + "description": "Fixed replica count (object form).", + "type": "object", + "required": ["fixed"], + "properties": { + "fixed": { "type": "integer", "minimum": 1 } + }, + "additionalProperties": false + }, + { + "description": "HPA-managed replicas.", + "type": "object", + "required": ["hpa"], + "properties": { + "hpa": generator.subschema_for::() + }, + "additionalProperties": false + }, + { + "description": "Stackable auto-scaling.", + "type": "object", + "required": ["auto"], + "properties": { + "auto": generator.subschema_for::() + }, + "additionalProperties": false + } + ] + }) + } +} + +impl<'de> Deserialize<'de> for ReplicasConfig { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + use serde::de::{self, MapAccess, Visitor}; + + struct ReplicasConfigVisitor; + + impl<'de> Visitor<'de> for ReplicasConfigVisitor { + type Value = ReplicasConfig; + + fn expecting(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str( + "an integer, the string \"externallyScaled\", \ + or an object with key \"fixed\", 
\"hpa\", or \"auto\"", + ) + } + + fn visit_u64(self, value: u64) -> Result { + let value = u16::try_from(value) + .map_err(|_| de::Error::custom("integer out of u16 range"))?; + Ok(ReplicasConfig::Fixed(value)) + } + + fn visit_i64(self, value: i64) -> Result { + let value = u16::try_from(value) + .map_err(|_| de::Error::custom("integer out of u16 range"))?; + Ok(ReplicasConfig::Fixed(value)) + } + + fn visit_str(self, value: &str) -> Result { + match value { + "externallyScaled" => Ok(ReplicasConfig::ExternallyScaled), + other => Err(de::Error::unknown_variant(other, &["externallyScaled"])), + } + } + + fn visit_map>(self, mut map: A) -> Result { + let key: String = map + .next_key()? + .ok_or_else(|| de::Error::custom("expected a non-empty object"))?; + + let result = match key.as_str() { + "fixed" => { + let value: u16 = map.next_value()?; + ReplicasConfig::Fixed(value) + } + "hpa" => { + let value: HpaConfig = map.next_value()?; + ReplicasConfig::Hpa(Box::new(value)) + } + "auto" => { + let value: AutoConfig = map.next_value()?; + ReplicasConfig::Auto(value) + } + other => { + return Err(de::Error::unknown_field(other, &["fixed", "hpa", "auto"])); + } + }; + + // Drain remaining keys to ensure no extra fields. 
+ if map.next_key::()?.is_some() { + return Err(de::Error::custom( + "expected exactly one key (\"fixed\", \"hpa\", or \"auto\")", + )); + } + + Ok(result) + } + } + + deserializer.deserialize_any(ReplicasConfigVisitor) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deserialize_fixed_from_integer() { + let config: ReplicasConfig = serde_json::from_str("3").expect("should parse integer"); + assert_eq!(config, ReplicasConfig::Fixed(3)); + } + + #[test] + fn deserialize_fixed_from_object() { + let config: ReplicasConfig = + serde_json::from_str(r#"{"fixed": 5}"#).expect("should parse fixed object"); + assert_eq!(config, ReplicasConfig::Fixed(5)); + } + + #[test] + fn deserialize_externally_scaled() { + let config: ReplicasConfig = + serde_json::from_str(r#""externallyScaled""#).expect("should parse string variant"); + assert_eq!(config, ReplicasConfig::ExternallyScaled); + } + + #[test] + fn deserialize_hpa() { + let config: ReplicasConfig = + serde_json::from_str(r#"{"hpa": {"spec": {"maxReplicas": 10}}}"#) + .expect("should parse hpa object"); + assert!(matches!(config, ReplicasConfig::Hpa(..))); + } + + #[test] + fn deserialize_auto() { + let config: ReplicasConfig = + serde_json::from_str(r#"{"auto": {"minReplicas": 2, "maxReplicas": 10}}"#) + .expect("should parse auto object"); + assert_eq!( + config, + ReplicasConfig::Auto(AutoConfig { + min_replicas: 2, + max_replicas: 10, + }) + ); + } + + #[test] + fn fixed_zero_is_invalid() { + let config = ReplicasConfig::Fixed(0); + let result = config.validate(); + assert!(matches!(result, Err(ValidationError::FixedZero))); + } + + #[test] + fn auto_min_zero_is_invalid() { + let config = ReplicasConfig::Auto(AutoConfig { + min_replicas: 0, + max_replicas: 5, + }); + let result = config.validate(); + assert!(matches!(result, Err(ValidationError::AutoMinZero { .. 
}))); + } + + #[test] + fn auto_max_less_than_min_is_invalid() { + let config = ReplicasConfig::Auto(AutoConfig { + min_replicas: 5, + max_replicas: 2, + }); + let result = config.validate(); + assert!(matches!( + result, + Err(ValidationError::AutoMaxLessThanMin { .. }) + )); + } + + #[test] + fn option_none_defaults_to_fixed_1() { + let config: Option = + serde_json::from_str("null").expect("should parse null"); + assert_eq!(config.unwrap_or_default(), ReplicasConfig::Fixed(1)); + } +} diff --git a/crates/stackable-versioned-macros/src/attrs/container.rs b/crates/stackable-versioned-macros/src/attrs/container.rs index d902dd1cd..284282c59 100644 --- a/crates/stackable-versioned-macros/src/attrs/container.rs +++ b/crates/stackable-versioned-macros/src/attrs/container.rs @@ -33,6 +33,24 @@ pub struct ContainerSkipArguments { pub try_convert: Flag, } +/// Scale subresource configuration for a CRD. +/// +/// Mirrors the fields of `k8s_openapi::CustomResourceSubresourceScale`. Passed through +/// to the `#[kube(scale(...))]` attribute. +/// +/// See the [Kubernetes documentation](https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource) +/// for details on the scale subresource. +#[derive(Clone, Debug, FromMeta)] +pub struct Scale { + /// JSON path to the replica count in the custom resource's spec (e.g. `.spec.replicas`). + pub spec_replicas_path: String, + /// JSON path to the replica count in the custom resource's status (e.g. `.status.replicas`). + pub status_replicas_path: String, + /// JSON path to the label selector in the custom resource's status (e.g. `.status.selector`). + #[darling(default)] + pub label_selector_path: Option, +} + /// This struct contains supported CRD arguments. /// /// The arguments are passed through to the `#[kube]` attribute. More details can be found in the @@ -50,6 +68,7 @@ pub struct ContainerSkipArguments { /// cluster scoped resource. 
/// - `crates`: Override specific crates. /// - `status`: Set the specified struct as the status subresource. +/// - `scale`: Configure the scale subresource for HPA integration. /// - `shortname`: Set a shortname for the CR object. This can be specified multiple /// times. /// - `skip`: Controls skipping parts of the generation. @@ -64,7 +83,7 @@ pub struct StructCrdArguments { pub status: Option, // derive // schema - // scale + pub scale: Option, // printcolumn #[darling(multiple, rename = "shortname")] pub shortnames: Vec, diff --git a/crates/stackable-versioned-macros/src/codegen/container/struct/mod.rs b/crates/stackable-versioned-macros/src/codegen/container/struct/mod.rs index 021f523fa..c022c8289 100644 --- a/crates/stackable-versioned-macros/src/codegen/container/struct/mod.rs +++ b/crates/stackable-versioned-macros/src/codegen/container/struct/mod.rs @@ -279,6 +279,20 @@ impl Struct { .map(|s| quote! { , shortname = #s }) .collect(); + let scale = spec_gen_ctx.kubernetes_arguments.scale.as_ref().map(|s| { + let spec_replicas_path = &s.spec_replicas_path; + let status_replicas_path = &s.status_replicas_path; + let label_selector_path = s + .label_selector_path + .as_ref() + .map(|p| quote! { , label_selector_path = #p }); + quote! { , scale( + spec_replicas_path = #spec_replicas_path, + status_replicas_path = #status_replicas_path + #label_selector_path + )} + }); + Some(quote! { // The end-developer needs to derive CustomResource and JsonSchema. // This is because we don't know if they want to use a re-exported or renamed import. @@ -286,7 +300,7 @@ impl Struct { // These must be comma separated (except the last) as they always exist: group = #group, version = #version, kind = #kind // These fields are optional, and therefore the token stream must prefix each with a comma: - #singular #plural #namespaced #crates #status #shortnames + #singular #plural #namespaced #crates #status #shortnames #scale )] }) }