Source code for supervisely.project.pointcloud_episode_project

# coding: utf-8

# docs
from __future__ import annotations

import os
import random
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple, Union
from uuid import UUID

from tqdm import tqdm

import supervisely.imaging.image as sly_image
from supervisely._utils import batched
from supervisely.api.api import Api
from supervisely.api.module_api import ApiField
from supervisely.api.pointcloud.pointcloud_api import PointcloudInfo
from supervisely.collection.key_indexed_collection import KeyIndexedCollection
from supervisely.io.fs import dir_exists, get_file_name, list_files, mkdir, touch
from supervisely.io.json import dump_json_file, load_json_file
from supervisely.pointcloud_annotation.constants import OBJECT_KEY
from supervisely.pointcloud_annotation.pointcloud_episode_annotation import (
    PointcloudEpisodeAnnotation,
)
from supervisely.project.pointcloud_project import PointcloudDataset, PointcloudProject
from supervisely.project.project import OpenMode
from supervisely.project.project import read_single_project as read_project_wrapper
from supervisely.project.project_meta import ProjectMeta
from supervisely.project.project_type import ProjectType
from supervisely.sly_logger import logger
from supervisely.task.progress import Progress, tqdm_sly
from supervisely.video_annotation.frame import Frame
from supervisely.video_annotation.key_id_map import KeyIdMap

KITTI_ITEM_DIR_NAME = "velodyne"


class EpisodeItemPaths(NamedTuple):
    """Paths and frame index for a point cloud item inside an episode dataset."""

    #: str: Full pointcloud file path of item
    pointcloud_path: str

    #: str: Path to related images directory of item
    related_images_dir: str

    #: int: Index of frame in episode annotation of dataset
    frame_index: int


class EpisodeItemInfo(NamedTuple):
    """Basic info about a point cloud item and where it is stored in an episode dataset."""

    #: str: Item's dataset name
    dataset_name: str

    #: str: Item name
    name: str

    #: str: Full pointcloud file path of item
    pointcloud_path: str

    #: str: Path to related images directory of item
    related_images_dir: str

    #: int: Index of frame in episode annotation of dataset
    frame_index: int


[docs] class PointcloudEpisodeDataset(PointcloudDataset): """Dataset directory for point cloud episode frames with a single shared episode annotation.""" #: str: Items data directory name item_dir_name = "pointcloud" #: str: Annotations directory name ann_dir_name = "ann" #: str: Items info directory name item_info_dir_name = "pointcloud_info" #: str: Related images directory name related_images_dir_name = "related_images" #: str: Segmentation masks directory name seg_dir_name = None item_info_type = PointcloudInfo annotation_class = PointcloudEpisodeAnnotation @property def ann_dir(self) -> None: raise NotImplementedError( f"{type(self).__name__} object don't have correct path for 'ann_dir' property. \ Use 'get_ann_path()' method instead of this." ) def get_item_paths(self, item_name: str) -> EpisodeItemPaths: return EpisodeItemPaths( pointcloud_path=self.get_pointcloud_path(item_name), related_images_dir=self.get_related_images_path(item_name), frame_index=self.get_frame_idx(item_name), ) def get_ann_path(self) -> str: return os.path.join(self.directory, "annotation.json")
[docs] def get_ann( self, project_meta: ProjectMeta, key_id_map: Optional[KeyIdMap] = None ) -> PointcloudEpisodeAnnotation: """ Read pointcloud annotation of item from json. :param item_name: Pointcloud name. :type item_name: str :param project_meta: Project Meta. :type project_meta: :class:`~supervisely.project.project_meta.ProjectMeta` :param key_id_map: :class:`~supervisely.video_annotation.key_id_map.KeyIdMap` object. :type key_id_map: :class:`~supervisely.video_annotation.key_id_map.KeyIdMap`, optional :returns: Pointcloud episode annotation object :rtype: :class:`~supervisely.pointcloud_annotation.pointcloud_episode_annotation.PointcloudEpisodeAnnotation` :raises RuntimeError: if item not found in the project :Usage Example: .. code-block:: python import supervisely as sly project_path = "/home/admin/work/supervisely/projects/pointcloud_project" project = sly.PointcloudProject(project_path, sly.OpenMode.READ) ds = project.datasets.get('ds1') annotation = ds.get_ann("PTC_0056") # Output: RuntimeError: Item PTC_0056 not found in the project. annotation = ds.get_ann("PTC_0056.pcd") print(type(annotation).__name__) # Output: PointcloudEpisodeAnnotation """ ann_path = self.get_ann_path() return self.annotation_class.load_json_file(ann_path, project_meta, key_id_map)
def get_ann_frame(self, item_name: str, annotation: PointcloudEpisodeAnnotation) -> Frame: frame_idx = self.get_frame_idx(item_name) if frame_idx is None: raise ValueError(f"Frame wasn't assigned to pointcloud with name {item_name}.") return annotation.frames.get(frame_idx) def get_frame_pointcloud_map_path(self) -> str: return os.path.join(self.directory, "frame_pointcloud_map.json") def set_ann(self, ann: PointcloudEpisodeAnnotation) -> None: if type(ann) is not self.annotation_class: raise TypeError( f"Type of 'ann' should be {self.annotation_class.__name__}, not a {type(ann).__name__}" ) dst_ann_path = self.get_ann_path() dump_json_file(ann.to_json(), dst_ann_path) def _create(self): mkdir(self.item_dir) def _read(self): if not dir_exists(self.item_dir): message = f"Cannot read dataset '{self.name}': '{self.item_dir}' directory not found" if dir_exists(os.path.join(self.directory, KITTI_ITEM_DIR_NAME)): message = f"Cannot read dataset '{self.name}'. The item directory '{KITTI_ITEM_DIR_NAME}' was found. This appears to be a KITTI dataset and will be skipped." raise NotADirectoryError(message) try: item_paths = sorted(list_files(self.item_dir, filter_fn=self._has_valid_ext)) item_names = sorted([os.path.basename(path) for path in item_paths]) map_file_path = self.get_frame_pointcloud_map_path() if os.path.isfile(map_file_path): self._frame_to_pc_map = load_json_file(map_file_path) else: self._frame_to_pc_map = { frame_index: item_names[frame_index] for frame_index in range(len(item_names)) } self._pc_to_frame = {v: k for k, v in self._frame_to_pc_map.items()} self._item_to_ann = {name: self._pc_to_frame[name] for name in item_names} except Exception as ex: raise Exception(f"Cannot read dataset ({self.name}): {repr(ex)}")
[docs] def add_item_file( self, item_name: str, item_path: str, frame: Optional[Union[str, int]] = None, _validate_item: Optional[bool] = True, _use_hardlink: Optional[bool] = False, item_info: Optional[NamedTuple] = None, ) -> None: """ Adds given item file to dataset items directory, and adds given annotation to dataset ann directory. if ann is None, creates empty annotation file. :param item_name: Item name. :type item_name: str :param item_path: Path to the item. :type item_path: str :param frame: Frame number. :type frame: str or int, optional :param _validate_item: Checks input files format. :type _validate_item: bool, optional :param _use_hardlink: If True creates a hardlink pointing to src named dst, otherwise don't. :type _use_hardlink: bool, optional :param item_info: NamedTuple ImageInfo containing information about pointcloud. :type item_info: NamedTuple, optional :returns: None :rtype: None :raises Exception: if item_name already exists in dataset or item name has unsupported extension. :Usage Example: .. code-block:: python import supervisely as sly dataset_path = "/home/admin/work/supervisely/projects/episodes_project/episode_0" ds = sly.PointcloudEpisodeDataset(dataset_path, sly.OpenMode.READ) ds.add_item_file("PTC_777.pcd", "/home/admin/work/supervisely/projects/episodes_project/episode_0/pointcloud/PTC_777.pcd", frame=3) """ if item_path is None and item_info is None: raise RuntimeError("No item_path or ann or item_info provided.") self._add_item_file( item_name, item_path, _validate_item=_validate_item, _use_hardlink=_use_hardlink, ) self._add_ann_by_type(item_name, frame) self._add_item_info(item_name, item_info)
def get_classes_stats( self, project_meta: Optional[ProjectMeta] = None, return_objects_count: Optional[bool] = True, return_figures_count: Optional[bool] = True, return_items_count: Optional[bool] = True, ): if project_meta is None: project = PointcloudEpisodeProject(self.project_dir, OpenMode.READ) project_meta = project.meta class_items = {} class_objects = {} class_figures = {} for obj_class in project_meta.obj_classes: class_items[obj_class.name] = 0 class_objects[obj_class.name] = 0 class_figures[obj_class.name] = 0 episode_ann: PointcloudEpisodeAnnotation = self.get_ann(project_meta) for ann_obj in episode_ann.objects: class_objects[ann_obj.obj_class.name] += 1 for item_name in self: frame_index = self.get_frame_idx(item_name) item_figures = episode_ann.get_figures_on_frame(frame_index) item_class = {} for ptc_figure in item_figures: class_figures[ptc_figure.parent_object.obj_class.name] += 1 item_class[ptc_figure.parent_object.obj_class.name] = True for obj_class in project_meta.obj_classes: if obj_class.name in item_class.keys(): class_items[obj_class.name] += 1 result = {} if return_items_count: result["items_count"] = class_items if return_objects_count: result["objects_count"] = class_objects if return_figures_count: result["figures_count"] = class_figures return result def _add_ann_by_type(self, item_name, frame): if frame is None: self._item_to_ann[item_name] = "" elif isinstance(frame, int): self._item_to_ann[item_name] = str(frame) elif type(frame) is str: self._item_to_ann[item_name] = frame else: raise TypeError("Unsupported type {!r} for ann argument".format(type(frame))) def get_frame_idx(self, item_name: str) -> int: frame = self._item_to_ann.get(item_name, None) if frame is None: raise RuntimeError("Item {} not found in the project.".format(item_name)) if self._item_to_ann[item_name] == "": return None return int(self._item_to_ann[item_name])
[docs] class PointcloudEpisodeProject(PointcloudProject): """Local Supervisely project for point cloud episodes (sequences of point clouds with shared timeline).""" dataset_class = PointcloudEpisodeDataset
[docs] class DatasetDict(KeyIndexedCollection): """Collection of :class:`~supervisely.project.pointcloud_episode_project.PointcloudEpisodeDataset` by name.""" item_type = PointcloudEpisodeDataset
def __init__(self, directory: str, mode: OpenMode): """ PointcloudEpisodeProject is a parent directory for pointcloud episode datasets. PointcloudEpisodeProject object is immutable. :param directory: Path to pointcloud episode project directory. :type directory: str :param mode: Determines working mode for the given project. :type mode: :class:`~supervisely.project.project.OpenMode` :Usage Example: .. code-block:: python import supervisely as sly project_path = "/home/admin/work/supervisely/projects/ptc_episode_project" project = sly.PointcloudEpisodeProject(project_path, sly.OpenMode.READ) """ super().__init__(directory, mode) @classmethod def read_single(cls, dir) -> PointcloudEpisodeProject: return read_project_wrapper(dir, cls) def get_classes_stats( self, dataset_names: Optional[List[str]] = None, return_objects_count: Optional[bool] = True, return_figures_count: Optional[bool] = True, return_items_count: Optional[bool] = True, ): return super(PointcloudEpisodeProject, self).get_classes_stats( dataset_names, return_objects_count, return_figures_count, return_items_count ) @property def type(self) -> str: """ Project type. :returns: Project type. :rtype: str :Usage Example: .. code-block:: python import supervisely as sly project = sly.PointcloudEpisodeProject("/home/admin/work/supervisely/projects/pointcloud_episode_project", sly.OpenMode.READ) print(project.type) # Output: 'point_cloud_episodes' """ return ProjectType.POINT_CLOUD_EPISODES.value
[docs] @staticmethod def get_train_val_splits_by_count( project_dir: str, train_count: int, val_count: int ) -> Tuple[List[EpisodeItemInfo], List[EpisodeItemInfo]]: """ Get train and val items information from project by given train and val counts. :param project_dir: Path to project directory. :type project_dir: str :param train_count: Number of train items. :type train_count: int :param val_count: Number of val items. :type val_count: int :raises ValueError: if total_count != train_count + val_count :returns: Tuple with lists of train items information and val items information :rtype: :class:`Tuple[List[EpisodeItemInfo], List[EpisodeItemInfo]]` :Usage Example: .. code-block:: python import supervisely as sly project_path = "/home/admin/work/supervisely/projects/pointcloud_project" project = sly.PointcloudEpisodeProject(project_path, sly.OpenMode.READ) train_count = 16 val_count = 4 train_items, val_items = project.get_train_val_splits_by_count(project_path, train_count, val_count) """ def _list_items_for_splits(project) -> List[EpisodeItemInfo]: items = [] for dataset in project.datasets: dataset: PointcloudEpisodeDataset for item_name in dataset: items.append( EpisodeItemInfo( dataset_name=dataset.name, name=item_name, pointcloud_path=dataset.get_pointcloud_path(item_name), related_images_dir=dataset.get_related_images_path(item_name), frame_index=dataset.get_frame_idx(item_name), ) ) return items project = PointcloudEpisodeProject(project_dir, OpenMode.READ) if project.total_items != train_count + val_count: raise ValueError("total_count != train_count + val_count") all_items = _list_items_for_splits(project) random.shuffle(all_items) train_items = all_items[:train_count] val_items = all_items[train_count:] return train_items, val_items
[docs] @staticmethod def get_train_val_splits_by_tag( project_dir: str, train_tag_name: str, val_tag_name: str, untagged: Optional[str] = "ignore", ) -> Tuple[List[EpisodeItemInfo], List[EpisodeItemInfo]]: """ Get train and val items information from project by given train and val tags names. :param project_dir: Path to project directory. :type project_dir: str :param train_tag_name: Train tag name. :type train_tag_name: str :param val_tag_name: Val tag name. :type val_tag_name: str :param untagged: Actions in case of absence of train_tag_name and val_tag_name in project. :type untagged: str, optional :raises ValueError: if untagged not in ["ignore", "train", "val"] :returns: Tuple with lists of train items information and val items information :rtype: :class:`Tuple[List[EpisodeItemInfo], List[EpisodeItemInfo]]` :Usage Example: .. code-block:: python import supervisely as sly project_path = "/home/admin/work/supervisely/projects/pointcloud_project" project = sly.PointcloudEpisodeProject(project_path, sly.OpenMode.READ) train_tag_name = 'train' val_tag_name = 'val' train_items, val_items = project.get_train_val_splits_by_tag(project_path, train_tag_name, val_tag_name) """ untagged_actions = ["ignore", "train", "val"] if untagged not in untagged_actions: raise ValueError( f"Unknown untagged action {untagged}. Should be one of {untagged_actions}" ) project = PointcloudEpisodeProject(project_dir, OpenMode.READ) train_items = [] val_items = [] for dataset in project.datasets: ann = dataset.get_ann(project.meta) for item_name in dataset: item_paths = dataset.get_item_paths(item_name) frame_idx = dataset.get_frame_idx(item_name) info = EpisodeItemInfo( dataset_name=dataset.name, name=item_name, pointcloud_path=item_paths.pointcloud_path, related_images_dir=item_paths.related_images_dir, frame_index=frame_idx, ) frame_tags = ann.get_tags_on_frame(frame_idx) if frame_tags.get(train_tag_name) is not None: train_items.append(info) if frame_tags.get(val_tag_name) is not None: val_items.append(info) if frame_tags.get(train_tag_name) is None and frame_tags.get(val_tag_name) is None: # untagged item if untagged == "ignore": continue elif untagged == "train": train_items.append(info) elif untagged == "val": val_items.append(info) return train_items, val_items
[docs] @staticmethod def get_train_val_splits_by_dataset( project_dir: str, train_datasets: List[str], val_datasets: List[str] ) -> Tuple[List[EpisodeItemInfo], List[EpisodeItemInfo]]: """ Get train and val items information from project by given train and val datasets names. :param project_dir: Path to project directory. :type project_dir: str :param train_datasets: List of train datasets names. :type train_datasets: List[str] :param val_datasets: List of val datasets names. :type val_datasets: List[str] :raises KeyError: if dataset name not found in project :returns: Tuple with lists of train items information and val items information :rtype: :class:`Tuple[List[EpisodeItemInfo], List[EpisodeItemInfo]]` :Usage Example: .. code-block:: python import supervisely as sly project_path = "/home/admin/work/supervisely/projects/pointcloud_project" project = sly.PointcloudEpisodeProject(project_path, sly.OpenMode.READ) train_datasets = ['ds1', 'ds2'] val_datasets = ['ds3', 'ds4'] train_items, val_items = project.get_train_val_splits_by_dataset(project_path, train_datasets, val_datasets) """ def _add_items_to_list(project, datasets_names, items_list): for dataset_name in datasets_names: dataset = project.datasets.get(dataset_name) if dataset is None: raise KeyError(f"Dataset '{dataset_name}' not found") for item_name in dataset: item_paths = dataset.get_item_paths(item_name) frame_idx = dataset.get_frame_idx(item_name) info = EpisodeItemInfo( dataset_name=dataset.name, name=item_name, pointcloud_path=item_paths.pointcloud_path, related_images_dir=item_paths.related_images_dir, frame_index=frame_idx, ) items_list.append(info) project = PointcloudEpisodeProject(project_dir, OpenMode.READ) train_items = [] _add_items_to_list(project, train_datasets, train_items) val_items = [] _add_items_to_list(project, val_datasets, val_items) return train_items, val_items
[docs] @staticmethod def get_train_val_splits_by_collections( project_dir: str, train_collections: List[int], val_collections: List[int], project_id: int, api: Api, ) -> None: """ Not available for PointcloudEpisodeProject class. :raises NotImplementedError: in all cases. """ raise NotImplementedError( f"Static method 'get_train_val_splits_by_collections()' is not supported for PointcloudEpisodeProject class now." )
[docs] @staticmethod def download( api: Api, project_id: int, dest_dir: str, dataset_ids: Optional[List[int]] = None, download_pointclouds: Optional[bool] = True, download_related_images: Optional[bool] = True, download_pointclouds_info: Optional[bool] = False, batch_size: Optional[int] = 10, log_progress: bool = True, progress_cb: Optional[Union[tqdm, Callable]] = None, **kwargs, ) -> None: """ Download pointcloud episodes project from Supervisely to the given directory. :param api: Supervisely API address and token. :type api: :class:`~supervisely.api.api.Api` :param project_id: Supervisely downloadable project ID. :type project_id: int :param dest_dir: Destination directory. :type dest_dir: str :param dataset_ids: Dataset IDs. :type dataset_ids: List[int], optional :param download_pointclouds: Download pointcloud data files or not. :type download_pointclouds: bool, optional :param download_related_images: Download related images or not. :type download_related_images: bool, optional :param download_pointclouds_info: Download pointcloud info .json files or not. :type download_pointclouds_info: bool, optional :param batch_size: The number of images in the batch when they are loaded to a host. :type batch_size: int, optional :param log_progress: Show uploading progress bar. :type log_progress: bool :param progress_cb: Function for tracking download progress. :type progress_cb: tqdm or callable, optional :returns: None :rtype: NoneType :Usage Example: .. code-block:: python import os from dotenv import load_dotenv import supervisely as sly # Load secrets and create API object from .env file (recommended) # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication if sly.is_development(): load_dotenv(os.path.expanduser("~/supervisely.env")) api = sly.Api.from_env() project_id = 8888 # Local destination project folder save_directory = "/home/admin/work/supervisely/source/ptc_project" # Download Project sly.PointcloudEpisodeProject.download(api, project_id, save_directory) project_fs = sly.PointcloudEpisodeProject(save_directory, sly.OpenMode.READ) """ download_pointcloud_episode_project( api=api, project_id=project_id, dest_dir=dest_dir, dataset_ids=dataset_ids, download_pcd=download_pointclouds, download_related_images=download_related_images, download_annotations=True, download_pointclouds_info=download_pointclouds_info, batch_size=batch_size, log_progress=log_progress, progress_cb=progress_cb, )
[docs] @staticmethod def upload( directory: str, api: Api, workspace_id: int, project_name: Optional[str] = None, log_progress: bool = True, progress_cb: Optional[Union[tqdm, Callable]] = None, ) -> Tuple[int, str]: """ Uploads pointcloud episodes project to Supervisely from the given directory. :param directory: Path to project directory. :type directory: str :param api: Supervisely API address and token. :type api: :class:`~supervisely.api.api.Api` :param workspace_id: Workspace ID, where project will be uploaded. :type workspace_id: int :param project_name: Name of the project in Supervisely. Can be changed if project with the same name is already exists. :type project_name: str, optional :param log_progress: Show uploading progress bar. :type log_progress: bool :param progress_cb: Function for tracking download progress. :type progress_cb: tqdm or callable, optional :returns: Project ID and name. It is recommended to check that returned project name coincides with provided project name. :rtype: int, str :Usage Example: .. code-block:: python import os from dotenv import load_dotenv import supervisely as sly # Load secrets and create API object from .env file (recommended) # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication if sly.is_development(): load_dotenv(os.path.expanduser("~/supervisely.env")) api = sly.Api.from_env() # Local folder with Pointcloud Episodes Project project_directory = "/home/admin/work/supervisely/source/episodes_project" # Upload Pointcloud Project project_id, project_name = sly.PointcloudEpisodeProject.upload( project_directory, api, workspace_id=45, project_name="My Episodes Project" ) """ return upload_pointcloud_episode_project( directory=directory, api=api, workspace_id=workspace_id, project_name=project_name, log_progress=log_progress, progress_cb=progress_cb, )
@staticmethod async def download_async(*args, **kwargs): raise NotImplementedError( f"Static method 'download_async()' is not supported for PointcloudEpisodeProject class now." ) # ----------------------------------- # # Pointcloud Episodes Data Versioning # # ----------------------------------- # @staticmethod def download_bin(*args, **kwargs): raise NotImplementedError("Data versioning is not supported for PointcloudEpisodeProject.") @staticmethod def upload_bin(*args, **kwargs): raise NotImplementedError("Data versioning is not supported for PointcloudEpisodeProject.") @staticmethod def build_snapshot(*args, **kwargs): raise NotImplementedError("Data versioning is not supported for PointcloudEpisodeProject.") @staticmethod def restore_snapshot(*args, **kwargs): raise NotImplementedError("Data versioning is not supported for PointcloudEpisodeProject.")
def download_pointcloud_episode_project( api: Api, project_id: int, dest_dir: str, dataset_ids: Optional[List[int]] = None, download_pcd: Optional[bool] = True, download_related_images: Optional[bool] = True, download_annotations: Optional[bool] = True, download_pointclouds_info: Optional[bool] = False, batch_size: Optional[int] = 10, log_progress: bool = True, progress_cb: Optional[Union[tqdm, Callable]] = None, ) -> None: """ Download pointcloud episode project to the local directory. :param api: Supervisely API address and token. :type api: :class:`~supervisely.api.api.Api` :param project_id: Project ID to download. :type project_id: int :param dest_dir: Destination path to local directory. :type dest_dir: str :param dataset_ids: Specified list of Dataset IDs which will be downloaded. Datasets could be downloaded from different projects but with the same data type. :type dataset_ids: List[int], optional :param download_pcd: Include pointcloud episode items in the download. :type download_pcd: bool, optional :param download_related_images: Include related context images in the download. :type download_related_images: bool, optional :param download_annotations: Include annotations in the download. :type download_annotations: bool, optional :param download_pointclouds_info: Include pointclouds info in the download. :type download_pointclouds_info: bool, optional :param batch_size: Size of a downloading batch. :type batch_size: int, optional :param log_progress: Show downloading logs in the output. :type log_progress: bool :param progress_cb: Function for tracking download progress. :type progress_cb: tqdm or callable, optional :returns: None. :rtype: NoneType :Usage Example: .. code-block:: python import os from tqdm import tqdm from dotenv import load_dotenv import supervisely as sly # Load secrets and create API object from .env file (recommended) # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication if sly.is_development(): load_dotenv(os.path.expanduser("~/supervisely.env")) api = sly.Api.from_env() dest_dir = 'your/local/dest/dir' # Download pointcloud episodes project project_id = 19636 project_info = api.project.get_info_by_id(project_id) num_pointclouds_ep = project_info.items_count p = tqdm(desc="Downloading pointcloud project", total=num_pointclouds_ep) sly.download_pointcloud_episode_project( api, project_id, dest_dir, progress_cb=p, ) """ key_id_map = KeyIdMap() project_fs = PointcloudEpisodeProject(dest_dir, OpenMode.CREATE) meta = ProjectMeta.from_json(api.project.get_meta(project_id)) project_fs.set_meta(meta) if progress_cb is not None: log_progress = False filter_fn = lambda x: True if dataset_ids is not None: filter_fn = lambda ds: ds.id in dataset_ids for parents, dataset in api.dataset.tree(project_id): if not filter_fn(dataset): continue dataset_path = None if parents: dataset_path = "/datasets/".join(parents + [dataset.name]) dataset_fs: PointcloudEpisodeDataset = project_fs.create_dataset( dataset.name, ds_path=dataset_path ) pointclouds = api.pointcloud_episode.get_list(dataset.id) # Download annotation to project_path/dataset_path/annotation.json if download_annotations is True: ann_json = api.pointcloud_episode.annotation.download(dataset.id) annotation = dataset_fs.annotation_class.from_json(ann_json, meta, key_id_map) dataset_fs.set_ann(annotation) # frames --> pointcloud mapping to project_path/dataset_path/frame_pointcloud_map.json frame_name_map = api.pointcloud_episode.get_frame_name_map(dataset.id) frame_pointcloud_map_path = dataset_fs.get_frame_pointcloud_map_path() dump_json_file(frame_name_map, frame_pointcloud_map_path) ds_progress = progress_cb if log_progress: ds_progress = tqdm_sly( desc="Downloading episodes from: {!r}".format(dataset.name), total=len(pointclouds), ) for batch in batched(pointclouds, batch_size=batch_size): pointcloud_ids = [pointcloud_info.id for pointcloud_info in batch] pointcloud_names = [pointcloud_info.name for pointcloud_info in batch] map_file_path = dataset_fs.get_frame_pointcloud_map_path() frame_to_pc_map = load_json_file(map_file_path) pc_to_frame = {v: k for k, v in frame_to_pc_map.items()} item_to_ann = {name: pc_to_frame[name] for name in pointcloud_names} batch_rimg_figures = {} if download_related_images: try: rimgs = api.pointcloud_episode.get_list_related_images_batch( dataset.id, pointcloud_ids ) if len(rimgs) > 0: rimg_ids = [rimg[ApiField.ID] for rimg in rimgs] batch_rimg_figures = api.image.figure.download( dataset_id=dataset.id, image_ids=rimg_ids ) else: batch_rimg_figures = [] except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project_id, "dataset_id": dataset.id, "pointcloud_ids": pointcloud_ids, }, ) raise e for pointcloud_id, pointcloud_name, pointcloud_info in zip( pointcloud_ids, pointcloud_names, batch ): pointcloud_file_path = dataset_fs.generate_item_path(pointcloud_name) if download_pcd: try: api.pointcloud_episode.download_path(pointcloud_id, pointcloud_file_path) except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project_id, "dataset_id": dataset.id, "pointcloud_id": pointcloud_id, "pointcloud_name": pointcloud_name, "pointcloud_file_path": pointcloud_file_path, }, ) raise e else: touch(pointcloud_file_path) if download_related_images: related_images_path = dataset_fs.get_related_images_path(pointcloud_name) try: related_images = api.pointcloud_episode.get_list_related_images( pointcloud_id ) except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project_id, "dataset_id": dataset.id, "pointcloud_id": pointcloud_id, "pointcloud_name": pointcloud_name, }, ) raise e for rimage_info in related_images: name = rimage_info[ApiField.NAME] if not sly_image.has_valid_ext(name): new_name = get_file_name(name) # to fix cases like .png.json if sly_image.has_valid_ext(new_name): name = new_name rimage_info[ApiField.NAME] = name else: raise RuntimeError( "Something wrong with photo context filenames.\ Please, contact support" ) rimage_id = rimage_info[ApiField.ID] path_img = os.path.join(related_images_path, name) path_json = os.path.join(related_images_path, name + ".json") path_figures = os.path.join(related_images_path, name + ".figures.json") try: api.pointcloud_episode.download_related_image(rimage_id, path_img) except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project_id, "dataset_id": dataset.id, "pointcloud_id": pointcloud_id, "pointcloud_name": pointcloud_name, "rimage_id": rimage_id, "path_img": path_img, }, ) raise e dump_json_file(rimage_info, path_json) try: if rimage_id in batch_rimg_figures: rimg_figures = batch_rimg_figures[rimage_id] rimg_figures_json = [] for fig in rimg_figures: fig_json = fig.to_json() if ApiField.OBJECT_ID in fig_json: fig_json[OBJECT_KEY] = str( key_id_map.get_object_key(fig_json[ApiField.OBJECT_ID]) ) fig_json.pop(ApiField.OBJECT_ID, None) rimg_figures_json.append(fig_json) else: raise RuntimeError(f"Figure {fig} has no object id") dump_json_file(rimg_figures_json, path_figures) except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project_id, "dataset_id": dataset.id, "pointcloud_id": pointcloud_id, "pointcloud_name": pointcloud_name, "rimage_id": rimage_id, "path_figures": path_figures, }, ) raise e pointcloud_info = pointcloud_info._asdict() if download_pointclouds_info else None try: dataset_fs.add_item_file( pointcloud_name, pointcloud_file_path, item_to_ann[pointcloud_name], _validate_item=False, item_info=pointcloud_info, ) except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project_id, "dataset_id": dataset.id, "pointcloud_id": pointcloud_id, "pointcloud_name": pointcloud_name, "pointcloud_file_path": pointcloud_file_path, "item_info": pointcloud_info, }, ) raise e if log_progress is True or progress_cb is not None: ds_progress(1) project_fs.set_key_id_map(key_id_map) def upload_pointcloud_episode_project( directory: str, api: Api, workspace_id: int, project_name: Optional[str] = None, log_progress: bool = True, progress_cb: Optional[Union[tqdm, Callable]] = None, ) -> Tuple[int, str]: # STEP 0 - create project remotely project_fs = PointcloudEpisodeProject.read_single(directory) project_name = project_fs.name if project_name is None else project_name if api.project.exists(workspace_id, project_name): project_name = api.project.get_free_name(workspace_id, project_name) project = api.project.create(workspace_id, project_name, ProjectType.POINT_CLOUD_EPISODES) api.project.update_meta(project.id, project_fs.meta.to_json()) if progress_cb is not None: log_progress = False name_to_dsinfo = {} key_id_map = KeyIdMap() for dataset_fs in sorted(project_fs.datasets, key=lambda ds: len(ds.parents)): dataset_fs: PointcloudEpisodeDataset ann_json_path = dataset_fs.get_ann_path() if os.path.isfile(ann_json_path): ann_json = load_json_file(ann_json_path) episode_annotation = PointcloudEpisodeAnnotation.from_json(ann_json, project_fs.meta) else: episode_annotation = PointcloudEpisodeAnnotation() # removesuffix workaround for python < 3.9 parent_path = dataset_fs.name[: -len(dataset_fs.short_name)].rstrip("/") parent_info = name_to_dsinfo.get(parent_path) parent_id = parent_info.id if parent_info else None dataset = api.dataset.create( project.id, dataset_fs.short_name, description=episode_annotation.description, change_name_if_conflict=True, parent_id=parent_id, ) name_to_dsinfo[dataset_fs.name] = dataset # STEP 1 - upload episodes items_infos = {"names": [], "paths": [], "metas": []} for item_name in dataset_fs: item_path, related_images_dir, frame_idx = dataset_fs.get_item_paths(item_name) item_meta = {"frame": frame_idx} items_infos["names"].append(item_name) items_infos["paths"].append(item_path) items_infos["metas"].append(item_meta) if not items_infos["names"]: logger.info(f"Dataset {dataset.name} has no items, skipping upload") continue ds_progress = progress_cb if log_progress: ds_progress = tqdm_sly( desc="Uploading clouds to {!r}".format(dataset.name), total=len(dataset_fs), ) try: pcl_infos = api.pointcloud_episode.upload_paths( dataset.id, names=items_infos["names"], paths=items_infos["paths"], metas=items_infos["metas"], progress_cb=ds_progress, ) except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project.id, "dataset_id": dataset.id, "item_names": items_infos["names"], "item_paths": items_infos["paths"], "item_metas": items_infos["metas"], }, ) raise e # STEP 2 - upload annotations frame_to_pcl_ids = {pcl_info.frame: pcl_info.id for pcl_info in pcl_infos} try: api.pointcloud_episode.annotation.append( dataset.id, episode_annotation, frame_to_pcl_ids, key_id_map ) except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project.id, "dataset_id": dataset.id, "frame_to_pcl_ids": frame_to_pcl_ids, "ann": episode_annotation.to_json(), }, ) raise e # STEP 3 - upload photo context img_infos = {"img_paths": [], "img_metas": []} # STEP 3.1 - upload images pcl_to_rimg_figures: Dict[int, Dict[str, List[Dict]]] = {} pcl_to_hash_to_id: Dict[int, Dict[str, int]] = {} for pcl_info in pcl_infos: related_items = dataset_fs.get_related_images(pcl_info.name) images_paths_for_frame = [img_path for img_path, _ in related_items] img_infos["img_paths"].extend(images_paths_for_frame) rltd_progress = None if log_progress or progress_cb is not None: rltd_progress = tqdm_sly( desc="Uploading photo context to {!r}".format(dataset.name), total=len(img_infos["img_paths"]), leave=False, ) try: images_hashes = api.pointcloud_episode.upload_related_images( img_infos["img_paths"], progress_cb=rltd_progress, ) except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project.id, "dataset_id": dataset.id, "img_paths": img_infos["img_paths"], }, ) raise e # STEP 3.2 - upload images metas images_hashes_iterator = images_hashes.__iter__() for pcl_info in pcl_infos: related_items = dataset_fs.get_related_images(pcl_info.name) for img_ind, (_, meta_json) in enumerate(related_items): img_hash = next(images_hashes_iterator) if "deviceId" not in meta_json[ApiField.META].keys(): meta_json[ApiField.META]["deviceId"] = f"CAM_{str(img_ind).zfill(2)}" img_infos["img_metas"].append( { ApiField.ENTITY_ID: pcl_info.id, ApiField.NAME: meta_json[ApiField.NAME], ApiField.HASH: img_hash, ApiField.META: meta_json[ApiField.META], } ) img_name = meta_json[ApiField.NAME] related_images_dir = dataset_fs.get_related_images_path(pcl_info.name) fig_json_path = os.path.join(related_images_dir, img_name + ".figures.json") if os.path.isfile(fig_json_path): try: figs_json = load_json_file(fig_json_path) pcl_to_rimg_figures.setdefault(pcl_info.id, {})[img_hash] = figs_json except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project.id, "dataset_id": dataset.id, "pointcloud_id": pcl_info.id, "fig_json_path": fig_json_path, }, ) raise e if len(img_infos["img_metas"]) > 0: try: uploaded_rimgs = api.pointcloud_episode.add_related_images(img_infos["img_metas"]) # build mapping hash->id for info, uploaded in zip(img_infos["img_metas"], uploaded_rimgs): img_hash = info.get(ApiField.HASH) img_id = ( uploaded.get(ApiField.ID) if isinstance(uploaded, dict) else getattr(uploaded, "id", None) ) if img_hash is not None and img_id is not None: pcl_to_hash_to_id[img_hash] = img_id except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project.id, "dataset_id": dataset.id, "rimg_infos": img_infos["img_metas"], }, ) raise e for pcl_info in pcl_infos: rimg_figures = pcl_to_rimg_figures.get(pcl_info.id) if not rimg_figures: continue try: for img_hash, figs_json in rimg_figures.items(): if img_hash in pcl_to_hash_to_id: img_id = pcl_to_hash_to_id[img_hash] for fig in figs_json: fig[ApiField.ENTITY_ID] = img_id fig[ApiField.DATASET_ID] = dataset.id fig[ApiField.PROJECT_ID] = project.id fig[ApiField.OBJECT_ID] = key_id_map.get_object_id( UUID(fig[OBJECT_KEY]) ) api.image.figure.create_bulk( figures_json=[fig for figs in rimg_figures.values() for fig in figs], dataset_id=dataset.id, ) except Exception as e: logger.info( "INFO FOR DEBUGGING", extra={ "project_id": project.id, "dataset_id": dataset.id, "pointcloud_id": pcl_info.id, "pointcloud_name": pcl_info.name, "rimg_figures": rimg_figures, }, ) raise e return project.id, project.name