# coding: utf-8
# docs
from __future__ import annotations
import asyncio
import json
import re
from collections import defaultdict
from typing import (
Any,
AsyncGenerator,
Callable,
Dict,
Generator,
List,
NamedTuple,
Optional,
Tuple,
Union,
)
import numpy as np
from requests_toolbelt import MultipartDecoder, MultipartEncoder
from tqdm import tqdm
from supervisely._utils import batched, logger, run_coroutine
from supervisely.annotation.label import LabelingStatus
from supervisely.api.module_api import ApiField, ModuleApi, RemoveableBulkModuleApi
from supervisely.geometry.rectangle import Rectangle
from supervisely.video_annotation.key_id_map import KeyIdMap
class FigureApi(RemoveableBulkModuleApi):
"""
Figure object for :class:`~supervisely.video_annotation.video_annotation.VideoAnnotation`.
"""
def _remove_batch_api_method_name(self):
"""_remove_batch_api_method_name"""
return "figures.bulk.remove"
def _remove_batch_field_name(self):
"""_remove_batch_field_name"""
return ApiField.FIGURE_IDS
@staticmethod
def info_sequence():
"""
NamedTuple FigureInfo information about Figure.
:Example:
.. code-block:: python
FigureInfo(id=588801373,
updated_at='2020-12-22T06:37:13.183Z',
created_at='2020-12-22T06:37:13.183Z',
entity_id=186648101,
object_id=112482,
project_id=110366,
dataset_id=419886,
frame_index=0,
geometry_type='bitmap',
geometry={'bitmap': {'data': 'eJwdlns8...Cgj4=', 'origin': [335, 205]}})
"""
return [
ApiField.ID,
ApiField.CLASS_ID,
ApiField.UPDATED_AT,
ApiField.CREATED_AT,
ApiField.ENTITY_ID,
ApiField.OBJECT_ID,
ApiField.PROJECT_ID,
ApiField.DATASET_ID,
ApiField.FRAME_INDEX,
ApiField.GEOMETRY_TYPE,
ApiField.GEOMETRY,
ApiField.GEOMETRY_META,
ApiField.TAGS,
ApiField.META,
ApiField.AREA,
ApiField.PRIORITY,
ApiField.CUSTOM_DATA,
]
@staticmethod
def info_tuple_name():
"""
Get string name of NamedTuple for class.
:returns: NamedTuple name.
:rtype: str
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
tuple_name = api.video.figure.info_tuple_name()
print(tuple_name) # FigureInfo
"""
return "FigureInfo"
def get_info_by_id(self, id: int) -> FigureInfo:
"""
Get Figure information by ID.
:param id: Figure ID in Supervisely.
:type id: int
:returns: Information about Figure.
:rtype: :class:`~supervisely.api.entity_annotation.figure_api.FigureInfo`
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
figure_id = 588801373
figure_info = api.video.figure.get_info_by_id(figure_id)
print(figure_info)
# Output: [
# 588801373,
# "2020-12-22T06:37:13.183Z",
# "2020-12-22T06:37:13.183Z",
# 186648101,
# 112482,
# 110366,
# 419886,
# 0,
# "bitmap",
# {
# "bitmap": {
# "data": "eJw...Cgj4=",
# "origin": [
# 335,
# 205
# ]
# }
# }
# ]
"""
fields = [
"id",
"createdAt",
"updatedAt",
"imageId",
"objectId",
"classId",
"projectId",
"datasetId",
"geometry",
"geometryType",
"geometryMeta",
"tags",
"meta",
"area",
"priority",
"nnCreated",
"nnUpdated",
]
return self._get_info_by_id(id, "figures.info", {ApiField.FIELDS: fields})
def create(
self,
entity_id: int,
object_id: int,
meta: Dict,
geometry_json: Dict,
geometry_type: str,
track_id: Optional[int] = None,
custom_data: Optional[dict] = None,
status: Optional[LabelingStatus] = None,
) -> int:
""""""
input_figure = {
ApiField.META: meta,
ApiField.OBJECT_ID: object_id,
ApiField.GEOMETRY_TYPE: geometry_type,
ApiField.GEOMETRY: geometry_json,
}
if status is None:
status = LabelingStatus.MANUAL
nn_created, nn_updated = LabelingStatus.to_flags(status)
input_figure[ApiField.NN_CREATED] = nn_created
input_figure[ApiField.NN_UPDATED] = nn_updated
if track_id is not None:
input_figure[ApiField.TRACK_ID] = track_id
if custom_data is not None:
input_figure[ApiField.CUSTOM_DATA] = custom_data
body = {ApiField.ENTITY_ID: entity_id, ApiField.FIGURES: [input_figure]}
response = self._api.post("figures.bulk.add", body)
return response.json()[0][ApiField.ID]
def get_by_ids(self, dataset_id: int, ids: List[int]) -> List[FigureInfo]:
"""
Get Figures information by IDs from given dataset ID.
:param dataset_id: Dataset ID in Supervisely.
:type dataset_id: int
:param ids: List of Figures IDs.
:type ids: List[int]
:returns: List of information about Figures.
:rtype: List[:class:`~supervisely.api.entity_annotation.figure_api.FigureInfo`]
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
dataset_id = 466642
figures_ids = [642155547, 642155548, 642155549]
figures_infos = api.video.figure.get_by_ids(dataset_id, figures_ids)
print(figures_infos)
# Output: [
# [
# 642155547,
# "2021-03-23T13:25:34.705Z",
# "2021-03-23T13:25:34.705Z",
# 198703211,
# 152118,
# 124976,
# 466642,
# 0,
# "rectangle",
# {
# "points": {
# "exterior": [
# [
# 2240,
# 1041
# ],
# [
# 2463,
# 1187
# ]
# ],
# "interior": []
# }
# }
# ],
# [
# 642155548,
# "2021-03-23T13:25:34.705Z",
# "2021-03-23T13:25:34.705Z",
# 198703211,
# 152118,
# 124976,
# 466642,
# 1,
# "rectangle",
# {
# "points": {
# "exterior": [
# [
# 2248,
# 1048
# ],
# [
# 2455,
# 1176
# ]
# ],
# "interior": []
# }
# }
# ],
# [
# 642155549,
# "2021-03-23T13:25:34.705Z",
# "2021-03-23T13:25:34.705Z",
# 198703211,
# 152118,
# 124976,
# 466642,
# 2,
# "rectangle",
# {
# "points": {
# "exterior": [
# [
# 2237,
# 1046
# ],
# [
# 2464,
# 1179
# ]
# ],
# "interior": []
# }
# }
# ]
# ]
"""
filters = [{"field": "id", "operator": "in", "value": ids}]
fields = [
ApiField.ID,
ApiField.CREATED_AT,
ApiField.UPDATED_AT,
ApiField.IMAGE_ID,
ApiField.OBJECT_ID,
ApiField.CLASS_ID,
ApiField.PROJECT_ID,
ApiField.DATASET_ID,
ApiField.GEOMETRY,
ApiField.GEOMETRY_TYPE,
ApiField.GEOMETRY_META,
ApiField.TAGS,
ApiField.META,
ApiField.AREA,
ApiField.PRIORITY,
ApiField.CUSTOM_DATA,
ApiField.NN_CREATED,
ApiField.NN_UPDATED,
]
figures_infos = self.get_list_all_pages(
"figures.list",
{
ApiField.DATASET_ID: dataset_id,
ApiField.FILTER: filters,
ApiField.FIELDS: fields,
},
)
if len(ids) != len(figures_infos):
ids_downloaded = [info.id for info in figures_infos]
raise RuntimeError(
"Ids don't exist on server: {}".format(set(ids_downloaded) - set(ids))
)
id_to_item = {info.id: info for info in figures_infos}
figures = []
for input_id in ids:
figures.append(id_to_item[input_id])
return figures
def _append_bulk(
self,
entity_id,
figures_json,
figures_keys,
key_id_map: KeyIdMap,
field_name=ApiField.ENTITY_ID,
):
""""""
if len(figures_json) == 0:
return
for batch_keys, batch_jsons in zip(
batched(figures_keys, batch_size=100), batched(figures_json, batch_size=100)
):
resp = self._api.post(
"figures.bulk.add",
{field_name: entity_id, ApiField.FIGURES: batch_jsons},
)
for key, resp_obj in zip(batch_keys, resp.json()):
figure_id = resp_obj[ApiField.ID]
key_id_map.add_figure(key, figure_id)
def create_bulk(
self,
figures_json: List[dict],
entity_id: int = None,
dataset_id: int = None,
batch_size=200,
) -> List[int]:
"""
Create figures in Supervisely in bulk.
To optimize creation of a large number of figures use dataset ID instead of entity ID.
In this case figure jsons list can contain figures from different entities for the same dataset.
Every figure json must contain corresponding entity ID.
*NOTE*: Geometries for AlphaMask must be uploaded separately via `upload_geometries_batch` method.
:param figures_json: List of figures in Supervisely JSON format.
:type figures_json: List[dict]
:param entity_id: Entity ID.
:type entity_id: int
:param dataset_id: Dataset ID.
:type dataset_id: int
:returns: List of figure IDs.
"""
figure_ids = []
if len(figures_json) == 0:
return figure_ids
if entity_id is None and dataset_id is None:
raise ValueError("Either entity_id or dataset_id must be provided")
if dataset_id is not None:
body = {ApiField.DATASET_ID: dataset_id}
else:
body = {ApiField.ENTITY_ID: entity_id}
for batch_jsons in batched(figures_json, batch_size):
body[ApiField.FIGURES] = batch_jsons
response = self._api.post("figures.bulk.add", body)
for resp_obj in response.json():
figure_ids.append(resp_obj[ApiField.ID])
return figure_ids
def download(
self,
dataset_id: int,
image_ids: List[int] = None,
skip_geometry: bool = False,
filters: Optional[List[Dict[str, Any]]] = None,
) -> Dict[int, List[FigureInfo]]:
"""
Method returns a dictionary with pairs of image ID and list of FigureInfo for the given dataset ID. Can be filtered by image IDs.
:param dataset_id: Dataset ID in Supervisely.
:type dataset_id: int
:param image_ids: Specify the list of image IDs within the given dataset ID. If image_ids is None, the method returns all possible pairs of images with figures. Note: Consider using `sly.batched()` to ensure that no figures are lost in the response.
:type image_ids: List[int], optional
:param skip_geometry: Skip the download of figure geometry. May be useful for a significant api request speed increase in the large datasets.
:type skip_geometry: bool
:param filters: Filters for the figures. See https://api.docs.supervisely.com/#tag/Figures/paths/~1figures.list/get for more details.
:type filters: List[Dict[str, Any]], optional
:returns: A dictionary where keys are image IDs and values are lists of figures.
:rtype: Dict[int, List[:class:`~supervisely.api.entity_annotation.figure_api.FigureInfo`]]
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
dataset_id = 40568
image_id = 4877489
all_figures = api.image.figure.download(dataset_id, [image_id])
print(len(all_figures[image_id]))
filtered_figures = api.image.figure.download(
dataset_id,
[image_id],
filters=[
{
"type": "objects_class",
"data": {
"from": 1,
"to": 9999,
"include": True,
"classId": 154344,
},
}
],
)
print(len(filtered_figures[image_id]))
"""
fields = [
ApiField.ID,
ApiField.CREATED_AT,
ApiField.UPDATED_AT,
ApiField.IMAGE_ID,
ApiField.OBJECT_ID,
ApiField.CLASS_ID,
ApiField.PROJECT_ID,
ApiField.DATASET_ID,
ApiField.GEOMETRY,
ApiField.GEOMETRY_TYPE,
ApiField.GEOMETRY_META,
ApiField.TAGS,
ApiField.META,
ApiField.AREA,
ApiField.PRIORITY,
ApiField.CUSTOM_DATA,
ApiField.NN_CREATED,
ApiField.NN_UPDATED,
]
if skip_geometry is True:
fields = [x for x in fields if x != ApiField.GEOMETRY]
list_filter = []
if image_ids is not None:
list_filter.append(
{
ApiField.FIELD: ApiField.ENTITY_ID,
ApiField.OPERATOR: "in",
ApiField.VALUE: image_ids,
}
)
data = {
ApiField.DATASET_ID: dataset_id,
ApiField.FIELDS: fields,
ApiField.FILTER: list_filter,
}
if filters:
data[ApiField.PROJECT_ID] = self._api.dataset.get_info_by_id(dataset_id).project_id
data[ApiField.FILTERS] = filters
resp = self._api.post("figures.list", data)
infos = resp.json()
images_figures = defaultdict(list)
total_pages = infos["pagesCount"]
for page in range(1, total_pages + 1):
if page > 1:
data.update({ApiField.PAGE: page})
resp = self._api.post("figures.list", data)
infos = resp.json()
for info in infos["entities"]:
figure_info = self._convert_json_info(info, True)
images_figures[figure_info.entity_id].append(figure_info)
return dict(images_figures)
def _convert_json_info(self, info: dict, skip_missing=False):
res = super()._convert_json_info(info, skip_missing=True)
return FigureInfo(**res._asdict())
def _download_geometries_generator(
self, ids: List[int]
) -> Generator[Tuple[int, MultipartDecoder.Part], None, None]:
"""
Private method. Download figures geometries with given IDs from storage.
"""
for batch_ids in batched(ids):
response = self._api.post("figures.bulk.download.geometry", {ApiField.IDS: batch_ids})
decoder = MultipartDecoder.from_response(response)
for part in decoder.parts:
content_utf8 = part.headers[b"Content-Disposition"].decode("utf-8")
# Find name="1245" preceded by a whitespace, semicolon or beginning of line.
# The regex has 2 capture group: one for the prefix and one for the actual name value.
figure_id = int(re.findall(r'(^|[\s;])name="(\d*)"', content_utf8)[0][1])
yield figure_id, part
def download_geometry(self, figure_id: int) -> dict:
"""
Download figure geometry with given ID from storage.
:param figure_id: Figure ID in Supervisely.
:type figure_id: int
:returns: Figure geometry in Supervisely JSON format.
:rtype: dict
"""
return self.download_geometries_batch([figure_id])
def download_geometries_batch(
self,
ids: List[int],
progress_cb: Optional[Union[tqdm, Callable]] = None,
) -> List[dict]:
"""
Download figure geometries with given IDs from storage.
:param ids: List of figure IDs in Supervisely.
:type ids: List[int]
:param progress_cb: Progress bar to show the download progress. Shows the number of bytes downloaded.
:type progress_cb: Union[tqdm, Callable], optional
:returns: List of figure geometries in Supervisely JSON format.
:rtype: List[dict]
"""
geometries = {}
for idx, part in self._download_geometries_generator(ids):
try:
if progress_cb is not None:
progress_cb(len(part.content))
geometry_json = json.loads(part.content)
geometries[idx] = geometry_json
except Exception as e:
raise RuntimeError(f"Failed to decode geometry for figure ID {idx}") from e
if len(geometries) != len(ids):
raise RuntimeError("Not all geometries were downloaded")
ordered_results = [geometries[i] for i in ids]
return ordered_results
def upload_geometry(self, figure_id: int, geometry: dict):
"""
Upload figure geometry with given figure ID to storage.
:param figure_id: Figure ID in Supervisely.
:type figure_id: int
:param geometry: Figure geometry in Supervisely JSON format.
:type geometry: dict
:returns: None
:rtype: None
"""
self.upload_geometries_batch([figure_id], [geometry])
def upload_geometries_batch(self, figure_ids: List[int], geometries: List[dict]):
"""
Upload figure geometries with given figure IDs to storage.
:param figure_ids: List of figure IDs in Supervisely.
:type figure_ids: List[int]
:param geometries: List of figure geometries in Supervisely JSON format.
:type geometries: List[dict]
:returns: None
:rtype: None
"""
geometries = [json.dumps(geometry).encode("utf-8") for geometry in geometries]
for batch_ids, batch_geometries in zip(
batched(figure_ids, batch_size=100), batched(geometries, batch_size=100)
):
fields = []
for figure_id, geometry in zip(batch_ids, batch_geometries):
fields.append((ApiField.FIGURE_ID, str(figure_id)))
fields.append(
(
ApiField.GEOMETRY,
(str(figure_id), geometry, "application/octet-stream"),
)
)
encoder = MultipartEncoder(fields=fields)
self._api.post("figures.bulk.upload.geometry", encoder)
async def _download_geometries_generator_async(
self, ids: List[int], semaphore: Optional[asyncio.Semaphore] = None
) -> AsyncGenerator[Tuple[int, bytes], None, None]:
"""
Private method. Download figures geometries with given IDs from storage asynchronously.
:param ids: List of figure IDs in Supervisely.
:type ids: List[int]
:returns: Async generator with pairs of figure ID and figure geometry.
:rtype: AsyncGenerator[Tuple[int, bytes], None, None]
"""
if semaphore is None:
semaphore = self._api.get_default_semaphore()
for batch_ids in batched(ids):
async with semaphore:
response = await self._api.post_async(
"figures.bulk.download.geometry", {ApiField.IDS: batch_ids}
)
decoder = MultipartDecoder.from_response(response)
for part in decoder.parts:
content_utf8 = part.headers[b"Content-Disposition"].decode("utf-8")
# Find name="1245" preceded by a whitespace, semicolon or beginning of line.
# The regex has 2 capture group: one for the prefix and one for the actual name value.
figure_id = int(re.findall(r'(^|[\s;])name="(\d*)"', content_utf8)[0][1])
yield figure_id, part.content
async def download_geometries_batch_async(
self,
ids: List[int],
progress_cb: Optional[Union[tqdm, Callable]] = None,
semaphore: Optional[asyncio.Semaphore] = None,
) -> List[dict]:
"""
Download figure geometries with given IDs from storage asynchronously.
:param ids: List of figure IDs in Supervisely.
:type ids: List[int]
:param progress_cb: Progress bar to show the download progress. Shows the number of bytes downloaded.
:type progress_cb: Union[tqdm, Callable], optional
:param semaphore: Semaphore to limit the number of concurrent downloads.
:type semaphore: Optional[asyncio.Semaphore], optional
:returns: List of figure geometries in Supervisely JSON format.
:rtype: List[dict]
:Usage Example:
.. code-block:: python
import os
import asyncio
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
figure_ids = [642155547, 642155548, 642155549]
loop = sly.utils.get_or_create_event_loop()
geometries = loop.run_until_complete(
api.figure.download_geometries_batch_async(
figure_ids,
progress_cb=tqdm(total=len(figure_ids), desc="Downloading geometries"),
semaphore=asyncio.Semaphore(15),
)
)
"""
geometries = {}
async for idx, part in self._download_geometries_generator_async(ids, semaphore):
if progress_cb is not None:
progress_cb(len(part))
geometry_json = json.loads(part)
geometries[idx] = geometry_json
if len(geometries) != len(ids):
raise RuntimeError("Not all geometries were downloaded")
ordered_results = [geometries[i] for i in ids]
return ordered_results
async def upload_geometries_batch_async(
self,
figure_ids: List[int],
geometries: List[dict],
semaphore: Optional[asyncio.Semaphore] = None,
progress_cb: Optional[Union[tqdm, Callable]] = None,
) -> None:
"""
Upload figure geometries with given figure IDs to storage asynchronously in batches.
:param figure_ids: List of figure IDs in Supervisely.
:type figure_ids: List[int]
:param geometries: List of figure geometries in Supervisely JSON format.
:type geometries: List[dict]
:param semaphore: Semaphore to limit the number of concurrent uploads.
:type semaphore: Optional[asyncio.Semaphore], optional
:param progress_cb: Progress bar to show the upload progress. Shows the number of geometries uploaded.
:type progress_cb: Union[tqdm, Callable], optional
:returns: None
:rtype: None
:Usage Example:
.. code-block:: python
import os
import asyncio
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
figure_ids = [642155547, 642155548, 642155549]
geometries = [{...}, {...}, {...}] # Your geometry data
upload_coroutine = api.figure.upload_geometries_batch_async(
figure_ids,
geometries,
semaphore=asyncio.Semaphore(10),
)
sly.run_coroutine(upload_coroutine)
"""
if semaphore is None:
semaphore = self._api.get_default_semaphore()
encoded_geometries = [json.dumps(geometry).encode("utf-8") for geometry in geometries]
batch_size = 100
tasks = []
for batch_ids, batch_geometries in zip(
batched(figure_ids, batch_size),
batched(encoded_geometries, batch_size),
):
fields = []
for figure_id, geometry in zip(batch_ids, batch_geometries):
fields.append((ApiField.FIGURE_ID, str(figure_id)))
fields.append(
(
ApiField.GEOMETRY,
(str(figure_id), geometry, "application/octet-stream"),
)
)
async def upload_batch(fields, progress_cb, num):
async with semaphore:
encoder = MultipartEncoder(fields=fields)
headers = {"Content-Type": encoder.content_type}
async for _, _ in self._api.stream_async(
"figures.bulk.upload.geometry",
"POST",
data=encoder,
content=encoder.to_string(),
headers=headers,
):
pass
if progress_cb is not None:
progress_cb.update(num)
tasks.append(upload_batch(fields, progress_cb, len(batch_ids)))
if tasks:
await asyncio.gather(*tasks)
async def download_async(
self,
dataset_id: int,
image_ids: Optional[List[int]] = None,
skip_geometry: bool = False,
filters: Optional[List[Dict[str, Any]]] = None,
semaphore: Optional[asyncio.Semaphore] = None,
log_progress: bool = True,
batch_size: int = 300,
) -> Dict[int, List[FigureInfo]]:
"""
Asynchronously download figures for the given dataset ID. Can be filtered by image IDs.
This method is significantly faster than the synchronous version for large datasets.
:param dataset_id: Dataset ID in Supervisely.
:type dataset_id: int
:param image_ids: Specify the list of image IDs within the given dataset ID. If image_ids is None, the method returns all possible pairs of images with figures.
:type image_ids: List[int], optional
:param skip_geometry: Skip the download of figure geometry. May be useful for a significant api request speed increase in the large datasets.
:type skip_geometry: bool
:param filters: Filters for the figures. See https://api.docs.supervisely.com/#tag/Figures/paths/~1figures.list/get for more details.
:type filters: List[Dict[str, Any]], optional
:param semaphore: Semaphore to limit the number of concurrent downloads.
:type semaphore: Optional[asyncio.Semaphore], optional
:param log_progress: If True, log the progress of the download.
:type log_progress: bool, optional
:param batch_size: Size of the batch for downloading figures per 1 request. Default is 300.
Used for batching image_ids when filtering by specific images.
Adjust this value for optimal performance, value cannot exceed 500.
:type batch_size: int, optional
:returns: A dictionary where keys are image IDs and values are lists of figures.
:rtype: Dict[int, List[:class:`~supervisely.api.entity_annotation.figure_api.FigureInfo`]]
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
dataset_id = 40568
image_id = 4877489
download_coroutine = api.image.figure.download_async(dataset_id, [image_id])
figures = sly.run_coroutine(download_coroutine)
filtered_figures = sly.run_coroutine(
api.image.figure.download_async(
dataset_id,
[image_id],
filters=[
{
"type": "objects_class",
"data": {
"from": 1,
"to": 9999,
"include": True,
"classId": 154344,
},
}
],
)
)
"""
fields = [
ApiField.ID,
ApiField.CREATED_AT,
ApiField.UPDATED_AT,
ApiField.IMAGE_ID,
ApiField.OBJECT_ID,
ApiField.CLASS_ID,
ApiField.PROJECT_ID,
ApiField.DATASET_ID,
ApiField.GEOMETRY,
ApiField.GEOMETRY_TYPE,
ApiField.GEOMETRY_META,
ApiField.TAGS,
ApiField.META,
ApiField.AREA,
ApiField.PRIORITY,
ApiField.CUSTOM_DATA,
ApiField.NN_CREATED,
ApiField.NN_UPDATED,
]
if skip_geometry is True:
fields = [x for x in fields if x != ApiField.GEOMETRY]
# Base data setup
base_data = {
ApiField.DATASET_ID: dataset_id,
ApiField.FIELDS: fields,
}
if filters:
base_data[ApiField.PROJECT_ID] = self._api.dataset.get_info_by_id(dataset_id).project_id
base_data[ApiField.FILTERS] = filters
if semaphore is None:
semaphore = self._api.get_default_semaphore()
async def _get_page_figures(page_data, semaphore, progress_cb: tqdm = None):
"""Helper function to get figures from a single page"""
async with semaphore:
response = await self._api.post_async("figures.list", page_data)
response_json = response.json()
page_figures = []
for info in response_json["entities"]:
figure_info = self._convert_json_info(info, True)
page_figures.append(figure_info)
if progress_cb is not None:
progress_cb.update(len(response_json["entities"]))
return page_figures
async def _get_all_pages(ids_filter, progress_cb: tqdm = None):
"""Internal function to process all pages for given filter"""
data = base_data.copy()
data[ApiField.FILTER] = ids_filter
# Get first page to determine pagination
data[ApiField.PAGE] = 1
async with semaphore:
response = await self._api.post_async("figures.list", data)
response_json = response.json()
pages_count = response_json["pagesCount"]
all_figures = []
# Process first page
for info in response_json["entities"]:
figure_info = self._convert_json_info(info, True)
all_figures.append(figure_info)
if progress_cb is not None:
progress_cb.update(len(response_json["entities"]))
# Process remaining pages in parallel if needed
if pages_count > 1:
tasks = []
for page in range(2, pages_count + 1):
page_data = data.copy()
page_data[ApiField.PAGE] = page
tasks.append(
asyncio.create_task(
_get_page_figures(page_data, semaphore, progress_cb=progress_cb)
)
)
if tasks:
page_results = await asyncio.gather(*tasks)
for page_figures in page_results:
all_figures.extend(page_figures)
return all_figures
if log_progress:
progress_cb = tqdm(desc="Downloading figures", unit="figure", total=0)
else:
progress_cb = None
# Strategy: batch processing based on image_ids
tasks = []
if image_ids is None:
# Single task for all figures in dataset
tasks.append(_get_all_pages([], progress_cb=progress_cb))
else:
# Batch image_ids and create tasks for each batch
for batch_ids in batched(image_ids, batch_size):
list_filters = [
{
ApiField.FIELD: ApiField.ENTITY_ID,
ApiField.OPERATOR: "in",
ApiField.VALUE: list(batch_ids),
}
]
tasks.append(_get_all_pages(list_filters, progress_cb=progress_cb))
# Small delay between batches to reduce server load
await asyncio.sleep(0.02)
# Execute all tasks in parallel and collect results
all_results = await asyncio.gather(*tasks)
# Combine results from all batches
images_figures = defaultdict(list)
for batch_figures in all_results:
for figure in batch_figures:
images_figures[figure.entity_id].append(figure)
return dict(images_figures)
def download_fast(
self,
dataset_id: int,
image_ids: Optional[List[int]] = None,
skip_geometry: bool = False,
filters: Optional[List[Dict[str, Any]]] = None,
semaphore: Optional[asyncio.Semaphore] = None,
log_progress: bool = True,
batch_size: int = 300,
) -> Dict[int, List[FigureInfo]]:
"""
Download figures for the given dataset ID. Can be filtered by image IDs.
This method is significantly faster than the synchronous version for large datasets and
is designed to be used in an asynchronous context.
Will fallback to the synchronous version if async fails.
:param dataset_id: Dataset ID in Supervisely.
:type dataset_id: int
:param image_ids: Specify the list of image IDs within the given dataset ID. If image_ids is None, the method returns all possible pairs of images with figures.
:type image_ids: List[int], optional
:param skip_geometry: Skip the download of figure geometry. May be useful for a significant api request speed increase in the large datasets.
:type skip_geometry: bool
:param filters: Filters for the figures. See https://api.docs.supervisely.com/#tag/Figures/paths/~1figures.list/get for more details.
:type filters: List[Dict[str, Any]], optional
:param semaphore: Semaphore to limit the number of concurrent downloads.
:type semaphore: Optional[asyncio.Semaphore], optional
:param log_progress: If True, log the progress of the download.
:type log_progress: bool, optional
:param batch_size: Size of the batch for downloading figures per 1 request. Default is 300.
Used for batching image_ids when filtering by specific images.
Adjust this value for optimal performance, value cannot exceed 500.
:type batch_size: int, optional
:returns: A dictionary where keys are image IDs and values are lists of figures.
:rtype: Dict[int, List[:class:`~supervisely.api.entity_annotation.figure_api.FigureInfo`]]
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
dataset_id = 40568
image_id = 4877489
figures = api.image.figure.download_fast(dataset_id, [image_id])
filtered_figures = api.image.figure.download_fast(
dataset_id,
[image_id],
filters=[
{
"type": "objects_class",
"data": {
"from": 1,
"to": 9999,
"include": True,
"classId": 154344,
},
}
],
)
"""
try:
return run_coroutine(
self.download_async(
dataset_id=dataset_id,
image_ids=image_ids,
skip_geometry=skip_geometry,
filters=filters,
semaphore=semaphore,
log_progress=log_progress,
batch_size=batch_size,
)
)
except Exception:
# Fallback to the synchronous version if async fails
logger.debug("Async download of figures is failed, falling back to sync download.")
if log_progress:
logger.debug("Downloading figures without progress bar.")
return self.download(
dataset_id=dataset_id,
image_ids=image_ids,
skip_geometry=skip_geometry,
filters=filters,
)
def restore_batch(self, ids: List[int], progress_cb: Optional[Callable] = None, batch_size: int = 50):
"""
Restore archived figures in batches from the Supervisely server.
:param ids: IDs of figures in Supervisely.
:type ids: List[int]
:param progress_cb: Optional callback to track restore progress. Receives number of restored figures in the current batch.
:type progress_cb: Optional[Callable]
:param batch_size: Number of figure IDs to send in a single request.
:type batch_size: int
"""
for ids_batch in batched(ids, batch_size=batch_size):
self._api.post(
"figures.bulk.restore",
{ApiField.FIGURE_IDS: ids_batch},
)
if progress_cb is not None:
progress_cb(len(ids_batch))
def restore(self, id: int):
"""
Restore a single archived figure with the specified ID from the Supervisely server.
:param id: Figure ID in Supervisely.
:type id: int
"""
self.restore_batch([id])