# coding: utf-8
"""api connection to the server which allows user to communicate with Supervisely"""
from __future__ import annotations
import asyncio
import datetime
import gc
import glob
import json
import os
import shutil
from logging import Logger
from pathlib import Path
from typing import (
Any,
AsyncGenerator,
AsyncIterable,
Dict,
Generator,
Iterable,
Literal,
Mapping,
Optional,
Union,
)
from urllib.parse import urljoin, urlparse
import httpx
import jwt
import requests
from dotenv import get_key, load_dotenv, set_key
from pkg_resources import parse_version
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
import supervisely.api.advanced_api as advanced_api
import supervisely.api.agent_api as agent_api
import supervisely.api.annotation_api as annotation_api
import supervisely.api.app_api as app_api
import supervisely.api.dataset_api as dataset_api
import supervisely.api.entities_collection_api as entities_collection_api
import supervisely.api.file_api as file_api
import supervisely.api.github_api as github_api
import supervisely.api.image_annotation_tool_api as image_annotation_tool_api
import supervisely.api.image_api as image_api
import supervisely.api.import_storage_api as import_stoarge_api
import supervisely.api.issues_api as issues_api
import supervisely.api.labeling_job_api as labeling_job_api
import supervisely.api.labeling_queue_api as labeling_queue_api
import supervisely.api.nn.neural_network_api as neural_network_api
import supervisely.api.object_class_api as object_class_api
import supervisely.api.plugin_api as plugin_api
import supervisely.api.pointcloud.pointcloud_api as pointcloud_api
import supervisely.api.pointcloud.pointcloud_episode_api as pointcloud_episode_api
import supervisely.api.project_api as project_api
import supervisely.api.remote_storage_api as remote_storage_api
import supervisely.api.report_api as report_api
import supervisely.api.role_api as role_api
import supervisely.api.storage_api as storage_api
import supervisely.api.task_api as task_api
import supervisely.api.team_api as team_api
import supervisely.api.user_api as user_api
import supervisely.api.video.video_api as video_api
import supervisely.api.video_annotation_tool_api as video_annotation_tool_api
import supervisely.api.volume.volume_api as volume_api
import supervisely.api.workspace_api as workspace_api
import supervisely.io.env as sly_env
from supervisely._utils import camel_to_snake, is_community, is_development
from supervisely.api.module_api import ApiField
from supervisely.io.network_exceptions import (
RetryableRequestException,
process_requests_exception,
process_requests_exception_async,
process_unhandled_request,
)
from supervisely.project.project_meta import ProjectMeta
from supervisely.sly_logger import logger
SUPERVISELY_TASK_ID = "SUPERVISELY_TASK_ID"
SUPERVISELY_PUBLIC_API_RETRIES = "SUPERVISELY_PUBLIC_API_RETRIES"
SUPERVISELY_PUBLIC_API_RETRY_SLEEP_SEC = "SUPERVISELY_PUBLIC_API_RETRY_SLEEP_SEC"
SERVER_ADDRESS = "SERVER_ADDRESS"
SUPERVISELY_API_SERVER_ADDRESS = "SUPERVISELY_API_SERVER_ADDRESS"
API_TOKEN = "API_TOKEN"
TASK_ID = "TASK_ID"
SUPERVISELY_ENV_FILE = os.path.join(Path.home(), "supervisely.env")
class ApiContext:
"""
Context manager for the API object for optimization purposes.
Use this context manager when you need to perform a series of operations on the same project or dataset.
It allows you to avoid redundant API calls to get the same project or dataset info multiple times.
:param api: API object.
:type api: :class:`Api`
:param project_id: Project ID.
:type project_id: int, optional
:param dataset_id: Dataset ID.
:type dataset_id: int, optional
:param project_meta: ProjectMeta object.
:type project_meta: :class:`ProjectMeta`, optional
:raises: :class:`RuntimeError`, if api is None.
:Usage example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
with ApiContext(
api,
project_id=33333,
dataset_id=99999,
project_meta=project_meta,
with_alpha_masks=True,
):
api.annotation.upload_paths(image_ids, ann_paths, anns_progress)
# another code here
"""
def __init__(
self,
api: Api,
project_id: Optional[int] = None,
dataset_id: Optional[int] = None,
project_meta: Optional[ProjectMeta] = None,
with_alpha_masks: Optional[bool] = True,
):
if api is None:
raise RuntimeError("Api object is None")
self.api = api
self.project_id = project_id
self.dataset_id = dataset_id
self.project_meta = project_meta
self.with_alpha_masks = with_alpha_masks
def __enter__(self):
self.api.optimization_context = {
"project_id": self.project_id,
"dataset_id": self.dataset_id,
"project_meta": self.project_meta,
"with_alpha_masks": self.with_alpha_masks,
}
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.api.optimization_context = {}
class UserSession:
"""
UserSession object contains info that is returned after user authentication.
:param server: Server url.
:type server: str
:raises: :class:`RuntimeError`, if server url is invalid.
"""
def __init__(self, server_address: str):
self.api_token = None
self.team_id = None
self.workspace_id = None
self.server_address = server_address
if not self._normalize_and_validate_server_url():
raise RuntimeError(f"Invalid server url: {server_address}")
def __str__(self):
return f"UserSession(server={self.server_address})"
def __repr__(self):
return self.__str__()
def _normalize_and_validate_server_url(self) -> bool:
"""
Validate server url.
:return: True if server url is valid, False otherwise.
"""
self.server_address = Api.normalize_server_address(self.server_address)
if not self.server_address.startswith("https://"):
response = requests.get(self.server_address, allow_redirects=False)
if (300 <= response.status_code < 400) or (
response.headers.get("Location", "").startswith("https://")
):
self.server_address = self.server_address.replace("http://", "https://")
result = urlparse(self.server_address)
if all([result.scheme, result.netloc]):
try:
response = requests.get(self.server_address)
if response.status_code == 200:
return True
except requests.RequestException:
pass
return False
def _setattrs_user_session(self, decoded_token):
"""
Add decoded info to UserSession object.
:param decoded_token: Decoded token.
:type decoded_token: dict
:return: None
:rtype: :class:`NoneType`
"""
for key, value in decoded_token.items():
if key == "group":
self.team = value
self.team_id = value["id"]
elif key == "workspace":
self.workspace = value
self.workspace_id = value["id"]
else:
key = camel_to_snake(key)
setattr(self, key, value)
def log_in(self, login: str, password: str) -> UserSession:
"""
Authenticate user and return UserSession object with decoded info from JWT token.
:param login: User login.
:type login: str
:param password: User password.
:type password: str
:return: UserSession object
:rtype: :class:`UserSession`
"""
login_url = urljoin(self.server_address, "api/account")
payload = {"login": login, "password": password}
response = requests.post(login_url, data=payload)
del password
gc.collect()
if response.status_code == 200:
data = response.json()
jwt_token = data.get("token", None)
decoded_token = jwt.decode(jwt_token, options={"verify_signature": False})
self._setattrs_user_session(decoded_token)
return self
else:
raise RuntimeError(f"Failed to authenticate user: status code {response.status_code}")
[docs]class Api:
"""
An API connection to the server with which you can communicate with your teams, workspaces and projects. :class:`Api<Api>` object is immutable.
:param server_address: Address of the server.
:type server_address: str
:param token: Unique secret token associated with your agent.
:type token: str
:param retry_count: The number of attempts to connect to the server.
:type retry_count: int, optional
:param retry_sleep_sec: The number of seconds to delay between attempts to connect to the server.
:type retry_sleep_sec: int, optional
:param external_logger: Logger class object.
:type external_logger: logger, optional
:param ignore_task_id:
:type ignore_task_id: bool, optional
:param api_server_address: Address of the API server.
:type api_server_address: str, optional
:param check_instance_version: Check if the given version is lower or equal to the current
Supervisely instance version. If set to True, will try to read the version from the environment variable
"MINIMUM_INSTANCE_VERSION_FOR_SDK". If set to a string, will use this string as the version to check.
If set to False, will skip the check.
:type check_instance_version: bool or str, optional
:raises: :class:`ValueError`, if token is None or it length != 128
:Usage example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
# Pass values into the API constructor (optional, not recommended)
# api = sly.Api(server_address="https://app.supervisely.com", token="4r47N...xaTatb")
"""
_checked_servers = set()
def __init__(
self,
server_address: Optional[str] = None,
token: Optional[str] = None,
retry_count: Optional[int] = 10,
retry_sleep_sec: Optional[int] = None,
external_logger: Optional[Logger] = None,
ignore_task_id: bool = False,
api_server_address: Optional[str] = None,
check_instance_version: Union[bool, str] = False,
):
self.logger = external_logger or logger
if server_address is None:
server_address = os.environ.get(SERVER_ADDRESS, None)
if token is None:
token = os.environ.get(API_TOKEN, None)
if server_address is None:
raise ValueError(
"SERVER_ADDRESS env variable is undefined, https://developer.supervisely.com/getting-started/basics-of-authentication"
)
self.server_address = Api.normalize_server_address(server_address)
self._api_server_address = None
if api_server_address is None:
api_server_address = os.environ.get(SUPERVISELY_API_SERVER_ADDRESS, None)
if api_server_address is not None:
self._api_server_address = Api.normalize_server_address(api_server_address)
if retry_count is None:
retry_count = int(os.getenv(SUPERVISELY_PUBLIC_API_RETRIES, "10"))
if retry_sleep_sec is None:
retry_sleep_sec = int(os.getenv(SUPERVISELY_PUBLIC_API_RETRY_SLEEP_SEC, "1"))
self.token = token
self.headers = {}
if token is not None:
self.headers["x-api-key"] = token
self.task_id = os.getenv(SUPERVISELY_TASK_ID)
if self.task_id is not None and ignore_task_id is False:
self.headers["x-task-id"] = self.task_id
self.context = {}
self.additional_fields = {}
self.optimization_context = {}
self.team = team_api.TeamApi(self)
self.workspace = workspace_api.WorkspaceApi(self)
self.project = project_api.ProjectApi(self)
self.nn = neural_network_api.NeuralNetworkApi(self)
self.task = task_api.TaskApi(self)
self.dataset = dataset_api.DatasetApi(self)
self.image = image_api.ImageApi(self)
self.annotation = annotation_api.AnnotationApi(self)
self.plugin = plugin_api.PluginApi(self)
self.agent = agent_api.AgentApi(self)
self.role = role_api.RoleApi(self)
self.user = user_api.UserApi(self)
self.labeling_job = labeling_job_api.LabelingJobApi(self)
self.labeling_queue = labeling_queue_api.LabelingQueueApi(self)
self.video = video_api.VideoApi(self)
# self.project_class = project_class_api.ProjectClassApi(self)
self.object_class = object_class_api.ObjectClassApi(self)
self.report = report_api.ReportApi(self)
self.pointcloud = pointcloud_api.PointcloudApi(self)
self.pointcloud_episode = pointcloud_episode_api.PointcloudEpisodeApi(self)
self.app = app_api.AppApi(self)
self.file = file_api.FileApi(self)
self.storage = storage_api.StorageApi(self)
self.img_ann_tool = image_annotation_tool_api.ImageAnnotationToolApi(self)
self.vid_ann_tool = video_annotation_tool_api.VideoAnnotationToolApi(self)
self.advanced = advanced_api.AdvancedApi(self)
self.import_storage = import_stoarge_api.ImportStorageApi(self)
self.remote_storage = remote_storage_api.RemoteStorageApi(self)
self.github = github_api.GithubApi(self)
self.volume = volume_api.VolumeApi(self)
self.issues = issues_api.IssuesApi(self)
self.entities_collection = entities_collection_api.EntitiesCollectionApi(self)
self.retry_count = retry_count
self.retry_sleep_sec = retry_sleep_sec
skip_from_env = sly_env.supervisely_skip_https_user_helper_check()
self._skip_https_redirect_check = (
skip_from_env or self.server_address in Api._checked_servers
)
self.logger.trace(
f"Skip HTTPS redirect check on API init: {self._skip_https_redirect_check}. ENV: {skip_from_env}. Checked servers: {Api._checked_servers}"
)
self._require_https_redirect_check = (
False
if self._skip_https_redirect_check
else not self.server_address.startswith("https://")
)
if check_instance_version:
self._check_version(None if check_instance_version is True else check_instance_version)
self.async_httpx_client: httpx.AsyncClient = None
self.httpx_client: httpx.Client = None
self._semaphore = None
self._instance_version = None
@classmethod
def normalize_server_address(cls, server_address: str) -> str:
""" """
result = server_address.strip("/")
if ("http://" not in result) and ("https://" not in result):
result = "http://" + result
return result
[docs] @classmethod
def from_env(
cls,
retry_count: int = 10,
ignore_task_id: bool = False,
env_file: str = SUPERVISELY_ENV_FILE,
check_instance_version: Union[bool, str] = False,
) -> Api:
"""
Initialize API use environment variables.
:param retry_count: The number of attempts to connect to the server.
:type retry_count: int
:param ignore_task_id:
:type ignore_task_id: bool
:param env_file: Path to your .env file.
:type env_file: str
:param check_instance_version: Check if the given version is lower or equal to the current
version of the Supervisely instance.
:type check_instance_version: bool or str, optional
:return: Api object
:rtype: :class:`Api<supervisely.api.api.Api>`
:Usage example:
.. code-block:: python
import supervisely as sly
os.environ['SERVER_ADDRESS'] = 'https://app.supervisely.com'
os.environ['API_TOKEN'] = 'Your Supervisely API Token'
api = sly.Api.from_env()
# alternatively you can store SERVER_ADDRESS and API_TOKEN
# in "~/supervisely.env" .env file
# Learn more here: https://developer.supervisely.com/app-development/basics/add-private-app#create-.env-file-supervisely.env-with-the-following-content-learn-more-here
api = sly.Api.from_env()
"""
server_address = sly_env.server_address(raise_not_found=False)
token = sly_env.api_token(raise_not_found=False)
if is_development() and None in (server_address, token):
env_path = os.path.expanduser(env_file)
if os.path.exists(env_path):
_, extension = os.path.splitext(env_path)
if extension == ".env":
load_dotenv(env_path)
server_address = sly_env.server_address()
token = sly_env.api_token()
else:
raise ValueError(f"'{env_path}' is not an '*.env' file")
else:
raise FileNotFoundError(f"File not found: '{env_path}'")
if server_address is None:
raise ValueError(
"SERVER_ADDRESS env variable is undefined. Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication"
)
if token is None:
raise ValueError(
"API_TOKEN env variable is undefined. Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication"
)
return cls(
server_address,
token,
retry_count=retry_count,
ignore_task_id=ignore_task_id,
check_instance_version=check_instance_version,
)
[docs] def add_additional_field(self, key: str, value: str) -> None:
"""
Add given key and value to additional_fields dictionary.
:param key: New key.
:type key: str
:param value: New value.
:type value: str
:return: None
:rtype: :class:`NoneType`
"""
self.additional_fields[key] = value
@property
def instance_version(self) -> str:
"""Return Supervisely instance version, e.g. "6.9.13".
If the version cannot be determined, return "unknown".
:return: Supervisely instance version or "unknown" if the version cannot be determined.
:rtype: str
:Usage example:
.. code-block:: python
import supervisely as sly
api = sly.Api(server_address='https://app.supervisely.com', token='4r47N...xaTatb')
print(api.instance_version)
# Output:
# '6.9.13'
"""
try:
if self._instance_version is None:
self._instance_version = (
self.post("instance.version", {}).json().get(ApiField.VERSION)
)
except Exception as e:
logger.warning(f"Failed to get instance version from server: {e}")
self._instance_version = "unknown"
return self._instance_version
[docs] def is_version_supported(self, version: Optional[str] = None) -> Union[bool, None]:
"""Check if the given version is lower or equal to the current Supervisely instance version.
If the version omitted, will try to read it from the environment variable "MINIMUM_INSTANCE_VERSION_FOR_SDK".
If the version is lower or equal, return True, otherwise False.
If the version of the instance cannot be determined, return False.
:param version: Version to check.
:type version: Optional[str], e.g. "6.9.13"
:return: True if the given version is lower or equal to the current Supervisely
instance version, otherwise False.
:rtype: bool
:Usage example:
.. code-block:: python
import supervisely as sly
api = sly.Api(server_address='https://app.supervisely.com', token='4r47N...xaTatb')
version_to_check = "6.9.13"
print(api.is_version_supported(version_to_check))
# Output:
# True
"""
instance_version = self.instance_version
if instance_version == "unknown":
return
if not version:
version = sly_env.mininum_instance_version_for_sdk()
if not version:
logger.debug(
"Cant find MINIMUM_INSTANCE_VERSION_FOR_SDK in environment variables, "
"check of the minimum version is skipped."
)
return
try:
version = str(version)
except Exception:
logger.warning(
f"Provided version {version!r} is not a valid version string "
f"(expected format: 'x.y.z'). The output of this function will be incorrect."
)
return
return parse_version(instance_version) >= parse_version(version)
def _check_version(self, version: Optional[str] = None) -> None:
"""Check if the given version is compatible with the current Supervisely instance version.
Compatible means that the given version is lower or equal to the current Supervisely instance version.
If check was not successful, log a debug message, if the version is not supported, log a warning message.
:param version: Version to check.
:type version: Optional[str], e.g. "6.9.13"
"""
# Since it's a informational message, we don't raise an exception if the check fails
# in any case, we don't want to interrupt the user's workflow.
try:
check_result = self.is_version_supported(version)
if check_result is None:
logger.debug(
"Failed to check if the instance version meets the minimum requirements "
"of current SDK version. "
"Ensure that the MINIMUM_INSTANCE_VERSION_FOR_SDK environment variable is set. "
"Usually you can ignore this message, but if you're adding new features, "
"which will require upgrade of the Supervisely instance, you should update "
"it supervisely.__init__.py file."
)
if check_result is False:
message = (
"The current version of the Supervisely instance is not supported by the SDK. "
"Some features may not work correctly."
)
if not is_community():
message += (
" Please upgrade the Supervisely instance to the latest version (recommended) "
"or downgrade the SDK to the version that supports the current instance (not recommended). "
"Refer to this docs for more information: "
"https://docs.supervisely.com/enterprise-edition/get-supervisely/upgrade "
"Check out changelog for the latest version of Supervisely: "
"https://app.supervisely.com/changelog"
)
logger.warning(message)
except Exception as e:
logger.debug(
f"Tried to check version compatibility between SDK and instance, but failed: {e}"
)
[docs] def post(
self,
method: str,
data: Dict,
retries: Optional[int] = None,
stream: Optional[bool] = False,
raise_error: Optional[bool] = False,
) -> requests.Response:
"""
Performs POST request to server with given parameters.
:param method: Method name.
:type method: str
:param data: Dictionary to send in the body of the :class:`Request`.
:type data: dict
:param retries: The number of attempts to connect to the server.
:type retries: int, optional
:param stream: Define, if you'd like to get the raw socket response from the server.
:type stream: bool, optional
:param raise_error: Define, if you'd like to raise error if connection is failed. Retries will be ignored.
:type raise_error: bool, optional
:return: Response object
:rtype: :class:`Response<Response>`
"""
if not self._skip_https_redirect_check:
self._check_https_redirect()
if retries is None:
retries = self.retry_count
url = self.api_server_address + "/v3/" + method
logger.trace(f"POST {url}")
for retry_idx in range(retries):
response = None
try:
if type(data) is bytes:
response = requests.post(url, data=data, headers=self.headers, stream=stream)
elif type(data) is MultipartEncoderMonitor or type(data) is MultipartEncoder:
response = requests.post(
url,
data=data,
headers={**self.headers, "Content-Type": data.content_type},
stream=stream,
)
else:
json_body = data
if type(data) is dict:
json_body = {**data, **self.additional_fields}
response = requests.post(
url, json=json_body, headers=self.headers, stream=stream
)
if response.status_code != requests.codes.ok: # pylint: disable=no-member
self._check_version()
Api._raise_for_status(response)
return response
except requests.RequestException as exc:
if (
isinstance(exc, requests.exceptions.HTTPError)
and response.status_code == 400
and self.token is None
):
self.logger.warning(
"API_TOKEN env variable is undefined. See more: https://developer.supervisely.com/getting-started/basics-of-authentication"
)
if raise_error:
raise exc
else:
process_requests_exception(
self.logger,
exc,
method,
url,
verbose=True,
swallow_exc=True,
sleep_sec=min(self.retry_sleep_sec * (2**retry_idx), 60),
response=response,
retry_info={"retry_idx": retry_idx + 1, "retry_limit": retries},
)
except Exception as exc:
process_unhandled_request(self.logger, exc)
raise requests.exceptions.RetryError("Retry limit exceeded ({!r})".format(url))
[docs] def get(
self,
method: str,
params: Dict,
retries: Optional[int] = None,
stream: Optional[bool] = False,
use_public_api: Optional[bool] = True,
) -> requests.Response:
"""
Performs GET request to server with given parameters.
:param method:
:type method: str
:param params: Dictionary to send in the body of the :class:`Request`.
:type method: dict
:param retries: The number of attempts to connect to the server.
:type method: int, optional
:param stream: Define, if you'd like to get the raw socket response from the server.
:type method: bool, optional
:param use_public_api:
:type method: bool, optional
:return: Response object
:rtype: :class:`Response<Response>`
"""
if not self._skip_https_redirect_check:
self._check_https_redirect()
if retries is None:
retries = self.retry_count
url = self.api_server_address + "/v3/" + method
if use_public_api is False:
url = os.path.join(self.server_address, method)
logger.trace(f"GET {url}")
for retry_idx in range(retries):
response = None
try:
json_body = params
if type(params) is dict:
json_body = {**params, **self.additional_fields}
response = requests.get(url, params=json_body, headers=self.headers, stream=stream)
if response.status_code != requests.codes.ok: # pylint: disable=no-member
Api._raise_for_status(response)
return response
except requests.RequestException as exc:
if (
isinstance(exc, requests.exceptions.HTTPError)
and response.status_code == 400
and self.token is None
):
self.logger.warning(
"API_TOKEN env variable is undefined. See more: https://developer.supervisely.com/getting-started/basics-of-authentication"
)
process_requests_exception(
self.logger,
exc,
method,
url,
verbose=True,
swallow_exc=True,
sleep_sec=min(self.retry_sleep_sec * (2**retry_idx), 60),
response=response,
retry_info={"retry_idx": retry_idx + 2, "retry_limit": retries},
)
except Exception as exc:
process_unhandled_request(self.logger, exc)
@staticmethod
def _raise_for_status(response: requests.Response):
"""
Raise error and show message with error code if given response can not connect to server.
:param response: Request class object
"""
http_error_msg = ""
if isinstance(response.reason, bytes):
try:
reason = response.reason.decode("utf-8")
except UnicodeDecodeError:
reason = response.reason.decode("iso-8859-1")
else:
reason = response.reason
if 400 <= response.status_code < 500:
http_error_msg = "%s Client Error: %s for url: %s (%s)" % (
response.status_code,
reason,
response.url,
response.content.decode("utf-8"),
)
elif 500 <= response.status_code < 600:
http_error_msg = "%s Server Error: %s for url: %s (%s)" % (
response.status_code,
reason,
response.url,
response.content.decode("utf-8"),
)
if http_error_msg:
raise requests.exceptions.HTTPError(http_error_msg, response=response)
@staticmethod
def _raise_for_status_httpx(response: httpx.Response):
"""
Raise error and show message with error code if given response can not connect to server.
:param response: Response class object
"""
http_error_msg = ""
if hasattr(response, "reason_phrase"):
reason = response.reason_phrase
else:
reason = "Can't get reason"
def decode_response_content(response: httpx.Response):
try:
return response.content.decode("utf-8")
except Exception as e:
if hasattr(response, "is_stream_consumed"):
return f"Stream is consumed. {e}"
else:
return f"Can't decode response content: {e}"
if 400 <= response.status_code < 500:
http_error_msg = "%s Client Error: %s for url: %s (%s)" % (
response.status_code,
reason,
response.url,
decode_response_content(response),
)
elif 500 <= response.status_code < 600:
http_error_msg = "%s Server Error: %s for url: %s (%s)" % (
response.status_code,
reason,
response.url,
decode_response_content(response),
)
if http_error_msg:
raise httpx.HTTPStatusError(
message=http_error_msg, response=response, request=response.request
)
[docs] @staticmethod
def parse_error(
response: requests.Response,
default_error: Optional[str] = "Error",
default_message: Optional[str] = "please, contact administrator",
):
"""
Processes error from response.
:param response: Request object.
:type method: Request
:param default_error: Error description.
:type method: str, optional
:param default_message: Message to user.
:type method: str, optional
:return: Number of error and message about curren connection mistake
:rtype: :class:`int`, :class:`str`
"""
ERROR_FIELD = "error"
MESSAGE_FIELD = "message"
DETAILS_FIELD = "details"
try:
data_str = response.content.decode("utf-8")
data = json.loads(data_str)
error = data.get(ERROR_FIELD, default_error)
details = data.get(DETAILS_FIELD, {})
if type(details) is dict:
message = details.get(MESSAGE_FIELD, default_message)
else:
message = details[0].get(MESSAGE_FIELD, default_message)
return error, message
except Exception:
return "", ""
def pop_header(self, key: str) -> str:
""" """
if key not in self.headers:
raise KeyError(f"Header {key!r} not found")
return self.headers.pop(key)
def _check_https_redirect(self):
"""
Check if HTTP server should be redirected to HTTPS.
If the server has already been checked before (for any instance of this class),
skip the check to avoid redundant network requests.
"""
if self._require_https_redirect_check is True:
if self.server_address in Api._checked_servers:
self._require_https_redirect_check = False
return
try:
response = requests.get(
self.server_address.replace("http://", "https://"),
allow_redirects=False,
timeout=(5, 15),
)
response.raise_for_status()
self.server_address = self.server_address.replace("http://", "https://")
msg = (
"You're using HTTP server address while the server requires HTTPS. "
"Supervisely automatically changed the server address to HTTPS for you. "
f"Consider updating your server address to {self.server_address}"
)
self.logger.warning(msg)
except:
pass
finally:
Api._checked_servers.add(self.server_address)
self._require_https_redirect_check = False
[docs] @classmethod
def from_credentials(
cls,
server_address: str,
login: str,
password: str,
override: bool = False,
env_file: str = SUPERVISELY_ENV_FILE,
check_instance_version: Union[bool, str] = False,
) -> Api:
"""
Create Api object using credentials and optionally save them to ".env" file with overriding environment variables.
If ".env" file already exists, backup will be created automatically.
All backups will be stored in the same directory with postfix "_YYYYMMDDHHMMSS". You can have not more than 5 last backups.
This method can be used also to update ".env" file.
:param server_address: Supervisely server url.
:type server_address: str
:param login: User login.
:type login: str
:param password: User password.
:type password: str
:param override: If False, return Api object. If True, additionally create ".env" file or overwrite existing (backup file will be created automatically), and override environment variables.
:type override: bool, optional
:param env_file: Path to your .env file.
:type env_file: str, optional
:param check_instance_version: Check if the given version is lower or equal to the current
version of the Supervisely instance.
:type check_instance_version: bool or str, optional
:return: Api object
:Usage example:
.. code-block:: python
import supervisely as sly
server_address = 'https://app.supervisely.com'
login = 'user'
password = 'pass'
api = sly.Api.from_credentials(server_address, login, password)
"""
session = UserSession(server_address).log_in(login, password)
del password
gc.collect()
api = cls(
session.server_address,
session.api_token,
ignore_task_id=True,
check_instance_version=check_instance_version,
)
if override:
if os.path.isfile(env_file):
# create backup
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
backup_file = f"{env_file}_{timestamp}"
shutil.copy2(env_file, backup_file)
if api.token != get_key(env_file, API_TOKEN):
# create new file
os.remove(env_file)
Path(env_file).touch()
# remove old backups
all_backups = sorted(glob.glob(f"{env_file}_" + "[0-9]" * 14))
while len(all_backups) > 5:
os.remove(all_backups.pop(0))
set_key(env_file, SERVER_ADDRESS, session.server_address)
set_key(env_file, API_TOKEN, session.api_token)
if session.team_id:
set_key(env_file, "INIT_GROUP_ID", f"{session.team_id}")
if session.workspace_id:
set_key(env_file, "INIT_WORKSPACE_ID", f"{session.workspace_id}")
load_dotenv(env_file, override=override)
return api
@property
def api_server_address(self) -> str:
"""
Get API server address.
:return: API server address.
:rtype: :class:`str`
:Usage example:
.. code-block:: python
import supervisely as sly
api = sly.Api(server_address='https://app.supervisely.com', token='4r47N...xaTatb')
print(api.api_server_address)
# Output:
# 'https://app.supervisely.com/public/api'
"""
if self._api_server_address is not None:
return self._api_server_address
return f"{self.server_address}/public/api"
[docs] def post_httpx(
self,
method: str,
json: Dict = None,
content: Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]] = None,
files: Union[Mapping] = None,
params: Union[str, bytes] = None,
headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
raise_error: Optional[bool] = False,
timeout: httpx._types.TimeoutTypes = 60,
) -> httpx.Response:
"""
Performs POST request to server with given parameters using httpx.
:param method: Method name.
:type method: str
:param json: Dictionary to send in the body of request.
:type json: dict, optional
:param content: Bytes with data content or dictionary with params.
:type content: bytes or dict, optional
:param files: Files to send in the body of request.
:type files: dict, optional
:param params: URL query parameters.
:type params: str, bytes, optional
:param headers: Custom headers to include in the request.
:type headers: dict, optional
:param retries: The number of attempts to connect to the server.
:type retries: int, optional
:param raise_error: Define, if you'd like to raise error if connection is failed.
:type raise_error: bool, optional
:param timeout: Overall timeout for the request.
:type timeout: float, optional
:return: Response object
:rtype: :class:`httpx.Response`
"""
self._set_client()
if retries is None:
retries = self.retry_count
url = self.api_server_address + "/v3/" + method
logger.trace(f"POST {url}")
if headers is None:
headers = self.headers.copy()
else:
headers = {**self.headers, **headers}
for retry_idx in range(retries):
response = None
try:
response = self.httpx_client.post(
url,
content=content,
files=files,
json=json,
params=params,
headers=headers,
timeout=timeout,
)
if response.status_code != httpx.codes.OK:
self._check_version()
Api._raise_for_status_httpx(response)
return response
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
if (
isinstance(exc, httpx.HTTPStatusError)
and response.status_code == 400
and self.token is None
):
self.logger.warning(
"API_TOKEN env variable is undefined. See more: https://developer.supervisely.com/getting-started/basics-of-authentication"
)
if raise_error:
raise exc
else:
process_requests_exception(
self.logger,
exc,
method,
url,
verbose=True,
swallow_exc=True,
sleep_sec=min(self.retry_sleep_sec * (2**retry_idx), 60),
response=response,
retry_info={"retry_idx": retry_idx + 1, "retry_limit": retries},
)
except Exception as exc:
process_unhandled_request(self.logger, exc)
raise httpx.RequestError(
f"Retry limit exceeded ({url})",
request=getattr(response, "request", None),
)
[docs] def get_httpx(
self,
method: str,
params: httpx._types.QueryParamTypes,
retries: Optional[int] = None,
use_public_api: Optional[bool] = True,
timeout: httpx._types.TimeoutTypes = 60,
) -> httpx.Response:
"""
Performs GET request to server with given parameters.
:param method: Method name.
:type method: str
:param params: URL query parameters.
:type params: httpx._types.QueryParamTypes
:param retries: The number of attempts to connect to the server.
:type retries: int, optional
:param use_public_api: Define if public API should be used. Default is True.
:type use_public_api: bool, optional
:param timeout: Overall timeout for the request.
:type timeout: float, optional
:return: Response object
:rtype: :class:`Response<Response>`
"""
self._set_client()
if retries is None:
retries = self.retry_count
url = self.api_server_address + "/v3/" + method
if use_public_api is False:
url = os.path.join(self.server_address, method)
logger.trace(f"GET {url}")
if isinstance(params, Dict):
request_params = {**params, **self.additional_fields}
else:
request_params = params
for retry_idx in range(retries):
response = None
try:
response = self.httpx_client.get(
url,
params=request_params,
headers=self.headers,
timeout=timeout,
)
if response.status_code != httpx.codes.OK:
Api._raise_for_status_httpx(response)
return response
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
if (
isinstance(exc, httpx.HTTPStatusError)
and response.status_code == 400
and self.token is None
):
self.logger.warning(
"API_TOKEN env variable is undefined. See more: https://developer.supervisely.com/getting-started/basics-of-authentication"
)
process_requests_exception(
self.logger,
exc,
method,
url,
verbose=True,
swallow_exc=True,
sleep_sec=min(self.retry_sleep_sec * (2**retry_idx), 60),
response=response,
retry_info={"retry_idx": retry_idx + 2, "retry_limit": retries},
)
except Exception as exc:
process_unhandled_request(self.logger, exc)
[docs] def stream(
self,
method: str,
method_type: Literal["GET", "POST"],
data: Union[bytes, Dict],
headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
range_start: Optional[int] = None,
range_end: Optional[int] = None,
raise_error: Optional[bool] = False,
chunk_size: int = 8192,
use_public_api: Optional[bool] = True,
timeout: httpx._types.TimeoutTypes = 60,
) -> Generator:
"""
Performs streaming GET or POST request to server with given parameters.
Multipart is not supported.
:param method: Method name for the request.
:type method: str
:param method_type: Request type ('GET' or 'POST').
:type method_type: str
:param data: Bytes with data content or dictionary with params.
:type data: bytes or dict
:param headers: Custom headers to include in the request.
:type headers: dict, optional
:param retries: The number of retry attempts.
:type retries: int, optional
:param range_start: Start byte position for streaming.
:type range_start: int, optional
:param range_end: End byte position for streaming.
:type range_end: int, optional
:param raise_error: If True, raise raw error if the request fails.
:type raise_error: bool, optional
:param chunk_size: Size of the chunks to stream.
:type chunk_size: int, optional
:param use_public_api: Define if public API should be used.
:type use_public_api: bool, optional
:param timeout: Overall timeout for the request.
:type timeout: float, optional
:return: Generator object.
:rtype: :class:`Generator`
"""
self._set_client()
if retries is None:
retries = self.retry_count
url = self.api_server_address + "/v3/" + method
if not use_public_api:
url = os.path.join(self.server_address, method)
if headers is None:
headers = self.headers.copy()
else:
headers = {**self.headers, **headers}
logger.trace(f"{method_type} {url}")
if isinstance(data, (bytes, Generator)):
content = data
json_body = None
params = None
elif isinstance(data, Dict):
json_body = {**data, **self.additional_fields}
content = None
params = None
else:
params = data
content = None
json_body = None
if range_start is not None or range_end is not None:
headers["Range"] = f"bytes={range_start or ''}-{range_end or ''}"
logger.debug(f"Setting Range header: {headers['Range']}")
for retry_idx in range(retries):
total_streamed = 0
try:
if method_type == "POST":
response = self.httpx_client.stream(
method_type,
url,
content=content,
json=json_body,
params=params,
headers=headers,
timeout=timeout,
)
elif method_type == "GET":
response = self.httpx_client.stream(
method_type,
url,
params=json_body or params,
headers=headers,
timeout=timeout,
)
else:
raise NotImplementedError(
f"Unsupported method type: {method_type}. Supported types: 'GET', 'POST'"
)
with response as resp:
expected_size = int(resp.headers.get("content-length", 0))
if resp.status_code not in [
httpx.codes.OK,
httpx.codes.PARTIAL_CONTENT,
]:
self._check_version()
Api._raise_for_status_httpx(resp)
hhash = resp.headers.get("x-content-checksum-sha256", None)
try:
for chunk in resp.iter_raw(chunk_size):
yield chunk, hhash
total_streamed += len(chunk)
except Exception as e:
raise RetryableRequestException(repr(e))
if expected_size != 0 and total_streamed != expected_size:
raise ValueError(
f"Streamed size does not match the expected: {total_streamed} != {expected_size}"
)
logger.trace(f"Streamed size: {total_streamed}, expected size: {expected_size}")
return
except (httpx.RequestError, httpx.HTTPStatusError, RetryableRequestException) as e:
if (
isinstance(e, httpx.HTTPStatusError)
and resp.status_code == 400
and self.token is None
):
self.logger.warning(
"API_TOKEN env variable is undefined. See more: https://developer.supervisely.com/getting-started/basics-of-authentication"
)
retry_range_start = total_streamed + (range_start or 0)
if total_streamed != 0:
retry_range_start += 1
headers["Range"] = f"bytes={retry_range_start}-{range_end or ''}"
logger.debug(f"Setting Range header {headers['Range']} for retry")
if raise_error:
raise e
else:
process_requests_exception(
self.logger,
e,
method,
url,
verbose=True,
swallow_exc=True,
sleep_sec=min(self.retry_sleep_sec * (2**retry_idx), 60),
response=locals().get("resp"),
retry_info={"retry_idx": retry_idx + 1, "retry_limit": retries},
)
except Exception as e:
process_unhandled_request(self.logger, e)
raise httpx.RequestError(
message=f"Retry limit exceeded ({url})",
request=resp.request if locals().get("resp") else None,
)
[docs] async def post_async(
self,
method: str,
json: Dict = None,
content: Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]] = None,
files: Union[Mapping] = None,
params: Union[str, bytes] = None,
headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
raise_error: Optional[bool] = False,
timeout: httpx._types.TimeoutTypes = 60,
) -> httpx.Response:
"""
Performs POST request to server with given parameters using httpx.
:param method: Method name.
:type method: str
:param json: Dictionary to send in the body of request.
:type json: dict, optional
:param content: Bytes with data content or dictionary with params.
:type content: bytes or dict, optional
:param files: Files to send in the body of request.
:type files: dict, optional
:param params: URL query parameters.
:type params: str, bytes, optional
:param headers: Custom headers to include in the request.
:type headers: dict, optional
:param retries: The number of attempts to connect to the server.
:type retries: int, optional
:param raise_error: Define, if you'd like to raise error if connection is failed.
:type raise_error: bool, optional
:param timeout: Overall timeout for the request.
:type timeout: float, optional
:return: Response object
:rtype: :class:`httpx.Response`
"""
self._set_async_client()
if retries is None:
retries = self.retry_count
url = self.api_server_address + "/v3/" + method
logger.trace(f"POST {url}")
if headers is None:
headers = self.headers.copy()
else:
headers = {**self.headers, **headers}
for retry_idx in range(retries):
response = None
try:
response = await self.async_httpx_client.post(
url,
content=content,
files=files,
json=json,
params=params,
headers=headers,
timeout=timeout,
)
if response.status_code != httpx.codes.OK:
self._check_version()
Api._raise_for_status_httpx(response)
return response
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
if (
isinstance(exc, httpx.HTTPStatusError)
and response.status_code == 400
and self.token is None
):
self.logger.warning(
"API_TOKEN env variable is undefined. See more: https://developer.supervisely.com/getting-started/basics-of-authentication"
)
if raise_error:
raise exc
else:
await process_requests_exception_async(
self.logger,
exc,
method,
url,
verbose=True,
swallow_exc=True,
sleep_sec=min(self.retry_sleep_sec * (2**retry_idx), 60),
response=response,
retry_info={"retry_idx": retry_idx + 1, "retry_limit": retries},
)
except Exception as exc:
process_unhandled_request(self.logger, exc)
raise httpx.RequestError(
f"Retry limit exceeded ({url})",
request=getattr(response, "request", None),
)
[docs] async def stream_async(
self,
method: str,
method_type: Literal["GET", "POST"],
data: Union[bytes, Dict],
headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
range_start: Optional[int] = None,
range_end: Optional[int] = None,
chunk_size: int = 8192,
use_public_api: Optional[bool] = True,
timeout: httpx._types.TimeoutTypes = 60,
**kwargs,
) -> AsyncGenerator:
"""
Performs asynchronous streaming GET or POST request to server with given parameters.
Yield chunks of data and hash of the whole content to check integrity of the data stream.
:param method: Method name for the request.
:type method: str
:param method_type: Request type ('GET' or 'POST').
:type method_type: str
:param data: Bytes with data content or dictionary with params.
:type data: bytes or dict
:param headers: Custom headers to include in the request.
:type headers: dict, optional
:param retries: The number of retry attempts.
:type retries: int, optional
:param range_start: Start byte position for streaming.
:type range_start: int, optional
:param range_end: End byte position for streaming.
:type range_end: int, optional
:param chunk_size: Size of the chunk to read from the stream. Default is 8192.
:type chunk_size: int, optional
:param use_public_api: Define if public API should be used.
:type use_public_api: bool, optional
:param timeout: Overall timeout for the request.
:type timeout: float, optional
:return: Async generator object.
:rtype: :class:`AsyncGenerator`
"""
self._set_async_client()
if retries is None:
retries = self.retry_count
url = self.api_server_address + "/v3/" + method
if not use_public_api:
url = os.path.join(self.server_address, method)
logger.trace(f"{method_type} {url}")
if headers is None:
headers = self.headers.copy()
else:
headers = {**self.headers, **headers}
params = kwargs.get("params", None)
if "content" in kwargs or "json_body" in kwargs:
content = kwargs.get("content", None)
json_body = kwargs.get("json_body", None)
else:
if isinstance(data, (bytes, Generator)):
content = data
json_body = None
elif isinstance(data, Dict):
json_body = {**data, **self.additional_fields}
content = None
else:
raise ValueError("Data should be either bytes or dict")
if range_start is not None or range_end is not None:
headers["Range"] = f"bytes={range_start or ''}-{range_end or ''}"
logger.debug(f"Setting Range header: {headers['Range']}")
for retry_idx in range(retries):
total_streamed = 0
try:
if method_type == "POST":
response = self.async_httpx_client.stream(
method_type,
url,
content=content,
json=json_body,
headers=headers,
timeout=timeout,
params=params,
)
elif method_type == "GET":
response = self.async_httpx_client.stream(
method_type,
url,
content=content,
json=json_body,
headers=headers,
timeout=timeout,
params=params,
)
else:
raise NotImplementedError(
f"Unsupported method type: {method_type}. Supported types: 'GET', 'POST'"
)
async with response as resp:
expected_size = int(resp.headers.get("content-length", 0))
if resp.status_code not in [
httpx.codes.OK,
httpx.codes.PARTIAL_CONTENT,
]:
self._check_version()
Api._raise_for_status_httpx(resp)
# received hash of the content to check integrity of the data stream
hhash = resp.headers.get("x-content-checksum-sha256", None)
try:
async for chunk in resp.aiter_raw(chunk_size):
yield chunk, hhash
total_streamed += len(chunk)
except Exception as e:
raise RetryableRequestException(repr(e))
if expected_size != 0 and total_streamed != expected_size:
raise ValueError(
f"Streamed size does not match the expected: {total_streamed} != {expected_size}"
)
logger.trace(f"Streamed size: {total_streamed}, expected size: {expected_size}")
return
except (httpx.RequestError, httpx.HTTPStatusError, RetryableRequestException) as e:
if (
isinstance(e, httpx.HTTPStatusError)
and resp.status_code == 400
and self.token is None
):
self.logger.warning(
"API_TOKEN env variable is undefined. See more: https://developer.supervisely.com/getting-started/basics-of-authentication"
)
retry_range_start = total_streamed + (range_start or 0)
if total_streamed != 0:
retry_range_start += 1
headers["Range"] = f"bytes={retry_range_start}-{range_end or ''}"
logger.debug(f"Setting Range header {headers['Range']} for retry")
await process_requests_exception_async(
self.logger,
e,
method,
url,
verbose=True,
swallow_exc=True,
sleep_sec=min(self.retry_sleep_sec * (2**retry_idx), 60),
response=locals().get("resp"),
retry_info={"retry_idx": retry_idx + 1, "retry_limit": retries},
)
except Exception as e:
process_unhandled_request(self.logger, e)
raise httpx.RequestError(
message=f"Retry limit exceeded ({url})",
request=resp.request if locals().get("resp") else None,
)
def _set_async_client(self):
"""
Set async httpx client with HTTP/2 if it is not set yet.
"""
if self.async_httpx_client is None:
self.async_httpx_client = httpx.AsyncClient(http2=True)
def _set_client(self):
"""
Set sync httpx client with HTTP/2 if it is not set yet.
"""
if self.httpx_client is None:
self.httpx_client = httpx.Client(http2=True)
[docs] def get_default_semaphore(self) -> asyncio.Semaphore:
"""
Get default global API semaphore for async requests.
If the semaphore is not set, it will be initialized.
During initialization, the semaphore size will be set from the environment variable SUPERVISELY_ASYNC_SEMAPHORE.
If the environment variable is not set, the default value will be set based on the server address.
Depending on the server address, the semaphore size will be set to 10 for HTTPS and 5 for HTTP.
:return: Semaphore object.
:rtype: :class:`asyncio.Semaphore`
"""
if self._semaphore is None:
self._initialize_semaphore()
return self._semaphore
def _initialize_semaphore(self):
"""
Initialize the semaphore for async requests.
If the environment variable SUPERVISELY_ASYNC_SEMAPHORE is set, create a semaphore with the given value.
Otherwise, create a semaphore with a default value:
- If server supports HTTPS, create a semaphore with value 10.
- If server supports HTTP, create a semaphore with value 5.
"""
semaphore_size = sly_env.semaphore_size()
if semaphore_size is not None:
self._semaphore = asyncio.Semaphore(semaphore_size)
logger.debug(
f"Setting global API semaphore size to {semaphore_size} from environment variable"
)
else:
if not self._skip_https_redirect_check:
self._check_https_redirect()
if self.server_address.startswith("https://"):
size = 10
if "app.supervise" in self.server_address:
size = 7
logger.debug(f"Setting global API semaphore size to {size} for HTTPS")
else:
size = 5
logger.debug(f"Setting global API semaphore size to {size} for HTTP")
self._semaphore = asyncio.Semaphore(size)
[docs] def set_semaphore_size(self, size: int = None):
"""
Set the global API semaphore with the given size. Will replace the existing semaphore.
If the size is not set, will set from the environment variable SUPERVISELY_ASYNC_SEMAPHORE.
If the environment variable is not set, will set the default value.
:param size: Size of the semaphore.
:type size: int, optional
"""
if size is not None:
self._semaphore = asyncio.Semaphore(size)
else:
self._initialize_semaphore()
@property
def semaphore(self) -> asyncio.Semaphore:
"""
Get the global API semaphore for async requests.
:return: Semaphore object.
:rtype: :class:`asyncio.Semaphore`
"""
return self._semaphore