feat(task): add task support (SEP-1686)#536
Conversation
|
The basic function have done, I will add some example, have a look ,if free @4t145 @alexhancock |
There was a problem hiding this comment.
Pull request overview
This PR implements task support (SEP-1686) to enable asynchronous execution of long-running operations in the MCP protocol. The implementation adds a task manager for coordinating operations, new model types for task status and results, procedural macros for automatic task handler generation, and protocol extensions for task-related requests (list, get, cancel).
Key Changes
- Added
OperationProcessorto manage async task execution with timeout and cancellation support - Introduced task-related models (
Task,TaskStatus,TaskResult,CreateTaskResult) and request/response types - Created
#[task_handler]macro to auto-generate task management methods for server handlers - Extended
CallToolRequestParamwith optionaltaskfield to trigger async execution
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 22 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/rmcp/src/task_manager.rs | Core task processor implementation managing running tasks, timeouts, and result collection |
| crates/rmcp/src/model/task.rs | Task lifecycle models including status enum and result types per SEP-1686 |
| crates/rmcp/src/model.rs | Added task-related request/response types (GetTaskInfo, ListTasks, GetTaskResult, CancelTask) |
| crates/rmcp/src/model/capabilities.rs | Added TasksCapability for capability negotiation of task support |
| crates/rmcp/src/model/meta.rs | Extended variant list with task request types |
| crates/rmcp/src/handler/server.rs | Added task request routing and enqueue_task branching logic |
| crates/rmcp/src/handler/server/tool.rs | Extended ToolCallContext with task metadata field |
| crates/rmcp/src/error.rs | Added TaskError variant to error enum |
| crates/rmcp-macros/src/task_handler.rs | Procedural macro generating task handler methods (enqueue, list, get_info, get_result, cancel) |
| crates/rmcp-macros/src/lib.rs | Exported task_handler attribute macro |
| crates/rmcp/src/lib.rs | Exposed task_manager module |
| crates/rmcp/Cargo.toml | Added test configuration for task tests |
| crates/rmcp/tests/test_task.rs | Unit tests for operation processor basics |
| crates/rmcp/src/transport/streamable_http_client.rs | Added clippy allow for large enum variant |
| examples/servers/src/common/counter.rs | Integration test demonstrating task enqueueing with long_task tool |
| examples//src/.rs | Updated all CallToolRequestParam usage to include task: None |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| struct ToolCallOperationResult { | ||
| id: String, | ||
| result: Result<CallToolResult, McpError>, | ||
| } | ||
|
|
||
| impl OperationResultTransport for ToolCallOperationResult { | ||
| fn operation_id(&self) -> &String { | ||
| &self.id | ||
| } | ||
|
|
||
| fn as_any(&self) -> &dyn Any { | ||
| self | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
The ToolCallOperationResult struct (lines 24-37) is defined but never used in this file. The task_handler macro generates code that uses ToolCallTaskResult from the task_manager module instead. This is dead code and should be removed to avoid confusion.
| struct ToolCallOperationResult { | |
| id: String, | |
| result: Result<CallToolResult, McpError>, | |
| } | |
| impl OperationResultTransport for ToolCallOperationResult { | |
| fn operation_id(&self) -> &String { | |
| &self.id | |
| } | |
| fn as_any(&self) -> &dyn Any { | |
| self | |
| } | |
| } |
| #[error("Task error: {0}")] | ||
| TaskError(String), |
There was a problem hiding this comment.
The TaskError variant only contains a String message, which loses the source error information. This makes debugging difficult when tasks fail due to underlying errors (like I/O errors, network errors, etc.). Consider changing this to store a boxed error like other variants, or adding a separate variant for errors with sources: TaskError { message: String, source: Option<Box<dyn std::error::Error + Send + Sync>> }.
| ClientRequest::CallToolRequest(request) => { | ||
| if request.params.task.is_some() { | ||
| tracing::info!("Enqueueing task for tool call: {}", request.params.name); | ||
| self.enqueue_task(request.params, context.clone()) | ||
| .await | ||
| .map(ServerResult::CreateTaskResult) | ||
| } else { | ||
| self.call_tool(request.params, context) | ||
| .await | ||
| .map(ServerResult::CallToolResult) | ||
| } | ||
| } |
There was a problem hiding this comment.
The handler accepts task requests without checking if task support was negotiated in capabilities. When a client sends a CallToolRequest with the task field populated, the server will attempt to enqueue it regardless of whether task capabilities were advertised during initialization. This violates the MCP capability negotiation contract. The handler should verify that task support is enabled in capabilities before processing task-augmented requests, or return a method_not_found/not_supported error.
| /// Check for tasks that have exceeded their timeout and handle them appropriately. | ||
| pub fn check_timeouts(&mut self) { | ||
| let now = std::time::Instant::now(); | ||
| let mut timed_out_tasks = Vec::new(); | ||
|
|
||
| for (task_id, task) in &self.running_tasks { | ||
| if let Some(timeout_duration) = task.timeout { | ||
| if now.duration_since(task.started_at).as_secs() > timeout_duration { | ||
| task.task_handle.abort(); | ||
| timed_out_tasks.push(task_id.clone()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| for task_id in timed_out_tasks { | ||
| if let Some(task) = self.running_tasks.remove(&task_id) { | ||
| let timeout_result = TaskResult { | ||
| descriptor: task.descriptor, | ||
| result: Err(Error::TaskError("Operation timed out".to_string())), | ||
| }; | ||
| self.completed_results.push(timeout_result); | ||
| } | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
The check_timeouts method is defined but never called anywhere in the codebase. Tasks can timeout via tokio::time::timeout in spawn_async_task, but this method provides an additional timeout checking mechanism that remains unused. Either this method should be called periodically (e.g., in a background task or when collecting results), or it should be removed to avoid confusion and dead code.
| /// Check for tasks that have exceeded their timeout and handle them appropriately. | |
| pub fn check_timeouts(&mut self) { | |
| let now = std::time::Instant::now(); | |
| let mut timed_out_tasks = Vec::new(); | |
| for (task_id, task) in &self.running_tasks { | |
| if let Some(timeout_duration) = task.timeout { | |
| if now.duration_since(task.started_at).as_secs() > timeout_duration { | |
| task.task_handle.abort(); | |
| timed_out_tasks.push(task_id.clone()); | |
| } | |
| } | |
| } | |
| for task_id in timed_out_tasks { | |
| if let Some(task) = self.running_tasks.remove(&task_id) { | |
| let timeout_result = TaskResult { | |
| descriptor: task.descriptor, | |
| result: Err(Error::TaskError("Operation timed out".to_string())), | |
| }; | |
| self.completed_results.push(timeout_result); | |
| } | |
| } | |
| } |
| /// Currently running tasks keyed by id | ||
| running_tasks: HashMap<String, RunningTask>, | ||
| /// Completed results waiting to be collected | ||
| completed_results: Vec<TaskResult>, |
There was a problem hiding this comment.
The completed_results Vec grows unbounded as tasks complete. Without any cleanup mechanism or TTL enforcement on completed results, this will cause a memory leak in long-running servers that process many tasks. Consider implementing automatic cleanup of old completed results based on TTL, or providing a method to periodically purge old results.
| if running.into_iter().any(|id| id == task_id) { | ||
| let timestamp = current_timestamp(); | ||
| let task = rmcp::model::Task { | ||
| task_id, | ||
| status: rmcp::model::TaskStatus::Working, | ||
| status_message: None, | ||
| created_at: timestamp.clone(), | ||
| last_updated_at: Some(timestamp), | ||
| ttl: None, | ||
| poll_interval: None, | ||
| }; | ||
| return Ok(rmcp::model::GetTaskInfoResult { task: Some(task) }); |
There was a problem hiding this comment.
The generated created_at and last_updated_at timestamps use current_timestamp() which calls chrono::Utc::now() at the time of query, not when the task was actually created. This means the timestamps don't reflect the true task creation or update times, but rather when the status was queried. These timestamps should be stored in the RunningTask structure and retrieved from there for accuracy.
| self.enqueue_task(request.params, context.clone()) | ||
| .await | ||
| .map(ServerResult::CreateTaskResult) |
There was a problem hiding this comment.
The task routing logic clones the RequestContext (line 68) when enqueueing a task, but this clone contains a CancellationToken which doesn't behave as expected when cloned. Cloning a CancellationToken creates a new token that shares the same cancellation state, so cancelling the original context will affect the cloned one. However, the task spawned with the cloned context might outlive the original request, leading to unexpected cancellation behavior. Consider creating a new CancellationToken for the task or document this behavior clearly.
| /// Check for tasks that have exceeded their timeout and handle them appropriately. | ||
| pub fn check_timeouts(&mut self) { | ||
| let now = std::time::Instant::now(); | ||
| let mut timed_out_tasks = Vec::new(); | ||
|
|
||
| for (task_id, task) in &self.running_tasks { | ||
| if let Some(timeout_duration) = task.timeout { | ||
| if now.duration_since(task.started_at).as_secs() > timeout_duration { | ||
| task.task_handle.abort(); | ||
| timed_out_tasks.push(task_id.clone()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| for task_id in timed_out_tasks { | ||
| if let Some(task) = self.running_tasks.remove(&task_id) { | ||
| let timeout_result = TaskResult { | ||
| descriptor: task.descriptor, | ||
| result: Err(Error::TaskError("Operation timed out".to_string())), | ||
| }; | ||
| self.completed_results.push(timeout_result); | ||
| } | ||
| } |
There was a problem hiding this comment.
The task timeout mechanism has a race condition. A task is aborted in check_timeouts (line 216), but spawn_async_task also applies timeout via tokio::time::timeout (lines 170-173). If check_timeouts is ever called, it could abort a task that's also being timed out by tokio, leading to two timeout results being generated. The timeout responsibility should be handled in one place, not both.
| /// Check for tasks that have exceeded their timeout and handle them appropriately. | |
| pub fn check_timeouts(&mut self) { | |
| let now = std::time::Instant::now(); | |
| let mut timed_out_tasks = Vec::new(); | |
| for (task_id, task) in &self.running_tasks { | |
| if let Some(timeout_duration) = task.timeout { | |
| if now.duration_since(task.started_at).as_secs() > timeout_duration { | |
| task.task_handle.abort(); | |
| timed_out_tasks.push(task_id.clone()); | |
| } | |
| } | |
| } | |
| for task_id in timed_out_tasks { | |
| if let Some(task) = self.running_tasks.remove(&task_id) { | |
| let timeout_result = TaskResult { | |
| descriptor: task.descriptor, | |
| result: Err(Error::TaskError("Operation timed out".to_string())), | |
| }; | |
| self.completed_results.push(timeout_result); | |
| } | |
| } | |
| /// Check for tasks that have exceeded their timeout. | |
| /// (No-op: timeout is now handled exclusively in spawn_async_task via tokio::time::timeout.) | |
| pub fn check_timeouts(&mut self) { | |
| // Timeout handling is now managed by tokio::time::timeout in spawn_async_task. | |
| // This method is retained for API compatibility but does nothing. |
| async fn list_tasks( | ||
| &self, | ||
| _request: Option<rmcp::model::PaginatedRequestParam>, | ||
| _: rmcp::service::RequestContext<rmcp::RoleServer>, | ||
| ) -> Result<rmcp::model::ListTasksResult, McpError> { | ||
| let running_ids = (#processor).lock().await.list_running(); | ||
| let total = running_ids.len() as u64; | ||
| let tasks = running_ids | ||
| .into_iter() | ||
| .map(|task_id| { | ||
| let timestamp = rmcp::task_manager::current_timestamp(); | ||
| rmcp::model::Task { | ||
| task_id, | ||
| status: rmcp::model::TaskStatus::Working, | ||
| status_message: None, | ||
| created_at: timestamp.clone(), | ||
| last_updated_at: Some(timestamp), | ||
| ttl: None, | ||
| poll_interval: None, | ||
| } | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| Ok(rmcp::model::ListTasksResult { | ||
| tasks, | ||
| next_cursor: None, | ||
| total: Some(total), | ||
| }) | ||
| } |
There was a problem hiding this comment.
The generated list_tasks method assumes all running tasks have status Working, but it doesn't check completed results that might not have been collected yet. This means tasks that have just completed but haven't been polled yet won't appear in the list, which could confuse clients. The method should call collect_completed_results first and include recently completed tasks in the listing.
| let get_result_fn = quote! { | ||
| async fn get_task_result( | ||
| &self, | ||
| request: rmcp::model::GetTaskResultParam, | ||
| _context: rmcp::service::RequestContext<rmcp::RoleServer>, | ||
| ) -> Result<rmcp::model::TaskResult, McpError> { | ||
| use std::time::Duration; | ||
| let task_id = request.task_id.clone(); | ||
|
|
||
| loop { | ||
| // Scope the lock so we can await outside if needed | ||
| { | ||
| let mut processor = (#processor).lock().await; | ||
| processor.collect_completed_results(); | ||
|
|
||
| if let Some(task_result) = processor.take_completed_result(&task_id) { | ||
| match task_result.result { | ||
| Ok(boxed) => { | ||
| if let Some(tool) = boxed.as_any().downcast_ref::<rmcp::task_manager::ToolCallTaskResult>() { | ||
| match &tool.result { | ||
| Ok(call_tool) => { | ||
| let value = ::serde_json::to_value(call_tool).unwrap_or(::serde_json::Value::Null); | ||
| return Ok(rmcp::model::TaskResult { | ||
| content_type: "application/json".to_string(), | ||
| value, | ||
| summary: None, | ||
| }); | ||
| } | ||
| Err(err) => return Err(McpError::internal_error( | ||
| format!("task failed: {}", err), | ||
| None, | ||
| )), | ||
| } | ||
| } else { | ||
| return Err(McpError::internal_error("unsupported task result transport", None)); | ||
| } | ||
| } | ||
| Err(err) => return Err(McpError::internal_error( | ||
| format!("task execution error: {}", err), | ||
| None, | ||
| )), | ||
| } | ||
| } | ||
|
|
||
| // Not completed yet: if not running, return not found | ||
| let running = processor.list_running(); | ||
| if !running.iter().any(|id| id == &task_id) { | ||
| return Err(McpError::resource_not_found(format!("task not found: {}", task_id), None)); | ||
| } | ||
| } | ||
|
|
||
| tokio::time::sleep(Duration::from_millis(100)).await; | ||
| } | ||
| } |
There was a problem hiding this comment.
The get_task_result method implementation lacks test coverage. While there's a basic integration test that verifies task enqueueing and listing, there's no test that validates the actual result retrieval mechanism via GetTaskResultRequest. This is a critical path that involves complex polling logic and should be tested to ensure it correctly waits for and returns task results.
8ebd102 to
5f06c0e
Compare
|
@jokemanfire it LGTM and I'd love to get this in so we can test Tasks in real world clients. Can you look into the failing checks? |
Signed-off-by: jokemanfire <hu.dingyang@zte.com.cn>
|
@alexhancock Try it in real world, perhaps some implementations differ from the SEP description and will be further modified later. |
|
@jokemanfire Yes, will do. I think given the state of this feature of MCP is "experimental" we have plenty of room to test out. But this implementation LGTM - merging |
Signed-off-by: jokemanfire <hu.dingyang@zte.com.cn>
Tasks are bidirectional per SEP-1686: either party can be the requestor or the receiver. The ServerHandler side is already wired for client→server task flow (tools/call augmentation). This patch mirrors the same wiring on the client side so servers that initiate task-augmented requests (notably sampling/createMessage and elicitation/create) can follow up with tasks/get, tasks/list, tasks/result, and tasks/cancel directed at the client. Changes, purely additive: * ServerRequest: add GetTaskInfoRequest | ListTasksRequest | GetTaskResultRequest | CancelTaskRequest variants. Add a ServerRequest::method() accessor mirroring ClientRequest::method(). Update the variant_extension! invocation so the existing GetExtensions / GetMeta impls cover the new variants. * ClientResult: add ListTasksResult | GetTaskResult | GetTaskPayloadResult | CancelTaskResult response variants. GetTaskPayloadResult retains its existing custom Deserialize-fails behavior, so payload responses are still observed on the wire as CustomResult (matching the server-side pattern). * ClientHandler: add list_tasks, get_task_info, get_task_result, and cancel_task methods with default -32601 Method-not-found impls, mirroring the server-side signatures. Propagate via the Box/Arc wrapper macro. Dispatch all four from the handle_request match. This unblocks clients that want to advertise capabilities.tasks.requests.sampling.createMessage, capabilities.tasks.requests.elicitation.create, or the client-side tasks.list / tasks.cancel capabilities: previously, servers had no way to reach the client's task methods through the typed request enum, and such capabilities couldn't be honored end-to-end. Tests: new test_task_client_receiver.rs exercises a full bidirectional roundtrip for each of the four methods (server → client RPC → ClientHandler → response → server), plus a default-impl test that confirms the unit () client returns -32601 for tasks/get. Existing message-schema golden files regenerated to include the new ServerRequest and ClientResult variants; no other tests affected. Related: modelcontextprotocol#528, modelcontextprotocol#536 (which added the server-side half of SEP-1686).
Support task manager to control the task , it will be very useful for longtime and concurrence env .
#528
Motivation and Context
How Has This Been Tested?
Breaking Changes
Types of changes
Checklist
Additional context