Source code for supervisely.project.project

# coding: utf-8

from __future__ import annotations

import asyncio
import io
import json
import os
import pickle
import random
import shutil
from collections import defaultdict, namedtuple
from enum import Enum
from pathlib import Path
from typing import (
    Callable,
    Dict,
    Generator,
    List,
    Literal,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)

import aiofiles
import numpy as np
from tqdm import tqdm

import supervisely as sly
from supervisely._utils import (
    abs_url,
    batched,
    get_or_create_event_loop,
    is_development,
    removesuffix,
    snake_to_human,
)
from supervisely.annotation.annotation import ANN_EXT, Annotation, TagCollection
from supervisely.annotation.obj_class import ObjClass
from supervisely.annotation.obj_class_collection import ObjClassCollection
from supervisely.api.api import Api, ApiContext, ApiField
from supervisely.api.image_api import (
    OFFSETS_PKL_BATCH_SIZE,
    OFFSETS_PKL_SUFFIX,
    BlobImageInfo,
    ImageInfo,
)
from supervisely.api.project_api import ProjectInfo
from supervisely.collection.key_indexed_collection import (
    KeyIndexedCollection,
    KeyObject,
)
from supervisely.geometry.bitmap import Bitmap
from supervisely.imaging import image as sly_image
from supervisely.io.fs import (
    clean_dir,
    copy_file,
    copy_file_async,
    dir_empty,
    dir_exists,
    ensure_base_path,
    file_exists,
    get_file_name_with_ext,
    list_dir_recursively,
    list_files,
    list_files_recursively,
    mkdir,
    silent_remove,
    subdirs_tree,
)
from supervisely.io.fs_cache import FileCache
from supervisely.io.json import dump_json_file, dump_json_file_async, load_json_file
from supervisely.project.project_meta import ProjectMeta
from supervisely.project.project_type import ProjectType
from supervisely.sly_logger import logger
from supervisely.task.progress import tqdm_sly

TF_BLOB_DIR = "blob-files"  # directory for project blob files in team files


class CustomUnpickler(pickle.Unpickler):
    """
    Custom Unpickler for loading pickled objects of the same class with differing definitions.
    Handles cases where a class object is reconstructed using a newer definition with additional fields
    or an outdated definition missing some fields.
    Supports loading namedtuple objects with missing or extra fields.
    """

    def __init__(self, file, **kwargs):
        super().__init__(file, **kwargs)
        self.warned_classes = set()  # To prevent multiple warnings for the same class
        self.sdk_update_notified = False

    def find_class(self, module, name):
        prefix = "Pickled"
        cls = super().find_class(module, name)
        if hasattr(cls, "_fields") and "Info" in cls.__name__:
            orig_new = cls.__new__

            def new(cls, *args, **kwargs):
                orig_class_name = cls.__name__[len(prefix) :]
                # Case when the new class definition has more fields than the old one
                if len(args) < len(cls._fields):
                    default_values = cls._field_defaults
                    # Set missing attrs to None
                    num_missing = len(cls._fields) - len(args)
                    args = list(args) + [None] * num_missing
                    # Replace only the added None values with default values where applicable
                    args[-num_missing:] = [
                        (
                            default_values.get(field, arg)
                            if arg is None and field in default_values
                            else arg
                        )
                        for field, arg in zip(cls._fields[-num_missing:], args[-num_missing:])
                    ]
                    if orig_class_name not in self.warned_classes:
                        new_fields = cls._fields[len(cls._fields) - num_missing :]
                        logger.warning(
                            f"New fields {new_fields} for the '{orig_class_name}' class objects are set to their default values or None due to an updated definition of this class."
                        )
                        self.warned_classes.add(orig_class_name)
                # Case when an object pickled with a newer class definition is loaded with an older definition
                elif len(args) > len(cls._fields):
                    end_index = len(args)
                    args = args[: len(cls._fields)]
                    if orig_class_name not in self.warned_classes:
                        logger.warning(
                            f"Extra fields idx {list(range(len(cls._fields), end_index))} are ignored for '{orig_class_name}' class objects due to an outdated class definition"
                        )
                        self.warned_classes.add(orig_class_name)
                        if not self.sdk_update_notified:
                            logger.warning(
                                "It is recommended to update the SDK version to restore the project version correctly."
                            )
                            self.sdk_update_notified = True
                return orig_new(cls, *args, **kwargs)

            # Create a new subclass dynamically to prevent redefining the current class
            NewCls = type(f"{prefix}{cls.__name__}", (cls,), {"__new__": new})
            return NewCls

        return cls


# @TODO: rename img_path to item_path (maybe convert namedtuple to class and create fields and props)
class ItemPaths(NamedTuple):
    #: :class:`str`: Full image file path of item
    img_path: str

    #: :class:`str`: Full annotation file path of item
    ann_path: str
class ItemInfo(NamedTuple):
    #: :class:`str`: Item's dataset name
    dataset_name: str

    #: :class:`str`: Item name
    name: str

    #: :class:`str`: Full image file path of item
    img_path: str

    #: :class:`str`: Full annotation file path of item
    ann_path: str
class OpenMode(Enum):
    """
    Defines the mode of using the :class:`Project<Project>` and :class:`Dataset<Dataset>`.
    """

    #: :class:`int`: READ open mode.
    #: Loads project from the given project directory. Checks that item and annotation
    #: directories exist and the dataset is not empty. Consistency checks: every image
    #: must have an annotation, and the correspondence must be one to one.
    READ = 1

    #: :class:`int`: CREATE open mode.
    #: Creates a leaf directory and an empty meta.json file. Raises an error if the
    #: project directory already exists and is not empty.
    CREATE = 2
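# Illustrative sketch of the two open modes (paths are hypothetical):
#
#     # READ: load an existing dataset and run consistency checks
#     ds = Dataset("/data/projects/lemons/ds1", OpenMode.READ)
#
#     # CREATE: initialize empty item/annotation directories for a new dataset
#     ds = Dataset("/data/projects/new_project/ds1", OpenMode.CREATE)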
def _get_effective_ann_name(img_name, ann_names):
    new_format_name = img_name + ANN_EXT
    if new_format_name in ann_names:
        return new_format_name
    else:
        old_format_name = os.path.splitext(img_name)[0] + ANN_EXT
        return old_format_name if (old_format_name in ann_names) else None
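# Example of the resolution above: for an image "IMG_01.jpeg" the new-format
# annotation name "IMG_01.jpeg.json" is preferred; if it is absent, the old-format
# name "IMG_01.json" is tried, and None is returned when neither is in ann_names.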
class Dataset(KeyObject):
    """
    Dataset is where your labeled and unlabeled images and other data files live.
    :class:`Dataset<Dataset>` object is immutable.

    :param directory: Path to dataset directory.
    :type directory: str
    :param mode: Determines working mode for the given dataset. If not provided, dataset_id must be provided.
    :type mode: :class:`OpenMode<OpenMode>`, optional
    :param parents: List of parent directories, e.g. ["ds1", "ds2", "ds3"].
    :type parents: List[str]
    :param dataset_id: Dataset ID if the Dataset is opened in API mode. If dataset_id is specified, then api must be specified as well.
    :type dataset_id: Optional[int]
    :param api: API object if the Dataset is opened in API mode.
    :type api: Optional[:class:`Api<supervisely.api.api.Api>`]
    :Usage example:

     .. code-block:: python

        import supervisely as sly

        dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"

        # To open dataset locally in read mode
        ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

        # To open dataset on API
        api = sly.Api.from_env()
        ds = sly.Dataset(dataset_path, dataset_id=1, api=api)
    """

    annotation_class = Annotation
    item_info_class = ImageInfo

    item_dir_name = "img"
    ann_dir_name = "ann"
    item_info_dir_name = "img_info"
    seg_dir_name = "seg"
    meta_dir_name = "meta"
    datasets_dir_name = "datasets"
    blob_dir_name = "blob"

    def __init__(
        self,
        directory: str,
        mode: Optional[OpenMode] = None,
        parents: Optional[List[str]] = None,
        dataset_id: Optional[int] = None,
        api: Optional[sly.Api] = None,
    ):
        if dataset_id is not None:
            raise NotImplementedError(
                "Opening dataset from the API is not implemented yet. Please use the local mode "
                "by providing the 'directory' and 'mode' arguments. "
                "This feature will be available later."
            )
        if type(mode) is not OpenMode and mode is not None:
            raise TypeError(
                "Argument 'mode' has type {!r}. Correct type is OpenMode".format(type(mode))
            )
        if mode is None and dataset_id is None:
            raise ValueError("Either 'mode' or 'dataset_id' must be provided")
        if dataset_id is not None and api is None:
            raise ValueError("Argument 'api' must be provided if 'dataset_id' is provided")

        self.parents = parents or []
        self.dataset_id = dataset_id
        self._api = api

        self._directory = directory
        self._item_to_ann = {}  # item file name -> annotation file name

        parts = directory.split(os.path.sep)
        if self.datasets_dir_name not in parts:
            project_dir, ds_name = os.path.split(directory.rstrip("/"))
            full_ds_name = short_ds_name = ds_name
        else:
            nested_ds_dir_index = parts.index(self.datasets_dir_name)
            ds_dir_index = nested_ds_dir_index - 1
            project_dir = os.path.join(*parts[:ds_dir_index])
            full_ds_name = os.path.join(
                *[p for p in parts[ds_dir_index:] if p != self.datasets_dir_name]
            )
            short_ds_name = os.path.basename(directory)

        self._project_dir = project_dir
        self._name = full_ds_name
        self._short_name = short_ds_name
        self._blob_offset_paths = []

        if self.dataset_id is not None:
            self._read_api()
        elif mode is OpenMode.READ:
            self._read()
        else:
            self._create()

    @classmethod
    def ignorable_dirs(cls) -> List[str]:
        ignorable_dirs = [getattr(cls, attr) for attr in dir(cls) if attr.endswith("_dir_name")]
        return [p for p in ignorable_dirs if isinstance(p, str)]

    @classmethod
    def datasets_dir(cls) -> str:
        return cls.datasets_dir_name

    @property
    def project_dir(self) -> str:
        """
        Path to the project containing the dataset.

        :return: Path to the project.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds0"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.project_dir)
            # Output: "/home/admin/work/supervisely/projects/lemons_annotated"
        """
        return self._project_dir

    @property
    def name(self) -> str:
        """
        Full Dataset name, which includes its parents, e.g. ds1/ds2/ds3.
        Use :attr:`short_name` to get only the name of the dataset.

        :return: Dataset Name.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.name)
            # Output: "ds1"
        """
        return self._name

    @property
    def short_name(self) -> str:
        """
        Short dataset name, which does not include its parents.
        To get the full name of the dataset, use :attr:`name`.

        :return: Dataset Name.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.short_name)
            # Output: "ds1"
        """
        return self._short_name

    @property
    def path(self) -> str:
        """Returns a relative local path to the dataset.

        :return: Relative local path to the dataset.
        :rtype: :class:`str`
        """
        return self._get_dataset_path(self.short_name, self.parents)

    @staticmethod
    def _get_dataset_path(dataset_name: str, parents: List[str]):
        """Returns a relative local path to the dataset.

        :param dataset_name: Dataset name.
        :type dataset_name: :class:`str`
        """
        relative_path = os.path.sep.join(f"{parent}/datasets" for parent in parents)
        return os.path.join(relative_path, dataset_name)

    def key(self):
        # TODO: add docstring
        return self.name

    @property
    def directory(self) -> str:
        """
        Path to the dataset directory.

        :return: Path to the dataset directory.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.directory)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1'
        """
        return self._directory

    @property
    def item_dir(self) -> str:
        """
        Path to the dataset items directory.

        :return: Path to the dataset directory with items.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.item_dir)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/img'
        """
        return os.path.join(self.directory, self.item_dir_name)

    @property
    def img_dir(self) -> str:
        """
        Path to the dataset images directory. Property is an alias of item_dir.

        :return: Path to the dataset directory with images.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.img_dir)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/img'
        """
        return self.item_dir

    @property
    def ann_dir(self) -> str:
        """
        Path to the dataset annotations directory.

        :return: Path to the dataset directory with annotations.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.ann_dir)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/ann'
        """
        return os.path.join(self.directory, self.ann_dir_name)

    @property
    def img_info_dir(self):
        """
        Path to the dataset image info directory. Property is an alias of item_info_dir.

        :return: Path to the dataset directory with images info.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.img_info_dir)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/img_info'
        """
        return self.item_info_dir

    @property
    def item_info_dir(self):
        """
        Path to the dataset item info directory.

        :return: Path to the dataset directory with items info.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.item_info_dir)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/img_info'
        """
        return os.path.join(self.directory, self.item_info_dir_name)

    @property
    def seg_dir(self):
        """
        Path to the dataset segmentation masks directory.

        :return: Path to the dataset directory with masks.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.seg_dir)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/seg'
        """
        return os.path.join(self.directory, self.seg_dir_name)

    @property
    def meta_dir(self):
        """
        Path to the dataset meta files directory.

        :return: Path to the dataset directory with meta files.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.meta_dir)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/meta'
        """
        return os.path.join(self.directory, self.meta_dir_name)

    @property
    def blob_offsets(self):
        """
        List of paths to the dataset blob offset files.

        :return: List of paths to the dataset blob offset files.
        :rtype: :class:`List[str]`
        """
        return self._blob_offset_paths

    @blob_offsets.setter
    def blob_offsets(self, value: List[str]):
        """
        Set the list of paths to the dataset blob offset files.
        """
        self._blob_offset_paths = value

    @classmethod
    def _has_valid_ext(cls, path: str) -> bool:
        """
        Checks if a given file has a supported extension
        ('.jpg', '.jpeg', '.mpo', '.bmp', '.png', '.webp').

        :param path: the path to the file
        :return: bool (True if the file has a supported extension, otherwise False)
        """
        return sly_image.has_valid_ext(path)

    def _read(self):
        """
        Fills out the dictionary items: item file name -> annotation file name.
        Checks that the item and annotation directories exist and the dataset is not empty.
        Consistency checks: every item must have an annotation, and the correspondence must
        be one to one. Otherwise an exception is raised.
        """
        blob_offset_paths = list_files(
            self.directory, filter_fn=lambda x: x.endswith(OFFSETS_PKL_SUFFIX)
        )
        has_blob_offsets = len(blob_offset_paths) > 0

        if not dir_exists(self.item_dir) and not has_blob_offsets:
            raise FileNotFoundError("Item directory not found: {!r}".format(self.item_dir))
        if not dir_exists(self.ann_dir):
            raise FileNotFoundError("Annotation directory not found: {!r}".format(self.ann_dir))

        raw_ann_paths = list_files(self.ann_dir, [ANN_EXT])
        raw_ann_names = set(os.path.basename(path) for path in raw_ann_paths)

        if dir_exists(self.item_dir):
            img_paths = list_files(self.item_dir, filter_fn=self._has_valid_ext)
            img_names = [os.path.basename(path) for path in img_paths]
        else:
            img_names = []

        # If we have blob offset files, add the image names from those
        if has_blob_offsets:
            self.blob_offsets = blob_offset_paths
            for offset_file_path in self.blob_offsets:
                try:
                    blob_img_info_lists = BlobImageInfo.load_from_pickle_generator(offset_file_path)
                    for blob_img_info_list in blob_img_info_lists:
                        for blob_img_info in blob_img_info_list:
                            img_names.append(blob_img_info.name)
                except Exception as e:
                    logger.warning(f"Failed to read blob offset file {offset_file_path}: {str(e)}")

        if len(img_names) == 0 and len(raw_ann_names) == 0:
            logger.debug(f"Dataset '{self.name}' is empty")
            # raise RuntimeError("Dataset {!r} is empty".format(self.name))

        if len(img_names) == 0:  # items_names polyfill
            img_names = [os.path.splitext(ann_name)[0] for ann_name in raw_ann_names]

        # Consistency checks. Every image must have an annotation, and the correspondence must be one to one.
        effective_ann_names = set()
        for img_name in img_names:
            ann_name = _get_effective_ann_name(img_name, raw_ann_names)
            if ann_name is None:
                raise RuntimeError(
                    "Item {!r} in dataset {!r} does not have a corresponding annotation file.".format(
                        img_name, self.name
                    )
                )
            if ann_name in effective_ann_names:
                raise RuntimeError(
                    "Annotation file {!r} in dataset {!r} matches two different image files.".format(
                        ann_name, self.name
                    )
                )
            effective_ann_names.add(ann_name)
            self._item_to_ann[img_name] = ann_name

    def _read_api(self) -> None:
        """Reads the dataset when it is opened from the API."""
        self._image_infos = self._api.image.get_list(self.dataset_id)
        img_names = [img_info.name for img_info in self._image_infos]
        for img_name in img_names:
            ann_name = f"{img_name}.json"
            self._item_to_ann[img_name] = ann_name

    @property
    def image_infos(self) -> List[ImageInfo]:
        """If the dataset is opened from the API, returns the list of ImageInfo objects.
        Otherwise raises an exception.

        :raises: ValueError: If the dataset is opened in local mode.
        :return: List of ImageInfo objects.
        :rtype: List[:class:`ImageInfo`]
        """
        if not self.dataset_id:
            raise ValueError(
                "This dataset was opened in local mode. It does not have access to the API."
            )
        return self._image_infos

    def _create(self):
        """
        Creates a leaf directory and all intermediate ones for items and annotations.
        """
        mkdir(self.ann_dir)
        mkdir(self.item_dir)
    def get_items_names(self) -> list:
        """
        List of dataset item names.

        :return: List of item names.
        :rtype: :class:`list` [ :class:`str` ]
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_items_names())
            # Output: ['IMG_0002.jpg', 'IMG_0005.jpg', 'IMG_0008.jpg', ...]
        """
        return list(self._item_to_ann.keys())
    def item_exists(self, item_name: str) -> bool:
        """
        Checks if given item name belongs to the dataset.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: True if item exists, otherwise False.
        :rtype: :class:`bool`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            ds.item_exists("IMG_0748")      # False
            ds.item_exists("IMG_0748.jpeg") # True
        """
        return item_name in self._item_to_ann
    def get_item_path(self, item_name: str) -> str:
        """
        Path to the given item.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: Path to the given item.
        :rtype: :class:`str`
        :raises: :class:`RuntimeError` if item not found in the project
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_item_path("IMG_0748"))
            # Output: RuntimeError: Item IMG_0748 not found in the project.

            print(ds.get_item_path("IMG_0748.jpeg"))
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/img/IMG_0748.jpeg'
        """
        if not self.item_exists(item_name):
            raise RuntimeError("Item {} not found in the project.".format(item_name))
        return os.path.join(self.item_dir, item_name)
    def get_img_path(self, item_name: str) -> str:
        """
        Path to the given image. Method is an alias of get_item_path(item_name).

        :param item_name: Image name.
        :type item_name: :class:`str`
        :return: Path to the given image.
        :rtype: :class:`str`
        :raises: :class:`RuntimeError` if item not found in the project.
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_img_path("IMG_0748"))
            # Output: RuntimeError: Item IMG_0748 not found in the project.

            print(ds.get_img_path("IMG_0748.jpeg"))
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/img/IMG_0748.jpeg'
        """
        return self.get_item_path(item_name)
    def get_ann(self, item_name, project_meta: ProjectMeta) -> Annotation:
        """
        Read annotation of item from json.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param project_meta: ProjectMeta object.
        :type project_meta: :class:`ProjectMeta<supervisely.project.project_meta.ProjectMeta>`
        :return: Annotation object.
        :rtype: :class:`Annotation<supervisely.annotation.annotation.Annotation>`
        :raises: :class:`RuntimeError` if item not found in the project
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            project_path = "/home/admin/work/supervisely/projects/lemons_annotated"
            project = sly.Project(project_path, sly.OpenMode.READ)

            ds = project.datasets.get('ds1')

            annotation = ds.get_ann("IMG_0748", project.meta)
            # Output: RuntimeError: Item IMG_0748 not found in the project.

            annotation = ds.get_ann("IMG_0748.jpeg", project.meta)
            print(annotation.to_json())
            # Output: {
            #     "description": "",
            #     "size": {
            #         "height": 500,
            #         "width": 700
            #     },
            #     "tags": [],
            #     "objects": [],
            #     "customBigData": {}
            # }
        """
        ann_path = self.get_ann_path(item_name)
        return self.annotation_class.load_json_file(ann_path, project_meta)
    def get_ann_path(self, item_name: str) -> str:
        """
        Path to the given annotation json file.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: Path to the given annotation json file.
        :rtype: :class:`str`
        :raises: :class:`RuntimeError` if item not found in the project
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_ann_path("IMG_0748"))
            # Output: RuntimeError: Item IMG_0748 not found in the project.

            print(ds.get_ann_path("IMG_0748.jpeg"))
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/ann/IMG_0748.jpeg.json'
        """
        ann_path = self._item_to_ann.get(item_name, None)
        if ann_path is None:
            raise RuntimeError("Item {} not found in the project.".format(item_name))
        ann_path = ann_path.strip("/")
        return os.path.join(self.ann_dir, ann_path)
    def get_img_info_path(self, img_name: str) -> str:
        """
        Get path to the image info json file without checking if the file exists.
        Method is an alias of get_item_info_path(item_name).

        :param img_name: Image name.
        :type img_name: :class:`str`
        :return: Path to the given image info json file.
        :rtype: :class:`str`
        :raises: :class:`RuntimeError` if image not found in the project.
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_img_info_path("IMG_0748"))
            # Output: RuntimeError: Item IMG_0748 not found in the project.

            print(ds.get_img_info_path("IMG_0748.jpeg"))
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/img_info/IMG_0748.jpeg.json'
        """
        return self.get_item_info_path(img_name)
    def get_item_info_path(self, item_name: str) -> str:
        """
        Get path to the item info json file without checking if the file exists.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: Path to the given item info json file.
        :rtype: :class:`str`
        :raises: :class:`RuntimeError` if item not found in the project.
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_item_info_path("IMG_0748"))
            # Output: RuntimeError: Item IMG_0748 not found in the project.

            print(ds.get_item_info_path("IMG_0748.jpeg"))
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/img_info/IMG_0748.jpeg.json'
        """
        info_path = self._item_to_ann.get(item_name, None)
        if info_path is None:
            raise RuntimeError("Item {} not found in the project.".format(item_name))
        return os.path.join(self.item_info_dir, info_path)
    def get_item_meta_path(self, item_name: str) -> str:
        """
        Get path to the item meta json file without checking if the file exists.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: Path to the given item meta json file.
        :rtype: :class:`str`
        :raises: :class:`RuntimeError` if item not found in the project.
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_item_meta_path("IMG_0748"))
            # Output: RuntimeError: Item IMG_0748 not found in the project.

            print(ds.get_item_meta_path("IMG_0748.jpeg"))
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/meta/IMG_0748.jpeg.json'
        """
        meta_path = self._item_to_ann.get(item_name, None)
        if meta_path is None:
            raise RuntimeError("Item {} not found in the project.".format(item_name))
        return os.path.join(self.meta_dir, meta_path)
    def get_image_info(self, item_name: str) -> ImageInfo:
        """
        Information for Item with given name. Method is an alias of get_item_info(item_name).

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: ImageInfo object.
        :rtype: :class:`ImageInfo<supervisely.api.image_api.ImageInfo>`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds0"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_image_info("IMG_0748.jpeg"))
            # Output:
            # ImageInfo(
            #     id=770915,
            #     name='IMG_0748.jpeg',
            #     link=None,
            #     hash='ZdpMD+ZMJx0R8BgsCzJcqM7qP4M8f1AEtoYc87xZmyQ=',
            #     mime='image/jpeg',
            #     ext='jpeg',
            #     size=148388,
            #     width=1067,
            #     height=800,
            #     labels_count=4,
            #     dataset_id=2532,
            #     created_at='2021-03-02T10:04:33.973Z',
            #     updated_at='2021-03-02T10:04:33.973Z',
            #     meta={},
            #     path_original='/h5un6l2bnaz1vj8a9qgms4-public/images/original/7/h/Vo/...jpeg',
            #     full_storage_url='http://app.supervisely.com/h5un6l2bnaz1vj8a9qgms4-public/images/original/7/h/Vo/...jpeg',
            #     tags=[]
            # )
        """
        return self.get_item_info(item_name)
    def get_item_info(self, item_name: str) -> ImageInfo:
        """
        Information for Item with given name.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: ImageInfo object.
        :rtype: :class:`ImageInfo<supervisely.api.image_api.ImageInfo>`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds0"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_item_info("IMG_0748.jpeg"))
            # Output:
            # ImageInfo(
            #     id=770915,
            #     name='IMG_0748.jpeg',
            #     link=None,
            #     hash='ZdpMD+ZMJx0R8BgsCzJcqM7qP4M8f1AEtoYc87xZmyQ=',
            #     mime='image/jpeg',
            #     ext='jpeg',
            #     size=148388,
            #     width=1067,
            #     height=800,
            #     labels_count=4,
            #     dataset_id=2532,
            #     created_at='2021-03-02T10:04:33.973Z',
            #     updated_at='2021-03-02T10:04:33.973Z',
            #     meta={},
            #     path_original='/h5un6l2bnaz1vj8a9qgms4-public/images/original/7/h/Vo/...jpeg',
            #     full_storage_url='http://app.supervisely.com/h5un6l2bnaz1vj8a9qgms4-public/images/original/7/h/Vo/...jpeg',
            #     tags=[]
            # )
        """
        item_info_path = self.get_item_info_path(item_name)
        item_info_dict = load_json_file(item_info_path)
        item_info_named_tuple = namedtuple(self.item_info_class.__name__, item_info_dict)
        return item_info_named_tuple(**item_info_dict)
    def get_seg_path(self, item_name: str) -> str:
        """
        Get path to the png segmentation mask file without checking if the file exists.
        Use :class:`Project.to_segmentation_task()<supervisely.project.project.Project.to_segmentation_task>`
        to create segmentation masks from annotations in your project.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: Path to the given png mask file.
        :rtype: :class:`str`
        :raises: :class:`RuntimeError` if item not found in the project.
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.get_seg_path("IMG_0748"))
            # Output: RuntimeError: Item IMG_0748 not found in the project.

            print(ds.get_seg_path("IMG_0748.jpeg"))
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/seg/IMG_0748.jpeg.png'
        """
        ann_path = self._item_to_ann.get(item_name, None)
        if ann_path is None:
            raise RuntimeError("Item {} not found in the project.".format(item_name))
        return os.path.join(self.seg_dir, f"{item_name}.png")
    def add_item_file(
        self,
        item_name: str,
        item_path: str,
        ann: Optional[Union[Annotation, str]] = None,
        _validate_item: Optional[bool] = True,
        _use_hardlink: Optional[bool] = False,
        item_info: Optional[Union[ImageInfo, Dict, str]] = None,
        img_info: Optional[Union[ImageInfo, Dict, str]] = None,
    ) -> None:
        """
        Adds given item file to dataset items directory, and adds given annotation to dataset
        annotations directory. If ann is None, creates empty annotation file.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param item_path: Path to the item.
        :type item_path: :class:`str`
        :param ann: Annotation object or path to annotation json file.
        :type ann: :class:`Annotation<supervisely.annotation.annotation.Annotation>` or :class:`str`, optional
        :param _validate_item: Checks input files format.
        :type _validate_item: :class:`bool`, optional
        :param _use_hardlink: If True, creates a hardlink pointing to src named dst, otherwise copies the file.
        :type _use_hardlink: :class:`bool`, optional
        :param item_info: ImageInfo object, ImageInfo object converted to dict, or path to item info json file for copying to dataset item info directory.
        :type item_info: :class:`ImageInfo<supervisely.api.image_api.ImageInfo>` or :class:`dict` or :class:`str`, optional
        :param img_info: Deprecated version of the item_info parameter. Can be removed in future versions.
        :type img_info: :class:`ImageInfo<supervisely.api.image_api.ImageInfo>` or :class:`dict` or :class:`str`, optional
        :return: None
        :rtype: NoneType
        :raises: :class:`RuntimeError` if item_name already exists in dataset or item name has unsupported extension.
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            ann = "/home/admin/work/supervisely/projects/lemons_annotated/ds1/ann/IMG_8888.jpeg.json"
            ds.add_item_file("IMG_8888.jpeg", "/home/admin/work/supervisely/projects/lemons_annotated/ds1/img/IMG_8888.jpeg", ann=ann)
            print(ds.item_exists("IMG_8888.jpeg"))
            # Output: True
        """
        # item_path is None when image is cached
        if item_path is None and ann is None and img_info is None:
            raise RuntimeError("No item_path or ann or img_info provided.")

        if item_info is not None and img_info is not None:
            raise RuntimeError(
                "At least one parameter of two (item_info and img_info) must be None."
            )

        if img_info is not None:
            logger.warning(
                "img_info parameter of add_item_file() method is deprecated and can be removed in future versions. Use item_info parameter instead."
            )
            item_info = img_info

        self._add_item_file(
            item_name,
            item_path,
            _validate_item=_validate_item,
            _use_hardlink=_use_hardlink,
        )
        self._add_ann_by_type(item_name, ann)
        self._add_item_info(item_name, item_info)
    def add_item_np(
        self,
        item_name: str,
        img: np.ndarray,
        ann: Optional[Union[Annotation, str]] = None,
        img_info: Optional[Union[ImageInfo, Dict, str]] = None,
    ) -> None:
        """
        Adds given numpy matrix as an image to dataset items directory, and adds given
        annotation to dataset ann directory. If ann is None, creates empty annotation file.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param img: numpy Image matrix in RGB format.
        :type img: np.ndarray
        :param ann: Annotation object or path to annotation json file.
        :type ann: :class:`Annotation<supervisely.annotation.annotation.Annotation>` or :class:`str`, optional
        :param img_info: ImageInfo object, ImageInfo object converted to dict, or path to item info json file for copying to dataset item info directory.
        :type img_info: :class:`ImageInfo<supervisely.api.image_api.ImageInfo>` or :class:`dict` or :class:`str`, optional
        :return: None
        :rtype: NoneType
        :raises: :class:`RuntimeError` if item_name already exists in dataset or item name has unsupported extension
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            img_path = "/home/admin/Pictures/Clouds.jpeg"
            img_np = sly.image.read(img_path)
            ds.add_item_np("IMG_050.jpeg", img_np)
            print(ds.item_exists("IMG_050.jpeg"))
            # Output: True
        """
        if img is None and ann is None and img_info is None:
            raise RuntimeError("No img or ann or img_info provided.")

        self._add_img_np(item_name, img)
        self._add_ann_by_type(item_name, ann)
        self._add_item_info(item_name, img_info)
    def add_item_raw_bytes(
        self,
        item_name: str,
        item_raw_bytes: bytes,
        ann: Optional[Union[Annotation, str]] = None,
        img_info: Optional[Union[ImageInfo, Dict, str]] = None,
    ) -> None:
        """
        Adds given binary object as an image to dataset items directory, and adds given
        annotation to dataset ann directory. If ann is None, creates empty annotation file.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param item_raw_bytes: Binary object.
        :type item_raw_bytes: :class:`bytes`
        :param ann: Annotation object or path to annotation json file.
        :type ann: :class:`Annotation<supervisely.annotation.annotation.Annotation>` or :class:`str`, optional
        :param img_info: ImageInfo object, ImageInfo object converted to dict, or path to item info json file for copying to dataset item info directory.
        :type img_info: :class:`ImageInfo<supervisely.api.image_api.ImageInfo>` or :class:`dict` or :class:`str`, optional
        :return: None
        :rtype: NoneType
        :raises: :class:`RuntimeError` if item_name already exists in dataset or item name has unsupported extension
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            img_path = "/home/admin/Pictures/Clouds.jpeg"
            img_np = sly.image.read(img_path)
            img_bytes = sly.image.write_bytes(img_np, "jpeg")
            ds.add_item_raw_bytes("IMG_050.jpeg", img_bytes)
            print(ds.item_exists("IMG_050.jpeg"))
            # Output: True
        """
        if item_raw_bytes is None and ann is None and img_info is None:
            raise RuntimeError("No item_raw_bytes or ann or img_info provided.")

        self._add_item_raw_bytes(item_name, item_raw_bytes)
        self._add_ann_by_type(item_name, ann)
        self._add_item_info(item_name, img_info)
    def get_classes_stats(
        self,
        project_meta: Optional[ProjectMeta] = None,
        return_objects_count: Optional[bool] = True,
        return_figures_count: Optional[bool] = True,
        return_items_count: Optional[bool] = True,
    ):
        if project_meta is None:
            project = Project(self.project_dir, OpenMode.READ)
            project_meta = project.meta

        class_items = {}
        class_objects = {}
        class_figures = {}
        for obj_class in project_meta.obj_classes:
            class_items[obj_class.name] = 0
            class_objects[obj_class.name] = 0
            class_figures[obj_class.name] = 0

        for item_name in self:
            item_ann = self.get_ann(item_name, project_meta)
            item_class = {}
            for label in item_ann.labels:
                class_objects[label.obj_class.name] += 1
                item_class[label.obj_class.name] = True
            for obj_class in project_meta.obj_classes:
                if obj_class.name in item_class.keys():
                    class_items[obj_class.name] += 1

        result = {}
        if return_items_count:
            result["items_count"] = class_items
        if return_objects_count:
            result["objects_count"] = class_objects
        if return_figures_count:
            class_figures = class_objects.copy()  # for Images project
            result["figures_count"] = class_figures
        return result

    def _get_empty_annotaion(self, item_name):
        """
        Create empty annotation for the given item. Raises an exception if the item
        is not found in the project.

        :param item_name: str
        :return: Annotation class object
        """
        img_size = sly_image.read(self.get_img_path(item_name)).shape[:2]
        return self.annotation_class(img_size)

    def _add_ann_by_type(self, item_name, ann):
        """
        Add given annotation to dataset annotations dir and to dictionary items:
        item file name -> annotation file name

        :param item_name: str
        :param ann: Annotation class object, str, dict, None (raises an exception for any other type)
        """
        # This is a new-style annotation name, so if there was no image with this name yet,
        # there should not have been an annotation either.
        self._item_to_ann[item_name] = item_name + ANN_EXT
        if ann is None:
            self.set_ann(item_name, self._get_empty_annotaion(item_name))
        elif type(ann) is self.annotation_class:
            self.set_ann(item_name, ann)
        elif type(ann) is str:
            self.set_ann_file(item_name, ann)
        elif type(ann) is dict:
            self.set_ann_dict(item_name, ann)
        else:
            raise TypeError("Unsupported type {!r} for ann argument".format(type(ann)))

    def _add_item_info(self, item_name, item_info=None):
        if item_info is None:
            return

        dst_info_path = self.get_item_info_path(item_name)
        ensure_base_path(dst_info_path)
        if type(item_info) is dict:
            dump_json_file(item_info, dst_info_path, indent=4)
        elif type(item_info) is str and os.path.isfile(item_info):
            shutil.copy(item_info, dst_info_path)
        else:
            # item info named tuple (ImageInfo, VideoInfo, PointcloudInfo, ..)
            dump_json_file(item_info._asdict(), dst_info_path, indent=4)

    async def _add_item_info_async(self, item_name, item_info=None):
        if item_info is None:
            return

        dst_info_path = self.get_item_info_path(item_name)
        ensure_base_path(dst_info_path)
        if type(item_info) is dict:
            dump_json_file(item_info, dst_info_path, indent=4)
        elif type(item_info) is str and os.path.isfile(item_info):
            shutil.copy(item_info, dst_info_path)
        else:
            # item info named tuple (ImageInfo, VideoInfo, PointcloudInfo, ..)
            dump_json_file(item_info._asdict(), dst_info_path, indent=4)

    def _check_add_item_name(self, item_name):
        """
        Raises an exception if the item name already exists in the dataset or has
        an unsupported extension.

        :param item_name: str
        """
        if item_name in self._item_to_ann:
            raise RuntimeError(
                "Item {!r} already exists in dataset {!r}.".format(item_name, self.name)
            )
        if not self._has_valid_ext(item_name):
            raise RuntimeError("Item name {!r} has unsupported extension.".format(item_name))

    def _add_item_raw_bytes(self, item_name, item_raw_bytes):
        """
        Write given binary object to dataset items directory. Raises an exception if
        item_name already exists in the dataset or the item name has an unsupported
        extension. Makes sure we actually received a valid image file; cleans it up
        and fails if not so.

        :param item_name: str
        :param item_raw_bytes: binary object
        """
        if item_raw_bytes is None:
            return

        self._check_add_item_name(item_name)
        item_name = item_name.strip("/")
        dst_img_path = os.path.join(self.item_dir, item_name)
        os.makedirs(os.path.dirname(dst_img_path), exist_ok=True)
        with open(dst_img_path, "wb") as fout:
            fout.write(item_raw_bytes)
        self._validate_added_item_or_die(dst_img_path)

    async def _add_item_raw_bytes_async(self, item_name, item_raw_bytes):
        """
        Write given binary object to dataset items directory. Raises an exception if
        item_name already exists in the dataset or the item name has an unsupported
        extension. Makes sure we actually received a valid image file; cleans it up
        and fails if not so.

        :param item_name: str
        :param item_raw_bytes: binary object
        """
        if item_raw_bytes is None:
            return

        self._check_add_item_name(item_name)
        item_name = item_name.strip("/")
        dst_img_path = os.path.join(self.item_dir, item_name)
        os.makedirs(os.path.dirname(dst_img_path), exist_ok=True)
        async with aiofiles.open(dst_img_path, "wb") as fout:
            await fout.write(item_raw_bytes)
        self._validate_added_item_or_die(dst_img_path)
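    # Usage sketch for get_classes_stats() above (editor's illustration; project path
    # and class names are hypothetical):
    #
    #     project = Project("/data/projects/lemons", OpenMode.READ)
    #     ds = project.datasets.get("ds1")
    #     stats = ds.get_classes_stats(project.meta)
    #     # {'items_count': {'lemon': 2, ...},
    #     #  'objects_count': {'lemon': 5, ...},
    #     #  'figures_count': {'lemon': 5, ...}}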
    async def add_item_raw_bytes_async(
        self,
        item_name: str,
        item_raw_bytes: bytes,
        ann: Optional[Union[Annotation, str]] = None,
        img_info: Optional[Union[ImageInfo, Dict, str]] = None,
    ) -> None:
        """
        Adds given binary object as an image to dataset items directory, and adds given
        annotation to dataset ann directory. If ann is None, creates empty annotation file.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param item_raw_bytes: Binary object.
        :type item_raw_bytes: :class:`bytes`
        :param ann: Annotation object or path to annotation json file.
        :type ann: :class:`Annotation<supervisely.annotation.annotation.Annotation>` or :class:`str`, optional
        :param img_info: ImageInfo object, ImageInfo object converted to dict, or path to item info json file for copying to dataset item info directory.
        :type img_info: :class:`ImageInfo<supervisely.api.image_api.ImageInfo>` or :class:`dict` or :class:`str`, optional
        :return: None
        :rtype: NoneType
        :raises: :class:`RuntimeError` if item_name already exists in dataset or item name has unsupported extension
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            from supervisely._utils import run_coroutine

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            img_path = "/home/admin/Pictures/Clouds.jpeg"
            img_np = sly.image.read(img_path)
            img_bytes = sly.image.write_bytes(img_np, "jpeg")

            coroutine = ds.add_item_raw_bytes_async("IMG_050.jpeg", img_bytes)
            run_coroutine(coroutine)

            print(ds.item_exists("IMG_050.jpeg"))
            # Output: True
        """
        if item_raw_bytes is None and ann is None and img_info is None:
            raise RuntimeError("No item_raw_bytes or ann or img_info provided.")

        await self._add_item_raw_bytes_async(item_name, item_raw_bytes)
        await self._add_ann_by_type_async(item_name, ann)
        self._add_item_info(item_name, img_info)
    def generate_item_path(self, item_name: str) -> str:
        """
        Generates full path to the given item.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: Full path to the given item
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.generate_item_path("IMG_0748.jpeg"))
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/ds1/img/IMG_0748.jpeg'
        """
        # TODO: what the difference between this and ds.get_item_path() ?
        return os.path.join(self.item_dir, item_name)
    def _add_img_np(self, item_name, img):
        """
        Write given image (RGB format, numpy matrix) to dataset items directory.
        Raises an exception if item_name already exists in the dataset or the item
        name has an unsupported extension.

        :param item_name: str
        :param img: image in RGB format (numpy matrix)
        """
        if img is None:
            return

        self._check_add_item_name(item_name)
        dst_img_path = os.path.join(self.item_dir, item_name)
        sly_image.write(dst_img_path, img)

    def _add_item_file(self, item_name, item_path, _validate_item=True, _use_hardlink=False):
        """
        Add given item file to dataset items directory. Raises an exception if item_name
        already exists in the dataset or the item name has an unsupported extension.

        :param item_name: str
        :param item_path: str
        :param _validate_item: bool
        :param _use_hardlink: bool
        """
        if item_path is None:
            return

        self._check_add_item_name(item_name)
        dst_item_path = os.path.join(self.item_dir, item_name)
        if (
            item_path != dst_item_path and item_path is not None
        ):  # used only for agent + api during download project + None to optimize internal usage
            hardlink_done = False
            if _use_hardlink:
                try:
                    os.link(item_path, dst_item_path)
                    hardlink_done = True
                except OSError:
                    pass
            if not hardlink_done:
                copy_file(item_path, dst_item_path)
            if _validate_item:
                self._validate_added_item_or_die(item_path)

    def _validate_added_item_or_die(self, item_path):
        """
        Make sure we actually received a valid image file, clean it up and fail if not so.

        :param item_path: str
        """
        # Make sure we actually received a valid image file, clean it up and fail if not so.
        try:
            sly_image.validate_format(item_path)
        except (sly_image.UnsupportedImageFormat, sly_image.ImageReadException):
            os.remove(item_path)
            raise

    async def _validate_added_item_or_die_async(self, item_path):
        """
        Make sure we actually received a valid image file, clean it up and fail if not so.

        :param item_path: str
        """
        # Make sure we actually received a valid image file, clean it up and fail if not so.
        try:
            sly_image.validate_format(item_path)
        except (sly_image.UnsupportedImageFormat, sly_image.ImageReadException):
            os.remove(item_path)
            raise
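    # Editor's note on _add_item_file()/_add_item_file_async(): with _use_hardlink=True
    # the methods first try os.link(src, dst), which is near-instant and saves disk
    # space, and silently fall back to a full copy when hardlinking fails (e.g. src
    # and dst live on different filesystems, which raises OSError).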
    def set_ann(self, item_name: str, ann: Annotation) -> None:
        """
        Replaces given annotation for given item name to dataset annotations directory in json format.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param ann: Annotation object.
        :type ann: :class:`Annotation<supervisely.annotation.annotation.Annotation>`
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            height, width = 500, 700
            new_ann = sly.Annotation((height, width))
            ds.set_ann("IMG_0748.jpeg", new_ann)
        """
        if type(ann) is not self.annotation_class:
            raise TypeError(
                f"Type of 'ann' should be {self.annotation_class.__name__}, not a {type(ann).__name__}"
            )
        dst_ann_path = self.get_ann_path(item_name)
        dump_json_file(ann.to_json(), dst_ann_path, indent=4)
    def set_ann_file(self, item_name: str, ann_path: str) -> None:
        """
        Replaces given annotation json file for given item name to dataset annotations directory in json format.

        :param item_name: Item Name.
        :type item_name: :class:`str`
        :param ann_path: Path to the :class:`Annotation<supervisely.annotation.annotation.Annotation>` json file.
        :type ann_path: :class:`str`
        :return: None
        :rtype: NoneType
        :raises: :class:`TypeError` if ann_path is not str
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            new_ann = "/home/admin/work/supervisely/projects/kiwi_annotated/ds1/ann/IMG_1812.jpeg.json"
            ds.set_ann_file("IMG_1812.jpeg", new_ann)
        """
        if type(ann_path) is not str:
            raise TypeError("Annotation path should be a string, not a {}".format(type(ann_path)))
        dst_ann_path = self.get_ann_path(item_name)
        copy_file(ann_path, dst_ann_path)
    def set_ann_dict(self, item_name: str, ann: Dict) -> None:
        """
        Replaces given annotation json for given item name to dataset annotations directory in json format.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param ann: :class:`Annotation<supervisely.annotation.annotation.Annotation>` as a dict in json format.
        :type ann: :class:`dict`
        :return: None
        :rtype: NoneType
        :raises: :class:`TypeError` if ann is not dict
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            new_ann_json = {
                "description":"",
                "size":{
                    "height":500,
                    "width":700
                },
                "tags":[],
                "objects":[],
                "customBigData":{}
            }

            ds.set_ann_dict("IMG_8888.jpeg", new_ann_json)
        """
        if type(ann) is not dict:
            raise TypeError("Ann should be a dict, not a {}".format(type(ann)))
        dst_ann_path = self.get_ann_path(item_name)
        os.makedirs(os.path.dirname(dst_ann_path), exist_ok=True)
        dump_json_file(ann, dst_ann_path, indent=4)
    def get_item_paths(self, item_name: str) -> ItemPaths:
        """
        Generates :class:`ItemPaths<ItemPaths>` object with paths to item and annotation
        directories for item with given name.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: ItemPaths object
        :rtype: :class:`ItemPaths<ItemPaths>`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            img_path, ann_path = ds.get_item_paths("IMG_0748.jpeg")
            print("img_path:", img_path)
            print("ann_path:", ann_path)
            # Output:
            # img_path: /home/admin/work/supervisely/projects/lemons_annotated/ds1/img/IMG_0748.jpeg
            # ann_path: /home/admin/work/supervisely/projects/lemons_annotated/ds1/ann/IMG_0748.jpeg.json
        """
        return ItemPaths(
            img_path=self.get_item_path(item_name),
            ann_path=self.get_ann_path(item_name),
        )
    def __len__(self):
        return len(self._item_to_ann)

    def __next__(self):
        for item_name in self._item_to_ann.keys():
            yield item_name

    def __iter__(self):
        return next(self)
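    # Iteration sketch: __len__/__iter__ above make a Dataset directly iterable over
    # its item names (editor's illustration; the path is hypothetical):
    #
    #     ds = Dataset("/data/projects/lemons/ds1", OpenMode.READ)
    #     for item_name in ds:
    #         print(item_name, ds.get_item_paths(item_name).ann_path)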
    def items(self) -> Generator[Tuple[str, str, str], None, None]:
        """
        This method is used to iterate over dataset items, receiving item name, path to image
        and path to annotation json file. It is useful when you need to iterate over dataset
        items and get paths to images and annotations.

        :return: Generator object, that yields tuple of item name, path to image and path to annotation json file.
        :rtype: Generator[Tuple[str, str, str], None, None]
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            input = "path/to/local/directory"
            # Creating Supervisely project from local directory.
            project = sly.Project(input, sly.OpenMode.READ)

            for dataset in project.datasets:
                for item_name, image_path, ann_path in dataset.items():
                    print(f"Item '{item_name}': image='{image_path}', ann='{ann_path}'")
        """
        for item_name in self._item_to_ann.keys():
            img_path, ann_path = self.get_item_paths(item_name)
            yield item_name, img_path, ann_path
    def delete_item(self, item_name: str) -> bool:
        """
        Delete image, image info and annotation from :class:`Dataset<Dataset>`.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :return: True if item was successfully deleted, False if item wasn't found in dataset.
        :rtype: :class:`bool`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            print(ds.delete_item("IMG_0748"))
            # Output: False

            print(ds.delete_item("IMG_0748.jpeg"))
            # Output: True
        """
        if self.item_exists(item_name):
            data_path, ann_path = self.get_item_paths(item_name)
            item_info_path = self.get_item_info_path(item_name)
            silent_remove(data_path)
            silent_remove(ann_path)
            silent_remove(item_info_path)
            self._item_to_ann.pop(item_name)
            return True
        return False
    @staticmethod
    def get_url(project_id: int, dataset_id: int) -> str:
        """
        Get URL to dataset items list in Supervisely.

        :param project_id: :class:`Project<Project>` ID in Supervisely.
        :type project_id: :class:`int`
        :param dataset_id: :class:`Dataset<Dataset>` ID in Supervisely.
        :type dataset_id: :class:`int`
        :return: URL to dataset items list.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            from supervisely import Dataset

            project_id = 10093
            dataset_id = 45330
            ds_items_link = Dataset.get_url(project_id, dataset_id)

            print(ds_items_link)
            # Output: "/projects/10093/datasets/45330"
        """
        res = f"/projects/{project_id}/datasets/{dataset_id}"
        if is_development():
            res = abs_url(res)
        return res
    async def set_ann_file_async(self, item_name: str, ann_path: str) -> None:
        """
        Replaces given annotation json file for given item name to dataset annotations directory in json format.

        :param item_name: Item Name.
        :type item_name: :class:`str`
        :param ann_path: Path to the :class:`Annotation<supervisely.annotation.annotation.Annotation>` json file.
        :type ann_path: :class:`str`
        :return: None
        :rtype: NoneType
        :raises: :class:`TypeError` if ann_path is not str
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            from supervisely._utils import run_coroutine

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            new_ann = "/home/admin/work/supervisely/projects/kiwi_annotated/ds1/ann/IMG_1812.jpeg.json"

            coroutine = ds.set_ann_file_async("IMG_1812.jpeg", new_ann)
            run_coroutine(coroutine)
        """
        if type(ann_path) is not str:
            raise TypeError("Annotation path should be a string, not a {}".format(type(ann_path)))
        dst_ann_path = self.get_ann_path(item_name)
        await copy_file_async(ann_path, dst_ann_path)
    async def set_ann_dict_async(self, item_name: str, ann: Dict) -> None:
        """
        Replaces given annotation json for given item name to dataset annotations directory in json format.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param ann: :class:`Annotation<supervisely.annotation.annotation.Annotation>` as a dict in json format.
        :type ann: :class:`dict`
        :return: None
        :rtype: NoneType
        :raises: :class:`TypeError` if ann is not dict
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            from supervisely._utils import run_coroutine

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            new_ann_json = {
                "description":"",
                "size":{
                    "height":500,
                    "width":700
                },
                "tags":[],
                "objects":[],
                "customBigData":{}
            }

            coroutine = ds.set_ann_dict_async("IMG_8888.jpeg", new_ann_json)
            run_coroutine(coroutine)
        """
        if type(ann) is not dict:
            raise TypeError("Ann should be a dict, not a {}".format(type(ann)))
        dst_ann_path = self.get_ann_path(item_name)
        os.makedirs(os.path.dirname(dst_ann_path), exist_ok=True)
        await dump_json_file_async(ann, dst_ann_path, indent=4)
    async def set_ann_async(self, item_name: str, ann: Annotation) -> None:
        """
        Replaces given annotation for given item name to dataset annotations directory in json format.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param ann: Annotation object.
        :type ann: :class:`Annotation<supervisely.annotation.annotation.Annotation>`
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            from supervisely._utils import run_coroutine

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)

            height, width = 500, 700
            new_ann = sly.Annotation((height, width))

            coroutine = ds.set_ann_async("IMG_0748.jpeg", new_ann)
            run_coroutine(coroutine)
        """
        if type(ann) is not self.annotation_class:
            raise TypeError(
                f"Type of 'ann' should be {self.annotation_class.__name__}, not a {type(ann).__name__}"
            )
        dst_ann_path = self.get_ann_path(item_name)
        await dump_json_file_async(ann.to_json(), dst_ann_path, indent=4)
    async def _add_ann_by_type_async(self, item_name, ann):
        """
        Add given annotation to dataset annotations dir and to dictionary items:
        item file name -> annotation file name

        :param item_name: str
        :param ann: Annotation class object, str, dict, None (raises an exception for any other type)
        """
        # This is a new-style annotation name, so if there was no image with this name yet,
        # there should not have been an annotation either.
        self._item_to_ann[item_name] = item_name + ANN_EXT
        if ann is None:
            await self.set_ann_async(item_name, self._get_empty_annotaion(item_name))
        elif type(ann) is self.annotation_class:
            await self.set_ann_async(item_name, ann)
        elif type(ann) is str:
            await self.set_ann_file_async(item_name, ann)
        elif type(ann) is dict:
            await self.set_ann_dict_async(item_name, ann)
        else:
            raise TypeError("Unsupported type {!r} for ann argument".format(type(ann)))

    async def _add_item_file_async(
        self, item_name, item_path, _validate_item=True, _use_hardlink=False
    ):
        """
        Add given item file to dataset items directory. Raises an exception if item_name
        already exists in the dataset or the item name has an unsupported extension.

        :param item_name: str
        :param item_path: str
        :param _validate_item: bool
        :param _use_hardlink: bool
        """
        if item_path is None:
            return

        self._check_add_item_name(item_name)
        dst_item_path = os.path.join(self.item_dir, item_name)
        if (
            item_path != dst_item_path and item_path is not None
        ):  # used only for agent + api during download project + None to optimize internal usage
            hardlink_done = False
            if _use_hardlink:
                try:
                    loop = get_or_create_event_loop()
                    await loop.run_in_executor(None, os.link, item_path, dst_item_path)
                    hardlink_done = True
                except OSError:
                    pass
            if not hardlink_done:
                await copy_file_async(item_path, dst_item_path)
            if _validate_item:
                await self._validate_added_item_or_die_async(item_path)
    async def add_item_file_async(
        self,
        item_name: str,
        item_path: str,
        ann: Optional[Union[Annotation, str]] = None,
        _validate_item: Optional[bool] = True,
        _use_hardlink: Optional[bool] = False,
        item_info: Optional[Union[ImageInfo, Dict, str]] = None,
        img_info: Optional[Union[ImageInfo, Dict, str]] = None,
    ) -> None:
        """
        Adds given item file to dataset items directory, and adds given annotation to dataset
        annotations directory. If ann is None, creates an empty annotation file.

        :param item_name: Item name.
        :type item_name: :class:`str`
        :param item_path: Path to the item.
        :type item_path: :class:`str`
        :param ann: Annotation object or path to annotation json file.
        :type ann: :class:`Annotation<supervisely.annotation.annotation.Annotation>` or :class:`str`, optional
        :param _validate_item: Checks input files format.
        :type _validate_item: :class:`bool`, optional
        :param _use_hardlink: If True, creates a hardlink pointing to src named dst; otherwise copies the file.
        :type _use_hardlink: :class:`bool`, optional
        :param item_info: ImageInfo object, ImageInfo converted to dict, or path to an item info json file
                          to copy into the dataset item info directory.
        :type item_info: :class:`ImageInfo<supervisely.api.image_api.ImageInfo>` or :class:`dict` or :class:`str`, optional
        :param img_info: Deprecated version of the item_info parameter. May be removed in future versions.
        :type img_info: :class:`ImageInfo<supervisely.api.image_api.ImageInfo>` or :class:`dict` or :class:`str`, optional
        :return: None
        :rtype: NoneType
        :raises: :class:`RuntimeError` if item_name already exists in dataset or item name has unsupported extension.
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
            ds = sly.Dataset(dataset_path, sly.OpenMode.READ)
            ann = "/home/admin/work/supervisely/projects/lemons_annotated/ds1/ann/IMG_8888.jpeg.json"
            loop = sly.utils.get_or_create_event_loop()
            loop.run_until_complete(
                ds.add_item_file_async(
                    "IMG_8888.jpeg",
                    "/home/admin/work/supervisely/projects/lemons_annotated/ds1/img/IMG_8888.jpeg",
                    ann=ann,
                )
            )
            print(ds.item_exists("IMG_8888.jpeg"))
            # Output: True
        """
        # item_path is None when image is cached
        if item_path is None and ann is None and img_info is None:
            raise RuntimeError("No item_path or ann or img_info provided.")

        if item_info is not None and img_info is not None:
            raise RuntimeError(
                "At least one parameter of two (item_info and img_info) must be None."
            )

        if img_info is not None:
            logger.warning(
                "img_info parameter of add_item_file_async() method is deprecated and can be removed in future versions. Use item_info parameter instead."
            )
            item_info = img_info

        await self._add_item_file_async(
            item_name,
            item_path,
            _validate_item=_validate_item,
            _use_hardlink=_use_hardlink,
        )
        await self._add_ann_by_type_async(item_name, ann)
        await self._add_item_info_async(item_name, item_info)
    def to_coco(
        self,
        meta: ProjectMeta,
        return_type: Literal["path", "dict"] = "path",
        dest_dir: Optional[str] = None,
        copy_images: bool = False,
        with_captions: bool = False,
        log_progress: bool = False,
        progress_cb: Optional[Callable] = None,
    ) -> Tuple[Dict, Union[None, Dict]]:
        """
        Convert Supervisely dataset to COCO format.

        Note: Depending on the `return_type` and `with_captions` parameters, the function returns different values.
        If `return_type` is "path", the COCO annotation files will be saved to the disk.
        If `return_type` is "dict", the function returns the COCO dataset in dictionary format.
        If `with_captions` is True, the function returns a tuple (instances, captions).

        :param meta: Project meta information.
        :type meta: :class:`ProjectMeta<supervisely.project.project_meta.ProjectMeta>`
        :param return_type: Return type ("path" or "dict").
        :type return_type: :class:`str`, optional
        :param dest_dir: Path to save COCO dataset.
        :type dest_dir: :class:`str`, optional
        :param copy_images: If True, copies images to the COCO dataset directory.
        :type copy_images: :class:`bool`, optional
        :param with_captions: If True, returns captions.
        :type with_captions: :class:`bool`, optional
        :param log_progress: If True, log progress.
        :type log_progress: :class:`bool`, optional
        :param progress_cb: Progress callback.
        :type progress_cb: :class:`Callable`, optional
        :return: COCO dataset in dictionary format.
        :rtype: :class:`dict`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            project_path = "/home/admin/work/supervisely/projects/lemons_annotated"
            project = sly.Project(project_path, sly.OpenMode.READ)

            for ds in project.datasets:
                dest_dir = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
                coco_instances = ds.to_coco(project.meta, return_type="dict", dest_dir=dest_dir)
        """
        from supervisely.convert import dataset_to_coco

        return dataset_to_coco(
            self,
            meta=meta,
            return_type=return_type,
            dest_dir=dest_dir,
            copy_images=copy_images,
            with_captions=with_captions,
            log_progress=log_progress,
            progress_cb=progress_cb,
        )
    def to_yolo(
        self,
        meta: ProjectMeta,
        dest_dir: Optional[str] = None,
        task_type: Literal["detect", "segment", "pose"] = "detect",
        log_progress: bool = False,
        progress_cb: Optional[Callable] = None,
        is_val: Optional[bool] = None,
    ):
        """
        Convert Supervisely dataset to YOLO format.

        :param meta: Project meta information.
        :type meta: :class:`ProjectMeta<supervisely.project.project_meta.ProjectMeta>`
        :param dest_dir: Path to save YOLO dataset.
        :type dest_dir: :class:`str`, optional
        :param task_type: Task type.
        :type task_type: :class:`str`, optional
        :param log_progress: If True, log progress.
        :type log_progress: :class:`bool`, optional
        :param progress_cb: Progress callback.
        :type progress_cb: :class:`Callable`, optional
        :param is_val: If True, the dataset is a validation dataset.
        :type is_val: :class:`bool`, optional
        :return: YOLO dataset in dictionary format.
        :rtype: :class:`dict`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            project_path = "/home/admin/work/supervisely/projects/lemons_annotated"
            project = sly.Project(project_path, sly.OpenMode.READ)

            for ds in project.datasets:
                dest_dir = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
                ds.to_yolo(project.meta, dest_dir=dest_dir)
        """
        from supervisely.convert import dataset_to_yolo

        return dataset_to_yolo(
            self,
            meta=meta,
            dest_dir=dest_dir,
            task_type=task_type,
            log_progress=log_progress,
            progress_cb=progress_cb,
            is_val=is_val,
        )
    def to_pascal_voc(
        self,
        meta: ProjectMeta,
        dest_dir: Optional[str] = None,
        train_val_split_coef: float = 0.8,
        log_progress: bool = False,
        progress_cb: Optional[Union[Callable, tqdm]] = None,
    ) -> None:
        """
        Convert Supervisely dataset to Pascal VOC format.

        :param meta: Project meta information.
        :type meta: :class:`ProjectMeta<supervisely.project.project_meta.ProjectMeta>`
        :param dest_dir: Destination directory.
        :type dest_dir: :class:`str`, optional
        :param train_val_split_coef: Coefficient for splitting images into train and validation sets.
        :type train_val_split_coef: :class:`float`, optional
        :param log_progress: If True, log progress.
        :type log_progress: :class:`bool`, optional
        :param progress_cb: Progress callback.
        :type progress_cb: :class:`Callable`, optional
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            project_path = "/home/admin/work/supervisely/projects/lemons_annotated"
            project = sly.Project(project_path, sly.OpenMode.READ)

            for ds in project.datasets:
                dest_dir = "/home/admin/work/supervisely/projects/lemons_annotated/ds1"
                ds.to_pascal_voc(project.meta, dest_dir=dest_dir)
        """
        from supervisely.convert import dataset_to_pascal_voc

        dataset_to_pascal_voc(
            self,
            meta=meta,
            dest_dir=dest_dir,
            train_val_split_coef=train_val_split_coef,
            log_progress=log_progress,
            progress_cb=progress_cb,
        )
    def get_blob_img_bytes(self, image_name: str) -> bytes:
        """
        Get image bytes from blob file.

        :param image_name: Image name with extension.
        :type image_name: :class:`str`
        :return: Bytes of the image, or None if the image or its blob file cannot be found.
        :rtype: :class:`bytes`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/path/to/project/lemons_annotated/ds1"
            dataset = sly.Dataset(dataset_path, sly.OpenMode.READ)

            image_name = "IMG_0748.jpeg"
            img_bytes = dataset.get_blob_img_bytes(image_name)
        """
        if self.project_dir is None:
            raise RuntimeError("Project directory is not set. Cannot get blob image bytes.")

        blob_image_info = None
        blob_file_name = None
        for offset in self.blob_offsets:
            for batch in BlobImageInfo.load_from_pickle_generator(offset):
                for file in batch:
                    if file.name == image_name:
                        blob_image_info = file
                        blob_file_name = removesuffix(Path(offset).name, OFFSETS_PKL_SUFFIX)
                        break
                if blob_image_info is not None:
                    break
            if blob_image_info is not None:
                break
        if blob_image_info is None:
            logger.debug(
                f"Image '{image_name}' not found in blob offsets. "
                f"Make sure that the image is stored in the blob file."
            )
            return None

        blob_file_path = os.path.join(self.project_dir, self.blob_dir_name, blob_file_name + ".tar")
        if file_exists(blob_file_path):
            with open(blob_file_path, "rb") as f:
                f.seek(blob_image_info.offset_start)
                img_bytes = f.read(blob_image_info.offset_end - blob_image_info.offset_start)
        else:
            logger.debug(
                f"Blob file '{blob_file_path}' not found. "
                f"Make sure that the blob file exists in the specified directory."
            )
            img_bytes = None
        return img_bytes
    def get_blob_img_np(self, image_name: str) -> np.ndarray:
        """
        Get image as numpy array from blob file.

        :param image_name: Image name with extension.
        :type image_name: :class:`str`
        :return: Numpy array of the image.
        :rtype: :class:`numpy.ndarray`
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            dataset_path = "/path/to/project/lemons_annotated/ds1"
            dataset = sly.Dataset(dataset_path, sly.OpenMode.READ)

            image_name = "IMG_0748.jpeg"
            img_np = dataset.get_blob_img_np(image_name)
        """
        img_bytes = self.get_blob_img_bytes(image_name)
        if img_bytes is None:
            return None
        return sly_image.read_bytes(img_bytes)
class Project:
    """
    Project is a parent directory for datasets. Project object is immutable.

    :param directory: Path to project directory.
    :type directory: :class:`str`
    :param mode: Determines working mode for the given project.
    :type mode: :class:`OpenMode<OpenMode>`
    :Usage example:

     .. code-block:: python

        import supervisely as sly
        project_path = "/home/admin/work/supervisely/projects/lemons_annotated"
        project = sly.Project(project_path, sly.OpenMode.READ)
    """

    dataset_class = Dataset
    blob_dir_name = "blob"
    class DatasetDict(KeyIndexedCollection):
        """
        :class:`Datasets<Dataset>` collection of :class:`Project<Project>`.
        """

        item_type = Dataset

        def __next__(self):
            for dataset in self.items():
                yield dataset

        def items(self) -> List[KeyObject]:
            return sorted(self._collection.values(), key=lambda x: x.parents)
    def __init__(
        self,
        directory: str,
        mode: Optional[OpenMode] = None,
        project_id: Optional[int] = None,
        api: Optional[sly.Api] = None,
    ):
        if project_id is not None:
            raise NotImplementedError(
                "Opening project from the API is not implemented yet. Please use local mode "
                "by providing directory and mode parameters. "
                "This feature will be implemented later."
            )
        if mode is None and project_id is None:
            raise ValueError("One of the parameters 'mode' or 'project_id' should be set.")
        if type(mode) is not OpenMode and mode is not None:
            raise TypeError(
                "Argument 'mode' has type {!r}. Correct type is OpenMode".format(type(mode))
            )
        if project_id is not None and api is None:
            raise ValueError("Parameter 'api' should be set if 'project_id' is set.")

        parent_dir, name = Project._parse_path(directory)
        self._parent_dir = parent_dir
        self._blob_dir = os.path.join(directory, self.blob_dir_name)
        self._api = api
        self.project_id = project_id

        if project_id is not None:
            self._info = api.project.get_info_by_id(project_id)
            self._name = self._info.name
        else:
            self._info = None
            self._name = name
        self._datasets = Project.DatasetDict()  # ds_name -> dataset object
        self._meta = None
        self._blob_files = []
        if project_id is not None:
            self._read_api()
        elif mode is OpenMode.READ:
            self._read()
        else:
            self._create()
    @staticmethod
    def get_url(id: int) -> str:
        """
        Get URL to datasets list in Supervisely.

        :param id: :class:`Project<Project>` ID in Supervisely.
        :type id: :class:`int`
        :return: URL to datasets list.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            from supervisely import Project

            project_id = 10093
            datasets_link = Project.get_url(project_id)

            print(datasets_link)
            # Output: "/projects/10093/datasets"
        """
        res = f"/projects/{id}/datasets"
        if is_development():
            res = abs_url(res)
        return res
    @property
    def parent_dir(self) -> str:
        """
        Project parent directory.

        :return: Path to project parent directory
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            print(project.parent_dir)
            # Output: '/home/admin/work/supervisely/projects'
        """
        return self._parent_dir

    @property
    def blob_dir(self) -> str:
        """
        Directory for project blobs. Blobs are .tar files with images. Used for fast data transfer.

        :return: Path to project blob directory
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            print(project.blob_dir)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated/blob'
        """
        return self._blob_dir

    @property
    def name(self) -> str:
        """
        Project name.

        :return: Project name.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            print(project.name)
            # Output: 'lemons_annotated'
        """
        return self._name

    @property
    def type(self) -> str:
        """
        Project type.

        :return: Project type.
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            print(project.type)
            # Output: 'images'
        """
        return ProjectType.IMAGES.value

    @property
    def datasets(self) -> Project.DatasetDict:
        """
        Project datasets.

        :return: Datasets
        :rtype: :class:`DatasetDict<supervisely.project.project.Project.DatasetDict>`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            for dataset in project.datasets:
                print(dataset.name)
            # Output: ds1
            #         ds2
        """
        return self._datasets

    @property
    def meta(self) -> ProjectMeta:
        """
        Project meta.

        :return: ProjectMeta object
        :rtype: :class:`ProjectMeta<supervisely.project.project_meta.ProjectMeta>`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            print(project.meta)
            # Output:
            # +-------+--------+----------------+--------+
            # |  Name | Shape  |     Color      | Hotkey |
            # +-------+--------+----------------+--------+
            # |  kiwi | Bitmap |  [255, 0, 0]   |        |
            # | lemon | Bitmap | [81, 198, 170] |        |
            # +-------+--------+----------------+--------+
            # Tags
            # +------+------------+-----------------+--------+---------------+--------------------+
            # | Name | Value type | Possible values | Hotkey | Applicable to | Applicable classes |
            # +------+------------+-----------------+--------+---------------+--------------------+
        """
        return self._meta

    @property
    def directory(self) -> str:
        """
        Path to the project directory.

        :return: Path to the project directory
        :rtype: :class:`str`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            print(project.directory)
            # Output: '/home/admin/work/supervisely/projects/lemons_annotated'
        """
        return os.path.join(self.parent_dir, self.name)

    @property
    def total_items(self) -> int:
        """
        Total number of items in project.

        :return: Total number of items in project
        :rtype: :class:`int`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            print(project.total_items)
            # Output: 12
        """
        return sum(len(ds) for ds in self._datasets)

    @property
    def blob_files(self) -> List[str]:
        """
        List of blob files.

        :return: List of blob files
        :rtype: :class:`list`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            print(project.blob_files)
            # Output: []
        """
        return self._blob_files

    @blob_files.setter
    def blob_files(self, blob_files: List[str]) -> None:
        """
        Sets blob files to the project.

        :param blob_files: List of blob files.
        :type blob_files: :class:`list` [ :class:`str` ]
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            project.blob_files = ["blob_file.tar"]
        """
        self._blob_files = blob_files
    def add_blob_file(self, file_name: str) -> None:
        """
        Adds blob file to the project.

        :param file_name: File name.
        :type file_name: :class:`str`
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            project.add_blob_file("blob_file.tar")
        """
        self._blob_files.append(file_name)
    def get_classes_stats(
        self,
        dataset_names: Optional[List[str]] = None,
        return_objects_count: Optional[bool] = True,
        return_figures_count: Optional[bool] = True,
        return_items_count: Optional[bool] = True,
    ):
        result = {}
        for ds in self.datasets:
            ds: Dataset
            if dataset_names is not None and ds.name not in dataset_names:
                continue
            ds_stats = ds.get_classes_stats(
                self.meta,
                return_objects_count,
                return_figures_count,
                return_items_count,
            )
            for stat_name, classes_stats in ds_stats.items():
                if stat_name not in result.keys():
                    result[stat_name] = {}
                for class_name, class_count in classes_stats.items():
                    if class_name not in result[stat_name].keys():
                        result[stat_name][class_name] = 0
                    result[stat_name][class_name] += class_count
        return result

    def _get_project_meta_path(self):
        """
        :return: str (path to project meta file (meta.json))
        """
        return os.path.join(self.directory, "meta.json")

    def _read(self):
        meta_json = load_json_file(self._get_project_meta_path())
        self._meta = ProjectMeta.from_json(meta_json)

        if dir_exists(self.blob_dir):
            self.blob_files = [Path(file).name for file in list_files(self.blob_dir)]
        else:
            self.blob_files = []

        ignore_dirs = self.dataset_class.ignorable_dirs()  # dir names that can not be datasets
        ignore_content_dirs = ignore_dirs.copy()  # dir names which can not contain datasets
        ignore_content_dirs.remove(self.dataset_class.datasets_dir())

        possible_datasets = subdirs_tree(self.directory, ignore_dirs, ignore_content_dirs)

        for ds_name in possible_datasets:
            parents = ds_name.split(os.path.sep)
            parents = [p for p in parents if p != self.dataset_class.datasets_dir()]
            if len(parents) > 1:
                parents.pop(-1)
            else:
                parents = None
            try:
                current_dataset = self.dataset_class(
                    os.path.join(self.directory, ds_name),
                    OpenMode.READ,
                    parents=parents,
                )
                if current_dataset.name not in self._datasets._collection:
                    self._datasets = self._datasets.add(current_dataset)
                else:
                    logger.debug(
                        f"Dataset '{current_dataset.name}' already exists in project '{self.name}'. Skip adding to collection."
                    )
            except Exception as ex:
                logger.warning(ex)

        if self.total_items == 0:
            raise RuntimeError("Project is empty")

    def _read_api(self):
        self._meta = ProjectMeta.from_json(self._api.project.get_meta(self.project_id))
        for parents, dataset_info in self._api.dataset.tree(self.project_id):
            relative_path = self.dataset_class._get_dataset_path(dataset_info.name, parents)
            dataset_path = os.path.join(self.directory, relative_path)
            current_dataset = self.dataset_class(
                dataset_path, parents=parents, dataset_id=dataset_info.id, api=self._api
            )
            self._datasets = self._datasets.add(current_dataset)

    def _create(self):
        if dir_exists(self.directory):
            if len(list_files_recursively(self.directory)) > 0:
                raise RuntimeError(
                    "Cannot create new project {!r}. Directory {!r} already exists and is not empty".format(
                        self.name, self.directory
                    )
                )
        else:
            mkdir(self.directory)
        self.set_meta(ProjectMeta())
        self.blob_files = []

    def validate(self):
        # @TODO: remove?
        pass
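    # Usage sketch for get_classes_stats (the project path is hypothetical; the result
    # aggregates the per-dataset stats computed above into one dict of dicts):
    #
    #   import supervisely as sly
    #
    #   project = sly.Project("/tmp/lemons_annotated", sly.OpenMode.READ)
    #   stats = project.get_classes_stats(dataset_names=["ds1"])
    #   # e.g. {"objects_count": {"lemon": 10, "kiwi": 5}, "figures_count": {...}, ...}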
    def set_meta(self, new_meta: ProjectMeta) -> None:
        """
        Saves given :class:`meta<supervisely.project.project_meta.ProjectMeta>` to project directory in json format.

        :param new_meta: ProjectMeta object.
        :type new_meta: :class:`ProjectMeta<supervisely.project.project_meta.ProjectMeta>`
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            proj_lemons = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            proj_kiwi = sly.Project("/home/admin/work/supervisely/projects/kiwi_annotated", sly.OpenMode.READ)

            proj_lemons.set_meta(proj_kiwi.meta)

            print(proj_lemons.meta)
            # Output:
            # +-------+--------+----------------+--------+
            # |  Name | Shape  |     Color      | Hotkey |
            # +-------+--------+----------------+--------+
            # |  kiwi | Bitmap |  [255, 0, 0]   |        |
            # +-------+--------+----------------+--------+
        """
        self._meta = new_meta
        dump_json_file(self.meta.to_json(), self._get_project_meta_path(), indent=4)
    def __iter__(self):
        return next(self)

    def __next__(self):
        for dataset in self._datasets:
            yield dataset
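    # Iteration note: because __next__ is a generator function, next(self) returns a
    # fresh generator over the datasets, which is exactly what __iter__ hands back.
    # A short sketch (the path is hypothetical):
    #
    #   import supervisely as sly
    #
    #   project = sly.Project("/tmp/lemons_annotated", sly.OpenMode.READ)
    #   for dataset in project:  # same as iterating project.datasets
    #       print(dataset.name)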
    def create_dataset(self, ds_name: str, ds_path: Optional[str] = None) -> Dataset:
        """
        Creates a subdirectory with given name and all intermediate subdirectories for items and
        annotations in project directory, and also adds created dataset to the collection of all
        datasets in the project.

        :param ds_name: Dataset name.
        :type ds_name: :class:`str`
        :param ds_path: Path to the dataset relative to the project directory. If None, ds_name is used.
        :type ds_path: :class:`str`, optional
        :return: Dataset object
        :rtype: :class:`Dataset<Dataset>`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)

            for dataset in project.datasets:
                print(dataset.name)
            # Output: ds1
            #         ds2

            project.create_dataset("ds3")

            for dataset in project.datasets:
                print(dataset.name)
            # Output: ds1
            #         ds2
            #         ds3
        """
        if ds_path is None:
            ds_path = os.path.join(self.directory, ds_name)
        else:
            ds_path = os.path.join(self.directory, ds_path)

        ds = self.dataset_class(ds_path, OpenMode.CREATE)
        self._datasets = self._datasets.add(ds)
        return ds
    def copy_data(
        self,
        dst_directory: str,
        dst_name: Optional[str] = None,
        _validate_item: Optional[bool] = True,
        _use_hardlink: Optional[bool] = False,
    ) -> Project:
        """
        Makes a copy of the :class:`Project<Project>`.

        :param dst_directory: Path to project parent directory.
        :type dst_directory: :class:`str`
        :param dst_name: Project name.
        :type dst_name: :class:`str`, optional
        :param _validate_item: Checks input files format.
        :type _validate_item: :class:`bool`, optional
        :param _use_hardlink: If True, creates a hardlink pointing to src named dst; otherwise copies the file.
        :type _use_hardlink: :class:`bool`, optional
        :return: Project object.
        :rtype: :class:`Project<Project>`
        :Usage example:

         .. code-block:: python

            import supervisely as sly
            project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            print(project.total_items)
            # Output: 6

            new_project = project.copy_data("/home/admin/work/supervisely/projects/", "lemons_copy")
            print(new_project.total_items)
            # Output: 6
        """
        dst_name = dst_name if dst_name is not None else self.name
        new_project = Project(os.path.join(dst_directory, dst_name), OpenMode.CREATE)
        new_project.set_meta(self.meta)
        for ds in self:
            new_ds = new_project.create_dataset(ds.name)
            for item_name in ds:
                item_path, ann_path = ds.get_item_paths(item_name)
                item_info_path = ds.get_item_info_path(item_name)

                item_path = item_path if os.path.isfile(item_path) else None
                ann_path = ann_path if os.path.isfile(ann_path) else None
                item_info_path = item_info_path if os.path.isfile(item_info_path) else None

                new_ds.add_item_file(
                    item_name,
                    item_path,
                    ann_path,
                    _validate_item=_validate_item,
                    _use_hardlink=_use_hardlink,
                    item_info=item_info_path,
                )
        return new_project
    @staticmethod
    def _parse_path(project_dir):
        """
        Split given project path into the parent directory and the directory where the project is located.
        :param project_dir: str
        :return: str, str
        """
        # alternative implementation
        # temp_parent_dir = os.path.dirname(parent_dir)
        # temp_name = os.path.basename(parent_dir)

        parent_dir, pr_name = os.path.split(project_dir.rstrip("/"))
        if not pr_name:
            raise RuntimeError("Unable to determine project name.")
        return parent_dir, pr_name
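    # Sketch of what _parse_path returns (purely illustrative paths):
    #
    #   Project._parse_path("/data/projects/lemons/")  # -> ("/data/projects", "lemons")
    #   Project._parse_path("lemons")                  # -> ("", "lemons")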
    @staticmethod
    def to_segmentation_task(
        src_project_dir: str,
        dst_project_dir: Optional[str] = None,
        inplace: Optional[bool] = False,
        target_classes: Optional[List[str]] = None,
        progress_cb: Optional[Union[tqdm, Callable]] = None,
        segmentation_type: Optional[str] = "semantic",
        bg_name: Optional[str] = "__bg__",
        bg_color: Optional[List[int]] = None,
    ) -> None:
        """
        Makes a copy of the :class:`Project<Project>`, converts annotations to
        :class:`Bitmaps<supervisely.geometry.bitmap.Bitmap>` and updates
        :class:`project meta<supervisely.project.project_meta.ProjectMeta>`.

        You will be able to get each item's segmentation mask location with the
        :class:`dataset.get_seg_path(item_name)<supervisely.project.project.Dataset.get_seg_path>` method.

        :param src_project_dir: Path to source project directory.
        :type src_project_dir: :class:`str`
        :param dst_project_dir: Path to destination project directory. Must be None If inplace=True.
        :type dst_project_dir: :class:`str`, optional
        :param inplace: Modifies source project If True. Must be False If dst_project_dir is specified.
        :type inplace: :class:`bool`, optional
        :param target_classes: Classes list to include to destination project. If segmentation_type="semantic",
                               background class will be added automatically (by default "__bg__").
        :type target_classes: :class:`list` [ :class:`str` ], optional
        :param progress_cb: Function for tracking download progress.
        :type progress_cb: tqdm or callable, optional
        :param segmentation_type: One of: {"semantic", "instance"}. If segmentation_type="semantic",
                                  background class will be added automatically (by default "__bg__")
                                  and instances will be converted to a non-overlapping semantic segmentation mask.
        :type segmentation_type: :class:`str`
        :param bg_name: Default background class name, used for semantic segmentation.
        :type bg_name: :class:`str`, optional
        :param bg_color: Default background class color, used for semantic segmentation. Default is [0, 0, 0].
        :type bg_color: :class:`list`, optional
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            source_project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            seg_project_path = "/home/admin/work/supervisely/projects/lemons_segmentation"
            sly.Project.to_segmentation_task(
                src_project_dir=source_project.directory,
                dst_project_dir=seg_project_path
            )
            seg_project = sly.Project(seg_project_path, sly.OpenMode.READ)
        """
        _bg_class_name = bg_name
        bg_color = bg_color or [0, 0, 0]
        _bg_obj_class = ObjClass(_bg_class_name, Bitmap, color=bg_color)

        if dst_project_dir is None and inplace is False:
            raise ValueError(
                f"Original project in folder {src_project_dir} will be modified. "
                f"Please set the 'inplace' argument explicitly (inplace=True)."
            )
        if inplace is True and dst_project_dir is not None:
            raise ValueError("dst_project_dir has to be None if inplace is True")

        if dst_project_dir is not None:
            if not dir_exists(dst_project_dir):
                mkdir(dst_project_dir)
            elif not dir_empty(dst_project_dir):
                raise ValueError(f"Destination directory {dst_project_dir} is not empty")

        src_project = Project(src_project_dir, OpenMode.READ)
        dst_meta = src_project.meta.clone()

        dst_meta, dst_mapping = dst_meta.to_segmentation_task(target_classes=target_classes)

        if segmentation_type == "semantic" and dst_meta.obj_classes.get(_bg_class_name) is None:
            dst_meta = dst_meta.add_obj_class(_bg_obj_class)

        if target_classes is not None:
            if segmentation_type == "semantic":
                if _bg_class_name not in target_classes:
                    target_classes.append(_bg_class_name)

            # check that all target classes are in destination project meta
            for class_name in target_classes:
                if dst_meta.obj_classes.get(class_name) is None:
                    raise KeyError(f"Class {class_name} not found in destination project meta")

            dst_meta = dst_meta.clone(
                obj_classes=ObjClassCollection(
                    [dst_meta.obj_classes.get(class_name) for class_name in target_classes]
                )
            )

        if inplace is False:
            dst_project = Project(dst_project_dir, OpenMode.CREATE)
            dst_project.set_meta(dst_meta)

        for src_dataset in src_project.datasets:
            if inplace is False:
                dst_dataset = dst_project.create_dataset(src_dataset.name)

            for item_name in src_dataset:
                img_path, ann_path = src_dataset.get_item_paths(item_name)
                ann = Annotation.load_json_file(ann_path, src_project.meta)

                if segmentation_type == "semantic":
                    seg_ann = ann.add_bg_object(_bg_obj_class)
                    dst_mapping[_bg_obj_class] = _bg_obj_class
                    seg_ann = seg_ann.to_nonoverlapping_masks(dst_mapping)  # get_labels with bg
                    seg_ann = seg_ann.to_segmentation_task()
                elif segmentation_type == "instance":
                    seg_ann = ann.to_nonoverlapping_masks(
                        dst_mapping
                    )  # rendered instances and filter classes
                elif segmentation_type == "panoptic":
                    raise NotImplementedError

                seg_path = None
                if inplace is False:
                    if file_exists(img_path):
                        dst_dataset.add_item_file(item_name, img_path, seg_ann)
                    else:  # if local project has no images
                        dst_dataset._add_ann_by_type(item_name, seg_ann)
                    seg_path = dst_dataset.get_seg_path(item_name)
                else:
                    # replace existing annotation
                    src_dataset.set_ann(item_name, seg_ann)
                    seg_path = src_dataset.get_seg_path(item_name)

                # save rendered segmentation
                # seg_ann.to_indexed_color_mask(seg_path, palette=palette["colors"], colors=len(palette["names"]))
                seg_ann.to_indexed_color_mask(seg_path)
                if progress_cb is not None:
                    progress_cb(1)

        if inplace is True:
            src_project.set_meta(dst_meta)
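    # A minimal in-place variant of the docstring example above (modifies the source
    # project; the path is hypothetical):
    #
    #   import supervisely as sly
    #
    #   sly.Project.to_segmentation_task(
    #       src_project_dir="/tmp/lemons_annotated",
    #       inplace=True,
    #   )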
    @staticmethod
    def to_detection_task(
        src_project_dir: str,
        dst_project_dir: Optional[str] = None,
        inplace: Optional[bool] = False,
        progress_cb: Optional[Union[tqdm, Callable]] = None,
    ) -> None:
        """
        Makes a copy of the :class:`Project<Project>`, converts annotations to
        :class:`Rectangles<supervisely.geometry.rectangle.Rectangle>` and updates
        :class:`project meta<supervisely.project.project_meta.ProjectMeta>`.

        :param src_project_dir: Path to source project directory.
        :type src_project_dir: :class:`str`
        :param dst_project_dir: Path to destination project directory. Must be None If inplace=True.
        :type dst_project_dir: :class:`str`, optional
        :param inplace: Modifies source project If True. Must be False If dst_project_dir is specified.
        :type inplace: :class:`bool`, optional
        :param progress_cb: Function for tracking download progress.
        :type progress_cb: tqdm or callable, optional
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            source_project = sly.Project("/home/admin/work/supervisely/projects/lemons_annotated", sly.OpenMode.READ)
            det_project_path = "/home/admin/work/supervisely/projects/lemons_detection"
            sly.Project.to_detection_task(
                src_project_dir=source_project.directory,
                dst_project_dir=det_project_path
            )
            det_project = sly.Project(det_project_path, sly.OpenMode.READ)
        """
        if dst_project_dir is None and inplace is False:
            raise ValueError(
                f"Original project in folder {src_project_dir} will be modified. "
                f"Please set the 'inplace' argument explicitly (inplace=True)."
            )
        if inplace is True and dst_project_dir is not None:
            raise ValueError("dst_project_dir has to be None if inplace is True")

        if dst_project_dir is not None:
            if not dir_exists(dst_project_dir):
                mkdir(dst_project_dir)
            elif not dir_empty(dst_project_dir):
                raise ValueError(f"Destination directory {dst_project_dir} is not empty")

        src_project = Project(src_project_dir, OpenMode.READ)
        det_meta, det_mapping = src_project.meta.to_detection_task(convert_classes=True)

        if inplace is False:
            dst_project = Project(dst_project_dir, OpenMode.CREATE)
            dst_project.set_meta(det_meta)

        for src_dataset in src_project.datasets:
            if inplace is False:
                dst_dataset = dst_project.create_dataset(src_dataset.name)
            for item_name in src_dataset:
                img_path, ann_path = src_dataset.get_item_paths(item_name)
                ann = Annotation.load_json_file(ann_path, src_project.meta)
                det_ann = ann.to_detection_task(det_mapping)

                if inplace is False:
                    dst_dataset.add_item_file(item_name, img_path, det_ann)
                else:
                    # replace existing annotation
                    src_dataset.set_ann(item_name, det_ann)
                if progress_cb is not None:
                    progress_cb(1)

        if inplace is True:
            src_project.set_meta(det_meta)
    @staticmethod
    def remove_classes_except(
        project_dir: str,
        classes_to_keep: Optional[List[str]] = None,
        inplace: Optional[bool] = False,
    ) -> None:
        """
        Removes all classes from Project except the given ones.

        :param project_dir: Path to project directory.
        :type project_dir: :class:`str`
        :param classes_to_keep: Classes to keep in project.
        :type classes_to_keep: :class:`list` [ :class:`str` ], optional
        :param inplace: Checkbox that determines whether to change the source data in project or not.
        :type inplace: :class:`bool`, optional
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            project_path = "/home/admin/work/supervisely/projects/lemons_annotated"
            sly.Project.remove_classes_except(project_path, classes_to_keep=["lemon"], inplace=True)
        """
        if classes_to_keep is None:
            classes_to_keep = []
        classes_to_remove = []
        project = Project(project_dir, OpenMode.READ)
        for obj_class in project.meta.obj_classes:
            if obj_class.name not in classes_to_keep:
                classes_to_remove.append(obj_class.name)
        Project.remove_classes(project_dir, classes_to_remove, inplace)
    @staticmethod
    def remove_classes(
        project_dir: str,
        classes_to_remove: Optional[List[str]] = None,
        inplace: Optional[bool] = False,
    ) -> None:
        """
        Removes given classes from Project.

        :param project_dir: Path to project directory.
        :type project_dir: :class:`str`
        :param classes_to_remove: Classes to remove.
        :type classes_to_remove: :class:`list` [ :class:`str` ], optional
        :param inplace: Checkbox that determines whether to change the source data in project or not.
        :type inplace: :class:`bool`, optional
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            project_path = "/home/admin/work/supervisely/projects/lemons_annotated"
            classes_to_remove = ['lemon']
            sly.Project.remove_classes(project_path, classes_to_remove, inplace=True)
        """
        if classes_to_remove is None:
            classes_to_remove = []
        if inplace is False:
            raise ValueError(
                "Original data will be modified. Please set the 'inplace' argument explicitly (inplace=True)"
            )
        project = Project(project_dir, OpenMode.READ)
        for dataset in project.datasets:
            for item_name in dataset:
                img_path, ann_path = dataset.get_item_paths(item_name)
                ann = Annotation.load_json_file(ann_path, project.meta)
                new_labels = []
                for label in ann.labels:
                    if label.obj_class.name not in classes_to_remove:
                        new_labels.append(label)
                new_ann = ann.clone(labels=new_labels)
                dataset.set_ann(item_name, new_ann)
        new_classes = []
        for obj_class in project.meta.obj_classes:
            if obj_class.name not in classes_to_remove:
                new_classes.append(obj_class)
        new_meta = project.meta.clone(obj_classes=ObjClassCollection(new_classes))
        project.set_meta(new_meta)
    @staticmethod
    def _remove_items(
        project_dir,
        without_objects=False,
        without_tags=False,
        without_objects_and_tags=False,
        inplace=False,
    ):
        if inplace is False:
            raise ValueError(
                "Original data will be modified. Please set the 'inplace' argument explicitly (inplace=True)"
            )
        if without_objects is False and without_tags is False and without_objects_and_tags is False:
            raise ValueError(
                "One of the flags (without_objects, without_tags or without_objects_and_tags) has to be set"
            )
        project = Project(project_dir, OpenMode.READ)
        for dataset in project.datasets:
            items_to_delete = []
            for item_name in dataset:
                img_path, ann_path = dataset.get_item_paths(item_name)
                ann = Annotation.load_json_file(ann_path, project.meta)
                if (
                    (without_objects and len(ann.labels) == 0)
                    or (without_tags and len(ann.img_tags) == 0)
                    or (without_objects_and_tags and ann.is_empty())
                ):
                    items_to_delete.append(item_name)
            for item_name in items_to_delete:
                dataset.delete_item(item_name)
    @staticmethod
    def remove_items_without_objects(project_dir: str, inplace: Optional[bool] = False) -> None:
        """
        Remove items (images and annotations) without objects from Project.

        :param project_dir: Path to project directory.
        :type project_dir: :class:`str`
        :param inplace: Checkbox that determines whether to change the source data in project or not.
        :type inplace: :class:`bool`, optional
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            sly.Project.remove_items_without_objects(project_path, inplace=True)
        """
        Project._remove_items(project_dir=project_dir, without_objects=True, inplace=inplace)

    @staticmethod
    def remove_items_without_tags(project_dir: str, inplace: Optional[bool] = False) -> None:
        """
        Remove items (images and annotations) without tags from Project.

        :param project_dir: Path to project directory.
        :type project_dir: :class:`str`
        :param inplace: Checkbox that determines whether to change the source data in project or not.
        :type inplace: :class:`bool`, optional
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            sly.Project.remove_items_without_tags(project_path, inplace=True)
        """
        Project._remove_items(project_dir=project_dir, without_tags=True, inplace=inplace)

    @staticmethod
    def remove_items_without_both_objects_and_tags(
        project_dir: str, inplace: Optional[bool] = False
    ) -> None:
        """
        Remove items (images and annotations) without both objects and tags from Project.

        :param project_dir: Path to project directory.
        :type project_dir: :class:`str`
        :param inplace: Checkbox that determines whether to change the source data in project or not.
        :type inplace: :class:`bool`, optional
        :return: None
        :rtype: NoneType
        :Usage example:

         .. code-block:: python

            import supervisely as sly

            sly.Project.remove_items_without_both_objects_and_tags(project_path, inplace=True)
        """
        Project._remove_items(
            project_dir=project_dir, without_objects_and_tags=True, inplace=inplace
        )
    def get_item_paths(self, item_name) -> ItemPaths:
        # TODO: remove?
        raise NotImplementedError("Method available only for dataset")
[docs] @staticmethod def get_train_val_splits_by_count( project_dir: str, train_count: int, val_count: int ) -> Tuple[List[ItemInfo], List[ItemInfo]]: """ Get train and val items information from project by given train and val counts. :param project_dir: Path to project directory. :type project_dir: :class:`str` :param train_count: Number of train items. :type train_count: :class:`int` :param val_count: Number of val items. :type val_count: :class:`int` :raises: :class:`ValueError` if total_count != train_count + val_count :return: Tuple with lists of train items information and val items information :rtype: :class:`list` [ :class:`ItemInfo<ItemInfo>` ], :class:`list` [ :class:`ItemInfo<ItemInfo>` ] :Usage example: .. code-block:: python import supervisely as sly train_count = 4 val_count = 2 train_items, val_items = sly.Project.get_train_val_splits_by_count( project_path, train_count, val_count ) """ def _list_items_for_splits(project) -> List[ItemInfo]: items = [] for dataset in project.datasets: for item_name in dataset: items.append( ItemInfo( dataset_name=dataset.name, name=item_name, img_path=dataset.get_img_path(item_name), ann_path=dataset.get_ann_path(item_name), ) ) return items project = Project(project_dir, OpenMode.READ) if project.total_items != train_count + val_count: raise ValueError("total_count != train_count + val_count") all_items = _list_items_for_splits(project) random.shuffle(all_items) train_items = all_items[:train_count] val_items = all_items[train_count:] return train_items, val_items
[docs] @staticmethod def get_train_val_splits_by_tag( project_dir: str, train_tag_name: str, val_tag_name: str, untagged: Optional[str] = "ignore", ) -> Tuple[List[ItemInfo], List[ItemInfo]]: """ Get train and val items information from project by given train and val tags names. :param project_dir: Path to project directory. :type project_dir: :class:`str` :param train_tag_name: Train tag name. :type train_tag_name: :class:`str` :param val_tag_name: Val tag name. :type val_tag_name: :class:`str` :param untagged: Actions in case of absence of train_tag_name and val_tag_name in project. :type untagged: :class:`str`, optional :raises: :class:`ValueError` if untagged not in ["ignore", "train", "val"] :return: Tuple with lists of train items information and val items information :rtype: :class:`list` [ :class:`ItemInfo<ItemInfo>` ], :class:`list` [ :class:`ItemInfo<ItemInfo>` ] :Usage example: .. code-block:: python import supervisely as sly train_tag_name = 'train' val_tag_name = 'val' train_items, val_items = sly.Project.get_train_val_splits_by_tag( project_path, train_tag_name, val_tag_name ) """ untagged_actions = ["ignore", "train", "val"] if untagged not in untagged_actions: raise ValueError( f"Unknown untagged action {untagged}. Should be one of {untagged_actions}" ) project = Project(project_dir, OpenMode.READ) train_items = [] val_items = [] for dataset in project.datasets: for item_name in dataset: img_path, ann_path = dataset.get_item_paths(item_name) info = ItemInfo(dataset.name, item_name, img_path, ann_path) ann = Annotation.load_json_file(ann_path, project.meta) if ann.img_tags.get(train_tag_name) is not None: train_items.append(info) if ann.img_tags.get(val_tag_name) is not None: val_items.append(info) if ( ann.img_tags.get(train_tag_name) is None and ann.img_tags.get(val_tag_name) is None ): # untagged item if untagged == "ignore": continue elif untagged == "train": train_items.append(info) elif untagged == "val": val_items.append(info) return train_items, val_items
[docs] @staticmethod def get_train_val_splits_by_dataset( project_dir: str, train_datasets: List[str], val_datasets: List[str] ) -> Tuple[List[ItemInfo], List[ItemInfo]]: """ Get train and val items information from project by given train and val datasets names. :param project_dir: Path to project directory. :type project_dir: :class:`str` :param train_datasets: List of train datasets names. :type train_datasets: :class:`list` [ :class:`str` ] :param val_datasets: List of val datasets names. :type val_datasets: :class:`list` [ :class:`str` ] :raises: :class:`KeyError` if dataset name not found in project :return: Tuple with lists of train items information and val items information :rtype: :class:`list` [ :class:`ItemInfo<ItemInfo>` ], :class:`list` [ :class:`ItemInfo<ItemInfo>` ] :Usage example: .. code-block:: python import supervisely as sly train_datasets = ['ds1', 'ds2'] val_datasets = ['ds3', 'ds4'] train_items, val_items = sly.Project.get_train_val_splits_by_dataset( project_path, train_datasets, val_datasets ) """ def _add_items_to_list(project, datasets_names, items_list): for dataset_name in datasets_names: dataset = project.datasets.get(dataset_name) if dataset is None: raise KeyError(f"Dataset '{dataset_name}' not found") for item_name in dataset: img_path, ann_path = dataset.get_item_paths(item_name) info = ItemInfo(dataset.name, item_name, img_path, ann_path) items_list.append(info) project = Project(project_dir, OpenMode.READ) train_items = [] _add_items_to_list(project, train_datasets, train_items) val_items = [] _add_items_to_list(project, val_datasets, val_items) return train_items, val_items
    @staticmethod
    def get_train_val_splits_by_collections(
        project_dir: str,
        train_collections: List[int],
        val_collections: List[int],
        project_id: int,
        api: Api,
    ) -> Tuple[List[ItemInfo], List[ItemInfo]]:
        """
        Get train and val items information from project by given train and val collections IDs.

        :param project_dir: Path to project directory.
        :type project_dir: :class:`str`
        :param train_collections: List of train collections IDs.
        :type train_collections: :class:`list` [ :class:`int` ]
        :param val_collections: List of val collections IDs.
        :type val_collections: :class:`list` [ :class:`int` ]
        :param project_id: Project ID.
        :type project_id: :class:`int`
        :param api: Supervisely API address and token.
        :type api: :class:`Api<supervisely.api.api.Api>`
        :raises: :class:`KeyError` if collection ID not found in project
        :return: Tuple with lists of train items information and val items information
        :rtype: :class:`list` [ :class:`ItemInfo<ItemInfo>` ], :class:`list` [ :class:`ItemInfo<ItemInfo>` ]
        """
        from supervisely.api.entities_collection_api import CollectionTypeFilter

        project = Project(project_dir, OpenMode.READ)
        ds_id_to_name = {}
        for parents, ds_info in api.dataset.tree(project_id):
            full_name = "/".join(parents + [ds_info.name])
            ds_id_to_name[ds_info.id] = full_name

        train_items = []
        val_items = []
        for collection_ids, items_dict in [
            (train_collections, train_items),
            (val_collections, val_items),
        ]:
            for collection_id in collection_ids:
                collection_items = api.entities_collection.get_items(
                    collection_id=collection_id,
                    project_id=project_id,
                    collection_type=CollectionTypeFilter.DEFAULT,
                )
                for item in collection_items:
                    ds_name = ds_id_to_name.get(item.dataset_id)
                    ds = project.datasets.get(ds_name)
                    img_path, ann_path = ds.get_item_paths(item.name)
                    info = ItemInfo(ds_name, item.name, img_path, ann_path)
                    items_dict.append(info)
        return train_items, val_items
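    # Usage sketch (the collection IDs, project ID and path are hypothetical; requires
    # an authorized API instance and a locally downloaded copy of the project):
    #
    #   import supervisely as sly
    #
    #   api = sly.Api.from_env()
    #   train_items, val_items = sly.Project.get_train_val_splits_by_collections(
    #       project_dir="/tmp/lemons_annotated",
    #       train_collections=[101],
    #       val_collections=[102],
    #       project_id=8888,
    #       api=api,
    #   )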
[docs] @staticmethod def download( api: Api, project_id: int, dest_dir: str, dataset_ids: Optional[List[int]] = None, log_progress: bool = True, batch_size: Optional[int] = 50, cache: Optional[FileCache] = None, progress_cb: Optional[Union[tqdm, Callable]] = None, only_image_tags: Optional[bool] = False, save_image_info: Optional[bool] = False, save_images: bool = True, save_image_meta: bool = False, resume_download: bool = False, **kwargs, ) -> None: """ Download project from Supervisely to the given directory. :param api: Supervisely API address and token. :type api: :class:`Api<supervisely.api.api.Api>` :param project_id: Supervisely downloadable project ID. :type project_id: :class:`int` :param dest_dir: Destination directory. :type dest_dir: :class:`str` :param dataset_ids: Dataset IDs. :type dataset_ids: :class:`list` [ :class:`int` ], optional :param log_progress: Show uploading progress bar. :type log_progress: :class:`bool` :param batch_size: The number of images in the batch when they are loaded to a host. :type batch_size: :class:`int`, optional :param cache: FileCache object. :type cache: :class:`FileCache<supervisely.io.fs_cache.FileCache>`, optional :param progress_cb: Function for tracking download progress. :type progress_cb: tqdm or callable, optional :param only_image_tags: Download project with only images tags (without objects tags). :type only_image_tags: :class:`bool`, optional :param save_image_info: Download images infos or not. :type save_image_info: :class:`bool`, optional :param save_images: Download images or not. :type save_images: :class:`bool`, optional :param save_image_meta: Download images metadata in JSON format or not. :type save_image_meta: :class:`bool`, optional :param download_blob_files: Default is False. It will download images in classic way. If True, it will download blob files, if they are present in the project, to optimize download process. :type download_blob_files: bool, optional :param skip_create_readme: Skip creating README.md file. Default is False. :type skip_create_readme: bool, optional :return: None :rtype: NoneType :Usage example: .. code-block:: python import supervisely as sly # Local destination Project folder save_directory = "/home/admin/work/supervisely/source/project" # Obtain server address and your api_token from environment variables # Edit those values if you run this notebook on your own PC address = os.environ['SERVER_ADDRESS'] token = os.environ['API_TOKEN'] # Initialize API object api = sly.Api(address, token) project_id = 8888 # Download Project sly.Project.download(api, project_id, save_directory) project_fs = sly.Project(save_directory, sly.OpenMode.READ) """ download_project( api=api, project_id=project_id, dest_dir=dest_dir, dataset_ids=dataset_ids, log_progress=log_progress, batch_size=batch_size, cache=cache, progress_cb=progress_cb, only_image_tags=only_image_tags, save_image_info=save_image_info, save_images=save_images, save_image_meta=save_image_meta, resume_download=resume_download, **kwargs, )
[docs] @staticmethod def download_bin( api: sly.Api, project_id: int, dest_dir: str = None, dataset_ids: Optional[List[int]] = None, batch_size: Optional[int] = 100, log_progress: Optional[bool] = True, progress_cb: Optional[Callable] = None, return_bytesio: Optional[bool] = False, ) -> Union[str, io.BytesIO]: """ Download project to the local directory in binary format. Faster than downloading project in the usual way. This type of project download is more suitable for creating local backups. It is also suitable for cases where you don't need access to individual project files, such as images or annotations. Binary file contains the following data: - ProjectInfo - ProjectMeta - List of DatasetInfo - List of ImageInfo - Dict of Figures - Dict of AlphaGeometries :param api: Supervisely API address and token. :type api: :class:`Api<supervisely.api.api.Api>` :param project_id: Project ID to download. :type project_id: :class:`int` :param dest_dir: Destination path to local directory. :type dest_dir: :class:`str`, optional :param dataset_ids: Specified list of Dataset IDs which will be downloaded. If you want to download nested datasets, you should specify all nested IDs. :type dataset_ids: :class:`list` [ :class:`int` ], optional :param batch_size: Size of a downloading batch. :type batch_size: :class:`int`, optional :param log_progress: Show downloading logs in the output. :type log_progress: :class:`bool`, optional :param progress_cb: Function for tracking download progress. Has a higher priority than log_progress. :type progress_cb: :class:`tqdm` or :class:`callable`, optional :param return_bytesio: If True, returns BytesIO object instead of saving it to the disk. :type return_bytesio: :class:`bool`, optional :return: Path to the binary file or BytesIO object. :rtype: :class:`str` or :class:`BytesIO` :Usage example: .. 
code-block:: python import supervisely as sly # Local destination Project folder save_directory = "/home/admin/work/supervisely/source/project" # Obtain server address and your api_token from environment variables # Edit those values if you run this notebook on your own PC address = os.environ['SERVER_ADDRESS'] token = os.environ['API_TOKEN'] # Initialize API object api = sly.Api(address, token) project_id = 8888 # Download Project in binary format project_bin_path = sly.Project.download_bin(api, project_id, save_directory) """ if dest_dir is None and not return_bytesio: raise ValueError( "Local save directory dest_dir must be specified if return_bytesio is False" ) ds_filters = ( [{"field": "id", "operator": "in", "value": dataset_ids}] if dataset_ids is not None else None ) project_info = api.project.get_info_by_id(project_id) meta = ProjectMeta.from_json(api.project.get_meta(project_id, with_settings=True)) dataset_infos = api.dataset.get_list(project_id, filters=ds_filters, recursive=True) image_infos = [] figures = {} alpha_geometries = {} for dataset_info in dataset_infos: ds_image_infos = api.image.get_list(dataset_info.id) image_infos.extend(ds_image_infos) ds_progress = progress_cb if log_progress and progress_cb is None: ds_progress = tqdm_sly( desc="Downloading dataset: {!r}".format(dataset_info.name), total=len(ds_image_infos), ) for batch in batched(ds_image_infos, batch_size): image_ids = [image_info.id for image_info in batch] ds_figures = api.image.figure.download(dataset_info.id, image_ids) alpha_ids = [ figure.id for figures in ds_figures.values() for figure in figures if figure.geometry_type == sly.AlphaMask.name() ] if len(alpha_ids) > 0: geometries_list = api.image.figure.download_geometries_batch(alpha_ids) alpha_geometries.update(dict(zip(alpha_ids, geometries_list))) figures.update(ds_figures) if ds_progress is not None: ds_progress(len(batch)) if dataset_infos != [] and ds_progress is not None: ds_progress.close() data = (project_info, meta, dataset_infos, image_infos, figures, alpha_geometries) file = ( io.BytesIO() if return_bytesio else open(os.path.join(dest_dir, f"{project_info.id}_{project_info.name}"), "wb") ) if isinstance(file, io.BytesIO): pickle.dump(data, file) else: with file as f: pickle.dump(data, f) return file if return_bytesio else file.name
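    # Sketch: keeping the binary snapshot in memory instead of on disk via
    # return_bytesio=True (the project ID is hypothetical; see upload_bin below
    # for the reverse operation):
    #
    #   import supervisely as sly
    #
    #   api = sly.Api.from_env()
    #   buffer = sly.Project.download_bin(api, project_id=8888, return_bytesio=True)
    #   # `buffer` is an io.BytesIO holding the pickled project snapshot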
    @staticmethod
    def upload_bin(
        api: Api,
        file: Union[str, io.BytesIO],
        workspace_id: int,
        project_name: Optional[str] = None,
        with_custom_data: Optional[bool] = True,
        log_progress: Optional[bool] = True,
        progress_cb: Optional[Union[tqdm, Callable]] = None,
        skip_missed: Optional[bool] = False,
    ) -> sly.ProjectInfo:
        """
        Uploads a project to Supervisely from the given binary file; suitable only for projects
        downloaded in binary format. This method is a counterpart to :func:`download_bin`.
        Faster than uploading a project in the usual way.

        :param api: Supervisely API address and token.
        :type api: :class:`Api<supervisely.api.api.Api>`
        :param file: Path to the binary file or BytesIO object.
        :type file: :class:`str` or :class:`BytesIO`
        :param workspace_id: Workspace ID, where project will be uploaded.
        :type workspace_id: :class:`int`
        :param project_name: Name of the project in Supervisely. Will be changed if a project with the same name already exists.
        :type project_name: :class:`str`, optional
        :param with_custom_data: If True, custom data from the source project is added to the new project.
        :type with_custom_data: :class:`bool`, optional
        :param log_progress: Show uploading progress bar.
        :type log_progress: :class:`bool`, optional
        :param progress_cb: Function for tracking upload progress for datasets. Has a higher priority than log_progress.
        :type progress_cb: tqdm or callable, optional
        :param skip_missed: Skip missed images.
        :type skip_missed: :class:`bool`, optional
        :return: ProjectInfo object.
        :rtype: :class:`ProjectInfo<supervisely.api.project.ProjectInfo>`

        :Usage example:

         .. code-block:: python

            import supervisely as sly

            # Local binary file with Project
            project_path = "/home/admin/work/supervisely/source/project/222_ProjectName"

            # Obtain server address and your api_token from environment variables
            # Edit those values if you run this notebook on your own PC
            address = os.environ['SERVER_ADDRESS']
            token = os.environ['API_TOKEN']

            # Initialize API object
            api = sly.Api(address, token)

            # Upload Project
            project_info = sly.Project.upload_bin(
                api,
                project_path,
                workspace_id=45,
                project_name="My Project"
            )
        """
        alpha_mask_name = sly.AlphaMask.name()
        project_info: sly.ProjectInfo
        meta: ProjectMeta
        dataset_infos: List[sly.DatasetInfo]
        image_infos: List[ImageInfo]
        figures: Dict[int, List[sly.FigureInfo]]  # image_id: List of figure_infos
        alpha_geometries: Dict[int, List[dict]]  # figure_id: List of geometries
        with file if isinstance(file, io.BytesIO) else open(file, "rb") as f:
            unpickler = CustomUnpickler(f)
            project_info, meta, dataset_infos, image_infos, figures, alpha_geometries = (
                unpickler.load()
            )

        if project_name is None:
            project_name = project_info.name
        new_project_info = api.project.create(
            workspace_id, project_name, change_name_if_conflict=True
        )
        custom_data = new_project_info.custom_data
        version_num = project_info.version.get("version", None) if project_info.version else 0
        custom_data["restored_from"] = {
            "project_id": project_info.id,
            "version_num": version_num + 1 if version_num is not None else "Unable to determine",
        }
        if with_custom_data:
            custom_data.update(project_info.custom_data)
        api.project.update_custom_data(new_project_info.id, custom_data, silent=True)
        new_meta = api.project.update_meta(new_project_info.id, meta)

        # remap tags
        old_tags = meta.tag_metas.to_json()
        new_tags = new_meta.tag_metas.to_json()
        old_new_tags_mapping = dict(
            map(lambda old_tag, new_tag: (old_tag["id"], new_tag["id"]), old_tags, new_tags)
        )
        # remap classes
        old_classes = meta.obj_classes.to_json()
        new_classes = new_meta.obj_classes.to_json()
        old_new_classes_mapping = dict(
            map(
                lambda old_class, new_class: (old_class["id"], new_class["id"]),
                old_classes,
                new_classes,
            )
        )

        dataset_mapping = {}
        # Sort datasets by parent, so that datasets with parent = 0 are processed first
        sorted_dataset_infos = sorted(
            dataset_infos, key=lambda dataset: (dataset.parent_id is not None, dataset.parent_id)
        )
        for dataset_info in sorted_dataset_infos:
            dataset_info: sly.DatasetInfo
            parent_ds_info = dataset_mapping.get(dataset_info.parent_id, None)
            new_parent_id = parent_ds_info.id if parent_ds_info else None
            if new_parent_id is None and dataset_info.parent_id is not None:
                logger.warning(
                    f"Parent dataset for dataset '{dataset_info.name}' not found. Will be added to project root."
                )
            new_dataset_info = api.dataset.create(
                new_project_info.id, dataset_info.name, parent_id=new_parent_id
            )
            if new_dataset_info is None:
                raise RuntimeError(f"Failed to restore dataset {dataset_info.name}")
            dataset_mapping[dataset_info.id] = new_dataset_info

        info_values_by_dataset = defaultdict(
            lambda: {"infos": [], "ids": [], "names": [], "hashes": [], "metas": [], "links": []}
        )

        if skip_missed:
            existing_hashes = api.image.check_existing_hashes(
                list(set([inf.hash for inf in image_infos if inf.hash and not inf.link]))
            )
            workspace_info = api.workspace.get_info_by_id(workspace_id)
            existing_links = api.image.check_existing_links(
                list(set([inf.link for inf in image_infos if inf.link])),
                team_id=workspace_info.team_id,
            )
        image_infos = sorted(image_infos, key=lambda info: info.link is not None)
        values_lists = ["infos", "ids", "names", "hashes", "metas", "links"]
        attributes = [None, "id", "name", "hash", "meta", "link"]
        for info in image_infos:
            # pylint: disable=possibly-used-before-assignment
            if skip_missed and info.hash and not info.link:
                if info.hash not in existing_hashes:
                    logger.warning(
                        f"Image with name {info.name} can't be uploaded. Hash {info.hash} not found"
                    )
                    continue
            if skip_missed and info.link:
                if info.link not in existing_links:
                    logger.warning(
                        f"Image with name {info.name} can't be uploaded. Link {info.link} can't be accessed"
                    )
                    continue
            for value_list, attr in zip(values_lists, attributes):
                if value_list == "infos":
                    info_values_by_dataset[info.dataset_id][value_list].append(info)
                else:
                    info_values_by_dataset[info.dataset_id][value_list].append(getattr(info, attr))

        for dataset_id, values in info_values_by_dataset.items():
            dataset_name = None
            if dataset_id in dataset_mapping:
                # map the old dataset ID to the new dataset ID and name
                new_ds_info = dataset_mapping.get(dataset_id)
                dataset_id, dataset_name = new_ds_info.id, new_ds_info.name
            if dataset_id is None:
                raise KeyError(f"Dataset ID {dataset_id} not found in mapping")
            ds_progress = progress_cb
            if log_progress and progress_cb is None:
                ds_progress = tqdm_sly(
                    desc="Uploading images to {!r}".format(dataset_name),
                    total=len(values["names"]),
                )

            # ------------------------------------ Determine Upload Method ----------------------------------- #
            none_link_indices = [i for i, link in enumerate(values["links"]) if link is None]
            if len(none_link_indices) == len(values["links"]):
                new_file_infos = api.image.upload_hashes(
                    dataset_id,
                    names=values["names"],
                    hashes=values["hashes"],
                    metas=values["metas"],
                    batch_size=200,
                    progress_cb=ds_progress,
                )
            elif not none_link_indices:
                new_file_infos = api.image.upload_links(
                    dataset_id,
                    names=values["names"],
                    links=values["links"],
                    metas=values["metas"],
                    batch_size=200,
                    progress_cb=ds_progress,
                )
            else:
                if not all(
                    none_link_indices[i] - none_link_indices[i - 1] == 1
                    for i in range(1, len(none_link_indices))
                ):
                    raise ValueError(
                        "Internal upload_bin Error. Images with links and without links are not in continuous blocks"
                    )
                i = none_link_indices[0]  # first image without link
                j = none_link_indices[-1]  # last image without link

                new_file_infos = api.image.upload_hashes(
                    dataset_id,
                    names=values["names"][i : j + 1],
                    hashes=values["hashes"][i : j + 1],
                    metas=values["metas"][i : j + 1],
                    batch_size=200,
                    progress_cb=ds_progress,
                )
                new_file_infos_link = api.image.upload_links(
                    dataset_id,
                    names=values["names"][j + 1 :],
                    links=values["links"][j + 1 :],
                    metas=values["metas"][j + 1 :],
                    batch_size=200,
                    progress_cb=ds_progress,
                )
                new_file_infos.extend(new_file_infos_link)
            # ----------------------------------------------- - ---------------------------------------------- #

            # image_lists_by_tags -> tagId: {tagValue: [imageId]}
            image_lists_by_tags = defaultdict(lambda: defaultdict(list))
            alpha_figures = []
            other_figures = []
            all_figure_tags = defaultdict(list)  # figure_id: List of (tagId, value)
            old_alpha_figure_ids = []
            tags_list = []  # to append tags to figures in bulk
            if ds_progress is not None:
                ds_fig_progress = tqdm_sly(
                    desc="Processing figures for images in {!r}".format(dataset_name),
                    total=len(new_file_infos),
                )
            for old_file_info, new_file_info in zip(values["infos"], new_file_infos):
                for tag in old_file_info.tags:
                    new_tag_id = old_new_tags_mapping[tag.get("tagId")]
                    image_lists_by_tags[new_tag_id][tag.get("value")].append(new_file_info.id)
                image_figures = figures.get(old_file_info.id, [])
                if len(image_figures) > 0:
                    alpha_figure_jsons = []
                    other_figure_jsons = []
                    for figure in image_figures:
                        figure_json = figure._asdict()
                        if figure.geometry_type == alpha_mask_name:
                            alpha_figure_jsons.append(figure_json)
                            old_alpha_figure_ids.append(figure_json["id"])
                        else:
                            other_figure_jsons.append(figure_json)

                    def create_figure_json(figure, geometry):
                        return {
                            "meta": figure["meta"] if figure["meta"] is not None else {},
                            "entityId": new_file_info.id,
                            "classId": old_new_classes_mapping[figure["class_id"]],
                            "geometry": geometry,
                            "geometryType": figure["geometry_type"],
                        }

                    new_figure_jsons = [
                        create_figure_json(figure, figure["geometry"])
                        for figure in other_figure_jsons
                    ]
                    new_alpha_figure_jsons = [
                        create_figure_json(figure, None) for figure in alpha_figure_jsons
                    ]
                    other_figures.extend(new_figure_jsons)
                    alpha_figures.extend(new_alpha_figure_jsons)

                    def process_figures(figure_jsons, figure_tags):
                        for figure in figure_jsons:
                            figure_tags[figure.get("id")].extend(
                                (tag.get("tagId"), tag.get("value", None)) for tag in figure["tags"]
                            )

                    process_figures(other_figure_jsons, all_figure_tags)
                    process_figures(alpha_figure_jsons, all_figure_tags)
                if ds_progress is not None:
                    ds_fig_progress.update(1)
            all_figure_ids = api.image.figure.create_bulk(
                other_figures,
                dataset_id=new_file_info.dataset_id,
            )
            new_alpha_figure_ids = api.image.figure.create_bulk(
                alpha_figures, dataset_id=new_file_info.dataset_id
            )
            all_figure_ids.extend(new_alpha_figure_ids)
            ordered_alpha_geometries = list(map(alpha_geometries.get, old_alpha_figure_ids))
            api.image.figure.upload_geometries_batch(new_alpha_figure_ids, ordered_alpha_geometries)
            for tag, values_to_images in image_lists_by_tags.items():
                for value, image_ids in values_to_images.items():
                    api.image.add_tag_batch(image_ids, tag, value, batch_size=200)
            for new_of_id, tags in zip(all_figure_ids, all_figure_tags.values()):
                for tag_id, tag_value in tags:
                    new_tag_id = old_new_tags_mapping[tag_id]
                    tags_list.append(
                        {"tagId": new_tag_id, "figureId": new_of_id, "value": tag_value}
                    )
            api.image.tag.add_to_objects(
                new_project_info.id,
                tags_list,
                batch_size=300,
                log_progress=True if ds_progress is not None else False,
            )

        return new_project_info
    @staticmethod
    def upload(
        dir: str,
        api: Api,
        workspace_id: int,
        project_name: Optional[str] = None,
        log_progress: bool = True,
        progress_cb: Optional[Union[tqdm, Callable]] = None,
    ) -> Tuple[int, str]:
        """
        Uploads project to Supervisely from the given directory.

        If you have metadata.json files for images in the project directory, you can upload
        images with an added custom sort parameter. To do this, use the context manager
        :func:`api.image.add_custom_sort` with the desired key name from the metadata.json
        file which will be used for sorting.

        More about project structure:
        https://developer.supervisely.com/getting-started/supervisely-annotation-format/project-structure#project-structure-example

        Refer to the example section for usage details.

        :param dir: Path to project directory.
        :type dir: :class:`str`
        :param api: Supervisely API address and token.
        :type api: :class:`Api<supervisely.api.api.Api>`
        :param workspace_id: Workspace ID, where project will be uploaded.
        :type workspace_id: :class:`int`
        :param project_name: Name of the project in Supervisely. Will be changed if a project with the same name already exists.
        :type project_name: :class:`str`, optional
        :param log_progress: Show uploading progress bar.
        :type log_progress: :class:`bool`
        :param progress_cb: Function for tracking upload progress.
        :type progress_cb: tqdm or callable, optional
        :return: Project ID and name. It is recommended to check that the returned project name coincides with the provided project name.
        :rtype: :class:`int`, :class:`str`

        :Usage example:

         .. code-block:: python

            import supervisely as sly

            # Local folder with Project
            project_directory = "/home/admin/work/supervisely/source/project"

            # Obtain server address and your api_token from environment variables
            # Edit those values if you run this notebook on your own PC
            address = os.environ['SERVER_ADDRESS']
            token = os.environ['API_TOKEN']

            # Initialize API object
            api = sly.Api(address, token)

            # Upload Project
            project_id, project_name = sly.Project.upload(
                project_directory,
                api,
                workspace_id=45,
                project_name="My Project"
            )

            # Upload project with added custom sort order
            # This context manager processes every image and adds a custom sort order
            # if `meta` is present in the image info file or image meta file.
            # Otherwise, it will be uploaded without a custom sort order.
            with api.image.add_custom_sort(key="key_name"):
                project_id, project_name = sly.Project.upload(
                    project_directory,
                    api,
                    workspace_id=45,
                    project_name="My Project"
                )
        """
        return upload_project(
            dir=dir,
            api=api,
            workspace_id=workspace_id,
            project_name=project_name,
            log_progress=log_progress,
            progress_cb=progress_cb,
        )
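    # --- Usage sketch (not part of the SDK source): since `upload` may rename the
    # project on name conflicts, it is worth comparing the returned name with the
    # requested one. `project_directory` is an illustrative path.
    #
    #     requested_name = "My Project"
    #     project_id, actual_name = sly.Project.upload(
    #         project_directory, api, workspace_id=45, project_name=requested_name
    #     )
    #     if actual_name != requested_name:
    #         print(f"Project was renamed to {actual_name!r} to avoid a conflict")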
    @staticmethod
    async def download_async(
        api: Api,
        project_id: int,
        dest_dir: str,
        dataset_ids: Optional[List[int]] = None,
        log_progress: bool = True,
        semaphore: asyncio.Semaphore = None,
        progress_cb: Optional[Union[tqdm, Callable]] = None,
        only_image_tags: Optional[bool] = False,
        save_image_info: Optional[bool] = False,
        save_images: bool = True,
        save_image_meta: bool = False,
        images_ids: Optional[List[int]] = None,
        resume_download: Optional[bool] = False,
        **kwargs,
    ) -> None:
        """
        Download project from Supervisely to the given directory in asynchronous mode.

        :param api: Supervisely API address and token.
        :type api: :class:`Api<supervisely.api.api.Api>`
        :param project_id: Supervisely downloadable project ID.
        :type project_id: :class:`int`
        :param dest_dir: Destination directory.
        :type dest_dir: :class:`str`
        :param dataset_ids: Filter datasets by IDs.
        :type dataset_ids: :class:`list` [ :class:`int` ], optional
        :param log_progress: Show downloading progress bar.
        :type log_progress: :class:`bool`
        :param semaphore: Semaphore to limit the number of concurrent downloads of items.
        :type semaphore: :class:`asyncio.Semaphore`, optional
        :param progress_cb: Function for tracking download progress.
        :type progress_cb: tqdm or callable, optional
        :param only_image_tags: Download project with only image tags (without object tags).
        :type only_image_tags: :class:`bool`, optional
        :param save_image_info: Download image infos or not.
        :type save_image_info: :class:`bool`, optional
        :param save_images: Download images or not.
        :type save_images: :class:`bool`, optional
        :param save_image_meta: Download images metadata in JSON format or not.
        :type save_image_meta: :class:`bool`, optional
        :param images_ids: Filter images by IDs.
        :type images_ids: :class:`list` [ :class:`int` ], optional
        :param resume_download: Resume download: only missing files are downloaded, existing files are not erased.
        :type resume_download: :class:`bool`, optional
        :param skip_create_readme: Skip creating README.md file. Default is False.
        :type skip_create_readme: bool, optional
        :return: None
        :rtype: NoneType

        :Usage example:

         .. code-block:: python

            import supervisely as sly
            from supervisely._utils import run_coroutine

            os.environ['SERVER_ADDRESS'] = 'https://app.supervisely.com'
            os.environ['API_TOKEN'] = 'Your Supervisely API Token'
            api = sly.Api.from_env()

            project_id = 8888
            save_directory = "/path/to/save/projects"

            coroutine = sly.Project.download_async(api, project_id, save_directory)
            run_coroutine(coroutine)
        """
        if kwargs.pop("cache", None) is not None:
            logger.warning(
                "Cache is not supported in async mode and will be ignored. "
                "Use resume_download parameter instead to optimize download process."
            )
        await _download_project_async(
            api=api,
            project_id=project_id,
            dest_dir=dest_dir,
            dataset_ids=dataset_ids,
            log_progress=log_progress,
            semaphore=semaphore,
            only_image_tags=only_image_tags,
            save_image_info=save_image_info,
            save_images=save_images,
            progress_cb=progress_cb,
            save_image_meta=save_image_meta,
            images_ids=images_ids,
            resume_download=resume_download,
            **kwargs,
        )
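    # --- Usage sketch (not part of the SDK source): limiting download concurrency
    # and resuming an interrupted download. The semaphore value is an assumption;
    # tune it to your connection and server limits.
    #
    #     import asyncio
    #     from supervisely._utils import run_coroutine
    #
    #     coroutine = sly.Project.download_async(
    #         api,
    #         project_id=8888,
    #         dest_dir="/path/to/save/projects",
    #         semaphore=asyncio.Semaphore(10),
    #         resume_download=True,  # keep files that are already on disk
    #     )
    #     run_coroutine(coroutine)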
    def to_coco(
        self,
        dest_dir: Optional[str] = None,
        copy_images: bool = False,
        with_captions: bool = False,
        log_progress: bool = True,
        progress_cb: Optional[Callable] = None,
    ) -> None:
        """
        Convert Supervisely project to COCO format.

        :param dest_dir: Destination directory.
        :type dest_dir: :class:`str`, optional
        :param copy_images: Copy images to the destination directory.
        :type copy_images: :class:`bool`
        :param with_captions: Return captions for images.
        :type with_captions: :class:`bool`
        :param log_progress: Show conversion progress bar.
        :type log_progress: :class:`bool`
        :param progress_cb: Function for tracking conversion progress (for all items in the project).
        :type progress_cb: callable, optional
        :return: None
        :rtype: NoneType

        :Usage example:

         .. code-block:: python

            import supervisely as sly

            # Local folder with Project
            project_directory = "/home/admin/work/supervisely/source/project"

            # Convert Project to COCO format
            sly.Project(project_directory).to_coco(log_progress=True)

            # or
            from supervisely.convert import to_coco
            to_coco(project_directory, dest_dir="./coco_project")
        """
        from supervisely.convert import project_to_coco

        project_to_coco(
            project=self,
            dest_dir=dest_dir,
            copy_images=copy_images,
            with_captions=with_captions,
            log_progress=log_progress,
            progress_cb=progress_cb,
        )
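    # --- Usage sketch (not part of the SDK source): exporting to COCO together
    # with the image files and caption annotations. Paths are illustrative.
    #
    #     project = sly.Project("/path/to/sly_project", sly.OpenMode.READ)
    #     project.to_coco(
    #         dest_dir="./coco_export",
    #         copy_images=True,    # place image files next to the JSON
    #         with_captions=True,  # also produce captions for images
    #     )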
    def to_yolo(
        self,
        dest_dir: Optional[str] = None,
        task_type: Literal["detect", "segment", "pose"] = "detect",
        log_progress: bool = True,
        progress_cb: Optional[Callable] = None,
        val_datasets: Optional[List[str]] = None,
    ) -> None:
        """
        Convert Supervisely project to YOLO format.

        :param dest_dir: Destination directory.
        :type dest_dir: :class:`str`, optional
        :param task_type: Task type for YOLO format. Possible values: 'detect', 'segment', 'pose'.
        :type task_type: :class:`str` or :class:`TaskType`, optional
        :param log_progress: Show conversion progress bar.
        :type log_progress: :class:`bool`
        :param progress_cb: Function for tracking conversion progress (for all items in the project).
        :type progress_cb: callable, optional
        :param val_datasets: List of dataset names for validation.
                             Full dataset names are required (e.g., 'ds0/nested_ds1/ds3').
                             If specified, datasets from the list will be marked as val, others as train.
                             If not specified, the function will determine the validation datasets automatically.
        :type val_datasets: :class:`list` [ :class:`str` ], optional
        :return: None
        :rtype: NoneType

        :Usage example:

         .. code-block:: python

            import supervisely as sly

            # Local folder with Project
            project_directory = "/home/admin/work/supervisely/source/project"

            # Convert Project to YOLO format
            sly.Project(project_directory).to_yolo(log_progress=True)

            # or
            from supervisely.convert import to_yolo
            to_yolo(project_directory, dest_dir="./yolo_project")
        """
        from supervisely.convert import project_to_yolo

        return project_to_yolo(
            project=self,
            dest_dir=dest_dir,
            task_type=task_type,
            log_progress=log_progress,
            progress_cb=progress_cb,
            val_datasets=val_datasets,
        )
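    # --- Usage sketch (not part of the SDK source): exporting segmentation labels
    # and pinning the validation split to specific datasets. Dataset names are
    # illustrative; nested datasets need their full path.
    #
    #     project = sly.Project("/path/to/sly_project", sly.OpenMode.READ)
    #     project.to_yolo(
    #         dest_dir="./yolo_export",
    #         task_type="segment",
    #         val_datasets=["ds0/val"],  # everything else becomes train
    #     )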
    def to_pascal_voc(
        self,
        dest_dir: Optional[str] = None,
        train_val_split_coef: float = 0.8,
        log_progress: bool = True,
        progress_cb: Optional[Union[tqdm, Callable]] = None,
    ) -> None:
        """
        Convert Supervisely project to Pascal VOC format.

        :param dest_dir: Destination directory.
        :type dest_dir: :class:`str`, optional
        :param train_val_split_coef: Coefficient for splitting images into train and validation sets.
        :type train_val_split_coef: :class:`float`, optional
        :param log_progress: Show conversion progress bar.
        :type log_progress: :class:`bool`
        :param progress_cb: Function for tracking conversion progress (for all items in the project).
        :type progress_cb: callable, optional
        :return: None
        :rtype: NoneType

        :Usage example:

         .. code-block:: python

            import supervisely as sly

            # Local folder with Project
            project_directory = "/home/admin/work/supervisely/source/project"

            # Convert Project to Pascal VOC format
            sly.Project(project_directory).to_pascal_voc(log_progress=True)

            # or
            from supervisely.convert import to_pascal_voc
            to_pascal_voc(project_directory, dest_dir="./pascal_voc_project")
        """
        from supervisely.convert import project_to_pascal_voc

        project_to_pascal_voc(
            project=self,
            dest_dir=dest_dir,
            train_val_split_coef=train_val_split_coef,
            log_progress=log_progress,
            progress_cb=progress_cb,
        )
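# --- Usage sketch (not part of the SDK source): a custom 90/10 train/val split
# when exporting to Pascal VOC. Paths are illustrative.
#
#     project = sly.Project("/path/to/sly_project", sly.OpenMode.READ)
#     project.to_pascal_voc(dest_dir="./voc_export", train_val_split_coef=0.9)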
def read_single_project(
    dir: str,
    project_class: Optional[
        Union[
            Project,
            sly.VideoProject,
            sly.VolumeProject,
            sly.PointcloudProject,
            sly.PointcloudEpisodeProject,
        ]
    ] = Project,
) -> Union[
    Project,
    sly.VideoProject,
    sly.VolumeProject,
    sly.PointcloudProject,
    sly.PointcloudEpisodeProject,
]:
    """
    Read project from the given directory, or try to find a project directory in its subdirectories.

    :param dir: Path to directory which contains a project folder, or has a project folder in any subdirectory.
    :type dir: :class:`str`
    :param project_class: Project object of arbitrary modality.
    :type project_class: :class:`Project` or :class:`VideoProject` or :class:`VolumeProject` or :class:`PointcloudProject` or :class:`PointcloudEpisodeProject`, optional
    :return: Project class object of arbitrary modality.
    :rtype: :class:`Project` or :class:`VideoProject` or :class:`VolumeProject` or :class:`PointcloudProject` or :class:`PointcloudEpisodeProject`
    :raises RuntimeError: if the given directory and its subdirectories contain more than one valid project folder.
    :raises FileNotFoundError: if the given directory and its subdirectories don't contain a valid project folder.

    :Usage example:

     .. code-block:: python

        import supervisely as sly

        proj_dir = "/home/admin/work/supervisely/source/project" # Project directory or directory with project subdirectory.
        project = sly.read_single_project(proj_dir)
    """
    project_dirs = [project_dir for project_dir in find_project_dirs(dir, project_class)]
    if len(project_dirs) > 1:
        raise RuntimeError(
            f"The given directory {dir} and its subdirectories contain more than one valid project folder. "
            f"The following project folders were found: {project_dirs}. "
            "Ensure that you have only one project in the given directory and its subdirectories."
        )
    elif len(project_dirs) == 0:
        raise FileNotFoundError(
            f"The given directory {dir} and its subdirectories don't contain a valid project folder."
        )

    return project_class(project_dirs[0], OpenMode.READ)


def find_project_dirs(dir: str, project_class: Optional[Project] = Project) -> Generator[str]:
    """Yields directories that contain a valid project folder in the given directory
    or in any of its subdirectories.

    :param dir: Path to directory which contains a project folder, or has a project folder in any subdirectory.
    :type dir: str
    :param project_class: Project object
    :type project_class: :class:`Project<Project>`
    :return: Path to directory that contains a meta.json file.
    :rtype: str

    :Usage example:

     .. code-block:: python

        import supervisely as sly

        # Local folder (or any of its subdirectories) which contains sly.Project files.
        input_directory = "/home/admin/work/supervisely/source"
        for project_dir in sly.find_project_dirs(input_directory):
            project_fs = sly.Project(project_dir, sly.OpenMode.READ)
            # Do something with project_fs
    """
    paths = list_dir_recursively(dir)
    for path in paths:
        if get_file_name_with_ext(path) == "meta.json":
            parent_dir = os.path.dirname(path)
            project_dir = os.path.join(dir, parent_dir)
            try:
                project_class(project_dir, OpenMode.READ)
                yield project_dir
            except Exception:
                pass
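# --- Usage sketch (not part of the SDK source): read_single_project raises when
# zero or several projects are found, so callers may want to branch on that.
#
#     try:
#         project = sly.read_single_project("/data/downloads")
#     except FileNotFoundError:
#         print("No valid project folder found")
#     except RuntimeError:
#         # several candidates; inspect them one by one instead
#         for project_dir in sly.find_project_dirs("/data/downloads"):
#             print("candidate:", project_dir)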
def _download_project(
    api: sly.Api,
    project_id: int,
    dest_dir: str,
    dataset_ids: Optional[List[int]] = None,
    log_progress: bool = True,
    batch_size: Optional[int] = 50,
    only_image_tags: Optional[bool] = False,
    save_image_info: Optional[bool] = False,
    save_images: Optional[bool] = True,
    progress_cb: Optional[Callable] = None,
    save_image_meta: Optional[bool] = False,
    images_ids: Optional[List[int]] = None,
    resume_download: Optional[bool] = False,
    **kwargs,
):
    download_blob_files = kwargs.pop("download_blob_files", False)
    skip_create_readme = kwargs.pop("skip_create_readme", False)
    dataset_ids = set(dataset_ids) if (dataset_ids is not None) else None
    project_fs = None
    meta = ProjectMeta.from_json(api.project.get_meta(project_id, with_settings=True))
    if os.path.exists(dest_dir) and resume_download:
        dump_json_file(meta.to_json(), os.path.join(dest_dir, "meta.json"))
        try:
            project_fs = Project(dest_dir, OpenMode.READ)
        except RuntimeError as e:
            if "Project is empty" in str(e):
                clean_dir(dest_dir)
                project_fs = None
            else:
                raise
    if project_fs is None:
        project_fs = Project(dest_dir, OpenMode.CREATE)
    project_fs.set_meta(meta)

    if progress_cb is not None:
        log_progress = False

    id_to_tagmeta = None
    if only_image_tags is True:
        id_to_tagmeta = meta.tag_metas.get_id_mapping()

    existing_datasets = {dataset.path: dataset for dataset in project_fs.datasets}
    for parents, dataset in api.dataset.tree(project_id):
        blob_files_to_download = {}
        dataset_path = Dataset._get_dataset_path(dataset.name, parents)
        dataset_id = dataset.id
        if dataset_ids is not None and dataset_id not in dataset_ids:
            continue

        if dataset_path in existing_datasets:
            dataset_fs = existing_datasets[dataset_path]
        else:
            dataset_fs = project_fs.create_dataset(dataset.name, dataset_path)

        all_images = api.image.get_list(dataset_id, force_metadata_for_links=False)
        images = [image for image in all_images if images_ids is None or image.id in images_ids]
        ds_total = len(images)

        ds_progress = progress_cb
        if log_progress is True:
            ds_progress = tqdm_sly(
                desc="Downloading images from {!r}".format(dataset.name),
                total=ds_total,
            )
        anns_progress = None
        if log_progress or progress_cb is not None:
            anns_progress = tqdm_sly(
                desc="Downloading annotations from {!r}".format(dataset.name),
                total=ds_total,
                leave=False,
            )

        with ApiContext(
            api,
            project_id=project_id,
            dataset_id=dataset_id,
            project_meta=meta,
        ):
            for batch in batched(images, batch_size):
                batch: List[ImageInfo]
                image_ids = [image_info.id for image_info in batch]
                image_names = [image_info.name for image_info in batch]

                existing_image_infos: Dict[str, ImageInfo] = {}
                for image_name in image_names:
                    try:
                        image_info = dataset_fs.get_item_info(image_name)
                    except:
                        image_info = None
                    existing_image_infos[image_name] = image_info

                indexes_to_download = []
                for i, image_info in enumerate(batch):
                    existing_image_info = existing_image_infos[image_info.name]
                    if (
                        existing_image_info is None
                        or existing_image_info.updated_at != image_info.updated_at
                    ):
                        indexes_to_download.append(i)

                # Collect images that were added to the project as offsets from an archive in Team Files
                indexes_with_offsets = []
                for idx in indexes_to_download:
                    image_info: ImageInfo = batch[idx]
                    if image_info.related_data_id is not None:
                        blob_files_to_download[image_info.related_data_id] = image_info.download_id
                        indexes_with_offsets.append(idx)

                # Download images in numpy format
                batch_imgs_bytes = [None] * len(image_ids)
                if save_images and indexes_to_download:
                    # For a lot of small files that are stored in a blob file:
                    # download the blob files to optimize the download process.
                    if download_blob_files and len(indexes_with_offsets) > 0:
                        bytes_indexes_to_download = indexes_to_download.copy()
                        for blob_file_id, download_id in blob_files_to_download.items():
                            if blob_file_id not in project_fs.blob_files:
                                api.image.download_blob_file(
                                    project_id=project_id,
                                    download_id=download_id,
                                    path=os.path.join(project_fs.blob_dir, f"{blob_file_id}.tar"),
                                    log_progress=(
                                        True if log_progress or progress_cb is not None else False
                                    ),
                                )
                                project_fs.add_blob_file(blob_file_id)
                            # Process blob image offsets
                            offsets_file_name = f"{blob_file_id}{OFFSETS_PKL_SUFFIX}"
                            offsets_file_path = os.path.join(dataset_fs.directory, offsets_file_name)
                            # Initialize counter for total image offsets for this blob file
                            total_offsets_count = 0
                            current_batch = []
                            # Get offsets from image infos
                            for idx in indexes_with_offsets:
                                image_info = batch[idx]
                                if image_info.related_data_id == blob_file_id:
                                    blob_image_info = BlobImageInfo(
                                        name=image_info.name,
                                        offset_start=image_info.offset_start,
                                        offset_end=image_info.offset_end,
                                    )
                                    current_batch.append(blob_image_info)
                                    bytes_indexes_to_download.remove(idx)
                                    # When batch size is reached, dump to file
                                    if len(current_batch) >= OFFSETS_PKL_BATCH_SIZE:
                                        BlobImageInfo.dump_to_pickle(current_batch, offsets_file_path)
                                        total_offsets_count += len(current_batch)
                                        current_batch = []
                            # Dump any remaining items in the last batch
                            if len(current_batch) > 0:
                                BlobImageInfo.dump_to_pickle(current_batch, offsets_file_path)
                                total_offsets_count += len(current_batch)

                            if total_offsets_count > 0:
                                logger.debug(
                                    f"Saved {total_offsets_count} image offsets for {blob_file_id} to {offsets_file_path} in {(total_offsets_count + OFFSETS_PKL_BATCH_SIZE - 1) // OFFSETS_PKL_BATCH_SIZE} batches"
                                )
                                ds_progress(total_offsets_count)
                        image_ids_to_download = [image_ids[i] for i in bytes_indexes_to_download]
                        for index, img in zip(
                            bytes_indexes_to_download,
                            api.image.download_bytes(
                                dataset_id,
                                image_ids_to_download,
                                progress_cb=ds_progress,
                            ),
                        ):
                            batch_imgs_bytes[index] = img
                    # Otherwise, download images in the classic way
                    else:
                        image_ids_to_download = [image_ids[i] for i in indexes_to_download]
                        for index, img in zip(
                            indexes_to_download,
                            api.image.download_bytes(
                                dataset_id,
                                image_ids_to_download,
                                progress_cb=ds_progress,
                            ),
                        ):
                            batch_imgs_bytes[index] = img
                if ds_progress is not None:
                    ds_progress(len(batch) - len(indexes_to_download))

                # download annotations in json format
                ann_jsons = [None] * len(image_ids)
                if only_image_tags is False:
                    if indexes_to_download:
                        for index, ann_info in zip(
                            indexes_to_download,
                            api.annotation.download_batch(
                                dataset_id,
                                [image_ids[i] for i in indexes_to_download],
                                progress_cb=anns_progress,
                            ),
                        ):
                            ann_jsons[index] = ann_info.annotation
                else:
                    if indexes_to_download:
                        for index in indexes_to_download:
                            image_info = batch[index]
                            tags = TagCollection.from_api_response(
                                image_info.tags,
                                meta.tag_metas,
                                id_to_tagmeta,
                            )
                            tmp_ann = Annotation(
                                img_size=(image_info.height, image_info.width), img_tags=tags
                            )
                            ann_jsons[index] = tmp_ann.to_json()
                        if anns_progress is not None:
                            anns_progress(len(indexes_to_download))
                if anns_progress is not None:
                    anns_progress(len(batch) - len(indexes_to_download))

                for img_info, name, img_bytes, ann in zip(
                    batch, image_names, batch_imgs_bytes, ann_jsons
                ):
                    dataset_fs: Dataset
                    # to fix already downloaded images that don't have info files
                    dataset_fs.delete_item(name)
                    dataset_fs.add_item_raw_bytes(
                        item_name=name,
                        item_raw_bytes=img_bytes if save_images is True else None,
                        ann=dataset_fs.get_ann(name, meta) if ann is None else ann,
                        img_info=img_info if save_image_info is True else None,
                    )

        if save_image_meta:
            meta_dir = dataset_fs.meta_dir
            for image_info in images:
                if image_info.meta:
                    sly.fs.mkdir(meta_dir)
                    sly.json.dump_json_file(
                        image_info.meta, dataset_fs.get_item_meta_path(image_info.name)
                    )

        # delete redundant items
        items_names_set = set([img.name for img in all_images])
        for item_name in dataset_fs.get_items_names():
            if item_name not in items_names_set:
                dataset_fs.delete_item(item_name)

    if not skip_create_readme:
        try:
            if download_blob_files:
                project_info = api.project.get_info_by_id(project_id)
                create_blob_readme(project_fs=project_fs, project_info=project_info, api=api)
            else:
                create_readme(dest_dir, project_id, api)
        except Exception as e:
            logger.info(f"There was an error while creating README: {e}")
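# --- Usage sketch (not part of the SDK source): the kwargs consumed above are
# reachable through the public download_project wrapper. Values are illustrative.
#
#     download_project(
#         api,
#         project_id=17732,
#         dest_dir="/data/project_17732",
#         resume_download=True,        # reuse files that match by updated_at
#         download_blob_files=True,    # fetch .tar blobs instead of single images
#         skip_create_readme=True,
#     )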
def upload_project(
    dir: str,
    api: Api,
    workspace_id: int,
    project_name: Optional[str] = None,
    log_progress: bool = True,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    project_id: Optional[int] = None,
) -> Tuple[int, str]:
    project_fs = read_single_project(dir)
    if not project_id:
        if project_name is None:
            project_name = project_fs.name
        if api.project.exists(workspace_id, project_name):
            project_name = api.project.get_free_name(workspace_id, project_name)
        project = api.project.create(workspace_id, project_name, change_name_if_conflict=True)
    else:
        project = api.project.get_info_by_id(project_id)
    updated_meta = api.project.update_meta(project.id, project_fs.meta.to_json())

    if progress_cb is not None:
        log_progress = False

    # image_id_dct, anns_paths_dct = {}, {}
    dataset_map = {}

    total_blob_size = 0
    upload_blob_progress = None
    src_paths = []
    dst_paths = []
    for blob_file in project_fs.blob_files:
        if log_progress:
            total_blob_size += os.path.getsize(os.path.join(project_fs.blob_dir, blob_file))
        src_paths.append(os.path.join(project_fs.blob_dir, blob_file))
        dst_paths.append(os.path.join(f"/{TF_BLOB_DIR}", blob_file))
    if log_progress and len(src_paths) > 0:
        upload_blob_progress = tqdm_sly(
            desc="Uploading blob files", total=total_blob_size, unit="B", unit_scale=True
        )
    if len(src_paths) > 0:
        blob_file_infos = api.file.upload_bulk(
            team_id=project.team_id,
            src_paths=src_paths,
            dst_paths=dst_paths,
            progress_cb=upload_blob_progress,
        )
    else:
        blob_file_infos = []

    for ds_fs in project_fs.datasets:
        if len(ds_fs.parents) > 0:
            parent = f"{os.path.sep}".join(ds_fs.parents)
            parent_id = dataset_map.get(parent)
        else:
            parent = ""
            parent_id = None
        dataset = api.dataset.create(project.id, ds_fs.short_name, parent_id=parent_id)
        dataset_map[os.path.join(parent, dataset.name)] = dataset.id
        ds_fs: Dataset

        with ApiContext(
            api,
            project_id=project.id,
            dataset_id=dataset.id,
            project_meta=updated_meta,
        ):
            names, img_paths, img_infos, ann_paths = [], [], [], []
            for item_name in ds_fs:
                img_path, ann_path = ds_fs.get_item_paths(item_name)
                img_info_path = ds_fs.get_img_info_path(item_name)

                names.append(item_name)
                img_paths.append(img_path)
                ann_paths.append(ann_path)
                if os.path.isfile(img_info_path):
                    img_infos.append(ds_fs.get_image_info(item_name=item_name))
                else:
                    img_infos.append(None)

            # img_paths = list(filter(lambda x: os.path.isfile(x), img_paths))
            source_img_paths_len = len(img_paths)
            valid_indices = []
            valid_paths = []
            offset_indices = []
            for i, path in enumerate(img_paths):
                if os.path.isfile(path):
                    valid_indices.append(i)
                    valid_paths.append(path)
                else:
                    offset_indices.append(i)
            img_paths = valid_paths
            ann_paths = list(filter(lambda x: os.path.isfile(x), ann_paths))
            # Create a mapping from name to index position for quick lookups
            offset_name_to_idx = {names[i]: i for i in offset_indices}
            metas = [{} for _ in names]

            img_infos_count = sum(1 for item in img_infos if item is not None)

            if len(img_paths) == 0 and img_infos_count == 0 and len(offset_indices) == 0:
                # Dataset is empty
                continue

            meta_dir = os.path.join(dir, ds_fs.name, "meta")
            if os.path.isdir(meta_dir):
                metas = []
                for name in names:
                    meta_path = os.path.join(meta_dir, name + ".json")
                    if os.path.isfile(meta_path):
                        metas.append(sly.json.load_json_file(meta_path))
                    else:
                        metas.append({})

            ds_progress = progress_cb
            if log_progress is True:
                ds_progress = tqdm_sly(
                    desc="Uploading images to {!r}".format(dataset.name),
                    total=len(names),
                )

            if img_infos_count != 0:
                merged_metas = []
                for img_info, meta in zip(img_infos, metas):
                    if img_info is None:
                        merged_metas.append(meta)
                        continue
                    merged_meta = {**(img_info.meta or {}), **meta}
                    merged_metas.append(merged_meta)
                metas = merged_metas

            if len(img_paths) != 0 or len(offset_indices) != 0:
                uploaded_img_infos = [None] * source_img_paths_len
                uploaded_img_infos_paths = api.image.upload_paths(
                    dataset_id=dataset.id,
                    names=[name for i, name in enumerate(names) if i in valid_indices],
                    paths=img_paths,
                    progress_cb=ds_progress,
                    metas=[metas[i] for i in valid_indices],
                )
                for i, img_info in zip(valid_indices, uploaded_img_infos_paths):
                    uploaded_img_infos[i] = img_info

                for blob_offsets in ds_fs.blob_offsets:
                    blob_file = None
                    for blob_file_info in blob_file_infos:
                        if Path(blob_file_info.name).stem == removesuffix(
                            Path(blob_offsets).name, OFFSETS_PKL_SUFFIX
                        ):
                            blob_file = blob_file_info
                            break
                    if blob_file is None:
                        raise ValueError(
                            f"Cannot find blob file for offsets: {blob_offsets}. "
                            f"Check the Team File directory '{TF_BLOB_DIR}', corresponding blob file should be uploaded."
                        )
                    uploaded_img_infos_offsets = api.image.upload_by_offsets_generator(
                        dataset=dataset,
                        team_file_id=blob_file.id,
                        offsets_file_path=blob_offsets,
                        progress_cb=ds_progress,
                        metas={names[i]: metas[i] for i in offset_indices},
                    )
                    for img_info_batch in uploaded_img_infos_offsets:
                        for img_info in img_info_batch:
                            idx = offset_name_to_idx.get(img_info.name)
                            if idx is not None:
                                uploaded_img_infos[idx] = img_info
            elif img_infos_count != 0:
                if img_infos_count != len(names):
                    raise ValueError(
                        f"Cannot upload Project: image info files count ({img_infos_count}) doesn't match with images count ({len(names)}) that are going to be uploaded. "
                        "Check the directory structure, all annotation files should have corresponding image info files."
                    )
                uploaded_img_infos = api.image.upload_ids(
                    dataset_id=dataset.id,
                    names=names,
                    ids=[img_info.id for img_info in img_infos],
                    progress_cb=ds_progress,
                    metas=metas,
                )
            else:
                raise ValueError(
                    "Cannot upload Project: img_paths is empty and img_infos_paths is empty"
                )

            # image_id_dct[ds_fs.name] = image_ids
            image_ids = [img_info.id for img_info in uploaded_img_infos]
            # anns_paths_dct[ds_fs.name] = ann_paths

            anns_progress = None
            if log_progress or progress_cb is not None:
                anns_progress = tqdm_sly(
                    desc="Uploading annotations to {!r}".format(dataset.name),
                    total=len(image_ids),
                    leave=False,
                )
            api.annotation.upload_paths(image_ids, ann_paths, anns_progress)

    return project.id, project.name
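# --- Usage sketch (not part of the SDK source): upload_project can also push a
# local directory into an existing project instead of creating a new one.
# The project_id value is illustrative.
#
#     project_id, project_name = upload_project(
#         dir="/data/project_backup",
#         api=api,
#         workspace_id=45,
#         project_id=17732,  # skip creation, upload into this project
#     )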
def download_project(
    api: Api,
    project_id: int,
    dest_dir: str,
    dataset_ids: Optional[List[int]] = None,
    log_progress: bool = True,
    batch_size: Optional[int] = 50,
    cache: Optional[FileCache] = None,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    only_image_tags: Optional[bool] = False,
    save_image_info: Optional[bool] = False,
    save_images: bool = True,
    save_image_meta: bool = False,
    images_ids: Optional[List[int]] = None,
    resume_download: Optional[bool] = False,
    **kwargs,
) -> None:
    """
    Download image project to the local directory.

    :param api: Supervisely API address and token.
    :type api: Api
    :param project_id: Project ID to download.
    :type project_id: int
    :param dest_dir: Destination path to local directory.
    :type dest_dir: str
    :param dataset_ids: Specified list of Dataset IDs which will be downloaded.
    :type dataset_ids: list(int), optional
    :param log_progress: Show downloading logs in the output. By default, it is True.
    :type log_progress: bool, optional
    :param batch_size: Size of a downloading batch.
    :type batch_size: int, optional
    :param cache: Cache of downloading files.
    :type cache: FileCache, optional
    :param progress_cb: Function for tracking download progress.
    :type progress_cb: tqdm or callable, optional
    :param only_image_tags: Download images only with image tags. Alternatively, full annotations will be downloaded.
    :type only_image_tags: bool, optional
    :param save_image_info: Include image info in the download.
    :type save_image_info: bool, optional
    :param save_images: Include images in the download.
    :type save_images: bool, optional
    :param save_image_meta: Include images metadata in JSON format in the download.
    :type save_image_meta: bool, optional
    :param images_ids: Specified list of Image IDs which will be downloaded.
    :type images_ids: list(int), optional
    :param resume_download: Resume download: only missing files are downloaded, existing files are not erased.
    :type resume_download: bool, optional
    :param download_blob_files: Default is False, which downloads images in the classic way. If True, downloads blob files (if they are present in the project) to optimize the download process.
    :type download_blob_files: bool, optional
    :param skip_create_readme: Skip creating README.md file. Default is False.
    :type skip_create_readme: bool, optional
    :return: None.
    :rtype: NoneType

    :Usage example:

     .. code-block:: python

        import os
        from dotenv import load_dotenv
        from tqdm import tqdm

        import supervisely as sly

        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()

        # Pass values into the API constructor (optional, not recommended)
        # api = sly.Api(server_address="https://app.supervisely.com", token="4r47N...xaTatb")

        dest_dir = 'your/local/dest/dir'

        # Download image project
        project_id = 17732
        project_info = api.project.get_info_by_id(project_id)
        num_images = project_info.items_count

        p = tqdm(desc="Downloading image project", total=num_images)
        sly.download(
            api,
            project_id,
            dest_dir,
            progress_cb=p,
        )
    """
    if cache is None:
        _download_project(
            api,
            project_id,
            dest_dir,
            dataset_ids,
            log_progress,
            batch_size,
            only_image_tags=only_image_tags,
            save_image_info=save_image_info,
            save_images=save_images,
            progress_cb=progress_cb,
            save_image_meta=save_image_meta,
            images_ids=images_ids,
            resume_download=resume_download,
            **kwargs,
        )
    else:
        _download_project_optimized(
            api,
            project_id,
            dest_dir,
            dataset_ids,
            cache,
            progress_cb,
            only_image_tags=only_image_tags,
            save_image_info=save_image_info,
            save_images=save_images,
            log_progress=log_progress,
            images_ids=images_ids,
            **kwargs,
        )


def _download_project_optimized(
    api: Api,
    project_id,
    project_dir,
    datasets_whitelist=None,
    cache=None,
    progress_cb=None,
    only_image_tags=False,
    save_image_info=False,
    save_images=True,
    log_progress=True,
    images_ids: List[int] = None,
    **kwargs,
):
    skip_create_readme = kwargs.pop("skip_create_readme", False)
    project_info = api.project.get_info_by_id(project_id)
    project_id = project_info.id
    logger.info("Annotations are not cached (always download latest version from server)")
    project_fs = Project(project_dir, OpenMode.CREATE)
    meta = ProjectMeta.from_json(api.project.get_meta(project_id, with_settings=True))
    project_fs.set_meta(meta)

    if progress_cb is not None:
        log_progress = False

    for parents, dataset in api.dataset.tree(project_id):
        dataset_path = Dataset._get_dataset_path(dataset.name, parents)
        need_download = True
        if datasets_whitelist is not None and dataset.id not in datasets_whitelist:
            need_download = False

        if need_download is True:
            ds_progress = progress_cb
            if log_progress:
                ds_total = dataset.images_count
                if images_ids is not None:
                    ds_total = len(
                        api.image.get_list(
                            dataset.id,
                            filters=[{"field": "id", "operator": "in", "value": images_ids}],
                        )
                    )
                ds_progress = tqdm_sly(
                    desc="Downloading images from {!r}".format(dataset.name),
                    total=ds_total,
                )
            dataset_fs = project_fs.create_dataset(dataset.name, dataset_path)
            _download_dataset(
                api,
                dataset_fs,
                dataset.id,
                cache=cache,
                progress_cb=ds_progress,
                project_meta=meta,
                only_image_tags=only_image_tags,
                save_image_info=save_image_info,
                save_images=save_images,
                images_ids=images_ids,
            )

    if not skip_create_readme:
        try:
            create_readme(project_dir, project_id, api)
        except Exception as e:
            logger.info(f"There was an error while creating README: {e}")
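# --- Usage sketch (not part of the SDK source): passing a FileCache routes the
# download through _download_project_optimized, which hard-links cached images
# instead of re-downloading them. The FileCache constructor arguments below are
# an assumption; check supervisely.io.fs_cache.FileCache for the exact signature.
#
#     from supervisely.io.fs_cache import FileCache
#
#     cache = FileCache("images-cache", "/data/cache")  # hypothetical arguments
#     download_project(api, project_id=17732, dest_dir="/data/p17732", cache=cache)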
def _split_images_by_cache(images, cache):
    images_to_download = []
    images_in_cache = []
    images_cache_paths = []
    for image in images:
        _, effective_ext = os.path.splitext(image.name)
        if len(effective_ext) == 0:
            # Fallback for the old format where we were cutting off extensions from image names.
            effective_ext = image.ext
        cache_path = cache.check_storage_object(image.hash, effective_ext)
        if cache_path is None:
            images_to_download.append(image)
        else:
            images_in_cache.append(image)
            images_cache_paths.append(cache_path)
    return images_to_download, images_in_cache, images_cache_paths


def _maybe_append_image_extension(name, ext):
    name_split = os.path.splitext(name)
    if name_split[1] == "":
        normalized_ext = ("." + ext).replace("..", ".")
        result = name + normalized_ext
        sly_image.validate_ext(result)
    else:
        result = name
    return result
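# --- Behavior sketch (not part of the SDK source): _maybe_append_image_extension
# only touches names that lack an extension, normalizing a possible leading dot.
#
#     _maybe_append_image_extension("img_0001", "jpg")      # -> "img_0001.jpg"
#     _maybe_append_image_extension("img_0001", ".jpg")     # -> "img_0001.jpg"
#     _maybe_append_image_extension("img_0001.png", "jpg")  # -> "img_0001.png"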
def _download_dataset(
    api: Api,
    dataset: Dataset,
    dataset_id: int,
    cache=None,
    progress_cb=None,
    project_meta: ProjectMeta = None,
    only_image_tags=False,
    save_image_info=False,
    save_images=True,
    images_ids: List[int] = None,
):
    image_filters = None
    if images_ids is not None:
        image_filters = [{"field": "id", "operator": "in", "value": images_ids}]
    images = api.image.get_list(dataset_id, filters=image_filters)
    images_to_download = images
    if only_image_tags is True:
        if project_meta is None:
            raise ValueError("Project Meta is not defined")
        # pylint: disable=possibly-used-before-assignment
        id_to_tagmeta = project_meta.tag_metas.get_id_mapping()

    anns_progress = None
    if progress_cb is not None:
        anns_progress = tqdm_sly(
            desc="Downloading annotations from {!r}".format(dataset.name),
            total=len(images),
            leave=False,
        )
    # copy images from cache to task folder and download corresponding annotations
    if cache:
        (
            images_to_download,
            images_in_cache,
            images_cache_paths,
        ) = _split_images_by_cache(images, cache)
        if len(images_to_download) + len(images_in_cache) != len(images):
            raise RuntimeError("Error with images cache during download. Please contact support.")
        logger.info(
            f"Download dataset: {dataset.name}",
            extra={
                "total": len(images),
                "in cache": len(images_in_cache),
                "to download": len(images_to_download),
            },
        )
        if len(images_in_cache) > 0:
            img_cache_ids = [img_info.id for img_info in images_in_cache]

            if only_image_tags is False:
                with ApiContext(
                    api,
                    dataset_id=dataset_id,
                    project_meta=project_meta,
                ):
                    ann_info_list = api.annotation.download_batch(
                        dataset_id, img_cache_ids, anns_progress
                    )
                    img_name_to_ann = {ann.image_id: ann.annotation for ann in ann_info_list}
            else:
                img_name_to_ann = {}
                for image_info in images_in_cache:
                    # pylint: disable=possibly-used-before-assignment
                    tags = TagCollection.from_api_response(
                        image_info.tags,
                        project_meta.tag_metas,
                        id_to_tagmeta,
                    )
                    tmp_ann = Annotation(
                        img_size=(image_info.height, image_info.width), img_tags=tags
                    )
                    img_name_to_ann[image_info.id] = tmp_ann.to_json()
                if progress_cb is not None:
                    progress_cb(len(images_in_cache))

            for batch in batched(list(zip(images_in_cache, images_cache_paths)), batch_size=50):
                for img_info, img_cache_path in batch:
                    item_name = _maybe_append_image_extension(img_info.name, img_info.ext)
                    img_info_to_add = None
                    if save_image_info is True:
                        img_info_to_add = img_info
                    dataset.add_item_file(
                        item_name,
                        item_path=img_cache_path if save_images is True else None,
                        ann=img_name_to_ann[img_info.id],
                        _validate_item=False,
                        _use_hardlink=True,
                        item_info=img_info_to_add,
                    )
                if progress_cb is not None:
                    progress_cb(len(batch))

    # download images from server
    if len(images_to_download) > 0:
        # prepare lists for api methods
        img_ids = []
        img_paths = []
        for img_info in images_to_download:
            img_ids.append(img_info.id)
            img_paths.append(
                os.path.join(
                    dataset.item_dir,
                    _maybe_append_image_extension(img_info.name, img_info.ext),
                )
            )

        # download annotations
        if only_image_tags is False:
            ann_info_list = api.annotation.download_batch(dataset_id, img_ids, anns_progress)
            img_name_to_ann = {ann.image_id: ann.annotation for ann in ann_info_list}
        else:
            img_name_to_ann = {}
            for image_info in images_to_download:
                tags = TagCollection.from_api_response(
                    image_info.tags, project_meta.tag_metas, id_to_tagmeta
                )
                tmp_ann = Annotation(img_size=(image_info.height, image_info.width), img_tags=tags)
                img_name_to_ann[image_info.id] = tmp_ann.to_json()
            if progress_cb is not None:
                progress_cb(len(images_to_download))

        # download images and write to dataset
        for img_info_batch in batched(images_to_download):
            if save_images:
                images_ids_batch = [image_info.id for image_info in img_info_batch]
                images_nps = api.image.download_nps(
                    dataset_id, images_ids_batch, progress_cb=progress_cb
                )
            else:
                images_nps = [None] * len(img_info_batch)
            for index, image_np in enumerate(images_nps):
                img_info = img_info_batch[index]
                image_name = _maybe_append_image_extension(img_info.name, img_info.ext)
                dataset.add_item_np(
                    item_name=image_name,
                    img=image_np if save_images is True else None,
                    ann=img_name_to_ann[img_info.id],
                    img_info=img_info if save_image_info is True else None,
                )
        if cache is not None and save_images is True:
            img_hashes = [img_info.hash for img_info in images_to_download]
            cache.write_objects(img_paths, img_hashes)


def create_readme(
    project_dir: str,
    project_id: int,
    api: sly.Api,
) -> str:
    """Creates a README.md file using the template, adds general information
    about the project and creates a dataset structure section.

    :param project_dir: Path to the project directory.
    :type project_dir: str
    :param project_id: Project ID.
    :type project_id: int
    :param api: Supervisely API address and token.
    :type api: :class:`Api<supervisely.api.api.Api>`
    :return: Path to the created README.md file.
    :rtype: str

    :Usage example:

     .. code-block:: python

        import supervisely as sly

        api = sly.Api.from_env()

        project_id = 123
        project_dir = "/path/to/project"

        readme_path = sly.create_readme(project_dir, project_id, api)
        print(f"README.md file was created at {readme_path}")
    """
    current_path = os.path.dirname(os.path.abspath(__file__))
    template_path = os.path.join(current_path, "readme_template.md")
    with open(template_path, "r") as file:
        template = file.read()

    project_info = api.project.get_info_by_id(project_id)

    sly.fs.mkdir(project_dir)
    readme_path = os.path.join(project_dir, "README.md")

    template = template.replace("{{general_info}}", _project_info_md(project_info))
    template = template.replace(
        "{{dataset_structure_info}}", _dataset_structure_md(project_info, api)
    )
    template = template.replace(
        "{{dataset_description_info}}", _dataset_descriptions_md(project_info, api)
    )

    with open(readme_path, "w") as f:
        f.write(template)
    return readme_path
def _dataset_blob_structure_md(
    project_fs: Project,
    project_info: sly.ProjectInfo,
    entity_limit: Optional[int] = 2,
) -> str:
    """Creates a markdown string with the dataset structure of the project.
    Supports only images projects.

    :param project_fs: Project file system.
    :type project_fs: :class:`Project<supervisely.project.project.Project>`
    :param project_info: Project information.
    :type project_info: :class:`ProjectInfo<supervisely.project.project_info.ProjectInfo>`
    :param entity_limit: The maximum number of entities to display in the README.
    :type entity_limit: int, optional
    :return: Markdown string with the dataset structure of the project.
    :rtype: str
    """
    supported_project_types = [sly.ProjectType.IMAGES.value]
    if project_info.type not in supported_project_types:
        return ""

    entity_icons = {
        "images": " 🏞️ ",
        "blob_files": " 📦 ",
        "pkl_files": " 📄 ",
        "annotations": " 📝 ",
    }
    dataset_icon = " 📂 "
    folder_icon = " 📁 "

    result_md = f"🗂️ {project_info.name}<br>"

    # Add project-level blob files
    if os.path.exists(project_fs.blob_dir) and project_fs.blob_files:
        result_md += "┣" + folder_icon + f"{Project.blob_dir_name}<br>"
        blob_files = [entry.name for entry in os.scandir(project_fs.blob_dir) if entry.is_file()]
        for idx, blob_file in enumerate(blob_files):
            if idx == entity_limit and len(blob_files) > entity_limit:
                result_md += "┃ ┗ ... " + str(len(blob_files) - entity_limit) + " more<br>"
                break
            symbol = "┗" if idx == len(blob_files) - 1 or idx == entity_limit - 1 else "┣"
            result_md += "┃ " + symbol + entity_icons["blob_files"] + blob_file + "<br>"

    # Build a dataset hierarchy tree
    dataset_tree = {}
    root_datasets = []

    # First pass: create nodes for all datasets
    for dataset in project_fs.datasets:
        dataset_tree[dataset.directory] = {
            "dataset": dataset,
            "children": [],
            "parent_dir": os.path.dirname(dataset.directory) if dataset.parents else None,
        }

    # Second pass: build parent-child relationships
    for dir_path, node in dataset_tree.items():
        parent_dir = node["parent_dir"]
        if parent_dir in dataset_tree:
            dataset_tree[parent_dir]["children"].append(dir_path)
        else:
            root_datasets.append(dir_path)

    # Function to recursively render the dataset tree
    def render_tree(dir_path, prefix=""):
        nonlocal result_md
        node = dataset_tree[dir_path]
        dataset = node["dataset"]
        children = node["children"]

        # Create dataset display with proper path
        dataset_path = Dataset._get_dataset_path(dataset.name, dataset.parents)
        result_md += prefix + "┣" + dataset_icon + f"[{dataset.name}]({dataset_path})<br>"

        # Set indentation for dataset content
        content_prefix = prefix + "┃ "

        # Add pkl files at the dataset level
        offset_files = [
            entry.name
            for entry in os.scandir(dataset.directory)
            if entry.is_file() and entry.name.endswith(".pkl")
        ]
        if offset_files:
            for idx, pkl_file in enumerate(offset_files):
                last_file = idx == len(offset_files) - 1
                has_more_content = (
                    os.path.exists(dataset.img_dir) or os.path.exists(dataset.ann_dir) or children
                )
                symbol = "┗" if last_file and not has_more_content else "┣"
                result_md += content_prefix + symbol + entity_icons["pkl_files"] + pkl_file + "<br>"

        # Add img directory
        if os.path.exists(dataset.img_dir):
            has_ann_dir = os.path.exists(dataset.ann_dir)
            has_more_content = has_ann_dir or children
            symbol = "┣" if has_more_content else "┗"
            result_md += content_prefix + symbol + folder_icon + "img<br>"

            # Add image files
            entities = [entry.name for entry in os.scandir(dataset.img_dir) if entry.is_file()]
            entities = sorted(entities)
            selected_entities = entities[: min(len(entities), entity_limit)]
            img_prefix = content_prefix + "┃ "
            for idx, entity in enumerate(selected_entities):
                last_img = idx == len(selected_entities) - 1
                symbol = "┗" if last_img and len(entities) <= entity_limit else "┣"
                result_md += img_prefix + symbol + entity_icons["images"] + entity + "<br>"
            if len(entities) > entity_limit:
                result_md += img_prefix + "┗ ... " + str(len(entities) - entity_limit) + " more<br>"

        # Add ann directory
        if os.path.exists(dataset.ann_dir):
            has_more_content = bool(children)
            symbol = "┣"
            result_md += content_prefix + "┣" + folder_icon + "ann<br>"

            anns = [entry.name for entry in os.scandir(dataset.ann_dir) if entry.is_file()]
            anns = sorted(anns)
            # Try to match annotations with displayed images
            possible_anns = [f"{entity}.json" for entity in selected_entities]
            matched_anns = [pa for pa in possible_anns if pa in anns]

            # Add additional annotations if we haven't reached the limit
            if len(matched_anns) < min(entity_limit, len(anns)):
                for ann in anns:
                    if ann not in matched_anns and len(matched_anns) < entity_limit:
                        matched_anns.append(ann)

            ann_prefix = content_prefix + "┃ "
            for idx, ann in enumerate(matched_anns):
                last_ann = idx == len(matched_anns) - 1
                symbol = "┗" if last_ann and len(anns) <= entity_limit else "┣"
                result_md += ann_prefix + symbol + entity_icons["annotations"] + ann + "<br>"
            if len(anns) > entity_limit:
                result_md += ann_prefix + "┗ ... " + str(len(anns) - entity_limit) + " more<br>"

            if not has_more_content:
                result_md += content_prefix + "...<br>"

        # Recursively render child datasets
        for idx, child_dir in enumerate(children):
            render_tree(child_dir, content_prefix)

    # Start rendering from root datasets
    for root_dir in sorted(root_datasets):
        render_tree(root_dir)

    return result_md


def create_blob_readme(
    project_fs: Project,
    project_info: ProjectInfo,
    api: Api,
) -> str:
    """Creates a README.md file using the template, adds general information
    about the project and creates a dataset structure section.

    :param project_fs: Project file system.
    :type project_fs: :class:`Project<supervisely.project.project.Project>`
    :param project_info: Project information.
    :type project_info: :class:`ProjectInfo<supervisely.project.project_info.ProjectInfo>`
    :param api: Supervisely API address and token.
    :type api: :class:`Api<supervisely.api.api.Api>`
    :return: Path to the created README.md file.
    :rtype: str

    :Usage example:

     .. code-block:: python

        import supervisely as sly

        api = sly.Api.from_env()

        project_id = 123
        project_dir = "/path/to/project"

        readme_path = sly.create_readme(project_dir, project_id, api)
        print(f"README.md file was created at {readme_path}")
    """
    current_path = os.path.dirname(os.path.abspath(__file__))
    template_path = os.path.join(current_path, "readme_template.md")
    with open(template_path, "r") as file:
        template = file.read()

    readme_path = os.path.join(project_fs.directory, "README.md")

    template = template.replace("{{general_info}}", _project_info_md(project_info))
    template = template.replace(
        "{{dataset_structure_info}}", _dataset_blob_structure_md(project_fs, project_info)
    )
    template = template.replace(
        "{{dataset_description_info}}", _dataset_descriptions_md(project_info, api)
    )

    with open(readme_path, "w") as f:
        f.write(template)
    return readme_path
def _project_info_md(project_info: sly.ProjectInfo) -> str:
    """Creates a markdown string with general information about the project
    using the fields of the ProjectInfo NamedTuple.

    :param project_info: Project information.
    :type project_info: :class:`ProjectInfo<supervisely.project.project_info.ProjectInfo>`
    :return: Markdown string with general information about the project.
    :rtype: str
    """
    result_md = ""
    # Iterating over fields of a NamedTuple.
    for field in project_info._fields:
        value = getattr(project_info, field)
        if not value or not isinstance(value, (str, int)):
            # To avoid useless information in the README.
            continue
        result_md += f"\n**{snake_to_human(field)}:** {value}<br>"

    return result_md


def _dataset_structure_md(
    project_info: sly.ProjectInfo, api: sly.Api, entity_limit: Optional[int] = 4
) -> str:
    """Creates a markdown string with the dataset structure of the project.
    Supports only images and videos projects.

    :param project_info: Project information.
    :type project_info: :class:`ProjectInfo<supervisely.project.project_info.ProjectInfo>`
    :param api: Supervisely API address and token.
    :type api: :class:`Api<supervisely.api.api.Api>`
    :param entity_limit: The maximum number of entities to display in the README.
        This is the limit for top-level datasets and items in the dataset at the same time.
    :type entity_limit: int, optional
    :return: Markdown string with the dataset structure of the project.
    :rtype: str
    """
    # TODO: Add support for other project types.
    supported_project_types = [sly.ProjectType.IMAGES.value, sly.ProjectType.VIDEOS.value]
    if project_info.type not in supported_project_types:
        return ""

    list_functions = {
        "images": api.image.get_list,
        "videos": api.video.get_list,
    }
    entity_icons = {
        "images": " 🏞️ ",
        "videos": " 🎥 ",
        "blob_files": " 📦 ",
        "pkl_files": " 📄 ",
        "annotations": " 📝 ",
    }
    dataset_icon = " 📂 "
    list_function = list_functions[project_info.type]
    entity_icon = entity_icons[project_info.type]

    result_md = f"🗂️ {project_info.name}<br>"

    # Build a dataset hierarchy tree
    dataset_tree = {}
    root_datasets = []

    for parents, dataset_info in api.dataset.tree(project_info.id):
        level = len(parents)
        parent_id = dataset_info.parent_id

        if level == 0:
            # Root dataset
            root_datasets.append(dataset_info)

        dataset_tree[dataset_info.id] = {
            "info": dataset_info,
            "path": Dataset._get_dataset_path(dataset_info.name, parents),
            "level": level,
            "parents": parents,
            "children": [],
        }

    # Connect parents with children
    for ds_id, ds_data in dataset_tree.items():
        parent_id = ds_data["info"].parent_id
        if parent_id in dataset_tree:
            dataset_tree[parent_id]["children"].append(ds_id)

    # Display only top entity_limit root datasets
    if len(root_datasets) > entity_limit:
        root_datasets = root_datasets[:entity_limit]
        result_md += f"(Showing only {entity_limit} top-level datasets)<br>"

    # Function to render a dataset and its children up to a certain depth
    def render_dataset(ds_id, current_depth=0, max_depth=2):
        if current_depth > max_depth:
            return

        ds_data = dataset_tree[ds_id]
        ds_info = ds_data["info"]
        basic_indent = "┃ " * current_depth

        # Render the dataset
        result_md.append(
            basic_indent + "┣ " + dataset_icon + f"[{ds_info.name}]({ds_data['path']})" + "<br>"
        )

        # Render items in the dataset
        entity_infos = list_function(ds_info.id)
        for idx, entity_info in enumerate(entity_infos):
            if idx == entity_limit:
                result_md.append(
                    basic_indent + "┃ ┗ ... " + str(len(entity_infos) - entity_limit) + " more<br>"
                )
                break
            symbol = "┗" if idx == len(entity_infos) - 1 else "┣"
            result_md.append(basic_indent + "┃ " + symbol + entity_icon + entity_info.name + "<br>")

        # Render children (limited to entity_limit)
        children = ds_data["children"]
        if len(children) > entity_limit:
            children = children[:entity_limit]
            result_md.append(basic_indent + f"┃ (Showing only {entity_limit} child datasets)<br>")
        for child_id in children:
            render_dataset(child_id, current_depth + 1, max_depth)

    # Render each root dataset
    result_md = [result_md]  # Convert to list for appending in the recursive function
    for root_ds in root_datasets:
        render_dataset(root_ds.id)

    return "".join(result_md)


def _dataset_descriptions_md(project_info: sly.ProjectInfo, api: sly.Api) -> str:
    """Creates a markdown string with a dictionary of descriptions and custom data of datasets.

    :param project_info: Project information.
    :type project_info: :class:`ProjectInfo<supervisely.project.project_info.ProjectInfo>`
    :param api: Supervisely API address and token.
    :type api: :class:`Api<supervisely.api.api.Api>`
    :return: Markdown string with a dictionary of descriptions and custom data of datasets.
    :rtype: str
    """
    data_found = False
    result_md = (
        "All datasets in the project can have their own descriptions and custom data. "
        "You can add or edit the description and custom data of a dataset in the datasets list page. "
        "In this section, you can find this information for each dataset by dataset name "
        "(e.g. `ds1/ds2/ds3`, where `ds1` and `ds2` are parent datasets for the `ds3` dataset).<br>"
    )
    result_md += "\n\n```json\n{\n"
    for parents, dataset_info in api.dataset.tree(project_info.id):
        dataset_info = api.dataset.get_info_by_id(dataset_info.id)
        full_ds_name = "/".join(parents + [dataset_info.name])
        if dataset_info.description or dataset_info.custom_data:
            data_found = True
            result_md += f'    "{full_ds_name}": {{\n'
            if dataset_info.description:
                result_md += f'        "description": "{dataset_info.description}",\n'
            if dataset_info.custom_data:
                formatted_custom_data = json.dumps(dataset_info.custom_data, indent=4)
                formatted_custom_data = formatted_custom_data.replace("\n", "\n        ")
                result_md += f'        "custom_data": {formatted_custom_data}\n'
            result_md += "    },\n"
    result_md += "}\n```"

    if not data_found:
        result_md = "_No dataset descriptions or custom data found in the project._"

    return result_md
""" # to switch between single and bulk download switch_size = kwargs.get("switch_size", 1.28 * 1024 * 1024) # batch size for bulk download batch_size = kwargs.get("batch_size", 100) # control whether to download blob files download_blob_files = kwargs.get("download_blob_files", False) # control whether to create README file skip_create_readme = kwargs.get("skip_create_readme", False) if semaphore is None: semaphore = api.get_default_semaphore() dataset_ids = set(dataset_ids) if (dataset_ids is not None) else None project_fs = None meta = ProjectMeta.from_json(api.project.get_meta(project_id, with_settings=True)) if os.path.exists(dest_dir) and resume_download: dump_json_file(meta.to_json(), os.path.join(dest_dir, "meta.json")) try: project_fs = Project(dest_dir, OpenMode.READ) except RuntimeError as e: if "Project is empty" in str(e): clean_dir(dest_dir) project_fs = None else: raise if project_fs is None: project_fs = Project(dest_dir, OpenMode.CREATE) project_fs.set_meta(meta) if progress_cb is not None: log_progress = False id_to_tagmeta = None if only_image_tags is True: id_to_tagmeta = meta.tag_metas.get_id_mapping() existing_datasets = {dataset.path: dataset for dataset in project_fs.datasets} for parents, dataset in api.dataset.tree(project_id): dataset_path = Dataset._get_dataset_path(dataset.name, parents) dataset_id = dataset.id if dataset_ids is not None and dataset_id not in dataset_ids: continue if dataset_path in existing_datasets: dataset_fs = existing_datasets[dataset_path] else: dataset_fs = project_fs.create_dataset(dataset.name, dataset_path) force_metadata_for_links = False if save_images is False and only_image_tags is True: force_metadata_for_links = True all_images = api.image.get_list_generator_async( dataset_id, force_metadata_for_links=force_metadata_for_links, dataset_info=dataset ) small_images = [] large_images = [] dataset_images = [] blob_files_to_download = {} blob_images = [] async for image_batch in all_images: for image in image_batch: if images_ids is None or image.id in images_ids: dataset_images.append(image) # Check for images with blob offsets if download_blob_files and image.related_data_id is not None: blob_files_to_download[image.related_data_id] = image.download_id blob_images.append(image) elif image.size < switch_size: small_images.append(image) else: large_images.append(image) ds_progress = progress_cb if log_progress is True: ds_progress = tqdm_sly( desc="Downloading images from {!r}".format(dataset.name), total=len(small_images) + len(large_images) + len(blob_images), leave=False, ) with ApiContext( api, project_id=project_id, dataset_id=dataset_id, project_meta=meta, ): async def check_items(check_list: List[sly.ImageInfo]): to_download = [] for image in check_list: try: existing = dataset_fs.get_item_info(image.name) except: to_download.append(image) else: if existing.updated_at != image.updated_at: to_download.append(image) elif ds_progress is not None: ds_progress(1) return to_download async def run_tasks_with_delay(tasks, delay=0.1): created_tasks = [] for task in tasks: created_task = asyncio.create_task(task) created_tasks.append(created_task) await asyncio.sleep(delay) logger.debug( f"{len(created_tasks)} tasks have been created for dataset ID: {dataset.id}, Name: {dataset.name}" ) return created_tasks # Download blob files if required if download_blob_files and len(blob_files_to_download) > 0: blob_paths = [] download_ids = [] # Process each blob file for blob_file_id, download_id in blob_files_to_download.items(): if blob_file_id 
            # Download blob files if required
            if download_blob_files and len(blob_files_to_download) > 0:
                blob_paths = []
                download_ids = []
                # Process each blob file
                for blob_file_id, download_id in blob_files_to_download.items():
                    if blob_file_id not in project_fs.blob_files:
                        # Download the blob file
                        blob_paths.append(os.path.join(project_fs.blob_dir, f"{blob_file_id}.tar"))
                        download_ids.append(download_id)
                await api.image.download_blob_files_async(
                    project_id=project_id,
                    download_ids=download_ids,
                    paths=blob_paths,
                    semaphore=semaphore,
                    log_progress=(True if log_progress or progress_cb is not None else False),
                )
                for blob_file_id, download_id in blob_files_to_download.items():
                    project_fs.add_blob_file(blob_file_id)

                    # Process blob image offsets
                    offsets_file_name = f"{blob_file_id}{OFFSETS_PKL_SUFFIX}"
                    offsets_file_path = os.path.join(dataset_fs.directory, offsets_file_name)

                    total_offsets_count = 0  # for logging
                    current_batch = []

                    for img in blob_images:
                        if img.related_data_id == blob_file_id:
                            blob_image_info = BlobImageInfo(
                                name=img.name,
                                offset_start=img.offset_start,
                                offset_end=img.offset_end,
                            )
                            current_batch.append(blob_image_info)

                            if len(current_batch) >= OFFSETS_PKL_BATCH_SIZE:
                                BlobImageInfo.dump_to_pickle(current_batch, offsets_file_path)
                                total_offsets_count += len(current_batch)
                                current_batch = []

                    if len(current_batch) > 0:
                        BlobImageInfo.dump_to_pickle(current_batch, offsets_file_path)
                        total_offsets_count += len(current_batch)

                    if total_offsets_count > 0:
                        logger.debug(
                            f"Saved {total_offsets_count} image offsets for {blob_file_id} to {offsets_file_path} "
                            f"in {(total_offsets_count + OFFSETS_PKL_BATCH_SIZE - 1) // OFFSETS_PKL_BATCH_SIZE} batches"
                        )

                offset_tasks = []
                # Download annotations for images with offsets
                for offsets_batch in batched(blob_images, batch_size=batch_size):
                    offset_task = _download_project_items_batch_async(
                        api=api,
                        dataset_id=dataset_id,
                        img_infos=offsets_batch,
                        meta=meta,
                        dataset_fs=dataset_fs,
                        id_to_tagmeta=id_to_tagmeta,
                        semaphore=semaphore,
                        save_images=False,
                        save_image_info=save_image_info,
                        only_image_tags=only_image_tags,
                        progress_cb=ds_progress,
                    )
                    offset_tasks.append(offset_task)
                created_tasks = await run_tasks_with_delay(offset_tasks, 0.05)
                await asyncio.gather(*created_tasks)

            tasks = []

            # Check which images need to be downloaded
            small_images = await check_items(small_images)
            large_images = await check_items(large_images)

            # If only one small image, treat it as a large image for efficiency
            if len(small_images) == 1:
                large_images.append(small_images.pop())

            # Create batch download tasks
            for images_batch in batched(small_images, batch_size=batch_size):
                task = _download_project_items_batch_async(
                    api=api,
                    dataset_id=dataset_id,
                    img_infos=images_batch,
                    meta=meta,
                    dataset_fs=dataset_fs,
                    id_to_tagmeta=id_to_tagmeta,
                    semaphore=semaphore,
                    save_images=save_images,
                    save_image_info=save_image_info,
                    only_image_tags=only_image_tags,
                    progress_cb=ds_progress,
                )
                tasks.append(task)

            # Create individual download tasks for large images
            for image in large_images:
                task = _download_project_item_async(
                    api=api,
                    img_info=image,
                    meta=meta,
                    dataset_fs=dataset_fs,
                    id_to_tagmeta=id_to_tagmeta,
                    semaphore=semaphore,
                    save_images=save_images,
                    save_image_info=save_image_info,
                    only_image_tags=only_image_tags,
                    progress_cb=ds_progress,
                )
                tasks.append(task)
            created_tasks = await run_tasks_with_delay(tasks)
            await asyncio.gather(*created_tasks)

        if save_image_meta:
            meta_dir = dataset_fs.meta_dir
            for image_info in dataset_images:
                if image_info.meta:
                    sly.fs.mkdir(meta_dir)
                    sly.json.dump_json_file(
                        image_info.meta, dataset_fs.get_item_meta_path(image_info.name)
                    )

        # delete redundant items
        items_names_set = set([img.name for img in dataset_images])
        for item_name in dataset_fs.get_items_names():
            if item_name not in items_names_set:
                dataset_fs.delete_item(item_name)

    if not skip_create_readme:
        try:
            if download_blob_files:
                project_info = api.project.get_info_by_id(project_id)
                create_blob_readme(project_fs=project_fs, project_info=project_info, api=api)
            else:
                create_readme(dest_dir, project_id, api)
        except Exception as e:
            logger.info(f"There was an error while creating README: {e}")
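
# A minimal usage sketch for _download_project_async (illustrative only; the
# project ID and destination path are placeholders). Since the coroutine is
# internal, it is driven through an event loop in practice, e.g.:
#
#   api = sly.Api.from_env()
#   loop = get_or_create_event_loop()
#   loop.run_until_complete(
#       _download_project_async(api, project_id=12345, dest_dir="/tmp/project")
#   )
#
# With the default kwargs, images smaller than ~1.28 MB ("switch_size") are
# grouped into batches of 100 ("batch_size") for bulk download, while larger
# images are fetched one by one.
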
async def _download_project_item_async(
    api: sly.Api,
    img_info: sly.ImageInfo,
    meta: ProjectMeta,
    dataset_fs: Dataset,
    id_to_tagmeta: Dict[int, sly.TagMeta],
    semaphore: asyncio.Semaphore,
    save_images: bool,
    save_image_info: bool,
    only_image_tags: bool,
    progress_cb: Optional[Callable],
) -> None:
    """Download an image and its annotation from the Supervisely API and save them to the
    local filesystem. Uses parameters from the parent function _download_project_async.
    """
    if save_images:
        logger.debug(
            f"Downloading 1 image in single mode with _download_project_item_async. ID: {img_info.id}, Name: {img_info.name}"
        )
        img_bytes = await api.image.download_bytes_single_async(
            img_info.id, semaphore=semaphore, check_hash=True
        )
        if None in [img_info.height, img_info.width]:
            width, height = sly.image.get_size_from_bytes(img_bytes)
            img_info = img_info._replace(height=height, width=width)
    else:
        img_bytes = None

    if only_image_tags is False:
        ann_info = await api.annotation.download_async(
            img_info.id,
            semaphore=semaphore,
            force_metadata_for_links=not save_images,
        )
        ann_json = ann_info.annotation
        try:
            tmp_ann = Annotation.from_json(ann_json, meta)
        except Exception:
            logger.error(f"Error while deserializing annotation for image with ID: {img_info.id}")
            raise
        if None in tmp_ann.img_size:
            tmp_ann = tmp_ann.clone(img_size=(img_info.height, img_info.width))
            ann_json = tmp_ann.to_json()
    else:
        tags = TagCollection.from_api_response(
            img_info.tags,
            meta.tag_metas,
            id_to_tagmeta,
        )
        tmp_ann = Annotation(img_size=(img_info.height, img_info.width), img_tags=tags)
        ann_json = tmp_ann.to_json()

    dataset_fs.delete_item(img_info.name)
    await dataset_fs.add_item_raw_bytes_async(
        item_name=img_info.name,
        item_raw_bytes=img_bytes if save_images is True else None,
        ann=ann_json,
        img_info=img_info if save_image_info is True else None,
    )
    if progress_cb is not None:
        progress_cb(1)
    logger.debug(f"Single project item has been downloaded. Semaphore state: {semaphore._value}")
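
# Minimal self-contained sketch of the semaphore pattern that both download
# helpers rely on (an assumed simplification, not part of this module): a
# shared asyncio.Semaphore caps how many coroutines download concurrently.
#
#   import asyncio
#
#   async def fetch(i: int, sem: asyncio.Semaphore) -> None:
#       async with sem:  # at most 10 coroutines run this block at once
#           await asyncio.sleep(0.1)  # stand-in for one network download
#
#   async def main() -> None:
#       sem = asyncio.Semaphore(10)
#       await asyncio.gather(*(fetch(i, sem) for i in range(100)))
#
#   asyncio.run(main())
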
""" if save_images: img_ids = [img_info.id for img_info in img_infos] imgs_bytes = [None] * len(img_ids) temp_dict = {} logger.debug( f"Downloading {len(img_ids)} images in bulk with _download_project_items_batch_async" ) async for img_id, img_bytes in api.image.download_bytes_generator_async( dataset_id, img_ids, semaphore=semaphore, check_hash=True, ): temp_dict[img_id] = img_bytes # to be sure that the order is correct for idx, img_id in enumerate(img_ids): imgs_bytes[idx] = temp_dict[img_id] for img_info, img_bytes in zip(img_infos, imgs_bytes): if None in [img_info.height, img_info.width]: width, height = sly.image.get_size_from_bytes(img_bytes) img_info = img_info._replace(height=height, width=width) else: img_ids = [img_info.id for img_info in img_infos] imgs_bytes = [None] * len(img_infos) if only_image_tags is False: ann_infos = await api.annotation.download_bulk_async( dataset_id, img_ids, semaphore=semaphore, force_metadata_for_links=not save_images, ) ann_jsons = [] for img_info, ann_info in zip(img_infos, ann_infos): try: tmp_ann = Annotation.from_json(ann_info.annotation, meta) if None in tmp_ann.img_size: tmp_ann = tmp_ann.clone(img_size=(img_info.height, img_info.width)) ann_jsons.append(tmp_ann.to_json()) except Exception: logger.error( f"Error while deserializing annotation for image with ID: {img_info.id}" ) raise else: ann_jsons = [] for img_info in img_infos: tags = TagCollection.from_api_response( img_info.tags, meta.tag_metas, id_to_tagmeta, ) tmp_ann = Annotation(img_size=(img_info.height, img_info.width), img_tags=tags) ann_jsons.append(tmp_ann.to_json()) for img_info, ann_json, img_bytes in zip(img_infos, ann_jsons, imgs_bytes): dataset_fs.delete_item(img_info.name) await dataset_fs.add_item_raw_bytes_async( item_name=img_info.name, item_raw_bytes=img_bytes, ann=dataset_fs.get_ann(img_info.name, meta) if ann_json is None else ann_json, img_info=img_info if save_image_info is True else None, ) if progress_cb is not None: progress_cb(1) logger.debug(f"Batch of project items has been downloaded. Semaphore state: {semaphore._value}") DatasetDict = Project.DatasetDict