diff --git a/.github/scripts/gha-e2e.sh b/.github/scripts/gha-e2e.sh index 2c7cf04464e..71bbbca2495 100755 --- a/.github/scripts/gha-e2e.sh +++ b/.github/scripts/gha-e2e.sh @@ -90,6 +90,11 @@ function alluxio_e2e() { bash test/gha-e2e/alluxio/test.sh } +function alluxio_scaledown_e2e() { + set -e + bash test/gha-e2e/alluxio-scaledown/test.sh +} + function jindo_e2e() { set -e bash test/gha-e2e/jindo/test.sh @@ -107,6 +112,7 @@ function curvine_e2e() { check_control_plane_status alluxio_e2e +alluxio_scaledown_e2e jindo_e2e juicefs_e2e curvine_e2e diff --git a/api/v1alpha1/status.go b/api/v1alpha1/status.go index 4f98bca2bea..b6ccf400437 100644 --- a/api/v1alpha1/status.go +++ b/api/v1alpha1/status.go @@ -191,6 +191,8 @@ const ( RuntimeFusesScaledIn RuntimeConditionType = "FusesScaledIn" // RuntimeFusesScaledOut means the fuses of runtime just scaled out RuntimeFusesScaledOut RuntimeConditionType = "FusesScaledOut" + // RuntimeWorkerDecommissioning means the runtime is draining workers ahead of a scale-down + RuntimeWorkerDecommissioning RuntimeConditionType = "WorkerDecommissioning" ) const ( @@ -214,6 +216,8 @@ const ( RuntimeFusesScaledInReason = "Fuses scaled in" // RuntimeFusesScaledInReason means the fuses of runtime just scaled out RuntimeFusesScaledOutReason = "Fuses scaled out" + // RuntimeWorkerDecommissioningReason means workers are being decommissioned ahead of a scale-down + RuntimeWorkerDecommissioningReason = "Workers are being decommissioned" ) // Condition describes the state of the cache at a certain point. diff --git a/cmd/alluxio/app/alluxio.go b/cmd/alluxio/app/alluxio.go index 1493677b1cd..b8569c0e824 100644 --- a/cmd/alluxio/app/alluxio.go +++ b/cmd/alluxio/app/alluxio.go @@ -29,6 +29,7 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" "github.com/fluid-cloudnative/fluid/pkg/utils" + utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature" "github.com/spf13/cobra" zapOpt "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -95,6 +96,7 @@ func init() { alluxioCmd.Flags().StringVar(&controllerWorkqueueMaxSyncBackoffStr, "workqueue-max-sync-backoff", "1000s", "max backoff period for failed reconciliation in controller's workqueue") alluxioCmd.Flags().IntVar(&controllerWorkqueueQPS, "workqueue-qps", 10, "qps limit value for controller's workqueue") alluxioCmd.Flags().IntVar(&controllerWorkqueueBurst, "workqueue-burst", 100, "burst limit value for controller's workqueue") + utilfeature.DefaultMutableFeatureGate.AddFlag(alluxioCmd.Flags()) } func handle() { diff --git a/pkg/ddc/alluxio/const.go b/pkg/ddc/alluxio/const.go index 833988ec453..08c72ba9815 100644 --- a/pkg/ddc/alluxio/const.go +++ b/pkg/ddc/alluxio/const.go @@ -16,6 +16,8 @@ limitations under the License. package alluxio +import "time" + const ( // NON_NATIVE_MOUNT_DATA_NAME also used in master 'statefulset.yaml' and config 'alluxio-mount.conf.yaml' NON_NATIVE_MOUNT_DATA_NAME = "mount.info" @@ -57,6 +59,16 @@ const ( defaultGracefulShutdownLimits int32 = 3 defaultCleanCacheGracePeriodSeconds int32 = 60 + // defaultWorkerRPCPort is the Alluxio worker Thrift RPC port used when the + // runtime spec does not override alluxio.worker.rpc.port. + defaultWorkerRPCPort = 29999 + MountConfigStorage = "ALLUXIO_MOUNT_CONFIG_STORAGE" ConfigmapStorageName = "configmap" + + // defaultWorkerDecommissionDeadline bounds how long the engine keeps + // retrying a stuck worker drain (e.g. an unhealthy master, unreplicable + // blocks) before forcing the scale-down to proceed anyway, rather than + // stalling on every reconcile indefinitely. + defaultWorkerDecommissionDeadline = 10 * time.Minute ) diff --git a/pkg/ddc/alluxio/operations/decommission.go b/pkg/ddc/alluxio/operations/decommission.go new file mode 100644 index 00000000000..61c0a7b7b13 --- /dev/null +++ b/pkg/ddc/alluxio/operations/decommission.go @@ -0,0 +1,86 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import "strings" + +// DecommissionWorkers signals the Alluxio master to decommission the given +// workers. Each address must be in ":" form. +// The call is idempotent: re-issuing it against an already-decommissioned +// worker is safe. +// +// Requires Alluxio >= 2.9, where "fsadmin decommissionWorker" was introduced; +// against older masters this subcommand does not exist. +func (a AlluxioFileUtils) DecommissionWorkers(addresses []string) error { + if len(addresses) == 0 { + return nil + } + command := []string{ + "alluxio", "fsadmin", "decommissionWorker", + "--addresses", strings.Join(addresses, ","), + } + _, _, err := a.exec(command, false) + if err != nil { + a.log.Error(err, "AlluxioFileUtils.DecommissionWorkers() failed", "addresses", addresses) + } + return err +} + +// CountActiveWorkers returns the number of live workers according to +// "alluxio fsadmin report capacity -live". The "-live" flag is what makes +// this safe to compare against immediately after a decommission: it asks the +// master for currently live workers rather than every worker it still has a +// record of, so a worker that was just decommissioned doesn't linger in the +// count until its heartbeat times out. +func (a AlluxioFileUtils) CountActiveWorkers() (int, error) { + report, _, err := a.exec([]string{"alluxio", "fsadmin", "report", "capacity", "-live"}, false) + if err != nil { + a.log.Error(err, "AlluxioFileUtils.CountActiveWorkers() failed") + return 0, err + } + return parseActiveWorkerCount(report), nil +} + +// parseActiveWorkerCount counts workers in the capacity report produced by +// "alluxio fsadmin report capacity". Worker entries begin at the non-indented +// line after the "Worker Name" header; the indented line that follows each +// entry contains the used-capacity detail. +// +// Worker Name Last Heartbeat Storage MEM +// 192.168.1.147 0 capacity 2048.00MB <- worker entry +// used 443.89MB (21%) <- detail, indented +// 192.168.1.146 0 capacity 2048.00MB <- worker entry +// used 0B (0%) +func parseActiveWorkerCount(report string) int { + inWorkerSection := false + count := 0 + for _, line := range strings.Split(report, "\n") { + if strings.HasPrefix(line, "Worker Name") { + inWorkerSection = true + continue + } + if !inWorkerSection || strings.TrimSpace(line) == "" { + continue + } + // Non-indented lines are new worker entries; indented lines are + // the used-capacity continuation for the previous entry. + if line[0] != ' ' && line[0] != '\t' { + count++ + } + } + return count +} diff --git a/pkg/ddc/alluxio/operations/decommission_test.go b/pkg/ddc/alluxio/operations/decommission_test.go new file mode 100644 index 00000000000..9b1dbc2486f --- /dev/null +++ b/pkg/ddc/alluxio/operations/decommission_test.go @@ -0,0 +1,227 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import ( + "errors" + "testing" + + "github.com/agiledragon/gomonkey/v2" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" +) + +func TestAlluxioFileUtils_DecommissionWorkers(t *testing.T) { + a := &AlluxioFileUtils{log: fake.NullLogger()} + + t.Run("empty address list is a no-op", func(t *testing.T) { + if err := a.DecommissionWorkers(nil); err != nil { + t.Fatalf("want nil, got: %v", err) + } + if err := a.DecommissionWorkers([]string{}); err != nil { + t.Fatalf("want nil, got: %v", err) + } + }) + + t.Run("exec error is propagated", func(t *testing.T) { + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { + return "", "", errors.New("exec failed") + }) + defer patches.Reset() + + if err := a.DecommissionWorkers([]string{"192.168.1.1:29999"}); err == nil { + t.Error("want error, got nil") + } + }) + + t.Run("address is forwarded to the alluxio CLI", func(t *testing.T) { + var capturedCmd []string + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { + capturedCmd = cmd + return "", "", nil + }) + defer patches.Reset() + + addr := "192.168.1.1:29999" + if err := a.DecommissionWorkers([]string{addr}); err != nil { + t.Fatalf("want nil, got: %v", err) + } + found := false + for _, arg := range capturedCmd { + if arg == addr { + found = true + break + } + } + if !found { + t.Errorf("address %q not found in command: %v", addr, capturedCmd) + } + }) + + t.Run("multiple addresses are joined with commas", func(t *testing.T) { + var capturedCmd []string + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { + capturedCmd = cmd + return "", "", nil + }) + defer patches.Reset() + + if err := a.DecommissionWorkers([]string{"10.0.0.1:29999", "10.0.0.2:29999"}); err != nil { + t.Fatalf("want nil, got: %v", err) + } + found := false + for _, arg := range capturedCmd { + if arg == "10.0.0.1:29999,10.0.0.2:29999" { + found = true + break + } + } + if !found { + t.Errorf("joined addresses not found in command: %v", capturedCmd) + } + }) +} + +func TestAlluxioFileUtils_CountActiveWorkers(t *testing.T) { + a := &AlluxioFileUtils{log: fake.NullLogger()} + + t.Run("exec error returns zero and the error", func(t *testing.T) { + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { + return "", "", errors.New("exec failed") + }) + defer patches.Reset() + + count, err := a.CountActiveWorkers() + if err == nil { + t.Error("want error, got nil") + } + if count != 0 { + t.Errorf("want 0 on error, got %d", count) + } + }) + + t.Run("requests live workers only", func(t *testing.T) { + var capturedCmd []string + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { + capturedCmd = cmd + return "", "", nil + }) + defer patches.Reset() + + if _, err := a.CountActiveWorkers(); err != nil { + t.Fatalf("want nil, got: %v", err) + } + found := false + for _, arg := range capturedCmd { + if arg == "-live" { + found = true + break + } + } + if !found { + t.Errorf("-live flag not found in command: %v", capturedCmd) + } + }) + + t.Run("two active workers", func(t *testing.T) { + report := `Capacity information for all workers: + Total Capacity: 4096.00MB + Used Capacity: 443.89MB + +Worker Name Last Heartbeat Storage MEM +192.168.1.147 0 capacity 2048.00MB + used 443.89MB (21%) +192.168.1.146 0 capacity 2048.00MB + used 0B (0%) +` + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { + return report, "", nil + }) + defer patches.Reset() + + count, err := a.CountActiveWorkers() + if err != nil { + t.Fatalf("want nil, got: %v", err) + } + if count != 2 { + t.Errorf("want 2, got %d", count) + } + }) +} + +func TestParseActiveWorkerCount(t *testing.T) { + cases := []struct { + name string + input string + expect int + }{ + { + name: "empty report", + input: "", + expect: 0, + }, + { + name: "no worker section header", + input: "Capacity information for all workers:\n Total Capacity: 0B\n", + expect: 0, + }, + { + name: "single worker", + input: `Worker Name Last Heartbeat Storage MEM +192.168.1.1 0 capacity 1024.00MB + used 0B (0%) +`, + expect: 1, + }, + { + name: "three workers", + input: `Worker Name Last Heartbeat Storage MEM +10.0.0.1 0 capacity 2048.00MB + used 100MB (5%) +10.0.0.2 0 capacity 2048.00MB + used 0B (0%) +10.0.0.3 0 capacity 2048.00MB + used 500MB (25%) +`, + expect: 3, + }, + { + name: "trailing blank lines are ignored", + input: `Worker Name Last Heartbeat Storage MEM +10.0.0.1 0 capacity 1024.00MB + used 0B (0%) + + +`, + expect: 1, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := parseActiveWorkerCount(tc.input) + if got != tc.expect { + t.Errorf("want %d, got %d", tc.expect, got) + } + }) + } +} diff --git a/pkg/ddc/alluxio/replicas.go b/pkg/ddc/alluxio/replicas.go index f149578869f..83848c890f2 100644 --- a/pkg/ddc/alluxio/replicas.go +++ b/pkg/ddc/alluxio/replicas.go @@ -18,19 +18,32 @@ package alluxio import ( "context" + stderrors "errors" "fmt" "reflect" + "time" data "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ctrl" + "github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations" + "github.com/fluid-cloudnative/fluid/pkg/features" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils" + utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" ) +// errWorkersNotYetDrained marks the normal, transient state during scale-in +// where the targeted workers have not finished migrating their cached blocks +// to the surviving workers yet. It lets the caller log this at Info level +// instead of Error, while still propagating a non-nil error so the existing +// fixed-interval reconcile requeue (see runtime_controller.go) kicks in. +var errWorkersNotYetDrained = stderrors.New("workers not yet drained") + // SyncReplicas syncs the replicas func (e *AlluxioEngine) SyncReplicas(ctx cruntime.ReconcileRequestContext) (err error) { err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { @@ -88,12 +101,190 @@ func (e *AlluxioEngine) SyncReplicas(ctx cruntime.ReconcileRequestContext) (err return err } runtimeToUpdate := runtime.DeepCopy() + + // When the GracefulWorkerScaleDown feature is enabled and we detect a + // scale-in, decommission the targeted workers before the StatefulSet + // controller terminates them. This gives the Alluxio master a chance to + // migrate their cached blocks to the surviving workers. The reconciler + // requeues until the active worker count has dropped to the desired + // level. + // + // workers.Status.Replicas (the number of Pods the StatefulSet controller + // has actually created) is used rather than workers.Spec.Replicas: the + // spec is the target this engine itself lowers once a drain succeeds, so + // relying on it could under-count pods that still exist but whose spec + // update already landed. + if utilfeature.DefaultFeatureGate.Enabled(features.GracefulWorkerScaleDown) && + runtime.Replicas() < workers.Status.Replicas { + + decommissionStart, alreadyTracked := getDecommissionStart(runtime) + if !alreadyTracked { + decommissionStart = time.Now() + } + + drained, drainErr := e.drainScalingDownWorkers(ctx, runtime, runtime.Replicas(), workers.Status.Replicas) + if drainErr != nil { + return drainErr + } + + if !drained { + elapsed := time.Since(decommissionStart) + if elapsed > defaultWorkerDecommissionDeadline { + // A worker that never finishes draining (unhealthy master, + // unreplicable blocks, ...) would otherwise stall scale-down + // forever. Past the deadline we fall through and proceed + // anyway so the StatefulSet still converges; any data loss + // risk this avoided is the same the cluster accepts today + // without this feature. + e.Log.Info("Worker decommission exceeded the deadline; forcing scale-down to proceed", + "elapsed", elapsed, "deadline", defaultWorkerDecommissionDeadline) + } else { + if !alreadyTracked { + runtimeToUpdate.Status.Conditions = utils.UpdateRuntimeCondition( + runtimeToUpdate.Status.Conditions, newDecommissioningCondition(decommissionStart)) + if updateErr := e.Client.Status().Update(ctx, runtimeToUpdate); updateErr != nil { + return updateErr + } + } + return fmt.Errorf("%w: scale-in to %d replicas will resume on next reconcile", + errWorkersNotYetDrained, runtime.Replicas()) + } + } + + if alreadyTracked { + runtimeToUpdate.Status.Conditions = clearDecommissioningCondition(runtimeToUpdate.Status.Conditions) + if updateErr := e.Client.Status().Update(ctx, runtimeToUpdate); updateErr != nil { + return updateErr + } + } + } + err = e.Helper.SyncReplicas(ctx, runtimeToUpdate, runtimeToUpdate.Status, workers) return err }) if err != nil { - _ = utils.LoggingErrorExceptConflict(e.Log, err, "Failed to sync replicas", types.NamespacedName{Namespace: e.namespace, Name: e.name}) + if stderrors.Is(err, errWorkersNotYetDrained) { + e.Log.Info(err.Error(), "name", e.name, "namespace", e.namespace) + } else { + _ = utils.LoggingErrorExceptConflict(e.Log, err, "Failed to sync replicas", types.NamespacedName{Namespace: e.namespace, Name: e.name}) + } } return } + +// drainScalingDownWorkers decommissions the Alluxio workers that are about to be +// removed when scaling from currentReplicas down to desiredReplicas. +// +// A standard StatefulSet removes the highest-ordinal pods first, so the targets +// are ordinals [desiredReplicas, currentReplicas). The function issues a +// decommission request via the master and returns whether Alluxio's active +// worker count has already dropped to the desired level. +func (e *AlluxioEngine) drainScalingDownWorkers(ctx context.Context, runtime *data.AlluxioRuntime, desiredReplicas, currentReplicas int32) (bool, error) { + masterPodName, masterContainerName := e.getMasterPodInfo() + fileUtils := operations.NewAlluxioFileUtils(masterPodName, masterContainerName, e.namespace, e.Log) + + workerRPCPort := e.getWorkerRPCPort(runtime) + workerStsName := e.getWorkerName() + + // Collect RPC addresses of the pods that will be terminated on scale-down. + // The worker registers with the master under its node's IP (see the + // ALLUXIO_WORKER_HOSTNAME wiring in charts/alluxio, which sources + // alluxio.worker.hostname from status.hostIP), not its pod IP, so that is + // the identity "fsadmin decommissionWorker" must be addressed by. + // + // Pods sharing a node produce the same HostIP; seen tracks addresses + // already added so the request doesn't list the same worker twice. + var toDecommission []string + seen := make(map[string]struct{}) + for ord := desiredReplicas; ord < currentReplicas; ord++ { + podName := fmt.Sprintf("%s-%d", workerStsName, ord) + pod := &corev1.Pod{} + if err := e.Client.Get(ctx, + types.NamespacedName{Name: podName, Namespace: e.namespace}, pod); err != nil { + if errors.IsNotFound(err) { + // Pod is already gone; nothing to decommission here. + continue + } + return false, err + } + if pod.Status.HostIP == "" { + e.Log.Info("Worker pod has no host IP yet, will retry", "pod", podName) + return false, nil + } + addr := fmt.Sprintf("%s:%d", pod.Status.HostIP, workerRPCPort) + if _, dup := seen[addr]; dup { + continue + } + seen[addr] = struct{}{} + toDecommission = append(toDecommission, addr) + } + + if len(toDecommission) == 0 { + // All targeted pods are already gone from the cluster. + return true, nil + } + + if err := fileUtils.DecommissionWorkers(toDecommission); err != nil { + return false, err + } + + activeCount, err := fileUtils.CountActiveWorkers() + if err != nil { + return false, err + } + + if int32(activeCount) > desiredReplicas { + e.Log.Info("Workers are still draining, will retry", + "activeWorkers", activeCount, "desired", desiredReplicas) + return false, nil + } + + return true, nil +} + +// getWorkerRPCPort returns the configured Alluxio worker RPC port, falling back +// to the Alluxio default when the runtime does not override it. +func (e *AlluxioEngine) getWorkerRPCPort(runtime *data.AlluxioRuntime) int { + if port, ok := runtime.Spec.Worker.Ports["rpc"]; ok && port > 0 { + return port + } + return defaultWorkerRPCPort +} + +// getDecommissionStart returns when the current worker-drain attempt began, +// based on the RuntimeWorkerDecommissioning condition set the first time a +// scale-down's drain didn't finish within one reconcile. The bool reports +// whether such an in-progress attempt is already being tracked. +func getDecommissionStart(runtime *data.AlluxioRuntime) (time.Time, bool) { + _, cond := utils.GetRuntimeCondition(runtime.Status.Conditions, data.RuntimeWorkerDecommissioning) + if cond == nil || cond.Status != corev1.ConditionTrue { + return time.Time{}, false + } + return cond.LastTransitionTime.Time, true +} + +// newDecommissioningCondition marks the start of a worker-drain attempt that +// didn't complete within one reconcile, so subsequent reconciles can measure +// elapsed time against defaultWorkerDecommissionDeadline. +func newDecommissioningCondition(start time.Time) data.RuntimeCondition { + cond := utils.NewRuntimeCondition(data.RuntimeWorkerDecommissioning, data.RuntimeWorkerDecommissioningReason, + "Workers are being decommissioned ahead of a scale-down.", corev1.ConditionTrue) + cond.LastTransitionTime = metav1.NewTime(start) + return cond +} + +// clearDecommissioningCondition marks a tracked drain attempt as finished, +// whether because it succeeded or because defaultWorkerDecommissionDeadline +// forced the scale-down to proceed anyway. +func clearDecommissioningCondition(conditions []data.RuntimeCondition) []data.RuntimeCondition { + idx, cond := utils.GetRuntimeCondition(conditions, data.RuntimeWorkerDecommissioning) + if cond == nil { + return conditions + } + cleared := *cond + cleared.Status = corev1.ConditionFalse + cleared.LastTransitionTime = metav1.Now() + conditions[idx] = cleared + return conditions +} diff --git a/pkg/ddc/alluxio/replicas_drain_test.go b/pkg/ddc/alluxio/replicas_drain_test.go new file mode 100644 index 00000000000..e18eeb4fbcb --- /dev/null +++ b/pkg/ddc/alluxio/replicas_drain_test.go @@ -0,0 +1,319 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package alluxio + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/agiledragon/gomonkey/v2" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations" + "github.com/fluid-cloudnative/fluid/pkg/features" + cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" +) + +const testDrainWorkerSts = "drain-worker" +const testDrainNamespace = "fluid" + +var _ = Describe("AlluxioEngine drainScalingDownWorkers", Label("pkg.ddc.alluxio.replicas_drain_test.go"), func() { + var ( + engine *AlluxioEngine + rt *v1alpha1.AlluxioRuntime + ) + + BeforeEach(func() { + rt = &v1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: testDrainWorkerSts, + Namespace: testDrainNamespace, + }, + } + }) + + newEngineWithPods := func(pods ...*corev1.Pod) *AlluxioEngine { + objs := []runtime.Object{} + for _, p := range pods { + objs = append(objs, p.DeepCopy()) + } + fakeClient := fake.NewFakeClientWithScheme(testScheme, objs...) + return newAlluxioEngineREP(fakeClient, testDrainWorkerSts, testDrainNamespace) + } + + // hostIP mirrors status.hostIP, which is what ALLUXIO_WORKER_HOSTNAME (and + // therefore the worker's registered identity with the master) is sourced + // from in charts/alluxio - not the pod's own IP. + workerPod := func(ordinal int, hostIP string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-worker-%d", testDrainWorkerSts, ordinal), + Namespace: testDrainNamespace, + }, + Status: corev1.PodStatus{ + HostIP: hostIP, + }, + } + } + + Context("when the pod targeted for removal is already gone", func() { + It("treats a NotFound pod as already decommissioned", func() { + engine = newEngineWithPods() + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).NotTo(HaveOccurred()) + Expect(drained).To(BeTrue()) + }) + }) + + Context("when the pod has not yet been assigned a host IP", func() { + It("returns not drained without error", func() { + engine = newEngineWithPods(workerPod(1, "")) + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).NotTo(HaveOccurred()) + Expect(drained).To(BeFalse()) + }) + }) + + Context("when the decommission call fails", func() { + It("propagates the error", func() { + engine = newEngineWithPods(workerPod(1, "10.0.0.1")) + patch := gomonkey.ApplyFunc(operations.AlluxioFileUtils.DecommissionWorkers, + func(_ operations.AlluxioFileUtils, _ []string) error { + return errors.New("decommission failed") + }) + defer patch.Reset() + + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).To(HaveOccurred()) + Expect(drained).To(BeFalse()) + }) + }) + + Context("when active workers are still above the desired count", func() { + It("returns not drained and requests a retry", func() { + engine = newEngineWithPods(workerPod(1, "10.0.0.1")) + patch1 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.DecommissionWorkers, + func(_ operations.AlluxioFileUtils, _ []string) error { + return nil + }) + defer patch1.Reset() + patch2 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.CountActiveWorkers, + func(_ operations.AlluxioFileUtils) (int, error) { + return 2, nil + }) + defer patch2.Reset() + + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).NotTo(HaveOccurred()) + Expect(drained).To(BeFalse()) + }) + }) + + Context("when the worker has successfully drained", func() { + It("returns drained with no error", func() { + engine = newEngineWithPods(workerPod(1, "10.0.0.1")) + patch1 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.DecommissionWorkers, + func(_ operations.AlluxioFileUtils, _ []string) error { + return nil + }) + defer patch1.Reset() + patch2 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.CountActiveWorkers, + func(_ operations.AlluxioFileUtils) (int, error) { + return 1, nil + }) + defer patch2.Reset() + + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).NotTo(HaveOccurred()) + Expect(drained).To(BeTrue()) + }) + }) + + Context("when multiple targeted pods share the same node", func() { + It("deduplicates the decommission address list", func() { + engine = newEngineWithPods(workerPod(1, "10.0.0.1"), workerPod(2, "10.0.0.1")) + + var capturedAddrs []string + patch1 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.DecommissionWorkers, + func(_ operations.AlluxioFileUtils, addrs []string) error { + capturedAddrs = append([]string(nil), addrs...) + return nil + }) + defer patch1.Reset() + patch2 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.CountActiveWorkers, + func(_ operations.AlluxioFileUtils) (int, error) { + return 1, nil + }) + defer patch2.Reset() + + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 3) + Expect(err).NotTo(HaveOccurred()) + Expect(drained).To(BeTrue()) + Expect(capturedAddrs).To(HaveLen(1)) + }) + }) +}) + +var _ = Describe("AlluxioEngine getWorkerRPCPort", Label("pkg.ddc.alluxio.replicas_drain_test.go"), func() { + var engine *AlluxioEngine + + BeforeEach(func() { + engine = newAlluxioEngineREP(fake.NewFakeClientWithScheme(testScheme), testDrainWorkerSts, testDrainNamespace) + }) + + It("returns the configured rpc port when set", func() { + rt := &v1alpha1.AlluxioRuntime{ + Spec: v1alpha1.AlluxioRuntimeSpec{ + Worker: v1alpha1.AlluxioCompTemplateSpec{ + Ports: map[string]int{"rpc": 12345}, + }, + }, + } + Expect(engine.getWorkerRPCPort(rt)).To(Equal(12345)) + }) + + It("falls back to the default port when unset", func() { + rt := &v1alpha1.AlluxioRuntime{} + Expect(engine.getWorkerRPCPort(rt)).To(Equal(defaultWorkerRPCPort)) + }) + + It("falls back to the default port when the configured value is not positive", func() { + rt := &v1alpha1.AlluxioRuntime{ + Spec: v1alpha1.AlluxioRuntimeSpec{ + Worker: v1alpha1.AlluxioCompTemplateSpec{ + Ports: map[string]int{"rpc": 0}, + }, + }, + } + Expect(engine.getWorkerRPCPort(rt)).To(Equal(defaultWorkerRPCPort)) + }) +}) + +var _ = Describe("AlluxioEngine SyncReplicas worker decommission deadline", Label("pkg.ddc.alluxio.replicas_drain_test.go"), func() { + const ( + deadlineTestRuntime = "deadline-worker" + deadlineTestNs = "fluid" + ) + + newFixtures := func(existingCond *v1alpha1.RuntimeCondition) *AlluxioEngine { + rt := &v1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: deadlineTestRuntime, Namespace: deadlineTestNs}, + Spec: v1alpha1.AlluxioRuntimeSpec{Replicas: 1}, + Status: v1alpha1.RuntimeStatus{DesiredWorkerNumberScheduled: 2}, + } + if existingCond != nil { + rt.Status.Conditions = []v1alpha1.RuntimeCondition{*existingCond} + } + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: deadlineTestRuntime + "-worker", Namespace: deadlineTestNs}, + Spec: appsv1.StatefulSetSpec{Replicas: ptr.To[int32](2)}, + Status: appsv1.StatefulSetStatus{Replicas: 2}, + } + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: deadlineTestRuntime + "-worker-1", Namespace: deadlineTestNs}, + Status: corev1.PodStatus{HostIP: "10.0.0.5"}, + } + // BuildWorkersAffinity (invoked when Helper.SyncReplicas updates the + // StatefulSet's replica count) requires the Dataset to exist. + dataset := &v1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: deadlineTestRuntime, Namespace: deadlineTestNs}, + } + fakeClient := fake.NewFakeClientWithScheme(testScheme, rt, sts, pod, dataset) + return newAlluxioEngineREP(fakeClient, deadlineTestRuntime, deadlineTestNs) + } + + getCondition := func(engine *AlluxioEngine) *v1alpha1.RuntimeCondition { + rt, err := engine.getRuntime() + Expect(err).NotTo(HaveOccurred()) + _, cond := utils.GetRuntimeCondition(rt.Status.Conditions, v1alpha1.RuntimeWorkerDecommissioning) + return cond + } + + BeforeEach(func() { + Expect(utilfeature.DefaultMutableFeatureGate.Set(string(features.GracefulWorkerScaleDown) + "=true")).To(Succeed()) + }) + + AfterEach(func() { + Expect(utilfeature.DefaultMutableFeatureGate.Set(string(features.GracefulWorkerScaleDown) + "=false")).To(Succeed()) + }) + + Context("when a drain doesn't finish within one reconcile", func() { + It("records when the decommission attempt started and keeps requeuing", func() { + engine := newFixtures(nil) + patch1 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.DecommissionWorkers, + func(_ operations.AlluxioFileUtils, _ []string) error { return nil }) + defer patch1.Reset() + patch2 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.CountActiveWorkers, + func(_ operations.AlluxioFileUtils) (int, error) { return 2, nil }) + defer patch2.Reset() + + err := engine.SyncReplicas(cruntime.ReconcileRequestContext{ + Log: fake.NullLogger(), Recorder: record.NewFakeRecorder(300), + }) + Expect(errors.Is(err, errWorkersNotYetDrained)).To(BeTrue()) + + cond := getCondition(engine) + Expect(cond).NotTo(BeNil()) + Expect(cond.Status).To(Equal(corev1.ConditionTrue)) + Expect(time.Since(cond.LastTransitionTime.Time)).To(BeNumerically("<", time.Minute)) + }) + }) + + Context("when a drain is still stuck past the deadline", func() { + It("forces the scale-down to proceed and clears the marker", func() { + staleCond := utils.NewRuntimeCondition(v1alpha1.RuntimeWorkerDecommissioning, + v1alpha1.RuntimeWorkerDecommissioningReason, "started earlier", corev1.ConditionTrue) + staleCond.LastTransitionTime = metav1.NewTime(time.Now().Add(-defaultWorkerDecommissionDeadline - time.Minute)) + engine := newFixtures(&staleCond) + + patch1 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.DecommissionWorkers, + func(_ operations.AlluxioFileUtils, _ []string) error { return nil }) + defer patch1.Reset() + patch2 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.CountActiveWorkers, + func(_ operations.AlluxioFileUtils) (int, error) { return 2, nil }) + defer patch2.Reset() + + err := engine.SyncReplicas(cruntime.ReconcileRequestContext{ + Log: fake.NullLogger(), Recorder: record.NewFakeRecorder(300), + }) + Expect(err).NotTo(HaveOccurred()) + + cond := getCondition(engine) + Expect(cond).NotTo(BeNil()) + Expect(cond.Status).To(Equal(corev1.ConditionFalse)) + + var sts appsv1.StatefulSet + Expect(engine.Client.Get(context.TODO(), + types.NamespacedName{Name: deadlineTestRuntime + "-worker", Namespace: deadlineTestNs}, &sts)).To(Succeed()) + Expect(*sts.Spec.Replicas).To(Equal(int32(1))) + }) + }) +}) diff --git a/pkg/features/features.go b/pkg/features/features.go new file mode 100644 index 00000000000..329b938d709 --- /dev/null +++ b/pkg/features/features.go @@ -0,0 +1,46 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package features + +import ( + utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/component-base/featuregate" +) + +const ( + // GracefulWorkerScaleDown gates graceful worker scale-down for AlluxioRuntime + // on a standard Kubernetes StatefulSet. + // + // When enabled, workers targeted for removal are decommissioned from the + // Alluxio cluster before the pod is terminated, giving the master time to + // migrate their cached blocks to the surviving workers. Without this gate, + // cached data held on removed workers is lost immediately on scale-in. + // + // This only supports the standard StatefulSet scale-down order (highest + // ordinal first); it does not yet integrate with OpenKruise's Advanced + // StatefulSet for selective deletion of non-highest-ordinal pods. + GracefulWorkerScaleDown featuregate.Feature = "GracefulWorkerScaleDown" +) + +var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ + GracefulWorkerScaleDown: {Default: false, PreRelease: featuregate.Alpha}, +} + +func init() { + runtime.Must(utilfeature.DefaultMutableFeatureGate.Add(defaultFeatureGates)) +} diff --git a/test/gha-e2e/alluxio-scaledown/dataset.yaml b/test/gha-e2e/alluxio-scaledown/dataset.yaml new file mode 100644 index 00000000000..06b4fb7fce4 --- /dev/null +++ b/test/gha-e2e/alluxio-scaledown/dataset.yaml @@ -0,0 +1,37 @@ +apiVersion: data.fluid.io/v1alpha1 +kind: Dataset +metadata: + name: scaledown-demo +spec: + mounts: + - mountPoint: s3://scaledown/ + name: scaledown + options: + alluxio.underfs.s3.endpoint: "http://scaledown-minio:9000" + alluxio.underfs.s3.disable.dns.buckets: "true" + alluxio.underfs.s3.inherit.acl: "false" + encryptOptions: + - name: aws.accessKeyId + valueFrom: + secretKeyRef: + name: scaledown-minio-secret + key: aws.accessKeyId + - name: aws.secretKey + valueFrom: + secretKeyRef: + name: scaledown-minio-secret + key: aws.secretKey +--- +apiVersion: data.fluid.io/v1alpha1 +kind: AlluxioRuntime +metadata: + name: scaledown-demo +spec: + replicas: 2 + tieredstore: + levels: + - mediumtype: SSD + path: /tmp/alluxio + quota: 1Gi + high: "0.95" + low: "0.7" diff --git a/test/gha-e2e/alluxio-scaledown/minio.yaml b/test/gha-e2e/alluxio-scaledown/minio.yaml new file mode 100644 index 00000000000..b55f05458a4 --- /dev/null +++ b/test/gha-e2e/alluxio-scaledown/minio.yaml @@ -0,0 +1,54 @@ +apiVersion: v1 +kind: Secret +metadata: + name: scaledown-minio-secret +stringData: + aws.accessKeyId: minioadmin + aws.secretKey: minioadmin +--- +apiVersion: v1 +kind: Service +metadata: + name: scaledown-minio +spec: + type: ClusterIP + ports: + - port: 9000 + targetPort: 9000 + protocol: TCP + selector: + app: scaledown-minio +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: scaledown-minio +spec: + selector: + matchLabels: + app: scaledown-minio + strategy: + type: Recreate + template: + metadata: + labels: + app: scaledown-minio + spec: + containers: + - name: minio + image: minio/minio + imagePullPolicy: IfNotPresent + resources: + limits: + memory: "512Mi" + args: + - server + - /data + env: + - name: MINIO_ROOT_USER + value: "minioadmin" + - name: MINIO_ROOT_PASSWORD + value: "minioadmin" + ports: + - containerPort: 9000 + automountServiceAccountToken: false diff --git a/test/gha-e2e/alluxio-scaledown/minio_create_bucket.yaml b/test/gha-e2e/alluxio-scaledown/minio_create_bucket.yaml new file mode 100644 index 00000000000..2b799b7ea89 --- /dev/null +++ b/test/gha-e2e/alluxio-scaledown/minio_create_bucket.yaml @@ -0,0 +1,26 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: scaledown-minio-bucket-create +spec: + template: + spec: + containers: + - name: mc + image: minio/mc + imagePullPolicy: IfNotPresent + resources: + limits: + memory: "512Mi" + command: + - /bin/sh + - -c + - "mc alias set myminio http://scaledown-minio:9000 $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD && mc mb myminio/scaledown && printf '%s' 'fluid-alluxio-scaledown-e2e-fixture' | mc pipe myminio/scaledown/fixture.txt" + env: + - name: MINIO_ROOT_USER + value: "minioadmin" + - name: MINIO_ROOT_PASSWORD + value: "minioadmin" + automountServiceAccountToken: false + restartPolicy: OnFailure + backoffLimit: 4 diff --git a/test/gha-e2e/alluxio-scaledown/read_after_job.yaml b/test/gha-e2e/alluxio-scaledown/read_after_job.yaml new file mode 100644 index 00000000000..18623e237b9 --- /dev/null +++ b/test/gha-e2e/alluxio-scaledown/read_after_job.yaml @@ -0,0 +1,31 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: read-after-scaledown +spec: + backoffLimit: 1 + template: + spec: + restartPolicy: Never + containers: + - name: busybox + image: busybox + resources: + limits: + memory: "512Mi" + ephemeral-storage: "5Gi" + command: ["/bin/sh"] + args: + - -c + - | + set -ex + content=$(cat /data/scaledown/fixture.txt) + [ "$content" = "fluid-alluxio-scaledown-e2e-fixture" ] + volumeMounts: + - mountPath: /data + name: fluid-vol + automountServiceAccountToken: false + volumes: + - name: fluid-vol + persistentVolumeClaim: + claimName: scaledown-demo diff --git a/test/gha-e2e/alluxio-scaledown/read_before_job.yaml b/test/gha-e2e/alluxio-scaledown/read_before_job.yaml new file mode 100644 index 00000000000..09f454ef8ca --- /dev/null +++ b/test/gha-e2e/alluxio-scaledown/read_before_job.yaml @@ -0,0 +1,31 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: read-before-scaledown +spec: + backoffLimit: 1 + template: + spec: + restartPolicy: Never + containers: + - name: busybox + image: busybox + resources: + limits: + memory: "512Mi" + ephemeral-storage: "5Gi" + command: ["/bin/sh"] + args: + - -c + - | + set -ex + content=$(cat /data/scaledown/fixture.txt) + [ "$content" = "fluid-alluxio-scaledown-e2e-fixture" ] + volumeMounts: + - mountPath: /data + name: fluid-vol + automountServiceAccountToken: false + volumes: + - name: fluid-vol + persistentVolumeClaim: + claimName: scaledown-demo diff --git a/test/gha-e2e/alluxio-scaledown/test.sh b/test/gha-e2e/alluxio-scaledown/test.sh new file mode 100644 index 00000000000..4958e84a097 --- /dev/null +++ b/test/gha-e2e/alluxio-scaledown/test.sh @@ -0,0 +1,198 @@ +#!/bin/bash + +testname="alluxioruntime graceful scale-down e2e" + +dataset_name="scaledown-demo" +worker_sts_name="scaledown-demo-worker" +controller_deployment="alluxioruntime-controller" +controller_namespace="fluid-system" +read_before_job_name="read-before-scaledown" +read_after_job_name="read-after-scaledown" +bucket_create_job_name="scaledown-minio-bucket-create" + +function syslog() { + echo ">>> $1" +} + +function panic() { + local err_msg=$1 + syslog "test \"$testname\" failed: $err_msg" + exit 1 +} + +# GracefulWorkerScaleDown is Alpha and disabled by default; the controller +# binary has no Helm value wired up for it yet, so enable it directly on the +# running deployment for this scenario. Nothing else patches a --feature-gates +# arg onto this deployment today, so the substring check below is safe; if +# that ever changes, switch to matching the exact GracefulWorkerScaleDown=true +# token so this doesn't end up appending a second --feature-gates arg. +function enable_graceful_scale_down() { + if kubectl get deployment "$controller_deployment" -n "$controller_namespace" \ + -ojsonpath='{.spec.template.spec.containers[0].args}' | grep -q "feature-gates=GracefulWorkerScaleDown=true"; then + syslog "GracefulWorkerScaleDown feature gate already enabled" + return + fi + + kubectl patch deployment "$controller_deployment" -n "$controller_namespace" --type=json \ + -p '[{"op":"add","path":"/spec/template/spec/containers/0/args/-","value":"--feature-gates=GracefulWorkerScaleDown=true"}]' \ + || panic "failed to patch $controller_deployment to enable GracefulWorkerScaleDown" + + kubectl rollout status deployment/"$controller_deployment" -n "$controller_namespace" --timeout=120s \ + || panic "alluxioruntime-controller did not roll out after enabling the feature gate" + + syslog "Enabled GracefulWorkerScaleDown feature gate on $controller_deployment" +} + +function setup_minio() { + kubectl create -f test/gha-e2e/alluxio-scaledown/minio.yaml + kubectl create -f test/gha-e2e/alluxio-scaledown/minio_create_bucket.yaml + wait_job_completed "$bucket_create_job_name" +} + +function create_dataset() { + kubectl create -f test/gha-e2e/alluxio-scaledown/dataset.yaml + + if [[ -z "$(kubectl get dataset $dataset_name -oname)" ]]; then + panic "failed to create dataset $dataset_name" + fi + + if [[ -z "$(kubectl get alluxioruntime $dataset_name -oname)" ]]; then + panic "failed to create alluxioruntime $dataset_name" + fi +} + +function wait_dataset_bound() { + local deadline=300 # 5 minutes + local last_state="" + local counter=0 + while true; do + last_state=$(kubectl get dataset $dataset_name -ojsonpath='{@.status.phase}') + if [[ "$last_state" == "Bound" ]]; then + break + fi + + if [[ $((counter % 3)) -eq 0 ]]; then + syslog "checking dataset.status.phase==Bound (already $((counter * 5))s, last state: $last_state)" + fi + + counter=$((counter + 1)) + if [[ $((counter * 5)) -ge $deadline ]]; then + panic "timeout for ${deadline}s!" + fi + sleep 5 + done + syslog "Found dataset $dataset_name status.phase==Bound" +} + +function wait_worker_replicas() { + local expected=$1 + # Generous enough to cover both a normal drain and the + # defaultWorkerDecommissionDeadline (10m) forced-proceed fallback. + local deadline=900 + local spec_replicas="" + local status_replicas="" + local decommission_condition="" + local counter=0 + while true; do + spec_replicas=$(kubectl get statefulset "$worker_sts_name" -ojsonpath='{@.spec.replicas}' 2>/dev/null) + status_replicas=$(kubectl get statefulset "$worker_sts_name" -ojsonpath='{@.status.replicas}' 2>/dev/null) + + if [[ "$spec_replicas" == "$expected" ]] && [[ "$status_replicas" == "$expected" ]]; then + break + fi + + if [[ $((counter % 6)) -eq 0 ]]; then + decommission_condition=$(kubectl get alluxioruntime "$dataset_name" \ + -ojsonpath='{.status.conditions[?(@.type=="WorkerDecommissioning")]}' 2>/dev/null) + syslog "waiting for $worker_sts_name to reach $expected replicas (already $((counter * 5))s, spec=$spec_replicas status=$status_replicas, decommissionCondition=$decommission_condition)" + fi + + counter=$((counter + 1)) + if [[ $((counter * 5)) -ge $deadline ]]; then + panic "timeout ${deadline}s waiting for $worker_sts_name to reach $expected replicas" + fi + sleep 5 + done + syslog "$worker_sts_name reached $expected replicas" +} + +function scale_down() { + kubectl patch alluxioruntime "$dataset_name" --type=merge -p '{"spec":{"replicas":1}}' \ + || panic "failed to patch alluxioruntime $dataset_name to replicas=1" + syslog "Requested scale-down of $dataset_name to 1 replica" +} + +function create_job() { + local job_file=$1 + local job_name=$2 + kubectl create -f "$job_file" + + if [[ -z "$(kubectl get job "$job_name" -oname)" ]]; then + panic "failed to create job $job_name" + fi +} + +function wait_job_completed() { + local job_name=$1 + local deadline=300 + local counter=0 + local succeed="" + local job_failed="" + while true; do + succeed=$(kubectl get job "$job_name" -ojsonpath='{@.status.succeeded}') + [[ -z "$succeed" ]] && succeed=0 + + if [[ "$succeed" -ge "1" ]]; then + break + fi + + job_failed=$(kubectl get job "$job_name" \ + -ojsonpath='{.status.conditions[?(@.type=="Failed")].status}' 2>/dev/null || true) + if [[ "$job_failed" == "True" ]]; then + panic "job $job_name failed when accessing data (all retries exhausted)" + fi + + counter=$((counter + 1)) + if [[ $((counter * 5)) -ge $deadline ]]; then + panic "timeout ${deadline}s waiting for job $job_name to complete" + fi + sleep 5 + done + syslog "Found succeeded job $job_name" +} + +function dump_env_and_clean_up() { + bash tools/diagnose-fluid-alluxio.sh collect --name $dataset_name --namespace default --collect-path ./e2e-tmp/testcase-alluxio-scaledown.tgz + syslog "Cleaning up resources for testcase $testname" + kubectl delete --ignore-not-found -f test/gha-e2e/alluxio-scaledown/read_after_job.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/alluxio-scaledown/read_before_job.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/alluxio-scaledown/dataset.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/alluxio-scaledown/minio_create_bucket.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/alluxio-scaledown/minio.yaml +} + +# Note: the GHA Kind cluster is single-node, so both worker pods land on the +# same node and report the same HostIP. drainScalingDownWorkers in +# pkg/ddc/alluxio/replicas.go dedupes decommission addresses by HostIP:port, +# so this test cannot prove a *specific* worker was targeted by address - only +# that the decommission flow runs, the StatefulSet converges, and data isn't +# lost. Per-address targeting on distinct hosts is covered by the gomonkey +# unit tests in pkg/ddc/alluxio/replicas_drain_test.go instead. +function main() { + syslog "[TESTCASE $testname STARTS AT $(date)]" + trap dump_env_and_clean_up EXIT + enable_graceful_scale_down + setup_minio + create_dataset + wait_dataset_bound + wait_worker_replicas 2 + create_job test/gha-e2e/alluxio-scaledown/read_before_job.yaml $read_before_job_name + wait_job_completed $read_before_job_name + scale_down + wait_worker_replicas 1 + create_job test/gha-e2e/alluxio-scaledown/read_after_job.yaml $read_after_job_name + wait_job_completed $read_after_job_name + syslog "[TESTCASE $testname SUCCEEDED AT $(date)]" +} + +main diff --git a/test/gha-e2e/curvine/read_job.yaml b/test/gha-e2e/curvine/read_job.yaml index e7d584f682f..bb96ee806e1 100644 --- a/test/gha-e2e/curvine/read_job.yaml +++ b/test/gha-e2e/curvine/read_job.yaml @@ -24,7 +24,14 @@ spec: command: ['sh'] args: - -c - - set -ex; test -n "$(cat /data/minio/bar)" + - | + for i in $(seq 1 12); do + content=$(cat /data/minio/bar 2>/dev/null) && [ -n "$content" ] && exit 0 + echo "Attempt $i: /data/minio/bar not readable yet, retrying in 5s..." + sleep 5 + done + echo "ERROR: /data/minio/bar is not readable after 60 seconds" + exit 1 volumeMounts: - name: data-vol mountPath: /data