Source code for supervisely.project.data_version

import io
import os
import tarfile
import tempfile
import time
from datetime import datetime
from typing import List, NamedTuple, Optional, Tuple, Union

import requests
import zstd

from supervisely._utils import logger
from supervisely.api.module_api import ApiField, ModuleApiBase
from supervisely.api.project_api import ProjectInfo
from supervisely.io import json
from supervisely.io.fs import remove_dir, silent_remove
from supervisely.project.versioning.schema_fields import VersionSchemaField
from supervisely.project.versioning.common import (
    DEFAULT_IMAGE_SCHEMA_VERSION,
    DEFAULT_VIDEO_SCHEMA_VERSION,
    DEFAULT_VOLUME_SCHEMA_VERSION,
)


class VersionInfo(NamedTuple):
    """
    Object with parameters from Supervisely that describes a single version of a project.

    The field order matches the API field list returned by
    :meth:`DataVersion.info_sequence`.
    """

    # Identifier of the version record itself.
    id: int
    # Identifier of the project this version belongs to.
    project_id: int
    # Populated from ApiField.CREATED_BY_ID.
    created_by: int
    # Populated from ApiField.TEAM_FILE_ID.
    team_file_id: int
    # Populated from ApiField.VERSION.
    version: int
    description: str
    status: str
    created_at: str
    updated_at: str
    # Populated from ApiField.PROJECT_UPDATED_AT.
    project_updated_at: str
    team_id: int
    name: str
class DataVersion(ModuleApiBase):
    """
    Class for managing project versions.

    This class provides methods for creating, restoring, and managing project versions.
    Version archives and the ``versions.json`` map are stored in Team Files under
    ``/system/versions/<project_id>/``.
    """

    PROJECT_NAME_TEMPLATE = "{project_name}, from ver. {version_num}"
    PROJECT_DESC_TEMPLATE = (
        "Restored from version {version_num}. "
        "Source project ID: {project_id}, version ID: {version_id}"
    )

    def __init__(self, api):
        """
        Class for managing project versions.

        :param api: Supervisely API client used for all server calls
        :type api: Api
        """
        from supervisely import Api

        self._api: Api = api
        self.__storage_dir: str = "/system/versions/"
        self.__version_format: str = DEFAULT_IMAGE_SCHEMA_VERSION
        # The attributes below are filled in by `initialize`.
        self.project_info = None
        self.project_dir = None
        self.versions_path = None
        self.versions = None
        # Set as a side effect of reading `project_cls`.
        self._batch_size = None

    @staticmethod
    def info_sequence():
        """
        NamedTuple VersionInfo with API Fields containing information about Project Version.

        :return: API field names, in the same order as the fields of VersionInfo
        :rtype: list
        """
        return [
            ApiField.ID,
            ApiField.PROJECT_ID,
            ApiField.CREATED_BY_ID,
            ApiField.TEAM_FILE_ID,
            ApiField.VERSION,
            ApiField.DESCRIPTION,
            ApiField.STATUS,
            ApiField.CREATED_AT,
            ApiField.UPDATED_AT,
            ApiField.PROJECT_UPDATED_AT,
            ApiField.TEAM_ID,
            ApiField.NAME,
        ]

    @staticmethod
    def info_tuple_name():
        """
        NamedTuple name - **VersionInfo**.
        """
        return "VersionInfo"

    @property
    def project_cls(self):
        """
        Project class matching the type of the initialized project.

        Reading this property also updates the schema version written to the
        versions map and the batch size used when downloading the project.

        :return: Project, VideoProject or VolumeProject class
        :raises ValueError: if `initialize` was not called or the project type is unsupported
        """
        from supervisely.project import (
            Project,
            ProjectType,
            VideoProject,
            VolumeProject,
        )

        if self.project_info is None:
            raise ValueError("Project info is not initialized. Call 'initialize' method first.")

        self._batch_size = None
        project_type = self.project_info.type
        if project_type == ProjectType.IMAGES.value:
            self.__version_format = DEFAULT_IMAGE_SCHEMA_VERSION
            self._batch_size = 200
            return Project
        if project_type == ProjectType.VIDEOS.value:
            self.__version_format = DEFAULT_VIDEO_SCHEMA_VERSION
            self._batch_size = 50
            return VideoProject
        if project_type == ProjectType.VOLUMES.value:
            self.__version_format = DEFAULT_VOLUME_SCHEMA_VERSION
            self._batch_size = 50
            return VolumeProject
        raise ValueError(f"Unsupported project type: {project_type}")

    def initialize(self, project_info: Union[ProjectInfo, int]):
        """
        Initialize project versions.

        Resolves the project info, computes the storage paths and loads the
        versions map. If the project has no version yet, the warning file in
        the system directory is created when missing.

        :param project_info: ProjectInfo object or project ID
        :type project_info: Union[ProjectInfo, int]
        """
        if isinstance(project_info, int):
            project_info = self._api.project.get_info_by_id(project_info)
        self.project_info: ProjectInfo = project_info
        self.project_dir: str = os.path.join(self.__storage_dir, str(self.project_info.id))
        self.versions_path: str = os.path.join(self.project_dir, "versions.json")
        self.versions: dict = self.get_map(self.project_info, do_initialization=False)
        if self.project_info.version is None:
            self._create_warning_system_file()

    def get_list(self, project_id: int, filters: Optional[List] = None) -> List[VersionInfo]:
        """
        Get list of project versions.

        :param project_id: Project ID
        :type project_id: int
        :param filters: Filters
        :type filters: Optional[List]
        :return: List of project versions
        :rtype: List[VersionInfo]
        """
        data = {ApiField.PROJECT_ID: project_id}
        if filters:
            data[ApiField.FILTER] = filters
        return self.get_list_all_pages("projects.versions.list", data)

    def get_id_by_number(self, project_id: int, version_num: int) -> Optional[int]:
        """
        Get version ID by version number.

        :param project_id: Project ID
        :type project_id: int
        :param version_num: Version number
        :type version_num: int
        :return: Version ID, or None if no version has this number
        :rtype: int or None
        """
        # Named `number_filter` so the builtin `filter` is not shadowed.
        number_filter = [
            {
                ApiField.FIELD: ApiField.VERSION,
                ApiField.OPERATOR: "=",
                ApiField.VALUE: int(version_num),
            }
        ]
        versions = self.get_list(project_id, filters=number_filter)
        if versions:
            return versions[0].id
        return None

    def get_map(self, project_info: Union[ProjectInfo, int], do_initialization: bool = True):
        """
        Get project versions map from storage.

        :param project_info: ProjectInfo object or project ID
        :type project_info: Union[ProjectInfo, int]
        :param do_initialization: Initialize project versions. Set to False for internal use.
        :type do_initialization: bool
        :return: Project versions (empty dict if the map is missing or empty)
        :rtype: dict
        """
        if do_initialization:
            self.initialize(project_info)
        try:
            versions = self._api.file.get_json_file_content(
                self.project_info.team_id, self.versions_path
            )
        except FileNotFoundError:
            # No map yet; the "format" key is added later by `set_map`.
            return {}
        return versions or {}

    def set_map(self, project_info: Union[ProjectInfo, int], initialize: bool = True):
        """
        Save project versions map to storage.

        :param project_info: ProjectInfo object or project ID
        :type project_info: Union[ProjectInfo, int]
        :param initialize: Initialize project versions. Set to False for internal use.
        :type initialize: bool
        :return: None
        :raises RuntimeError: if the upload returns no file info
        """
        if initialize:
            self.initialize(project_info)
        if "format" not in self.versions:
            self.versions["format"] = self.__version_format

        temp_dir = tempfile.mkdtemp()
        try:
            local_versions = os.path.join(temp_dir, "versions.json")
            json.dump_json_file(self.versions, local_versions)
            file_info = self._api.file.upload(
                self.project_info.team_id, local_versions, self.versions_path
            )
            if file_info is None:
                raise RuntimeError("Failed to save versions")
        finally:
            # Remove the temp dir on failure too, not only on success.
            remove_dir(temp_dir)

    def create(
        self,
        project_info: Union[ProjectInfo, int],
        version_title: Optional[str] = None,
        version_description: Optional[str] = None,
    ) -> Optional[int]:
        """
        Create a new project version.

        Returns the ID of the new version.
        If the project is already on the latest version, returns the latest version ID.
        If the project version cannot be created, returns None.

        :param project_info: ProjectInfo object or project ID
        :type project_info: Union[ProjectInfo, int]
        :param version_title: Version title
        :type version_title: Optional[str]
        :param version_description: Version description
        :type version_description: Optional[str]
        :return: Version ID
        :rtype: int or None
        """
        if isinstance(project_info, int):
            project_info = self._api.project.get_info_by_id(project_info)

        if (
            "app.supervise.ly" in self._api.server_address
            or "app.supervisely.com" in self._api.server_address
        ):
            if self._api.team.get_info_by_id(project_info.team_id).usage.plan == "free":
                logger.warning(
                    "Project versioning is not available for teams with Free plan. Please upgrade to Pro to enable versioning."
                )
                return None

        self.initialize(project_info)
        path = self._generate_save_path()
        latest = self._get_latest_id()
        try:
            version_id, commit_token = self.reserve(project_info.id)
        except Exception as e:
            logger.error(f"Failed to reserve version. Exception: {e}")
            return None
        # `reserve` returns (None, None) when the server reports no changes
        # since the last version.
        if version_id is None and commit_token is None:
            return latest

        try:
            file_info = self._compress_and_upload(path)
            self.versions[version_id] = {
                "path": path,
                "updated_at": project_info.updated_at,
                "previous": latest,
                "number": int(self.versions[str(latest)]["number"]) + 1 if latest else 1,
            }
            self.versions["latest"] = version_id
            self.set_map(project_info, initialize=False)
            self.commit(
                version_id,
                commit_token,
                project_info.updated_at,
                file_info.id,
                title=version_title,
                description=version_description,
            )
            return version_id
        except Exception as e:
            # A failing cancellation must not mask the original error or break
            # the documented "returns None" contract.
            try:
                cancelled = self.cancel_reservation(version_id, commit_token)
            except Exception as cancel_error:
                logger.warning(f"Cancelling the reservation raised an exception: {cancel_error}")
                cancelled = False
            if cancelled:
                logger.error(
                    f"Version creation failed. Reservation was cancelled. Exception: {e}"
                )
            else:
                logger.error(
                    f"Failed to cancel reservation when handling exception. You can cancel your reservation on the web under the Versions tab of the project. Exception: {e}"
                )
            return None

    def commit(
        self,
        version_id: int,
        commit_token: str,
        updated_at: str,
        file_id: int,
        title: Optional[str] = None,
        description: Optional[str] = None,
    ):
        """
        Commit project version.

        This method is used to finalize the version creation process.
        Requires active reservation.
        You must call this method after creating project version backup and setting version map

        :param version_id: Version ID
        :type version_id: int
        :param commit_token: Commit token
        :type commit_token: str
        :param updated_at: Updated at timestamp
        :type updated_at: str
        :param file_id: File ID
        :type file_id: int
        :param title: Version title
        :type title: Optional[str]
        :param description: Version description
        :type description: Optional[str]
        :return: None
        :raises RuntimeError: if the server response does not report success
        """
        body = {
            ApiField.ID: version_id,
            ApiField.COMMIT_TOKEN: commit_token,
            ApiField.PROJECT_UPDATED_AT: updated_at,
            ApiField.TEAM_FILE_ID: file_id,
        }
        # Optional fields are sent only when they are non-empty.
        if title:
            body[ApiField.TITLE] = title
        if description:
            body[ApiField.DESCRIPTION] = description

        response = self._api.post("projects.versions.commit", body)
        commit_info = response.json()
        if not commit_info.get("success"):
            raise RuntimeError("Failed to commit version")

    def reserve(
        self, project_id: int, retries: int = 6
    ) -> Tuple[Optional[int], Optional[str]]:
        """
        Reserve project version.

        This method is used before backing up a version to prevent another attempt
        to create a version at the same time.
        The first delay of retry is 2 seconds, which doubles with each subsequent attempt.

        :param project_id: Project ID
        :type project_id: int
        :param retries: Number of attempts to reserve version
        :type retries: int
        :return: Version ID and commit token, or (None, None) when the server
            reports that the existing version should be used
        :rtype: Tuple[int, str]
        :raises RuntimeError: if another process is still committing after all retries
        """
        retry_delay = 2  # seconds
        # The delay doubles after every sleep, so it reaches this bound after `retries` sleeps.
        max_delay = retry_delay * 2**retries

        while True:
            try:
                response = self._api.post(
                    "projects.versions.reserve", {ApiField.PROJECT_ID: project_id}
                )
                reserve_info = response.json()
                return reserve_info.get(ApiField.ID), reserve_info.get(ApiField.COMMIT_TOKEN)
            except requests.exceptions.HTTPError as e:
                details = {}
                if e.response is not None:
                    try:
                        details = (e.response.json() or {}).get("details", {})  # type: ignore[union-attr]
                    except Exception:
                        # The error body is not usable JSON; treat it as having no details.
                        details = {}

                if details.get("useExistingVersion"):
                    version_id = details.get("version", {}).get("id")
                    version = details.get("version", {}).get("version")
                    logger.info(
                        f"No changes to the project since the last version '{version}' with ID '{version_id}'"
                    )
                    return (None, None)

                message = (details.get("message") or "").lower()
                if "is already committing" in message:
                    if retry_delay >= max_delay:
                        raise RuntimeError(
                            "Failed to reserve version. Another process is already committing a version. Maximum number of attempts reached."
                        )
                    time.sleep(retry_delay)
                    retry_delay *= 2
                    continue

                # Any other HTTP error is not retried.
                raise

    def cancel_reservation(self, version_id: int, commit_token: str):
        """
        Cancel version reservation for a project.

        :param version_id: Version ID
        :type version_id: int
        :param commit_token: Commit token
        :type commit_token: str
        :return: True if reservation was cancelled, False otherwise
        :rtype: bool
        """
        response = self._api.post(
            "projects.versions.cancel-reservation",
            {ApiField.ID: version_id, ApiField.COMMIT_TOKEN: commit_token},
        )
        reserve_info = response.json()
        return bool(reserve_info.get("success"))

    def restore(
        self,
        project_info: Union[ProjectInfo, int],
        version_id: Optional[int] = None,
        version_num: Optional[int] = None,
        skip_missed_entities: bool = False,
    ) -> Optional[ProjectInfo]:
        """
        Restore project to a specific version.

        Version can be specified by ID or number. When a version number is
        given, it takes precedence and the ID is looked up in the versions map.
        The version is uploaded as a new project in the same workspace.

        :param project_info: ProjectInfo object or project ID
        :type project_info: Union[ProjectInfo, int]
        :param version_id: Version ID
        :type version_id: Optional[int]
        :param version_num: Version number
        :type version_num: Optional[int]
        :param skip_missed_entities: Skip missed Images
        :type skip_missed_entities: bool, default False
        :return: ProjectInfo object of the restored project, or None if the
            version has no restore point
        :rtype: ProjectInfo or None
        :raises ValueError: if neither identifier is given or the version does not exist
        """
        if version_id is None and version_num is None:
            raise ValueError("Either version_id or version_num must be provided")

        self.initialize(project_info)

        if version_num:
            version_id = None
            for key, value in self.versions.items():  # pylint: disable=no-member
                # The map also holds non-version entries such as "format" and "latest".
                if isinstance(value, dict) and value.get("number") == version_num:
                    version_id = key
                    break
            if version_id is None:
                raise ValueError(f"Version {version_num} does not exist")
        else:
            if str(version_id) not in self.versions:
                raise ValueError(f"Version {version_id} does not exist")
            version_num = self.versions[str(version_id)]["number"]

        backup_files = self.versions[str(version_id)]["path"]

        # There is deliberately no "project is already on this version" check:
        # restoring is treated as a project clone operation.

        if backup_files is None:
            logger.warning(
                f"Project can't be restored to version {version_num} because it doesn't have restore point."
            )
            return None

        dst_project_name = self.PROJECT_NAME_TEMPLATE.format(
            project_name=self.project_info.name, version_num=version_num
        )
        dst_project_desc = self.PROJECT_DESC_TEMPLATE.format(
            version_num=version_num,
            project_id=self.project_info.id,
            version_id=version_id,
        )

        bin_io = self._download_and_extract(backup_files)
        new_project_info = self.project_cls.upload_bin(
            self._api,
            bin_io,
            workspace_id=self.project_info.workspace_id,
            project_name=dst_project_name,
            project_description=dst_project_desc,
            skip_missed=skip_missed_entities,
        )
        return new_project_info

    def _create_warning_system_file(self):
        """
        Create a file in the system directory to indicate that you cannot manually modify its contents.

        Path = /system/DO_NOT_DELETE_ANYTHING_HERE.txt
        Does nothing if the file already exists in Team Files.
        """
        warning_file = "/system/DO_NOT_DELETE_ANYTHING_HERE.txt"
        if self._api.file.exists(self.project_info.team_id, warning_file, recursive=False):
            return

        # Write through one handle and close it before uploading; the local
        # copy is removed afterwards so nothing is left behind.
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
            f.write("This directory is managed by Supervisely. Do not modify its contents.")
            temp_path = f.name
        try:
            self._api.file.upload(self.project_info.team_id, temp_path, warning_file)
        finally:
            silent_remove(temp_path)

    def _download_and_extract(self, path: str) -> io.BytesIO:
        """
        Download and extract version data to memory.

        :param path: Path to the version file
        :type path: str
        :return: Binary IO object with extracted file
        :rtype: io.BytesIO
        :raises RuntimeError: if the archive cannot be downloaded or ``version.bin`` cannot be extracted
        """
        temp_dir = tempfile.mkdtemp()
        local_path = os.path.join(temp_dir, "download.tar.zst")
        try:
            self._api.file.download(self.project_info.team_id, path, local_path)
            # Stream-decompress and stream-read tar to avoid loading the whole archive in memory.
            try:
                dctx = zstd.ZstdDecompressor()
                with open(local_path, "rb") as zst_f:
                    with dctx.stream_reader(zst_f) as reader:
                        with tarfile.open(fileobj=reader, mode="r|") as tar:
                            for member in tar:
                                if member.name == "version.bin":
                                    file = tar.extractfile(member)
                                    if not file:
                                        raise RuntimeError("version.bin not found in the archive")
                                    return io.BytesIO(file.read())
                raise RuntimeError("version.bin not found in the archive")
            except Exception:
                # Fallback: one-shot decompress
                with open(local_path, "rb") as zst_f:
                    decompressed_data = zstd.decompress(zst_f.read())
                with tarfile.open(fileobj=io.BytesIO(decompressed_data), mode="r") as tar:
                    file = tar.extractfile("version.bin")
                    if not file:
                        raise RuntimeError("version.bin not found in the archive")
                    return io.BytesIO(file.read())
        except Exception as e:
            raise RuntimeError(f"Failed to extract version: {e}") from e
        finally:
            remove_dir(temp_dir)

    def _generate_save_path(self):
        """
        Generate a path for the new version archive where it will be saved in the Team Files.

        Archive format: {timestamp}.tar.zst

        :return: Path for the new version archive
        :rtype: str
        """
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        return os.path.join(self.project_dir, timestamp + ".tar.zst")

    def _get_latest_id(self):
        """
        Get the ID of the latest version from the versions map (versions.json).

        :return: Latest version ID, or None if the map has no non-empty "latest" entry
        """
        return self.versions.get("latest") or None

    def _compress_and_upload(self, path: str) -> dict:
        """
        Save project in binary format in archive to the Team Files.

        Binary file name: version.bin

        :param path: Destination path of the archive in Team Files
        :type path: str
        :return: File info returned by the upload (``create`` reads its ``id``)
        :rtype: dict
        """
        temp_dir = tempfile.mkdtemp()
        data = None
        try:
            data = self.project_cls.download_bin(
                self._api, self.project_info.id, batch_size=self._batch_size, return_bytesio=True
            )
            # The tar header needs the payload size up front.
            info = tarfile.TarInfo(name="version.bin")
            data.seek(0, io.SEEK_END)
            info.size = data.tell()
            data.seek(0)
            zst_archive_path = os.path.join(temp_dir, "download.tar.zst")

            # Stream the tar through the compressor to avoid building the whole archive in memory.
            try:
                cctx = zstd.ZstdCompressor()
                with open(zst_archive_path, "wb") as zst_f:
                    try:
                        stream = cctx.stream_writer(zst_f, closefd=False)
                    except TypeError:
                        # This stream_writer does not accept `closefd`.
                        stream = cctx.stream_writer(zst_f)
                    with stream as compressor:
                        with tarfile.open(fileobj=compressor, mode="w|") as tar:
                            tar.addfile(tarinfo=info, fileobj=data)
            except Exception:
                # Fallback: build tar in memory + one-shot compress.
                # Rewind first: the streaming attempt may have partly consumed `data`.
                data.seek(0)
                tar_data = io.BytesIO()
                with tarfile.open(fileobj=tar_data, mode="w") as tar:
                    tar.addfile(tarinfo=info, fileobj=data)
                tar_data.seek(0)
                with open(zst_archive_path, "wb") as zst_f:
                    zst_f.write(zstd.compress(tar_data.read()))

            file_info = self._api.file.upload(self.project_info.team_id, zst_archive_path, path)
            return file_info
        finally:
            if data is not None:
                try:
                    data.close()
                except Exception:
                    # Best effort: a failing close must not hide the real result or error.
                    pass
            remove_dir(temp_dir)