# coding: utf-8
"""Create, monitor, and manage Supervisely tasks."""
import json
import os
import time
from collections import OrderedDict, defaultdict
from pathlib import Path
# docs
from typing import Any, Callable, Dict, List, Literal, NamedTuple, Optional, Union
import requests
from pydantic import BaseModel, Field
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from tqdm import tqdm
from supervisely import logger
from supervisely._utils import batched, take_with_default
from supervisely.api.module_api import (
ApiField,
ModuleApiBase,
ModuleWithStatus,
WaitingTimeExceeded,
)
from supervisely.collection.str_enum import StrEnum
from supervisely.io.env import app_categories
from supervisely.io.fs import (
ensure_base_path,
get_file_hash,
get_file_name,
get_file_name_with_ext,
)
class KubernetesSettings(BaseModel):
"""Application resource limits and requests (CPUs, memory, GPU, storage) for tasks."""
use_health_check: Optional[bool] = Field(None, alias="useHealthCheck")
request_cpus: Optional[int] = Field(None, alias="requestCpus")
limit_cpus: Optional[int] = Field(None, alias="limitCpus")
limit_memory_gb: Optional[int] = Field(None, alias="limitMemoryGb")
limit_shm_gb: Optional[int] = Field(None, alias="limitShmGb")
limit_storage_gb: Optional[int] = Field(None, alias="limitStorageGb")
limit_gpus: Optional[int] = Field(None, alias="limitGpus")
limit_gpu_memory_mb: Optional[int] = Field(None, alias="limitGpuMemoryMb")
limit_gpu_cores_perc: Optional[int] = Field(None, alias="limitGpuCoresPerc")
model_config = {"populate_by_name": True}
def to_dict(self) -> Dict[str, Any]:
"""Convert to dict with only non-None values using aliases."""
return self.model_dump(exclude_none=True, by_alias=True)
class TaskFinishedWithError(Exception):
"""Raised when a task finishes in an error state while waiting/polling."""
pass
[docs]
class TaskApi(ModuleApiBase, ModuleWithStatus):
"""API for working with tasks."""
[docs]
class RestartPolicy(StrEnum):
"""Task restart policy used for app deployments."""
NEVER = "never"
""""""
ON_ERROR = "on_error"
""""""
[docs]
class PluginTaskType(StrEnum):
"""Task type for plugin-based tasks (train/inference/smarttool/etc.)."""
TRAIN = "train"
""""""
INFERENCE = "inference"
""""""
INFERENCE_RPC = "inference_rpc"
""""""
SMART_TOOL = "smarttool"
""""""
CUSTOM = "custom"
""""""
[docs]
class Status(StrEnum):
"""Task lifecycle status values returned by the API."""
QUEUED = "queued"
"""Application is queued for execution"""
CONSUMED = "consumed"
"""Application is consumed by an agent"""
STARTED = "started"
"""Application has been started"""
DEPLOYED = "deployed"
"""Only for Plugins"""
ERROR = "error"
"""Application has finished with an error"""
FINISHED = "finished"
"""Application has finished successfully"""
TERMINATING = "terminating"
"""Application is being terminated"""
STOPPED = "stopped"
"""Application has been stopped"""
def __init__(self, api):
"""
:param api: :class:`~supervisely.api.api.Api` object to use for API connection.
:type api: :class:`~supervisely.api.api.Api`
:Usage Example:
.. code-block:: python
import supervisely as sly
api = sly.Api.from_env()
task_info = api.task.get_info_by_id(task_id)
"""
ModuleApiBase.__init__(self, api)
ModuleWithStatus.__init__(self)
[docs]
def get_list(
self, workspace_id: int, filters: Optional[List[Dict[str, str]]] = None
) -> List[NamedTuple]:
"""
List of Application Tasks in the given Workspace.
:param workspace_id: Workspace ID.
:type workspace_id: int
:param filters: List of params to sort output Projects.
:type filters: List[dict], optional
:returns: List of Tasks with information for the given Workspace.
:rtype: :class:`List[NamedTuple]`
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
workspace_id = 23821
task_infos = api.task.get_list(workspace_id)
task_infos_filter = api.task.get_list(23821, filters=[{'field': 'id', 'operator': '=', 'value': 121230}])
print(task_infos_filter)
# Output: [
# {
# "id": 121230,
# "type": "clone",
# "status": "finished",
# "startedAt": "2019-12-19T12:13:09.702Z",
# "finishedAt": "2019-12-19T12:13:09.701Z",
# "meta": {
# "input": {
# "model": {
# "id": 1849
# },
# "isExternal": true,
# "pluginVersionId": 84479
# },
# "output": {
# "model": {
# "id": 12380
# },
# "pluginVersionId": 84479
# }
# },
# "description": ""
# }
# ]
"""
return self.get_list_all_pages(
"tasks.list",
{ApiField.WORKSPACE_ID: workspace_id, ApiField.FILTER: filters or []},
)
[docs]
def get_info_by_id(self, id: int) -> NamedTuple:
"""
Get Application Task information by ID.
:param id: Task ID in Supervisely.
:type id: int
:returns: Information about Task.
:rtype: :class:`NamedTuple`
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
task_id = 121230
task_info = api.task.get_info_by_id(task_id)
print(task_info)
# Output: {
# "id": 121230,
# "workspaceId": 23821,
# "description": "",
# "type": "clone",
# "status": "finished",
# "startedAt": "2019-12-19T12:13:09.702Z",
# "finishedAt": "2019-12-19T12:13:09.701Z",
# "userId": 16154,
# "meta": {
# "app": {
# "id": 10370,
# "name": "Auto Import",
# "version": "test-branch",
# "isBranch": true,
# },
# "input": {
# "model": {
# "id": 1849
# },
# "isExternal": true,
# "pluginVersionId": 84479
# },
# "output": {
# "model": {
# "id": 12380
# },
# "pluginVersionId": 84479
# }
# },
# "settings": {},
# "agentName": null,
# "userLogin": "alexxx",
# "teamId": 16087,
# "agentId": null
# }
"""
return self._get_info_by_id(id, "tasks.info")
[docs]
def get_status(self, task_id: int) -> Status:
"""
Check status of Application Task by ID.
:param task_id: Task ID in Supervisely.
:type task_id: int
:returns: Status object
:rtype: :class:`Status`
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
task_id = 121230
task_status = api.task.get_status(task_id)
print(task_status)
# Output: finished
"""
status_str = self.get_info_by_id(task_id)[ApiField.STATUS] # @TODO: convert json to tuple
return self.Status(status_str)
[docs]
def raise_for_status(self, status: Status) -> None:
"""
Raise error if Application Task status is ERROR.
:param status: Status object.
:type status: Status
:returns: None
:rtype: None
"""
if status is self.Status.ERROR:
raise TaskFinishedWithError(f"Task finished with status {str(self.Status.ERROR)}")
[docs]
def wait(
self,
id: int,
target_status: Status,
wait_attempts: Optional[int] = None,
wait_attempt_timeout_sec: Optional[int] = None,
):
"""
Awaiting achievement by given Application Task of a given status.
:param id: Task ID in Supervisely.
:type id: int
:param target_status: Status object(status of task we expect to destinate).
:type target_status: Status
:param wait_attempts: The number of attempts to determine the status of the task that we are waiting for.
:type wait_attempts: int, optional
:param wait_attempt_timeout_sec: Number of seconds for intervals between attempts(raise error if waiting time exceeded).
:type wait_attempt_timeout_sec: int, optional
:returns: True if the desired status is reached, False otherwise
:rtype: bool
"""
wait_attempts = wait_attempts or self.MAX_WAIT_ATTEMPTS
effective_wait_timeout = wait_attempt_timeout_sec or self.WAIT_ATTEMPT_TIMEOUT_SEC
for attempt in range(wait_attempts):
status = self.get_status(id)
self.raise_for_status(status)
if status in [
target_status,
self.Status.FINISHED,
self.Status.DEPLOYED,
self.Status.STOPPED,
]:
return
time.sleep(effective_wait_timeout)
raise WaitingTimeExceeded(
f"Waiting time exceeded: total waiting time {wait_attempts * effective_wait_timeout} seconds, i.e. {wait_attempts} attempts for {effective_wait_timeout} seconds each"
)
[docs]
def get_context(self, id: int) -> Dict:
"""
Get context information by Application Task ID.
:param id: Task ID in Supervisely.
:type id: int
:returns: Context information in dict format
:rtype: dict
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
task_id = 121230
context = api.task.get_context(task_id)
print(context)
# Output: {
# "team": {
# "id": 16087,
# "name": "alexxx"
# },
# "workspace": {
# "id": 23821,
# "name": "my_super_workspace"
# }
# }
"""
response = self._api.post("GetTaskContext", {ApiField.ID: id})
return response.json()
def _convert_json_info(self, info: dict):
"""_convert_json_info"""
return info
[docs]
def start(
self,
agent_id,
app_id: Optional[int] = None,
workspace_id: Optional[int] = None,
description: Optional[str] = "application description",
params: Dict[str, Any] = None,
log_level: Optional[Literal["info", "debug", "warning", "error"]] = "info",
users_ids: Optional[List[int]] = None,
app_version: Optional[str] = "",
is_branch: Optional[bool] = False,
task_name: Optional[str] = "pythonSpawned",
restart_policy: Optional[Literal["never", "on_error"]] = "never",
proxy_keep_url: Optional[bool] = False,
module_id: Optional[int] = None,
redirect_requests: Optional[Dict[str, int]] = {},
limit_by_workspace: bool = False,
kubernetes_settings: Optional[Union[KubernetesSettings, Dict[str, Any]]] = None,
multi_user_session: bool = False,
) -> Dict[str, Any]:
"""Starts the Application Task on the agent.
:param agent_id: Agent ID. Can be obtained from TeamCluster page in UI.
:type agent_id: int
:param app_id: Deprecated. Use module_id instead.
:type app_id: int, optional
:param workspace_id: Workspace ID where the task will be created.
:type workspace_id: int, optional
:param description: Task description which will be shown in UI.
:type description: str, optional
:param params: Task parameters which will be passed to the application, check the
code example below for more details.
:type params: Dict[str, Any], optional
:param log_level: Log level for the application.
:type log_level: Literal["info", "debug", "warning", "error"], optional
:param users_ids: List of user IDs for which will be created an instance of the application.
For each user a separate task will be created.
:type users_ids: List[int], optional
:param app_version: Application version e.g. "v1.0.0" or branch name e.g. "dev".
:type app_version: str, optional
:param is_branch: If the application version is a branch name, set this parameter to True.
:type is_branch: bool, optional
:param task_name: Task name which will be shown in UI.
:type task_name: str, optional
:param restart_policy: when the task should be restarted: never or if error occurred.
:type restart_policy: Literal["never", "on_error"], optional
:param proxy_keep_url: For internal usage only.
:type proxy_keep_url: bool, optional
:param module_id: Module ID. Can be obtained from the apps page in UI.
:type module_id: int, optional
:param redirect_requests: For internal usage only in Develop and Debug mode.
:type redirect_requests: Dict[str, int], optional
:param limit_by_workspace: If set to True tasks will be only visible inside of the workspace
with specified workspace_id.
:type limit_by_workspace: bool, optional
:param kubernetes_settings: Kubernetes settings for the application.
:type kubernetes_settings: Union[:class:`~supervisely.api.task_api.KubernetesSettings`, Dict[str, Any]], optional
:param multi_user_session: If True, the application session will be created as multi-user.
In this case, multiple users will be able to connect to the same application session.
All users will have separate application states.
Available only for applications that support multi-user sessions.
:type multi_user_session: bool, default is False
:returns: Task information in JSON format.
:rtype: Dict[str, Any]
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
app_slug = "supervisely-ecosystem/export-to-supervisely-format"
module_id = api.app.get_ecosystem_module_id(app_slug)
module_info = api.app.get_ecosystem_module_info(module_id)
project_id = 12345
agent_id = 12345
workspace_id = 12345
params = module_info.get_arguments(images_project=project_id)
session = api.app.start(
agent_id=agent_id,
module_id=module_id,
workspace_id=workspace_id,
task_name="Prepare download link",
params=params,
app_version="dninja",
is_branch=True,
)
"""
if app_id is not None and module_id is not None:
raise ValueError("Only one of the arguments (app_id or module_id) have to be defined")
if app_id is None and module_id is None:
raise ValueError("One of the arguments (app_id or module_id) have to be defined")
advanced_settings = {
ApiField.LIMIT_BY_WORKSPACE: limit_by_workspace,
}
if kubernetes_settings is not None:
if isinstance(kubernetes_settings, KubernetesSettings):
kubernetes_settings = kubernetes_settings.to_dict()
if not isinstance(kubernetes_settings, dict):
raise TypeError(
f"kubernetes_settings must be a dict or an instance of KubernetesSettings, got {type(kubernetes_settings)}"
)
advanced_settings.update(kubernetes_settings)
data = {
ApiField.AGENT_ID: agent_id,
# "nodeId": agent_id,
ApiField.WORKSPACE_ID: workspace_id,
ApiField.DESCRIPTION: description,
ApiField.PARAMS: take_with_default(params, {"state": {}}),
ApiField.LOG_LEVEL: log_level,
ApiField.USERS_IDS: take_with_default(users_ids, []),
ApiField.APP_VERSION: app_version,
ApiField.IS_BRANCH: is_branch,
ApiField.TASK_NAME: task_name,
ApiField.RESTART_POLICY: restart_policy,
ApiField.PROXY_KEEP_URL: proxy_keep_url,
ApiField.ADVANCED_SETTINGS: advanced_settings,
}
if len(redirect_requests) > 0:
data[ApiField.REDIRECT_REQUESTS] = redirect_requests
if app_id is not None:
data[ApiField.APP_ID] = app_id
if module_id is not None:
data[ApiField.MODULE_ID] = module_id
if multi_user_session:
# * Enables single multi-user session mode for all users in the users_ids list.
# * Otherwise, if users_ids contains multiple IDs, separate single-user sessions will be created for each.
# * If users_ids is empty, a session is created only for the current user.
data[ApiField.SINGLE_SESSION_MODE] = multi_user_session
resp = self._api.post(method="tasks.run.app", data=data)
task = resp.json()[0]
if "id" not in task:
task["id"] = task.get("taskId")
return task
[docs]
def stop(self, id: int):
"""stop"""
response = self._api.post("tasks.stop", {ApiField.ID: id})
return self.Status(response.json()[ApiField.STATUS])
[docs]
def submit_logs(self, logs) -> None:
"""submit_logs"""
response = self._api.post("tasks.logs.add", {ApiField.LOGS: logs})
# return response.json()[ApiField.TASK_ID]
[docs]
def upload_files(
self,
task_id: int,
abs_paths: List[str],
names: List[str],
progress_cb: Optional[Union[tqdm, Callable]] = None,
) -> None:
"""upload_files"""
if len(abs_paths) != len(names):
raise RuntimeError("Inconsistency: len(abs_paths) != len(names)")
hashes = []
if len(abs_paths) == 0:
return
hash_to_items = defaultdict(list)
hash_to_name = defaultdict(list)
for idx, item in enumerate(zip(abs_paths, names)):
path, name = item
item_hash = get_file_hash(path)
hashes.append(item_hash)
hash_to_items[item_hash].append(path)
hash_to_name[item_hash].append(name)
unique_hashes = set(hashes)
remote_hashes = self._api.image.check_existing_hashes(list(unique_hashes))
new_hashes = unique_hashes - set(remote_hashes)
# @TODO: upload remote hashes
if len(remote_hashes) != 0:
files = []
for hash in remote_hashes:
for name in hash_to_name[hash]:
files.append({ApiField.NAME: name, ApiField.HASH: hash})
for batch in batched(files):
resp = self._api.post(
"tasks.files.bulk.add-by-hash",
{ApiField.TASK_ID: task_id, ApiField.FILES: batch},
)
if progress_cb is not None:
progress_cb(len(remote_hashes))
for batch in batched(list(zip(abs_paths, names, hashes))):
content_dict = OrderedDict()
for idx, item in enumerate(batch):
path, name, hash = item
if hash in remote_hashes:
continue
content_dict["{}".format(idx)] = json.dumps({"fullpath": name, "hash": hash})
content_dict["{}-file".format(idx)] = (name, open(path, "rb"), "")
if len(content_dict) > 0:
encoder = MultipartEncoder(fields=content_dict)
resp = self._api.post("tasks.files.bulk.upload", encoder)
if progress_cb is not None:
progress_cb(len(content_dict))
# {
# data: {my_val: 1}
# obj: {val: 1, res: 2}
# }
# {
# obj: {new_val: 1}
# }
# // apped: true, recursive: false
# {
# data: {my_val: 1}
# obj: {new_val: 1}
# }(edited)
# // append: false, recursive: false
# {
# obj: {new_val: 1}
# }(edited)
#
# 16: 32
# // append: true, recursive: true
# {
# data: {my_val: 1}
# obj: {val: 1, res: 2, new_val: 1}
# }
[docs]
def set_fields(self, task_id: int, fields: List) -> Dict:
"""set_fields"""
for idx, obj in enumerate(fields):
for key in [ApiField.FIELD, ApiField.PAYLOAD]:
if key not in obj:
raise KeyError("Object #{} does not have field {!r}".format(idx, key))
data = {ApiField.TASK_ID: task_id, ApiField.FIELDS: fields}
resp = self._api.post("tasks.data.set", data)
return resp.json()
[docs]
def set_fields_from_dict(self, task_id: int, d: Dict) -> Dict:
"""set_fields_from_dict"""
fields = []
for k, v in d.items():
fields.append({ApiField.FIELD: k, ApiField.PAYLOAD: v})
return self.set_fields(task_id, fields)
[docs]
def set_field(
self,
task_id: int,
field: Dict,
payload: Dict,
append: Optional[bool] = False,
recursive: Optional[bool] = False,
) -> Dict:
"""set_field"""
fields = [
{
ApiField.FIELD: field,
ApiField.PAYLOAD: payload,
ApiField.APPEND: append,
ApiField.RECURSIVE: recursive,
}
]
return self.set_fields(task_id, fields)
[docs]
def get_fields(self, task_id, fields: List):
"""get_fields"""
data = {ApiField.TASK_ID: task_id, ApiField.FIELDS: fields}
resp = self._api.post("tasks.data.get", data)
return resp.json()["result"]
[docs]
def get_field(self, task_id: int, field: str):
"""get_field"""
result = self.get_fields(task_id, [field])
return result[field]
def _set_output(self):
"""_set_output"""
pass
[docs]
def set_output_project(
self,
task_id: int,
project_id: int,
project_name: Optional[str] = None,
project_preview: Optional[str] = None,
) -> Dict:
"""set_output_project"""
if "import" in app_categories():
self._api.project.add_import_history(project_id, task_id)
if project_name is None:
project = self._api.project.get_info_by_id(project_id, raise_error=True)
project_name = project.name
project_preview = project.image_preview_url
output = {ApiField.PROJECT: {ApiField.ID: project_id, ApiField.TITLE: project_name}}
if project_preview is not None:
output[ApiField.PROJECT][ApiField.PREVIEW] = project_preview
resp = self._api.post(
"tasks.output.set", {ApiField.TASK_ID: task_id, ApiField.OUTPUT: output}
)
return resp.json()
[docs]
def set_output_report(
self,
task_id: int,
file_id: int,
file_name: str,
description: Optional[str] = "Report",
) -> Dict:
"""set_output_report"""
return self._set_custom_output(
task_id,
file_id,
file_name,
description=description,
icon="zmdi zmdi-receipt",
)
def _set_custom_output(
self,
task_id,
file_id,
file_name,
file_url=None,
description="File",
icon="zmdi zmdi-file-text",
color="#33c94c",
background_color="#d9f7e4",
download=False,
):
"""_set_custom_output"""
if file_url is None:
file_url = self._api.file.get_url(file_id)
output = {
ApiField.GENERAL: {
"icon": {
"className": icon,
"color": color,
"backgroundColor": background_color,
},
"title": file_name,
"titleUrl": file_url,
"download": download,
"description": description,
}
}
resp = self._api.post(
"tasks.output.set", {ApiField.TASK_ID: task_id, ApiField.OUTPUT: output}
)
return resp.json()
[docs]
def set_output_archive(
self, task_id: int, file_id: int, file_name: str, file_url: Optional[str] = None
) -> Dict:
"""set_output_archive"""
if file_url is None:
file_url = self._api.file.get_info_by_id(file_id).storage_path
return self._set_custom_output(
task_id,
file_id,
file_name,
file_url=file_url,
description="Download archive",
icon="zmdi zmdi-archive",
download=True,
)
[docs]
def set_output_file_download(
self,
task_id: int,
file_id: int,
file_name: str,
file_url: Optional[str] = None,
download: Optional[bool] = True,
) -> Dict:
"""set_output_file_download"""
if file_url is None:
file_url = self._api.file.get_info_by_id(file_id).storage_path
return self._set_custom_output(
task_id,
file_id,
file_name,
file_url=file_url,
description="Download file",
icon="zmdi zmdi-file",
download=download,
)
[docs]
def send_request(
self,
task_id: int,
method: str,
data: Dict,
context: Optional[Dict] = {},
skip_response: bool = False,
timeout: Optional[int] = 60,
outside_request: bool = True,
retries: int = 10,
raise_error: bool = False,
):
"""send_request"""
if type(data) is not dict:
raise TypeError("data argument has to be a dict")
context["outside_request"] = outside_request
resp = self._api.post(
"tasks.request.direct",
{
ApiField.TASK_ID: task_id,
ApiField.COMMAND: method,
ApiField.CONTEXT: context,
ApiField.STATE: data,
"skipResponse": skip_response,
"timeout": timeout,
},
retries=retries,
raise_error=raise_error,
)
return resp.json()
[docs]
def set_output_directory(self, task_id, file_id, directory_path):
"""set_output_directory"""
return self._set_custom_output(
task_id,
file_id,
directory_path,
description="Directory",
icon="zmdi zmdi-folder",
)
def _update_app_content(self, task_id: int, data_patch: List[Dict] = None, state: Dict = None):
payload = {}
if data_patch is not None and len(data_patch) > 0:
payload[ApiField.DATA] = data_patch
if state is not None and len(state) > 0:
payload[ApiField.STATE] = state
resp = self._api.post(
"tasks.app-v2.data.set",
{ApiField.TASK_ID: task_id, ApiField.PAYLOAD: payload},
)
return resp.json()
[docs]
def set_output_error(
self,
task_id: int,
title: str,
description: Optional[str] = None,
show_logs: Optional[bool] = True,
) -> Dict:
"""
Set custom error message to the task output.
:param task_id: Application task ID.
:type task_id: int
:param title: Error message to be displayed in the task output.
:type title: str
:param description: Description to be displayed in the task output.
:type description: Optional[str]
:param show_logs: If True, the link to the task logs will be displayed in the task output.
:type show_logs: Optional[bool], default True
:returns: Response JSON.
:rtype: Dict
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
task_id = 12345
title = "Something went wrong"
description = "Please check the task logs"
show_logs = True
api.task.set_output_error(task_id, title, description, show_logs)
"""
output = {
ApiField.GENERAL: {
"icon": {
"className": "zmdi zmdi-alert-octagon",
"color": "#ff83a6",
"backgroundColor": "#ffeae9",
},
"title": title,
"showLogs": show_logs,
"isError": True,
}
}
if description is not None:
output[ApiField.GENERAL]["description"] = description
resp = self._api.post(
"tasks.output.set",
{ApiField.TASK_ID: task_id, ApiField.OUTPUT: output},
)
return resp.json()
[docs]
def set_output_text(
self,
task_id: int,
title: str,
description: Optional[str] = None,
show_logs: Optional[bool] = False,
zmdi_icon: Optional[str] = "zmdi-comment-alt-text",
icon_color: Optional[str] = "#33c94c",
background_color: Optional[str] = "#d9f7e4",
) -> Dict:
"""
Set custom text message to the task output.
:param task_id: Application task ID.
:type task_id: int
:param title: Text message to be displayed in the task output.
:type title: str
:param description: Description to be displayed in the task output.
:type description: Optional[str]
:param show_logs: If True, the link to the task logs will be displayed in the task output.
:type show_logs: Optional[bool], default False
:param zmdi_icon: Icon class name from Material Design Icons set (ZMDI).
:type zmdi_icon: Optional[str], default "zmdi-comment-alt-text"
:param icon_color: Icon color in HEX format.
:type icon_color: Optional[str], default "#33c94c" (nearest Duron Jolly Green)
:param background_color: Background color in HEX format.
:type background_color: Optional[str], default "#d9f7e4" (Cosmic Latte)
:returns: Response JSON.
:rtype: Dict
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
task_id = 12345
title = "Task is finished"
api.task.set_output_text(task_id, title)
"""
output = {
ApiField.GENERAL: {
"icon": {
"className": f"zmdi {zmdi_icon}",
"color": icon_color,
"backgroundColor": background_color,
},
"title": title,
"showLogs": show_logs,
"isError": False,
}
}
if description is not None:
output[ApiField.GENERAL]["description"] = description
resp = self._api.post(
"tasks.output.set",
{ApiField.TASK_ID: task_id, ApiField.OUTPUT: output},
)
return resp.json()
[docs]
def update_status(
self,
task_id: int,
status: Status,
) -> None:
"""Sets the specified status for the task.
:param task_id: Task ID in Supervisely.
:type task_id: int
:param status: Task status to set.
:type status: One of the values from :class:`Status`, e.g. Status.FINISHED, Status.ERROR, etc.
:raises ValueError: If the status value is not allowed.
"""
# If status was passed without converting to string, convert it.
# E.g. Status.FINISHED -> "finished"
status = str(status)
if status not in self.Status.values():
raise ValueError(
f"Invalid status value: {status}. Allowed values: {self.Status.values()}"
)
self._api.post("tasks.status.update", {ApiField.ID: task_id, ApiField.STATUS: status})
[docs]
def set_output_experiment(self, task_id: int, experiment_info: dict) -> Dict:
"""
Sets output for the task with experiment info.
:param task_id: Task ID in Supervisely.
:type task_id: int
:param experiment_info: Experiment info from :class:`~supervisely.nn.training.train_app.TrainApp`. See class :class:`~supervisely.nn.experiments.ExperimentInfo`.
:type experiment_info: dict
:returns: Server response JSON.
:rtype: dict
:Usage Example:
.. code-block:: python
experiment_info = {
"experiment_name": "247 Lemons RT-DETRv2-M",
"framework_name": "RT-DETRv2",
"model_name": "RT-DETRv2-M",
"task_type": "object detection",
"project_id": 76,
"project_version": {"id": 222, "version": 4},
"task_id": 247,
"model_files": {"config": "model_config.yml"},
"checkpoints": [
"checkpoints/best.pth",
"checkpoints/checkpoint0025.pth",
"checkpoints/checkpoint0050.pth",
"checkpoints/last.pth",
],
"best_checkpoint": "best.pth",
"export": {"ONNXRuntime": "export/best.onnx"},
"app_state": "app_state.json",
"model_meta": "model_meta.json",
"train_val_split": "train_val_split.json",
"train_size": 4,
"val_size": 2,
"train_collection_id": 530,
"val_collection_id": 531,
"hyperparameters": "hyperparameters.yaml",
"artifacts_dir": "/experiments/76_Lemons/247_RT-DETRv2/",
"datetime": "2025-01-22 18:13:43",
"evaluation_report_id": 12961,
"evaluation_report_link": "https://app.supervisely.com/model-benchmark?id=12961",
"evaluation_metrics": {"mAP": 0.994, "AP50": 1.0, "AP75": 1.0}, # and other metrics
"primary_metric": "mAP",
"logs": {"type": "tensorboard", "link": "/experiments/76_Lemons/247_RT-DETRv2/logs/"},
}
"""
output = {
ApiField.EXPERIMENT: {ApiField.DATA: {**experiment_info}},
}
resp = self._api.post(
"tasks.output.set", {ApiField.TASK_ID: task_id, ApiField.OUTPUT: output}
)
return resp.json()
[docs]
def is_running(self, task_id: int) -> bool:
"""
Check if the task is running.
:param task_id: Task ID in Supervisely.
:type task_id: int
:returns: True if the task is running, False otherwise.
:rtype: bool
"""
try:
self.send_request(task_id, "is_running", {}, retries=1, raise_error=True)
except requests.exceptions.HTTPError as e:
return False
return True
[docs]
def is_ready(self, task_id: int) -> bool:
"""
Check if the task is ready.
:param task_id: Task ID in Supervisely.
:type task_id: int
:returns: True if the task is ready, False otherwise.
:rtype: bool
"""
try:
return (
self.send_request(task_id, "is_ready", {}, retries=1, raise_error=True)["status"]
== "ready"
)
except requests.exceptions.HTTPError as e:
return False
[docs]
def get_logs(self, task_id: int, since_time: Optional[str] = None, limit: Optional[int] = None) -> List[Dict]:
"""
Returns the log entries for the given task ID in JSON format.
:param task_id: Task ID in Supervisely.
:type task_id: int
:param since_time: Optional ISO 8601 formatted timestamp to filter logs that were created after the specified time.
:type since_time: Optional[str]
:param limit: Optional integer to limit the number of log entries returned.
:type limit: Optional[int]
:returns: List of log entries in JSON format.
:rtype: List[Dict]
:Usage Example:
.. code-block:: python
since_time = "2026-03-30T14:28:58.816Z"
limit = 100
task_id = 12345
logs = api.task.get_logs(task_id, since_time=since_time, limit=limit)
for log_entry in logs:
print(log_entry) # {'level': 'info', 'message': "Added object class... 'timestamp': '2026-03-30T14:28:56.525Z', 'task_id': 58595}
"""
payload = {
ApiField.ID: task_id,
}
if since_time is not None:
payload[ApiField.SINCE_TIME] = since_time
if limit is not None:
payload[ApiField.LIMIT] = limit
resp = self._api.get(
"tasks.log.download",
payload,
)
return resp.json()