diff --git a/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestAppShared.scala b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestAppShared.scala index 8cb1244..1643b6b 100644 --- a/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestAppShared.scala +++ b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestAppShared.scala @@ -139,6 +139,7 @@ object TestAppShared { val values: Vector[TestClientTestCase] = Vector( DiscoverNewPod, DnsFailureRecover, + ResolveShortDomainName, ) /** @@ -165,5 +166,14 @@ object TestAppShared { * - check that after the configured reload TTL, the client sees both servers */ case object DnsFailureRecover extends TestClientTestCase + + /** + * [[TestClient]] test case verifying that `K8sDnsNameResolver` resolves short k8s + * service domain names using resolv.conf search domains. + * + * Why this needs a separate test case: + * [[com.evolution.jgrpc.tools.k8sdns.NameLookupState]] + */ + case object ResolveShortDomainName extends TestClientTestCase } } diff --git a/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestClient.scala b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestClient.scala index 9b853ae..d19c084 100644 --- a/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestClient.scala +++ b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestClient.scala @@ -63,6 +63,8 @@ private[it] final class TestClient { testCaseDiscoverNewPod(fixture) case TestClientTestCase.DnsFailureRecover => testCaseDnsFailureRecover(fixture) + case TestClientTestCase.ResolveShortDomainName => + testCaseResolveShortDomainName(fixture) } } @@ -86,22 +88,22 @@ private[it] final class TestClient { private def testCaseDiscoverNewPod(fixture: Fixture): Unit = { fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip)) - 
withRoundRobinLbClient { client => - callHost2TimesAssertServerIds(client, expectedServerIds = Set(1)) + withRoundRobinLbClient() { client => + callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1)) fixture.coreDns.setServiceIps(Set(fixture.srv1Ip, fixture.srv2Ip)) sleepUntilClientGetsDnsUpdate() - callHost2TimesAssertServerIds(client, expectedServerIds = Set(1, 2)) + callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1, 2)) } } private def testCaseDnsFailureRecover(fixture: Fixture): Unit = { fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip)) - withRoundRobinLbClient { client => - callHost2TimesAssertServerIds(client, expectedServerIds = Set(1)) + withRoundRobinLbClient() { client => + callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1)) fixture.coreDns.ensureStopped() @@ -111,7 +113,15 @@ private[it] final class TestClient { sleepUntilClientGetsDnsUpdate() - callHost2TimesAssertServerIds(client, expectedServerIds = Set(1, 2)) + callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1, 2)) + } + } + + private def testCaseResolveShortDomainName(fixture: Fixture): Unit = { + fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip, fixture.srv2Ip)) + + withRoundRobinLbClient(targetHostname = svcHostnameShort) { client => + callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1, 2)) } } @@ -125,21 +135,44 @@ private[it] final class TestClient { Thread.sleep(sleepIntervalSeconds.toLong * 1000) } - private def callHost2TimesAssertServerIds( + private def callHostManyTimesAssertServerIds( client: TestSvcBlockingStub, expectedServerIds: Set[Int], ): Unit = { - val actualServerIds = 0.until(2).map { _ => + require(expectedServerIds.subsetOf(allServerIds)) + + var observedServerIdsVec: Vector[Int] = Vector.fill(allServerIds.size) { client.getId(GetIdRequest()).id - }.toSet - if (actualServerIds != expectedServerIds) { - sys.error(s"GRPC client observed server IDs $actualServerIds, expected 
$expectedServerIds") + } + + // When the client is just establishing connections, sometimes calling it multiple times + // doesn't give the round-robin call picture (1, 2, 1, 2,...), + // but it appears as if the client + // routes all the calls to the first opened connection, + // while waiting for the rest to be fully ready. + // So if we haven't observed all the servers yet, let's wait a bit and call again. + // This helps to recover such cases. + if (observedServerIdsVec.toSet.size < allServerIds.size) { + Thread.sleep(1000L) + observedServerIdsVec = observedServerIdsVec ++ Vector.fill(allServerIds.size) { + client.getId(GetIdRequest()).id + } + } + + val observedServerIds = observedServerIdsVec.toSet + if (observedServerIds != expectedServerIds) { + sys.error(s"GRPC client expected server IDs $expectedServerIds, " + + s"observed $observedServerIds (received in order: $observedServerIdsVec)") } } - private def withRoundRobinLbClient[T](body: TestSvcBlockingStub => T): T = { + private def withRoundRobinLbClient[T]( + targetHostname: String = svcHostname, + )( + body: TestSvcBlockingStub => T, + ): T = { val channel = NettyChannelBuilder - .forTarget(s"k8s-dns://$svcHostname:${ TestAppShared.ServerPort }") + .forTarget(s"k8s-dns://$targetHostname:${ TestAppShared.ServerPort }") .usePlaintext() .defaultLoadBalancingPolicy("round_robin") .build() @@ -156,7 +189,10 @@ private[it] final class TestClient { } private object TestClient { - private val svcHostname: String = "svc.example.org" + private val allServerIds = Set(1, 2) + private val clusterHostnameSuffix = "svc.cluster.local" + private val svcHostnameShort = "acme-grpc.acme" + private val svcHostname = s"$svcHostnameShort.$clusterHostnameSuffix" private val resolveConfPath = "/etc/resolv.conf" private val coreDnsCoreFilePath = "/etc/coredns/CoreFile" @@ -178,6 +214,8 @@ private object TestClient { Paths.get(resolveConfPath), Vector( "nameserver 127.0.0.1", + s"search $clusterHostnameSuffix", + "options 
ndots:5", ).asJava, StandardOpenOption.TRUNCATE_EXISTING, ) @@ -224,7 +262,7 @@ private object TestClient { private def writeCoreFile(): Unit = { Files.writeString( Paths.get(coreDnsCoreFilePath), - s"""$svcHostname { + s""". { | hosts $coreDnsHostsFilePath { | ttl $coreDnsHostsReloadIntervalSeconds | reload ${ coreDnsHostsReloadIntervalSeconds }s diff --git a/k8s-dns-name-resolver-it/src/test/scala/com/evolution/jgrpc/tools/k8sdns/it/K8sDnsNameResolverIt.scala b/k8s-dns-name-resolver-it/src/test/scala/com/evolution/jgrpc/tools/k8sdns/it/K8sDnsNameResolverIt.scala index 3b88830..f59acbb 100644 --- a/k8s-dns-name-resolver-it/src/test/scala/com/evolution/jgrpc/tools/k8sdns/it/K8sDnsNameResolverIt.scala +++ b/k8s-dns-name-resolver-it/src/test/scala/com/evolution/jgrpc/tools/k8sdns/it/K8sDnsNameResolverIt.scala @@ -72,6 +72,10 @@ class K8sDnsNameResolverIt extends AnyFreeSpec with BeforeAndAfterAll { "should recover after DNS query failure" in { runTestCase(TestClientTestCase.DnsFailureRecover) } + + "should resolve short domain names with search domains" in { + runTestCase(TestClientTestCase.ResolveShortDomainName) + } } private def runTestCase(testCase: TestClientTestCase): Unit = { diff --git a/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/K8sDnsNameResolver.java b/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/K8sDnsNameResolver.java index e66fe27..271a384 100644 --- a/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/K8sDnsNameResolver.java +++ b/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/K8sDnsNameResolver.java @@ -3,7 +3,6 @@ import static java.lang.Math.max; import static java.lang.String.format; -import com.google.common.net.InetAddresses; import io.grpc.*; import io.grpc.SynchronizationContext.ScheduledHandle; import java.net.InetAddress; @@ -12,18 +11,12 @@ import java.time.Duration; import java.time.Instant; import java.util.List; -import java.util.Optional; import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import org.jspecify.annotations.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.xbill.DNS.Name; -import org.xbill.DNS.Record; -import org.xbill.DNS.Type; -import org.xbill.DNS.lookup.LookupResult; -import org.xbill.DNS.lookup.LookupSession; /* package */ final class K8sDnsNameResolver extends NameResolver { @@ -33,7 +26,8 @@ private final long refreshIntervalSeconds; private final SynchronizationContext syncCtx; private final ScheduledExecutorService scheduledExecutor; - private final LookupSession dnsLookupSession; + + private NameLookupState nameLookupState; @Nullable private Listener listener = null; @@ -52,8 +46,7 @@ private record SuccessResult(List addresses, Instant receiveTime) { this.refreshIntervalSeconds = refreshIntervalSeconds; this.syncCtx = syncCtx; this.scheduledExecutor = scheduledExecutor; - this.dnsLookupSession = - LookupSession.defaultBuilder().searchPath(targetUri.host()).clearCaches().build(); + this.nameLookupState = NameLookupState.initialize(targetUri.host()); } @Override @@ -159,28 +152,21 @@ private EquivalentAddressGroup mkAddressGroup(InetAddress addr) { // callback is executed under syncCtx private void resolveAllAsync( BiConsumer<@Nullable List, ? 
super @Nullable Throwable> cb) { - final var dnsLookupAsyncResult = this.dnsLookupSession.lookupAsync(Name.empty, Type.A); - dnsLookupAsyncResult - .thenApply( - (result) -> { - logger.debug("DNS lookup result: {}", result); - var records = - Optional.ofNullable(result).map(LookupResult::getRecords).orElse(List.of()); - return records.stream() - .map(Record::rdataToString) - .distinct() - .sorted() // make sure that result comparison does not depend on order - .map(InetAddresses::forString) - .toList(); - }) + nameLookupState + .runNextLookup() .whenComplete( - (addresses, err) -> + (nameLookupState, err) -> this.syncCtx.execute( () -> { if (err != null) { logger.error("DNS lookup failed", err); + cb.accept(null, err); + } else { + this.nameLookupState = nameLookupState; + logger.debug( + "DNS lookup successful {}", this.nameLookupState.getLastResult()); + cb.accept(this.nameLookupState.getLastResult(), null); } - cb.accept(addresses, err); })); } diff --git a/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/NameLookupState.java b/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/NameLookupState.java new file mode 100644 index 0000000..7a4cd62 --- /dev/null +++ b/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/NameLookupState.java @@ -0,0 +1,186 @@ +package com.evolution.jgrpc.tools.k8sdns; + +import com.google.common.net.InetAddresses; +import java.net.InetAddress; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import org.jspecify.annotations.Nullable; +import org.xbill.DNS.Name; +import org.xbill.DNS.NameTooLongException; +import org.xbill.DNS.Record; +import org.xbill.DNS.ResolverConfig; +import org.xbill.DNS.Type; +import org.xbill.DNS.lookup.LookupResult; +import org.xbill.DNS.lookup.LookupSession; + +/** + * Immutable component for 
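performing DNS A-record lookups for a name. + * + * <p>Intended usage pattern: + * + * <ul> + * <li>create the initial state once with {@link #initialize(Name)} + * <li>call {@link #runNextLookup()} for every lookup; on success it completes with a new state + * <li>read the resolved addresses from the new state with {@link #getLastResult()} and keep that state for the next lookup + * </ul> + * + * <p>Search domains support quirks: + * + * <ul> + * <li>the search path of the underlying {@link LookupSession} is cleared; search domains are read from {@link ResolverConfig#getCurrentConfig()} and applied by this class instead + * <li>a name that is not absolute is first looked up as is (made absolute by appending the root); the search domains are tried, in order, only if that lookup fails or yields no addresses + * <li>the first absolute name that yields a non-empty result is remembered, and later lookups query only that name + * <li>if no candidate yields addresses, the outcome of the first lookup (its error or its empty result) is reported + * </ul>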
+ * + * @see LookupSession + */ +/* package */ final class NameLookupState { + + private final Name name; + private final LookupSession session; + private final List lastResult; + @Nullable private final Name discoveredAbsoluteName; + + public static NameLookupState initialize(Name name) { + return new NameLookupState(name); + } + + private NameLookupState( + Name name, + LookupSession session, + List lastResult, + @Nullable Name discoveredAbsoluteName) { + this.name = name; + this.session = session; + this.lastResult = lastResult; + this.discoveredAbsoluteName = discoveredAbsoluteName; + } + + private NameLookupState(Name name) { + this.name = name; + this.session = + LookupSession.defaultBuilder() + // disable cache, to make sure we always get the actual information + .clearCaches() + // disable search domains support just in case, we are reimplementing it here anyway + // see the class doc for the "why" + .clearSearchPath() + .build(); + this.lastResult = Collections.emptyList(); + this.discoveredAbsoluteName = name.isAbsolute() ? 
name : null; + } + + private NameLookupState copyWithDiscoveredAbsoluteName( + Name discoveredAbsoluteName, List newLastResult) { + return new NameLookupState(this.name, this.session, newLastResult, discoveredAbsoluteName); + } + + private NameLookupState copyWithResult(List newLastResult) { + return new NameLookupState(this.name, this.session, newLastResult, this.discoveredAbsoluteName); + } + + public List getLastResult() { + return this.lastResult; + } + + public CompletionStage runNextLookup() { + if (discoveredAbsoluteName != null) { + return lookupByAbsoluteName(discoveredAbsoluteName).thenApply(this::copyWithResult); + } else { + var primaryName = concatFailIfInvalid(name, Name.root); + return lookupByAbsoluteName(primaryName) + .handle( + (addresses, err) -> { + if (err == null && !addresses.isEmpty()) { + return CompletableFuture.completedFuture( + copyWithDiscoveredAbsoluteName(primaryName, addresses)); + } else { + var alternativeAbsoluteNames = + ResolverConfig.getCurrentConfig().searchPath().stream() + .flatMap(searchName -> concatEmptyIfInvalid(name, searchName).stream()) + .toList(); + return lookupAlternativeAbsoluteNames(addresses, err, alternativeAbsoluteNames); + } + }) + .thenCompose(Function.identity()); + } + } + + private CompletionStage lookupAlternativeAbsoluteNames( + List primaryNameResultAddresses, + @Nullable Throwable primaryNameResultError, + List remainingAbsoluteNamesToTry) { + if (remainingAbsoluteNamesToTry.isEmpty()) { + if (primaryNameResultError != null) { + return CompletableFuture.failedFuture(primaryNameResultError); + } else { + return CompletableFuture.completedFuture(copyWithResult(primaryNameResultAddresses)); + } + } else { + var curNameToTry = remainingAbsoluteNamesToTry.get(0); + return lookupByAbsoluteName(curNameToTry) + .handle( + (addresses, err) -> { + if (err == null && !addresses.isEmpty()) { + return CompletableFuture.completedFuture( + copyWithDiscoveredAbsoluteName(curNameToTry, addresses)); + } else { + var 
newRemainingAbsoluteNamesToTry = + remainingAbsoluteNamesToTry.stream().skip(1).toList(); + return lookupAlternativeAbsoluteNames( + primaryNameResultAddresses, + primaryNameResultError, + newRemainingAbsoluteNamesToTry); + } + }) + .thenCompose(Function.identity()); + } + } + + private CompletionStage> lookupByAbsoluteName(Name name) { + return session + .lookupAsync(name, Type.A) + .thenApply( + (res) -> + Optional.ofNullable(res).map(LookupResult::getRecords).orElse(List.of()).stream() + .map(Record::rdataToString) + .distinct() + .sorted() // make sure that result comparison does not depend on order + .map(InetAddresses::forString) + .toList()); + } + + // org.xbill.DNS.lookup.LookupSession.safeConcat + private static Optional concatEmptyIfInvalid(Name name, Name suffix) { + try { + return Optional.of(Name.concatenate(name, suffix)); + } catch (NameTooLongException e) { + return Optional.empty(); + } + } + + private static Name concatFailIfInvalid(Name name, Name suffix) { + try { + return Name.concatenate(name, suffix); + } catch (NameTooLongException e) { + throw new RuntimeException(e); + } + } +} diff --git a/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/package-info.java b/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/package-info.java index 2c69f6e..feece08 100644 --- a/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/package-info.java +++ b/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/package-info.java @@ -3,5 +3,3 @@ */ @org.jspecify.annotations.NullMarked package com.evolution.jgrpc.tools.k8sdns; - -// TODO: #1 add README