# coding: utf-8
"""api connection to the server which allows user to communicate with Supervisely"""
from __future__ import annotations
import datetime
import gc
import glob
import json
import os
import shutil
from logging import Logger
from pathlib import Path
from typing import Dict, Optional
from urllib.parse import urljoin, urlparse
import jwt
import requests
from dotenv import get_key, load_dotenv, set_key
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
import supervisely.api.advanced_api as advanced_api
import supervisely.api.agent_api as agent_api
import supervisely.api.annotation_api as annotation_api
import supervisely.api.app_api as app_api
import supervisely.api.dataset_api as dataset_api
import supervisely.api.file_api as file_api
import supervisely.api.github_api as github_api
import supervisely.api.image_annotation_tool_api as image_annotation_tool_api
import supervisely.api.image_api as image_api
import supervisely.api.import_storage_api as import_stoarge_api
import supervisely.api.labeling_job_api as labeling_job_api
import supervisely.api.neural_network_api as neural_network_api
import supervisely.api.object_class_api as object_class_api
import supervisely.api.plugin_api as plugin_api
import supervisely.api.pointcloud.pointcloud_api as pointcloud_api
import supervisely.api.pointcloud.pointcloud_episode_api as pointcloud_episode_api
import supervisely.api.project_api as project_api
import supervisely.api.remote_storage_api as remote_storage_api
import supervisely.api.report_api as report_api
import supervisely.api.role_api as role_api
import supervisely.api.storage_api as storage_api
import supervisely.api.task_api as task_api
import supervisely.api.team_api as team_api
import supervisely.api.user_api as user_api
import supervisely.api.video.video_api as video_api
import supervisely.api.video_annotation_tool_api as video_annotation_tool_api
import supervisely.api.volume.volume_api as volume_api
import supervisely.api.workspace_api as workspace_api
import supervisely.io.env as sly_env
from supervisely._utils import camel_to_snake, is_development
from supervisely.io.network_exceptions import (
process_requests_exception,
process_unhandled_request,
)
from supervisely.sly_logger import logger
SUPERVISELY_TASK_ID = "SUPERVISELY_TASK_ID"
SUPERVISELY_PUBLIC_API_RETRIES = "SUPERVISELY_PUBLIC_API_RETRIES"
SUPERVISELY_PUBLIC_API_RETRY_SLEEP_SEC = "SUPERVISELY_PUBLIC_API_RETRY_SLEEP_SEC"
SERVER_ADDRESS = "SERVER_ADDRESS"
SUPERVISELY_API_SERVER_ADDRESS = "SUPERVISELY_API_SERVER_ADDRESS"
API_TOKEN = "API_TOKEN"
TASK_ID = "TASK_ID"
SUPERVISELY_ENV_FILE = os.path.join(Path.home(), "supervisely.env")
class UserSession:
"""
UserSession object contains info that is returned after user authentication.
:param server: Server url.
:type server: str
:raises: :class:`RuntimeError`, if server url is invalid.
"""
def __init__(self, server_address: str):
self.api_token = None
self.team_id = None
self.workspace_id = None
self.server_address = server_address
if not self._normalize_and_validate_server_url():
raise RuntimeError(f"Invalid server url: {server_address}")
def __str__(self):
return f"UserSession(server={self.server_address})"
def __repr__(self):
return self.__str__()
def _normalize_and_validate_server_url(self) -> bool:
"""
Validate server url.
:return: True if server url is valid, False otherwise.
"""
self.server_address = Api.normalize_server_address(self.server_address)
if not self.server_address.startswith("https://"):
response = requests.get(self.server_address, allow_redirects=False)
if (300 <= response.status_code < 400) or (
response.headers.get("Location", "").startswith("https://")
):
self.server_address = self.server_address.replace("http://", "https://")
result = urlparse(self.server_address)
if all([result.scheme, result.netloc]):
try:
response = requests.get(self.server_address)
if response.status_code == 200:
return True
except requests.RequestException:
pass
return False
def _setattrs_user_session(self, decoded_token):
"""
Add decoded info to UserSession object.
:param decoded_token: Decoded token.
:type decoded_token: dict
:return: None
:rtype: :class:`NoneType`
"""
for key, value in decoded_token.items():
if key == "group":
self.team = value
self.team_id = value["id"]
elif key == "workspace":
self.workspace = value
self.workspace_id = value["id"]
else:
key = camel_to_snake(key)
setattr(self, key, value)
def log_in(self, login: str, password: str) -> UserSession:
"""
Authenticate user and return UserSession object with decoded info from JWT token.
:param login: User login.
:type login: str
:param password: User password.
:type password: str
:return: UserSession object
:rtype: :class:`UserSession`
"""
login_url = urljoin(self.server_address, "api/account")
payload = {"login": login, "password": password}
response = requests.post(login_url, data=payload)
del password
gc.collect()
if response.status_code == 200:
data = response.json()
jwt_token = data.get("token", None)
decoded_token = jwt.decode(jwt_token, options={"verify_signature": False})
self._setattrs_user_session(decoded_token)
return self
else:
raise RuntimeError(f"Failed to authenticate user: status code {response.status_code}")
[docs]class Api:
"""
An API connection to the server with which you can communicate with your teams, workspaces and projects. :class:`Api<Api>` object is immutable.
:param server_address: Address of the server.
:type server_address: str
:param token: Unique secret token associated with your agent.
:type token: str
:param retry_count: The number of attempts to connect to the server.
:type retry_count: int, optional
:param retry_sleep_sec: The number of seconds to delay between attempts to connect to the server.
:type retry_sleep_sec: int, optional
:param external_logger: Logger class object.
:type external_logger: logger, optional
:param ignore_task_id:
:type ignore_task_id: bool, optional
:raises: :class:`ValueError`, if token is None or it length != 128
:Usage example:
.. code-block:: python
import os
from dotenv import load_dotenv
import supervisely as sly
# Load secrets and create API object from .env file (recommended)
# Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
if sly.is_development():
load_dotenv(os.path.expanduser("~/supervisely.env"))
api = sly.Api.from_env()
# Pass values into the API constructor (optional, not recommended)
# api = sly.Api(server_address="https://app.supervise.ly", token="4r47N...xaTatb")
"""
def __init__(
self,
server_address: str = None,
token: str = None,
retry_count: Optional[int] = 10,
retry_sleep_sec: Optional[int] = None,
external_logger: Optional[Logger] = None,
ignore_task_id: Optional[bool] = False,
api_server_address: str = None,
):
if server_address is None and token is None:
server_address = os.environ.get(SERVER_ADDRESS, None)
token = os.environ.get(API_TOKEN, None)
if server_address is None:
raise ValueError(
"SERVER_ADDRESS env variable is undefined, https://developer.supervise.ly/getting-started/basics-of-authentication"
)
if token is None:
raise ValueError(
"API_TOKEN env variable is undefined, https://developer.supervise.ly/getting-started/basics-of-authentication"
)
self.server_address = Api.normalize_server_address(server_address)
self._api_server_address = None
if api_server_address is None:
api_server_address = os.environ.get(SUPERVISELY_API_SERVER_ADDRESS, None)
if api_server_address is not None:
self._api_server_address = Api.normalize_server_address(api_server_address)
if retry_count is None:
retry_count = int(os.getenv(SUPERVISELY_PUBLIC_API_RETRIES, "10"))
if retry_sleep_sec is None:
retry_sleep_sec = int(os.getenv(SUPERVISELY_PUBLIC_API_RETRY_SLEEP_SEC, "1"))
if len(token) != 128:
raise ValueError("Invalid token {!r}: length != 128".format(token))
self.token = token
self.headers = {"x-api-key": token}
self.task_id = os.getenv(SUPERVISELY_TASK_ID)
if self.task_id is not None and ignore_task_id is False:
self.headers["x-task-id"] = self.task_id
self.context = {}
self.additional_fields = {}
self.team = team_api.TeamApi(self)
self.workspace = workspace_api.WorkspaceApi(self)
self.project = project_api.ProjectApi(self)
self.model = neural_network_api.NeuralNetworkApi(self)
self.task = task_api.TaskApi(self)
self.dataset = dataset_api.DatasetApi(self)
self.image = image_api.ImageApi(self)
self.annotation = annotation_api.AnnotationApi(self)
self.plugin = plugin_api.PluginApi(self)
self.agent = agent_api.AgentApi(self)
self.role = role_api.RoleApi(self)
self.user = user_api.UserApi(self)
self.labeling_job = labeling_job_api.LabelingJobApi(self)
self.video = video_api.VideoApi(self)
# self.project_class = project_class_api.ProjectClassApi(self)
self.object_class = object_class_api.ObjectClassApi(self)
self.report = report_api.ReportApi(self)
self.pointcloud = pointcloud_api.PointcloudApi(self)
self.pointcloud_episode = pointcloud_episode_api.PointcloudEpisodeApi(self)
self.app = app_api.AppApi(self)
self.file = file_api.FileApi(self)
self.storage = storage_api.StorageApi(self)
self.img_ann_tool = image_annotation_tool_api.ImageAnnotationToolApi(self)
self.vid_ann_tool = video_annotation_tool_api.VideoAnnotationToolApi(self)
self.advanced = advanced_api.AdvancedApi(self)
self.import_storage = import_stoarge_api.ImportStorageApi(self)
self.remote_storage = remote_storage_api.RemoteStorageApi(self)
self.github = github_api.GithubApi(self)
self.volume = volume_api.VolumeApi(self)
self.retry_count = retry_count
self.retry_sleep_sec = retry_sleep_sec
self.logger = external_logger or logger
self._require_https_redirect_check = not self.server_address.startswith("https://")
@classmethod
def normalize_server_address(cls, server_address: str) -> str:
""" """
result = server_address.strip("/")
if ("http://" not in result) and ("https://" not in result):
result = "http://" + result
return result
[docs] @classmethod
def from_env(
cls,
retry_count: int = 10,
ignore_task_id: bool = False,
env_file: str = SUPERVISELY_ENV_FILE,
) -> Api:
"""
Initialize API use environment variables.
:param retry_count: The number of attempts to connect to the server.
:type retry_count: int
:param ignore_task_id:
:type ignore_task_id: bool
:param path: Path to your .env file.
:type path: str
:return: Api object
:rtype: :class:`Api<supervisely.api.api.Api>`
:Usage example:
.. code-block:: python
import supervisely as sly
os.environ['SERVER_ADDRESS'] = 'https://app.supervise.ly'
os.environ['API_TOKEN'] = 'Your Supervisely API Token'
api = sly.Api.from_env()
# alternatively you can store SERVER_ADDRESS and API_TOKEN
# in "~/supervisely.env" .env file
# Learn more here: https://developer.supervise.ly/app-development/basics/add-private-app#create-.env-file-supervisely.env-with-the-following-content-learn-more-here
api = sly.Api.from_env()
"""
server_address = sly_env.server_address(raise_not_found=False)
token = sly_env.api_token(raise_not_found=False)
if is_development() and None in (server_address, token):
env_path = os.path.expanduser(env_file)
if os.path.exists(env_path):
_, extension = os.path.splitext(env_path)
if extension == ".env":
load_dotenv(env_path)
server_address = sly_env.server_address()
token = sly_env.api_token()
else:
raise ValueError(f"'{env_path}' is not an '*.env' file")
else:
raise FileNotFoundError(f"File not found: '{env_path}'")
if server_address is None:
raise ValueError(
"SERVER_ADDRESS env variable is undefined. Learn more here: https://developer.supervise.ly/getting-started/basics-of-authentication"
)
if token is None:
raise ValueError(
"API_TOKEN env variable is undefined. Learn more here: https://developer.supervise.ly/getting-started/basics-of-authentication"
)
return cls(
server_address,
token,
retry_count=retry_count,
ignore_task_id=ignore_task_id,
)
[docs] def add_additional_field(self, key: str, value: str) -> None:
"""
Add given key and value to additional_fields dictionary.
:param key: New key.
:type key: str
:param value: New value.
:type value: str
:return: None
:rtype: :class:`NoneType`
"""
self.additional_fields[key] = value
[docs] def post(
self,
method: str,
data: Dict,
retries: Optional[int] = None,
stream: Optional[bool] = False,
raise_error: Optional[bool] = False,
) -> requests.Response:
"""
Performs POST request to server with given parameters.
:param method: Method name.
:type method: str
:param data: Dictionary to send in the body of the :class:`Request`.
:type data: dict
:param retries: The number of attempts to connect to the server.
:type retries: int, optional
:param stream: Define, if you'd like to get the raw socket response from the server.
:type stream: bool, optional
:param raise_error: Define, if you'd like to raise error if connection is failed. Retries will be ignored.
:type raise_error: bool, optional
:return: Response object
:rtype: :class:`Response<Response>`
"""
self._check_https_redirect()
if retries is None:
retries = self.retry_count
url = self.api_server_address + "/v3/" + method
logger.trace(f"POST {url}")
for retry_idx in range(retries):
response = None
try:
if type(data) is bytes:
response = requests.post(url, data=data, headers=self.headers, stream=stream)
elif type(data) is MultipartEncoderMonitor or type(data) is MultipartEncoder:
response = requests.post(
url,
data=data,
headers={**self.headers, "Content-Type": data.content_type},
stream=stream,
)
else:
json_body = data
if type(data) is dict:
json_body = {**data, **self.additional_fields}
response = requests.post(
url, json=json_body, headers=self.headers, stream=stream
)
if response.status_code != requests.codes.ok: # pylint: disable=no-member
Api._raise_for_status(response)
return response
except requests.RequestException as exc:
if raise_error:
raise exc
else:
process_requests_exception(
self.logger,
exc,
method,
url,
verbose=True,
swallow_exc=True,
sleep_sec=min(self.retry_sleep_sec * (2**retry_idx), 60),
response=response,
retry_info={"retry_idx": retry_idx + 1, "retry_limit": retries},
)
except Exception as exc:
process_unhandled_request(self.logger, exc)
raise requests.exceptions.RetryError("Retry limit exceeded ({!r})".format(url))
[docs] def get(
self,
method: str,
params: Dict,
retries: Optional[int] = None,
stream: Optional[bool] = False,
use_public_api: Optional[bool] = True,
) -> requests.Response:
"""
Performs GET request to server with given parameters.
:param method:
:type method: str
:param params: Dictionary to send in the body of the :class:`Request`.
:type method: dict
:param retries: The number of attempts to connect to the server.
:type method: int, optional
:param stream: Define, if you'd like to get the raw socket response from the server.
:type method: bool, optional
:param use_public_api:
:type method: bool, optional
:return: Response object
:rtype: :class:`Response<Response>`
"""
self._check_https_redirect()
if retries is None:
retries = self.retry_count
url = self.api_server_address + "/v3/" + method
if use_public_api is False:
url = os.path.join(self.server_address, method)
logger.trace(f"GET {url}")
for retry_idx in range(retries):
response = None
try:
json_body = params
if type(params) is dict:
json_body = {**params, **self.additional_fields}
response = requests.get(url, params=json_body, headers=self.headers, stream=stream)
if response.status_code != requests.codes.ok: # pylint: disable=no-member
Api._raise_for_status(response)
return response
except requests.RequestException as exc:
process_requests_exception(
self.logger,
exc,
method,
url,
verbose=True,
swallow_exc=True,
sleep_sec=min(self.retry_sleep_sec * (2**retry_idx), 60),
response=response,
retry_info={"retry_idx": retry_idx + 2, "retry_limit": retries},
)
except Exception as exc:
process_unhandled_request(self.logger, exc)
@staticmethod
def _raise_for_status(response):
"""
Raise error and show message with error code if given response can not connect to server.
:param response: Request class object
"""
http_error_msg = ""
if isinstance(response.reason, bytes):
try:
reason = response.reason.decode("utf-8")
except UnicodeDecodeError:
reason = response.reason.decode("iso-8859-1")
else:
reason = response.reason
if 400 <= response.status_code < 500:
http_error_msg = "%s Client Error: %s for url: %s (%s)" % (
response.status_code,
reason,
response.url,
response.content.decode("utf-8"),
)
elif 500 <= response.status_code < 600:
http_error_msg = "%s Server Error: %s for url: %s (%s)" % (
response.status_code,
reason,
response.url,
response.content.decode("utf-8"),
)
if http_error_msg:
raise requests.exceptions.HTTPError(http_error_msg, response=response)
[docs] @staticmethod
def parse_error(
response: requests.Response,
default_error: Optional[str] = "Error",
default_message: Optional[str] = "please, contact administrator",
):
"""
Processes error from response.
:param response: Request object.
:type method: Request
:param default_error: Error description.
:type method: str, optional
:param default_message: Message to user.
:type method: str, optional
:return: Number of error and message about curren connection mistake
:rtype: :class:`int`, :class:`str`
"""
ERROR_FIELD = "error"
MESSAGE_FIELD = "message"
DETAILS_FIELD = "details"
try:
data_str = response.content.decode("utf-8")
data = json.loads(data_str)
error = data.get(ERROR_FIELD, default_error)
details = data.get(DETAILS_FIELD, {})
if type(details) is dict:
message = details.get(MESSAGE_FIELD, default_message)
else:
message = details[0].get(MESSAGE_FIELD, default_message)
return error, message
except Exception as e:
return "", ""
def pop_header(self, key: str) -> str:
""" """
if key not in self.headers:
raise KeyError(f"Header {key!r} not found")
return self.headers.pop(key)
def _check_https_redirect(self):
if self._require_https_redirect_check is True:
response = requests.get(self.server_address, allow_redirects=False)
if (300 <= response.status_code < 400) or (
response.headers.get("Location", "").startswith("https://")
):
self.server_address = self.server_address.replace("http://", "https://")
msg = (
"You're using HTTP server address while the server requires HTTPS. "
"Supervisely automatically changed the server address to HTTPS for you. "
f"Consider updating your server address to {self.server_address}"
)
self.logger.warn(msg)
self._require_https_redirect_check = False
[docs] @classmethod
def from_credentials(
cls,
server_address: str,
login: str,
password: str,
override: bool = False,
env_file: str = SUPERVISELY_ENV_FILE,
) -> Api:
"""
Create Api object using credentials and optionally save them to ".env" file with overriding environment variables.
If ".env" file already exists, backup will be created automatically.
All backups will be stored in the same directory with postfix "_YYYYMMDDHHMMSS". You can have not more than 5 last backups.
This method can be used also to update ".env" file.
:param server_address: Supervisely server url.
:type server_address: str
:param login: User login.
:type login: str
:param password: User password.
:type password: str
:param override: If False, return Api object. If True, additionally create ".env" file or overwrite existing (backup file will be created automatically), and override environment variables.
:type override: bool, optional
:param env_file: Path to your .env file.
:type env_file: str, optional
:return: Api object
:Usage example:
.. code-block:: python
import supervisely as sly
server_address = 'https://app.supervisely.com'
login = 'user'
password = 'pass'
api = sly.Api.from_credentials(server_address, login, password)
"""
session = UserSession(server_address).log_in(login, password)
del password
gc.collect()
api = cls(session.server_address, session.api_token, ignore_task_id=True)
if override:
if os.path.isfile(env_file):
# create backup
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
backup_file = f"{env_file}_{timestamp}"
shutil.copy2(env_file, backup_file)
if api.token != get_key(env_file, API_TOKEN):
# create new file
os.remove(env_file)
Path(env_file).touch()
# remove old backups
all_backups = sorted(glob.glob(f"{env_file}_" + "[0-9]" * 14))
while len(all_backups) > 5:
os.remove(all_backups.pop(0))
set_key(env_file, SERVER_ADDRESS, session.server_address)
set_key(env_file, API_TOKEN, session.api_token)
if session.team_id:
set_key(env_file, "INIT_GROUP_ID", f"{session.team_id}")
if session.workspace_id:
set_key(env_file, "INIT_WORKSPACE_ID", f"{session.workspace_id}")
load_dotenv(env_file, override=override)
return api
@property
def api_server_address(self) -> str:
"""
Get API server address.
:return: API server address.
:rtype: :class:`str`
:Usage example:
.. code-block:: python
import supervisely as sly
api = sly.Api(server_address='https://app.supervisely.com', token='4r47N...xaTatb')
print(api.api_server_address)
# Output:
# 'https://app.supervisely.com/public/api'
"""
if self._api_server_address is not None:
return self._api_server_address
return f"{self.server_address}/public/api"