Source code for supervisely.api.module_api

# coding: utf-8
from collections import namedtuple
from copy import deepcopy
from typing import TYPE_CHECKING, List

import requests

from supervisely._utils import batched, camel_to_snake

if TYPE_CHECKING:
    from supervisely.api.api import Api


[docs]class ApiField: """ApiField""" ID = "id" """""" NAME = "name" """""" DESCRIPTION = "description" """""" CREATED_AT = "createdAt" """""" UPDATED_AT = "updatedAt" """""" ROLE = "role" """""" TEAM_ID = "teamId" """""" WORKSPACE_ID = "workspaceId" """""" CONFIG = "config" """""" SIZE = "size" """""" PLUGIN_ID = "pluginId" """""" PLUGIN_VERSION = "pluginVersion" """""" HASH = "hash" """""" STATUS = "status" """""" ONLY_TRAIN = "onlyTrain" """""" USER_ID = "userId" """""" README = "readme" """""" PROJECT_ID = "projectId" """""" IMAGES_COUNT = "imagesCount" """""" ITEMS_COUNT = "itemsCount" """""" DATASET_ID = "datasetId" """""" LINK = "link" """""" WIDTH = "width" """""" HEIGHT = "height" """""" WEIGHTS_LOCATION = "weightsLocation" """""" IMAGE_ID = "imageId" """""" IMAGE_NAME = "imageName" """""" ANNOTATION = "annotation" """""" TASK_ID = "taskId" """""" LABELS_COUNT = "labelsCount" """""" FILTER = "filter" """""" FILTERS = "filters" """""" META = "meta" """""" FILE_META = "fileMeta" """""" SHARED_LINK = "sharedLinkToken" """""" EXPLORE_PATH = "explorePath" """""" MIME = "mime" """""" EXT = "ext" """""" TYPE = "type" """""" DEFAULT_VERSION = "defaultVersion" """""" DOCKER_IMAGE = "dockerImage" """""" CONFIGS = "configs" """""" VERSIONS = "versions" """""" VERSION = "version" """""" TOKEN = "token" """""" CAPABILITIES = "capabilities" """""" STARTED_AT = "startedAt" """""" FINISHED_AT = "finishedAt" """""" AGENT_ID = "agentId" """""" MODEL_ID = "modelId" """""" RESTART_POLICY = "restartPolicy" """""" SETTINGS = "settings" """""" SORT = "sort" """""" SORT_ORDER = "sort_order" """""" IMAGES = "images" """""" IMAGE_IDS = "imageIds" """""" ANNOTATIONS = "annotations" """""" EMAIL = "email" """""" LOGIN = "login" """""" LOGINS = "logins" """""" DISABLED = "disabled" """""" LAST_LOGIN = "lastLogin" """""" PASSWORD = "password" """""" ROLE_ID = "roleId" """""" IS_RESTRICTED = "isRestricted" """""" DISABLE = "disable" """""" TEAMS = "teams" """""" USER_IDS = "userIds" """""" PROJECT_NAME = (["projectTitle"], "project_name") """""" DATASET_NAME = (["datasetTitle"], "dataset_name") """""" WORKSPACE_NAME = (["workspaceTitle"], "workspace_name") """""" CREATED_BY_ID = (["createdBy"], "created_by_id") """""" CREATED_BY_LOGIN = (["managerLogin"], "created_by_login") """""" ASSIGNED_TO_ID = (["userId"], "assigned_to_id") """""" ASSIGNED_TO_LOGIN = (["labelerLogin"], "assigned_to_login") """""" FINISHED_IMAGES_COUNT = "finishedImagesCount" """""" PROGRESS_IMAGES_COUNT = "progressImagesCount" """""" CLASSES_TO_LABEL = (["meta", "classes"], "classes_to_label") """""" TAGS_TO_LABEL = (["meta", "projectTags"], "tags_to_label") """""" IMAGES_RANGE = (["meta", "range"], "images_range") """""" REJECTED_IMAGES_COUNT = "rejectedImagesCount" """""" ACCEPTED_IMAGES_COUNT = "acceptedImagesCount" """""" OBJECTS_LIMIT_PER_IMAGE = (["meta", "imageFiguresLimit"], "objects_limit_per_image") """""" TAGS_LIMIT_PER_IMAGE = (["meta", "imageTagsLimit"], "tags_limit_per_image") """""" FILTER_IMAGES_BY_TAGS = (["meta", "imageTags"], "filter_images_by_tags") """""" INCLUDE_IMAGES_WITH_TAGS = ([], "include_images_with_tags") """""" EXCLUDE_IMAGES_WITH_TAGS = ([], "exclude_images_with_tags") """""" SIZEB = (["size"], "sizeb") """""" FRAMES = "frames" """""" FRAMES_COUNT = (["fileMeta", "framesCount"], "frames_count") """""" PATH_ORIGINAL = "pathOriginal" """""" OBJECTS_COUNT = "objectsCount" """""" FRAMES_TO_TIMECODES = (["fileMeta", "framesToTimecodes"], "frames_to_timecodes") """""" TAGS = "tags" """""" VIDEO_ID = "videoId" """""" FRAME_INDEX = (["meta", "frame"], "frame_index") """""" LABELING_TOOL = "tool" """""" GEOMETRY_TYPE = "geometryType" """""" GEOMETRY = "geometry" """""" GEOMETRY_META = "geometryMeta" """""" OBJECT_ID = "objectId" """""" FRAME = "frame" """""" # FIGURES_COUNT = (['labelsCount'], 'figures_count') """""" STREAMS = "streams" """""" VIDEO_IDS = "videoIds" """""" FRAME_WIDTH = (["fileMeta", "width"], "frame_width") """""" FRAME_HEIGHT = (["fileMeta", "height"], "frame_height") """""" VIDEO_NAME = "videoName" """""" FRAME_RANGE = "frameRange" """""" TRACK_ID = "trackId" """""" PROGRESS = "progress" """""" CURRENT = "current" """""" TOTAL = "total" """""" STOPPED = "stopped" """""" VIDEOS = "videos" """""" FILENAME = "filename" """""" SHAPE = "shape" """""" COLOR = "color" """""" CLASS_ID = "classId" """""" ENTITY_ID = "entityId" """""" ANNOTATION_OBJECTS = "annotationObjects" """""" TAG_ID = "tagId" """""" TAG_IDS = "tagIds" """""" ERROR = "error" """""" MESSAGE = "message" """""" CONTENT = "content" """""" FIGURES = "figures" """""" LAYOUT = "layout" """""" WIDGETS = "widgets" """""" CLOUD_MIME = (["fileMeta", "mime"], "cloud_mime") """""" PREVIEW = "preview" """""" FIGURES_COUNT = "figuresCount" """""" ANN_OBJECTS_COUNT = (["annotationObjectsCount"], "objects_count") """""" POINTCLOUD_ID = "pointCloudId" """""" POINTCLOUD_IDS = "pointCloudIds" """""" POINTCLOUDS = "pointClouds" """""" ADVANCED = "advanced" """""" IGNORE_AGENT = "ignoreAgent" """""" SCRIPT = "script" """""" LOGS = "logs" """""" FILES = "files" """""" HASHES = "hashes" """""" SUBTITLE = "subtitle" """""" COMMAND = "command" """""" DEFAULT_VALUE = "defaultValue" """""" TITLE = "title" """""" AREA = "area" """""" OPTIONS = "options" """""" REPORT_ID = "reportId" """""" WIDGET = "widget" """""" PAYLOAD = "payload" """""" FIELD = "field" """""" FIELDS = "fields" """""" APPEND = "append" """""" WITH_CUSTOM_DATA = "withCustomBigData" """""" PATH = "path" """""" SESSION_ID = "sessionId" """""" ACTION = "action" """""" FIGURE_ID = "figureId" """""" FIGURE_IDS = "figureIds" """""" VALUE = "value" """""" ZOOM_FACTOR = "zoomFactor" """""" FULL_STORAGE_URL = "fullStorageUrl" """""" REVIEWER_ID = "reviewerId" """""" REVIEWER_LOGIN = "reviewerLogin" """""" RECURSIVE = "recursive" """""" ECOSYSTEM_ITEM_ID = "moduleId" """""" APP_ID = "appId" """""" PROJECT = "project" """""" OUTPUT = "output" """""" REFERENCE_IMAGE_URL = "referenceImageUrl" """""" GENERAL = "general" """""" ENTITIES = "entities" """""" STORAGE_PATH = "storagePath" """""" EXT2 = (["meta", "ext"], "ext") """""" MIME2 = (["meta", "mime"], "mime") """""" SIZEB2 = (["meta", "size"], "sizeb") """""" JOB_ID = "jobId" """""" DATASETS_COUNT = "datasetsCount" """""" CUSTOM_DATA = "customData" """""" CONTEXT = "context" """""" STATE = "state" """""" IDS = "ids" """""" DATE = "date" """""" PARAMS = "params" """""" LOG_LEVEL = "logLevel" """""" APP_VERSION = "appVersion" """""" IS_BRANCH = "isBranch" """""" TASK_NAME = "taskName" """""" PROXY_KEEP_URL = "proxyKeepUrl" """""" USERS_IDS = "usersIds" """""" MODULE_ID = "moduleId" """""" USER_LOGIN = "userLogin" """""" SLUG = "slug" """""" IS_SHARED = "isShared" """""" TASKS = "tasks" """""" REPO = "repo" """""" VOLUMES = "volumes" """""" PROCESSING_PATH = "processingPath" """""" VOLUME_ID = "volumeId" """""" VOLUME_SLICES = "volumeSlices" """""" VOLUME_IDS = "volumeIds" """""" VOLUME_NAME = "volumeName" """""" SIZEB3 = (["fileMeta", "size"], "sizeb") """""" INCLUDE = "include" """""" CLASSES = "classes" """""" PROJECT_TAGS = "projectTags" """""" DATASETS = "datasets" """""" IMAGES_TAGS = "imagesTags" """""" ANNOTATION_OBJECTS_TAGS = "annotationObjectsTags" """""" FIGURES_TAGS = "figuresTags" """""" REDIRECT_REQUESTS = "redirectRequests" """""" PROCESSING_PATH = "processingPath" """""" FORCE_METADATA_FOR_LINKS = "forceMetadataForLinks" """""" SKIP_VALIDATION = "skipValidation" """""" PAGINATION_MODE = "pagination_mode" """""" PER_PAGE = "per_page" """""" SKIP_BOUNDS_VALIDATION = "skipBoundsValidation" """""" PATHS = "paths" """""" PROJECTS = "projects" """""" URL = "url" """""" ANN_URL = "annotationsUrl" """""" ARCHIVE_URL = "archiveUrl" """""" ANN_ARCHIVE_URL = "annotationsArchiveUrl" """""" BACKUP_ARCHIVE = "backupArchive" """""" SKIP_EXPORTED = "skipExported" """""" FROM = "from" """""" TO = "to" """""" DATA = "data" """""" DURATION = "duration" """""" RAW_VIDEO_META = "rawVideoMeta" """""" IS_DIR = "isDir" """""" FIGURE_CLASS_ID = "figureClassId" """""" FIGURE_CLASS_TITLE = "figureClassTitle" """""" TOOL_CLASS_ID = "toolClassId" """""" TOOL_STATE = "toolState" """""" OPTION = "option" """""" DECOMPRESS_BITMAP = "decompressBitmap" """""" FIGURE_STATE = "figureState" """""" BITMAP = "bitmap" """""" ACCOUNT_TYPE = "plan" """""" SILENT = "silent" """""" PAGE = "page" """""" PRESERVE_PROJECT_CARD = "preserveProjectCard" """""" GPU_INFO = "gpuInfo" """""" IS_PUBLIC = "isPublic" """""" USAGE = "usage" """""" PARENT_ID = "parentId" """""" SRC_ID = "srcId" """""" DEST_ID = "destId" """""" COMPLETE_EXISTING = "completeExisting" """""" MODE = "mode" """""" FOLDERS = "folders" """""" WITH_METADATA = "withMetadata" """""" LIMIT = "limit" """""" IMPORT_SETTINGS = "importSettings" """"""
def _get_single_item(items): """_get_single_item""" if len(items) == 0: return None if len(items) > 1: raise RuntimeError("There are several items with the same name") return items[0] class _JsonConvertibleModule: """_JsonConvertibleModule""" def _convert_json_info(self, info: dict, skip_missing=False): """_convert_json_info""" raise NotImplementedError()
[docs]class ModuleApiBase(_JsonConvertibleModule): """ModuleApiBase""" MAX_WAIT_ATTEMPTS = 999 """ Maximum number of attempts that will be made to wait for a certain condition to be met.""" WAIT_ATTEMPT_TIMEOUT_SEC = 1 """Number of seconds for intervals between attempts."""
[docs] @staticmethod def info_sequence(): """Get list of all class field names.""" raise NotImplementedError()
[docs] @staticmethod def info_tuple_name(): """Get string name of NamedTuple.""" raise NotImplementedError()
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) try: field_names = [] for name in cls.info_sequence(): if type(name) is str: field_names.append(camel_to_snake(name)) elif type(name) is tuple and type(name[1]) is str: field_names.append(name[1]) else: raise RuntimeError("Can not parse field {!r}".format(name)) cls.InfoType = namedtuple(cls.info_tuple_name(), field_names) except NotImplementedError: pass def __init__(self, api: "Api"): self._api = api def _add_sort_param(self, data): """_add_sort_param""" results = deepcopy(data) results[ApiField.SORT] = ApiField.ID results[ApiField.SORT_ORDER] = "asc" # @TODO: move to enum return results
[docs] def get_list_all_pages( self, method, data, progress_cb=None, convert_json_info_cb=None, limit: int = None, return_first_response: bool = False, ): """ Get list of all or limited quantity entities from the Supervisely server. :param method: Request method name :type method: str :param data: Dictionary with request body info :type data: dict :param progress_cb: Function for tracking download progress. :type progress_cb: Progress, optional :param convert_json_info_cb: Function for convert json info :type convert_json_info_cb: Callable, optional :param limit: Number of entity to retrieve :type limit: int, optional :param return_first_response: Specify if return first response :type return_first_response: bool, optional """ if convert_json_info_cb is None: convert_func = self._convert_json_info else: convert_func = convert_json_info_cb if ApiField.SORT not in data: data = self._add_sort_param(data) first_response = self._api.post(method, data).json() total = first_response["total"] per_page = first_response["perPage"] pages_count = first_response["pagesCount"] limit_exceeded = False results = first_response["entities"] if limit is not None and len(results) > limit: limit_exceeded = True if progress_cb is not None: progress_cb(len(results)) if (pages_count == 1 and len(results) == total) or limit_exceeded is True: pass else: for page_idx in range(2, pages_count + 1): temp_resp = self._api.post(method, {**data, "page": page_idx, "per_page": per_page}) temp_items = temp_resp.json()["entities"] results.extend(temp_items) if progress_cb is not None: progress_cb(len(temp_items)) if limit is not None and len(results) > limit: limit_exceeded = True break if len(results) != total and limit is None: raise RuntimeError( "Method {!r}: error during pagination, some items are missed".format(method) ) if limit is not None: results = results[:limit] if return_first_response: return [convert_func(item) for item in results], first_response return [convert_func(item) for item in results]
[docs] def get_list_all_pages_generator( self, method, data, progress_cb=None, convert_json_info_cb=None, limit: int = None, return_first_response: bool = False, ): """ This generator function retrieves a list of all or a limited quantity of entities from the Supervisely server, yielding batches of entities as they are retrieved :param method: Request method name :type method: str :param data: Dictionary with request body info :type data: dict :param progress_cb: Function for tracking download progress. :type progress_cb: Progress, optional :param convert_json_info_cb: Function for convert json info :type convert_json_info_cb: Callable, optional :param limit: Number of entity to retrieve :type limit: int, optional :param return_first_response: Specify if return first response :type return_first_response: bool, optional """ if convert_json_info_cb is None: convert_func = self._convert_json_info else: convert_func = convert_json_info_cb if ApiField.SORT not in data: data = self._add_sort_param(data) first_response = self._api.post(method, data).json() total = first_response["total"] # per_page = first_response["perPage"] after = first_response["after"] # pages_count = first_response["pagesCount"] limit_exceeded = False results = first_response["entities"] processed = len(results) yield [convert_func(item) for item in results] if limit is not None and len(results) > limit: limit_exceeded = True if progress_cb is not None: progress_cb(len(results)) if len(first_response["entities"]) == total or limit_exceeded is True: pass else: while after is not None: temp_resp = self._api.post(method, {**data, "after": after}).json() after = temp_resp.get("after") results = temp_resp["entities"] # results.extend(temp_items) if progress_cb is not None: progress_cb(len(results)) processed += len(results) yield [convert_func(item) for item in results] if limit is not None and processed > limit: limit_exceeded = True break if processed != total and limit is None: raise RuntimeError( "Method {!r}: error during pagination, some items are missed".format(method) )
@staticmethod def _get_info_by_name(get_info_by_filters_fn, name): """_get_info_by_name""" filters = [{"field": ApiField.NAME, "operator": "=", "value": name}] return get_info_by_filters_fn(filters)
[docs] def get_info_by_id(self, id): """ Get information about an entity by its ID from the Supervisely server. :param id: ID of the entity. :type id: int """ raise NotImplementedError()
@staticmethod def _get_free_name(exist_check_fn, name): """_get_free_name""" res_title = name suffix = 1 while exist_check_fn(res_title): res_title = "{}_{:03d}".format(name, suffix) suffix += 1 return res_title def _convert_json_info(self, info: dict, skip_missing=False): """_convert_json_info""" def _get_value(dict, field_name, skip_missing): if skip_missing is True: return dict.get(field_name, None) else: return dict[field_name] if info is None: return None else: field_values = [] for field_name in self.info_sequence(): if type(field_name) is str: field_values.append(_get_value(info, field_name, skip_missing)) elif type(field_name) is tuple: value = None for sub_name in field_name[0]: if value is None: value = _get_value(info, sub_name, skip_missing) else: value = _get_value(value, sub_name, skip_missing) field_values.append(value) else: raise RuntimeError("Can not parse field {!r}".format(field_name)) return self.InfoType(*field_values) def _get_response_by_id(self, id, method, id_field, fields=None): """_get_response_by_id""" try: data = {id_field: id} if fields is not None: data.update(fields) return self._api.post(method, data) except requests.exceptions.HTTPError as error: if error.response.status_code == 404: return None else: raise error def _get_info_by_id(self, id, method, fields=None): """_get_info_by_id""" response = self._get_response_by_id(id, method, id_field=ApiField.ID, fields=fields) return self._convert_json_info(response.json()) if (response is not None) else None
[docs]class ModuleApi(ModuleApiBase): """Base class for entities that have a parent object in the system.""" MAX_WAIT_ATTEMPTS = ModuleApiBase.MAX_WAIT_ATTEMPTS """Maximum number of attempts that will be made to wait for a certain condition to be met.""" WAIT_ATTEMPT_TIMEOUT_SEC = ModuleApiBase.WAIT_ATTEMPT_TIMEOUT_SEC """Number of seconds for intervals between attempts.""" def __init__(self, api): super().__init__(api) self._api = api
[docs] def get_info_by_name(self, parent_id: int, name: str, fields: List[str] = []): """ Get information about an entity by its name from the Supervisely server. :param parent_id: ID of the parent entity. :type parent_id: int :param name: Name of the entity for which the information is being retrieved. :type name: str :param fields: The list of api fields which will be returned with the response. :type fields: List[str] :Usage example: .. code-block:: python import supervisely as sly # You can connect to API directly address = 'https://app.supervise.ly/' token = 'Your Supervisely API Token' api = sly.Api(address, token) # Or you can use API from environment os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() dataset_id = 55832 name = "IMG_0315.jpeg" info = api.image.get_info_by_name(dataset_id, name) print(info) # Output: ImageInfo(id=19369643, name='IMG_0315.jpeg', ...) """ return self._get_info_by_name( get_info_by_filters_fn=lambda module_name: self._get_info_by_filters( parent_id, module_name, fields ), name=name, )
def _get_info_by_filters(self, parent_id, filters, fields=[]): """_get_info_by_filters""" # pylint: disable=too-many-function-args from supervisely.api.project_api import ProjectApi if type(self) == ProjectApi: items = self.get_list(parent_id, filters, fields) else: items = self.get_list(parent_id, filters) return _get_single_item(items)
[docs] def get_list(self, parent_id, filters=None): """ Get list of entities in parent entity with given parent ID. :param parent_id: parent ID in Supervisely. :type parent_id: int :param filters: List of parameters to sort output entities. :type filters: List[Dict[str, str]], optional :Usage example: .. code-block:: python import supervisely as sly # You can connect to API directly address = 'https://app.supervise.ly/' token = 'Your Supervisely API Token' api = sly.Api(address, token) # Or you can use API from environment os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() dataset_id = 55832 images = api.image.get_list(dataset_id) print(images) # Output: [ ImageInfo(id=19369642, ...) ImageInfo(id=19369643, ...) ImageInfo(id=19369644, ...) ] """ raise NotImplementedError()
[docs] def exists(self, parent_id, name): """ Checks if an entity with the given parent_id and name exists :param parent_id: ID of the parent entity. :type parent_id: int :param name: Name of the entity. :type name: str :return: Returns True if entity exists, and False if not :rtype: bool :Usage example: .. code-block:: python import supervisely as sly # You can connect to API directly address = 'https://app.supervise.ly/' token = 'Your Supervisely API Token' api = sly.Api(address, token) # Or you can use API from environment os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() name = "IMG_0315.jpeg" dataset_id = 55832 exists = api.image.exists(dataset_id, name) print(exists) # True """ return self.get_info_by_name(parent_id, name) is not None
[docs] def get_free_name(self, parent_id, name): """ Generates a free name for an entity with the given parent_id and name. Adds an increasing suffix to original name until a unique name is found. :param parent_id: ID of the parent entity. :type parent_id: int :param name: Name of the entity. :type name: str :return: Returns free name. :rtype: str :Usage example: .. code-block:: python import supervisely as sly # You can connect to API directly address = 'https://app.supervise.ly/' token = 'Your Supervisely API Token' api = sly.Api(address, token) # Or you can use API from environment os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() name = "IMG_0315.jpeg" dataset_id = 55832 free_name = api.image.get_free_name(dataset_id, name) print(free_name) # IMG_0315_001.jpeg """ return self._get_free_name( exist_check_fn=lambda module_name: self.exists(parent_id, module_name), name=name, )
def _get_effective_new_name(self, parent_id, name, change_name_if_conflict=False): """_get_effective_new_name""" return self.get_free_name(parent_id, name) if change_name_if_conflict else name
# Base class for entities that do not have a parent object in the system. class ModuleNoParent(ModuleApiBase): """ModuleNoParent""" def get_info_by_name(self, name): """get_info_by_name""" return self._get_info_by_name(get_info_by_filters_fn=self._get_info_by_filters, name=name) def _get_info_by_filters(self, filters): """_get_info_by_filters""" items = self.get_list(filters) return _get_single_item(items) def get_list(self, filters=None): """get_list""" raise NotImplementedError() def exists(self, name): """exists""" return self.get_info_by_name(name) is not None def get_free_name(self, name): """get_free_name""" return self._get_free_name( exist_check_fn=lambda module_name: self.exists(module_name), name=name ) def _get_effective_new_name(self, name, change_name_if_conflict=False): """""" return self.get_free_name(name) if change_name_if_conflict else name
[docs]class CloneableModuleApi(ModuleApi): """CloneableModuleApi""" MAX_WAIT_ATTEMPTS = ModuleApiBase.MAX_WAIT_ATTEMPTS """Maximum number of attempts that will be made to wait for a certain condition to be met.""" WAIT_ATTEMPT_TIMEOUT_SEC = ModuleApiBase.WAIT_ATTEMPT_TIMEOUT_SEC """Number of seconds for intervals between attempts.""" def _clone_api_method_name(self): """_clone_api_method_name""" raise NotImplementedError() def _clone(self, clone_type: dict, dst_workspace_id: int, dst_name: str): """_clone""" response = self._api.post( self._clone_api_method_name(), { **clone_type, ApiField.WORKSPACE_ID: dst_workspace_id, ApiField.NAME: dst_name, }, ) return response.json()[ApiField.TASK_ID]
[docs] def clone(self, id: int, dst_workspace_id: int, dst_name: str) -> int: """Clones the entity with the given ID to the given workspace with the given name. Returns the ID of the task that is created to perform the cloning operation. :param id: ID of the entity to clone. :type id: int :param dst_workspace_id: ID of the workspace to clone to. :type dst_workspace_id: int :param dst_name: Name of the cloned entity. :type dst_name: str :return: Returns the ID of the task that is created to perform the cloning operation. :rtype: int :Usage example: .. code-block:: python import os from dotenv import load_dotenv import supervisely as sly os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' # Load secrets and create API object from .env file (recommended) # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication load_dotenv(os.path.expanduser("~/supervisely.env")) api = sly.Api.from_env() # ID of the entity to clone project_id = 123456 # ID of the destination workspace workspace_id = 123456 # Create a task to clone the project task_id = api.project.clone(project_id, workspace_id, "my_cloned_project") # Wait until the task is finished api.task.wait(task_id, api.task.Status.FINISHED) task_info = api.task.get_info_by_id(task_id) dst_project_id = task_info["meta"]["output"]["project"]["id"] print(f"Cloned project ID: {dst_project_id}") """ return self._clone({ApiField.ID: id}, dst_workspace_id, dst_name)
[docs] def clone_from_explore(self, explore_path: str, dst_workspace_id: int, dst_name: str) -> int: """Clones the entity with the given explore path to the given workspace with the given name. Returns the ID of the task that is created to perform the cloning operation. :param explore_path: Explore path of the entity to clone. :type explore_path: str :param dst_workspace_id: ID of the workspace to clone to. :type dst_workspace_id: int :param dst_name: Name of the cloned entity. :type dst_name: str :return: Returns the ID of the task that is created to perform the cloning operation. :rtype: int """ return self._clone({ApiField.EXPLORE_PATH: explore_path}, dst_workspace_id, dst_name)
[docs] def get_or_clone_from_explore(self, explore_path, dst_workspace_id, dst_name): """get_or_clone_from_explore""" if not self.exists(dst_workspace_id, dst_name): task_id = self.clone_from_explore(explore_path, dst_workspace_id, dst_name) self._api.task.wait(task_id, self._api.task.Status.FINISHED) item = self.get_info_by_name(dst_workspace_id, dst_name) return item
[docs]class ModuleWithStatus: """ModuleWithStatus"""
[docs] def get_status(self, id): """get_status""" raise NotImplementedError()
[docs] def raise_for_status(self, status): """raise_for_status""" raise NotImplementedError()
class WaitingTimeExceeded(Exception): """WaitingTimeExceeded""" pass
[docs]class UpdateableModule(_JsonConvertibleModule): """UpdateableModule""" def __init__(self, api): self._api = api def _get_update_method(self): """_get_update_method""" raise NotImplementedError()
[docs] def update(self, id, name=None, description=None): """update""" if name is None and description is None: raise ValueError("'name' or 'description' or both have to be specified") body = {ApiField.ID: id} if name is not None: body[ApiField.NAME] = name if description is not None: body[ApiField.DESCRIPTION] = description response = self._api.post(self._get_update_method(), body) return self._convert_json_info(response.json())
[docs]class RemoveableModuleApi(ModuleApi): """RemoveableModuleApi""" MAX_WAIT_ATTEMPTS = ModuleApiBase.MAX_WAIT_ATTEMPTS """Maximum number of attempts that will be made to wait for a certain condition to be met.""" WAIT_ATTEMPT_TIMEOUT_SEC = ModuleApiBase.WAIT_ATTEMPT_TIMEOUT_SEC """Number of seconds for intervals between attempts.""" def _remove_api_method_name(self): """_remove_api_method_name""" raise NotImplementedError()
[docs] def remove(self, id): """ Remove an entity with the specified ID from the Supervisely server. :param id: Entity ID in Supervisely :type id: int """ self._api.post(self._remove_api_method_name(), {ApiField.ID: id})
[docs] def remove_batch(self, ids, progress_cb=None): """ Remove entities with given IDs from the Supervisely server. :param ids: IDs of entities in Supervisely. :type ids: List[int] :param progress_cb: Function for control remove progress. :type progress_cb: Callable """ for id in ids: self.remove(id) if progress_cb is not None: progress_cb(1)
[docs]class RemoveableBulkModuleApi(ModuleApi): """RemoveableBulkModuleApi""" MAX_WAIT_ATTEMPTS = ModuleApiBase.MAX_WAIT_ATTEMPTS """Maximum number of attempts that will be made to wait for a certain condition to be met.""" WAIT_ATTEMPT_TIMEOUT_SEC = ModuleApiBase.WAIT_ATTEMPT_TIMEOUT_SEC """Number of seconds for intervals between attempts.""" def _remove_batch_api_method_name(self): """_remove_batch_api_method_name""" raise NotImplementedError() def _remove_batch_field_name(self): """_remove_batch_field_name""" raise NotImplementedError()
[docs] def remove_batch(self, ids, progress_cb=None, batch_size=50): """ Remove entities in batches from the Supervisely server. All entity IDs must belong to the same nesting (for example team, or workspace, or project, or dataset). Therefore, it is necessary to sort IDs before calling this method. :param ids: IDs of entities in Supervisely. :type ids: List[int] :param progress_cb: Function for control remove progress. :type progress_cb: Callable :Usage example: .. code-block:: python import supervisely as sly # You can connect to API directly address = 'https://app.supervise.ly/' token = 'Your Supervisely API Token' api = sly.Api(address, token) # Or you can use API from environment os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() image_ids = [19369645, 19369646, 19369647] api.image.remove_batch(image_ids) """ for ids_batch in batched(ids, batch_size=batch_size): self._api.post( self._remove_batch_api_method_name(), {self._remove_batch_field_name(): ids_batch}, ) if progress_cb is not None: progress_cb(len(ids_batch))
[docs] def remove(self, id): """ Remove an entity with the specified ID from the Supervisely server. :param id: Entity ID in Supervisely. :type id: int :Usage example: .. code-block:: python import supervisely as sly # You can connect to API directly address = 'https://app.supervise.ly/' token = 'Your Supervisely API Token' api = sly.Api(address, token) # Or you can use API from environment os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly' os.environ['API_TOKEN'] = 'Your Supervisely API Token' api = sly.Api.from_env() image_id = 19369643 api.image.remove(image_id) """ self.remove_batch([id])