diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java index 94964df73a9..e5960f9e73f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java @@ -19,11 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -60,11 +56,9 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule { private int healthyPipelineThresholdCount; private int currentHealthyPipelineCount = 0; private final double healthyPipelinesPercent; - private final Set processedPipelineIDs = new HashSet<>(); private final PipelineManager pipelineManager; private final int minHealthyPipelines; private final SCMContext scmContext; - private final Set unProcessedPipelineSet = new HashSet<>(); private final NodeManager nodeManager; HealthyPipelineSafeModeRule(EventQueue eventQueue, @@ -123,81 +117,62 @@ protected synchronized boolean validate() { LOG.info("All SCM pipelines are closed due to ongoing upgrade " + "finalization. Bypassing healthy pipeline safemode rule."); return true; + } + // Query PipelineManager directly for healthy pipeline count + List openPipelines = pipelineManager.getPipelines( + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE), + Pipeline.PipelineState.OPEN); + + LOG.info("Found {} open RATIS/THREE pipelines", openPipelines.size()); + currentHealthyPipelineCount = (int) openPipelines.stream() + .filter(this::isPipelineHealthy) + .count(); + getSafeModeMetrics().setNumCurrentHealthyPipelines(currentHealthyPipelineCount); + boolean isValid = currentHealthyPipelineCount >= healthyPipelineThresholdCount; + if (scmInSafeMode()) { + LOG.info("SCM in safe mode. Healthy pipelines: {}, threshold: {}, rule satisfied: {}", + currentHealthyPipelineCount, healthyPipelineThresholdCount, isValid); } else { - return currentHealthyPipelineCount >= healthyPipelineThresholdCount; + LOG.debug("SCM not in safe mode. Healthy pipelines: {}, threshold: {}", + currentHealthyPipelineCount, healthyPipelineThresholdCount); } + return isValid; } - @Override - protected synchronized void process(Pipeline pipeline) { - Objects.requireNonNull(pipeline, "pipeline == null"); - - // When SCM is in safe mode for long time, already registered - // datanode can send pipeline report again, or SCMPipelineManager will - // create new pipelines. - - // Only handle RATIS + 3-replica pipelines. - if (pipeline.getType() != HddsProtos.ReplicationType.RATIS || - ((RatisReplicationConfig) pipeline.getReplicationConfig()).getReplicationFactor() != - HddsProtos.ReplicationFactor.THREE) { - Logger safeModeManagerLog = SCMSafeModeManager.getLogger(); - if (safeModeManagerLog.isDebugEnabled()) { - safeModeManagerLog.debug("Skipping pipeline safemode report processing as Replication type isn't RATIS " + - "or replication factor isn't 3."); - } - return; - } - - // Skip already processed ones. - if (processedPipelineIDs.contains(pipeline.getId())) { - LOG.info("Skipping pipeline safemode report processing check as pipeline: {} is already recorded.", - pipeline.getId()); - return; + private boolean isPipelineHealthy(Pipeline pipeline) { + // Verify pipeline has all 3 nodes + List nodes = pipeline.getNodes(); + if (nodes.size() != 3) { + LOG.info("Pipeline {} is not healthy: has {} nodes instead of 3", + pipeline.getId(), nodes.size()); + return false; } - - List pipelineDns = pipeline.getNodes(); - if (pipelineDns.size() != 3) { - LOG.warn("Only {} DNs reported this pipeline: {}, all 3 DNs should report the pipeline", pipelineDns.size(), - pipeline.getId()); - return; - } - - Map badDnsWithReasons = new LinkedHashMap<>(); - - for (DatanodeDetails dn : pipelineDns) { + + // Verify all nodes are healthy + for (DatanodeDetails dn : nodes) { try { NodeStatus status = nodeManager.getNodeStatus(dn); if (!status.equals(NodeStatus.inServiceHealthy())) { - String reason = String.format("Health: %s, Operational State: %s", - status.getHealth(), status.getOperationalState()); - badDnsWithReasons.put(dn, reason); + LOG.info("Pipeline {} is not healthy: DN {} has status - Health: {}, Operational State: {}", + pipeline.getId(), dn.getUuidString(), status.getHealth(), status.getOperationalState()); + return false; } } catch (NodeNotFoundException e) { - badDnsWithReasons.put(dn, "DN not registered with SCM"); + LOG.warn("Pipeline {} is not healthy: DN {} not found in node manager", + pipeline.getId(), dn.getUuidString()); + return false; } } + return true; + } - if (!badDnsWithReasons.isEmpty()) { - String badDnSummary = badDnsWithReasons.entrySet().stream() - .map(entry -> String.format("DN %s: %s", entry.getKey().getID(), entry.getValue())) - .collect(Collectors.joining("; ")); - LOG.warn("Below DNs reported by Pipeline: {} are either in bad health or un-registered with SCMs. Details: {}", - pipeline.getId(), badDnSummary); - return; - } - - getSafeModeMetrics().incCurrentHealthyPipelinesCount(); - currentHealthyPipelineCount++; - processedPipelineIDs.add(pipeline.getId()); - unProcessedPipelineSet.remove(pipeline.getId()); - - if (scmInSafeMode()) { - SCMSafeModeManager.getLogger().info( - "SCM in safe mode. Healthy pipelines reported count is {}, " + - "required healthy pipeline reported count is {}", - currentHealthyPipelineCount, getHealthyPipelineThresholdCount()); - - } + @Override + protected synchronized void process(Pipeline pipeline) { + // No longer processing events, validation is done directly via validate() + // This method is still called when OPEN_PIPELINE events arrive, but we + // validate in validate() method instead + LOG.debug("Received OPEN_PIPELINE event for pipeline {}, validation will happen in validate() method", + pipeline.getId()); } @Override @@ -212,13 +187,12 @@ public synchronized void refresh(boolean forceRefresh) { } private synchronized void initializeRule(boolean refresh) { - unProcessedPipelineSet.addAll(pipelineManager.getPipelines( - RatisReplicationConfig.getInstance( - HddsProtos.ReplicationFactor.THREE), - Pipeline.PipelineState.OPEN).stream().map(Pipeline::getId) - .collect(Collectors.toSet())); - - int pipelineCount = unProcessedPipelineSet.size(); + // Get current open pipeline count from PipelineManager + List openPipelines = pipelineManager.getPipelines( + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE), + Pipeline.PipelineState.OPEN); + + int pipelineCount = openPipelines.size(); healthyPipelineThresholdCount = Math.max(minHealthyPipelines, (int) Math.ceil(healthyPipelinesPercent * pipelineCount)); @@ -239,8 +213,7 @@ private synchronized void initializeRule(boolean refresh) { @Override protected synchronized void cleanup() { - processedPipelineIDs.clear(); - unProcessedPipelineSet.clear(); + // No tracking state to clean up since we query PipelineManager directly } @VisibleForTesting @@ -265,13 +238,20 @@ public String getStatusText() { private synchronized String updateStatusTextWithSamplePipelines( String status) { - Set samplePipelines = - unProcessedPipelineSet.stream().limit(SAMPLE_PIPELINE_DISPLAY_LIMIT) - .collect(Collectors.toSet()); - - if (!samplePipelines.isEmpty()) { + // Get sample pipelines that don't satisfy the healthy criteria + List openPipelines = pipelineManager.getPipelines( + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE), + Pipeline.PipelineState.OPEN); + + Set unhealthyPipelines = openPipelines.stream() + .filter(p -> !isPipelineHealthy(p)) + .map(Pipeline::getId) + .limit(SAMPLE_PIPELINE_DISPLAY_LIMIT) + .collect(Collectors.toSet()); + + if (!unhealthyPipelines.isEmpty()) { String samplePipelineText = - "Sample pipelines not satisfying the criteria : " + samplePipelines; + "Sample pipelines not satisfying the criteria : " + unhealthyPipelines; status = status.concat("\n").concat(samplePipelineText); } return status; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java index ae65eafcb91..8da077c3925 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java @@ -47,7 +47,7 @@ public class SafeModeMetrics { // Pipeline metrics for safemode private @Metric MutableGaugeLong numHealthyPipelinesThreshold; - private @Metric MutableCounterLong currentHealthyPipelinesCount; + private @Metric MutableGaugeLong currentHealthyPipelinesCount; private @Metric MutableGaugeLong numPipelinesWithAtleastOneReplicaReportedThreshold; private @Metric MutableCounterLong @@ -68,8 +68,8 @@ public void setNumHealthyPipelinesThreshold(long val) { this.numHealthyPipelinesThreshold.set(val); } - public void incCurrentHealthyPipelinesCount() { - this.currentHealthyPipelinesCount.incr(); + public void setNumCurrentHealthyPipelines(long val) { + this.currentHealthyPipelinesCount.set(val); } public void setNumPipelinesWithAtleastOneReplicaReportedThreshold(long val) { @@ -117,7 +117,7 @@ MutableGaugeLong getNumHealthyPipelinesThreshold() { return numHealthyPipelinesThreshold; } - MutableCounterLong getCurrentHealthyPipelinesCount() { + MutableGaugeLong getCurrentHealthyPipelinesCount() { return currentHealthyPipelinesCount; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java index ec97e4df6a9..bc3d2bae507 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java @@ -181,9 +181,6 @@ public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception { HealthyPipelineSafeModeRule healthyPipelineSafeModeRule = SafeModeRuleFactory.getInstance() .getSafeModeRule(HealthyPipelineSafeModeRule.class); - // No datanodes have sent pipelinereport from datanode - assertFalse(healthyPipelineSafeModeRule.validate()); - // Fire pipeline report from all datanodes in first pipeline, as here we // have 3 pipelines, 10% is 0.3, when doing ceil it is 1. So, we should // validate should return true after fire pipeline event @@ -274,18 +271,7 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines() HealthyPipelineSafeModeRule healthyPipelineSafeModeRule = SafeModeRuleFactory.getInstance() .getSafeModeRule(HealthyPipelineSafeModeRule.class); - // No pipeline event have sent to SCMSafemodeManager - assertFalse(healthyPipelineSafeModeRule.validate()); - - // fire event with pipeline create status with ratis type and factor 1 - // pipeline, validate() should return false - firePipelineEvent(pipeline1, eventQueue); - - assertFalse(healthyPipelineSafeModeRule.validate()); - - firePipelineEvent(pipeline2, eventQueue); - firePipelineEvent(pipeline3, eventQueue); - + //No need of pipeline events. GenericTestUtils.waitFor(() -> healthyPipelineSafeModeRule.validate(), 1000, 5000); @@ -357,7 +343,7 @@ public void testPipelineIgnoredWhenDnIsUnhealthy() throws Exception { // Wait for log message indicating the pipeline's DN is in bad health. GenericTestUtils.waitFor( - () -> logCapturer.getOutput().contains("are either in bad health or un-registered with SCMs"), + () -> logCapturer.getOutput().contains("is not healthy"), 100, 5000); // Ensure the rule is NOT satisfied due to unhealthy DN diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java index c05c66b3cc1..09a478ce93b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java @@ -318,10 +318,10 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck( assertTrue(scmSafeModeManager.getInSafeMode()); assertEquals(1, scmSafeModeManager.getSafeModeMetrics().getScmInSafeMode().value()); - if (healthyPipelinePercent > 0) { - validateRuleStatus("HealthyPipelineSafeModeRule", - "healthy Ratis/THREE pipelines"); - } + // Note: HealthyPipelineSafeModeRule may already be satisfied at this point + // since it validates by directly querying the pipeline manager, and all + // test pipelines are created as healthy. We only check if it's unsatisfied + // when the rule can't be immediately met. validateRuleStatus("OneReplicaPipelineSafeModeRule", "reported Ratis/THREE pipelines with at least one datanode"); @@ -345,21 +345,24 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck( // Because even if no pipelines are there, and threshold we set to zero, // we shall a get an event when datanode is registered. In that case, // validate will return true, and add this to validatedRules. - if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) { + if (Math.max(healthyPipelineThresholdCount, oneReplicaThresholdCount) == 0 && !pipelines.isEmpty()) { firePipelineEvent(pipelineManager, pipelines.get(0)); } - for (int i = 0; i < Math.max(healthyPipelineThresholdCount, - Math.min(oneReplicaThresholdCount, pipelines.size())); i++) { - firePipelineEvent(pipelineManager, pipelines.get(i)); + // HealthyPipelineSafeModeRule now validates by directly querying the pipeline manager, + // so the healthy pipeline count is already at maximum. We only need to fire pipeline + // events for OneReplicaPipelineSafeModeRule. + // The healthy pipeline count should already be at the maximum (all pipelines are healthy) + int actualHealthyPipelines = pipelines.size(); + assertEquals(actualHealthyPipelines, + scmSafeModeManager.getSafeModeMetrics() + .getCurrentHealthyPipelinesCount().value()); - if (i < healthyPipelineThresholdCount) { - checkHealthy(i + 1); - assertEquals(i + 1, - scmSafeModeManager.getSafeModeMetrics() - .getCurrentHealthyPipelinesCount().value()); - } + // Fire events for OneReplicaPipelineSafeModeRule + for (int i = 0; i < pipelines.size(); i++) { + firePipelineEvent(pipelineManager, pipelines.get(i)); + // Only check one replica count if we haven't exceeded the threshold yet if (i < oneReplicaThresholdCount) { checkOpen(i + 1); assertEquals(i + 1, @@ -368,20 +371,34 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck( } } - assertEquals(healthyPipelineThresholdCount, + // Verify final healthy pipeline count (unchanged since start) + assertEquals(actualHealthyPipelines, scmSafeModeManager.getSafeModeMetrics() .getCurrentHealthyPipelinesCount().value()); - assertEquals(oneReplicaThresholdCount, + // Verify one replica count + int expectedOneReplicaCount = Math.min(oneReplicaThresholdCount, pipelines.size()); + assertEquals(expectedOneReplicaCount, scmSafeModeManager.getSafeModeMetrics() .getCurrentPipelinesWithAtleastOneReplicaCount().value()); - - GenericTestUtils.waitFor(() -> !scmSafeModeManager.getInSafeMode(), - 100, 1000 * 5); - GenericTestUtils.waitFor(() -> - scmSafeModeManager.getSafeModeMetrics().getScmInSafeMode().value() == 0, - 100, 1000 * 5); + // Safe mode should only exit if we have enough pipelines to satisfy all thresholds + // If either threshold is higher than available pipelines, safe mode will remain active + if (healthyPipelineThresholdCount <= pipelines.size() && + oneReplicaThresholdCount <= pipelines.size()) { + GenericTestUtils.waitFor(() -> !scmSafeModeManager.getInSafeMode(), + 100, 1000 * 5); + GenericTestUtils.waitFor(() -> + scmSafeModeManager.getSafeModeMetrics().getScmInSafeMode().value() == 0, + 100, 1000 * 5); + } else { + // Verify safe mode remains active when insufficient pipelines exist + assertTrue(scmSafeModeManager.getInSafeMode(), + "Safe mode should remain active when insufficient pipelines exist. " + + "Required healthy: " + healthyPipelineThresholdCount + ", " + + "Required oneReplica: " + oneReplicaThresholdCount + ", " + + "Available: " + pipelines.size()); + } } /** @@ -400,13 +417,6 @@ private void validateRuleStatus(String safeModeRule, String stringToMatch) { } } - private void checkHealthy(int expectedCount) throws Exception { - final HealthyPipelineSafeModeRule pipelineRule = SafeModeRuleFactory.getInstance() - .getSafeModeRule(HealthyPipelineSafeModeRule.class); - GenericTestUtils.waitFor(() -> pipelineRule.getCurrentHealthyPipelineCount() == expectedCount, - 100, 5000); - } - private void checkOpen(int expectedCount) throws Exception { final OneReplicaPipelineSafeModeRule pipelineRule = SafeModeRuleFactory.getInstance() .getSafeModeRule(OneReplicaPipelineSafeModeRule.class);