# coding: utf-8
"""
Deploy and manage model serving applications.
"""
from __future__ import annotations
import os
import time
from dataclasses import asdict
from pathlib import Path
from typing import Any, Dict, Literal, Optional, Tuple, Union
import supervisely.io.env as env
from supervisely._utils import get_valid_kwargs
from supervisely.api.api import Api
from supervisely.api.nn.utils import (
find_apps_by_framework,
find_team_by_path,
get_artifacts_dir_and_checkpoint_name,
)
from supervisely.io.fs import get_file_name_with_ext
from supervisely.nn.experiments import ExperimentInfo
from supervisely.nn.utils import RuntimeType
from supervisely.sly_logger import logger
def get_runtime(runtime: Optional[str] = None):
"""
Normalize runtime name/alias to :class:`~supervisely.nn.utils.RuntimeType`.
:param runtime: Runtime string or alias (e.g. ``'onnx'``, ``'onnxruntime'``, ``'tensorrt'``).
:type runtime: str, optional
:returns: Runtime enum value or None.
:rtype: :class:`~supervisely.nn.utils.RuntimeType` or None
:raises ValueError: If runtime is not supported.
"""
from supervisely.nn.utils import RuntimeType
if runtime is None:
return None
aliases = {
str(RuntimeType.PYTORCH): RuntimeType.PYTORCH,
str(RuntimeType.ONNXRUNTIME): RuntimeType.ONNXRUNTIME,
str(RuntimeType.TENSORRT): RuntimeType.TENSORRT,
"pytorch": RuntimeType.PYTORCH,
"torch": RuntimeType.PYTORCH,
"pt": RuntimeType.PYTORCH,
"onnxruntime": RuntimeType.ONNXRUNTIME,
"onnx": RuntimeType.ONNXRUNTIME,
"tensorrt": RuntimeType.TENSORRT,
"trt": RuntimeType.TENSORRT,
"engine": RuntimeType.TENSORRT,
}
if runtime in aliases:
return aliases[runtime]
runtime = aliases.get(runtime.lower(), None)
if runtime is None:
raise ValueError(
f"Runtime '{runtime}' is not supported. Supported runtimes are: {', '.join(aliases.keys())}"
)
return runtime
[docs]
class DeployApi:
"""
API for deploying models and controlling serving apps.
This class is used internally by :class:`~supervisely.api.nn.neural_network_api.NeuralNetworkApi`,
but it can also be used directly for advanced deployment workflows.
Key capabilities:
- deploy a pretrained model into a new serving task,
- deploy a custom checkpoint (from team files or experiment artifacts),
- load/replace model inside an existing serving task.
"""
def __init__(self, api: "Api"):
"""
:param api: :class:`~supervisely.api.api.Api` object to use for API connection.
:type api: :class:`~supervisely.api.api.Api`
"""
self._api = api
[docs]
def load_pretrained_model(
self,
session_id: int,
model_name: str,
device: Optional[str] = None,
runtime: str = None,
):
"""
Load a pretrained model in running serving App.
:param session_id: Task ID of the serving App.
:type session_id: int
:param model_name: Model name to deploy.
:type model_name: str
:param device: Device string. If not provided, will be chosen automatically.
:type device: Optional[str]
:param runtime: Runtime string, if not present will be defined automatically.
:type runtime: Optional[str]
"""
from supervisely.nn.utils import ModelSource
runtime = get_runtime(runtime)
deploy_params = {}
deploy_params["model_source"] = ModelSource.PRETRAINED
deploy_params["device"] = device
deploy_params["runtime"] = runtime
self._load_model_from_api(session_id, deploy_params, model_name=model_name)
[docs]
def load_custom_model(
self,
session_id: int,
team_id: int,
artifacts_dir: str,
checkpoint_name: Optional[str] = None,
device: Optional[str] = None,
runtime: str = None,
):
"""
Load a custom model in running serving App.
:param session_id: Task ID of the serving App.
:type session_id: int
:param team_id: Team ID in Supervisely.
:type team_id: int
:param artifacts_dir: Path to the artifacts directory in the team fies.
:type artifacts_dir: str
:param checkpoint_name: Checkpoint name (with file extension) to deploy, e.g. "best.pt".
If not provided, checkpoint will be chosen automatically, depending on the app version.
:type checkpoint_name: Optional[str]
:param device: Device string. If not provided, will be chosen automatically.
:type device: Optional[str]
:param runtime: Runtime string, if not present will be defined automatically.
:type runtime: Optional[str]
"""
from supervisely.nn.utils import ModelSource
runtime = get_runtime(runtime)
# Train V1 logic (if artifacts_dir does not start with '/experiments')
if not artifacts_dir.startswith("/experiments"):
logger.debug("Deploying model from Train V1 artifacts")
_, _, deploy_params = self._deploy_params_v1(
team_id, artifacts_dir, checkpoint_name, device, runtime, with_module=False
)
else: # Train V2 logic (when artifacts_dir starts with '/experiments')
logger.debug("Deploying model from Train V2 artifacts")
_, _, deploy_params = self._deploy_params_v2(
team_id, artifacts_dir, checkpoint_name, device, runtime, with_module=False
)
deploy_params["model_source"] = ModelSource.CUSTOM
self._load_model_from_api(session_id, deploy_params)
[docs]
def load_custom_model_from_experiment_info(
self,
session_id: int,
experiment_info: "ExperimentInfo",
checkpoint_name: Optional[str] = None,
device: Optional[str] = None,
runtime: str = None,
):
"""
Load a custom model in running serving App based on the training session.
:param session_id: Task ID of the serving App.
:type session_id: int
:param experiment_info: Experiment info from :class:`~supervisely.nn.training.train_app.TrainApp`.
:type experiment_info: :class:`~supervisely.nn.experiments.ExperimentInfo`
:param checkpoint_name: Checkpoint name (with file extension) to deploy, e.g. "best.pt".
If not provided, checkpoint will be chosen automatically, depending on the app version.
:type checkpoint_name: Optional[str]
:param device: Device string. If not provided, will be chosen automatically.
:type device: Optional[str]
:param runtime: Runtime string, if not present will be defined automatically.
:type runtime: Optional[str]
"""
from supervisely.nn.utils import ModelSource
runtime = get_runtime(runtime)
if checkpoint_name is None:
checkpoint_name = experiment_info.best_checkpoint
deploy_params = {
"device": device,
"model_source": ModelSource.CUSTOM,
"model_files": {
key: Path(experiment_info.artifacts_dir, value).as_posix()
for key, value in experiment_info.model_files.items()
},
"model_info": experiment_info.to_json(),
"runtime": runtime,
}
deploy_params["model_files"]["checkpoint"] = Path(
experiment_info.artifacts_dir, "checkpoints", checkpoint_name
).as_posix()
self._load_model_from_api(session_id, deploy_params)
def _find_agent(self, team_id: int = None, public=True, gpu=True):
"""
Find an agent in Supervisely with most available memory.
:param team_id: Team ID. If not provided, will be taken from the current context.
:type team_id: Optional[int]
:param public: If True, can find a public agent.
:type public: bool
:param gpu: If True, find an agent with GPU.
:type gpu: bool
:returns: Agent ID
:rtype: int
"""
if team_id is None:
team_id = env.team_id()
agents = self._api.agent.get_list_available(team_id, show_public=public, has_gpu=gpu)
if len(agents) == 0:
raise ValueError("No available agents found.")
agent_id_memory_map = {}
kubernetes_agents = []
for agent in agents:
if agent.type == "sly_agent":
# No multi-gpu support, always take the first one
agent_id_memory_map[agent.id] = agent.gpu_info["device_memory"][0]["available"]
elif agent.type == "kubernetes":
kubernetes_agents.append(agent.id)
if len(agent_id_memory_map) > 0:
return max(agent_id_memory_map, key=agent_id_memory_map.get)
if len(kubernetes_agents) > 0:
return kubernetes_agents[0]
[docs]
def deploy_pretrained_model(
self,
framework: Union[str, int],
model_name: str,
device: Optional[str] = None,
runtime: str = None,
workspace_id: int = None,
agent_id: Optional[int] = None,
app: Union[str, int] = None,
**kwargs,
) -> Dict[str, Any]:
"""
Deploy a pretrained model.
:param framework: Framework name or Framework ID in Supervisely.
:type framework: Union[str, int]
:param model_name: Model name to deploy.
:type model_name: str
:param device: Device string. If not provided, will be chosen automatically.
:type device: Optional[str]
:param runtime: Runtime string, if not present will be defined automatically.
:type runtime: Optional[str]
:param workspace_id: Workspace ID where the app will be deployed. If not provided, will be taken from the current context.
:type workspace_id: Optional[int]
:param agent_id: Agent ID. If not provided, will be found automatically.
:type agent_id: Optional[int]
:param app: App name or App module ID in Supervisely.
:type app: Union[str, int]
:param kwargs: Additional parameters to start the task. See :meth:`~supervisely.api.task_api.TaskApi.start` for more details.
:type kwargs: Dict[str, Any]
:returns: Task Info
:rtype: Dict[str, Any]
:raises ValueError: if no serving apps found for the app name or multiple serving apps found for the app name.
"""
from supervisely.nn.artifacts import (
RITM,
RTDETR,
Detectron2,
MMClassification,
MMDetection,
MMDetection3,
MMPretrain,
MMSegmentation,
UNet,
YOLOv5,
YOLOv5v2,
YOLOv8,
)
from supervisely.nn.artifacts.artifacts import BaseTrainArtifacts
from supervisely.nn.utils import ModelSource
workspace_info = self._api.workspace.get_info_by_id(workspace_id)
if workspace_info is None:
raise ValueError(f"Workspace with ID {workspace_id} not found")
team_id = workspace_info.team_id
# @TODO: Fix debug logs/ Fix code
# Skip HTTPS redirect check on API init: False. ENV: False. Checked servers: set()
frameworks_v1 = {
RITM(team_id).framework_name: RITM(team_id).serve_slug,
RTDETR(team_id).framework_name: RTDETR(team_id).serve_slug,
Detectron2(team_id).framework_name: Detectron2(team_id).serve_slug,
MMClassification(team_id).framework_name: MMClassification(team_id).serve_slug,
MMPretrain(team_id).framework_name: MMPretrain(team_id).serve_slug,
MMDetection(team_id).framework_name: MMDetection(team_id).serve_slug,
MMDetection3(team_id).framework_name: MMDetection3(team_id).serve_slug,
MMSegmentation(team_id).framework_name: MMSegmentation(team_id).serve_slug,
UNet(team_id).framework_name: UNet(team_id).serve_slug,
YOLOv5(team_id).framework_name: YOLOv5(team_id).serve_slug,
YOLOv5v2(team_id).framework_name: YOLOv5v2(team_id).serve_slug,
YOLOv8(team_id).framework_name: YOLOv8(team_id).serve_slug,
}
if framework in frameworks_v1:
slug = frameworks_v1[framework]
module_id = self.find_serving_app_by_slug(slug)
else:
module_id = None
if isinstance(app, int):
module_id = app
elif isinstance(app, str):
module_id = self._api.app.find_module_id_by_app_name(app)
else:
module_id = self.find_serving_app_by_framework(framework)["id"]
if module_id is None:
raise ValueError(
f"Serving app for framework '{framework}' not found. Make sure that you used correct framework name."
)
runtime = get_runtime(runtime)
if agent_id is None:
agent_id = self._find_agent()
task_info = self._run_serve_app(agent_id, module_id, workspace_id=workspace_id, **kwargs)
self.load_pretrained_model(
task_info["id"], model_name=model_name, device=device, runtime=runtime
)
return task_info
def _find_team_by_path(self, path: str, team_id: int = None, raise_not_found=True):
return find_team_by_path(self._api, path, team_id, raise_not_found)
[docs]
def deploy_custom_model_by_checkpoint(
self,
checkpoint: str,
device: Optional[str] = None,
runtime: str = None,
timeout: int = 100,
team_id: int = None,
workspace_id: int = None,
agent_id: int = None,
**kwargs,
) -> Dict[str, Any]:
"""
Deploy a custom model based on the checkpoint path.
:param checkpoint: Path to the checkpoint in Team Files.
:type checkpoint: str
:param device: Device string. If not provided, will be chosen automatically.
:type device: Optional[str]
:param runtime: Runtime string, if not present will be defined automatically.
:type runtime: Optional[str]
:param timeout: Timeout in seconds (default is 100). The maximum time to wait for the serving app to be ready.
:type timeout: Optional[int]
:param team_id: Team ID where the artifacts are stored. If not provided, will be taken from the current context.
:type team_id: Optional[int]
:param workspace_id: Workspace ID where the app will be deployed. If not provided, will be taken from the current context.
:type workspace_id: Optional[int]
:param agent_id: Agent ID. If not provided, will be found automatically.
:type agent_id: Optional[int]
:param kwargs: Additional parameters to start the task. See :meth:`~supervisely.api.task_api.TaskApi.start` for more details.
:type kwargs: Dict[str, Any]
:returns: Task Info
:rtype: Dict[str, Any]
:raises ValueError: if validations fail.
"""
artifacts_dir, checkpoint_name = get_artifacts_dir_and_checkpoint_name(checkpoint)
return self.deploy_custom_model_by_artifacts_dir(
artifacts_dir=artifacts_dir,
checkpoint_name=checkpoint_name,
device=device,
runtime=runtime,
timeout=timeout,
team_id=team_id,
workspace_id=workspace_id,
agent_id=agent_id,
**kwargs,
)
[docs]
def deploy_custom_model_by_artifacts_dir(
self,
artifacts_dir: str,
checkpoint_name: Optional[str] = None,
device: Optional[str] = None,
runtime: str = None,
timeout: int = 100,
team_id: int = None,
workspace_id: int = None,
agent_id: int = None,
**kwargs,
) -> Dict[str, Any]:
"""
Deploy a custom model based on the artifacts directory.
:param artifacts_dir: Path to the artifacts directory in the team fies.
:type artifacts_dir: str
:param checkpoint_name: Checkpoint name (with file extension) to deploy, e.g. "best.pt".
If not provided, checkpoint will be chosen automatically, depending on the app version.
:type checkpoint_name: Optional[str]
:param device: Device string. If not provided, will be chosen automatically.
:type device: Optional[str]
:param runtime: Runtime string, if not present will be defined automatically.
:type runtime: Optional[str]
:param timeout: Timeout in seconds (default is 100). The maximum time to wait for the serving app to be ready.
:type timeout: Optional[int]
:param team_id: Team ID where the artifacts are stored. If not provided, will be taken from the current context.
:type team_id: Optional[int]
:param workspace_id: Workspace ID where the app will be deployed. If not provided, will be taken from the current context.
:type workspace_id: Optional[int]
:param agent_id: Agent ID. If not provided, will be found automatically.
:type agent_id: Optional[int]
:param kwargs: Additional parameters to start the task. See :meth:`~supervisely.api.task_api.TaskApi.start` for more details.
:type kwargs: Dict[str, Any]
:returns: Task Info
:rtype: Dict[str, Any]
:raises ValueError: if validations fail.
"""
from supervisely.nn.utils import ModelSource
if not isinstance(artifacts_dir, str) or not artifacts_dir.strip():
raise ValueError("artifacts_dir must be a non-empty string.")
runtime = get_runtime(runtime)
if team_id is None:
team_id = self._find_team_by_path(artifacts_dir, team_id=team_id)
logger.debug(
f"Starting custom model deployment. Team: {team_id}, Artifacts Dir: '{artifacts_dir}'"
)
if agent_id is None:
agent_id = self._find_agent()
# Train V1 logic (if artifacts_dir does not start with '/experiments')
if not artifacts_dir.startswith("/experiments"):
logger.debug("Deploying model from Train V1 artifacts")
module_id, serve_app_name, deploy_params = self._deploy_params_v1(
team_id, artifacts_dir, checkpoint_name, device, runtime, with_module=True
)
else: # Train V2 logic (when artifacts_dir starts with '/experiments')
logger.debug("Deploying model from Train V2 artifacts")
module_id, serve_app_name, deploy_params = self._deploy_params_v2(
team_id, artifacts_dir, checkpoint_name, device, runtime, with_module=True
)
deploy_params["model_source"] = ModelSource.CUSTOM
logger.info(
f"{serve_app_name} app deployment started. Checkpoint: '{checkpoint_name}'. Deploy params: '{deploy_params}'"
)
try:
task_info = self._run_serve_app(
agent_id, module_id, workspace_id=workspace_id, **kwargs
)
self._load_model_from_api(task_info["id"], deploy_params)
except Exception as e:
raise RuntimeError(f"Failed to run '{serve_app_name}': {e}") from e
return task_info
[docs]
def deploy_custom_model_from_experiment_info(
self,
agent_id: int,
experiment_info: "ExperimentInfo",
checkpoint_name: Optional[str] = None,
device: Optional[str] = None,
runtime: str = None,
timeout: int = 100,
**kwargs,
) -> Dict[str, Any]:
"""
Deploy a custom model based on the training session.
:param experiment_info: Experiment info from :class:`~supervisely.nn.training.train_app.TrainApp`.
:type experiment_info: :class:`~supervisely.nn.experiments.ExperimentInfo`
:param checkpoint_name: Checkpoint name (with file extension) to deploy, e.g. "best.pt".
If not provided, the best checkpoint will be chosen.
:type checkpoint_name: Optional[str]
:param device: Device string. If not provided, will be chosen automatically.
:type device: Optional[str]
:param timeout: Timeout in seconds (default is 100). The maximum time to wait for the serving app to be ready.
:type timeout: Optional[int]
:param kwargs: Additional parameters to start the task. See :meth:`~supervisely.api.task_api.TaskApi.start` for more details.
:type kwargs: Dict[str, Any]
:returns: Task Info
:rtype: Dict[str, Any]
:raises ValueError: if validations fail.
"""
task_id = experiment_info.task_id
train_task_info = self._api.task.get_info_by_id(task_id)
runtime = get_runtime(runtime)
logger.debug(f"Starting model deployment from experiment info. Task ID: '{task_id}'")
train_module_id = train_task_info["meta"]["app"]["moduleId"]
module = self.get_serving_app_by_train_app(module_id=train_module_id)
serve_app_name = module["name"]
module_id = module["id"]
logger.debug(f"Serving app detected: '{serve_app_name}'. Module ID: '{module_id}'")
if checkpoint_name is None:
checkpoint_name = experiment_info.best_checkpoint
# Task parameters
experiment_name = experiment_info.experiment_name
task_name = experiment_name + f" ({checkpoint_name})"
if "task_name" not in kwargs:
kwargs["task_name"] = task_name
description = f"""Serve from experiment
Experiment name: {experiment_name}
Evaluation report: {experiment_info.evaluation_report_link}
"""
while len(description) > 255:
description = description.rsplit("\n", 1)[0]
if "description" not in kwargs:
kwargs["description"] = description
logger.info(f"{serve_app_name} app deployment started. Checkpoint: '{checkpoint_name}'.")
try:
task_info = self._run_serve_app(agent_id, module_id, timeout=timeout, **kwargs)
self.load_custom_model_from_experiment_info(
task_info["id"], experiment_info, checkpoint_name, device, runtime
)
except Exception as e:
raise RuntimeError(f"Failed to run '{serve_app_name}': {e}") from e
return task_info
[docs]
def start_serve_app(
self, agent_id: int, app_name=None, module_id=None, **kwargs
) -> Dict[str, Any]:
"""
Run a serving app. Either app_name or module_id must be provided.
:param app_name: App name in Supervisely.
:type app_name: Optional[str]
:param module_id: Module ID in Supervisely.
:type module_id: Optional[int]
:param kwargs: Additional parameters to start the task. See :meth:`~supervisely.api.task_api.TaskApi.start` for more details.
:type kwargs: Dict[str, Any]
:returns: Task Info
:rtype: Dict[str, Any]
"""
if app_name is None and module_id is None:
raise ValueError("Either app_name or module_id must be provided.")
if app_name is not None and module_id is not None:
raise ValueError("Only one of app_name or module_id must be provided.")
if module_id is None:
module_id = self._api.app.find_module_id_by_app_name(app_name)
self._run_serve_app(agent_id, module_id, **kwargs)
def _run_serve_app(
self, agent_id: int, module_id, workspace_id: int = None, timeout: int = 100, **kwargs
):
_attempt_delay_sec = 1
_attempts = timeout // _attempt_delay_sec
if workspace_id is None:
workspace_id = env.workspace_id()
kwargs = get_valid_kwargs(
kwargs=kwargs,
func=self._api.task.start,
exclude=["self", "module_id", "workspace_id", "agent_id"],
)
task_info = self._api.task.start(
agent_id=agent_id,
module_id=module_id,
workspace_id=workspace_id,
**kwargs,
)
ready = self._api.app.wait_until_ready_for_api_calls(
task_info["id"], _attempts, _attempt_delay_sec
)
if not ready:
raise TimeoutError(
f"Task {task_info['id']} is not ready for API calls after {timeout} seconds."
)
return task_info
def _load_model_from_api(self, task_id, deploy_params, model_name: Optional[str] = None):
logger.info("Loading model")
self._api.task.send_request(
task_id,
"deploy_from_api",
data={"deploy_params": deploy_params, "model_name": model_name},
raise_error=True,
)
logger.info("Model loaded successfully")
[docs]
def find_serving_app_by_framework(self, framework: str):
"""
Find the serving app by framework.
:param framework: Framework name.
:type framework: str
:returns: Serving app info
:rtype: Dict[str, Any]
"""
modules = find_apps_by_framework(self._api, framework, ["serve"])
if not modules:
return None
return modules[0]
[docs]
def find_serving_app_by_slug(self, slug: str) -> int:
"""
Find the serving app by slug.
:param slug: Slug of the serving app.
:type slug: str
:returns: Serving app ID
:rtype: int
"""
return self._api.app.get_ecosystem_module_id(slug)
[docs]
def get_serving_app_by_train_app(self, app_name: Optional[str] = None, module_id: int = None):
"""
Get the serving app by train app.
:param app_name: App name in Supervisely.
:type app_name: Optional[str]
:param module_id: Module ID in Supervisely.
:type module_id: int
:returns: Serving app info
:rtype: Dict[str, Any]
"""
if app_name is None and module_id is None:
raise ValueError("Either app_name or module_id must be provided.")
if app_name is not None:
module_id = self._api.app.find_module_id_by_app_name(app_name)
train_module_info = self._api.app.get_ecosystem_module_info(module_id)
train_app_config = train_module_info.config
categories = train_app_config["categories"]
framework = None
for category in categories:
if category.lower().startswith("framework:"):
framework = category.lstrip("framework:")
break
if framework is None:
raise ValueError(
"Unable to define serving app. Framework is not specified in the train app"
)
logger.debug(f"Detected framework: {framework}")
module = self.find_serving_app_by_framework(framework)
if module is None:
raise ValueError(f"No serving apps found for framework {framework}")
return module
[docs]
def get_deploy_info(self, task_id: int) -> Dict[str, Any]:
"""
Get deploy info of a serving task.
:param task_id: Task ID of the serving App.
:type task_id: int
:returns: Deploy Info
:rtype: Dict[str, Any]
"""
return self._api.task.send_request(task_id, "get_deploy_info", data={}, raise_error=True)
def _deploy_params_v1(
self,
team_id: int,
artifacts_dir: str,
checkpoint_name: str,
device: str,
runtime: str,
with_module: bool = True,
) -> Tuple[int, Dict[str, Any]]:
from supervisely.nn.artifacts import RITM, YOLOv5
from supervisely.nn.artifacts.artifacts import BaseTrainArtifacts
from supervisely.nn.utils import ModelSource
framework_cls = self._get_framework_by_path(artifacts_dir)
if not framework_cls:
raise ValueError(f"Unsupported framework for artifacts_dir: '{artifacts_dir}'")
framework: BaseTrainArtifacts = framework_cls(team_id)
if framework_cls is RITM or framework_cls is YOLOv5:
raise ValueError(
f"{framework.framework_name} framework is not supported for deployment"
)
runtime = get_runtime(runtime)
logger.debug(f"Detected framework: '{framework.framework_name}'")
module_id = None
serve_app_name = None
if with_module:
module_id = self._api.app.get_ecosystem_module_id(framework.serve_slug)
serve_app_name = framework.serve_app_name
logger.debug(f"Module ID fetched:' {module_id}'. App name: '{serve_app_name}'")
train_info = framework.get_info_by_artifacts_dir(artifacts_dir.rstrip("/"))
if not hasattr(train_info, "checkpoints") or not train_info.checkpoints:
raise ValueError("No checkpoints found in train info.")
checkpoint = None
if checkpoint_name is not None:
for cp in train_info.checkpoints:
if cp.name == checkpoint_name:
checkpoint = cp
break
if checkpoint is None:
raise ValueError(f"Checkpoint '{checkpoint_name}' not found in train info.")
else:
logger.info("Checkpoint name not provided. Using the last checkpoint.")
checkpoint = train_info.checkpoints[-1]
checkpoint_name = checkpoint.name
deploy_params = {
"device": device,
"model_source": ModelSource.CUSTOM,
"task_type": train_info.task_type,
"checkpoint_name": checkpoint_name,
"checkpoint_url": checkpoint.path,
}
if getattr(train_info, "config_path", None) is not None:
deploy_params["config_url"] = train_info.config_path
if framework.require_runtime:
deploy_params["runtime"] = runtime
return module_id, serve_app_name, deploy_params
def _deploy_params_v2(
self,
team_id: int,
artifacts_dir: str,
checkpoint_name: str,
device: str,
runtime: Optional[str] = None,
with_module: bool = True,
):
from supervisely.nn.experiments import get_experiment_info_by_artifacts_dir
from supervisely.nn.utils import ModelSource
experiment_info = get_experiment_info_by_artifacts_dir(self._api, team_id, artifacts_dir)
if not experiment_info:
raise ValueError(
f"Failed to retrieve experiment info for artifacts_dir: '{artifacts_dir}'"
)
runtime = get_runtime(runtime)
module = None
module_id = None
serve_app_name = None
if with_module:
framework_name = experiment_info.framework_name
module = self.find_serving_app_by_framework(framework_name)
serve_app_name = module["name"]
module_id = module["id"]
logger.debug(f"Serving app detected: '{serve_app_name}'. Module ID: '{module_id}'")
if len(experiment_info.checkpoints) == 0:
raise ValueError(f"No checkpoints found in: '{artifacts_dir}'.")
checkpoint = None
if checkpoint_name is not None:
if checkpoint_name.endswith(".pt") or checkpoint_name.endswith(".pth"):
for checkpoint_path in experiment_info.checkpoints:
if get_file_name_with_ext(checkpoint_path) == checkpoint_name:
checkpoint = get_file_name_with_ext(checkpoint_path)
break
elif checkpoint_name.endswith(".onnx"):
checkpoint_path = experiment_info.export.get("ONNXRuntime")
if checkpoint_path is None:
raise ValueError(f"ONNXRuntime export not found in: '{artifacts_dir}'.")
elif checkpoint_name.endswith(".engine"):
checkpoint_path = experiment_info.export.get("TensorRT")
if checkpoint_path is None:
raise ValueError(f"TensorRT export not found in: '{artifacts_dir}'.")
else:
raise ValueError(
f"Unknown checkpoint format: '{checkpoint_name}'. Expected formats: '.pt', '.pth', '.onnx' or '.engine'"
)
checkpoint = get_file_name_with_ext(checkpoint_path)
if checkpoint is None:
raise ValueError(f"Provided checkpoint '{checkpoint_name}' not found")
else:
logger.info("Checkpoint name not provided. Using the best checkpoint.")
checkpoint = experiment_info.best_checkpoint
model_info_dict = asdict(experiment_info)
model_info_dict["artifacts_dir"] = artifacts_dir
checkpoint_name = checkpoint
checkpoints_dir = self._get_checkpoints_dir(checkpoint_name)
checkpoint_path = f"/{artifacts_dir.strip('/')}/{checkpoints_dir}/{checkpoint_name}"
if runtime is None:
runtime = self._set_auto_runtime_by_checkpoint(checkpoint_path)
deploy_params = {
"device": device,
"model_source": ModelSource.CUSTOM,
"model_files": {"checkpoint": checkpoint_path},
"model_info": model_info_dict,
"runtime": runtime,
}
for file_key, file_path in experiment_info.model_files.items():
full_file_path = os.path.join(experiment_info.artifacts_dir, file_path)
if not self._api.file.exists(team_id, full_file_path):
logger.debug(
f"Model file not found: '{full_file_path}'. Trying to find it by checkpoint path."
)
full_file_path = os.path.join(artifacts_dir, file_path)
if not self._api.file.exists(team_id, full_file_path):
raise ValueError(
f"Model file not found: '{full_file_path}'. Make sure that the file exists in the artifacts directory."
)
deploy_params["model_files"][file_key] = full_file_path
logger.debug(f"Model file added: {full_file_path}")
return module_id, serve_app_name, deploy_params
def _set_auto_runtime_by_checkpoint(self, checkpoint_path: str) -> str:
if checkpoint_path.endswith(".pt") or checkpoint_path.endswith(".pth"):
return RuntimeType.PYTORCH
elif checkpoint_path.endswith(".onnx"):
return RuntimeType.ONNXRUNTIME
elif checkpoint_path.endswith(".engine"):
return RuntimeType.TENSORRT
else:
raise ValueError(f"Unknown checkpoint format: '{checkpoint_path}'")
[docs]
def wait(self, model_id, target: Literal["started", "deployed"] = "started", timeout=5 * 60):
"""
Wait for the model to be started or deployed.
:param model_id: Model ID.
:type model_id: int
:param target: Target status.
:type target: Literal["started", "deployed"]
:param timeout: Timeout in seconds.
:type timeout: int
"""
t = time.monotonic()
method = "is_alive" if target == "started" else "is_ready"
while time.monotonic() - t < timeout:
self._api.task.send_request(model_id, "is_ready", {})
time.sleep(1)
def _get_artifacts_dir_and_checkpoint_name(self, model: str) -> Tuple[str, str]:
if not model.startswith("/"):
raise ValueError(f"Path must start with '/'")
if model.startswith("/experiments"):
if model.endswith(".pt") or model.endswith(".pth"):
try:
artifacts_dir, checkpoint_name = model.split("/checkpoints/")
return artifacts_dir, checkpoint_name
except:
raise ValueError(
"Bad format of checkpoint path. Expected format: '/artifacts_dir/checkpoints/checkpoint_name'"
)
elif model.endswith(".onnx") or model.endswith(".engine"):
try:
artifacts_dir, checkpoint_name = model.split("/export/")
return artifacts_dir, checkpoint_name
except:
raise ValueError(
"Bad format of checkpoint path. Expected format: '/artifacts_dir/export/checkpoint_name'"
)
else:
raise ValueError(f"Unknown model format: '{get_file_name_with_ext(model)}'")
framework_cls = self._get_framework_by_path(model)
if framework_cls is None:
raise ValueError(f"Unknown path: '{model}'")
team_id = env.team_id()
framework = framework_cls(team_id)
checkpoint_name = get_file_name_with_ext(model)
checkpoints_dir = model.replace(checkpoint_name, "")
if framework.weights_folder is not None:
artifacts_dir = checkpoints_dir.replace(framework.weights_folder, "")
else:
artifacts_dir = checkpoints_dir
return artifacts_dir, checkpoint_name
def _get_checkpoints_dir(self, checkpoint_name: str) -> str:
if checkpoint_name.endswith(".pt") or checkpoint_name.endswith(".pth"):
return "checkpoints"
elif checkpoint_name.endswith(".onnx") or checkpoint_name.endswith(".engine"):
return "export"
else:
raise ValueError(f"Unknown checkpoint format: '{checkpoint_name}'")
def _get_framework_by_path(self, path: str):
from supervisely.nn.artifacts import (
RITM,
RTDETR,
Detectron2,
MMClassification,
MMDetection,
MMDetection3,
MMPretrain,
MMSegmentation,
UNet,
YOLOv5,
YOLOv5v2,
YOLOv8,
)
from supervisely.nn.artifacts.artifacts import BaseTrainArtifacts
from supervisely.nn.utils import ModelSource
path_obj = Path(path)
if len(path_obj.parts) < 2:
raise ValueError(f"Incorrect checkpoint path: '{path}'")
parent = path_obj.parts[1]
frameworks = {
"/detectron2": Detectron2,
"/mmclassification": MMClassification,
"/mmclassification-v2": MMPretrain,
"/mmdetection": MMDetection,
"/mmdetection-3": MMDetection3,
"/mmsegmentation": MMSegmentation,
"/RITM_training": RITM,
"/RT-DETR": RTDETR,
"/unet": UNet,
"/yolov5_train": YOLOv5,
"/yolov5_2.0_train": YOLOv5v2,
"/yolov8_train": YOLOv8,
}
if f"/{parent}" in frameworks:
return frameworks[f"/{parent}"]