Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions fastdeploy/cache_manager/cache_messager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import json
import math
import queue
import sys
import tempfile
import threading
import time
import traceback
Expand Down Expand Up @@ -163,7 +165,8 @@ def __init__(
if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
address = (pod_ip, engine_worker_queue_port)
else:
address = f"/dev/shm/fd_task_queue_{engine_worker_queue_port}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = f"{_shm_dir}/fd_task_queue_{engine_worker_queue_port}.sock"
self.engine_worker_queue = EngineWorkerQueue(
address=address,
is_server=False,
Expand Down Expand Up @@ -505,7 +508,8 @@ def __init__(
if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
address = (pod_ip, engine_worker_queue_port)
else:
address = f"/dev/shm/fd_task_queue_{engine_worker_queue_port}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = f"{_shm_dir}/fd_task_queue_{engine_worker_queue_port}.sock"
self.engine_worker_queue = EngineWorkerQueue(
address=address,
is_server=False,
Expand Down
6 changes: 4 additions & 2 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ def launch_cache_manager(
+ f" >{log_dir}/launch_cache_transfer_manager_{int(device_ids[i])}.log 2>&1"
)
logger.info(f"Launch cache transfer manager, command:{launch_cmd}")
cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid))
_popen_kwargs = {} if sys.platform == "win32" else {"preexec_fn": os.setsid}
cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, **_popen_kwargs))

logger.info("PrefixCacheManager is waiting for cache transfer manager to be initialized.")
while np.sum(self.cache_transfer_inited_signal.value) != tensor_parallel_size:
Expand Down Expand Up @@ -426,7 +427,8 @@ def launch_cache_messager(
+ f" >{log_dir}/launch_cache_messager_{i}.log 2>&1"
)
logger.info(f"Launch cache messager, command:{launch_cmd}")
cache_messager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid))
_popen_kwargs = {} if sys.platform == "win32" else {"preexec_fn": os.setsid}
cache_messager_processes.append(subprocess.Popen(launch_cmd, shell=True, **_popen_kwargs))

logger.info("Waiting for cache ready...")
while np.sum(self.cache_ready_signal.value) != tensor_parallel_size:
Expand Down
25 changes: 18 additions & 7 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
Expand Down Expand Up @@ -409,7 +410,8 @@ def start_worker_queue_service(self, start_queue):
if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
address = (self.cfg.master_ip, self.cfg.parallel_config.local_engine_worker_queue_port)
else:
address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.local_engine_worker_queue_port}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = f"{_shm_dir}/fd_task_queue_{self.cfg.parallel_config.local_engine_worker_queue_port}.sock"

if self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0":
if start_queue:
Expand Down Expand Up @@ -1790,8 +1792,11 @@ def _exit_sub_services(self):
if hasattr(self, "worker_proc") and self.worker_proc is not None:
self.llm_logger.info("Cleaning up worker processes...")
try:
pgid = os.getpgid(self.worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)
if sys.platform != "win32":
pgid = os.getpgid(self.worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)
else:
self.worker_proc.terminate()
except Exception as e:
self.llm_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}")

Expand All @@ -1803,8 +1808,11 @@ def _exit_sub_services(self):
for p in self.cache_manager_processes:
self.llm_logger.info(f"Killing cache manager process {p.pid}")
try:
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
if sys.platform != "win32":
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
else:
p.terminate()
except Exception as e:
self.llm_logger.error(
f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}"
Expand Down Expand Up @@ -2118,7 +2126,7 @@ def _start_worker_service(self):
pd_cmd,
stdout=subprocess.PIPE,
shell=True,
preexec_fn=os.setsid,
**({} if sys.platform == "win32" else {"preexec_fn": os.setsid}),
)
return p

Expand Down Expand Up @@ -2182,7 +2190,10 @@ def launch_components(self):
int(self.cfg.parallel_config.engine_worker_queue_port[i]),
)
else:
address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = (
f"{_shm_dir}/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
)

self.llm_logger.info(f"dp start queue service {address}")
self.dp_engine_worker_queue_server.append(
Expand Down
24 changes: 17 additions & 7 deletions fastdeploy/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
Expand Down Expand Up @@ -432,8 +433,11 @@ def _exit_sub_services(self):
for p in self.cache_manager_processes:
llm_logger.info(f"Killing cache manager process {p.pid}")
try:
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
if sys.platform != "win32":
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
else:
p.terminate()
except Exception as e:
console_logger.error(
f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}"
Expand All @@ -446,8 +450,11 @@ def _exit_sub_services(self):

if hasattr(self, "worker_proc") and self.worker_proc is not None:
try:
pgid = os.getpgid(self.worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)
if sys.platform != "win32":
pgid = os.getpgid(self.worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)
else:
self.worker_proc.terminate()
except Exception as e:
console_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}")

Expand Down Expand Up @@ -671,7 +678,7 @@ def _start_worker_service(self):
pd_cmd,
stdout=subprocess.PIPE,
shell=True,
preexec_fn=os.setsid,
**({} if sys.platform == "win32" else {"preexec_fn": os.setsid}),
)
return p

Expand Down Expand Up @@ -805,7 +812,10 @@ def launch_components(self):
int(self.cfg.parallel_config.engine_worker_queue_port[i]),
)
else:
address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = (
f"{_shm_dir}/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
)

llm_logger.info(f"dp start queue service {address}")
self.dp_engine_worker_queue_server.append(
Expand All @@ -816,7 +826,7 @@ def launch_components(self):
local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
)
)
ctx = multiprocessing.get_context("fork")
ctx = multiprocessing.get_context("spawn" if sys.platform == "win32" else "fork")
cfg = copy.deepcopy(self.cfg)
self.dp_processed.append(
ctx.Process(
Expand Down
8 changes: 6 additions & 2 deletions fastdeploy/engine/expert_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import os
import signal
import sys
import threading
import time
import traceback
Expand Down Expand Up @@ -201,8 +202,11 @@ def _exit_sub_services(self):
for p in self.cache_manager_processes:
self.llm_logger.info(f"Killing cache manager process {p.pid}")
try:
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
if sys.platform != "win32":
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
else:
p.terminate()
except Exception as e:
console_logger.error(
f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}"
Expand Down
7 changes: 5 additions & 2 deletions fastdeploy/eplb/async_expert_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import ctypes
import os
import sys
import tempfile
import time
import traceback
from typing import List, Tuple
Expand Down Expand Up @@ -61,6 +63,7 @@
MAIN_MODEL_REDUNDANT_SHM_SIZE = 5

MODEL_MAIN_NAME = "eplb_main"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()


def create_mmap(model_name: List, ep_rank: int, ep_size: int, shm_uuid: str, eplb_config: EPLBConfig, logger=None):
Expand All @@ -77,7 +80,7 @@ def create_mmap(model_name: List, ep_rank: int, ep_size: int, shm_uuid: str, epl

mmap_infos = {}
for name in model_name:
expert_weight_file = f"/dev/shm/{name}_rank_{ep_rank}_expert_weight_{shm_uuid}"
expert_weight_file = f"{_shm_dir}/{name}_rank_{ep_rank}_expert_weight_{shm_uuid}"
shm_size = main_size

if not os.path.isfile(expert_weight_file):
Expand Down Expand Up @@ -420,7 +423,7 @@ def load_model_weights_process(
)
if success:
model_name = MODEL_MAIN_NAME
file_path = f"/dev/shm/{model_name}_rank_{rank}_expert_weight_{shm_uuid}"
file_path = f"{_shm_dir}/{model_name}_rank_{rank}_expert_weight_{shm_uuid}"
weight_infos = save_tensor_to_shm_mem(ep_loader.cached_weights, file_path, logger)
logger.info(
"redundant_expert: async load save_tensor_to_shm_mem, "
Expand Down
4 changes: 3 additions & 1 deletion fastdeploy/inter_communicator/fmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import asyncio
import json
import sys
import tempfile
import time
import uuid
from dataclasses import dataclass, field
Expand Down Expand Up @@ -78,7 +80,7 @@ class Endpoint:

@dataclass
class Config:
ipc_root: str = "/dev/shm"
ipc_root: str = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
io_threads: int = 1
copy: bool = False
endpoints: Dict[str, Endpoint] = field(default_factory=dict)
Expand Down
26 changes: 22 additions & 4 deletions fastdeploy/inter_communicator/ipc_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
# limitations under the License.
"""

import fcntl
import os
import sys
from multiprocessing.shared_memory import SharedMemory

if sys.platform != "win32":
import fcntl

import numpy as np

from fastdeploy.utils import llm_logger
Expand Down Expand Up @@ -137,7 +140,12 @@ def __init__(self, name: str, suffix: int = None, create: bool = True) -> None:
if suffix is not None:
name = f"{name}.{suffix}"

lock_dir = "/dev/shm" if os.path.isdir("/dev/shm") else "/tmp"
if sys.platform == "win32":
import tempfile

lock_dir = tempfile.gettempdir()
else:
lock_dir = "/dev/shm" if os.path.isdir("/dev/shm") else "/tmp"
self._lock_path = os.path.join(lock_dir, f"fd_lock_{name}")

if create:
Expand All @@ -158,11 +166,21 @@ def __init__(self, name: str, suffix: int = None, create: bool = True) -> None:

def acquire(self) -> None:
"""Acquire the lock (blocking). Uses kernel-level flock for atomicity."""
fcntl.flock(self._fd, fcntl.LOCK_EX)
if sys.platform == "win32":
import msvcrt

msvcrt.locking(self._fd, msvcrt.LK_LOCK, 1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: the Windows file-lock implementation is incorrect.

The third argument `nbytes` of `msvcrt.locking(fd, mode, nbytes)` is the number of bytes to lock/unlock. The current code passes 1, which means only the first byte of the file is locked.

In a multi-process environment this may allow different processes to operate on different regions of the same lock file at the same time, breaking mutual exclusion.

Suggested fix:

Use `os.lseek()` to position at the start of the file and lock the entire file (obtained via the file size):

def acquire(self) -> None:
    """Acquire the lock (blocking). Uses kernel-level flock for atomicity."""
    if sys.platform == "win32":
        import msvcrt
        # Move to start of file
        os.lseek(self._fd, 0, os.SEEK_SET)
        # Lock the entire file (get file size)
        file_size = os.path.getsize(self._lock_path)
        msvcrt.locking(self._fd, msvcrt.LK_LOCK, file_size)
    else:
        fcntl.flock(self._fd, fcntl.LOCK_EX)

def release(self) -> None:
    """Release the lock."""
    if sys.platform == "win32":
        import msvcrt
        # Move to start of file
        os.lseek(self._fd, 0, os.SEEK_SET)
        # Unlock the entire file
        file_size = os.path.getsize(self._lock_path)
        msvcrt.locking(self._fd, msvcrt.LK_UNLCK, file_size)
    else:
        fcntl.flock(self._fd, fcntl.LOCK_UN)

Alternatively, a simpler approach is to keep a fixed lock size (e.g. 1 byte), but ensure every process locks the same position (seeking to the beginning of the file first).

else:
fcntl.flock(self._fd, fcntl.LOCK_EX)

def release(self) -> None:
"""Release the lock."""
fcntl.flock(self._fd, fcntl.LOCK_UN)
if sys.platform == "win32":
import msvcrt

msvcrt.locking(self._fd, msvcrt.LK_UNLCK, 1)
else:
fcntl.flock(self._fd, fcntl.LOCK_UN)

def clear(self) -> None:
"""Close the file descriptor and remove the lock file."""
Expand Down
5 changes: 4 additions & 1 deletion fastdeploy/inter_communicator/zmq_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
"""

import sys
import tempfile
import time
from abc import ABC, abstractmethod
from multiprocessing.reduction import ForkingPickler
Expand Down Expand Up @@ -152,7 +154,8 @@ class ZmqIpcClient(ZmqClientBase):
def __init__(self, name, mode):
self.name = name
self.mode = mode
self.file_name = f"/dev/shm/{name}.socket"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
self.file_name = f"{_shm_dir}/{name}.socket"
self.context = zmq.Context()
self.socket = self.context.socket(self.mode)

Expand Down
10 changes: 7 additions & 3 deletions fastdeploy/inter_communicator/zmq_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
"""

import os
import sys
import tempfile
import threading
import time
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -435,10 +437,11 @@ def __init__(self, name, mode):
self.file_name = None
return

_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
if mode == zmq.PULL:
self.file_name = f"/dev/shm/{name}.socket"
self.file_name = f"{_shm_dir}/{name}.socket"
elif mode == zmq.ROUTER:
self.file_name = f"/dev/shm/router_{name}.ipc"
self.file_name = f"{_shm_dir}/router_{name}.ipc"
else:
raise ValueError(f"Unsupported ZMQ mode: {mode}")
self._create_socket()
Expand All @@ -461,7 +464,8 @@ def _get_worker_push_socket(self, worker_pid):
sock = self.context.socket(zmq.PUSH)
sock.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
sock.setsockopt(zmq.SNDTIMEO, -1)
address = f"ipc:///dev/shm/response_{self.push_name_prefix}_w{worker_pid}.pull"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
address = f"ipc://{_shm_dir}/response_{self.push_name_prefix}_w{worker_pid}.pull"
sock.connect(address)
self.worker_push_sockets[worker_pid] = sock
self.worker_push_addresses[worker_pid] = address
Expand Down
5 changes: 4 additions & 1 deletion fastdeploy/worker/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import asyncio
import json
import os
import sys
import tempfile
import time
import traceback
from typing import Tuple
Expand Down Expand Up @@ -710,7 +712,8 @@ def start_task_queue_service(self):
self.parallel_config.local_engine_worker_queue_port,
)
else:
task_address = f"/dev/shm/fd_task_queue_{self.parallel_config.local_engine_worker_queue_port}.sock"
_shm_dir = "/dev/shm" if sys.platform != "win32" else tempfile.gettempdir()
task_address = f"{_shm_dir}/fd_task_queue_{self.parallel_config.local_engine_worker_queue_port}.sock"
logger.info(f"connect task queue address {task_address}")
self.task_queue = TaskQueue(
address=task_address,
Expand Down
Loading