diff --git a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java index 80d38c780785d..3b47da1a73c7a 100644 --- a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.time.Time; +import java.math.BigInteger; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Arrays; @@ -39,6 +40,8 @@ public class TimeUtils { private static final Map LABEL_TO_UNIT_MAP = Collections.unmodifiableMap(initMap()); + private static final BigInteger NANOS_PER_SECOND = BigInteger.valueOf(1_000_000_000L); + /** * Parse the given string to a java {@link Duration}. The string is in format "{length * value}{time unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will @@ -79,30 +82,45 @@ public static Duration parseDuration(String text) { throw new NumberFormatException("text does not start with a number"); } - final long value; + final BigInteger value; try { - value = Long.parseLong(number); // this throws a NumberFormatException on overflow + value = new BigInteger(number); // this throws a NumberFormatException } catch (NumberFormatException e) { throw new IllegalArgumentException( - "The value '" - + number - + "' cannot be re represented as 64bit number (numeric overflow)."); + "The value '" + number + "' cannot be represented as an integer number.", e); } + final ChronoUnit unit; if (unitLabel.isEmpty()) { - return Duration.of(value, ChronoUnit.MILLIS); - } - - ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel); - if (unit != null) { - return Duration.of(value, unit); + unit = ChronoUnit.MILLIS; } else { + unit = LABEL_TO_UNIT_MAP.get(unitLabel); + } + if (unit == null) { throw new IllegalArgumentException( "Time interval unit label '" + unitLabel + "' does not match any of the recognized units: " + TimeUnit.getAllUnits()); } + + try { + return convertBigIntToDuration(value, unit); + } catch (ArithmeticException e) { + throw new IllegalArgumentException( + "The value '" + + number + + "' cannot be represented as java.time.Duration (numeric overflow).", + e); + } + } + + private static Duration convertBigIntToDuration(BigInteger value, ChronoUnit unit) { + final BigInteger nanos = value.multiply(BigInteger.valueOf(unit.getDuration().toNanos())); + + final BigInteger[] dividedAndRemainder = nanos.divideAndRemainder(NANOS_PER_SECOND); + return Duration.ofSeconds(dividedAndRemainder[0].longValueExact()) + .plusNanos(dividedAndRemainder[1].longValueExact()); } private static Map initMap() { @@ -136,17 +154,35 @@ public static String getStringInMillis(final Duration duration) { * NOTE: It supports only durations that fit into long. */ public static String formatWithHighestUnit(Duration duration) { - long nanos = duration.toNanos(); + BigInteger nanos = toNanos(duration); TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos); return String.format( - "%d %s", - nanos / highestIntegerUnit.unit.getDuration().toNanos(), + "%s %s", + nanos.divide(highestIntegerUnit.getUnitAsNanos()), highestIntegerUnit.getLabels().get(0)); } - private static TimeUnit getHighestIntegerUnit(long nanos) { - if (nanos == 0) { + /** + * Converted from {@link Duration#toNanos()}, but produces {@link BigInteger} and does not throw + * an exception on overflow. + */ + private static BigInteger toNanos(Duration duration) { + long tempSeconds = duration.getSeconds(); + long tempNanos = duration.getNano(); + if (tempSeconds < 0) { + // change the seconds and nano value to + // handle Long.MIN_VALUE case + tempSeconds = tempSeconds + 1; + tempNanos = tempNanos - NANOS_PER_SECOND.longValue(); + } + return BigInteger.valueOf(tempSeconds) + .multiply(NANOS_PER_SECOND) + .add(BigInteger.valueOf(tempNanos)); + } + + private static TimeUnit getHighestIntegerUnit(BigInteger nanos) { + if (nanos.compareTo(BigInteger.ZERO) == 0) { return TimeUnit.MILLISECONDS; } @@ -162,7 +198,7 @@ private static TimeUnit getHighestIntegerUnit(long nanos) { TimeUnit highestIntegerUnit = null; for (TimeUnit timeUnit : orderedUnits) { - if (nanos % timeUnit.unit.getDuration().toNanos() != 0) { + if (nanos.remainder(timeUnit.getUnitAsNanos()).compareTo(BigInteger.ZERO) != 0) { break; } highestIntegerUnit = timeUnit; @@ -187,12 +223,13 @@ private enum TimeUnit { private final ChronoUnit unit; + private final BigInteger unitAsNanos; + TimeUnit(ChronoUnit unit, String[]... labels) { this.unit = unit; + this.unitAsNanos = BigInteger.valueOf(unit.getDuration().toNanos()); this.labels = - Arrays.stream(labels) - .flatMap(ls -> Arrays.stream(ls)) - .collect(Collectors.toList()); + Arrays.stream(labels).flatMap(Arrays::stream).collect(Collectors.toList()); } /** @@ -219,6 +256,10 @@ public ChronoUnit getUnit() { return unit; } + public BigInteger getUnitAsNanos() { + return unitAsNanos; + } + public static String getAllUnits() { return Arrays.stream(TimeUnit.values()) .map(TimeUnit::createTimeUnitString) diff --git a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java index d23ae0090d1f2..949cabf0fef00 100644 --- a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java @@ -43,7 +43,11 @@ private static Stream testDurationAndExpectedString() { Arguments.of(Duration.ofHours(23), "23 h"), Arguments.of(Duration.ofMillis(-1), "-1 ms"), Arguments.of(Duration.ofMillis(TimeUnit.DAYS.toMillis(1)), "1 d"), - Arguments.of(Duration.ofHours(24), "1 d")); + Arguments.of(Duration.ofHours(24), "1 d"), + Arguments.of(Duration.ofMillis(Long.MAX_VALUE), "9223372036854775807 ms"), + Arguments.of( + Duration.ofMillis(Long.MAX_VALUE).plusNanos(1), + "9223372036854775807000001 ns")); } @ParameterizedTest