[Bug] NATS transport: unhandled NATSConnectionClosedException during JetStream Ack()/Nak() can crash subscriber process #1785

@davte-beijer

Description

Summary

When using CAP with the NATS JetStream transport, the subscriber process can crash with an unhandled exception if the NATS server goes down while a message is being processed. Even when the user handler is wrapped in try/catch, CAP still performs the JetStream ACK/NAK after the handler completes or fails, and those calls (Msg.Ack() / Msg.Nak()) can throw NATSConnectionClosedException when the connection is closed. Because this happens on the subscriber callback/thread-pool path, the exception can escape and terminate the process.
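One way to picture the missing guard is a wrapper that catches the exception around the acknowledgement call. This is a minimal sketch, not CAP's actual code: `SafeJetStreamAck`, `TryAck`, `TryNak`, and `TrySend` are hypothetical names, and only `Msg.Ack()`, `Msg.Nak()`, and `NATSConnectionClosedException` come from the stack trace in this report.

```csharp
using System;
using NATS.Client;

// Hypothetical helper for illustration; not part of CAP or NATS.Client.
public static class SafeJetStreamAck
{
    // Returns true if the ACK was sent, false if the connection was already closed.
    public static bool TryAck(Msg msg) => TrySend(msg.Ack, "ACK");

    // Returns true if the NAK was sent, false if the connection was already closed.
    public static bool TryNak(Msg msg) => TrySend(msg.Nak, "NAK");

    private static bool TrySend(Action send, string kind)
    {
        try
        {
            send();
            return true;
        }
        catch (NATSConnectionClosedException ex)
        {
            // Log and continue instead of letting the exception escape
            // onto the thread pool, where it would terminate the process.
            Console.Error.WriteLine($"JetStream {kind} skipped: {ex.Message}");
            return false;
        }
    }
}
```

With a guard like this, a message whose ACK could not be sent stays unacknowledged. Under an explicit ack policy, JetStream would normally redeliver it after reconnect, so the handler should tolerate duplicates.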

Steps to Reproduce

Prerequisites

  • A running NATS server with JetStream enabled
  • A .NET service using CAP with the NATS JetStream transport
  • CAP configured as shown under "CAP Configuration" below
  • A CAP subscriber in the .NET service whose handler:
    • Runs a long-running, multi-step process
    • Publishes additional messages via CAP between steps
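A handler of the kind described above could look roughly like the following. This is a sketch under assumptions: the class name, the subject names `repro.long-running` and `repro.step-completed`, and the step count and delay are all made up for illustration, and it uses CAP's attribute-based `ICapSubscribe` style rather than the IMessageHandler abstraction from the reporter's project.

```csharp
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;

// Hypothetical reproduction handler; all names and subjects are illustrative.
public class LongRunningReproHandler : ICapSubscribe
{
    private readonly ICapPublisher _publisher;

    public LongRunningReproHandler(ICapPublisher publisher) => _publisher = publisher;

    [CapSubscribe("repro.long-running")]
    public async Task HandleAsync(string payload)
    {
        for (int step = 1; step <= 3; step++)
        {
            // Simulate one slow step; stop the NATS server during this window.
            await Task.Delay(TimeSpan.FromSeconds(5));

            // Publish a follow-up message between steps.
            await _publisher.PublishAsync("repro.step-completed", step);
        }
    }
}
```

The class must be registered in the service container (for example with `services.AddTransient<LongRunningReproHandler>()`) before CAP can discover it. Because the configuration in this report disables subscriber-side stream and subject creation, both subjects would also have to exist on the stream already.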

Steps

  1. Start NATS (JetStream enabled) and ensure the CAP application can connect.
  2. Start the CAP subscriber service and verify it successfully subscribes to the configured subject(s).
  3. Publish a test message to the subscribed subject that triggers the long-running handler.
  4. While the handler is still running (or right before it returns), stop the NATS server to simulate an outage.
  5. Wait for the handler execution to complete (or fail).

Observed result (current/buggy behavior)

  • The service terminates with an unhandled exception similar to:
    • NATS.Client.NATSConnectionClosedException: Connection is closed.
  • The stack trace shows the crash occurs during JetStream ACK/NAK (Msg.Ack() / Msg.Nak()) inside CAP’s consumer commit/reject flow rather than inside user code.

Expected Behavior

The consuming application should not crash when the NATS connection closes during Ack()/Nak().

Actual Behavior

The consuming application crashes with an unhandled NATSConnectionClosedException.

Log Output

[FTL] An error occurred when installing package: Error while performing handshake with server. See inner exception for more details.
2026-02-06T15:27:03 [15:27:03 INF] Executed subscriber method 'PackageTransferCompletedHandler.HandleAsync' on group 'cap.queue.userland.application.service.v1' with instance 'BEPxfybE8tLj3DM' in 13017,4173ms
2026-02-06T15:27:03 [15:27:03 ERR] An exception occurred when process received message. Message:'DotNetCore.CAP.Messages.TransportMessage'.
2026-02-06T15:27:03 NATS.Client.NATSConnectionClosedException: Connection is closed.
2026-02-06T15:27:03    at NATS.Client.Connection.PublishImpl(String subject, String reply, MsgHeader inHeaders, Byte[] data, Int32 offset, Nullable`1 inCount, Boolean flushBuffer)
2026-02-06T15:27:03    at NATS.Client.Connection.Publish(String subject, Byte[] data)
2026-02-06T15:27:03    at NATS.Client.JetStream.JetStreamMsg.AckReply(AckType ackType, Int64 delayNanoseconds, Int32 timeout)
2026-02-06T15:27:03    at NATS.Client.JetStream.JetStreamMsg.Ack()
2026-02-06T15:27:03    at DotNetCore.CAP.NATS.NATSConsumerClient.CommitAsync(Object sender)
2026-02-06T15:27:03    at DotNetCore.CAP.Internal.ConsumerRegister.<>c__DisplayClass22_0.<<RegisterMessageProcessor>b__0>d.MoveNext()
2026-02-06T15:27:03 Unhandled exception. NATS.Client.NATSConnectionClosedException: Connection is closed.
2026-02-06T15:27:03    at NATS.Client.Connection.PublishImpl(String subject, String reply, MsgHeader inHeaders, Byte[] data, Int32 offset, Nullable`1 inCount, Boolean flushBuffer)
2026-02-06T15:27:03    at NATS.Client.Connection.Publish(String subject, Byte[] data)
2026-02-06T15:27:03    at NATS.Client.JetStream.JetStreamMsg.AckReply(AckType ackType, Int64 delayNanoseconds, Int32 timeout)
2026-02-06T15:27:03    at NATS.Client.JetStream.JetStreamMsg.Nak()
2026-02-06T15:27:03    at DotNetCore.CAP.NATS.NATSConsumerClient.RejectAsync(Object sender)
2026-02-06T15:27:03    at DotNetCore.CAP.Internal.ConsumerRegister.<>c__DisplayClass22_0.<<RegisterMessageProcessor>b__0>d.MoveNext()
2026-02-06T15:27:03 --- End of stack trace from previous location ---
2026-02-06T15:27:03    at DotNetCore.CAP.NATS.NATSConsumerClient.SubscriptionMessageHandler(Object sender, MsgHandlerEventArgs e)
2026-02-06T15:27:03    at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
2026-02-06T15:27:03    at System.Threading.ThreadPoolWorkQueue.Dispatch()
2026-02-06T15:27:03    at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
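The `Task.ThrowAsync` and `ThreadPoolWorkQueue.Dispatch` frames at the end of the trace are typical of an exception escaping an `async void` method. That fits SubscriptionMessageHandler being an `async void` event handler, although this is inferred from the trace rather than confirmed in CAP's source. The dependency-free sketch below shows the pattern that keeps such a handler from terminating the process. `AckAsync` is a hypothetical stand-in for the failing acknowledgement, and `InvalidOperationException` stands in for NATSConnectionClosedException.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidGuardDemo
{
    private static int _failedAcks;

    // Hypothetical stand-in for an acknowledgement sent over a closed connection.
    private static Task AckAsync() =>
        Task.FromException(new InvalidOperationException("Connection is closed."));

    // An exception that escapes an async void method is rethrown on the
    // thread pool and terminates the process, so the whole body is guarded.
    public static async void OnMessage(object? sender, EventArgs e)
    {
        try
        {
            await Task.Yield();   // continue on the thread pool, like a real callback
            await AckAsync();     // fails as if the connection were closed
        }
        catch (InvalidOperationException ex)
        {
            Interlocked.Increment(ref _failedAcks);
            Console.Error.WriteLine($"Ack failed, message left unacknowledged: {ex.Message}");
        }
    }

    public static async Task Main()
    {
        OnMessage(null, EventArgs.Empty);
        await Task.Delay(200);    // give the handler time to finish
        Console.WriteLine($"Process still running; failed acks: {Volatile.Read(ref _failedAcks)}");
    }
}
```

Removing the try/catch from `OnMessage` makes the same program exit with an unhandled exception, which mirrors the crash reported here.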

CAP Configuration

_ = services.AddCap(capOptions =>
    {
        _ = capOptions.UseInMemoryStorage();

        // Setup CAP to use NATS
        _ = capOptions.UseNATS(natsOptions =>
        {
            NATS.Client.Options options = ConnectionFactory.GetDefaultOptions();
            natsOptions.NormalizeStreamName = s => "VolatileStream";
            natsOptions.EnableSubscriberClientStreamAndSubjectCreation = false;

            options.Url = userlandOptions.NatsUrl;
            options.Secure = true;
            options.MaxReconnect = NATS.Client.Options.ReconnectForever;

            options.SetNkey(userlandOptions.PublicKey, (_, args) =>
            {
                NkeyPair? nkeyPair = Nkeys.FromSeed(userlandOptions.Seed);
                args.SignedNonce = nkeyPair.Sign(args.ServerNonce);
            });

            natsOptions.Options = options;
        });

        capOptions.FailedRetryCount = 50;
    })
    .AddSubscriberAssembly(AppDomain.CurrentDomain.GetAssemblies()) // Registers IMessageHandler implementations
    .AddSubscribeFilter<MessageFilter>(); // Add message filter to handle idempotent messages

Transport Used

NATS

Storage Provider

InMemory

Environment

Windows 11 x64
