Source code for supervisely.api.task_api

# coding: utf-8
"""api for working with tasks"""

import json
import os
import time
from collections import OrderedDict, defaultdict

# docs
from typing import Any, Callable, Dict, List, Literal, NamedTuple, Optional, Union

from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from tqdm import tqdm

from supervisely._utils import batched, take_with_default
from supervisely.api.module_api import (
    ApiField,
    ModuleApiBase,
    ModuleWithStatus,
    WaitingTimeExceeded,
)
from supervisely.collection.str_enum import StrEnum
from supervisely.io.fs import ensure_base_path, get_file_hash, get_file_name


class TaskFinishedWithError(Exception):
    """TaskFinishedWithError"""

    pass


[docs]class TaskApi(ModuleApiBase, ModuleWithStatus): """ API for working with Tasks. :class:`TaskApi<TaskApi>` object is immutable. :param api: API connection to the server. :type api: Api :Usage example: .. code-block:: python import os from dotenv import load_dotenv import supervisely as sly # Load secrets and create API object from .env file (recommended) # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication if sly.is_development(): load_dotenv(os.path.expanduser("~/supervisely.env")) api = sly.Api.from_env() # Pass values into the API constructor (optional, not recommended) # api = sly.Api(server_address="https://app.supervise.ly", token="4r47N...xaTatb") task_id = 121230 task_info = api.task.get_info_by_id(task_id) """
[docs] class RestartPolicy(StrEnum): """RestartPolicy""" NEVER = "never" """""" ON_ERROR = "on_error" """"""
[docs] class PluginTaskType(StrEnum): """PluginTaskType""" TRAIN = "train" """""" INFERENCE = "inference" """""" INFERENCE_RPC = "inference_rpc" """""" SMART_TOOL = "smarttool" """""" CUSTOM = "custom" """"""
[docs] class Status(StrEnum): """Status""" QUEUED = "queued" """Application is queued for execution""" CONSUMED = "consumed" """Application is consumed by an agent""" STARTED = "started" """Application has been started""" DEPLOYED = "deployed" """Only for Plugins""" ERROR = "error" """Application has finished with an error""" FINISHED = "finished" """Application has finished successfully""" TERMINATING = "terminating" """Application is being terminated""" STOPPED = "stopped" """Application has been stopped"""
def __init__(self, api): ModuleApiBase.__init__(self, api) ModuleWithStatus.__init__(self)
[docs] def get_list( self, workspace_id: int, filters: Optional[List[Dict[str, str]]] = None ) -> List[NamedTuple]: """ List of Tasks in the given Workspace. :param workspace_id: Workspace ID. :type workspace_id: int :param filters: List of params to sort output Projects. :type filters: List[dict], optional :return: List of Tasks with information for the given Workspace. :rtype: :class:`List[NamedTuple]` :Usage example: .. code-block:: python import supervisely as sly workspace_id = 23821 os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() task_infos = api.task.get_list(workspace_id) task_infos_filter = api.task.get_list(23821, filters=[{'field': 'id', 'operator': '=', 'value': 121230}]) print(task_infos_filter) # Output: [ # { # "id": 121230, # "type": "clone", # "status": "finished", # "startedAt": "2019-12-19T12:13:09.702Z", # "finishedAt": "2019-12-19T12:13:09.701Z", # "meta": { # "input": { # "model": { # "id": 1849 # }, # "isExternal": true, # "pluginVersionId": 84479 # }, # "output": { # "model": { # "id": 12380 # }, # "pluginVersionId": 84479 # } # }, # "description": "" # } # ] """ return self.get_list_all_pages( "tasks.list", {ApiField.WORKSPACE_ID: workspace_id, ApiField.FILTER: filters or []}, )
[docs] def get_info_by_id(self, id: int) -> NamedTuple: """ Get Task information by ID. :param id: Task ID in Supervisely. :type id: int :return: Information about Task. :rtype: :class:`NamedTuple` :Usage example: .. code-block:: python import supervisely as sly task_id = 121230 os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() task_info = api.task.get_info_by_id(task_id) print(task_info) # Output: { # "id": 121230, # "workspaceId": 23821, # "description": "", # "type": "clone", # "status": "finished", # "startedAt": "2019-12-19T12:13:09.702Z", # "finishedAt": "2019-12-19T12:13:09.701Z", # "userId": 16154, # "meta": { # "input": { # "model": { # "id": 1849 # }, # "isExternal": true, # "pluginVersionId": 84479 # }, # "output": { # "model": { # "id": 12380 # }, # "pluginVersionId": 84479 # } # }, # "settings": {}, # "agentName": null, # "userLogin": "alexxx", # "teamId": 16087, # "agentId": null # } """ return self._get_info_by_id(id, "tasks.info")
[docs] def get_status(self, task_id: int) -> Status: """ Check status of Task by ID. :param id: Task ID in Supervisely. :type id: int :return: Status object :rtype: :class:`Status` :Usage example: .. code-block:: python import supervisely as sly task_id = 121230 os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() task_status = api.task.get_status(task_id) print(task_status) # Output: finished """ status_str = self.get_info_by_id(task_id)[ApiField.STATUS] # @TODO: convert json to tuple return self.Status(status_str)
[docs] def raise_for_status(self, status: Status) -> None: """ Raise error if Task status is ERROR. :param status: Status object. :type status: Status :return: None :rtype: :class:`NoneType` """ if status is self.Status.ERROR: raise TaskFinishedWithError(f"Task finished with status {str(self.Status.ERROR)}")
[docs] def wait( self, id: int, target_status: Status, wait_attempts: Optional[int] = None, wait_attempt_timeout_sec: Optional[int] = None, ): """ Awaiting achievement by given Task of a given status. :param id: Task ID in Supervisely. :type id: int :param target_status: Status object(status of task we expect to destinate). :type target_status: Status :param wait_attempts: The number of attempts to determine the status of the task that we are waiting for. :type wait_attempts: int, optional :param wait_attempt_timeout_sec: Number of seconds for intervals between attempts(raise error if waiting time exceeded). :type wait_attempt_timeout_sec: int, optional :return: True if the desired status is reached, False otherwise :rtype: :class:`bool` """ wait_attempts = wait_attempts or self.MAX_WAIT_ATTEMPTS effective_wait_timeout = wait_attempt_timeout_sec or self.WAIT_ATTEMPT_TIMEOUT_SEC for attempt in range(wait_attempts): status = self.get_status(id) self.raise_for_status(status) if status in [ target_status, self.Status.FINISHED, self.Status.DEPLOYED, self.Status.STOPPED, ]: return time.sleep(effective_wait_timeout) raise WaitingTimeExceeded( f"Waiting time exceeded: total waiting time {wait_attempts * effective_wait_timeout} seconds, i.e. {wait_attempts} attempts for {effective_wait_timeout} seconds each" )
[docs] def upload_dtl_archive( self, task_id: int, archive_path: str, progress_cb: Optional[Union[tqdm, Callable]] = None ): """upload_dtl_archive""" encoder = MultipartEncoder( { "id": str(task_id).encode("utf-8"), "name": get_file_name(archive_path), "archive": ( os.path.basename(archive_path), open(archive_path, "rb"), "application/x-tar", ), } ) def callback(monitor_instance): read_mb = monitor_instance.bytes_read / 1024.0 / 1024.0 if progress_cb is not None: progress_cb(read_mb) monitor = MultipartEncoderMonitor(encoder, callback) self._api.post("tasks.upload.dtl_archive", monitor)
def _deploy_model( self, agent_id, model_id, plugin_id=None, version=None, restart_policy=RestartPolicy.NEVER, settings=None, ): """_deploy_model""" response = self._api.post( "tasks.run.deploy", { ApiField.AGENT_ID: agent_id, ApiField.MODEL_ID: model_id, ApiField.RESTART_POLICY: restart_policy.value, ApiField.SETTINGS: settings or {"gpu_device": 0}, ApiField.PLUGIN_ID: plugin_id, ApiField.VERSION: version, }, ) return response.json()[ApiField.TASK_ID]
[docs] def get_context(self, id: int) -> Dict: """ Get context information by task ID. :param id: Task ID in Supervisely. :type id: int :return: Context information in dict format :rtype: :class:`dict` :Usage example: .. code-block:: python import supervisely as sly task_id = 121230 os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() context = api.task.get_context(task_id) print(context) # Output: { # "team": { # "id": 16087, # "name": "alexxx" # }, # "workspace": { # "id": 23821, # "name": "my_super_workspace" # } # } """ response = self._api.post("GetTaskContext", {ApiField.ID: id}) return response.json()
def _convert_json_info(self, info: dict): """_convert_json_info""" return info
[docs] def run_dtl(self, workspace_id: int, dtl_graph: Dict, agent_id: Optional[int] = None): """run_dtl""" response = self._api.post( "tasks.run.dtl", { ApiField.WORKSPACE_ID: workspace_id, ApiField.CONFIG: dtl_graph, "advanced": {ApiField.AGENT_ID: agent_id}, }, ) return response.json()[ApiField.TASK_ID]
def _run_plugin_task( self, task_type, agent_id, plugin_id, version, config, input_projects, input_models, result_name, ): """_run_plugin_task""" response = self._api.post( "tasks.run.plugin", { "taskType": task_type, ApiField.AGENT_ID: agent_id, ApiField.PLUGIN_ID: plugin_id, ApiField.VERSION: version, ApiField.CONFIG: config, "projects": input_projects, "models": input_models, ApiField.NAME: result_name, }, ) return response.json()[ApiField.TASK_ID]
[docs] def run_train( self, agent_id: int, input_project_id: int, input_model_id: int, result_nn_name: str, train_config: Optional[Dict] = None, ): """run_train""" model_info = self._api.model.get_info_by_id(input_model_id) return self._run_plugin_task( task_type=TaskApi.PluginTaskType.TRAIN.value, agent_id=agent_id, plugin_id=model_info.plugin_id, version=None, input_projects=[input_project_id], input_models=[input_model_id], result_name=result_nn_name, config={} if train_config is None else train_config, )
[docs] def run_inference( self, agent_id: int, input_project_id: int, input_model_id: int, result_project_name: str, inference_config: Optional[Dict] = None, ): """run_inference""" model_info = self._api.model.get_info_by_id(input_model_id) return self._run_plugin_task( task_type=TaskApi.PluginTaskType.INFERENCE.value, agent_id=agent_id, plugin_id=model_info.plugin_id, version=None, input_projects=[input_project_id], input_models=[input_model_id], result_name=result_project_name, config={} if inference_config is None else inference_config, )
[docs] def get_training_metrics(self, task_id: int): """get_training_metrics""" response = self._get_response_by_id( id=task_id, method="tasks.train-metrics", id_field=ApiField.TASK_ID ) return response.json() if (response is not None) else None
[docs] def deploy_model(self, agent_id: int, model_id: int) -> int: """deploy_model""" task_ids = self._api.model.get_deploy_tasks(model_id) if len(task_ids) == 0: task_id = self._deploy_model(agent_id, model_id) else: task_id = task_ids[0] self.wait(task_id, self.Status.DEPLOYED) return task_id
[docs] def deploy_model_async(self, agent_id: int, model_id: int) -> int: """deploy_model_async""" task_ids = self._api.model.get_deploy_tasks(model_id) if len(task_ids) == 0: task_id = self._deploy_model(agent_id, model_id) else: task_id = task_ids[0] return task_id
[docs] def start( self, agent_id, app_id: Optional[int] = None, workspace_id: Optional[int] = None, description: Optional[str] = "application description", params: Dict[str, Any] = None, log_level: Optional[Literal["info", "debug", "warning", "error"]] = "info", users_ids: Optional[List[int]] = None, app_version: Optional[str] = "", is_branch: Optional[bool] = False, task_name: Optional[str] = "pythonSpawned", restart_policy: Optional[Literal["never", "on_error"]] = "never", proxy_keep_url: Optional[bool] = False, module_id: Optional[int] = None, redirect_requests: Optional[Dict[str, int]] = {}, ) -> Dict[str, Any]: """Starts the application task on the agent. :param agent_id: Agent ID. Can be obtained from TeamCluster page in UI. :type agent_id: int :param app_id: Deprecated. Use module_id instead. :type app_id: int, optional :param workspace_id: Workspace ID where the task will be created. :type workspace_id: int, optional :param description: Task description which will be shown in UI. :type description: str, optional :param params: Task parameters which will be passed to the application, check the code example below for more details. :type params: Dict[str, Any], optional :param log_level: Log level for the application. :type log_level: Literal["info", "debug", "warning", "error"], optional :param users_ids: List of user IDs for which will be created an instance of the application. For each user a separate task will be created. :type users_ids: List[int], optional :param app_version: Application version e.g. "v1.0.0" or branch name e.g. "dev". :type app_version: str, optional :param is_branch: If the application version is a branch name, set this parameter to True. :type is_branch: bool, optional :param task_name: Task name which will be shown in UI. :type task_name: str, optional :param restart_policy: when the task should be restarted: never or if error occurred. :type restart_policy: Literal["never", "on_error"], optional :param proxy_keep_url: For internal usage only. :type proxy_keep_url: bool, optional :param module_id: Module ID. Can be obtained from the apps page in UI. :type module_id: int, optional :param redirect_requests: For internal usage only in Develop and Debug mode. :type redirect_requests: Dict[str, int], optional :return: Task information in JSON format. :rtype: Dict[str, Any] :Usage example: .. code-block:: python import supervisely as sly app_slug = "supervisely-ecosystem/export-to-supervisely-format" module_id = api.app.get_ecosystem_module_id(app_slug) module_info = api.app.get_ecosystem_module_info(module_id) project_id = 12345 agent_id = 12345 workspace_id = 12345 params = module_info.get_arguments(images_project=project_id) session = api.app.start( agent_id=agent_id, module_id=module_id, workspace_id=workspace_id, task_name="Prepare download link", params=params, app_version="dninja", is_branch=True, ) """ if app_id is not None and module_id is not None: raise ValueError("Only one of the arguments (app_id or module_id) have to be defined") if app_id is None and module_id is None: raise ValueError("One of the arguments (app_id or module_id) have to be defined") data = { ApiField.AGENT_ID: agent_id, # "nodeId": agent_id, ApiField.WORKSPACE_ID: workspace_id, ApiField.DESCRIPTION: description, ApiField.PARAMS: take_with_default(params, {"state": {}}), ApiField.LOG_LEVEL: log_level, ApiField.USERS_IDS: take_with_default(users_ids, []), ApiField.APP_VERSION: app_version, ApiField.IS_BRANCH: is_branch, ApiField.TASK_NAME: task_name, ApiField.RESTART_POLICY: restart_policy, ApiField.PROXY_KEEP_URL: proxy_keep_url, } if len(redirect_requests) > 0: data[ApiField.REDIRECT_REQUESTS] = redirect_requests if app_id is not None: data[ApiField.APP_ID] = app_id if module_id is not None: data[ApiField.MODULE_ID] = module_id resp = self._api.post(method="tasks.run.app", data=data) task = resp.json()[0] if "id" not in task: task["id"] = task.get("taskId") return task
[docs] def stop(self, id: int): """stop""" response = self._api.post("tasks.stop", {ApiField.ID: id}) return self.Status(response.json()[ApiField.STATUS])
[docs] def get_import_files_list(self, id: int) -> Union[Dict, None]: """get_import_files_list""" response = self._api.post("tasks.import.files_list", {ApiField.ID: id}) return response.json() if (response is not None) else None
[docs] def download_import_file(self, id, file_path, save_path): """download_import_file""" response = self._api.post( "tasks.import.download_file", {ApiField.ID: id, ApiField.FILENAME: file_path}, stream=True, ) ensure_base_path(save_path) with open(save_path, "wb") as fd: for chunk in response.iter_content(chunk_size=1024 * 1024): fd.write(chunk)
[docs] def create_task_detached(self, workspace_id: int, task_type: Optional[str] = None): """create_task_detached""" response = self._api.post( "tasks.run.python", { ApiField.WORKSPACE_ID: workspace_id, ApiField.SCRIPT: "xxx", ApiField.ADVANCED: {ApiField.IGNORE_AGENT: True}, }, ) return response.json()[ApiField.TASK_ID]
[docs] def submit_logs(self, logs) -> None: """submit_logs""" response = self._api.post("tasks.logs.add", {ApiField.LOGS: logs})
# return response.json()[ApiField.TASK_ID]
[docs] def upload_files( self, task_id: int, abs_paths: List[str], names: List[str], progress_cb: Optional[Union[tqdm, Callable]] = None, ) -> None: """upload_files""" if len(abs_paths) != len(names): raise RuntimeError("Inconsistency: len(abs_paths) != len(names)") hashes = [] if len(abs_paths) == 0: return hash_to_items = defaultdict(list) hash_to_name = defaultdict(list) for idx, item in enumerate(zip(abs_paths, names)): path, name = item item_hash = get_file_hash(path) hashes.append(item_hash) hash_to_items[item_hash].append(path) hash_to_name[item_hash].append(name) unique_hashes = set(hashes) remote_hashes = self._api.image.check_existing_hashes(list(unique_hashes)) new_hashes = unique_hashes - set(remote_hashes) # @TODO: upload remote hashes if len(remote_hashes) != 0: files = [] for hash in remote_hashes: for name in hash_to_name[hash]: files.append({ApiField.NAME: name, ApiField.HASH: hash}) for batch in batched(files): resp = self._api.post( "tasks.files.bulk.add-by-hash", {ApiField.TASK_ID: task_id, ApiField.FILES: batch}, ) if progress_cb is not None: progress_cb(len(remote_hashes)) for batch in batched(list(zip(abs_paths, names, hashes))): content_dict = OrderedDict() for idx, item in enumerate(batch): path, name, hash = item if hash in remote_hashes: continue content_dict["{}".format(idx)] = json.dumps({"fullpath": name, "hash": hash}) content_dict["{}-file".format(idx)] = (name, open(path, "rb"), "") if len(content_dict) > 0: encoder = MultipartEncoder(fields=content_dict) resp = self._api.post("tasks.files.bulk.upload", encoder) if progress_cb is not None: progress_cb(len(content_dict))
# { # data: {my_val: 1} # obj: {val: 1, res: 2} # } # { # obj: {new_val: 1} # } # // apped: true, recursive: false # { # data: {my_val: 1} # obj: {new_val: 1} # }(edited) # // append: false, recursive: false # { # obj: {new_val: 1} # }(edited) # # 16: 32 # // append: true, recursive: true # { # data: {my_val: 1} # obj: {val: 1, res: 2, new_val: 1} # }
[docs] def set_fields(self, task_id: int, fields: List) -> Dict: """set_fields""" for idx, obj in enumerate(fields): for key in [ApiField.FIELD, ApiField.PAYLOAD]: if key not in obj: raise KeyError("Object #{} does not have field {!r}".format(idx, key)) data = {ApiField.TASK_ID: task_id, ApiField.FIELDS: fields} resp = self._api.post("tasks.data.set", data) return resp.json()
[docs] def set_fields_from_dict(self, task_id: int, d: Dict) -> Dict: """set_fields_from_dict""" fields = [] for k, v in d.items(): fields.append({ApiField.FIELD: k, ApiField.PAYLOAD: v}) return self.set_fields(task_id, fields)
[docs] def set_field( self, task_id: int, field: Dict, payload: Dict, append: Optional[bool] = False, recursive: Optional[bool] = False, ) -> Dict: """set_field""" fields = [ { ApiField.FIELD: field, ApiField.PAYLOAD: payload, ApiField.APPEND: append, ApiField.RECURSIVE: recursive, } ] return self.set_fields(task_id, fields)
[docs] def get_fields(self, task_id, fields: List): """get_fields""" data = {ApiField.TASK_ID: task_id, ApiField.FIELDS: fields} resp = self._api.post("tasks.data.get", data) return resp.json()["result"]
[docs] def get_field(self, task_id: int, field: Dict): """get_field""" result = self.get_fields(task_id, [field]) return result[field]
def _validate_checkpoints_support(self, task_id): """_validate_checkpoints_support""" # pylint: disable=too-few-format-args info = self.get_info_by_id(task_id) if info["type"] != str(TaskApi.PluginTaskType.TRAIN): raise RuntimeError( "Task (id={!r}) has type {!r}. " "Checkpoints are available only for tasks of type {!r}".format() )
[docs] def list_checkpoints(self, task_id: int): """list_checkpoints""" self._validate_checkpoints_support(task_id) resp = self._api.post("tasks.checkpoints.list", {ApiField.ID: task_id}) return resp.json()
[docs] def delete_unused_checkpoints(self, task_id: int) -> Dict: """delete_unused_checkpoints""" self._validate_checkpoints_support(task_id) resp = self._api.post("tasks.checkpoints.clear", {ApiField.ID: task_id}) return resp.json()
def _set_output(self): """_set_output""" pass
[docs] def set_output_project( self, task_id: int, project_id: int, project_name: Optional[str] = None ) -> Dict: """set_output_project""" if project_name is None: project = self._api.project.get_info_by_id(project_id, raise_error=True) project_name = project.name output = {ApiField.PROJECT: {ApiField.ID: project_id, ApiField.TITLE: project_name}} resp = self._api.post( "tasks.output.set", {ApiField.TASK_ID: task_id, ApiField.OUTPUT: output} ) return resp.json()
[docs] def set_output_report(self, task_id: int, file_id: int, file_name: str) -> Dict: """set_output_report""" return self._set_custom_output( task_id, file_id, file_name, description="Report", icon="zmdi zmdi-receipt" )
def _set_custom_output( self, task_id, file_id, file_name, file_url=None, description="File", icon="zmdi zmdi-file-text", color="#33c94c", background_color="#d9f7e4", download=False, ): """_set_custom_output""" if file_url is None: file_url = self._api.file.get_url(file_id) output = { ApiField.GENERAL: { "icon": { "className": icon, "color": color, "backgroundColor": background_color, }, "title": file_name, "titleUrl": file_url, "download": download, "description": description, } } resp = self._api.post( "tasks.output.set", {ApiField.TASK_ID: task_id, ApiField.OUTPUT: output} ) return resp.json()
[docs] def set_output_archive( self, task_id: int, file_id: int, file_name: str, file_url: Optional[str] = None ) -> Dict: """set_output_archive""" if file_url is None: file_url = self._api.file.get_info_by_id(file_id).storage_path return self._set_custom_output( task_id, file_id, file_name, file_url=file_url, description="Download archive", icon="zmdi zmdi-archive", download=True, )
[docs] def set_output_file_download( self, task_id: int, file_id: int, file_name: str, file_url: Optional[str] = None, download: Optional[bool] = True, ) -> Dict: """set_output_file_download""" if file_url is None: file_url = self._api.file.get_info_by_id(file_id).storage_path return self._set_custom_output( task_id, file_id, file_name, file_url=file_url, description="Download file", icon="zmdi zmdi-file", download=download, )
[docs] def send_request( self, task_id: int, method: str, data: Dict, context: Optional[Dict] = {}, skip_response: bool = False, timeout: Optional[int] = 60, outside_request: bool = True, retries: int = 10, raise_error: bool = False, ): """send_request""" if type(data) is not dict: raise TypeError("data argument has to be a dict") context["outside_request"] = outside_request resp = self._api.post( "tasks.request.direct", { ApiField.TASK_ID: task_id, ApiField.COMMAND: method, ApiField.CONTEXT: context, ApiField.STATE: data, "skipResponse": skip_response, "timeout": timeout, }, retries=retries, raise_error=raise_error, ) return resp.json()
[docs] def set_output_directory(self, task_id, file_id, directory_path): """set_output_directory""" return self._set_custom_output( task_id, file_id, directory_path, description="Directory", icon="zmdi zmdi-folder", )
[docs] def update_meta( self, id: int, data: dict, agent_storage_folder: str = None, relative_app_dir: str = None ): """ Update given task metadata :param id: int — task id :param data: dict — meta data to update """ if type(data) == dict: data.update({"id": id}) if agent_storage_folder is None and relative_app_dir is not None: raise ValueError( "Both arguments (agent_storage_folder and relative_app_dir) has to be defined or None" ) if agent_storage_folder is not None and relative_app_dir is None: raise ValueError( "Both arguments (agent_storage_folder and relative_app_dir) has to be defined or None" ) if agent_storage_folder is not None and relative_app_dir is not None: data["agentStorageFolder"] = { "hostDir": agent_storage_folder, "folder": relative_app_dir, } self._api.post("tasks.meta.update", data)
def _update_app_content(self, task_id: int, data_patch: List[Dict] = None, state: Dict = None): payload = {} if data_patch is not None and len(data_patch) > 0: payload[ApiField.DATA] = data_patch if state is not None and len(state) > 0: payload[ApiField.STATE] = state resp = self._api.post( "tasks.app-v2.data.set", {ApiField.TASK_ID: task_id, ApiField.PAYLOAD: payload}, ) return resp.json()
[docs] def set_output_error( self, task_id: int, title: str, description: Optional[str] = None, show_logs: Optional[bool] = True, ) -> Dict: """ Set custom error message to the task output. :param task_id: Application task ID. :type task_id: int :param title: Error message to be displayed in the task output. :type title: str :param description: Description to be displayed in the task output. :type description: Optional[str] :param show_logs: If True, the link to the task logs will be displayed in the task output. :type show_logs: Optional[bool], default True :return: Response JSON. :rtype: Dict :Usage example: .. code-block:: python import os from dotenv import load_dotenv import supervisely as sly # Load secrets and create API object from .env file (recommended) # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication if sly.is_development(): load_dotenv(os.path.expanduser("~/supervisely.env")) api = sly.Api.from_env() task_id = 12345 title = "Something went wrong" description = "Please check the task logs" show_logs = True api.task.set_output_error(task_id, title, description, show_logs) """ output = { ApiField.GENERAL: { "icon": { "className": "zmdi zmdi-alert-octagon", "color": "#ff83a6", "backgroundColor": "#ffeae9", }, "title": title, "showLogs": show_logs, "isError": True, } } if description is not None: output[ApiField.GENERAL]["description"] = description resp = self._api.post( "tasks.output.set", {ApiField.TASK_ID: task_id, ApiField.OUTPUT: output}, ) return resp.json()