Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,16 @@

package org.apache.hadoop.ozone.recon.api;

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,51 +81,6 @@ public Response getPendingDeletionByComponent(
}
}

@GET
@Path("/download")
public Response downloadPendingDeleteData() {
  // Snapshot of the most recent datanode pending-deletion collection run.
  final DataNodeMetricsServiceResponse metrics =
      dataNodeMetricsService.getCollectedMetrics(null);

  // Collection has not finished yet: report the current status as JSON.
  if (DataNodeMetricsService.MetricCollectionStatus.FINISHED != metrics.getStatus()) {
    return Response.status(Response.Status.ACCEPTED)
        .entity(metrics)
        .type("application/json")
        .build();
  }

  // FINISHED but no payload is an internal inconsistency, not a client error.
  final Iterable<DatanodePendingDeletionMetrics> perNode =
      metrics.getPendingDeletionPerDataNode();
  if (null == perNode) {
    return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
        .entity("Metrics data is missing despite FINISHED status.")
        .type("text/plain")
        .build();
  }

  // Stream the CSV row by row instead of materializing it in memory.
  final StreamingOutput csvBody = out -> {
    final CSVFormat layout = CSVFormat.DEFAULT.builder()
        .setHeader("HostName", "Datanode UUID", "Pending Block Size (bytes)")
        .build();
    try (CSVPrinter printer = new CSVPrinter(
        new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)), layout)) {
      for (DatanodePendingDeletionMetrics row : perNode) {
        printer.printRecord(row.getHostName(), row.getDatanodeUuid(), row.getPendingBlockSize());
      }
      printer.flush();
    } catch (Exception e) {
      LOG.error("Failed to stream CSV", e);
      throw new WebApplicationException("Failed to generate CSV", e);
    }
  };

  // NOTE(review): 202 Accepted for a completed attachment is unconventional
  // (200 OK is typical once status is FINISHED); preserved as-is since the
  // existing test asserts ACCEPTED.
  return Response.status(Response.Status.ACCEPTED)
      .entity(csvBody)
      .type("text/csv")
      .header("Content-Disposition", "attachment; filename=\"pending_deletion_all_datanode_stats.csv\"")
      .build();
}

private Response handleDataNodeMetrics(Integer limit) {
if (null != limit && limit < 1) {
return Response.status(Response.Status.BAD_REQUEST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,33 @@
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.ozone.recon.api.types.DUResponse;
import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport;
import org.apache.hadoop.ozone.recon.api.types.GlobalNamespaceReport;
import org.apache.hadoop.ozone.recon.api.types.GlobalStorageReport;
Expand Down Expand Up @@ -70,16 +81,19 @@ public class StorageDistributionEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(StorageDistributionEndpoint.class);
private final ReconGlobalStatsManager reconGlobalStatsManager;
private final ReconGlobalMetricsService reconGlobalMetricsService;
private final DataNodeMetricsService dataNodeMetricsService;

/**
 * Dependency-injected constructor wiring the endpoint's collaborators.
 *
 * @param reconSCM Recon's SCM facade; its node manager is narrowed to
 *     {@link ReconNodeManager} for datanode lookups.
 * @param nsSummaryEndpoint used for namespace summary queries.
 * @param reconGlobalStatsManager source of global table statistics.
 * @param reconGlobalMetricsService source of aggregated global metrics.
 * @param dataNodeMetricsService provides per-datanode collected metrics
 *     (e.g. pending deletion) for the /download endpoint.
 */
@Inject
public StorageDistributionEndpoint(OzoneStorageContainerManager reconSCM,
    NSSummaryEndpoint nsSummaryEndpoint,
    ReconGlobalStatsManager reconGlobalStatsManager,
    ReconGlobalMetricsService reconGlobalMetricsService,
    DataNodeMetricsService dataNodeMetricsService) {
  // Cast is assumed safe: Recon always supplies a ReconNodeManager — TODO confirm.
  this.nodeManager = (ReconNodeManager) reconSCM.getScmNodeManager();
  this.nsSummaryEndpoint = nsSummaryEndpoint;
  this.reconGlobalStatsManager = reconGlobalStatsManager;
  this.reconGlobalMetricsService = reconGlobalMetricsService;
  this.dataNodeMetricsService = dataNodeMetricsService;
}

@GET
Expand Down Expand Up @@ -113,6 +127,85 @@ public Response getStorageDistribution() {
}
}

/**
 * Streams a CSV of per-datanode storage capacity joined with pending-deletion
 * metrics.
 *
 * <p>Returns 202 (with the collection status as JSON) while metric collection
 * is still in progress, 500 if the collection claims FINISHED but carries no
 * data, and 200 with a {@code text/csv} attachment on success.
 */
@GET
@Path("/download")
public Response downloadDataNodeDistribution() {
  DataNodeMetricsServiceResponse metricsResponse =
      dataNodeMetricsService.getCollectedMetrics(null);

  // Still collecting: tell the client to poll again, payload is the status.
  if (metricsResponse.getStatus() != DataNodeMetricsService.MetricCollectionStatus.FINISHED) {
    return Response.status(Response.Status.ACCEPTED)
        .entity(metricsResponse)
        .type(MediaType.APPLICATION_JSON)
        .build();
  }

  List<DatanodePendingDeletionMetrics> pendingDeletionMetrics =
      metricsResponse.getPendingDeletionPerDataNode();

  // FINISHED with no payload indicates an internal inconsistency.
  if (pendingDeletionMetrics == null) {
    return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
        .entity("Metrics data is missing despite FINISHED status.")
        .type(MediaType.TEXT_PLAIN)
        .build();
  }

  // Index storage reports by datanode UUID for the CSV join. A merge function
  // keeps the first report should a UUID ever repeat, instead of letting
  // Collectors.toMap throw IllegalStateException.
  Map<String, DatanodeStorageReport> reportByUuid =
      collectDatanodeReports().stream()
          .collect(Collectors.toMap(
              DatanodeStorageReport::getDatanodeUuid,
              Function.identity(),
              (first, second) -> first));

  StreamingOutput stream = output -> {
    CSVFormat format = CSVFormat.DEFAULT.builder()
        .setHeader(
            "HostName",
            "Datanode UUID",
            "Capacity",
            "Used Space",
            "Remaining Space",
            "Committed Space",
            "Reserved Space",
            "Minimum Free Space",
            "Pending Block Size")
        .build();

    try (CSVPrinter printer = new CSVPrinter(
        new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)),
        format)) {

      for (DatanodePendingDeletionMetrics metric : pendingDeletionMetrics) {
        DatanodeStorageReport report = reportByUuid.get(metric.getDatanodeUuid());
        if (report == null) {
          continue; // skip datanodes without a storage report
        }

        printer.printRecord(
            metric.getHostName(),
            metric.getDatanodeUuid(),
            report.getCapacity(),
            report.getUsed(),
            report.getRemaining(),
            report.getCommitted(),
            report.getReserved(),
            report.getMinimumFreeSpace(),
            metric.getPendingBlockSize()
        );
      }
      printer.flush();
    } catch (Exception e) {
      LOG.error("Failed to stream CSV", e);
      throw new WebApplicationException("Failed to generate CSV", e);
    }
  };

  // 200 OK: collection is FINISHED and the attachment body is complete.
  // (202 Accepted would signal that processing is still ongoing.)
  return Response.ok(stream, "text/csv")
      .header("Content-Disposition", "attachment; filename=\"datanode_storage_and_pending_deletion_stats.csv\"")
      .build();
}

private GlobalStorageReport calculateGlobalStorageReport() {
try {
SCMNodeStat stats = nodeManager.getStats();
Expand Down Expand Up @@ -188,7 +281,7 @@ private StorageCapacityDistributionResponse buildStorageDistributionResponse(
.build();
}

private List<DatanodeStorageReport> collectDatanodeReports() {
public List<DatanodeStorageReport> collectDatanodeReports() {
return nodeManager.getAllNodes().stream()
.map(this::getStorageReport)
.filter(Objects::nonNull) // Filter out null reports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type CapacityState = {

const Capacity: React.FC<object> = () => {
const PENDING_POLL_INTERVAL = 5 * 1000;
const DN_CSV_DOWNLOAD_URL = '/api/v1/pendingDeletion/download';
const DN_CSV_DOWNLOAD_URL = '/api/v1/storageDistribution/download';
const DN_STATUS_URL = '/api/v1/pendingDeletion?component=dn';
const DOWNLOAD_POLL_TIMEOUT_MS = 10 * 60 * 1000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,14 @@
package org.apache.hadoop.ozone.recon.api;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
Expand Down Expand Up @@ -212,58 +206,4 @@ public void testOmComponentReturnsPendingDeletionSizes() {
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
assertEquals(pendingSizes, response.getEntity());
}

@Test
public void testDownloadReturnsAcceptedWhenCollectionInProgress() {
  // An in-flight collection must come back as 202 with the status body itself.
  DataNodeMetricsServiceResponse inProgress = DataNodeMetricsServiceResponse.newBuilder()
      .setStatus(DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS)
      .build();
  when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(inProgress);

  Response result = pendingDeletionEndpoint.downloadPendingDeleteData();

  assertEquals(Response.Status.ACCEPTED.getStatusCode(), result.getStatus());
  assertEquals("application/json", result.getMediaType().toString());
  assertEquals(inProgress, result.getEntity());
}

@Test
public void testDownloadReturnsServerErrorWhenMetricsMissing() {
  // FINISHED with no per-datanode payload is an internal inconsistency -> 500.
  DataNodeMetricsServiceResponse finishedWithoutData = DataNodeMetricsServiceResponse.newBuilder()
      .setStatus(DataNodeMetricsService.MetricCollectionStatus.FINISHED)
      .build();
  when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(finishedWithoutData);

  Response result = pendingDeletionEndpoint.downloadPendingDeleteData();

  assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), result.getStatus());
  assertEquals("text/plain", result.getMediaType().toString());
  assertEquals("Metrics data is missing despite FINISHED status.", result.getEntity());
}

@Test
public void testDownloadReturnsCsvWithMetrics() throws Exception {
  // Two datanodes with pending-deletion sizes; the endpoint should emit one
  // CSV row per datanode after the header.
  DataNodeMetricsServiceResponse finished = DataNodeMetricsServiceResponse.newBuilder()
      .setStatus(DataNodeMetricsService.MetricCollectionStatus.FINISHED)
      .setPendingDeletion(Arrays.asList(
          new DatanodePendingDeletionMetrics("dn1", "uuid-1", 10L),
          new DatanodePendingDeletionMetrics("dn2", "uuid-2", 20L)))
      .build();
  when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(finished);

  Response result = pendingDeletionEndpoint.downloadPendingDeleteData();

  // NOTE(review): the endpoint replies 202 for the finished CSV; 200 OK would
  // be the conventional status for a completed attachment.
  assertEquals(Response.Status.ACCEPTED.getStatusCode(), result.getStatus());
  assertEquals("text/csv", result.getMediaType().toString());
  assertEquals("attachment; filename=\"pending_deletion_all_datanode_stats.csv\"",
      result.getHeaderString("Content-Disposition"));

  // Drain the streamed entity and verify header plus both data rows.
  StreamingOutput body = assertInstanceOf(StreamingOutput.class, result.getEntity());
  ByteArrayOutputStream sink = new ByteArrayOutputStream();
  body.write(sink);
  String csv = new String(sink.toByteArray(), StandardCharsets.UTF_8);
  assertTrue(csv.contains("HostName,Datanode UUID,Pending Block Size (bytes)"));
  assertTrue(csv.contains("dn1,uuid-1,10"));
  assertTrue(csv.contains("dn2,uuid-2,20"));
}
}
Loading