Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 64 additions & 20 deletions Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ extension Transaction {
case requestHeadSent
case producing
case paused(continuation: CheckedContinuation<Void, Error>?)
case endForwarded
case finished
}

Expand Down Expand Up @@ -97,7 +98,8 @@ extension Transaction {
bodyStreamContinuation: CheckedContinuation<Void, Error>?
)

case failRequestStreamContinuation(CheckedContinuation<Void, Error>, Error)
case failRequestStreamContinuation(CheckedContinuation<Void, Error>, Error, HTTPRequestExecutor)
case cancelExecutor(HTTPRequestExecutor)
}

mutating func fail(_ error: Error) -> FailAction {
Expand Down Expand Up @@ -135,7 +137,7 @@ extension Transaction {
bodyStreamContinuation: continuation
)

case .requestHeadSent, .finished, .producing, .paused(continuation: .none):
case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none):
self.state = .finished(error: error)
return .failResponseHead(
context.continuation,
Expand All @@ -156,12 +158,29 @@ extension Transaction {
context.executor,
bodyStreamContinuation: bodyStreamContinuation
)
case .finished, .producing, .requestHeadSent:
case .endForwarded, .finished, .producing, .requestHeadSent:
return .failResponseStream(source, error, context.executor, bodyStreamContinuation: nil)
}

case .finished(error: _),
.executing(_, _, .finished):
case .executing(let context, let requestStreamState, .finished):
// an error occured after full response received, but before the full request was sent
self.state = .finished(error: error)
switch requestStreamState {
case .paused(let bodyStreamContinuation):
if let bodyStreamContinuation {
return .failRequestStreamContinuation(
bodyStreamContinuation,
error,
context.executor
)
} else {
return .cancelExecutor(context.executor)
}
case .endForwarded, .finished, .producing, .requestHeadSent:
return .cancelExecutor(context.executor)
}

case .finished(error: _):
return .none
}
}
Expand Down Expand Up @@ -232,7 +251,7 @@ extension Transaction {
self.state = .executing(context, .producing, responseState)
return .resumeStream(continuation)

case .executing(_, .finished, _):
case .executing(_, .endForwarded, _), .executing(_, .finished, _):
// the channels writability changed to writable after we have forwarded all the
// request bytes. Can be ignored.
return .none
Expand All @@ -254,6 +273,7 @@ extension Transaction {
self.state = .executing(context, .paused(continuation: nil), responseSteam)

case .executing(_, .paused, _),
.executing(_, .endForwarded, _),
.executing(_, .finished, _),
.finished:
// the channels writability changed to paused after we have already forwarded all
Expand Down Expand Up @@ -298,7 +318,7 @@ extension Transaction {
"A write continuation already exists, but we tried to set another one. Invalid state: \(self.state)"
)

case .finished, .executing(_, .finished, _):
case .finished, .executing(_, .endForwarded, _), .executing(_, .finished, _):
return .fail
}
}
Expand All @@ -309,6 +329,7 @@ extension Transaction {
.queued,
.deadlineExceededWhileQueued,
.executing(_, .requestHeadSent, _),
.executing(_, .endForwarded, _),
.executing(_, .finished, _):
preconditionFailure(
"A request stream can only produce, if the request was started. Invalid state: \(self.state)"
Expand Down Expand Up @@ -343,6 +364,7 @@ extension Transaction {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, .endForwarded, _),
.executing(_, .finished, _):
preconditionFailure("Invalid state: \(self.state)")

Expand All @@ -355,23 +377,36 @@ extension Transaction {
.executing(let context, .paused(continuation: .none), let responseState),
.executing(let context, .requestHeadSent, let responseState):

switch responseState {
case .finished:
// if the response stream has already finished before the request, we must succeed
// the final continuation.
self.state = .finished(error: nil)
return .forwardStreamFinished(context.executor)

case .waitingForResponseHead, .streamingBody:
self.state = .executing(context, .finished, responseState)
return .forwardStreamFinished(context.executor)
}
self.state = .executing(context, .endForwarded, responseState)
return .forwardStreamFinished(context.executor)

case .finished:
return .none
}
}

mutating func requestBodyStreamSent() {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, .requestHeadSent, _),
.executing(_, .finished, _),
.executing(_, .producing, _),
.executing(_, .paused, _):
preconditionFailure("Invalid state: \(self.state)")

case .executing(_, .endForwarded, .finished):
self.state = .finished(error: nil)

case .executing(let context, .endForwarded, let responseState):
self.state = .executing(context, .finished, responseState)

case .finished:
break
}
}

// MARK: - Response -

enum ReceiveResponseHeadAction {
Expand Down Expand Up @@ -482,7 +517,7 @@ extension Transaction {
switch requestState {
case .finished:
self.state = .finished(error: nil)
case .paused, .producing, .requestHeadSent:
case .paused, .producing, .requestHeadSent, .endForwarded:
self.state = .executing(context, requestState, .finished)
}
return .finishResponseStream(source, finalBody: newChunks)
Expand All @@ -497,6 +532,15 @@ extension Transaction {
}
}

mutating func httpResponseStreamTerminated() -> FailAction {
switch self.state {
case .executing(_, _, .finished), .finished:
return .none
default:
return self.fail(HTTPClientError.cancelled)
}
}

enum DeadlineExceededAction {
case none
case cancelSchedulerOnly(scheduler: HTTPRequestScheduler)
Expand Down Expand Up @@ -538,7 +582,7 @@ extension Transaction {
executor: context.executor,
bodyStreamContinuation: continuation
)
case .requestHeadSent, .finished, .producing, .paused(continuation: .none):
case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none):
self.state = .finished(error: error)
return .cancel(
requestContinuation: context.continuation,
Expand Down
27 changes: 23 additions & 4 deletions Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ final class Transaction:
break

case .forwardStreamFinished(let executor):
executor.finishRequestBodyStream(self, promise: nil)
executor.finishRequestBodyStream(trailers: nil, request: self, promise: nil)
}
return
}
Expand Down Expand Up @@ -206,7 +206,9 @@ extension Transaction: HTTPExecutableRequest {
}
}

func requestHeadSent() {}
func requestHeadSent() {
// protocol requirement. Intentionally not needed.
}

func resumeRequestBodyStream() {
let action = self.state.withLockedValue { state in
Expand Down Expand Up @@ -245,6 +247,12 @@ extension Transaction: HTTPExecutableRequest {
}
}

func requestBodyStreamSent() {
self.state.withLockedValue { state in
state.requestBodyStreamSent()
}
}

// MARK: Response

func receiveResponseHead(_ head: HTTPResponseHead) {
Expand Down Expand Up @@ -302,6 +310,13 @@ extension Transaction: HTTPExecutableRequest {
}
}

func httpResponseStreamTerminated() {
let action = self.state.withLockedValue { state in
state.httpResponseStreamTerminated()
}
self.performFailAction(action)
}

func fail(_ error: Error) {
let action = self.state.withLockedValue { state in
state.fail(error)
Expand All @@ -325,8 +340,12 @@ extension Transaction: HTTPExecutableRequest {
requestBodyStreamContinuation?.resume(throwing: error)
executor.cancelRequest(self)

case .failRequestStreamContinuation(let bodyStreamContinuation, let error):
case .failRequestStreamContinuation(let bodyStreamContinuation, let error, let executor):
bodyStreamContinuation.resume(throwing: error)
executor.cancelRequest(self)

case .cancelExecutor(let executor):
executor.cancelRequest(self)
}
}

Expand Down Expand Up @@ -369,6 +388,6 @@ extension Transaction: NIOAsyncSequenceProducerDelegate {

@usableFromInline
func didTerminate() {
self.fail(HTTPClientError.cancelled)
self.httpResponseStreamTerminated()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,45 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
case .sendBodyPart(let part, let writePromise):
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)

case .sendRequestEnd(let writePromise):
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
case .sendRequestEnd(let trailers, let writePromise, let finalAction):

let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues

writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in
guard let oldRequest = self.request else {
// in the meantime an error might have happened, which is why this request is
// not reference anymore.
return
}
oldRequest.requestBodyStreamSent()
switch result {
case .success:
// If our final action is not `none`, that means we've already received
// the complete response. As a result, once we've uploaded all the body parts
// we need to tell the pool that the connection is idle or, if we were asked to
// close when we're done, send the close. Either way, we then succeed the request

switch finalAction {
case .none:
break

case .informConnectionIsIdle:
self.request = nil
self.onConnectionIdle()

case .close:
self.request = nil
context.close(promise: nil)
}

case .failure(let error):
context.close(promise: nil)
oldRequest.fail(error)
}
}

context.writeAndFlush(self.wrapOutboundOut(.end(trailers)), promise: writePromise)

if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
self.runTimeoutAction(readTimeoutAction, context: context)
Expand Down Expand Up @@ -300,7 +337,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
// that the request is neither failed nor finished yet
self.request!.receiveResponseBodyParts(buffer)

case .succeedRequest(let finalAction, let buffer):
case .forwardResponseEnd(let finalAction, let buffer, let trailers):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet

Expand All @@ -312,41 +349,22 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
// other way around.

let oldRequest = self.request!
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
self.runTimeoutAction(.clearIdleWriteTimeoutTimer, context: context)

switch finalAction {
case .close:
self.request = nil
context.close(promise: nil)
oldRequest.receiveResponseEnd(buffer, trailers: nil)
case .sendRequestEnd(let writePromise, let shouldClose):
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues
writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in
switch result {
case .success:
// If our final action was `sendRequestEnd`, that means we've already received
// the complete response. As a result, once we've uploaded all the body parts
// we need to tell the pool that the connection is idle or, if we were asked to
// close when we're done, send the close. Either way, we then succeed the request
if shouldClose {
context.close(promise: nil)
} else {
self.onConnectionIdle()
}

oldRequest.receiveResponseEnd(buffer, trailers: nil)
case .failure(let error):
context.close(promise: nil)
oldRequest.fail(error)
}
}
oldRequest.receiveResponseEnd(buffer, trailers: trailers)

case .none:
oldRequest.receiveResponseEnd(buffer, trailers: trailers)

context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
case .informConnectionIsIdle:
self.request = nil
self.onConnectionIdle()
oldRequest.receiveResponseEnd(buffer, trailers: nil)
oldRequest.receiveResponseEnd(buffer, trailers: trailers)
}

case .failRequest(let error, let finalAction):
Expand Down Expand Up @@ -484,14 +502,18 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
self.run(action, context: context)
}

fileprivate func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
fileprivate func finishRequestBodyStream0(
trailers: HTTPHeaders?,
request: HTTPExecutableRequest,
promise: EventLoopPromise<Void>?
) {
guard self.request === request, let context = self.channelContext else {
// See code comment in `writeRequestBodyPart0`
promise?.fail(HTTPClientError.requestStreamCancelled)
return
}

let action = self.state.requestStreamFinished(promise: promise)
let action = self.state.requestStreamFinished(trailers: trailers, promise: promise)
self.run(action, context: context)
}

Expand Down Expand Up @@ -545,9 +567,9 @@ extension HTTP1ClientChannelHandler {
}
}

func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
func finishRequestBodyStream(trailers: HTTPHeaders?, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
self.loopBound.execute {
$0.finishRequestBodyStream0(request, promise: promise)
$0.finishRequestBodyStream0(trailers: trailers, request: request, promise: promise)
}
}

Expand Down
Loading
Loading