# coding: utf-8
from __future__ import annotations
import asyncio
import io
import json
import os
import tarfile
import tempfile
from collections import namedtuple
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple, Union
import zstd
from tqdm import tqdm
from supervisely._utils import batched, logger
from supervisely.api.api import Api
from supervisely.api.dataset_api import DatasetInfo
from supervisely.api.module_api import ApiField
from supervisely.api.project_api import ProjectInfo
from supervisely.api.video.video_api import VideoInfo
from supervisely.collection.key_indexed_collection import KeyIndexedCollection
from supervisely.io.fs import clean_dir, mkdir, touch, touch_async
from supervisely.io.json import dump_json_file, dump_json_file_async, load_json_file
from supervisely.project.project import Dataset, OpenMode, Project
from supervisely.project.project import read_single_project as read_project_wrapper
from supervisely.project.project_meta import ProjectMeta
from supervisely.project.project_settings import LabelingInterface
from supervisely.project.project_type import ProjectType
from supervisely.project.versioning.common import (
DEFAULT_VIDEO_SCHEMA_VERSION,
get_video_snapshot_schema,
)
from supervisely.project.versioning.schema_fields import VersionSchemaField
from supervisely.task.progress import tqdm_sly
from supervisely.video import video as sly_video
from supervisely.video_annotation.key_id_map import KeyIdMap
from supervisely.video_annotation.video_annotation import VideoAnnotation
[docs]
class VideoItemPaths(NamedTuple):
    """
    Paths to a video's data file and its annotation file within a local Supervisely dataset.

    As a ``NamedTuple`` it unpacks positionally as ``(video_path, ann_path)``.
    """

    #: str: Full path to the video file of the item
    video_path: str
    #: str: Full path to the annotation file of the item
    ann_path: str
[docs]
class VideoDataset(Dataset):
    """
    A dataset directory for video items inside a local Supervisely video project.
    Stores videos, per-video annotations and related metadata on disk.
    """

    #: str: Items data directory name
    item_dir_name = "video"
    #: str: Annotations directory name
    ann_dir_name = "ann"
    #: str: Items info directory name
    item_info_dir_name = "video_info"
    #: str: Metadata directory name
    metadata_dir_name = "metadata"
    #: None: Segmentation masks directory name (unset for video datasets)
    seg_dir_name = None
    #: Annotation class loaded by ``get_ann`` and required by ``set_ann``
    annotation_class = VideoAnnotation
    #: Class whose name is given to the tuples built by ``get_item_info``
    item_info_class = VideoInfo
    #: str: presumably the directory name for nested datasets -- TODO confirm against base ``Dataset``
    datasets_dir_name = "datasets"
def __init__(
    self,
    directory: str,
    mode: Optional[OpenMode] = None,
    parents: Optional[List[str]] = None,
    dataset_id: Optional[int] = None,
    api: Optional[Api] = None,
):
    """
    Open a video dataset. VideoDataset object is immutable.

    Every argument is forwarded, unchanged and in order, to the base ``Dataset`` constructor.

    :param directory: Path to dataset directory.
    :type directory: str
    :param mode: Working mode for the dataset. If not provided, dataset_id must be provided.
    :type mode: :class:`~supervisely.project.project.OpenMode`, optional
    :param parents: List of parent directories, e.g. ["ds1", "ds2", "ds3"].
    :type parents: List[str], optional
    :param dataset_id: Dataset ID if the Dataset is opened in API mode.
        If dataset_id is specified then api must be specified as well.
    :type dataset_id: int, optional
    :param api: API object if the Dataset is opened in API mode.
    :type api: :class:`~supervisely.api.api.Api`, optional

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/videos_example/ds0"
            ds = sly.VideoDataset(dataset_path, sly.OpenMode.READ)
            print(ds.project_dir)
            # Output: "/home/admin/work/supervisely/projects/videos_example"
    """
    super().__init__(
        directory,
        mode,
        parents,
        dataset_id,
        api,
    )
@property
def project_dir(self) -> str:
    """
    Directory of the video project that contains this dataset (delegates to the base class).

    :returns: Path to the video project.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.project_dir)
            # Output: "/home/admin/work/supervisely/projects/videos_example"
    """
    owning_project_dir = super().project_dir
    return owning_project_dir
@property
def name(self) -> str:
    """
    Name of the video dataset (delegates to the base class).

    :returns: Video dataset name.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.name)
            # Output: "ds0"
    """
    dataset_name = super().name
    return dataset_name
@property
def directory(self) -> str:
    """
    Root directory of the video dataset (delegates to the base class).

    :returns: Path to the video dataset directory.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.directory)
            # Output: '/home/admin/work/supervisely/projects/videos_example/ds0'
    """
    dataset_root = super().directory
    return dataset_root
@property
def metadata_directory(self) -> str:
    """
    Metadata directory of the dataset: ``metadata_dir_name`` joined onto ``directory``.

    :returns: Path to the video dataset metadata directory.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.metadata_directory)
            # Output: '/home/admin/work/supervisely/projects/videos_example/ds0/metadata'
    """
    dataset_root = self.directory
    subdir_name = self.metadata_dir_name
    return os.path.join(dataset_root, subdir_name)
@property
def item_dir(self) -> str:
    """
    Directory holding the video files of the dataset (delegates to the base class).

    :returns: Path to the video dataset items directory.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.item_dir)
            # Output: '/home/admin/work/supervisely/projects/videos_example/ds0/video'
    """
    videos_dir = super().item_dir
    return videos_dir
@property
def img_dir(self) -> str:
    """
    Unsupported for video datasets.

    :raises NotImplementedError: in all cases.
    """
    owner_name = type(self).__name__
    raise NotImplementedError(f"Property 'img_dir' is not supported for {owner_name} object.")
@property
def ann_dir(self) -> str:
    """
    Directory holding the annotation files of the dataset (delegates to the base class).

    :returns: Path to the video dataset directory with annotations.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.ann_dir)
            # Output: '/home/admin/work/supervisely/projects/videos_example/ds0/ann'
    """
    annotations_dir = super().ann_dir
    return annotations_dir
@property
def img_info_dir(self):
    """
    Unsupported for video datasets.

    :raises NotImplementedError: in all cases.
    """
    owner_name = type(self).__name__
    raise NotImplementedError(f"Property 'img_info_dir' is not supported for {owner_name} object.")
@property
def item_info_dir(self):
    """
    Directory holding the item-info files of the dataset (delegates to the base class).

    :returns: Path to the video dataset directory with items info.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.item_info_dir)
            # Output: '/home/admin/work/supervisely/projects/videos_example/ds0/video_info'
    """
    infos_dir = super().item_info_dir
    return infos_dir
@property
def seg_dir(self):
    """
    Unsupported for video datasets.

    :raises NotImplementedError: in all cases.
    """
    owner_name = type(self).__name__
    raise NotImplementedError(f"Property 'seg_dir' is not supported for {owner_name} object.")
@classmethod
def _has_valid_ext(cls, path: str) -> bool:
    """
    Tell whether the file at the given path has a supported video extension.

    The decision is delegated to ``sly_video.has_valid_ext``.

    :param path: Path to the file.
    :type path: str
    :returns: Result of ``sly_video.has_valid_ext(path)``.
    :rtype: bool
    """
    is_supported = sly_video.has_valid_ext(path)
    return is_supported
[docs]
def get_items_names(self) -> list:
    """
    Names of all items in the video dataset (delegates to the base class).

    :returns: List of item names.
    :rtype: List[str]

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.get_items_names())
            # Output: ['video_0002.mp4', 'video_0005.mp4', 'video_0008.mp4', ...]
    """
    item_names = super().get_items_names()
    return item_names
[docs]
def item_exists(self, item_name: str) -> bool:
    """
    Check whether an item with the given name belongs to the video dataset.

    :param item_name: Item name (including extension).
    :type item_name: str
    :returns: True if item exists, otherwise False.
    :rtype: bool

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            ds.item_exists("video_0748")      # False
            ds.item_exists("video_0748.mp4")  # True
    """
    found = super().item_exists(item_name)
    return found
[docs]
def get_item_path(self, item_name: str) -> str:
    """
    Path to the video file of the given item (delegates to the base class).

    :param item_name: Item name.
    :type item_name: str
    :returns: Path to the given item.
    :rtype: str
    :raises RuntimeError: if item not found in the project

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.get_item_path("video_0748"))
            # Output: RuntimeError: Item video_0748 not found in the project.
            print(ds.get_item_path("video_0748.mp4"))
            # Output: '/home/admin/work/supervisely/projects/videos_example/ds0/video/video_0748.mp4'
    """
    video_file_path = super().get_item_path(item_name)
    return video_file_path
[docs]
def get_img_path(self, item_name: str) -> str:
    """
    Unsupported for video datasets.

    :raises NotImplementedError: in all cases.
    """
    owner_name = type(self).__name__
    message = f"Method 'get_img_path(item_name)' is not supported for {owner_name} object."
    raise NotImplementedError(message)
[docs]
def get_ann(
    self, item_name, project_meta: ProjectMeta, key_id_map: Optional[KeyIdMap] = None
) -> VideoAnnotation:
    """
    Load the annotation of an item from its json file.

    The path comes from :meth:`get_ann_path`; the file is parsed with
    ``annotation_class.load_json_file``.

    :param item_name: Item name.
    :type item_name: str
    :param project_meta: Project meta.
    :type project_meta: :class:`~supervisely.project.project_meta.ProjectMeta`
    :param key_id_map: KeyIdMap object.
    :type key_id_map: :class:`~supervisely.video_annotation.key_id_map.KeyIdMap`, optional
    :returns: VideoAnnotation object.
    :rtype: :class:`~supervisely.video_annotation.video_annotation.VideoAnnotation`
    :raises RuntimeError: if item not found in the project

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            project = sly.VideoProject("/home/admin/work/supervisely/projects/videos_example", sly.OpenMode.READ)
            ds = project.datasets.get('ds0')

            annotation = ds.get_ann("video_0748", project.meta)
            # Output: RuntimeError: Item video_0748 not found in the project.

            annotation = ds.get_ann("video_0748.mp4", project.meta)
            print(annotation.to_json())
            # Output: {
            #     "description": "",
            #     "size": {"height": 500, "width": 700},
            #     "key": "e9ef52dbbbbb490aa10f00a50e1fade6",
            #     "tags": [],
            #     "objects": [],
            #     "frames": [{"index": 0, "figures": []}],
            #     "framesCount": 1
            # }
    """
    # Resolve the path first so a missing item fails before any parsing is attempted.
    json_path = self.get_ann_path(item_name)
    load_annotation = self.annotation_class.load_json_file
    return load_annotation(json_path, project_meta, key_id_map)
[docs]
def get_ann_path(self, item_name: str) -> str:
    """
    Path to the annotation json file of the given item (delegates to the base class).

    :param item_name: Item name.
    :type item_name: str
    :returns: Path to the given annotation json file.
    :rtype: str
    :raises RuntimeError: if item not found in the project

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.get_ann_path("video_0748"))
            # Output: RuntimeError: Item video_0748 not found in the project.
            print(ds.get_ann_path("video_0748.mp4"))
            # Output: '/home/admin/work/supervisely/projects/videos_example/ds0/ann/video_0748.mp4.json'
    """
    ann_json_path = super().get_ann_path(item_name)
    return ann_json_path
[docs]
def get_img_info_path(self, img_name: str) -> str:
    """
    Unsupported for video datasets.

    :raises NotImplementedError: in all cases.
    """
    owner_name = type(self).__name__
    message = f"Method 'get_img_info_path(item_name)' is not supported for {owner_name} object."
    raise NotImplementedError(message)
[docs]
def get_item_info_path(self, item_name: str) -> str:
    """
    Path to the item info json file, without checking that the file exists
    (delegates to the base class).

    :param item_name: Item name.
    :type item_name: str
    :returns: Path to the given item info json file.
    :rtype: str
    :raises RuntimeError: if item not found in the project.

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.get_item_info_path("video_0748"))
            # Output: RuntimeError: Item video_0748 not found in the project.
            print(ds.get_item_info_path("video_0748.mp4"))
            # Output: '/home/admin/work/supervisely/projects/videos_example/ds0/video_info/video_0748.mp4.json'
    """
    info_json_path = super().get_item_info_path(item_name)
    return info_json_path
[docs]
def get_image_info(self, item_name: str) -> None:
    """
    Unsupported for video datasets.

    :raises NotImplementedError: in all cases.
    """
    owner_name = type(self).__name__
    message = f"Method 'get_image_info(item_name)' is not supported for {owner_name} object."
    raise NotImplementedError(message)
[docs]
def get_item_info(self, item_name: str) -> VideoInfo:
    """
    Information about the item with the given name, read from its item-info json file.

    The result is an instance of a namedtuple type built on the fly: it is named after
    ``item_info_class`` and its fields are exactly the keys found in the json file.

    :param item_name: Item name.
    :type item_name: str
    :returns: VideoInfo-like named tuple.
    :rtype: :class:`~supervisely.api.video.video_api.VideoInfo`

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)
            print(ds.get_item_info("video_0748.mp4"))
            # Output:
            # VideoInfo(
            #     id=198702499,
            #     name='video_0748.mp4',
            #     hash='ehYHLNFWmMNuF2fPUgnC/g/tkIIEjNIOhdbNLQXkE8Y=',
            #     team_id=16087,
            #     workspace_id=23821,
            #     project_id=124974,
            #     dataset_id=466639,
            #     path_original='/h5un6l2bnaz1vj8a9qgms4-public/videos/w/7/i4/GZYoCs...9F3kyVJ7.mp4',
            #     frames_to_timecodes=[0, 0.033367, 0.066733, 0.1001,...,10.777433, 10.8108, 10.844167],
            #     frames_count=326,
            #     frame_width=3840,
            #     frame_height=2160,
            #     created_at='2021-03-23T13:14:25.536Z',
            #     updated_at='2021-03-23T13:16:43.300Z'
            # )
    """
    info_fields = load_json_file(self.get_item_info_path(item_name))
    # Iterating a dict yields its keys, so the json keys become the tuple's field names.
    info_tuple_type = namedtuple(self.item_info_class.__name__, info_fields)
    return info_tuple_type(**info_fields)
[docs]
def get_seg_path(self, item_name: str) -> str:
    """
    Unsupported for video datasets.

    :raises NotImplementedError: in all cases.
    """
    owner_name = type(self).__name__
    message = f"Method 'get_seg_path(item_name)' is not supported for {owner_name} object."
    raise NotImplementedError(message)
[docs]
def add_item_file(
    self,
    item_name: str,
    item_path: str,
    ann: Optional[Union[VideoAnnotation, str]] = None,
    _validate_item: Optional[bool] = True,
    _use_hardlink: Optional[bool] = False,
    item_info: Optional[Union[VideoInfo, Dict, str]] = None,
) -> None:
    """
    Add an item file to the dataset items directory and its annotation to the dataset
    annotations directory. If ann is None, an empty annotation file is created.

    All arguments are forwarded by keyword to the base-class implementation.

    :param item_name: Item name.
    :type item_name: str
    :param item_path: Path to the item.
    :type item_path: str
    :param ann: VideoAnnotation object or path to annotation json file.
    :type ann: :class:`~supervisely.video_annotation.video_annotation.VideoAnnotation` or str, optional
    :param _validate_item: Checks input files format.
    :type _validate_item: bool, optional
    :param _use_hardlink: If True creates a hardlink pointing to src named dst, otherwise don't.
    :type _use_hardlink: bool, optional
    :param item_info: VideoInfo object, the same converted to dict, or path to an item info
        json file to copy into the dataset item info directory.
    :type item_info: :class:`~supervisely.api.video.video_api.VideoInfo` or dict or str, optional
    :returns: None
    :rtype: NoneType
    :raises RuntimeError: if item_name already exists in dataset or item name has unsupported extension.

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/videos_example/ds0"
            ds = sly.VideoDataset(dataset_path, sly.OpenMode.READ)

            ann = "/home/admin/work/supervisely/projects/videos_example/ds0/ann/video_8888.mp4.json"
            ds.add_item_file("video_8888.mp4", "/home/admin/work/supervisely/projects/videos_example/ds0/video/video_8888.mp4", ann=ann)
            print(ds.item_exists("video_8888.mp4"))
            # Output: True
    """
    forwarded = dict(
        item_name=item_name,
        item_path=item_path,
        ann=ann,
        _validate_item=_validate_item,
        _use_hardlink=_use_hardlink,
        item_info=item_info,
    )
    return super().add_item_file(**forwarded)
[docs]
def add_item_np(self, item_name, img, ann=None, img_info=None):
    """
    Unsupported for video datasets.

    :raises NotImplementedError: in all cases.
    """
    owner_name = type(self).__name__
    raise NotImplementedError(f"Method 'add_item_np()' is not supported for {owner_name} object.")
[docs]
def add_item_raw_bytes(self, item_name, item_raw_bytes, ann=None, img_info=None):
    """
    Unsupported for video datasets.

    :raises NotImplementedError: in all cases.
    """
    owner_name = type(self).__name__
    message = f"Method 'add_item_raw_bytes()' is not supported for {owner_name} object."
    raise NotImplementedError(message)
def get_classes_stats(
    self,
    project_meta: Optional[ProjectMeta] = None,
    return_objects_count: Optional[bool] = True,
    return_figures_count: Optional[bool] = True,
    return_items_count: Optional[bool] = True,
):
    """
    Count, per object class, the items, objects and figures found in the dataset annotations.

    Every class of the project meta starts at zero. An item counts towards a class when at
    least one of its figures belongs to an object of that class.

    :param project_meta: Project meta; when None it is read from the project at ``project_dir``.
    :type project_meta: :class:`~supervisely.project.project_meta.ProjectMeta`, optional
    :param return_objects_count: Include the ``"objects_count"`` entry in the result.
    :type return_objects_count: bool, optional
    :param return_figures_count: Include the ``"figures_count"`` entry in the result.
    :type return_figures_count: bool, optional
    :param return_items_count: Include the ``"items_count"`` entry in the result.
    :type return_items_count: bool, optional
    :returns: Dict with the requested entries, each mapping class name to its count.
    :rtype: dict
    """
    if project_meta is None:
        project_meta = VideoProject(self.project_dir, OpenMode.READ).meta

    class_names = [obj_class.name for obj_class in project_meta.obj_classes]
    items_per_class = dict.fromkeys(class_names, 0)
    objects_per_class = dict.fromkeys(class_names, 0)
    figures_per_class = dict.fromkeys(class_names, 0)

    for item_name in self:
        ann = self.get_ann(item_name, project_meta)

        for video_object in ann.objects:
            objects_per_class[video_object.obj_class.name] += 1

        # Classes that own at least one figure in this particular item.
        classes_in_item = set()
        for figure in ann.figures:
            figure_class = figure.parent_object.obj_class.name
            figures_per_class[figure_class] += 1
            classes_in_item.add(figure_class)

        for class_name in class_names:
            if class_name in classes_in_item:
                items_per_class[class_name] += 1

    requested = (
        (return_items_count, "items_count", items_per_class),
        (return_objects_count, "objects_count", objects_per_class),
        (return_figures_count, "figures_count", figures_per_class),
    )
    return {key: counts for wanted, key, counts in requested if wanted}
def _get_empty_annotaion(self, item_name):
    """
    Build an empty annotation for the given video.

    ``item_name`` is handed directly to ``sly_video.get_image_size_and_frames_count``, so it
    presumably has to be a path to the video file -- TODO confirm against callers.

    :param item_name: str
    :returns: ``annotation_class`` instance built from the video's frame size and frames count.
    """
    size_and_count = sly_video.get_image_size_and_frames_count(item_name)
    frame_size, n_frames = size_and_count
    return self.annotation_class(frame_size, n_frames)
def _add_item_raw_bytes(self, item_name, item_raw_bytes):
"""
Not available for VideoDataset class object.
:raises NotImplementedError: in all cases.
"""
raise NotImplementedError(
f"Method '_add_item_raw_bytes()' is not supported for {type(self).__name__} object."
)
def _add_img_np(self, item_name, img):
"""
Not available for VideoDataset class object.
:raises NotImplementedError: in all cases.
"""
raise NotImplementedError(
f"Method '_add_img_np()' is not supported for {type(self).__name__} object."
)
def _validate_added_item_or_die(self, item_path):
    """
    Check that the file at ``item_path`` is a valid video.

    When ``sly_video.validate_format`` raises, the file is removed from disk and the same
    exception is propagated to the caller.

    :param item_path: str
    """
    try:
        sly_video.validate_format(item_path)
    except Exception:
        # Clean up the rejected file before letting the validation error through.
        os.remove(item_path)
        raise
[docs]
def set_ann(
    self, item_name: str, ann: VideoAnnotation, key_id_map: Optional[KeyIdMap] = None
) -> None:
    """
    Overwrite the annotation json file of the given item with the given annotation.

    The annotation must be exactly of type ``annotation_class`` (subclasses are rejected).

    :param item_name: Item name.
    :type item_name: str
    :param ann: VideoAnnotation object.
    :type ann: :class:`~supervisely.video_annotation.video_annotation.VideoAnnotation`
    :param key_id_map: KeyIdMap object.
    :type key_id_map: :class:`~supervisely.video_annotation.key_id_map.KeyIdMap`, optional
    :returns: None
    :rtype: NoneType
    :raises TypeError: if ``ann`` is not of type ``annotation_class``.

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)

            height, width = 500, 700
            new_ann = sly.VideoAnnotation((height, width), frames_count=0)
            ds.set_ann("video_0748.mp4", new_ann)
    """
    expected_cls = self.annotation_class
    if type(ann) is not expected_cls:
        raise TypeError(
            f"Type of 'ann' should be {expected_cls.__name__}, not a {type(ann).__name__}"
        )
    target_path = self.get_ann_path(item_name)
    ann_json = ann.to_json(key_id_map)
    dump_json_file(ann_json, target_path, indent=4)
[docs]
def get_item_paths(self, item_name) -> VideoItemPaths:
    """
    Bundle the video file path and the annotation file path of the given item.

    :param item_name: Item name.
    :type item_name: str
    :returns: VideoItemPaths object
    :rtype: :class:`~supervisely.project.video_project.VideoItemPaths`

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            ds = sly.VideoDataset("/home/admin/work/supervisely/projects/videos_example/ds0", sly.OpenMode.READ)

            video_path, ann_path = ds.get_item_paths("video_0748.mp4")
            print("video_path:", video_path)
            print("ann_path:", ann_path)
            # Output:
            # video_path: /home/admin/work/supervisely/projects/videos_example/ds0/video/video_0748.mp4
            # ann_path: /home/admin/work/supervisely/projects/videos_example/ds0/ann/video_0748.mp4.json
    """
    video_file = self.get_item_path(item_name)
    ann_file = self.get_ann_path(item_name)
    return VideoItemPaths(video_path=video_file, ann_path=ann_file)
[docs]
@staticmethod
def get_url(project_id: int, dataset_id: int) -> str:
    """
    Get URL to dataset items list in Supervisely.

    Delegates to the base-class ``get_url``.

    :param project_id: VideoProject ID in Supervisely.
    :type project_id: int
    :param dataset_id: VideoDataset ID in Supervisely.
    :type dataset_id: int
    :returns: URL to dataset items list.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            from supervisely import VideoDataset

            project_id = 10093
            dataset_id = 45330
            ds_items_link = VideoDataset.get_url(project_id, dataset_id)

            print(ds_items_link)
            # Output: "/projects/10093/datasets/45330"
    """
    # Zero-argument super() binds to the function's first argument. In a staticmethod that
    # is ``project_id`` (an int), which makes the call fail with TypeError. The explicit
    # two-argument form resolves the parent implementation without needing an instance.
    return super(VideoDataset, VideoDataset).get_url(project_id, dataset_id)
[docs]
class VideoProject(Project):
    """
    A local Supervisely project for video data.
    Contains one or more :class:`~supervisely.project.video_project.VideoDataset` datasets with videos
    and their annotations.
    """

    #: Dataset class associated with this project type
    dataset_class = VideoDataset
[docs]
class DatasetDict(KeyIndexedCollection):
    """Key-indexed collection of :class:`~supervisely.project.video_project.VideoDataset` datasets."""

    #: Type of the items stored in the collection
    item_type = VideoDataset
def __init__(self, directory, mode: OpenMode):
    """
    VideoProject is a parent directory for video datasets. VideoProject object is immutable.

    The key-id map attribute starts as ``None`` and the base ``Project`` constructor is then
    invoked with the same arguments.

    :param directory: Path to video project directory.
    :type directory: str
    :param mode: Determines working mode for the given project.
    :type mode: :class:`~supervisely.project.project.OpenMode`

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            project_path = "/home/admin/work/supervisely/projects/videos_example"
            project = sly.VideoProject(project_path, sly.OpenMode.READ)
    """
    # Must exist before the base constructor runs (see ``_read`` / ``_create``, which set it).
    self._key_id_map: Optional[KeyIdMap] = None
    super().__init__(
        directory,
        mode,
    )
[docs]
@staticmethod
def get_url(id: int) -> str:
    """
    Get URL to video datasets list in Supervisely.

    Delegates to the base-class ``get_url``.

    :param id: VideoProject ID in Supervisely.
    :type id: int
    :returns: URL to datasets list.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            from supervisely import VideoProject

            project_id = 10093
            datasets_link = VideoProject.get_url(project_id)

            print(datasets_link)
            # Output: "/projects/10093/datasets"
    """
    # Zero-argument super() binds to the function's first argument. In a staticmethod that
    # is ``id`` (an int), which makes the call fail with TypeError. The explicit two-argument
    # form resolves the parent implementation without needing an instance.
    return super(VideoProject, VideoProject).get_url(id)
def get_classes_stats(
    self,
    dataset_names: Optional[List[str]] = None,
    return_objects_count: Optional[bool] = True,
    return_figures_count: Optional[bool] = True,
    return_items_count: Optional[bool] = True,
):
    """
    Per-class statistics for the project; the work is done by the base-class implementation.

    :param dataset_names: Passed through to the base class -- presumably restricts the
        statistics to these datasets; TODO confirm against ``Project.get_classes_stats``.
    :type dataset_names: List[str], optional
    :param return_objects_count: Passed through to the base class.
    :type return_objects_count: bool, optional
    :param return_figures_count: Passed through to the base class.
    :type return_figures_count: bool, optional
    :param return_items_count: Passed through to the base class.
    :type return_items_count: bool, optional
    :returns: Whatever the base-class method returns.
    """
    stats = super().get_classes_stats(
        dataset_names,
        return_objects_count,
        return_figures_count,
        return_items_count,
    )
    return stats
def _read(self):
    """
    Read the project from its directory and load the key-id map.

    The base-class read runs first. The key-id map then starts empty and is replaced by
    the content of ``key_id_map.json`` when that file exists in the project directory.
    """
    super()._read()
    empty_map = KeyIdMap()
    self._key_id_map = empty_map
    map_path = self._get_key_id_map_path()
    if os.path.exists(map_path):
        self._key_id_map = empty_map.load_json(map_path)
def _create(self):
    """
    Create the project on disk via the base class, then store an empty KeyIdMap.

    Per the base-class contract described in this file, creation fails if the project
    directory already exists and is not empty.
    """
    super()._create()
    fresh_map = KeyIdMap()
    self.set_key_id_map(fresh_map)
@property
def key_id_map(self):
    """
    KeyIdMap currently attached to the project.

    It is ``None`` until the project has been read or created, or until
    :meth:`set_key_id_map` is called.

    :returns: KeyIdMap object.
    :rtype: :class:`~supervisely.video_annotation.key_id_map.KeyIdMap`
    """
    current_map = self._key_id_map
    return current_map
@property
def type(self) -> str:
    """
    Project type: the value of ``ProjectType.VIDEOS``.

    :returns: Project type.
    :rtype: str

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            project = sly.VideoProject("/home/admin/work/supervisely/projects/video", sly.OpenMode.READ)
            print(project.type)
            # Output: 'videos'
    """
    videos_type = ProjectType.VIDEOS
    return videos_type.value
[docs]
def set_key_id_map(self, new_map: KeyIdMap):
    """
    Attach the given KeyIdMap to the project and dump it to the project directory as json.

    :param new_map: KeyIdMap object.
    :type new_map: :class:`~supervisely.video_annotation.key_id_map.KeyIdMap`
    """
    self._key_id_map = new_map
    json_path = self._get_key_id_map_path()
    new_map.dump_json(json_path)
def _get_key_id_map_path(self):
"""
:returns: str (full path to key_id_map.json)
"""
return os.path.join(self.directory, "key_id_map.json")
[docs]
def copy_data(
    self,
    dst_directory: str,
    dst_name: Optional[str] = None,
    _validate_item: Optional[bool] = True,
    _use_hardlink: Optional[bool] = False,
) -> VideoProject:
    """
    Make a copy of the VideoProject in another directory.

    A new project is created at ``dst_directory/dst_name`` with the same meta. Every item of
    every dataset is then added to a same-named dataset, and finally the key-id map is stored
    in the copy. Item, annotation and item-info paths that do not point to an existing file
    are passed on as ``None``.

    :param dst_directory: Path to video project parent directory.
    :type dst_directory: str
    :param dst_name: Video Project name; the source project name is used when omitted.
    :type dst_name: str, optional
    :param _validate_item: Checks input files format.
    :type _validate_item: bool, optional
    :param _use_hardlink: If True creates a hardlink pointing to src named dst, otherwise don't.
    :type _use_hardlink: bool, optional
    :returns: New instance of VideoProject object.
    :rtype: :class:`~supervisely.project.video_project.VideoProject`

    :Usage Example:

        .. code-block:: python

            import supervisely as sly

            project = sly.VideoProject("/home/admin/work/supervisely/projects/videos_example", sly.OpenMode.READ)
            print(project.total_items)
            # Output: 6

            new_project = project.copy_data("/home/admin/work/supervisely/projects/", "videos_example_copy")
            print(new_project.total_items)
            # Output: 6
    """

    def existing_file_or_none(path):
        # Only files that are really on disk are handed to the destination dataset.
        return path if os.path.isfile(path) else None

    if dst_name is None:
        dst_name = self.name

    project_copy = VideoProject(os.path.join(dst_directory, dst_name), OpenMode.CREATE)
    project_copy.set_meta(self.meta)

    for ds in self:
        ds_copy = project_copy.create_dataset(ds.name)
        for item_name in ds:
            item_path, ann_path = ds.get_item_paths(item_name)
            item_info_path = ds.get_item_info_path(item_name)

            item_path = existing_file_or_none(item_path)
            ann_path = existing_file_or_none(ann_path)
            item_info_path = existing_file_or_none(item_info_path)

            try:
                ds_copy.add_item_file(
                    item_name,
                    item_path,
                    ann_path,
                    _validate_item=_validate_item,
                    _use_hardlink=_use_hardlink,
                    item_info=item_info_path,
                )
            except Exception:
                # Record which item failed, then let the original error propagate.
                debug_context = {
                    "source_project_name": self.name,
                    "dst_directory": dst_directory,
                    "ds_name": ds.name,
                    "item_name": item_name,
                    "item_path": item_path,
                    "ann_path": ann_path,
                    "item_info": item_info_path,
                }
                logger.info("INFO FOR DEBUGGING", extra=debug_context)
                raise

    project_copy.set_key_id_map(self.key_id_map)
    return project_copy
[docs]
@staticmethod
def to_segmentation_task(
    src_project_dir: str,
    dst_project_dir: Optional[str] = None,
    inplace: Optional[bool] = False,
    target_classes: Optional[List[str]] = None,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    segmentation_type: Optional[str] = "semantic",
) -> None:
    """
    Unsupported for video projects.

    :raises NotImplementedError: in all cases.
    """
    message = "Static method 'to_segmentation_task()' is not supported for VideoProject class now."
    raise NotImplementedError(message)
[docs]
@staticmethod
def to_detection_task(
    src_project_dir: str,
    dst_project_dir: Optional[str] = None,
    inplace: Optional[bool] = False,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
) -> None:
    """
    Unsupported for video projects.

    :raises NotImplementedError: in all cases.
    """
    message = "Static method 'to_detection_task()' is not supported for VideoProject class now."
    raise NotImplementedError(message)
[docs]
@staticmethod
def remove_classes_except(
    project_dir: str,
    classes_to_keep: Optional[List[str]] = None,
    inplace: Optional[bool] = False,
) -> None:
    """
    Unsupported for video projects.

    :raises NotImplementedError: in all cases.
    """
    message = "Static method 'remove_classes_except()' is not supported for VideoProject class now."
    raise NotImplementedError(message)
[docs]
@staticmethod
def remove_classes(
    project_dir: str,
    classes_to_remove: Optional[List[str]] = None,
    inplace: Optional[bool] = False,
) -> None:
    """
    Unsupported for video projects.

    :raises NotImplementedError: in all cases.
    """
    message = "Static method 'remove_classes()' is not supported for VideoProject class now."
    raise NotImplementedError(message)
@staticmethod
def _remove_items(
project_dir,
without_objects=False,
without_tags=False,
without_objects_and_tags=False,
inplace=False,
):
"""
Not available for VideoProject class.
:raises NotImplementedError: in all cases.
"""
raise NotImplementedError(
f"Static method '_remove_items()' is not supported for VideoProject class now."
)
[docs]
@staticmethod
def remove_items_without_objects(project_dir: str, inplace: Optional[bool] = False) -> None:
    """
    Unsupported for video projects.

    :raises NotImplementedError: in all cases.
    """
    message = (
        "Static method 'remove_items_without_objects()' is not supported for VideoProject class now."
    )
    raise NotImplementedError(message)
[docs]
@staticmethod
def get_train_val_splits_by_count(project_dir: str, train_count: int, val_count: int) -> None:
    """
    Unsupported for video projects.

    :raises NotImplementedError: in all cases.
    """
    message = (
        "Static method 'get_train_val_splits_by_count()' is not supported for VideoProject class now."
    )
    raise NotImplementedError(message)
[docs]
@staticmethod
def get_train_val_splits_by_tag(
    project_dir: str,
    train_tag_name: str,
    val_tag_name: str,
    untagged: Optional[str] = "ignore",
) -> None:
    """
    Unsupported for video projects.

    :raises NotImplementedError: in all cases.
    """
    message = (
        "Static method 'get_train_val_splits_by_tag()' is not supported for VideoProject class now."
    )
    raise NotImplementedError(message)
[docs]
@staticmethod
def get_train_val_splits_by_dataset(
    project_dir: str, train_datasets: List[str], val_datasets: List[str]
) -> None:
    """
    Not available for VideoProject class.

    :raises NotImplementedError: in all cases.
    """
    # The message previously named 'get_train_val_splits_by_tag()' (copy-paste slip),
    # which pointed users at the wrong method.
    raise NotImplementedError(
        "Static method 'get_train_val_splits_by_dataset()' is not supported for VideoProject class now."
    )
[docs]
@staticmethod
def get_train_val_splits_by_collections(
    project_dir: str,
    train_collections: List[int],
    val_collections: List[int],
    project_id: int,
    api: Api,
) -> None:
    """
    Unsupported for video projects.

    :raises NotImplementedError: in all cases.
    """
    message = (
        "Static method 'get_train_val_splits_by_collections()' is not supported for VideoProject class now."
    )
    raise NotImplementedError(message)
[docs]
@classmethod
def read_single(cls, dir):
    """
    Read the single project located in the given directory.

    The work is delegated to ``read_single_project``; an error is raised if the given
    directory contains more than one subdirectory.

    :param dir: str
    :returns: New instance of VideoProject object.
    :rtype: :class:`~supervisely.project.video_project.VideoProject`
    """
    project = read_project_wrapper(dir, cls)
    return project
[docs]
@staticmethod
def download(
    api: Api,
    project_id: int,
    dest_dir: str,
    dataset_ids: Optional[List[int]] = None,
    download_videos: bool = True,
    save_video_info: bool = False,
    log_progress: bool = True,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    resume_download: Optional[bool] = False,
) -> None:
    """
    Download video project from Supervisely to the given directory.

    All arguments are forwarded by keyword to ``download_video_project``.

    :param api: Supervisely API object.
    :type api: :class:`~supervisely.api.api.Api`
    :param project_id: Project ID in Supervisely.
    :type project_id: int
    :param dest_dir: Directory to download video project.
    :type dest_dir: str
    :param dataset_ids: Datasets IDs in Supervisely to download.
    :type dataset_ids: List[int], optional
    :param download_videos: Download videos from Supervisely video project in dest_dir or not.
    :type download_videos: bool, optional
    :param save_video_info: Save video infos or not.
    :type save_video_info: bool, optional
    :param log_progress: Log download progress or not.
    :type log_progress: bool
    :param progress_cb: Function for tracking download progress.
    :type progress_cb: tqdm or callable, optional
    :param resume_download: Forwarded unchanged to ``download_video_project``; presumably
        lets an interrupted download continue -- TODO confirm against that function.
    :type resume_download: bool, optional
    :returns: None
    :rtype: NoneType

    :Usage Example:

        .. code-block:: python

            import os
            from dotenv import load_dotenv

            import supervisely as sly

            # Load secrets and create API object from .env file (recommended)
            # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
            if sly.is_development():
                load_dotenv(os.path.expanduser("~/supervisely.env"))

            api = sly.Api.from_env()

            # Download Video Project
            project_id = 8888
            save_directory = "/home/admin/work/supervisely/source/video_project"

            sly.VideoProject.download(api, project_id, save_directory)
            project_fs = sly.VideoProject(save_directory, sly.OpenMode.READ)
    """
    download_video_project(
        api=api,
        project_id=project_id,
        dest_dir=dest_dir,
        dataset_ids=dataset_ids,
        download_videos=download_videos,
        save_video_info=save_video_info,
        log_progress=log_progress,
        progress_cb=progress_cb,
        resume_download=resume_download,
    )
[docs]
@staticmethod
def upload(
    dir: str,
    api: Api,
    workspace_id: int,
    project_name: Optional[str] = None,
    log_progress: bool = True,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
) -> Tuple[int, str]:
    """
    Upload a local video project directory to Supervisely.

    This is a thin wrapper around the module-level ``upload_video_project`` function;
    its return value is passed back unchanged.

    :param dir: Directory with video project.
    :type dir: str
    :param api: Supervisely API object.
    :type api: :class:`~supervisely.api.api.Api`
    :param workspace_id: Workspace ID in Supervisely to upload video project.
    :type workspace_id: int
    :param project_name: Name of video project.
    :type project_name: str
    :param log_progress: Log upload progress or not.
    :type log_progress: bool
    :param progress_cb: Function for tracking upload progress.
    :type progress_cb: tqdm or callable, optional
    :returns: New video project ID in Supervisely and project name
    :rtype: int, str
    :Usage Example:
    .. code-block:: python
        import os
        from dotenv import load_dotenv
        import supervisely as sly
        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()
        # Upload Video Project
        project_directory = "/home/admin/work/supervisely/source/video_project"
        project_id, project_name = sly.VideoProject.upload(
            project_directory,
            api,
            workspace_id=45,
            project_name="My Video Project"
        )
    """
    options = {
        "project_name": project_name,
        "log_progress": log_progress,
        "progress_cb": progress_cb,
    }
    result = upload_video_project(dir, api, workspace_id, **options)
    return result
[docs]
@staticmethod
async def download_async(
    api: Api,
    project_id: int,
    dest_dir: str,
    semaphore: Optional[Union[asyncio.Semaphore, int]] = None,
    dataset_ids: List[int] = None,
    download_videos: bool = True,
    save_video_info: bool = False,
    log_progress: bool = True,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    include_custom_data: bool = False,
    resume_download: Optional[bool] = False,
    **kwargs,
) -> None:
    """
    Asynchronously download a video project from Supervisely into a local directory.

    This coroutine only awaits ``download_video_project_async``; all named arguments
    and any extra keyword arguments are forwarded to it unchanged.

    :param api: Supervisely :class:`~supervisely.api.api.Api` class object.
    :type api: :class:`~supervisely.api.api.Api`
    :param project_id: Project ID in Supervisely.
    :type project_id: int
    :param dest_dir: Directory to download video project.
    :type dest_dir: str
    :param semaphore: Semaphore to limit the number of concurrent downloads of items.
    :type semaphore: :class:`asyncio.Semaphore` or int, optional
    :param dataset_ids: Datasets IDs in Supervisely to download.
    :type dataset_ids: List[int], optional
    :param download_videos: Download videos from Supervisely video project in dest_dir or not.
    :type download_videos: bool, optional
    :param save_video_info: Save video infos or not.
    :type save_video_info: bool, optional
    :param log_progress: Log download progress or not.
    :type log_progress: bool
    :param progress_cb: Function for tracking download progress.
    :type progress_cb: :class:`tqdm`, optional
    :param include_custom_data: Include custom data in the download.
    :type include_custom_data: bool, optional
    :param resume_download: Forwarded as-is to ``download_video_project_async``.
    :type resume_download: bool, optional
    :returns: None
    :rtype: NoneType
    :Usage Example:
    .. code-block:: python
        import os
        from dotenv import load_dotenv
        import supervisely as sly
        from supervisely._utils import run_coroutine
        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()
        save_directory = "/home/admin/work/supervisely/source/video_project"
        project_id = 8888
        coroutine = sly.VideoProject.download_async(api, project_id, save_directory)
        run_coroutine(coroutine)
    """
    # Named arguments are collected once; caller-supplied extras are appended untouched.
    call_args = {
        "api": api,
        "project_id": project_id,
        "dest_dir": dest_dir,
        "semaphore": semaphore,
        "dataset_ids": dataset_ids,
        "download_videos": download_videos,
        "save_video_info": save_video_info,
        "log_progress": log_progress,
        "progress_cb": progress_cb,
        "include_custom_data": include_custom_data,
        "resume_download": resume_download,
    }
    await download_video_project_async(**call_args, **kwargs)
# --------------------- #
# Video Data Versioning #
# --------------------- #
[docs]
@staticmethod
def download_bin(
api: Api,
project_id: int,
dest_dir: Optional[str] = None,
dataset_ids: Optional[List[int]] = None,
batch_size: int = 50,
log_progress: bool = True,
progress_cb: Optional[Union[tqdm, Callable]] = None,
return_bytesio: bool = False,
) -> Union[str, io.BytesIO]:
"""
Download video project snapshot in Arrow/Parquet-based binary format.
Result is a .tar.zst archive containing:
- project_info.json
- project_meta.json
- key_id_map.json
- manifest.json
- datasets.parquet
- videos.parquet
- objects.parquet
- figures.parquet
:param api: Supervisely API client.
:type api: :class:`~supervisely.api.api.Api`
:param project_id: Source project ID.
:type project_id: int
:param dest_dir: Directory to save the resulting ``.tar.zst`` file. Required if ``return_bytesio`` is False.
:type dest_dir: Optional[str]
:param dataset_ids: Optional list of dataset IDs to include. If provided, only those datasets (and their videos/annotations) will be included in the snapshot.
:type dataset_ids: Optional[List[int]]
:param batch_size: Batch size for downloading video annotations. Cannot be greater than 100 due to API limitations. Default is 50.
:type batch_size: int
:param log_progress: If True, shows progress (uses internal tqdm progress bars) when ``progress_cb`` is not provided.
:type log_progress: bool
:param progress_cb: Optional progress callback. Can be a tqdm or callable, accepting an integer increment.
:type progress_cb: Optional[Union[tqdm, Callable]]
:param return_bytesio: If True, return the snapshot as io.BytesIO. If False, write the snapshot to dest_dir and return the output file path.
:type return_bytesio: bool
:returns: Either output file path (.tar.zst) when return_bytesio is False, or an in-memory snapshot stream when return_bytesio is True.
:rtype: Union[str, io.BytesIO]
"""
if dest_dir is None and not return_bytesio:
raise ValueError(
"dest_dir must be specified if return_bytesio is False in VideoProject.download_bin"
)
snapshot_io = VideoProject.build_snapshot(
api,
project_id=project_id,
dataset_ids=dataset_ids,
batch_size=batch_size,
log_progress=log_progress,
progress_cb=progress_cb,
)
if return_bytesio:
snapshot_io.seek(0)
return snapshot_io
project_info = api.project.get_info_by_id(project_id)
os.makedirs(dest_dir, exist_ok=True)
out_path = os.path.join(
dest_dir,
f"{project_info.id}_{project_info.name}.tar.zst",
)
with open(out_path, "wb") as dst:
dst.write(snapshot_io.read())
return out_path
[docs]
@staticmethod
def upload_bin(
    api: Api,
    file: Union[str, io.BytesIO],
    workspace_id: int,
    project_name: Optional[str] = None,
    with_custom_data: bool = True,
    log_progress: bool = True,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    skip_missed: bool = False,
    project_description: Optional[str] = None,
) -> "ProjectInfo":
    """
    Restore a video project from an Arrow/Parquet-based binary snapshot.

    The snapshot is loaded fully into memory (from the path or from the stream's
    complete buffer) and handed to ``VideoProject.restore_snapshot``.

    :param api: Supervisely API client.
    :type api: :class:`~supervisely.api.api.Api`
    :param file: Snapshot file path (.tar.zst) or in-memory snapshot stream (io.BytesIO).
    :type file: Union[str, io.BytesIO]
    :param workspace_id: Target workspace ID where the project will be created.
    :type workspace_id: int
    :param project_name: Optional new project name. If not provided, the name from the snapshot will be used. If the name already exists in the workspace, a free name will be chosen.
    :type project_name: Optional[str]
    :param with_custom_data: If True, restore project/dataset/video custom data (when present in the snapshot).
    :type with_custom_data: bool
    :param log_progress: If True, shows progress (uses internal tqdm progress bars) when ``progress_cb`` is not provided.
    :type log_progress: bool
    :param progress_cb: Optional progress callback. Can be a tqdm or callable, accepting an integer increment.
    :type progress_cb: Optional[Union[tqdm, Callable]]
    :param skip_missed: If True, skip videos that are missing on server when restoring by hash.
    :type skip_missed: bool
    :param project_description: Description of the destination project in Supervisely.
    :type project_description: str, optional
    :returns: ProjectInfo object.
    :rtype: :class:`~supervisely.api.project_api.ProjectInfo`
    """
    if not isinstance(file, io.BytesIO):
        # Anything that is not a BytesIO is treated as a path on disk.
        with open(file, "rb") as snapshot_file:
            raw_snapshot = snapshot_file.read()
    else:
        # getvalue() returns the whole buffer regardless of the current position.
        raw_snapshot = file.getvalue()

    restored_project = VideoProject.restore_snapshot(
        api,
        snapshot_bytes=raw_snapshot,
        workspace_id=workspace_id,
        project_name=project_name,
        with_custom_data=with_custom_data,
        log_progress=log_progress,
        progress_cb=progress_cb,
        skip_missed=skip_missed,
        project_description=project_description,
    )
    return restored_project
[docs]
@staticmethod
def build_snapshot(
    api: Api,
    project_id: int,
    dataset_ids: Optional[List[int]] = None,
    batch_size: int = 50,
    log_progress: bool = True,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    schema_version: str = DEFAULT_VIDEO_SCHEMA_VERSION,
) -> io.BytesIO:
    """
    Create a video project snapshot in Arrow/Parquet+tar.zst format and return it as BytesIO.

    The payload is assembled in a temporary directory (``project_info.json``,
    ``project_meta.json``, ``key_id_map.json``, ``manifest.json`` and one ``.parquet``
    file per non-empty table), packed into a tar archive, compressed with zstd and
    read back into memory. The temporary directory is removed before returning.

    :param api: Supervisely API client.
    :type api: :class:`~supervisely.api.api.Api`
    :param project_id: Source project ID.
    :type project_id: int
    :param dataset_ids: Optional list of dataset IDs to include. If provided, only those datasets (and their videos/annotations) will be included in the snapshot.
    :type dataset_ids: Optional[List[int]]
    :param batch_size: Batch size for downloading video annotations. Values above 100 are reduced to 100 with a warning.
    :type batch_size: int
    :param log_progress: If True, shows progress (uses internal tqdm progress bars) when progress_cb is not provided.
    :type log_progress: bool
    :param progress_cb: Optional progress callback. Can be a tqdm or callable, accepting an integer increment.
    :type progress_cb: Optional[Union[tqdm, Callable]]
    :param schema_version: Snapshot schema version. Controls the internal Parquet layout/fields. Supported values are the keys from get_video_snapshot_schema (currently: "v2.0.0").
    :type schema_version: str
    :returns: In-memory snapshot stream (io.BytesIO), positioned at the start.
    :rtype: io.BytesIO
    :raises RuntimeError: if pyarrow cannot be imported, or if the annotations returned by the API do not match the order of the requested videos.
    :raises ValueError: if ``batch_size`` is less than 1.
    """
    import shutil

    try:
        import pyarrow  # pylint: disable=import-error
        import pyarrow.parquet as parquet  # pylint: disable=import-error
    except Exception as e:
        raise RuntimeError(
            "pyarrow is required to build video snapshot. Please install pyarrow."
        ) from e

    if batch_size < 1:
        # A non-positive batch size cannot produce valid batches.
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    if batch_size > 100:
        logger.warning(
            "Batch size cannot be greater than 100 due to Video Project API limitations. Setting batch_size to 100."
        )
        batch_size = 100

    project_info = api.project.get_info_by_id(project_id)
    meta = ProjectMeta.from_json(api.project.get_meta(project_id, with_settings=True))
    key_id_map = KeyIdMap()
    snapshot_schema = get_video_snapshot_schema(schema_version)

    tmp_root = tempfile.mkdtemp()
    try:
        payload_dir = os.path.join(tmp_root, "payload")
        mkdir(payload_dir)

        # project_info / meta
        proj_info_path = os.path.join(payload_dir, "project_info.json")
        dump_json_file(project_info._asdict(), proj_info_path)
        proj_meta_path = os.path.join(payload_dir, "project_meta.json")
        dump_json_file(meta.to_json(), proj_meta_path)

        datasets_rows: List[dict] = []
        videos_rows: List[dict] = []
        objects_rows: List[dict] = []
        figures_rows: List[dict] = []
        dataset_ids_filter = set(dataset_ids) if dataset_ids is not None else None

        # api.dataset.tree() doesn't include custom_data
        ds_custom_data_by_id: Dict[int, dict] = {}
        try:
            for ds in api.dataset.get_list(
                project_id, recursive=True, include_custom_data=True
            ):
                if getattr(ds, "custom_data", None) is not None:
                    ds_custom_data_by_id[ds.id] = ds.custom_data
        except Exception as e:
            # Best effort: the snapshot is still built, just without dataset custom data.
            logger.warning(
                f"Failed to fetch datasets custom_data for project id={project_id}: {e}. "
                f"Snapshot will not contain dataset custom data."
            )
            ds_custom_data_by_id = {}

        for parents, ds_info in api.dataset.tree(project_id):
            if dataset_ids_filter is not None and ds_info.id not in dataset_ids_filter:
                continue
            full_path = Dataset._get_dataset_path(ds_info.name, parents)
            ds_custom_data = ds_custom_data_by_id.get(ds_info.id)
            datasets_rows.append(
                snapshot_schema.dataset_row_from_ds_info(
                    ds_info, full_path=full_path, custom_data=ds_custom_data
                )
            )

            videos = api.video.get_list(ds_info.id)
            ds_progress = progress_cb
            if log_progress and progress_cb is None:
                ds_progress = tqdm_sly(
                    desc=f"Collecting videos from '{ds_info.name}'",
                    total=len(videos),
                )

            for batch in batched(videos, batch_size):
                video_ids = [v.id for v in batch]
                ann_jsons = api.video.annotation.download_bulk(ds_info.id, video_ids)
                for video_info, ann_json in zip(batch, ann_jsons):
                    # Annotations are paired with videos by position, so verify the pairing.
                    if video_info.name != ann_json[ApiField.VIDEO_NAME]:
                        raise RuntimeError(
                            "Error in api.video.annotation.download_bulk: broken order"
                        )
                    videos_rows.append(
                        snapshot_schema.video_row_from_video_info(
                            video_info, src_dataset_id=ds_info.id, ann_json=ann_json
                        )
                    )
                    video_ann = VideoAnnotation.from_json(ann_json, meta, key_id_map)

                    # Objects get snapshot-wide sequential ids (1-based row number).
                    obj_key_to_src_id: Dict[str, int] = {}
                    for obj in video_ann.objects:
                        src_obj_id = len(objects_rows) + 1
                        obj_key_to_src_id[obj.key().hex] = src_obj_id
                        objects_rows.append(
                            snapshot_schema.object_row_from_object(
                                obj, src_object_id=src_obj_id, src_video_id=video_info.id
                            )
                        )

                    for frame in video_ann.frames:
                        for fig in frame.figures:
                            parent_key = fig.parent_object.key().hex
                            src_obj_id = obj_key_to_src_id.get(parent_key)
                            if src_obj_id is None:
                                logger.warning(
                                    f"Figure parent object with key '{parent_key}' "
                                    f"not found in objects for video '{video_info.name}'"
                                )
                                continue
                            figures_rows.append(
                                snapshot_schema.figure_row_from_figure(
                                    fig,
                                    figure_row_idx=len(figures_rows),
                                    src_object_id=src_obj_id,
                                    src_video_id=video_info.id,
                                    frame_index=frame.index,
                                )
                            )
                if ds_progress is not None:
                    ds_progress(len(batch))

        # key_id_map.json
        key_id_map_path = os.path.join(payload_dir, "key_id_map.json")
        key_id_map.dump_json(key_id_map_path)

        # Arrow schemas
        tables_meta: List[dict] = []
        datasets_schema = snapshot_schema.datasets_schema(pyarrow)
        videos_schema = snapshot_schema.videos_schema(pyarrow)
        objects_schema = snapshot_schema.objects_schema(pyarrow)
        figures_schema = snapshot_schema.figures_schema(pyarrow)

        def _write_table(table_name: str, rows: List[dict], schema) -> None:
            # Write one parquet table and register it in the manifest; empty tables are omitted.
            if not rows:
                return
            table = pyarrow.Table.from_pylist(rows, schema=schema)
            file_name = f"{table_name}.parquet"
            parquet.write_table(table, os.path.join(payload_dir, file_name))
            tables_meta.append(
                {
                    "name": table_name,
                    "path": file_name,
                    "row_count": table.num_rows,
                }
            )

        _write_table("datasets", datasets_rows, datasets_schema)
        _write_table("videos", videos_rows, videos_schema)
        _write_table("objects", objects_rows, objects_schema)
        _write_table("figures", figures_rows, figures_schema)

        manifest = {
            VersionSchemaField.SCHEMA_VERSION: schema_version,
            VersionSchemaField.TABLES: tables_meta,
        }
        manifest_path = os.path.join(payload_dir, "manifest.json")
        dump_json_file(manifest, manifest_path)

        tar_path = os.path.join(tmp_root, "snapshot.tar")
        with tarfile.open(tar_path, "w") as tar:
            tar.add(payload_dir, arcname=".")

        chunk_size = 1024 * 1024 * 50  # 50 MiB
        zst_path = os.path.join(tmp_root, "snapshot.tar.zst")
        # Try streaming compression first, fallback to single-shot
        try:
            cctx = zstd.ZstdCompressor()
            with open(tar_path, "rb") as src, open(zst_path, "wb") as dst:
                try:
                    stream = cctx.stream_writer(dst, closefd=False)
                except TypeError:
                    # The writer did not accept the closefd argument.
                    stream = cctx.stream_writer(dst)
                with stream as compressor:
                    while True:
                        chunk = src.read(chunk_size)
                        if not chunk:
                            break
                        compressor.write(chunk)
        # Fallback: single-shot compression (re-opening with "wb" discards any partial output)
        except Exception:
            with open(tar_path, "rb") as src, open(zst_path, "wb") as dst:
                dst.write(zstd.compress(src.read()))

        with open(zst_path, "rb") as f:
            outio = io.BytesIO(f.read())
        outio.seek(0)
        return outio
    finally:
        # Remove the temporary directory itself, not only its contents; never mask the
        # primary error with a cleanup failure.
        shutil.rmtree(tmp_root, ignore_errors=True)
[docs]
@staticmethod
def restore_snapshot(
    api: Api,
    snapshot_bytes: bytes,
    workspace_id: int,
    project_name: Optional[str] = None,
    with_custom_data: bool = True,
    log_progress: bool = True,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    skip_missed: bool = False,
    project_description: Optional[str] = None,
) -> ProjectInfo:
    """
    Restore a video project from a snapshot and return ProjectInfo.

    The snapshot is unpacked into a temporary directory, a new video project is
    created in the target workspace, then datasets, videos (by hash or by link) and
    annotations are re-created from the snapshot tables. The temporary directory is
    removed before returning.

    :param api: Supervisely API client.
    :type api: :class:`~supervisely.api.api.Api`
    :param snapshot_bytes: Raw content of a ``.tar.zst`` snapshot.
    :type snapshot_bytes: bytes
    :param workspace_id: Target workspace ID where the project will be created.
    :type workspace_id: int
    :param project_name: Optional new project name. If not provided, the name from the snapshot is used. If the name already exists in the workspace, a free name is chosen.
    :type project_name: Optional[str]
    :param with_custom_data: If True, restore project/dataset/video custom data (when present in the snapshot).
    :type with_custom_data: bool
    :param log_progress: If True, shows progress (uses internal tqdm progress bars) when ``progress_cb`` is not provided.
    :type log_progress: bool
    :param progress_cb: Optional progress callback. Can be a tqdm or callable, accepting an integer increment.
    :type progress_cb: Optional[Union[tqdm, Callable]]
    :param skip_missed: If True, videos whose hash is not found on the server are skipped instead of being uploaded.
    :type skip_missed: bool
    :param project_description: Description of the destination project. If not provided, the description from the snapshot is used.
    :type project_description: str, optional
    :returns: Info of the newly created project.
    :rtype: :class:`~supervisely.api.project_api.ProjectInfo`
    :raises RuntimeError: if pyarrow cannot be imported or the snapshot schema version is not supported.
    """
    import shutil

    try:
        import pyarrow  # pylint: disable=import-error
        import pyarrow.parquet as parquet  # pylint: disable=import-error
    except Exception as e:
        raise RuntimeError(
            "pyarrow is required to restore video snapshot. Please install pyarrow."
        ) from e

    # Maps source video id -> info of the video created in the new project.
    src_to_new_video: Dict[int, VideoInfo] = {}

    def _parse_video_metas(rows: List[dict]) -> List[dict]:
        # Decode the JSON-encoded "meta" column; unparsable values become empty metas.
        parsed_metas: List[dict] = []
        for r in rows:
            meta_dict: dict = {}
            if r.get("meta"):
                try:
                    meta_dict.update(json.loads(r["meta"]))
                except Exception:
                    logger.warning(
                        f"Failed to parse meta for video '{r.get('name')}', using empty meta."
                    )
            parsed_metas.append(meta_dict)
        return parsed_metas

    def _register_uploaded(rows: List[dict], new_infos: List[VideoInfo]) -> None:
        # Remember the src -> new video mapping and restore per-video custom data.
        for row, new_info in zip(rows, new_infos):
            src_to_new_video[row["src_video_id"]] = new_info
            if with_custom_data and row.get("custom_data"):
                try:
                    cd = json.loads(row["custom_data"])
                    api.video.update_custom_data(new_info.id, cd)
                except Exception:
                    logger.warning(
                        f"Failed to restore custom_data for video '{new_info.name}'"
                    )

    tmp_root = tempfile.mkdtemp()
    try:
        payload_dir = os.path.join(tmp_root, "payload")
        mkdir(payload_dir)

        # NOTE: the archive may come from an untrusted source. Where the interpreter
        # supports extraction filters, use the "data" filter to reject members that
        # would escape payload_dir.
        extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
        # Try streaming decompression first, fallback to single-shot
        try:
            dctx = zstd.ZstdDecompressor()
            with dctx.stream_reader(io.BytesIO(snapshot_bytes)) as reader:
                with tarfile.open(fileobj=reader, mode="r|") as tar:
                    tar.extractall(payload_dir, **extract_kwargs)
        except Exception:
            tar_bytes = zstd.decompress(snapshot_bytes)
            with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r") as tar:
                tar.extractall(payload_dir, **extract_kwargs)

        proj_info_path = os.path.join(payload_dir, "project_info.json")
        proj_meta_path = os.path.join(payload_dir, "project_meta.json")
        key_id_map_path = os.path.join(payload_dir, "key_id_map.json")
        manifest_path = os.path.join(payload_dir, "manifest.json")

        project_info_json = load_json_file(proj_info_path)
        meta_json = load_json_file(proj_meta_path)
        manifest = load_json_file(manifest_path)
        meta = ProjectMeta.from_json(meta_json)
        # The loaded map is not used afterwards; the call is kept so that a missing or
        # broken key_id_map.json still fails the restore early.
        _ = KeyIdMap().load_json(key_id_map_path)

        schema_version = manifest.get(VersionSchemaField.SCHEMA_VERSION) or manifest.get(
            "schema_version"
        )
        try:
            _ = get_video_snapshot_schema(schema_version)
        except Exception as e:
            raise RuntimeError(
                f"Unsupported video snapshot schema_version: {schema_version}"
            ) from e

        src_project_name = project_info_json.get("name")
        src_project_desc = project_info_json.get("description")
        src_project_readme = project_info_json.get("readme")
        if project_name is None:
            project_name = src_project_name
        if project_description is None:
            project_description = src_project_desc
        if api.project.exists(workspace_id, project_name):
            project_name = api.project.get_free_name(workspace_id, project_name)

        project = api.project.create(
            workspace_id,
            project_name,
            ProjectType.VIDEOS,
            project_description,
            readme=src_project_readme,
        )
        new_meta = api.project.update_meta(project.id, meta.to_json())

        is_multiview = False
        try:
            if new_meta.labeling_interface == LabelingInterface.MULTIVIEW:
                is_multiview = True
        except AttributeError:
            is_multiview = False

        if with_custom_data:
            src_custom_data = project_info_json.get("custom_data") or {}
            try:
                api.project.update_custom_data(project.id, src_custom_data, silent=True)
            except Exception:
                logger.warning("Failed to restore project custom_data from snapshot")

        if progress_cb is not None:
            log_progress = False

        # Datasets
        ds_rows = []
        datasets_path = os.path.join(payload_dir, "datasets.parquet")
        if os.path.exists(datasets_path):
            ds_table = parquet.read_table(datasets_path)
            ds_rows = ds_table.to_pylist()
        # Roots first, then children ordered by parent source id.
        ds_rows.sort(
            key=lambda r: (
                r["parent_src_dataset_id"] is not None,
                r["parent_src_dataset_id"],
            )
        )

        dataset_mapping: Dict[int, DatasetInfo] = {}
        known_src_ids = {r["src_dataset_id"] for r in ds_rows}
        # Sorting by parent id alone does not guarantee that a parent row precedes its
        # children, so rows whose parent is in the snapshot but not created yet are
        # deferred to a later pass instead of being silently attached to the root.
        pending = ds_rows
        force = False
        while pending:
            deferred: List[dict] = []
            for row in pending:
                src_ds_id = row["src_dataset_id"]
                parent_src_id = row["parent_src_dataset_id"]
                waits_for_parent = (
                    parent_src_id is not None
                    and parent_src_id in known_src_ids
                    and parent_src_id not in dataset_mapping
                )
                if waits_for_parent and not force:
                    deferred.append(row)
                    continue

                if parent_src_id is not None:
                    parent_ds = dataset_mapping.get(parent_src_id)
                    # Parent absent from the snapshot: the dataset is created at the top level.
                    parent_id = parent_ds.id if parent_ds is not None else None
                else:
                    parent_id = None

                custom_data = None
                if with_custom_data:
                    raw_cd = row.get("custom_data")
                    if isinstance(raw_cd, str) and raw_cd.strip():
                        try:
                            custom_data = json.loads(raw_cd)
                        except Exception:
                            logger.warning(
                                f"Failed to parse dataset custom_data for '{row.get('name')}', skipping it."
                            )
                    elif isinstance(raw_cd, dict):
                        custom_data = raw_cd

                ds = api.dataset.create(
                    project.id,
                    name=row["name"],
                    description=row["description"],
                    parent_id=parent_id,
                    custom_data=custom_data,
                )
                if with_custom_data and custom_data is not None:
                    try:
                        api.dataset.update_custom_data(ds.id, custom_data)
                    except Exception:
                        logger.warning(
                            f"Failed to restore custom_data for dataset '{row.get('name')}'"
                        )
                dataset_mapping[src_ds_id] = ds
            # If a whole pass created nothing (inconsistent parent links), stop deferring
            # so that the loop is guaranteed to terminate.
            force = len(deferred) == len(pending)
            pending = deferred

        # Videos
        v_rows = []
        videos_path = os.path.join(payload_dir, "videos.parquet")
        if os.path.exists(videos_path):
            v_table = parquet.read_table(videos_path)
            v_rows = v_table.to_pylist()

        videos_by_dataset: Dict[int, List[dict]] = {}
        for row in v_rows:
            src_ds_id = row["src_dataset_id"]
            videos_by_dataset.setdefault(src_ds_id, []).append(row)

        for src_ds_id, rows in videos_by_dataset.items():
            ds_info = dataset_mapping.get(src_ds_id)
            if ds_info is None:
                logger.warning(
                    f"Dataset with src id={src_ds_id} not found in mapping. "
                    f"Skipping its videos."
                )
                continue
            dataset_id = ds_info.id

            # Videos with a hash are restored by hash; the rest by link (if they have one).
            hashed_rows = [r for r in rows if r.get("hash")]
            link_rows = [r for r in rows if not r.get("hash") and r.get("link")]

            ds_progress = progress_cb
            if log_progress and progress_cb is None:
                ds_progress = tqdm_sly(
                    desc=f"Uploading videos to '{ds_info.name}'",
                    total=len(rows),
                )

            if hashed_rows:
                if skip_missed:
                    existing_hashes = api.video.check_existing_hashes(
                        list({r["hash"] for r in hashed_rows})
                    )
                    kept_hashed_rows = [r for r in hashed_rows if r["hash"] in existing_hashes]
                    if not kept_hashed_rows:
                        logger.warning(
                            f"All hashed videos for dataset '{ds_info.name}' "
                            f"are missing on server; nothing to upload."
                        )
                    hashed_rows = kept_hashed_rows
                hashes = [r["hash"] for r in hashed_rows]
                names = [r["name"] for r in hashed_rows]
                metas = _parse_video_metas(hashed_rows)
                if hashes:
                    new_infos = api.video.upload_hashes(
                        dataset_id,
                        names=names,
                        hashes=hashes,
                        metas=metas,
                        progress_cb=ds_progress,
                    )
                    _register_uploaded(hashed_rows, new_infos)

            if link_rows:
                links = [r["link"] for r in link_rows]
                names = [r["name"] for r in link_rows]
                metas = _parse_video_metas(link_rows)
                new_infos_links = api.video.upload_links(
                    dataset_id,
                    links=links,
                    names=names,
                    metas=metas,
                    progress_cb=ds_progress,
                )
                _register_uploaded(link_rows, new_infos_links)

            # NOTE(review): the upload calls above already receive ds_progress as their
            # progress_cb, so this extra update may double-count — confirm the intent.
            if ds_progress is not None:
                ds_progress(len(rows))

        # Annotations
        ann_temp_dir = os.path.join(tmp_root, "anns")
        mkdir(ann_temp_dir)

        anns_by_dataset: Dict[int, List[Tuple[int, str]]] = {}
        for row in v_rows:
            src_vid = row["src_video_id"]
            new_info = src_to_new_video.get(src_vid)
            if new_info is None:
                # The video was not uploaded (skipped or missing), so it has no annotation target.
                continue
            src_ds_id = row["src_dataset_id"]
            anns_by_dataset.setdefault(src_ds_id, []).append((new_info.id, row["ann_json"]))

        for src_ds_id, items in anns_by_dataset.items():
            ds_info = dataset_mapping.get(src_ds_id)
            if ds_info is None:
                continue

            video_ids: List[int] = []
            ann_paths: List[str] = []
            for vid_id, ann_json_str in items:
                ann_path = os.path.join(ann_temp_dir, f"{vid_id}.json")
                try:
                    parsed = json.loads(ann_json_str)
                except Exception:
                    logger.warning(
                        f"Failed to parse ann_json for restored video id={vid_id}, "
                        f"skipping its annotation."
                    )
                    continue
                dump_json_file(parsed, ann_path)
                # Both lists are extended together, only after a successful parse, so
                # that they stay aligned when they are zipped below.
                video_ids.append(vid_id)
                ann_paths.append(ann_path)

            if not video_ids:
                continue

            anns_progress = progress_cb
            if log_progress and progress_cb is None:
                anns_progress = tqdm_sly(
                    desc=f"Uploading annotations to '{ds_info.name}'",
                    total=len(video_ids),
                    leave=False,
                )

            key_id_map = KeyIdMap()
            multiview_key_id_map = KeyIdMap()
            for vid_id, ann_path in zip(video_ids, ann_paths):
                try:
                    ann_json = load_json_file(ann_path)
                    ann = VideoAnnotation.from_json(
                        ann_json,
                        new_meta,
                        key_id_map=key_id_map,
                    )
                except Exception as e:
                    logger.warning(
                        f"Failed to deserialize annotation for restored video id={vid_id}: {e}"
                    )
                    continue
                try:
                    if not is_multiview:
                        api.video.annotation.append(vid_id, ann)
                    else:
                        api.video.annotation.upload_anns_multiview(
                            [vid_id], [ann], key_id_map=multiview_key_id_map
                        )
                    if anns_progress is not None:
                        anns_progress(1)
                except Exception as e:
                    logger.warning(
                        f"Failed to upload annotation for dataset '{ds_info.name}', "
                        f"video id={vid_id}: {e}"
                    )
                    continue

        return project
    finally:
        # Remove the temporary directory itself, not only its contents; never mask the
        # primary error with a cleanup failure.
        shutil.rmtree(tmp_root, ignore_errors=True)
# --------------------- #
def download_video_project(
    api: Api,
    project_id: int,
    dest_dir: str,
    dataset_ids: Optional[List[int]] = None,
    download_videos: Optional[bool] = True,
    save_video_info: Optional[bool] = False,
    log_progress: bool = True,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    include_custom_data: Optional[bool] = False,
    resume_download: Optional[bool] = False,
) -> None:
    """
    Download video project to the local directory.

    For every selected dataset the videos are listed, their annotations are fetched,
    the video file is downloaded (or an empty placeholder file is created) and the
    item is added to the local dataset. The key-id map collected along the way is
    stored in the local project at the end.

    :param api: Supervisely API address and token.
    :type api: :class:`~supervisely.api.api.Api`
    :param project_id: Project ID to download
    :type project_id: int
    :param dest_dir: Destination path to local directory.
    :type dest_dir: str
    :param dataset_ids: IDs of the datasets to download. Datasets of the project that are not listed are skipped. If None, all datasets are downloaded.
    :type dataset_ids: list(int), optional
    :param download_videos: Include videos in the download. If False, an empty file is created for each video instead.
    :type download_videos: bool, optional
    :param save_video_info: Include video info in the download.
    :type save_video_info: bool, optional
    :param log_progress: Show downloading logs in the output. Ignored when ``progress_cb`` is given.
    :type log_progress: bool
    :param progress_cb: Function for tracking the download progress.
    :type progress_cb: tqdm or callable, optional
    :param include_custom_data: If True, each video's custom data is written to ``<dest_dir>/<dataset name>/custom_data/<video name>.json``.
    :type include_custom_data: bool, optional
    :param resume_download: If True and ``dest_dir`` exists, the existing local project is opened and its datasets are reused; a local project reported as empty is cleaned and created anew.
    :type resume_download: bool, optional
    :returns: None.
    :rtype: NoneType
    :Usage Example:
    .. code-block:: python
        import os
        from tqdm import tqdm
        from dotenv import load_dotenv
        import supervisely as sly
        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()
        dest_dir = 'your/local/dest/dir'
        # Download video project
        project_id = 17758
        project_info = api.project.get_info_by_id(project_id)
        num_videos = project_info.items_count
        p = tqdm(desc="Downloading video project", total=num_videos)
        sly.download(
            api,
            project_id,
            dest_dir,
            progress_cb=p,
        )
    """
    LOG_BATCH_SIZE = 1

    def _log_debug_context(**context) -> None:
        # Emit the identifiers needed to reproduce a failure; the caller re-raises afterwards.
        logger.info("INFO FOR DEBUGGING", extra=context)

    key_id_map = KeyIdMap()
    project_fs = None
    meta = ProjectMeta.from_json(api.project.get_meta(project_id, with_settings=True))

    if os.path.exists(dest_dir) and resume_download:
        # Refresh the local meta before trying to reopen the existing project.
        dump_json_file(meta.to_json(), os.path.join(dest_dir, "meta.json"))
        try:
            project_fs = VideoProject(dest_dir, OpenMode.READ)
        except RuntimeError as open_error:
            if "Project is empty" not in str(open_error):
                raise
            # Nothing usable on disk: wipe the directory and start from scratch.
            clean_dir(dest_dir)
            project_fs = None

    if project_fs is None:
        project_fs = VideoProject(dest_dir, OpenMode.CREATE)
    project_fs.set_meta(meta)

    # An explicit callback replaces the built-in progress bars.
    if progress_cb is not None:
        log_progress = False

    dataset_ids = None if dataset_ids is None else set(dataset_ids)
    local_datasets = {local_ds.path: local_ds for local_ds in project_fs.datasets}

    for parents, dataset in api.dataset.tree(project_id):
        if dataset_ids is not None and dataset.id not in dataset_ids:
            continue

        dataset_path = Dataset._get_dataset_path(dataset.name, parents)
        dataset_fs = local_datasets.get(dataset_path)
        if dataset_fs is None:
            dataset_fs = project_fs.create_dataset(dataset.name, dataset_path)

        videos = api.video.get_list(dataset.id)

        ds_progress = progress_cb
        if log_progress:
            ds_progress = tqdm_sly(
                desc=f"Downloading videos from {dataset.name!r}",
                total=len(videos),
            )

        for batch in batched(videos, batch_size=LOG_BATCH_SIZE):
            video_ids = [info.id for info in batch]
            try:
                ann_jsons = api.video.annotation.download_bulk(dataset.id, video_ids)
            except Exception:
                _log_debug_context(
                    project_id=project_id,
                    dataset_id=dataset.id,
                    video_ids=video_ids,
                )
                raise

            for video_info, ann_json in zip(batch, ann_jsons):
                video_id = video_info.id
                video_name = video_info.name
                custom_data = video_info.custom_data

                # Annotations are paired with videos by position, so verify the pairing.
                if video_name != ann_json[ApiField.VIDEO_NAME]:
                    raise RuntimeError("Error in api.video.annotation.download_batch: broken order")

                video_file_path = dataset_fs.generate_item_path(video_name)

                if include_custom_data:
                    CUSTOM_DATA_DIR = os.path.join(dest_dir, dataset.name, "custom_data")
                    mkdir(CUSTOM_DATA_DIR)
                    custom_data_path = os.path.join(CUSTOM_DATA_DIR, f"{video_name}.json")
                    dump_json_file(custom_data, custom_data_path)

                if not download_videos:
                    # Only a placeholder file is created for the item.
                    touch(video_file_path)
                else:
                    try:
                        video_file_size = video_info.file_meta.get("size")
                        show_item_progress = (
                            ds_progress is not None and video_file_size is not None
                        )
                        if show_item_progress:
                            # Byte-level bar for the single file being downloaded.
                            item_progress = tqdm_sly(
                                desc=f"Downloading '{video_name}'",
                                total=int(video_file_size),
                                unit="B",
                                unit_scale=True,
                                leave=False,
                            )
                            api.video.download_path(video_id, video_file_path, item_progress)
                        else:
                            api.video.download_path(video_id, video_file_path)
                    except Exception:
                        _log_debug_context(
                            project_id=project_id,
                            dataset_id=dataset.id,
                            video_id=video_id,
                            video_file_path=video_file_path,
                        )
                        raise

                item_info = None
                if save_video_info:
                    item_info = video_info._asdict()

                try:
                    video_ann = VideoAnnotation.from_json(ann_json, project_fs.meta, key_id_map)
                except Exception:
                    _log_debug_context(
                        project_id=project_id,
                        dataset_id=dataset.id,
                        video_id=video_id,
                        video_name=video_name,
                        ann_json=ann_json,
                    )
                    raise

                try:
                    dataset_fs.add_item_file(
                        video_name,
                        video_file_path,
                        ann=video_ann,
                        _validate_item=False,
                        _use_hardlink=True,
                        item_info=item_info,
                    )
                except Exception:
                    _log_debug_context(
                        project_id=project_id,
                        dataset_id=dataset.id,
                        video_id=video_id,
                        video_name=video_name,
                        video_file_path=video_file_path,
                        item_info=item_info,
                    )
                    raise

                if progress_cb is not None:
                    progress_cb(1)

            if log_progress:
                ds_progress(len(batch))

    project_fs.set_key_id_map(key_id_map)
def upload_video_project(
    dir: str,
    api: Api,
    workspace_id: int,
    project_name: Optional[str] = None,
    log_progress: bool = True,
    include_custom_data: Optional[bool] = False,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
) -> Tuple[int, str]:
    """
    Upload a local video project directory to a Supervisely workspace as a new project.

    A new project of type ``ProjectType.VIDEOS`` is always created. For every local
    dataset the videos are uploaded first and their annotations afterwards. Datasets
    without items are created on the server but nothing is uploaded into them.

    :param dir: Path to the local video project directory.
    :type dir: str
    :param api: Supervisely API address and token.
    :type api: :class:`~supervisely.api.api.Api`
    :param workspace_id: ID of the workspace in which the project is created.
    :type workspace_id: int
    :param project_name: Name of the new project. If None, the name of the local project
        is used. If a project with this name already exists in the workspace, a free name
        obtained from ``api.project.get_free_name`` is used instead.
    :type project_name: str, optional
    :param log_progress: Show progress bars. Ignored (treated as False) when ``progress_cb``
        is provided.
    :type log_progress: bool
    :param include_custom_data: If True, for every uploaded item the file
        ``<dir>/<dataset name>/custom_data/<item name>.json`` is looked up and, when it
        exists, its content is set as the custom data of the uploaded video.
    :type include_custom_data: bool, optional
    :param progress_cb: Progress callback passed to the video upload call. When it is
        provided, annotation upload progress is reported with a separate progress bar.
    :type progress_cb: tqdm or callable, optional
    :returns: ID and name of the created project.
    :rtype: Tuple[int, str]
    """
    project_fs = VideoProject.read_single(dir)
    if project_name is None:
        project_name = project_fs.name

    # A meta object without a `labeling_interface` attribute is treated as non-multiview.
    try:
        is_multiview = bool(project_fs.meta.labeling_interface == LabelingInterface.MULTIVIEW)
    except AttributeError:
        is_multiview = False

    if api.project.exists(workspace_id, project_name):
        project_name = api.project.get_free_name(workspace_id, project_name)

    project = api.project.create(workspace_id, project_name, ProjectType.VIDEOS)
    project_meta = api.project.update_meta(project.id, project_fs.meta.to_json())

    if progress_cb is not None:
        log_progress = False

    # Path-like dataset name -> remote dataset ID, used to resolve the parent of nested datasets.
    dataset_map = {}
    for dataset_fs in project_fs.datasets:
        dataset_fs: VideoDataset
        if len(dataset_fs.parents) > 0:
            parent = f"{os.path.sep}".join(dataset_fs.parents)
            # `.get` yields None when the parent has not been registered in the map yet.
            parent_id = dataset_map.get(parent)
        else:
            parent = ""
            parent_id = None
        dataset = api.dataset.create(project.id, dataset_fs.short_name, parent_id=parent_id)
        dataset_map[os.path.join(parent, dataset.name)] = dataset.id

        names, item_paths, ann_paths, metas = [], [], [], []
        for item_name in dataset_fs:
            video_path, ann_path = dataset_fs.get_item_paths(item_name)
            names.append(item_name)
            item_paths.append(video_path)
            ann_paths.append(ann_path)
            # Read video metadata from metadata folder (includes offset for multiview)
            metas.append(_load_video_upload_meta(dataset_fs, item_name))

        if not item_paths:
            continue

        ds_progress = progress_cb
        if log_progress is True:
            ds_progress = tqdm_sly(
                desc="Uploading videos to {!r}".format(dataset.name),
                total=len(item_paths),
                position=0,
            )
        try:
            item_infos = api.video.upload_paths(
                dataset.id, names, item_paths, ds_progress, metas=metas
            )
            video_ids = [item_info.id for item_info in item_infos]
            if include_custom_data:
                custom_data_dir = os.path.join(dir, dataset_fs.name, "custom_data")
                for item_info in item_infos:
                    custom_data_path = os.path.join(custom_data_dir, f"{item_info.name}.json")
                    if os.path.exists(custom_data_path):
                        custom_data = load_json_file(custom_data_path)
                        api.video.update_custom_data(item_info.id, custom_data)
        except Exception:
            logger.info(
                "INFO FOR DEBUGGING",
                extra={
                    "project_id": project.id,
                    "dataset_id": dataset.id,
                    "names": names,
                    "item_paths": item_paths,
                },
            )
            # Bare raise keeps the original traceback untouched.
            raise

        anns_progress = None
        if log_progress or progress_cb is not None:
            anns_progress = tqdm_sly(
                desc="Uploading annotations to {!r}".format(dataset.name),
                total=len(video_ids),
                leave=False,
            )
        try:
            if is_multiview:
                # The multiview upload receives the meta returned by the server-side update.
                api.video.annotation.upload_paths_multiview(
                    video_ids, ann_paths, project_meta, anns_progress
                )
            else:
                api.video.annotation.upload_paths(
                    video_ids, ann_paths, project_fs.meta, anns_progress
                )
        except Exception:
            logger.info(
                "INFO FOR DEBUGGING",
                extra={
                    "project_id": project.id,
                    "dataset_id": dataset.id,
                    "item_ids": video_ids,
                    "ann_paths": ann_paths,
                },
            )
            raise

    return project.id, project.name


def _load_video_upload_meta(dataset_fs: VideoDataset, item_name: str):
    """
    Load the optional ``<item_name>.meta.json`` file from the dataset metadata directory.

    :param dataset_fs: Local dataset that owns the item.
    :type dataset_fs: VideoDataset
    :param item_name: Name of the item whose metadata file is looked up.
    :type item_name: str
    :returns: Parsed JSON content, or None when the file does not exist or cannot be loaded.
    """
    metadata_path = os.path.join(dataset_fs.metadata_directory, f"{item_name}.meta.json")
    if not os.path.exists(metadata_path):
        return None
    try:
        return load_json_file(metadata_path)
    except Exception as e:
        # Metadata is best-effort: the upload continues without it, but the failure is reported.
        logger.warning(
            f"Failed to load video metadata from {metadata_path!r}, "
            f"the item will be uploaded without it: {e!r}"
        )
        return None
async def download_video_project_async(
    api: Api,
    project_id: int,
    dest_dir: str,
    semaphore: Optional[Union[asyncio.Semaphore, int]] = None,
    dataset_ids: Optional[List[int]] = None,
    download_videos: Optional[bool] = True,
    save_video_info: Optional[bool] = False,
    log_progress: bool = True,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    include_custom_data: Optional[bool] = False,
    resume_download: Optional[bool] = False,
    **kwargs,
) -> None:
    """
    Download video project to the local directory.

    :param api: Supervisely API address and token.
    :type api: :class:`~supervisely.api.api.Api`
    :param project_id: Project ID to download
    :type project_id: int
    :param dest_dir: Destination path to local directory.
    :type dest_dir: str
    :param semaphore: Semaphore to limit the number of simultaneous downloads of items.
        An int is converted to ``asyncio.Semaphore(int)``; if None, the default semaphore
        of ``api`` is used.
    :type semaphore: asyncio.Semaphore or int, optional
    :param dataset_ids: IDs of the datasets to download. Only datasets found in the dataset
        tree of ``project_id`` are considered; other IDs are ignored. If None, all datasets
        of the project are downloaded.
    :type dataset_ids: list(int), optional
    :param download_videos: Include videos in the download. If False, an empty placeholder
        file is created for every video instead.
    :type download_videos: bool, optional
    :param save_video_info: Include video info in the download.
    :type save_video_info: bool, optional
    :param log_progress: Show downloading logs in the output. Ignored (treated as False)
        when ``progress_cb`` is provided.
    :type log_progress: bool
    :param progress_cb: Function for tracking the download progress.
    :type progress_cb: tqdm or callable, optional
    :param include_custom_data: Include custom data in the download.
    :type include_custom_data: bool, optional
    :param resume_download: If True and ``dest_dir`` already exists, the local project is
        reopened and datasets already present in it are reused instead of being created.
        A directory that is reported as an empty project is cleaned and created anew.
        Every video of every selected dataset is still scheduled for download.
    :type resume_download: bool, optional
    :param kwargs: Extra keyword arguments are accepted and ignored by this function.
    :returns: None.
    :rtype: NoneType

    :Usage Example:

        .. code-block:: python

            import os
            from dotenv import load_dotenv

            import supervisely as sly

            # Load secrets and create API object from .env file (recommended)
            # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
            if sly.is_development():
                load_dotenv(os.path.expanduser("~/supervisely.env"))
            api = sly.Api.from_env()

            dest_dir = 'your/local/dest/dir'
            project_id = 17758
            loop = sly.utils.get_or_create_event_loop()
            loop.run_until_complete(
                sly.download_async(api, project_id, dest_dir)
            )
    """
    if semaphore is None:
        semaphore = api.get_default_semaphore()
    elif isinstance(semaphore, int):
        semaphore = asyncio.Semaphore(semaphore)

    key_id_map = KeyIdMap()

    project_fs = None
    meta = ProjectMeta.from_json(api.project.get_meta(project_id, with_settings=True))
    if os.path.exists(dest_dir) and resume_download:
        # The current meta is written before the existing directory is opened for reading.
        dump_json_file(meta.to_json(), os.path.join(dest_dir, "meta.json"))
        try:
            project_fs = VideoProject(dest_dir, OpenMode.READ)
        except RuntimeError as e:
            if "Project is empty" not in str(e):
                raise
            # Nothing usable on disk: wipe the directory and fall through to CREATE below.
            clean_dir(dest_dir)
    if project_fs is None:
        project_fs = VideoProject(dest_dir, OpenMode.CREATE)
    project_fs.set_meta(meta)

    if progress_cb is not None:
        log_progress = False

    dataset_ids = set(dataset_ids) if (dataset_ids is not None) else None

    # Local datasets keyed by their path; built once and extended as datasets are created.
    existing_datasets = {ds_fs.path: ds_fs for ds_fs in project_fs.datasets}

    for parents, dataset in api.dataset.tree(project_id):
        if dataset_ids is not None and dataset.id not in dataset_ids:
            continue

        dataset_path = Dataset._get_dataset_path(dataset.name, parents)
        dataset_fs = existing_datasets.get(dataset_path)
        if dataset_fs is None:
            dataset_fs = project_fs.create_dataset(dataset.name, dataset_path)
            existing_datasets[dataset_path] = dataset_fs

        videos = api.video.get_list(dataset.id)

        # A fresh progress bar per dataset when logging; otherwise the caller's callback (or None).
        ds_progress = progress_cb
        if log_progress is True:
            ds_progress = tqdm_sly(
                desc="Downloading videos from {!r}".format(dataset.name),
                total=len(videos),
            )

        tasks = [
            _download_project_item_async(
                api=api,
                video=video,
                semaphore=semaphore,
                dataset=dataset,
                dest_dir=dest_dir,
                project_fs=project_fs,
                key_id_map=key_id_map,
                dataset_fs=dataset_fs,
                include_custom_data=include_custom_data,
                download_videos=download_videos,
                save_video_info=save_video_info,
                progress_cb=ds_progress,
            )
            for video in videos
        ]
        # All items of one dataset are awaited before the next dataset is started.
        await asyncio.gather(*tasks)

    project_fs.set_key_id_map(key_id_map)
def _log_warning(
    video: VideoInfo,
    video_file_path: Optional[str] = None,
    ann_json: Optional[dict] = None,
    item_info: Optional[dict] = None,
):
    """
    Log debugging context for a video item that failed to be processed.

    Despite the function name, the record is emitted at info level. It is used in the
    _download_project_item_async function right before an exception is re-raised.

    :param video: Video whose project, dataset, ID and name are attached to the record.
    :type video: VideoInfo
    :param video_file_path: Local path of the video file, if known.
    :type video_file_path: str, optional
    :param ann_json: Annotation JSON of the video, if known.
    :type ann_json: dict, optional
    :param item_info: Item info of the video, if known.
    :type item_info: dict, optional
    """
    debug_context = dict(
        project_id=video.project_id,
        dataset_id=video.dataset_id,
        video_id=video.id,
        video_name=video.name,
        video_file_path=video_file_path,
        ann_json=ann_json,
        item_info=item_info,
    )
    logger.info("INFO FOR DEBUGGING", extra=debug_context)
async def _download_project_item_async(
    api: Api,
    video: VideoInfo,
    semaphore: Union[asyncio.Semaphore, int],
    dataset: DatasetInfo,
    dest_dir: str,
    project_fs: Project,
    key_id_map: KeyIdMap,
    dataset_fs: VideoDataset,
    include_custom_data: bool,
    download_videos: bool,
    save_video_info: bool = False,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
) -> None:
    """
    This function downloads a video item from the project in Supervisely platform asynchronously.

    The annotation is downloaded first, then (optionally) the custom data and the video
    file; finally the item is registered in the local dataset together with its annotation.
    On any failure debugging context is logged and the exception is re-raised.

    :param api: Supervisely API address and token.
    :type api: :class:`~supervisely.api.api.Api`
    :param video: Info of the video to download.
    :type video: VideoInfo
    :param semaphore: Semaphore limiting simultaneous downloads. An int is converted to a
        new ``asyncio.Semaphore`` created for this call only.
    :type semaphore: asyncio.Semaphore or int
    :param dataset: Remote dataset the video belongs to; its name is used for the custom data directory.
    :type dataset: DatasetInfo
    :param dest_dir: Local project directory.
    :type dest_dir: str
    :param project_fs: Local project; its meta is used to parse the annotation.
    :type project_fs: Project
    :param key_id_map: Key-ID map passed to the annotation parser.
    :type key_id_map: KeyIdMap
    :param dataset_fs: Local dataset the item is added to.
    :type dataset_fs: VideoDataset
    :param include_custom_data: If True, the video's custom data is written to
        ``<dest_dir>/<dataset name>/custom_data/<video name>.json``.
    :type include_custom_data: bool
    :param download_videos: If True, the video file is downloaded; otherwise an empty
        placeholder file is created at the item path.
    :type download_videos: bool
    :param save_video_info: If True, the video info is stored together with the item.
    :type save_video_info: bool
    :param progress_cb: Called with ``1`` once the item is fully processed. When it is set
        and the file size is known, a separate per-file byte progress bar is shown.
    :type progress_cb: tqdm or callable, optional
    :returns: None.
    :rtype: NoneType
    """
    if isinstance(semaphore, int):
        semaphore = asyncio.Semaphore(semaphore)

    try:
        ann_json = await api.video.annotation.download_async(video.id, video, semaphore=semaphore)
        ann_json = ann_json[0]
    except Exception:
        _log_warning(video)
        # Bare raise keeps the original traceback untouched.
        raise

    video_file_path = dataset_fs.generate_item_path(video.name)

    if include_custom_data:
        CUSTOM_DATA_DIR = os.path.join(dest_dir, dataset.name, "custom_data")
        mkdir(CUSTOM_DATA_DIR)
        custom_data_path = os.path.join(CUSTOM_DATA_DIR, f"{video.name}.json")
        await dump_json_file_async(video.custom_data, custom_data_path)

    if download_videos:
        try:
            video_file_size = video.file_meta.get("size")
            if progress_cb is not None and video_file_size is not None:
                item_progress = tqdm_sly(
                    desc=f"Downloading '{video.name}'",
                    total=int(video_file_size),
                    unit="B",
                    unit_scale=True,
                    leave=False,
                )
                await api.video.download_path_async(
                    video.id,
                    video_file_path,
                    semaphore=semaphore,
                    progress_cb=item_progress,
                    progress_cb_type="size",
                )
            else:
                await api.video.download_path_async(video.id, video_file_path, semaphore=semaphore)
        except Exception:
            _log_warning(video, video_file_path)
            raise
    else:
        await touch_async(video_file_path)

    item_info = video._asdict() if save_video_info else None

    try:
        video_ann = VideoAnnotation.from_json(ann_json, project_fs.meta, key_id_map)
    except Exception:
        _log_warning(video, ann_json=ann_json)
        raise

    try:
        # The file is already located at the generated item path, so no source path is passed.
        await dataset_fs.add_item_file_async(
            video.name,
            None,
            ann=video_ann,
            _validate_item=False,
            _use_hardlink=True,
            item_info=item_info,
        )
    except Exception:
        # `item_info` must be passed by keyword: the third positional parameter is `ann_json`.
        _log_warning(video, video_file_path, item_info=item_info)
        raise

    if progress_cb is not None:
        progress_cb(1)