Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 61 additions & 20 deletions flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;

import java.math.BigInteger;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
Expand All @@ -39,6 +40,8 @@ public class TimeUtils {
private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP =
Collections.unmodifiableMap(initMap());

private static final BigInteger NANOS_PER_SECOND = BigInteger.valueOf(1_000_000_000L);

/**
* Parse the given string to a java {@link Duration}. The string is in format "{length
* value}{time unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will
Expand Down Expand Up @@ -79,30 +82,45 @@ public static Duration parseDuration(String text) {
throw new NumberFormatException("text does not start with a number");
}

final long value;
final BigInteger value;
try {
value = Long.parseLong(number); // this throws a NumberFormatException on overflow
value = new BigInteger(number); // this throws a NumberFormatException
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"The value '"
+ number
+ "' cannot be re represented as 64bit number (numeric overflow).");
"The value '" + number + "' cannot be represented as an integer number.", e);
}

final ChronoUnit unit;
if (unitLabel.isEmpty()) {
return Duration.of(value, ChronoUnit.MILLIS);
}

ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel);
if (unit != null) {
return Duration.of(value, unit);
unit = ChronoUnit.MILLIS;
} else {
unit = LABEL_TO_UNIT_MAP.get(unitLabel);
}
if (unit == null) {
throw new IllegalArgumentException(
"Time interval unit label '"
+ unitLabel
+ "' does not match any of the recognized units: "
+ TimeUnit.getAllUnits());
}

try {
return convertBigIntToDuration(value, unit);
} catch (ArithmeticException e) {
throw new IllegalArgumentException(
"The value '"
+ number
+ "' cannot be represented as java.time.Duration (numeric overflow).",
e);
}
}

private static Duration convertBigIntToDuration(BigInteger value, ChronoUnit unit) {
final BigInteger nanos = value.multiply(BigInteger.valueOf(unit.getDuration().toNanos()));

final BigInteger[] dividedAndRemainder = nanos.divideAndRemainder(NANOS_PER_SECOND);
return Duration.ofSeconds(dividedAndRemainder[0].longValueExact())
.plusNanos(dividedAndRemainder[1].longValueExact());
}

private static Map<String, ChronoUnit> initMap() {
Expand Down Expand Up @@ -136,17 +154,35 @@ public static String getStringInMillis(final Duration duration) {
* <b>NOTE:</b> It supports only durations that fit into long.
*/
public static String formatWithHighestUnit(Duration duration) {
long nanos = duration.toNanos();
BigInteger nanos = toNanos(duration);

TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
return String.format(
"%d %s",
nanos / highestIntegerUnit.unit.getDuration().toNanos(),
"%s %s",
nanos.divide(highestIntegerUnit.getUnitAsNanos()),
highestIntegerUnit.getLabels().get(0));
}

private static TimeUnit getHighestIntegerUnit(long nanos) {
if (nanos == 0) {
/**
* Converted from {@link Duration#toNanos()}, but produces {@link BigInteger} and does not throw
* an exception on overflow.
*/
private static BigInteger toNanos(Duration duration) {
long tempSeconds = duration.getSeconds();
long tempNanos = duration.getNano();
if (tempSeconds < 0) {
// change the seconds and nano value to
// handle Long.MIN_VALUE case
tempSeconds = tempSeconds + 1;
tempNanos = tempNanos - NANOS_PER_SECOND.longValue();
}
return BigInteger.valueOf(tempSeconds)
.multiply(NANOS_PER_SECOND)
.add(BigInteger.valueOf(tempNanos));
}

private static TimeUnit getHighestIntegerUnit(BigInteger nanos) {
if (nanos.compareTo(BigInteger.ZERO) == 0) {
return TimeUnit.MILLISECONDS;
}

Expand All @@ -162,7 +198,7 @@ private static TimeUnit getHighestIntegerUnit(long nanos) {

TimeUnit highestIntegerUnit = null;
for (TimeUnit timeUnit : orderedUnits) {
if (nanos % timeUnit.unit.getDuration().toNanos() != 0) {
if (nanos.remainder(timeUnit.getUnitAsNanos()).compareTo(BigInteger.ZERO) != 0) {
break;
}
highestIntegerUnit = timeUnit;
Expand All @@ -187,12 +223,13 @@ private enum TimeUnit {

private final ChronoUnit unit;

private final BigInteger unitAsNanos;

TimeUnit(ChronoUnit unit, String[]... labels) {
this.unit = unit;
this.unitAsNanos = BigInteger.valueOf(unit.getDuration().toNanos());
this.labels =
Arrays.stream(labels)
.flatMap(ls -> Arrays.stream(ls))
.collect(Collectors.toList());
Arrays.stream(labels).flatMap(Arrays::stream).collect(Collectors.toList());
}

/**
Expand All @@ -219,6 +256,10 @@ public ChronoUnit getUnit() {
return unit;
}

public BigInteger getUnitAsNanos() {
return unitAsNanos;
}

public static String getAllUnits() {
return Arrays.stream(TimeUnit.values())
.map(TimeUnit::createTimeUnitString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ private static Stream<Arguments> testDurationAndExpectedString() {
Arguments.of(Duration.ofHours(23), "23 h"),
Arguments.of(Duration.ofMillis(-1), "-1 ms"),
Arguments.of(Duration.ofMillis(TimeUnit.DAYS.toMillis(1)), "1 d"),
Arguments.of(Duration.ofHours(24), "1 d"));
Arguments.of(Duration.ofHours(24), "1 d"),
Arguments.of(Duration.ofMillis(Long.MAX_VALUE), "9223372036854775807 ms"),
Arguments.of(
Duration.ofMillis(Long.MAX_VALUE).plusNanos(1),
"9223372036854775807000001 ns"));
}

@ParameterizedTest
Expand Down