Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/scripts/gha-e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@
bash test/gha-e2e/alluxio/test.sh
}

function alluxio_scaledown_e2e() {

Check warning on line 93 in .github/scripts/gha-e2e.sh

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add an explicit return statement at the end of the function.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZ79D8zckXMtfYKNdGnT&open=AZ79D8zckXMtfYKNdGnT&pullRequest=6061
set -e
bash test/gha-e2e/alluxio-scaledown/test.sh
}

function jindo_e2e() {
set -e
bash test/gha-e2e/jindo/test.sh
Expand All @@ -107,6 +112,7 @@

check_control_plane_status
alluxio_e2e
alluxio_scaledown_e2e
jindo_e2e
juicefs_e2e
curvine_e2e
4 changes: 4 additions & 0 deletions api/v1alpha1/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ const (
RuntimeFusesScaledIn RuntimeConditionType = "FusesScaledIn"
// RuntimeFusesScaledOut means the fuses of runtime just scaled out
RuntimeFusesScaledOut RuntimeConditionType = "FusesScaledOut"
// RuntimeWorkerDecommissioning means the runtime is draining workers ahead of a scale-down
RuntimeWorkerDecommissioning RuntimeConditionType = "WorkerDecommissioning"
)

const (
Expand All @@ -214,6 +216,8 @@ const (
RuntimeFusesScaledInReason = "Fuses scaled in"
// RuntimeFusesScaledInReason means the fuses of runtime just scaled out
RuntimeFusesScaledOutReason = "Fuses scaled out"
// RuntimeWorkerDecommissioningReason means workers are being decommissioned ahead of a scale-down
RuntimeWorkerDecommissioningReason = "Workers are being decommissioned"
)

// Condition describes the state of the cache at a certain point.
Expand Down
2 changes: 2 additions & 0 deletions cmd/alluxio/app/alluxio.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator"
"github.com/fluid-cloudnative/fluid/pkg/utils"
utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature"
Comment thread
jakharmonika364 marked this conversation as resolved.
"github.com/spf13/cobra"
zapOpt "go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -95,6 +96,7 @@ func init() {
alluxioCmd.Flags().StringVar(&controllerWorkqueueMaxSyncBackoffStr, "workqueue-max-sync-backoff", "1000s", "max backoff period for failed reconciliation in controller's workqueue")
alluxioCmd.Flags().IntVar(&controllerWorkqueueQPS, "workqueue-qps", 10, "qps limit value for controller's workqueue")
alluxioCmd.Flags().IntVar(&controllerWorkqueueBurst, "workqueue-burst", 100, "burst limit value for controller's workqueue")
utilfeature.DefaultMutableFeatureGate.AddFlag(alluxioCmd.Flags())
}

func handle() {
Expand Down
12 changes: 12 additions & 0 deletions pkg/ddc/alluxio/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package alluxio

import "time"

const (
// NON_NATIVE_MOUNT_DATA_NAME also used in master 'statefulset.yaml' and config 'alluxio-mount.conf.yaml'
NON_NATIVE_MOUNT_DATA_NAME = "mount.info"
Expand Down Expand Up @@ -57,6 +59,16 @@ const (
defaultGracefulShutdownLimits int32 = 3
defaultCleanCacheGracePeriodSeconds int32 = 60

// defaultWorkerRPCPort is the Alluxio worker Thrift RPC port used when the
// runtime spec does not override alluxio.worker.rpc.port.
defaultWorkerRPCPort = 29999

MountConfigStorage = "ALLUXIO_MOUNT_CONFIG_STORAGE"
ConfigmapStorageName = "configmap"

// defaultWorkerDecommissionDeadline bounds how long the engine keeps
// retrying a stuck worker drain (e.g. an unhealthy master, unreplicable
// blocks) before forcing the scale-down to proceed anyway, rather than
// stalling on every reconcile indefinitely.
defaultWorkerDecommissionDeadline = 10 * time.Minute
)
86 changes: 86 additions & 0 deletions pkg/ddc/alluxio/operations/decommission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2026 The Fluid Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operations

import "strings"

// DecommissionWorkers signals the Alluxio master to decommission the given
// workers. Each address must be in "<host>:<rpcPort>" form.
// The call is idempotent: re-issuing it against an already-decommissioned
// worker is safe.
//
// Requires Alluxio >= 2.9, where "fsadmin decommissionWorker" was introduced;
// against older masters this subcommand does not exist.
func (a AlluxioFileUtils) DecommissionWorkers(addresses []string) error {
if len(addresses) == 0 {
return nil
}
command := []string{
"alluxio", "fsadmin", "decommissionWorker",
"--addresses", strings.Join(addresses, ","),
}
_, _, err := a.exec(command, false)
if err != nil {
a.log.Error(err, "AlluxioFileUtils.DecommissionWorkers() failed", "addresses", addresses)
}
return err
}

// CountActiveWorkers returns the number of live workers according to
// "alluxio fsadmin report capacity -live". The "-live" flag is what makes
// this safe to compare against immediately after a decommission: it asks the
// master for currently live workers rather than every worker it still has a
// record of, so a worker that was just decommissioned doesn't linger in the
// count until its heartbeat times out.
func (a AlluxioFileUtils) CountActiveWorkers() (int, error) {
report, _, err := a.exec([]string{"alluxio", "fsadmin", "report", "capacity", "-live"}, false)
if err != nil {
a.log.Error(err, "AlluxioFileUtils.CountActiveWorkers() failed")
return 0, err
}
return parseActiveWorkerCount(report), nil
}

// parseActiveWorkerCount counts workers in the capacity report produced by
// "alluxio fsadmin report capacity". Worker entries begin at the non-indented
// line after the "Worker Name" header; the indented line that follows each
// entry contains the used-capacity detail.
//
// Worker Name Last Heartbeat Storage MEM
// 192.168.1.147 0 capacity 2048.00MB <- worker entry
// used 443.89MB (21%) <- detail, indented
// 192.168.1.146 0 capacity 2048.00MB <- worker entry
// used 0B (0%)
func parseActiveWorkerCount(report string) int {
inWorkerSection := false
count := 0
for _, line := range strings.Split(report, "\n") {
if strings.HasPrefix(line, "Worker Name") {
inWorkerSection = true
continue
}
if !inWorkerSection || strings.TrimSpace(line) == "" {
continue
}
// Non-indented lines are new worker entries; indented lines are
// the used-capacity continuation for the previous entry.
if line[0] != ' ' && line[0] != '\t' {
count++
}
}
return count
}
227 changes: 227 additions & 0 deletions pkg/ddc/alluxio/operations/decommission_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
Copyright 2026 The Fluid Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operations

import (
"errors"
"testing"

"github.com/agiledragon/gomonkey/v2"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
)

func TestAlluxioFileUtils_DecommissionWorkers(t *testing.T) {

Check failure on line 27 in pkg/ddc/alluxio/operations/decommission_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 24 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZ7-T3p5K0Fs6AeJ8pp5&open=AZ7-T3p5K0Fs6AeJ8pp5&pullRequest=6061
a := &AlluxioFileUtils{log: fake.NullLogger()}

t.Run("empty address list is a no-op", func(t *testing.T) {
if err := a.DecommissionWorkers(nil); err != nil {
t.Fatalf("want nil, got: %v", err)
}
if err := a.DecommissionWorkers([]string{}); err != nil {
t.Fatalf("want nil, got: %v", err)
}
})

t.Run("exec error is propagated", func(t *testing.T) {
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) {
return "", "", errors.New("exec failed")
})
defer patches.Reset()

if err := a.DecommissionWorkers([]string{"192.168.1.1:29999"}); err == nil {
t.Error("want error, got nil")
}
})

t.Run("address is forwarded to the alluxio CLI", func(t *testing.T) {
var capturedCmd []string
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) {
capturedCmd = cmd
return "", "", nil
})
defer patches.Reset()

addr := "192.168.1.1:29999"
if err := a.DecommissionWorkers([]string{addr}); err != nil {
t.Fatalf("want nil, got: %v", err)
}
found := false
for _, arg := range capturedCmd {
if arg == addr {
found = true
break
}
}
if !found {
t.Errorf("address %q not found in command: %v", addr, capturedCmd)
}
})

t.Run("multiple addresses are joined with commas", func(t *testing.T) {
var capturedCmd []string
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) {
capturedCmd = cmd
return "", "", nil
})
defer patches.Reset()

if err := a.DecommissionWorkers([]string{"10.0.0.1:29999", "10.0.0.2:29999"}); err != nil {
t.Fatalf("want nil, got: %v", err)
}
found := false
for _, arg := range capturedCmd {
if arg == "10.0.0.1:29999,10.0.0.2:29999" {
found = true
break
}
}
if !found {
t.Errorf("joined addresses not found in command: %v", capturedCmd)
}
})
}

func TestAlluxioFileUtils_CountActiveWorkers(t *testing.T) {

Check failure on line 101 in pkg/ddc/alluxio/operations/decommission_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 17 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZ7-T3p5K0Fs6AeJ8pp6&open=AZ7-T3p5K0Fs6AeJ8pp6&pullRequest=6061
a := &AlluxioFileUtils{log: fake.NullLogger()}

t.Run("exec error returns zero and the error", func(t *testing.T) {
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) {
return "", "", errors.New("exec failed")
})
defer patches.Reset()

count, err := a.CountActiveWorkers()
if err == nil {
t.Error("want error, got nil")
}
if count != 0 {
t.Errorf("want 0 on error, got %d", count)
}
})

t.Run("requests live workers only", func(t *testing.T) {
var capturedCmd []string
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) {
capturedCmd = cmd
return "", "", nil
})
defer patches.Reset()

if _, err := a.CountActiveWorkers(); err != nil {
t.Fatalf("want nil, got: %v", err)
}
found := false
for _, arg := range capturedCmd {
if arg == "-live" {
found = true
break
}
}
if !found {
t.Errorf("-live flag not found in command: %v", capturedCmd)
}
})

t.Run("two active workers", func(t *testing.T) {
report := `Capacity information for all workers:
Total Capacity: 4096.00MB
Used Capacity: 443.89MB

Worker Name Last Heartbeat Storage MEM
192.168.1.147 0 capacity 2048.00MB
used 443.89MB (21%)
192.168.1.146 0 capacity 2048.00MB
used 0B (0%)
`
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) {
return report, "", nil
})
defer patches.Reset()

count, err := a.CountActiveWorkers()
if err != nil {
t.Fatalf("want nil, got: %v", err)
}
if count != 2 {
t.Errorf("want 2, got %d", count)
}
})
}

func TestParseActiveWorkerCount(t *testing.T) {
cases := []struct {
name string
input string
expect int
}{
{
name: "empty report",
input: "",
expect: 0,
},
{
name: "no worker section header",
input: "Capacity information for all workers:\n Total Capacity: 0B\n",
expect: 0,
},
{
name: "single worker",
input: `Worker Name Last Heartbeat Storage MEM
192.168.1.1 0 capacity 1024.00MB
used 0B (0%)
`,
expect: 1,
},
{
name: "three workers",
input: `Worker Name Last Heartbeat Storage MEM
10.0.0.1 0 capacity 2048.00MB
used 100MB (5%)
10.0.0.2 0 capacity 2048.00MB
used 0B (0%)
10.0.0.3 0 capacity 2048.00MB
used 500MB (25%)
`,
expect: 3,
},
{
name: "trailing blank lines are ignored",
input: `Worker Name Last Heartbeat Storage MEM
10.0.0.1 0 capacity 1024.00MB
used 0B (0%)


`,
expect: 1,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := parseActiveWorkerCount(tc.input)
if got != tc.expect {
t.Errorf("want %d, got %d", tc.expect, got)
}
})
}
}
Loading
Loading