Skip to content
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/models/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

class VolumeStatus(str, Enum):
SUBMITTED = "submitted"
# PROVISIONING is currently unused: on every backend that supports volumes,
# a volume becomes ACTIVE (ready to be used) almost immediately after provisioning.
PROVISIONING = "provisioning"
ACTIVE = "active"
FAILED = "failed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dstack._internal.server.background.pipeline_tasks.placement_groups import (
PlacementGroupPipeline,
)
from dstack._internal.server.background.pipeline_tasks.volumes import VolumePipeline
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)
Expand All @@ -17,6 +18,7 @@ def __init__(self) -> None:
ComputeGroupPipeline(),
GatewayPipeline(),
PlacementGroupPipeline(),
VolumePipeline(),
]
self._hinter = PipelineHinter(self._pipelines)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dstack._internal.server.services.compute_groups import compute_group_model_to_compute_group
from dstack._internal.server.services.instances import emit_instance_status_change_event
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.utils import sentry_utils
from dstack._internal.utils.common import get_current_datetime, run_async
from dstack._internal.utils.logging import get_logger

Expand Down Expand Up @@ -107,6 +108,7 @@ def __init__(
queue_check_delay=queue_check_delay,
)

@sentry_utils.instrument_named_task("pipeline_tasks.ComputeGroupFetcher.fetch")
async def fetch(self, limit: int) -> list[PipelineItem]:
compute_group_lock, _ = get_locker(get_db().dialect_name).get_lockset(
ComputeGroupModel.__tablename__
Expand Down Expand Up @@ -172,6 +174,7 @@ def __init__(
heartbeater=heartbeater,
)

@sentry_utils.instrument_named_task("pipeline_tasks.ComputeGroupWorker.process")
async def process(self, item: PipelineItem):
async with get_session_ctx() as session:
res = await session.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from dstack._internal.server.services.gateways.pool import gateway_connections_pool
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.logging import fmt
from dstack._internal.server.utils import sentry_utils
from dstack._internal.utils.common import get_current_datetime, run_async
from dstack._internal.utils.logging import get_logger

Expand Down Expand Up @@ -118,6 +119,7 @@ def __init__(
queue_check_delay=queue_check_delay,
)

@sentry_utils.instrument_named_task("pipeline_tasks.GatewayFetcher.fetch")
async def fetch(self, limit: int) -> list[GatewayPipelineItem]:
gateway_lock, _ = get_locker(get_db().dialect_name).get_lockset(GatewayModel.__tablename__)
async with gateway_lock:
Expand Down Expand Up @@ -193,6 +195,7 @@ def __init__(
heartbeater=heartbeater,
)

@sentry_utils.instrument_named_task("pipeline_tasks.GatewayWorker.process")
async def process(self, item: GatewayPipelineItem):
if item.to_be_deleted:
await _process_to_be_deleted_item(item)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dstack._internal.server.services import backends as backends_services
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.placement import placement_group_model_to_placement_group
from dstack._internal.server.utils import sentry_utils
from dstack._internal.utils.common import get_current_datetime, run_async
from dstack._internal.utils.logging import get_logger

Expand Down Expand Up @@ -103,6 +104,7 @@ def __init__(
queue_check_delay=queue_check_delay,
)

@sentry_utils.instrument_named_task("pipeline_tasks.PlacementGroupFetcher.fetch")
async def fetch(self, limit: int) -> list[PipelineItem]:
placement_group_lock, _ = get_locker(get_db().dialect_name).get_lockset(
PlacementGroupModel.__tablename__
Expand Down Expand Up @@ -170,6 +172,7 @@ def __init__(
heartbeater=heartbeater,
)

@sentry_utils.instrument_named_task("pipeline_tasks.PlacementGroupWorker.process")
async def process(self, item: PipelineItem):
async with get_session_ctx() as session:
res = await session.execute(
Expand Down Expand Up @@ -230,6 +233,7 @@ async def _delete_placement_group(placement_group_model: PlacementGroupModel) ->
backend_type=placement_group.provisioning_data.backend,
)
if backend is None:
# TODO: Retry deletion
logger.error(
"Failed to delete placement group %s. Backend not available. Please delete it manually.",
placement_group.name,
Expand All @@ -245,6 +249,7 @@ async def _delete_placement_group(placement_group_model: PlacementGroupModel) ->
)
return {}
except Exception:
# TODO: Retry deletion
logger.exception(
"Got exception when deleting placement group %s. Please delete it manually.",
placement_group.name,
Expand Down
Loading