-
Notifications
You must be signed in to change notification settings - Fork 594
HDDS-13590. Refactor HealthyPipelineSafeModeRule to not use PipelineReportFromDatanode #9651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,11 +19,7 @@ | |
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.base.Preconditions; | ||
| import java.util.HashSet; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.hadoop.hdds.HddsConfigKeys; | ||
|
|
@@ -60,11 +56,9 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> { | |
| private int healthyPipelineThresholdCount; | ||
| private int currentHealthyPipelineCount = 0; | ||
| private final double healthyPipelinesPercent; | ||
| private final Set<PipelineID> processedPipelineIDs = new HashSet<>(); | ||
| private final PipelineManager pipelineManager; | ||
| private final int minHealthyPipelines; | ||
| private final SCMContext scmContext; | ||
| private final Set<PipelineID> unProcessedPipelineSet = new HashSet<>(); | ||
| private final NodeManager nodeManager; | ||
|
|
||
| HealthyPipelineSafeModeRule(EventQueue eventQueue, | ||
|
|
@@ -123,81 +117,62 @@ protected synchronized boolean validate() { | |
| LOG.info("All SCM pipelines are closed due to ongoing upgrade " + | ||
| "finalization. Bypassing healthy pipeline safemode rule."); | ||
| return true; | ||
| } | ||
| // Query PipelineManager directly for healthy pipeline count | ||
| List<Pipeline> openPipelines = pipelineManager.getPipelines( | ||
| RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE), | ||
| Pipeline.PipelineState.OPEN); | ||
|
|
||
| LOG.info("Found {} open RATIS/THREE pipelines", openPipelines.size()); | ||
| currentHealthyPipelineCount = (int) openPipelines.stream() | ||
| .filter(this::isPipelineHealthy) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Earlier, for every pipeline event, pipeline info is logged once. But now it will be logged for all pipeline for every event, can result in a lot of INFO logs for pipeline only. |
||
| .count(); | ||
| getSafeModeMetrics().setNumCurrentHealthyPipelines(currentHealthyPipelineCount); | ||
| boolean isValid = currentHealthyPipelineCount >= healthyPipelineThresholdCount; | ||
| if (scmInSafeMode()) { | ||
| LOG.info("SCM in safe mode. Healthy pipelines: {}, threshold: {}, rule satisfied: {}", | ||
| currentHealthyPipelineCount, healthyPipelineThresholdCount, isValid); | ||
| } else { | ||
| return currentHealthyPipelineCount >= healthyPipelineThresholdCount; | ||
| LOG.debug("SCM not in safe mode. Healthy pipelines: {}, threshold: {}", | ||
| currentHealthyPipelineCount, healthyPipelineThresholdCount); | ||
| } | ||
| return isValid; | ||
| } | ||
|
|
||
| @Override | ||
| protected synchronized void process(Pipeline pipeline) { | ||
| Objects.requireNonNull(pipeline, "pipeline == null"); | ||
|
|
||
| // When SCM is in safe mode for long time, already registered | ||
| // datanode can send pipeline report again, or SCMPipelineManager will | ||
| // create new pipelines. | ||
|
|
||
| // Only handle RATIS + 3-replica pipelines. | ||
| if (pipeline.getType() != HddsProtos.ReplicationType.RATIS || | ||
| ((RatisReplicationConfig) pipeline.getReplicationConfig()).getReplicationFactor() != | ||
| HddsProtos.ReplicationFactor.THREE) { | ||
| Logger safeModeManagerLog = SCMSafeModeManager.getLogger(); | ||
| if (safeModeManagerLog.isDebugEnabled()) { | ||
| safeModeManagerLog.debug("Skipping pipeline safemode report processing as Replication type isn't RATIS " + | ||
| "or replication factor isn't 3."); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| // Skip already processed ones. | ||
| if (processedPipelineIDs.contains(pipeline.getId())) { | ||
| LOG.info("Skipping pipeline safemode report processing check as pipeline: {} is already recorded.", | ||
| pipeline.getId()); | ||
| return; | ||
| private boolean isPipelineHealthy(Pipeline pipeline) { | ||
| // Verify pipeline has all 3 nodes | ||
| List<DatanodeDetails> nodes = pipeline.getNodes(); | ||
| if (nodes.size() != 3) { | ||
| LOG.info("Pipeline {} is not healthy: has {} nodes instead of 3", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this logging will happen now 3 times for every pipeline event, 2 times in validate() as called pre-process() and post-process(), and one time in getStatus(). And this will be done for every pipeline in the SCM for each event. |
||
| pipeline.getId(), nodes.size()); | ||
| return false; | ||
| } | ||
|
|
||
| List<DatanodeDetails> pipelineDns = pipeline.getNodes(); | ||
| if (pipelineDns.size() != 3) { | ||
| LOG.warn("Only {} DNs reported this pipeline: {}, all 3 DNs should report the pipeline", pipelineDns.size(), | ||
| pipeline.getId()); | ||
| return; | ||
| } | ||
|
|
||
| Map<DatanodeDetails, String> badDnsWithReasons = new LinkedHashMap<>(); | ||
|
|
||
| for (DatanodeDetails dn : pipelineDns) { | ||
|
|
||
| // Verify all nodes are healthy | ||
| for (DatanodeDetails dn : nodes) { | ||
| try { | ||
| NodeStatus status = nodeManager.getNodeStatus(dn); | ||
| if (!status.equals(NodeStatus.inServiceHealthy())) { | ||
| String reason = String.format("Health: %s, Operational State: %s", | ||
| status.getHealth(), status.getOperationalState()); | ||
| badDnsWithReasons.put(dn, reason); | ||
| LOG.info("Pipeline {} is not healthy: DN {} has status - Health: {}, Operational State: {}", | ||
| pipeline.getId(), dn.getUuidString(), status.getHealth(), status.getOperationalState()); | ||
| return false; | ||
| } | ||
| } catch (NodeNotFoundException e) { | ||
| badDnsWithReasons.put(dn, "DN not registered with SCM"); | ||
| LOG.warn("Pipeline {} is not healthy: DN {} not found in node manager", | ||
| pipeline.getId(), dn.getUuidString()); | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| if (!badDnsWithReasons.isEmpty()) { | ||
| String badDnSummary = badDnsWithReasons.entrySet().stream() | ||
| .map(entry -> String.format("DN %s: %s", entry.getKey().getID(), entry.getValue())) | ||
| .collect(Collectors.joining("; ")); | ||
| LOG.warn("Below DNs reported by Pipeline: {} are either in bad health or un-registered with SCMs. Details: {}", | ||
| pipeline.getId(), badDnSummary); | ||
| return; | ||
| } | ||
|
|
||
| getSafeModeMetrics().incCurrentHealthyPipelinesCount(); | ||
| currentHealthyPipelineCount++; | ||
| processedPipelineIDs.add(pipeline.getId()); | ||
| unProcessedPipelineSet.remove(pipeline.getId()); | ||
|
|
||
| if (scmInSafeMode()) { | ||
| SCMSafeModeManager.getLogger().info( | ||
| "SCM in safe mode. Healthy pipelines reported count is {}, " + | ||
| "required healthy pipeline reported count is {}", | ||
| currentHealthyPipelineCount, getHealthyPipelineThresholdCount()); | ||
|
|
||
| } | ||
| @Override | ||
| protected synchronized void process(Pipeline pipeline) { | ||
| // No longer processing events, validation is done directly via validate() | ||
| // This method is still called when OPEN_PIPELINE events arrive, but we | ||
| // validate in validate() method instead | ||
| LOG.debug("Received OPEN_PIPELINE event for pipeline {}, validation will happen in validate() method", | ||
| pipeline.getId()); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -212,13 +187,12 @@ public synchronized void refresh(boolean forceRefresh) { | |
| } | ||
|
|
||
| private synchronized void initializeRule(boolean refresh) { | ||
| unProcessedPipelineSet.addAll(pipelineManager.getPipelines( | ||
| RatisReplicationConfig.getInstance( | ||
| HddsProtos.ReplicationFactor.THREE), | ||
| Pipeline.PipelineState.OPEN).stream().map(Pipeline::getId) | ||
| .collect(Collectors.toSet())); | ||
|
|
||
| int pipelineCount = unProcessedPipelineSet.size(); | ||
| // Get current open pipeline count from PipelineManager | ||
| List<Pipeline> openPipelines = pipelineManager.getPipelines( | ||
| RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE), | ||
| Pipeline.PipelineState.OPEN); | ||
|
|
||
| int pipelineCount = openPipelines.size(); | ||
|
|
||
| healthyPipelineThresholdCount = Math.max(minHealthyPipelines, | ||
| (int) Math.ceil(healthyPipelinesPercent * pipelineCount)); | ||
|
|
@@ -239,8 +213,7 @@ private synchronized void initializeRule(boolean refresh) { | |
|
|
||
| @Override | ||
| protected synchronized void cleanup() { | ||
| processedPipelineIDs.clear(); | ||
| unProcessedPipelineSet.clear(); | ||
| // No tracking state to clean up since we query PipelineManager directly | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -265,13 +238,20 @@ public String getStatusText() { | |
|
|
||
| private synchronized String updateStatusTextWithSamplePipelines( | ||
| String status) { | ||
| Set<PipelineID> samplePipelines = | ||
| unProcessedPipelineSet.stream().limit(SAMPLE_PIPELINE_DISPLAY_LIMIT) | ||
| .collect(Collectors.toSet()); | ||
|
|
||
| if (!samplePipelines.isEmpty()) { | ||
| // Get sample pipelines that don't satisfy the healthy criteria | ||
| List<Pipeline> openPipelines = pipelineManager.getPipelines( | ||
| RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE), | ||
| Pipeline.PipelineState.OPEN); | ||
|
|
||
| Set<PipelineID> unhealthyPipelines = openPipelines.stream() | ||
| .filter(p -> !isPipelineHealthy(p)) | ||
| .map(Pipeline::getId) | ||
| .limit(SAMPLE_PIPELINE_DISPLAY_LIMIT) | ||
| .collect(Collectors.toSet()); | ||
|
|
||
| if (!unhealthyPipelines.isEmpty()) { | ||
| String samplePipelineText = | ||
| "Sample pipelines not satisfying the criteria : " + samplePipelines; | ||
| "Sample pipelines not satisfying the criteria : " + unhealthyPipelines; | ||
| status = status.concat("\n").concat(samplePipelineText); | ||
| } | ||
| return status; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In current implementation, if Rule is satisfied once, its marked exit. Never enter again. But with having dynamic rule check, one time may be success and other time may be failure.
This may not satisfy the purpose.