-
Notifications
You must be signed in to change notification settings - Fork 736
[BugFix] Remove ipc lock to avoid nan #7312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -552,24 +552,6 @@ def recycle_cpu_blocks(self, cpu_block_ids): | |
| else: | ||
| heapq.heappush(self.cpu_free_block_list, cpu_block_ids) | ||
|
|
||
| def _acquire_kvcache_lock(self): | ||
| """Acquire the GPU KV cache lock for the transfer process. | ||
|
|
||
| Uses a file-based lock (fcntl.flock) to ensure mutual exclusion | ||
| between the worker and the CPU transfer process. This prevents | ||
| concurrent GPU KV cache access which may cause NaN errors under | ||
| certain DP+EP configurations. | ||
| """ | ||
| if not envs.FD_USE_KVCACHE_LOCK: | ||
| return | ||
| self.gpu_cache_lock.acquire() | ||
|
|
||
| def _release_kvcache_lock(self): | ||
| """Release the GPU KV cache lock held by the transfer process.""" | ||
| if not envs.FD_USE_KVCACHE_LOCK: | ||
| return | ||
| self.gpu_cache_lock.release() | ||
|
|
||
| def issue_swap_task( | ||
| self, | ||
| transfer_task_id, | ||
|
|
@@ -590,14 +572,12 @@ def issue_swap_task( | |
| is_sync: bool, whether to wait for the result of the swap task | ||
| """ | ||
| assert is_sync, "Only support is sync for swap_task now." | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 建议 缺少单元测试 移除锁机制会影响
相关测试文件: |
||
| self._acquire_kvcache_lock() | ||
| self.task_swapping_event[transfer_task_id] = Event() | ||
| self.cache_task_queue.put_transfer_task( | ||
| (event_type, transfer_task_id, swap_node_ids, gpu_block_ids, cpu_block_ids) | ||
| ) | ||
| if is_sync: | ||
|
Comment on lines
575
to
579
|
||
| self.sync_swap_task(transfer_task_id) | ||
| self._release_kvcache_lock() | ||
|
|
||
| def sync_swap_task(self, transfer_task_id): | ||
| """ | ||
|
|
@@ -1143,20 +1123,17 @@ def issue_write_back_storage_task(self, task: WriteStorageTask, is_sync=True): | |
| if self.kvcache_storage_backend is None: | ||
| return | ||
|
|
||
| assert is_sync, "Only support is_sync=True for now." | ||
| self._acquire_kvcache_lock() | ||
| try: | ||
| if len(task.keys) != len(task.gpu_block_ids): | ||
| err_msg = f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})" | ||
| logger.error(err_msg) | ||
| raise ValueError(err_msg) | ||
|
|
||
| self.task_write_back_event[task.task_id] = Event() | ||
| self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task)) | ||
| if is_sync: | ||
| self.wait_write_storage_task(task.task_id) | ||
| finally: | ||
| self._release_kvcache_lock() | ||
| if len(task.keys) != len(task.gpu_block_ids): | ||
| err_msg = ( | ||
| f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})" | ||
| ) | ||
| logger.error(err_msg) | ||
| raise ValueError(err_msg) | ||
|
|
||
| self.task_write_back_event[task.task_id] = Event() | ||
| self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task)) | ||
| if is_sync: | ||
|
Comment on lines
+1132
to
+1135
|
||
| self.wait_write_storage_task(task.task_id) | ||
|
|
||
| def wait_write_storage_task(self, req_id): | ||
| """ | ||
|
|
@@ -1173,18 +1150,13 @@ def issue_prefetch_storage_task(self, task: ReadStorageTask, is_sync=True): | |
| if self.kvcache_storage_backend is None: | ||
| return [] | ||
|
|
||
| assert is_sync, "Only support is_sync=True for now." | ||
| self._acquire_kvcache_lock() | ||
| storage_block_ids = [] | ||
| self.task_prefetch_event[task.task_id] = Event() | ||
| # issue task to cache_transfer_manager | ||
| self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task)) | ||
| if is_sync: | ||
|
Comment on lines
+1153
to
+1157
|
||
| storage_block_ids = self.wait_prefetch_storage_task(task.task_id) | ||
|
|
||
| try: | ||
| storage_block_ids = [] | ||
| self.task_prefetch_event[task.task_id] = Event() | ||
| # issue task to cache_transfer_manager | ||
| self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task)) | ||
| if is_sync: | ||
| storage_block_ids = self.wait_prefetch_storage_task(task.task_id) | ||
| finally: | ||
| self._release_kvcache_lock() | ||
| return storage_block_ids | ||
|
|
||
| def wait_prefetch_storage_task(self, req_id): | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -217,12 +217,6 @@ | |||||||||
| "FILE_BACKEND_STORAGE_DIR": lambda: str(os.getenv("FILE_BACKEND_STORAGE_DIR", "/tmp/fastdeploy")), | ||||||||||
|
||||||||||
| "FILE_BACKEND_STORAGE_DIR": lambda: str(os.getenv("FILE_BACKEND_STORAGE_DIR", "/tmp/fastdeploy")), | |
| "FILE_BACKEND_STORAGE_DIR": lambda: str(os.getenv("FILE_BACKEND_STORAGE_DIR", "/tmp/fastdeploy")), | |
| # Deprecated: keep this env readable for backward compatibility even if it is no longer used. | |
| "FD_USE_KVCACHE_LOCK": lambda: bool(int(os.getenv("FD_USE_KVCACHE_LOCK", "0"))), |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,7 @@ | |
|
|
||
| from .engine_cache_queue import EngineCacheQueue | ||
| from .engine_worker_queue import EngineWorkerQueue | ||
| from .ipc_signal import IPCLock, IPCSignal, shared_memory_exists | ||
| from .ipc_signal import IPCSignal, shared_memory_exists | ||
| from .ipc_signal_const import ( | ||
|
Comment on lines
17
to
20
|
||
| ExistTaskStatus, | ||
| KVCacheStatus, | ||
|
|
@@ -31,7 +31,6 @@ | |
| "ZmqIpcClient", | ||
| "ZmqIpcServer", | ||
| "ZmqTcpServer", | ||
| "IPCLock", | ||
| "IPCSignal", | ||
| "EngineWorkerQueue", | ||
| "EngineCacheQueue", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,7 +55,6 @@ | |
| from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue | ||
| from fastdeploy.inter_communicator import ( | ||
| ExistTaskStatus, | ||
| IPCLock, | ||
| IPCSignal, | ||
| ModelWeightsStatus, | ||
| RearrangeExpertStatus, | ||
|
|
@@ -285,14 +284,6 @@ def init_health_status(self) -> None: | |
| create=False, | ||
| ) | ||
|
|
||
| # gpu_cache_lock: file-based lock for mutual exclusion between worker | ||
| # and CPU transfer when accessing GPU KV cache. | ||
| self.gpu_cache_lock = IPCLock( | ||
| name="gpu_cache_lock", | ||
| suffix=self.parallel_config.local_engine_worker_queue_port, | ||
| create=False, | ||
| ) | ||
|
|
||
| def update_weights_from_tensor(self, mmap_infos): | ||
| """ | ||
| update_weights_from_tensor | ||
|
|
@@ -435,35 +426,6 @@ def _run_eplb(self, tp_rank): | |
| self.rearrange_experts_signal.value[0] = RearrangeExpertStatus.DONE.value | ||
| logger.info("redundant_expert: done") | ||
|
|
||
| def _acquire_kvcache_lock(self, tp_rank): | ||
| """Acquire the GPU KV cache lock for the worker process. | ||
|
|
||
| Uses a file-based lock (fcntl.flock) to ensure mutual exclusion | ||
| between the worker and the CPU transfer process during model | ||
| execution. Only rank 0 acquires the lock to avoid deadlock among | ||
| tensor-parallel workers. | ||
|
|
||
| Args: | ||
| tp_rank: Tensor parallel rank of the current worker. Only rank 0 | ||
| acquires the lock. | ||
| """ | ||
| if not envs.FD_USE_KVCACHE_LOCK: | ||
| return | ||
| if tp_rank == 0: | ||
| self.gpu_cache_lock.acquire() | ||
|
|
||
| def _release_kvcache_lock(self, tp_rank): | ||
| """Release the GPU KV cache lock held by the worker process. | ||
|
|
||
| Args: | ||
| tp_rank: Tensor parallel rank of the current worker. Only rank 0 | ||
| releases the lock. | ||
| """ | ||
| if not envs.FD_USE_KVCACHE_LOCK: | ||
| return | ||
| if tp_rank == 0: | ||
| self.gpu_cache_lock.release() | ||
|
|
||
| def event_loop_normal(self) -> None: | ||
| """Main event loop for Paddle Distributed Workers. | ||
| TODO(gongshaotian): support remote calling of functions that control worker. | ||
|
|
@@ -633,9 +595,7 @@ def event_loop_normal(self) -> None: | |
| # These generated tokens can be obtained through get_output op. | ||
| start_execute_time = time.time() | ||
|
|
||
| self._acquire_kvcache_lock(tp_rank) | ||
| self.worker.execute_model(req_dicts, max_occupied_batch_index) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🔴 Bug 潜在的并发安全问题 移除
建议:请提供并发场景下的验证结果或说明现有的同步机制
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 建议 缺少性能影响说明 移除锁机制可能会影响性能,建议说明:
|
||
| self._release_kvcache_lock(tp_rank) | ||
|
|
||
| # Only v0 use this signal | ||
| if not envs.ENABLE_V1_KVCACHE_SCHEDULER: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 Bug 缺少充分的解释和测试验证
移除 IPCLock 机制是一个重大的变更,但 PR 描述中没有说明:
建议: