Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 17 additions & 45 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,24 +552,6 @@ def recycle_cpu_blocks(self, cpu_block_ids):
else:
heapq.heappush(self.cpu_free_block_list, cpu_block_ids)

def _acquire_kvcache_lock(self):
    """Acquire the GPU KV cache lock for the transfer process.

    Uses a file-based lock (fcntl.flock) to ensure mutual exclusion
    between the worker and the CPU transfer process. This prevents
    concurrent GPU KV cache access which may cause NaN errors under
    certain DP+EP configurations.

    No-op unless the FD_USE_KVCACHE_LOCK environment flag is enabled,
    so the lock has zero cost in the default configuration.
    """
    if not envs.FD_USE_KVCACHE_LOCK:
        return
    # Blocking acquire; assumes self.gpu_cache_lock was attached by the
    # engine before any swap task is issued — TODO confirm at call sites.
    self.gpu_cache_lock.acquire()

def _release_kvcache_lock(self):
    """Release the GPU KV cache lock held by the transfer process.

    No-op unless FD_USE_KVCACHE_LOCK is enabled, mirroring
    ``_acquire_kvcache_lock`` so acquire/release calls stay balanced.
    """
    if not envs.FD_USE_KVCACHE_LOCK:
        return
    self.gpu_cache_lock.release()

def issue_swap_task(
self,
transfer_task_id,
Expand All @@ -590,14 +572,12 @@ def issue_swap_task(
is_sync: bool, whether to wait for the result of the swap task
"""
assert is_sync, "Only support is sync for swap_task now."
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug 缺少充分的解释和测试验证

移除 IPCLock 机制是一个重大的变更,但 PR 描述中没有说明:

  1. 为什么移除锁反而能避免 NaN 问题?
  2. 之前锁的实现是如何导致 NaN 的?
  3. 移除锁后是否有其他机制保证并发访问 GPU KV cache 的安全性?
  4. 在哪些配置下进行了验证?

建议

  • 在 PR 描述中详细说明问题根因和解决方案
  • 提供相关的 Issue 链接或问题复现步骤
  • 如果有 DP+EP 配置的测试结果,请提供

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 缺少单元测试

移除锁机制会影响 issue_swap_task、issue_write_back_storage_task、issue_prefetch_storage_task 的并发行为,建议添加单元测试来验证:

  1. 移除锁后,并发访问 GPU KV cache 不会导致数据竞争
  2. swap/storage 操作的正确性不受影响

相关测试文件:tests/cache_manager/test_prefix_cache_manager.py

self._acquire_kvcache_lock()
self.task_swapping_event[transfer_task_id] = Event()
self.cache_task_queue.put_transfer_task(
(event_type, transfer_task_id, swap_node_ids, gpu_block_ids, cpu_block_ids)
)
if is_sync:
Comment on lines 575 to 579
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里移除了 assert is_sync 后,is_sync=False 时会创建 task_swapping_event[transfer_task_id] 但不会进入 sync_swap_task() 清理,导致 task_swapping_event 可能持续增长(长跑进程内存/句柄泄漏)。如果仍只支持同步模式,建议保留断言/显式抛错;如果希望支持异步模式,需要提供对应的事件清理/等待接口或返回可用于后续清理的句柄。

Copilot uses AI. Check for mistakes.
self.sync_swap_task(transfer_task_id)
self._release_kvcache_lock()

def sync_swap_task(self, transfer_task_id):
"""
Expand Down Expand Up @@ -1143,20 +1123,17 @@ def issue_write_back_storage_task(self, task: WriteStorageTask, is_sync=True):
if self.kvcache_storage_backend is None:
return

assert is_sync, "Only support is_sync=True for now."
self._acquire_kvcache_lock()
try:
if len(task.keys) != len(task.gpu_block_ids):
err_msg = f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})"
logger.error(err_msg)
raise ValueError(err_msg)

self.task_write_back_event[task.task_id] = Event()
self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task))
if is_sync:
self.wait_write_storage_task(task.task_id)
finally:
self._release_kvcache_lock()
if len(task.keys) != len(task.gpu_block_ids):
err_msg = (
f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})"
)
logger.error(err_msg)
raise ValueError(err_msg)

self.task_write_back_event[task.task_id] = Event()
self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task))
if is_sync:
Comment on lines +1132 to +1135
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue_write_back_storage_task() 现在允许 is_sync=False 路径,但该路径会留下 task_write_back_event[task.task_id] 且没有任何地方负责删除;历史上通过 assert is_sync 避免了这类泄漏。建议要么恢复“仅支持同步”的约束,要么补齐异步任务的生命周期管理(事件清理/回收与调用方式)。

Copilot uses AI. Check for mistakes.
self.wait_write_storage_task(task.task_id)

def wait_write_storage_task(self, req_id):
"""
Expand All @@ -1173,18 +1150,13 @@ def issue_prefetch_storage_task(self, task: ReadStorageTask, is_sync=True):
if self.kvcache_storage_backend is None:
return []

assert is_sync, "Only support is_sync=True for now."
self._acquire_kvcache_lock()
storage_block_ids = []
self.task_prefetch_event[task.task_id] = Event()
# issue task to cache_transfer_manager
self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task))
if is_sync:
Comment on lines +1153 to +1157
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue_prefetch_storage_task() 同样移除了仅同步的限制;当 is_sync=False 时会残留 task_prefetch_event[task.task_id](以及后续可能的 storage_prefetch_block_ids),除非调用方显式再调用 wait_prefetch_storage_task(),否则会造成状态/内存泄漏。建议恢复同步限制,或明确异步 API 的使用方式并保证事件在完成后可被清理。

Copilot uses AI. Check for mistakes.
storage_block_ids = self.wait_prefetch_storage_task(task.task_id)

try:
storage_block_ids = []
self.task_prefetch_event[task.task_id] = Event()
# issue task to cache_transfer_manager
self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task))
if is_sync:
storage_block_ids = self.wait_prefetch_storage_task(task.task_id)
finally:
self._release_kvcache_lock()
return storage_block_ids

def wait_prefetch_storage_task(self, req_id):
Expand Down
13 changes: 0 additions & 13 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
from fastdeploy.inter_communicator import (
EngineCacheQueue,
EngineWorkerQueue,
IPCLock,
IPCSignal,
ZmqIpcServer,
ZmqTcpServer,
Expand Down Expand Up @@ -187,10 +186,6 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False):
)
self._init_worker_monitor_signals()

# Pass the GPU KV cache lock to cache_manager for mutual exclusion
# between the CPU transfer process and the worker process.
self.resource_manager.cache_manager.gpu_cache_lock = self.gpu_cache_lock

if self.cfg.eplb_config.enable_eplb:
current_suffix = self.cfg.parallel_config.local_engine_worker_queue_port
init_eplb_signals(cfg, current_suffix)
Expand Down Expand Up @@ -401,14 +396,6 @@ def _init_worker_monitor_signals(self): # exist_task_signal 用于各worker进
create=True,
)

# gpu_cache_lock: file-based lock for mutual exclusion between worker
# and CPU transfer when accessing GPU KV cache.
self.gpu_cache_lock = IPCLock(
name="gpu_cache_lock",
suffix=current_suffix,
create=True,
)

def start_worker_queue_service(self, start_queue):
"""
start queue service for engine worker communication
Expand Down
6 changes: 0 additions & 6 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,6 @@
"FILE_BACKEND_STORAGE_DIR": lambda: str(os.getenv("FILE_BACKEND_STORAGE_DIR", "/tmp/fastdeploy")),
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里删除了环境变量 FD_USE_KVCACHE_LOCK 的定义后,任何依赖 fastdeploy.envs.FD_USE_KVCACHE_LOCK 的外部调用都会直接触发 AttributeError(envs.__getattr__ 仅对已注册 key 生效)。如果需要兼容已有用户/脚本,建议至少保留该 env 作为废弃项(即使不再生效也应保持可读),或在变更说明中明确迁移方式。

Suggested change
"FILE_BACKEND_STORAGE_DIR": lambda: str(os.getenv("FILE_BACKEND_STORAGE_DIR", "/tmp/fastdeploy")),
"FILE_BACKEND_STORAGE_DIR": lambda: str(os.getenv("FILE_BACKEND_STORAGE_DIR", "/tmp/fastdeploy")),
# Deprecated: keep this env readable for backward compatibility even if it is no longer used.
"FD_USE_KVCACHE_LOCK": lambda: bool(int(os.getenv("FD_USE_KVCACHE_LOCK", "0"))),

Copilot uses AI. Check for mistakes.
# Whether to use PD REORDER, can set 0 or 1
"FD_PD_REORDER": lambda: int(os.getenv("FD_PD_REORDER", "0")),
# Whether to enable KV cache lock, enforcing mutual exclusion between
# PrefixCacheManager and Worker when accessing GPU KV cache.
# Under certain DP+EP configurations, concurrent access (even read-only)
# has been observed to cause NaN computation errors.
# Set to 1 to enable the lock; defaults to 0 (disabled).
"FD_USE_KVCACHE_LOCK": lambda: bool(int(os.getenv("FD_USE_KVCACHE_LOCK", "0"))),
# Whether to probe MoE routing probabilities and use Fleet's fused SwiGLU kernel.
"FD_MOE_PROB_IN_ADVANCE": lambda: bool(int(os.getenv("FD_MOE_PROB_IN_ADVANCE", "0"))),
# Suspend rollouting routing replay
Expand Down
3 changes: 1 addition & 2 deletions fastdeploy/inter_communicator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from .engine_cache_queue import EngineCacheQueue
from .engine_worker_queue import EngineWorkerQueue
from .ipc_signal import IPCLock, IPCSignal, shared_memory_exists
from .ipc_signal import IPCSignal, shared_memory_exists
from .ipc_signal_const import (
Comment on lines 17 to 20
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fastdeploy.inter_communicator 不再导出 IPCLock(import 与 __all__ 均移除),这会破坏外部代码的 from fastdeploy.inter_communicator import IPCLock 用法。若该符号曾被用户侧使用,建议提供一段过渡期(保留别名/标记弃用并在日志或文档中提示),避免作为 BugFix 引入隐式破坏性变更。

Copilot uses AI. Check for mistakes.
ExistTaskStatus,
KVCacheStatus,
Expand All @@ -31,7 +31,6 @@
"ZmqIpcClient",
"ZmqIpcServer",
"ZmqTcpServer",
"IPCLock",
"IPCSignal",
"EngineWorkerQueue",
"EngineCacheQueue",
Expand Down
57 changes: 0 additions & 57 deletions fastdeploy/inter_communicator/ipc_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
# limitations under the License.
"""

import fcntl
import os
from multiprocessing.shared_memory import SharedMemory

import numpy as np
Expand Down Expand Up @@ -116,58 +114,3 @@ def clear(self) -> None:
if shared_memory_exists(self.shm.name):
self.shm.close()
self.shm.unlink()


class IPCLock:
    """A file-based inter-process lock using fcntl.flock.

    Provides mutual exclusion between processes that may be spawned via
    subprocess (not just fork/multiprocessing). Lock files are stored in
    /dev/shm/ for performance, falling back to /tmp/.

    Args:
        name: Unique identifier for the lock.
        suffix: Optional suffix appended to the name to avoid conflicts
            when multiple engines are launched.
        create: If True, creates the lock file; otherwise opens an
            existing one.

    Raises:
        RuntimeError: When ``create=False`` and the lock file does not
            exist (i.e. no process created it first).
    """

    def __init__(self, name: str, suffix: int | None = None, create: bool = True) -> None:
        if suffix is not None:
            name = f"{name}.{suffix}"

        # /dev/shm is a tmpfs on Linux; fall back to /tmp on systems without it.
        lock_dir = "/dev/shm" if os.path.isdir("/dev/shm") else "/tmp"
        self._lock_path = os.path.join(lock_dir, f"fd_lock_{name}")

        if create:
            llm_logger.debug(f"creating ipc lock: {self._lock_path}")
            # Use restrictive permissions to avoid other users acquiring the lock.
            self._fd = os.open(self._lock_path, os.O_CREAT | os.O_RDWR, 0o600)
        else:
            llm_logger.debug(f"attaching ipc lock: {self._lock_path}")
            try:
                self._fd = os.open(self._lock_path, os.O_RDWR)
            except FileNotFoundError as e:
                llm_logger.error(
                    f"Failed to attach IPC lock: {self._lock_path} does not exist. "
                    "Ensure that the lock has been created (create=True) with the same "
                    "name and suffix before attaching."
                )
                raise RuntimeError(f"IPC lock file not found: {self._lock_path}") from e

    def acquire(self) -> None:
        """Acquire the lock (blocking). Uses kernel-level flock for atomicity."""
        fcntl.flock(self._fd, fcntl.LOCK_EX)

    def release(self) -> None:
        """Release the lock.

        NOTE(review): calling release without a prior acquire is not
        guarded here; flock(LOCK_UN) on an unlocked fd is harmless, but
        callers should still keep acquire/release paired.
        """
        fcntl.flock(self._fd, fcntl.LOCK_UN)

    def clear(self) -> None:
        """Close the file descriptor and remove the lock file.

        Safe to call when the file was already removed by another process
        (FileNotFoundError is deliberately suppressed).
        """
        os.close(self._fd)
        try:
            os.unlink(self._lock_path)
        except FileNotFoundError:
            pass
40 changes: 0 additions & 40 deletions fastdeploy/worker/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue
from fastdeploy.inter_communicator import (
ExistTaskStatus,
IPCLock,
IPCSignal,
ModelWeightsStatus,
RearrangeExpertStatus,
Expand Down Expand Up @@ -285,14 +284,6 @@ def init_health_status(self) -> None:
create=False,
)

# gpu_cache_lock: file-based lock for mutual exclusion between worker
# and CPU transfer when accessing GPU KV cache.
self.gpu_cache_lock = IPCLock(
name="gpu_cache_lock",
suffix=self.parallel_config.local_engine_worker_queue_port,
create=False,
)

def update_weights_from_tensor(self, mmap_infos):
"""
update_weights_from_tensor
Expand Down Expand Up @@ -435,35 +426,6 @@ def _run_eplb(self, tp_rank):
self.rearrange_experts_signal.value[0] = RearrangeExpertStatus.DONE.value
logger.info("redundant_expert: done")

def _acquire_kvcache_lock(self, tp_rank):
    """Acquire the GPU KV cache lock for the worker process.

    Uses a file-based lock (fcntl.flock) to ensure mutual exclusion
    between the worker and the CPU transfer process during model
    execution. Only rank 0 acquires the lock to avoid deadlock among
    tensor-parallel workers.

    No-op unless the FD_USE_KVCACHE_LOCK environment flag is enabled.

    Args:
        tp_rank: Tensor parallel rank of the current worker. Only rank 0
            acquires the lock.
    """
    if not envs.FD_USE_KVCACHE_LOCK:
        return
    # Single-rank locking: rank 0 holds the lock on behalf of the whole
    # TP group; other ranks are assumed to stay in lockstep with rank 0
    # via collective ops — TODO confirm that synchronization guarantee.
    if tp_rank == 0:
        self.gpu_cache_lock.acquire()

def _release_kvcache_lock(self, tp_rank):
    """Release the GPU KV cache lock held by the worker process.

    No-op unless FD_USE_KVCACHE_LOCK is enabled; mirrors
    ``_acquire_kvcache_lock`` so only rank 0 touches the lock.

    Args:
        tp_rank: Tensor parallel rank of the current worker. Only rank 0
            releases the lock.
    """
    if not envs.FD_USE_KVCACHE_LOCK:
        return
    if tp_rank == 0:
        self.gpu_cache_lock.release()

def event_loop_normal(self) -> None:
"""Main event loop for Paddle Distributed Workers.
TODO(gongshaotian): support remote calling of functions that control worker.
Expand Down Expand Up @@ -633,9 +595,7 @@ def event_loop_normal(self) -> None:
# These generated tokens can be obtained through get_output op.
start_execute_time = time.time()

self._acquire_kvcache_lock(tp_rank)
self.worker.execute_model(req_dicts, max_occupied_batch_index)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug 潜在的并发安全问题

移除 worker.execute_model 前后的锁调用后,需要确认以下场景的安全性:

  1. Worker 执行模型时,是否有其他进程同时访问相同的 GPU KV cache block?
  2. Cache transfer 任务是否可能与模型执行并发访问相同的 GPU memory?
  3. 在 TP 场景下,多个 worker 的同步机制是否仍然有效?

建议:请提供并发场景下的验证结果或说明现有的同步机制

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 缺少性能影响说明

移除锁机制可能会影响性能,建议说明:

  1. 移除锁对吞吐量和延迟的影响
  2. 是否有 benchmark 对比数据
  3. 锁本身是否存在性能瓶颈需要移除

self._release_kvcache_lock(tp_rank)

# Only v0 use this signal
if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
Expand Down
Loading