import io
import os
import tarfile
import tempfile
import time
from datetime import datetime
from typing import List, NamedTuple, Optional, Tuple, Union
import requests
import zstd
from supervisely._utils import logger
from supervisely.api.module_api import ApiField, ModuleApiBase
from supervisely.api.project_api import ProjectInfo
from supervisely.io import json
from supervisely.io.fs import remove_dir, silent_remove
[docs]class VersionInfo(NamedTuple):
"""
Object with image parameters from Supervisely that describes the version of the project.
"""
id: int
project_id: int
created_by: int
team_file_id: int
version: int
description: str
status: str
created_at: str
updated_at: str
project_updated_at: str
team_id: int
name: str
[docs]class DataVersion(ModuleApiBase):
"""
Class for managing project versions.
This class provides methods for creating, restoring, and managing project versions.
"""
def __init__(self, api):
"""
Class for managing project versions.
"""
from supervisely import Api
self._api: Api = api
self.__storage_dir: str = "/system/versions/"
self.__version_format: str = "v1.0.0"
self.project_info = None
self.project_dir = None
self.versions_path = None
self.versions = None
[docs] @staticmethod
def info_sequence():
"""
NamedTuple VersionInfo with API Fields containing information about Project Version.
"""
return [
ApiField.ID,
ApiField.PROJECT_ID,
ApiField.CREATED_BY_ID,
ApiField.TEAM_FILE_ID,
ApiField.VERSION,
ApiField.DESCRIPTION,
ApiField.STATUS,
ApiField.CREATED_AT,
ApiField.UPDATED_AT,
ApiField.PROJECT_UPDATED_AT,
ApiField.TEAM_ID,
ApiField.NAME,
]
[docs] @staticmethod
def info_tuple_name():
"""
NamedTuple name - **VersionInfo**.
"""
return "VersionInfo"
[docs] def initialize(self, project_info: Union[ProjectInfo, int]):
"""
Initialize project versions.
:param project_info: ProjectInfo object or project ID
:type project_info: Union[ProjectInfo, int]
"""
if isinstance(project_info, int):
project_info = self._api.project.get_info_by_id(project_info)
self.project_info: ProjectInfo = project_info
self.project_dir: str = os.path.join(self.__storage_dir, str(self.project_info.id))
self.versions_path: str = os.path.join(self.project_dir, "versions.json")
self.versions: dict = self.get_map(self.project_info, do_initialization=False)
if self.project_info.version is None:
self._create_warning_system_file()
[docs] def get_list(self, project_id: int, filters: Optional[List] = None) -> List[VersionInfo]:
"""
Get list of project versions.
:param project_id: Project ID
:type project_id: int
:param filters: Filters
:type filters: Optional[List]
:return: List of project versions
:rtype: List[VersionInfo]
"""
data = {ApiField.PROJECT_ID: project_id}
if filters:
data[ApiField.FILTER] = filters
return self.get_list_all_pages("projects.versions.list", data)
[docs] def get_id_by_number(self, project_id: int, version_num: int) -> int:
"""
Get version ID by version number.
:param project_id: Project ID
:type project_id: int
:param version_num: Version number
:type version_num: int
:return: Version ID
:rtype: int or None
"""
filter = [
{
ApiField.FIELD: ApiField.VERSION,
ApiField.OPERATOR: "=",
ApiField.VALUE: int(version_num),
}
]
versions = self.get_list(project_id, filters=filter)
if len(versions) > 0:
return versions[0].id
return None
[docs] def get_map(self, project_info: Union[ProjectInfo, int], do_initialization: bool = True):
"""
Get project versions map from storage.
:param project_info: ProjectInfo object or project ID
:type project_info: Union[ProjectInfo, int]
:param do_initialization: Initialize project versions. Set to False for internal use.
:type do_initialization: bool
:return: Project versions
:rtype: dict
"""
if do_initialization:
self.initialize(project_info)
try:
versions = self._api.file.get_json_file_content(
self.project_info.team_id, self.versions_path
)
versions = versions if versions else {}
except FileNotFoundError:
versions = {"format": self.__version_format}
return versions
[docs] def set_map(self, project_info: Union[ProjectInfo, int], initialize: bool = True):
"""
Save project versions map to storage.
:param project_info: ProjectInfo object or project ID
:type project_info: Union[ProjectInfo, int]
:param initialize: Initialize project versions. Set to False for internal use.
:type initialize: bool
:return: None
"""
if initialize:
self.initialize(project_info)
temp_dir = tempfile.mkdtemp()
local_versions = os.path.join(temp_dir, "versions.json")
json.dump_json_file(self.versions, local_versions)
file_info = self._api.file.upload(
self.project_info.team_id, local_versions, self.versions_path
)
if file_info is None:
raise RuntimeError("Failed to save versions")
remove_dir(temp_dir)
[docs] def create(
self,
project_info: Union[ProjectInfo, int],
version_title: Optional[str] = None,
version_description: Optional[str] = None,
) -> int:
"""
Create a new project version.
Returns the ID of the new version.
If the project is already on the latest version, returns the latest version ID.
If the project version cannot be created, returns None.
:param project_info: ProjectInfo object or project ID
:type project_info: Union[ProjectInfo, int]
:param version_title: Version title
:type version_title: Optional[str]
:param version_description: Version description
:type version_description: Optional[str]
:return: Version ID
:rtype: int
"""
if isinstance(project_info, int):
project_info = self._api.project.get_info_by_id(project_info)
if (
"app.supervise.ly" in self._api.server_address
or "app.supervisely.com" in self._api.server_address
):
if self._api.team.get_info_by_id(project_info.team_id).usage.plan == "free":
logger.warning(
"Project versioning is not available for teams with Free plan. Please upgrade to Pro to enable versioning."
)
return None
self.initialize(project_info)
path = self._generate_save_path()
latest = self._get_latest_id()
try:
version_id, commit_token = self.reserve(project_info.id)
except Exception as e:
logger.error(f"Failed to reserve version. Exception: {e}")
return None
if version_id is None and commit_token is None:
return latest
try:
file_info = self._compress_and_upload(path)
self.versions[version_id] = {
"path": path,
"updated_at": project_info.updated_at,
"previous": latest,
"number": int(self.versions[str(latest)]["number"]) + 1 if latest else 1,
}
self.versions["latest"] = version_id
self.set_map(project_info, initialize=False)
self.commit(
version_id,
commit_token,
project_info.updated_at,
file_info.id,
title=version_title,
description=version_description,
)
return version_id
except Exception as e:
if self.cancel_reservation(version_id, commit_token):
logger.error(f"Version creation failed. Reservation was cancelled. Exception: {e}")
else:
logger.error(
f"Failed to cancel reservation when handling exception. You can cancel your reservation on the web under the Versions tab of the project. Exception: {e}"
)
return None
[docs] def commit(
self,
version_id: int,
commit_token: str,
updated_at: str,
file_id: int,
title: Optional[str] = None,
description: Optional[str] = None,
):
"""
Commit project version.
This method is used to finalize the version creation process.
Requires active reservation.
You must call this method after creating project version backup and setting version map
:param version_id: Version ID
:type version_id: int
:param commit_token: Commit token
:type commit_token: str
:param updated_at: Updated at timestamp
:type updated_at: str
:param file_id: File ID
:type file_id: int
:param title: Version title
:type title: Optional[str]
:param description: Version description
:type description: Optional[str]
:return: None
"""
body = {
ApiField.ID: version_id,
ApiField.COMMIT_TOKEN: commit_token,
ApiField.PROJECT_UPDATED_AT: updated_at,
ApiField.TEAM_FILE_ID: file_id,
}
if title:
body[ApiField.TITLE] = title
if description:
body[ApiField.DESCRIPTION] = description
response = self._api.post("projects.versions.commit", body)
commit_info = response.json()
if not commit_info.get("success"):
raise RuntimeError("Failed to commit version")
[docs] def reserve(self, project_id: int, retries: int = 6) -> Tuple[int, str]:
"""
Reserve project version.
This method is used before backing up a version to prevent another attempt to create a version at the same time.
The first delay of retry is 2 seconds, which doubles with each subsequent attempt.
:param project_id: Project ID
:type project_id: int
:param retries: Number of attempts to reserve version
:type retries: int
:return: Version ID and commit token
:rtype: Tuple[int, str]
"""
retry_delay = 2 # seconds
max_delay = retry_delay * 2**retries
while True:
try:
response = self._api.post(
"projects.versions.reserve", {ApiField.PROJECT_ID: project_id}
)
reserve_info = response.json()
return reserve_info.get(ApiField.ID), reserve_info.get(ApiField.COMMIT_TOKEN)
except requests.exceptions.HTTPError as e:
if e.response.json().get("details", {}).get("useExistingVersion"):
version_id = e.response.json().get("details", {}).get("version").get("id")
version = e.response.json().get("details", {}).get("version").get("version")
logger.info(
f"No changes to the project since the last version '{version}' with ID '{version_id}'"
)
return (None, None)
elif "is already committing" in e.response.json().get("details", {}).get("message"):
if retry_delay >= max_delay:
raise RuntimeError(
"Failed to reserve version. Another process is already committing a version. Maximum number of attempts reached."
)
version = e.response.json().get("details", {}).get("version").get("version")
time.sleep(retry_delay)
retry_delay *= 2
[docs] def cancel_reservation(self, version_id: int, commit_token: str):
"""
Cancel version reservation for a project.
:param version_id: Version ID
:type version_id: int
:param commit_token: Commit token
:type commit_token: str
:return: True if reservation was cancelled, False otherwise
"""
response = self._api.post(
"projects.versions.cancel-reservation",
{ApiField.ID: version_id, ApiField.COMMIT_TOKEN: commit_token},
)
reserve_info = response.json()
return True if reserve_info.get("success") else False
[docs] def restore(
self,
project_info: Union[ProjectInfo, int],
version_id: Optional[int] = None,
version_num: Optional[int] = None,
skip_missed_entities: bool = False,
) -> ProjectInfo:
"""
Restore project to a specific version.
Version can be specified by ID or number.
:param project_info: ProjectInfo object or project ID
:type project_info: Union[ProjectInfo, int]
:param version_id: Version ID
:type version_id: Optional[int]
:param version_num: Version number
:type version_num: Optional[int]
:param skip_missed_entities: Skip missed Images
:type skip_missed_entities: bool, default False
:return: ProjectInfo object of the restored project
:rtype: ProjectInfo or None
"""
from supervisely.project.project import Project
if version_id is None and version_num is None:
raise ValueError("Either version_id or version_num must be provided")
self.initialize(project_info)
if version_num:
version_id = None
for key, value in self.versions.items():
# pylint: disable=no-member
if isinstance(value, dict) and value.get("number") == version_num:
version_id = key
break
if version_id is None:
raise ValueError(f"Version {version_num} does not exist")
else:
if str(version_id) not in self.versions:
raise ValueError(f"Version {version_id} does not exist")
version_num = self.versions[str(version_id)]["number"]
updated_at = self.versions[str(version_id)]["updated_at"]
backup_files = self.versions[str(version_id)]["path"]
# turn off this check for now (treating this as a project clone operation)
# if updated_at == self.project_info.updated_at:
# logger.warning(
# f"Project is already on version {version_num} with the same updated_at timestamp"
# )
# return
if backup_files is None:
logger.warning(
f"Project can't be restored to version {version_num} because it doesn't have restore point."
)
return
bin_io = self._download_and_extract(backup_files)
new_project_info = Project.upload_bin(
self._api,
bin_io,
self.project_info.workspace_id,
skip_missed=skip_missed_entities,
)
return new_project_info
def _create_warning_system_file(self):
"""
Create a file in the system directory to indicate that you cannot manually modify its contents.
Path = /system/DO_NOT_DELETE_ANYTHING_HERE.txt
"""
warning_file = "/system/DO_NOT_DELETE_ANYTHING_HERE.txt"
if not self._api.file.exists(self.project_info.team_id, warning_file, recursive=False):
temp_file = tempfile.NamedTemporaryFile(delete=False)
with open(temp_file.name, "w") as f:
f.write("This directory is managed by Supervisely. Do not modify its contents.")
self._api.file.upload(self.project_info.team_id, temp_file.name, warning_file)
def _download_and_extract(self, path: str) -> io.BytesIO:
"""
Download and extract version data to memory.
:param path: Path to the version file
:type path: str
:return: Binary IO object with extracted file
:rtype: io.BytesIO
"""
temp_dir = tempfile.mkdtemp()
local_path = os.path.join(temp_dir, "download.tar.zst")
try:
self._api.file.download(self.project_info.team_id, path, local_path)
with open(local_path, "rb") as zst:
decompressed_data = zstd.decompress(zst.read())
with tarfile.open(fileobj=io.BytesIO(decompressed_data)) as tar:
file = tar.extractfile("version.bin")
if not file:
raise RuntimeError("version.bin not found in the archive")
data = file.read()
bin_io = io.BytesIO(data)
return bin_io
except Exception as e:
raise RuntimeError(f"Failed to extract version: {e}")
finally:
remove_dir(temp_dir)
def _generate_save_path(self):
"""
Generate a path for the new version archive where it will be saved in the Team Files.
Archive format: {timestamp}.tar.zst
:return: Path for the new version archive
:rtype: str
"""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
path = os.path.join(self.project_dir, timestamp + ".tar.zst")
return path
def _get_latest_id(self):
"""
Get the ID of the latest version from the versions map (versions.json).
"""
latest = self.versions.get("latest", None)
if not latest:
return None
return latest
def _compress_and_upload(self, path: str) -> dict:
"""
Save project in binary format in archive to the Team Files.
Binary file name: version.bin
:param changes: Changes between current and previous version
:type changes: bool
:return: File info
:rtype: dict
"""
from supervisely.project.project import Project
temp_dir = tempfile.mkdtemp()
data = Project.download_bin(
self._api, self.project_info.id, batch_size=200, return_bytesio=True
)
data.seek(0)
info = tarfile.TarInfo(name="version.bin")
info.size = len(data.getvalue())
chunk_size = 1024 * 1024 * 50 # 50 MiB
tar_data = io.BytesIO()
# Create a tarfile object that writes into the BytesIO object
with tarfile.open(fileobj=tar_data, mode="w") as tar:
tar.addfile(tarinfo=info, fileobj=data)
data.close()
# Reset the BytesIO object's cursor to the beginning
tar_data.seek(0)
zst_archive_path = os.path.join(temp_dir, "download.tar.zst")
with open(zst_archive_path, "wb") as zst:
while True:
chunk = tar_data.read(chunk_size)
if not chunk:
break
zst.write(zstd.compress(chunk))
file_info = self._api.file.upload(self.project_info.team_id, zst_archive_path, path)
tar_data.close()
remove_dir(temp_dir)
return file_info