Source code for supervisely.api.pointcloud.pointcloud_figure_api
# coding: utf-8
"""Work with point cloud figures via the Supervisely API."""
from typing import Callable, Dict, List, Optional, Union
from requests_toolbelt import MultipartEncoder
from tqdm import tqdm
from supervisely._utils import batched
from supervisely.api.entity_annotation.figure_api import FigureApi, FigureInfo
from supervisely.api.module_api import ApiField
from supervisely.geometry.constants import INDICES
from supervisely.geometry.pointcloud import Pointcloud
from supervisely.io.fs import decode_uint32_le, encode_uint32_le
from supervisely.pointcloud_annotation.pointcloud_figure import PointcloudFigure
from supervisely.video_annotation.key_id_map import KeyIdMap
[docs]
class PointcloudFigureApi(FigureApi):
"""
API for working with :class:`~supervisely.pointcloud_annotation.pointcloud_figure.PointcloudFigure`.
:class:`~supervisely.api.pointcloud.pointcloud_figure_api.PointcloudFigureApi` object is immutable.
"""
[docs]
def create(
self,
pointcloud_id: int,
object_id: int,
geometry_json: Dict,
geometry_type: str,
track_id: Optional[int] = None,
) -> int:
"""
Create new PointcloudFigure of given point cloud object in point cloud with given ID.
:param pointcloud_id: Point cloud ID in Supervisely.
:type pointcloud_id: int
:param object_id: ID of the object to which the PointcloudFigure belongs.
:type object_id: int
:param geometry_json: Parameters of geometry for :class:`~supervisely.pointcloud_annotation.pointcloud_figure.PointcloudFigure`.
:type geometry_json: dict
:param geometry_type: Type of PointcloudFigure geometry.
:type geometry_type: str
:param track_id: int, optional.
:type track_id: int, optional
:returns: New figure ID
:rtype: int
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
pcd_id = 19618685
object_id = 5565921
geometry_json = {'points': {'exterior': [[500, 500], [1555, 1500]], 'interior': []}}
geometry_type = 'rectangle'
figure_id = api.pointcloud.figure.create(pcd_id, object_id, geometry_json, geometry_type) # 643182610
"""
return super().create(pointcloud_id, object_id, {}, geometry_json, geometry_type, track_id)
[docs]
def append_bulk(
self,
pointcloud_id: int,
figures: List[PointcloudFigure],
key_id_map: KeyIdMap,
) -> None:
"""
Add PointcloudFigures to given point cloud by ID.
:param pointcloud_id: Point cloud ID in Supervisely.
:type pointcloud_id: int
:param figures: List of point cloud figures to append.
:type figures: List[:class:`~supervisely.pointcloud_annotation.pointcloud_figure.PointcloudFigure`]
:param key_id_map: KeyIdMap object.
:type key_id_map: :class:`~supervisely.video_annotation.key_id_map.KeyIdMap`
:returns: None
:rtype: None
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
project_id = 124976
meta_json = api.project.get_meta(project_id)
meta = sly.ProjectMeta.from_json(meta_json)
key_id_map = KeyIdMap()
pcd_id = 198703212
ann_info = api.pointcloud.annotation.download(pcd_id)
ann = sly.PointcloudAnnotation.from_json(ann_info, meta, key_id_map)
figures = ann.figures[:5]
api.video.figure.append_bulk(pcd_id, figures, key_id_map)
"""
regular_keys = []
regular_figures_json = []
pc_keys = []
pc_indices = []
pc_figures_json = []
for figure in figures:
figure_json = figure.to_json(key_id_map)
if figure.geometry.name() == Pointcloud.geometry_name():
pc_keys.append(figure.key())
pc_indices.append(figure.geometry.indices)
figure_json.pop(ApiField.GEOMETRY, None)
pc_figures_json.append(figure_json)
else:
regular_keys.append(figure.key())
regular_figures_json.append(figure_json)
self._append_bulk(pointcloud_id, regular_figures_json, regular_keys, key_id_map)
self._append_bulk(pointcloud_id, pc_figures_json, pc_keys, key_id_map)
figure_ids = [key_id_map.get_figure_id(key) for key in pc_keys]
if len(figure_ids) != 0:
self.upload_indices_batch(figure_ids, pc_indices)
[docs]
def append_to_dataset(
self,
dataset_id: int,
figures: List[PointcloudFigure],
entity_ids: List[int],
key_id_map: KeyIdMap,
) -> None:
"""
Add pointcloud figures to Dataset annotations.
:param dataset_id: Dataset ID in Supervisely.
:type dataset_id: int
:param figures: List of point cloud figures.
:type figures: List[:class:`~supervisely.pointcloud_annotation.pointcloud_figure.PointcloudFigure`]
:param entity_ids: List of point cloud IDs.
:type entity_ids: List[int]
:param key_id_map: KeyIdMap object.
:type key_id_map: :class:`~supervisely.video_annotation.key_id_map.KeyIdMap`, optional
:rtype: None
:Usage Example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
from supervisely.geometry.cuboid_3d import Cuboid3d, Vector3d
from supervisely.pointcloud_annotation.pointcloud_annotation import PointcloudObjectCollection
from supervisely.pointcloud_annotation.pointcloud_figure import PointcloudFigure
from supervisely.video_annotation.key_id_map import KeyIdMap
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
project_id = 17231
dataset_id = 55875
pointcloud_id = 19373403
project = api.project.get_info_by_id(project_id)
dataset = api.dataset.get_info_by_id(dataset_id)
class_car = sly.ObjClass('car', Cuboid3d)
classes = sly.ObjClassCollection([class_car])
project_meta = sly.ProjectMeta(classes)
updated_meta = api.project.update_meta(project.id, project_meta.to_json())
key_id_map = KeyIdMap()
car_object = sly.PointcloudObject(class_car)
objects_collection = PointcloudObjectCollection([car_object])
uploaded_objects_ids = api.pointcloud_episode.object.append_to_dataset(
dataset.id,
objects_collection,
key_id_map,
)
position, rotation, dimension = Vector3d(-32.4, 33.9, -0.7), Vector3d(0., 0, 0.1), Vector3d(1.8, 3.9, 1.6)
cuboid = Cuboid3d(position, rotation, dimension)
figure_1 = PointcloudFigure(car_object, cuboid)
api.pointcloud_episode.figure.append_to_dataset(
dataset.id,
[figure_1],
[pointcloud_id],
key_id_map,
)
"""
regular_keys = []
regular_figures_json = []
pc_keys = []
pc_indices = []
pc_figures_json = []
for figure, entity_id in zip(figures, entity_ids):
figure_json = figure.to_json(key_id_map)
figure_json[ApiField.ENTITY_ID] = entity_id
if figure_json.get(ApiField.GEOMETRY_TYPE) == Pointcloud.geometry_name():
pc_keys.append(figure.key())
pc_indices.append(figure.geometry.indices)
figure_json.pop(ApiField.GEOMETRY, None)
pc_figures_json.append(figure_json)
else:
regular_keys.append(figure.key())
regular_figures_json.append(figure_json)
self._append_bulk(
dataset_id,
regular_figures_json,
regular_keys,
key_id_map,
field_name=ApiField.DATASET_ID,
)
self._append_bulk(
dataset_id, pc_figures_json, pc_keys, key_id_map, field_name=ApiField.DATASET_ID
)
figure_ids = [key_id_map.get_figure_id(key) for key in pc_keys]
if len(figure_ids) != 0:
self.upload_indices_batch(figure_ids, pc_indices)
def _convert_json_info(self, info: dict, skip_missing=True):
return super()._convert_json_info(info, skip_missing)
[docs]
def download(
self,
dataset_id: int,
pointcloud_ids: List[int] = None,
skip_geometry: bool = False,
**kwargs
) -> Dict[int, List[FigureInfo]]:
"""
Method returns a dictionary with pairs of pointcloud ID and list of FigureInfo for the given dataset ID. Can be filtered by pointcloud IDs.
:param dataset_id: Dataset ID in Supervisely.
:type dataset_id: int
:param pointcloud_ids: Specify the list of pointcloud IDs within the given dataset ID. If pointcloud_ids is None, the method returns all possible pairs of images with figures. Note: Consider using `sly.batched()` to ensure that no figures are lost in the response.
:type pointcloud_ids: List[int], optional
:param skip_geometry: Skip the download of figure geometry. May be useful for a significant api request speed increase in the large datasets.
:type skip_geometry: bool
:returns: A dictionary where keys are pointcloud IDs and values are lists of figures.
:rtype: Dict[int, List[:class:`~supervisely.api.entity_annotation.figure_api.FigureInfo`]]
"""
if kwargs.get("image_ids", False) is not False:
pointcloud_ids = kwargs["image_ids"] # backward compatibility
return super().download(dataset_id, pointcloud_ids, skip_geometry)
[docs]
def upload_indices_batch(self, figure_ids: List[int], indices_batch: List[List[int]]) -> None:
"""
Upload point cloud figure geometry as raw little-endian uint32 index data to storage.
:param figure_ids: List of figure IDs in Supervisely.
:type figure_ids: List[int]
:param indices_batch: Point indices per figure, aligned with ``figure_ids``.
:type indices_batch: List[List[int]]
:returns: None
:rtype: None
"""
if len(figure_ids) != len(indices_batch):
raise ValueError(
f"figure_ids and indices_batch must have the same length: "
f"{len(figure_ids)} != {len(indices_batch)}."
)
for batch in batched(list(zip(figure_ids, indices_batch)), batch_size=50):
fields = []
for figure_id, indices in batch:
fields.append((ApiField.FIGURE_ID, str(figure_id)))
fields.append(
(
ApiField.GEOMETRY,
(str(figure_id), encode_uint32_le(indices), "application/octet-stream"),
)
)
encoder = MultipartEncoder(fields=fields)
self._api.post("figures.bulk.upload.geometry", encoder)
[docs]
def download_indices_batch(
self,
figure_ids: List[int],
progress_cb: Optional[Union[tqdm, Callable]] = None,
) -> List[List[int]]:
"""
Download point cloud figure geometry stored as raw little-endian uint32 index data.
Progress is updated by one for each downloaded figure geometry.
:param figure_ids: List of figure IDs in Supervisely.
:type figure_ids: List[int]
:param progress_cb: Progress bar or callback to track download progress.
:type progress_cb: Union[tqdm, Callable], optional
:returns: Point indices per figure, aligned with ``figure_ids``.
:rtype: List[List[int]]
"""
geometries = {}
for figure_id, part in self._download_geometries_generator(figure_ids):
geometries[figure_id] = decode_uint32_le(part.content)
if progress_cb is not None:
if hasattr(progress_cb, "update") and callable(getattr(progress_cb, "update")):
progress_cb.update(1)
elif callable(progress_cb):
progress_cb(1)
if len(geometries) != len(figure_ids):
raise RuntimeError("Not all point cloud geometries were downloaded")
return [geometries[figure_id] for figure_id in figure_ids]
[docs]
def inject_geometries_into_annotations(self, anns_json: List[Dict]) -> List[Dict]:
"""
Patch ``point_cloud`` figures whose index geometry is stored separately (not inline)
by downloading the indices and writing them back into the annotation JSON in-place.
Figures that already carry inline indices (old format) are left untouched, so both
the old and the new server formats are supported.
:param anns_json: List of annotation JSONs (each with ``frames`` -> ``figures``).
:type anns_json: List[dict]
:returns: The same list with separately stored geometries injected.
:rtype: List[dict]
"""
refs = []
for ann in anns_json:
if not isinstance(ann, dict):
continue
for figure_json in self._iter_pointcloud_figures(ann):
if figure_json.get(ApiField.GEOMETRY_TYPE) != Pointcloud.geometry_name():
continue
if self._extract_indices(figure_json.get(ApiField.GEOMETRY)) is not None:
continue
figure_id = figure_json.get(ApiField.ID)
if figure_id is None:
continue
refs.append((figure_id, figure_json))
if len(refs) == 0:
return anns_json
figure_ids = [figure_id for figure_id, _ in refs]
indices_batch = self.download_indices_batch(figure_ids)
for (_, figure_json), indices in zip(refs, indices_batch):
figure_json[ApiField.GEOMETRY] = {INDICES: indices}
return anns_json
@staticmethod
def _iter_pointcloud_figures(ann: Dict):
"""
Yield figure-level JSON dicts from a point cloud annotation, supporting both
shapes:
- Single point cloud annotation: figures at top level (``ann["figures"]``).
- Episode annotation: figures inside frames (``ann["frames"][i]["figures"]``).
"""
frames = ann.get(ApiField.FRAMES)
if isinstance(frames, list):
for frame in frames:
if not isinstance(frame, dict):
continue
for figure_json in frame.get(ApiField.FIGURES, []) or []:
if isinstance(figure_json, dict):
yield figure_json
return
for figure_json in ann.get(ApiField.FIGURES, []) or []:
if isinstance(figure_json, dict):
yield figure_json
@staticmethod
def _extract_indices(geometry) -> Optional[List[int]]:
if not isinstance(geometry, dict):
return None
indices = geometry.get(INDICES)
if isinstance(indices, list):
return indices
return None