diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index a12add0c44b..3d88e199d27 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -552,24 +552,6 @@ def recycle_cpu_blocks(self, cpu_block_ids): else: heapq.heappush(self.cpu_free_block_list, cpu_block_ids) - def _acquire_kvcache_lock(self): - """Acquire the GPU KV cache lock for the transfer process. - - Uses a file-based lock (fcntl.flock) to ensure mutual exclusion - between the worker and the CPU transfer process. This prevents - concurrent GPU KV cache access which may cause NaN errors under - certain DP+EP configurations. - """ - if not envs.FD_USE_KVCACHE_LOCK: - return - self.gpu_cache_lock.acquire() - - def _release_kvcache_lock(self): - """Release the GPU KV cache lock held by the transfer process.""" - if not envs.FD_USE_KVCACHE_LOCK: - return - self.gpu_cache_lock.release() - def issue_swap_task( self, transfer_task_id, @@ -590,14 +572,12 @@ def issue_swap_task( is_sync: bool, whether to wait for the result of the swap task """ assert is_sync, "Only support is sync for swap_task now." - self._acquire_kvcache_lock() self.task_swapping_event[transfer_task_id] = Event() self.cache_task_queue.put_transfer_task( (event_type, transfer_task_id, swap_node_ids, gpu_block_ids, cpu_block_ids) ) if is_sync: self.sync_swap_task(transfer_task_id) - self._release_kvcache_lock() def sync_swap_task(self, transfer_task_id): """ @@ -1143,20 +1123,17 @@ def issue_write_back_storage_task(self, task: WriteStorageTask, is_sync=True): if self.kvcache_storage_backend is None: return - assert is_sync, "Only support is_sync=True for now." - self._acquire_kvcache_lock() - try: - if len(task.keys) != len(task.gpu_block_ids): - err_msg = f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})" - logger.error(err_msg) - raise ValueError(err_msg) - - self.task_write_back_event[task.task_id] = Event() - self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task)) - if is_sync: - self.wait_write_storage_task(task.task_id) - finally: - self._release_kvcache_lock() + if len(task.keys) != len(task.gpu_block_ids): + err_msg = ( + f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})" + ) + logger.error(err_msg) + raise ValueError(err_msg) + + self.task_write_back_event[task.task_id] = Event() + self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task)) + if is_sync: + self.wait_write_storage_task(task.task_id) def wait_write_storage_task(self, req_id): """ @@ -1173,18 +1150,13 @@ def issue_prefetch_storage_task(self, task: ReadStorageTask, is_sync=True): if self.kvcache_storage_backend is None: return [] - assert is_sync, "Only support is_sync=True for now." - self._acquire_kvcache_lock() + storage_block_ids = [] + self.task_prefetch_event[task.task_id] = Event() + # issue task to cache_transfer_manager + self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task)) + if is_sync: + storage_block_ids = self.wait_prefetch_storage_task(task.task_id) - try: - storage_block_ids = [] - self.task_prefetch_event[task.task_id] = Event() - # issue task to cache_transfer_manager - self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task)) - if is_sync: - storage_block_ids = self.wait_prefetch_storage_task(task.task_id) - finally: - self._release_kvcache_lock() return storage_block_ids def wait_prefetch_storage_task(self, req_id): diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index a7ca50262b6..22f0a278874 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -60,7 +60,6 @@ from fastdeploy.inter_communicator import ( EngineCacheQueue, EngineWorkerQueue, - IPCLock, IPCSignal, ZmqIpcServer, ZmqTcpServer, @@ -187,10 +186,6 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False): ) self._init_worker_monitor_signals() - # Pass the GPU KV cache lock to cache_manager for mutual exclusion - # between the CPU transfer process and the worker process. - self.resource_manager.cache_manager.gpu_cache_lock = self.gpu_cache_lock - if self.cfg.eplb_config.enable_eplb: current_suffix = self.cfg.parallel_config.local_engine_worker_queue_port init_eplb_signals(cfg, current_suffix) @@ -401,14 +396,6 @@ def _init_worker_monitor_signals(self): # exist_task_signal 用于各worker进 create=True, ) - # gpu_cache_lock: file-based lock for mutual exclusion between worker - # and CPU transfer when accessing GPU KV cache. - self.gpu_cache_lock = IPCLock( - name="gpu_cache_lock", - suffix=current_suffix, - create=True, - ) - def start_worker_queue_service(self, start_queue): """ start queue service for engine worker communication diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 2acdb6ce12b..43e376a3d72 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -217,12 +217,6 @@ "FILE_BACKEND_STORAGE_DIR": lambda: str(os.getenv("FILE_BACKEND_STORAGE_DIR", "/tmp/fastdeploy")), # Whether to use PD REORDER, can set 0 or 1 "FD_PD_REORDER": lambda: int(os.getenv("FD_PD_REORDER", "0")), - # Whether to enable KV cache lock, enforcing mutual exclusion between - # PrefixCacheManager and Worker when accessing GPU KV cache. - # Under certain DP+EP configurations, concurrent access (even read-only) - # has been observed to cause NaN computation errors. - # Set to 1 to enable the lock; defaults to 0 (disabled). - "FD_USE_KVCACHE_LOCK": lambda: bool(int(os.getenv("FD_USE_KVCACHE_LOCK", "0"))), # Whether to probe MoE routing probabilities and use Fleet's fused SwiGLU kernel. "FD_MOE_PROB_IN_ADVANCE": lambda: bool(int(os.getenv("FD_MOE_PROB_IN_ADVANCE", "0"))), # Suspend rollouting routing replay diff --git a/fastdeploy/inter_communicator/__init__.py b/fastdeploy/inter_communicator/__init__.py index d3358d9ff81..6331e06b955 100644 --- a/fastdeploy/inter_communicator/__init__.py +++ b/fastdeploy/inter_communicator/__init__.py @@ -16,7 +16,7 @@ from .engine_cache_queue import EngineCacheQueue from .engine_worker_queue import EngineWorkerQueue -from .ipc_signal import IPCLock, IPCSignal, shared_memory_exists +from .ipc_signal import IPCSignal, shared_memory_exists from .ipc_signal_const import ( ExistTaskStatus, KVCacheStatus, @@ -31,7 +31,6 @@ "ZmqIpcClient", "ZmqIpcServer", "ZmqTcpServer", - "IPCLock", "IPCSignal", "EngineWorkerQueue", "EngineCacheQueue", diff --git a/fastdeploy/inter_communicator/ipc_signal.py b/fastdeploy/inter_communicator/ipc_signal.py index 129141933f7..9e105eaacf3 100644 --- a/fastdeploy/inter_communicator/ipc_signal.py +++ b/fastdeploy/inter_communicator/ipc_signal.py @@ -14,8 +14,6 @@ # limitations under the License. """ -import fcntl -import os from multiprocessing.shared_memory import SharedMemory import numpy as np @@ -116,58 +114,3 @@ def clear(self) -> None: if shared_memory_exists(self.shm.name): self.shm.close() self.shm.unlink() - - -class IPCLock: - """A file-based inter-process lock using fcntl.flock. - - Provides mutual exclusion between processes that may be spawned via - subprocess (not just fork/multiprocessing). Lock files are stored in - /dev/shm/ for performance, falling back to /tmp/. - - Args: - name: Unique identifier for the lock. - suffix: Optional suffix appended to the name to avoid conflicts - when multiple engines are launched. - create: If True, creates the lock file; otherwise opens an - existing one. - """ - - def __init__(self, name: str, suffix: int = None, create: bool = True) -> None: - if suffix is not None: - name = f"{name}.{suffix}" - - lock_dir = "/dev/shm" if os.path.isdir("/dev/shm") else "/tmp" - self._lock_path = os.path.join(lock_dir, f"fd_lock_{name}") - - if create: - llm_logger.debug(f"creating ipc lock: {self._lock_path}") - # Use restrictive permissions to avoid other users acquiring the lock. - self._fd = os.open(self._lock_path, os.O_CREAT | os.O_RDWR, 0o600) - else: - llm_logger.debug(f"attaching ipc lock: {self._lock_path}") - try: - self._fd = os.open(self._lock_path, os.O_RDWR) - except FileNotFoundError as e: - llm_logger.error( - f"Failed to attach IPC lock: {self._lock_path} does not exist. " - "Ensure that the lock has been created (create=True) with the same " - "name and suffix before attaching." - ) - raise RuntimeError(f"IPC lock file not found: {self._lock_path}") from e - - def acquire(self) -> None: - """Acquire the lock (blocking). Uses kernel-level flock for atomicity.""" - fcntl.flock(self._fd, fcntl.LOCK_EX) - - def release(self) -> None: - """Release the lock.""" - fcntl.flock(self._fd, fcntl.LOCK_UN) - - def clear(self) -> None: - """Close the file descriptor and remove the lock file.""" - os.close(self._fd) - try: - os.unlink(self._lock_path) - except FileNotFoundError: - pass diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 3aab9f1203e..64430414aba 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -55,7 +55,6 @@ from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue from fastdeploy.inter_communicator import ( ExistTaskStatus, - IPCLock, IPCSignal, ModelWeightsStatus, RearrangeExpertStatus, @@ -285,14 +284,6 @@ def init_health_status(self) -> None: create=False, ) - # gpu_cache_lock: file-based lock for mutual exclusion between worker - # and CPU transfer when accessing GPU KV cache. - self.gpu_cache_lock = IPCLock( - name="gpu_cache_lock", - suffix=self.parallel_config.local_engine_worker_queue_port, - create=False, - ) - def update_weights_from_tensor(self, mmap_infos): """ update_weights_from_tensor @@ -435,35 +426,6 @@ def _run_eplb(self, tp_rank): self.rearrange_experts_signal.value[0] = RearrangeExpertStatus.DONE.value logger.info("redundant_expert: done") - def _acquire_kvcache_lock(self, tp_rank): - """Acquire the GPU KV cache lock for the worker process. - - Uses a file-based lock (fcntl.flock) to ensure mutual exclusion - between the worker and the CPU transfer process during model - execution. Only rank 0 acquires the lock to avoid deadlock among - tensor-parallel workers. - - Args: - tp_rank: Tensor parallel rank of the current worker. Only rank 0 - acquires the lock. - """ - if not envs.FD_USE_KVCACHE_LOCK: - return - if tp_rank == 0: - self.gpu_cache_lock.acquire() - - def _release_kvcache_lock(self, tp_rank): - """Release the GPU KV cache lock held by the worker process. - - Args: - tp_rank: Tensor parallel rank of the current worker. Only rank 0 - releases the lock. - """ - if not envs.FD_USE_KVCACHE_LOCK: - return - if tp_rank == 0: - self.gpu_cache_lock.release() - def event_loop_normal(self) -> None: """Main event loop for Paddle Distributed Workers. TODO(gongshaotian): support remote calling of functions that control worker. @@ -633,9 +595,7 @@ def event_loop_normal(self) -> None: # These generated tokens can be obtained through get_output op. start_execute_time = time.time() - self._acquire_kvcache_lock(tp_rank) self.worker.execute_model(req_dicts, max_occupied_batch_index) - self._release_kvcache_lock(tp_rank) # Only v0 use this signal if not envs.ENABLE_V1_KVCACHE_SCHEDULER: