Skip to content

Hardcoded context timeouts in ListMessages cause failures #2432

@cobolbaby

Description

@cobolbaby

Description:

When Apache Kafka read/write throughput is slow, consuming messages based on a specific timestamp can exceed the default timeout (35s), leading to failures.

Image

Expected Behavior

Make the timeout configurable instead of hardcoded.

func (api *Service) ListMessages(
ctx context.Context,
req *connect.Request[v1alpha.ListMessagesRequest],
stream *connect.ServerStream[v1alpha.ListMessagesResponse],
) error {
lmq := httptypes.ListMessagesRequest{
TopicName: req.Msg.GetTopic(),
StartOffset: req.Msg.GetStartOffset(),
StartTimestamp: req.Msg.GetStartTimestamp(),
PartitionID: req.Msg.GetPartitionId(),
MaxResults: int(req.Msg.GetMaxResults()),
FilterInterpreterCode: req.Msg.GetFilterInterpreterCode(),
Enterprise: req.Msg.GetEnterprise(),
PageToken: req.Msg.GetPageToken(),
}
interpreterCode, err := lmq.DecodeInterpreterCode()
if err != nil {
return apierrors.NewConnectError(
connect.CodeInvalidArgument,
fmt.Errorf("failed decoding provided interpreter code: %w", err),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
)
}
// test compile
code := fmt.Sprintf(`var isMessageOk = function() {%s}`, interpreterCode)
_, err = goja.Compile("", code, true)
if err != nil {
return apierrors.NewConnectError(
connect.CodeInvalidArgument,
fmt.Errorf("failed to compile provided interpreter code: %w", err),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
)
}
// Request messages from kafka and return them once we got all the messages or the context is done
listReq := console.ListMessageRequest{
TopicName: lmq.TopicName,
PartitionID: lmq.PartitionID,
StartOffset: lmq.StartOffset,
StartTimestamp: lmq.StartTimestamp,
MessageCount: lmq.MaxResults,
FilterInterpreterCode: interpreterCode,
Troubleshoot: req.Msg.GetTroubleshoot(),
IncludeRawPayload: req.Msg.GetIncludeOriginalRawPayload(),
IgnoreMaxSizeLimit: req.Msg.GetIgnoreMaxSizeLimit(),
KeyDeserializer: fromProtoEncoding(req.Msg.GetKeyDeserializer()),
ValueDeserializer: fromProtoEncoding(req.Msg.GetValueDeserializer()),
PageToken: lmq.PageToken,
PageSize: int(req.Msg.GetPageSize()),
}
timeout := 35 * time.Second
if req.Msg.GetFilterInterpreterCode() != "" || req.Msg.GetStartOffset() == console.StartOffsetNewest {
// Push-down filters and StartOffset = Newest may be long-running streams.
// There's already a client-side provided timeout which we usually trust.
// But additionally we want to ensure it never takes much longer than that.
timeout = 31 * time.Minute
}
ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("list fetch timeout"))
defer cancel()
progress := &streamProgressReporter{
logger: api.logger,
request: &listReq,
stream: stream,
messagesConsumed: atomic.Int64{},
bytesConsumed: atomic.Int64{},
}
progress.Start(ctx)
return api.consoleSvc.ListMessages(ctx, listReq, progress)
}

Environment

console version: v3.5.2 / 3.6.0 / 3.7.2

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions