# coding: utf-8
import mimetypes
from typing import Callable, List, Optional, Union
from requests_toolbelt import MultipartEncoder
from tqdm import tqdm
from supervisely import logger
from supervisely.api.module_api import ApiField, ModuleApiBase
from supervisely.collection.str_enum import StrEnum
from supervisely.io import env
from supervisely.io.fs import ensure_base_path, get_file_name_with_ext
class Provider(StrEnum):
"""Remote storage provider prefixes supported in Supervisely links (s3://, gcs://, etc.)."""
S3 = "s3"
"""S3"""
GOOGLE = "google"
"""GOOGLE"""
AZURE = "azure"
"""AZURE"""
FS = "fs"
"""FS"""
MINIO = "minio"
"""MINIO"""
GCS = "gcs"
"""GCS"""
@staticmethod
def validate_path(path: str):
if (
not path.startswith(str(Provider.S3))
and not path.startswith(str(Provider.GOOGLE))
and not path.startswith(str(Provider.AZURE))
and not path.startswith(str(Provider.FS))
and not path.startswith(str(Provider.MINIO))
and not path.startswith(str(Provider.GCS))
):
prefix = path.split("://")[0]
raise ValueError(
f"Incorrect cloud provider '{prefix}' in path, learn more here: https://docs.supervisely.com/enterprise-edition/advanced-tuning/s3#links-plugin-cloud-providers-support"
)
[docs]
class RemoteStorageApi(ModuleApiBase):
"""API for listing and downloading files from remote/cloud storages connected to a team."""
def _convert_json_info(self, info: dict):
    """
    Return the given info dictionary as is; no conversion is applied.

    :param info: Info dictionary.
    :type info: dict
    :returns: The same object that was passed in.
    """
    return info
[docs]
def get_file_info_by_path(
    self,
    path: str,
    team_id: int = None,
) -> Optional[dict]:
    """
    Get info about file for given remote path.

    Sends a non-recursive, files-only ``remote-storage.list`` request limited to a
    single entry for the path (trailing slashes removed) and returns the first
    listed entry.

    :param path: Remote path to file.
    :type path: str
    :param team_id: Team ID (to get cloud storages connected to the team).
        When omitted, the value from the environment is used if available.
    :type team_id: int
    :returns: File info for the given remote path, or None when there is no
        response or the listing is empty.
    :rtype: Optional[dict]
    :raises ValueError: if the path does not pass ``Provider.validate_path``.
    """
    team_id = team_id or env.team_id(raise_not_found=False)
    Provider.validate_path(path)
    path = path.rstrip("/")
    json_body = {
        ApiField.PATH: path,
        ApiField.RECURSIVE: False,
        ApiField.FILES: True,
        ApiField.FOLDERS: False,
        ApiField.LIMIT: 1,
        "startAfter": "",
    }
    if team_id is not None:
        json_body[ApiField.GROUP_ID] = team_id
    resp = self._api.get("remote-storage.list", json_body)
    if resp is None:
        return None
    items = resp.json()
    # An empty listing means nothing was found: honour the Optional return
    # type instead of failing with IndexError on items[0].
    if not items:
        return None
    return items[0]
[docs]
def list(
    self,
    path: str,
    recursive: bool = True,
    files: bool = True,
    folders: bool = True,
    limit: int = 10000,
    start_after: str = "",
    team_id: int = None,
) -> list:
    """
    List files and directories located under the given remote path.

    The path is normalised to end with exactly one trailing slash before the
    ``remote-storage.list`` request is sent.

    :param path: Remote path whose items should be listed.
    :type path: str
    :param recursive: List the remote path recursively.
    :type recursive: bool
    :param files: Include files in the listing.
    :type files: bool
    :param folders: Include folders in the listing.
    :type folders: bool
    :param limit: Limit of files to list. 10000 is the maximum limit.
    :type limit: int
    :param start_after: Start listing after the given file name.
    :type start_after: str
    :param team_id: Team ID (to get cloud storages connected to the team).
        When omitted, the value from the environment is used if available.
    :type team_id: int
    :returns: Items found in the given remote path; an empty list when there is no response.
    :rtype: list
    :raises ValueError: if the path does not pass ``Provider.validate_path``.
    """
    team_id = team_id or env.team_id(raise_not_found=False)
    Provider.validate_path(path)
    request = {
        ApiField.PATH: path.rstrip("/") + "/",
        ApiField.RECURSIVE: recursive,
        ApiField.FILES: files,
        ApiField.FOLDERS: folders,
        ApiField.LIMIT: limit,
        "startAfter": start_after,
    }
    if team_id is not None:
        request[ApiField.GROUP_ID] = team_id
    response = self._api.get("remote-storage.list", request)
    return [] if response is None else response.json()
[docs]
def download_path(
    self,
    remote_path: str,
    save_path: str,
    progress_cb: Optional[Union[tqdm, Callable]] = None,
    team_id: int = None,
):
    """
    Downloads item from given remote path to given local path.

    The response of ``remote-storage.download`` is streamed to ``save_path`` in
    1 MiB chunks; after each chunk the progress callback receives the number of
    bytes just written.

    :param remote_path: Remote path to item that you want to download.
    :type remote_path: str
    :param save_path: Local save path.
    :type save_path: str
    :param progress_cb: Progress reporter. Either a callable accepting the number of
        downloaded bytes, or a tqdm progress bar (advanced through its ``update`` method
        when the object itself is not callable).
    :type progress_cb: tqdm or callable, optional
    :param team_id: Team ID (to get cloud storages connected to the team).
        When omitted, the value from the environment is used if available.
    :type team_id: int
    :raises ValueError: if the remote path does not pass ``Provider.validate_path``.

    :Usage Example:

     .. code-block:: python

        provider = "s3"  # one of "s3", "google", "azure", "fs", "minio", "gcs"
        bucket = "bucket-test-export"
        path_in_bucket = "/demo/image.jpg"
        remote_path = api.remote_storage.get_remote_path(provider, bucket, path_in_bucket)
        # or alternatively use this:
        # remote_path = f"{provider}://{bucket}{path_in_bucket}"
        api.remote_storage.download_path(remote_path=remote_path, save_path="images/my-cats.jpg")
    """
    team_id = team_id or env.team_id(raise_not_found=False)
    Provider.validate_path(remote_path)
    ensure_base_path(save_path)
    json_body = {ApiField.LINK: remote_path}
    if team_id is not None:
        json_body[ApiField.GROUP_ID] = team_id
    response = self._api.post("remote-storage.download", json_body, stream=True)

    # Callables are invoked exactly as before. A plain tqdm bar is not callable,
    # so fall back to its update() method instead of failing with TypeError.
    report_progress = None
    if progress_cb is not None:
        report_progress = progress_cb if callable(progress_cb) else progress_cb.update

    with open(save_path, "wb") as fd:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            fd.write(chunk)
            if report_progress is not None:
                report_progress(len(chunk))
[docs]
def upload_path(self, local_path: str, remote_path: str, team_id: int = None):
    """
    Upload a single local item to the given remote path.

    The remote path is validated and the upload is then delegated to
    ``_upload_paths_batch`` with one-element lists.

    :param local_path: Local path to item that you want to upload.
    :type local_path: str
    :param remote_path: Remote destination path.
    :type remote_path: str
    :param team_id: Team ID (to get cloud storages connected to the team)
    :type team_id: int
    :returns: Whatever ``_upload_paths_batch`` returns for the one-item batch.
    :raises ValueError: if the remote path does not pass ``Provider.validate_path``.

    :Usage Example:

     .. code-block:: python

        import os
        from dotenv import load_dotenv

        import supervisely as sly

        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()

        provider = "s3"  # one of "s3", "google", "azure", "fs", "minio", "gcs"
        bucket = "bucket-test-export"
        path_in_bucket = "/demo/image.jpg"
        remote_path = api.remote_storage.get_remote_path(provider, bucket, path_in_bucket)
        team_id = 123
        # or alternatively use this:
        # remote_path = f"{provider}://{bucket}{path_in_bucket}"
        api.remote_storage.upload_path("images/my-cats.jpg", remote_path, team_id)
    """
    Provider.validate_path(remote_path)
    sources, destinations = [local_path], [remote_path]
    return self._upload_paths_batch(sources, destinations, team_id)
def _upload_paths_batch(self, local_paths, remote_paths, team_id: int = None):
    """
    Upload several local files to their remote destinations in one multipart request.

    For every (local, remote) pair the multipart body gets a path field holding the
    remote destination followed by a ``file`` field with the file name taken from the
    destination, the opened local file and its guessed MIME type.

    :param local_paths: Local paths of the files to upload.
    :param remote_paths: Remote destination paths, one per local path (same order).
    :param team_id: Team ID (to get cloud storages connected to the team).
        When omitted, the value from the environment is used if available.
    :type team_id: int
    :returns: Decoded JSON response of ``remote-storage.upload``; an empty dict when
        there is nothing to upload.
    :raises ValueError: if the two path lists differ in length.
    """
    # Validate arguments first so an invalid or empty batch never needs the environment.
    if len(local_paths) != len(remote_paths):
        raise ValueError("Inconsistency in paths, len(local_paths) != len(remote_paths)")
    if len(local_paths) == 0:
        return {}
    team_id = team_id or env.team_id(raise_not_found=False)

    # One MimeTypes instance serves the whole batch instead of one per file.
    mime_types = mimetypes.MimeTypes()
    opened_files = []
    try:
        content = []
        for src, dst in zip(local_paths, remote_paths):
            content.append((ApiField.PATH, dst))
            name = get_file_name_with_ext(dst)
            stream = open(src, "rb")
            # Track the handle immediately so it is closed even if a later step fails.
            opened_files.append(stream)
            content.append(("file", (name, stream, mime_types.guess_type(src)[0])))
        encoder = MultipartEncoder(fields=content)
        url = "remote-storage.upload"
        if team_id is not None:
            url += f"?teamId={team_id}"
        resp = self._api.post(url, encoder)
        return resp.json()
    finally:
        # The encoder reads the files while the request is being sent, so they
        # may only be closed once the post has returned or failed.
        for stream in opened_files:
            stream.close()
[docs]
def get_remote_path(self, provider: str, bucket: str, path_in_bucket: str) -> str:
    """
    Build a remote path of the form ``<provider>://<bucket>/<path_in_bucket>``.

    Leading slashes of ``path_in_bucket`` are dropped so that exactly one slash
    separates the bucket from the path; the result is validated before it is returned.

    :param provider: Storage provider prefix, for example "s3", "google" or "azure".
    :type provider: str
    :param bucket: Name of the bucket container.
    :type bucket: str
    :param path_in_bucket: Path to item in bucket.
    :type path_in_bucket: str
    :returns: Remote path.
    :rtype: str
    :raises ValueError: if the resulting path does not pass ``Provider.validate_path``.

    :Usage Example:

     .. code-block:: python

        import os
        from dotenv import load_dotenv

        import supervisely as sly

        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()

        provider = "s3"
        bucket = "bucket-test-export"
        path_in_bucket = "/demo/image.jpg"
        remote_path = api.remote_storage.get_remote_path(provider, bucket, path_in_bucket)
        # Output: s3://bucket-test-export/demo/image.jpg
    """
    relative_path = path_in_bucket.lstrip("/")
    remote_path = "{}://{}/{}".format(provider, bucket, relative_path)
    Provider.validate_path(remote_path)
    return remote_path
[docs]
def get_list_available_providers(
    self,
    team_id: int = None,
) -> List[dict]:
    """
    Request the list of available providers (``remote-storage.available_providers``).

    :param team_id: Team ID (to get cloud storages connected to the team).
        When omitted, the value from the environment is used if available.
    :type team_id: int
    :returns: List of available providers
    :rtype: List[dict]

    :Usage Example:

     .. code-block:: python

        import os
        from dotenv import load_dotenv

        import supervisely as sly

        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()

        team_id = 123
        available_providers = api.remote_storage.get_list_available_providers(team_id)

        # Output example
        # [
        #     {
        #         "id": "minio",
        #         "name": "Amazon S3",
        #         "defaultProtocol": "s3:",
        #         "protocols": [
        #             "s3:",
        #             "minio:"
        #         ],
        #         "buckets": [
        #             "bucket-test",
        #             "remote-img"
        #         ]
        #     }
        # ]
    """
    team_id = team_id or env.team_id(raise_not_found=False)
    # The team is only sent when one is known.
    params = {} if team_id is None else {ApiField.GROUP_ID: team_id}
    return self._api.get("remote-storage.available_providers", params).json()
[docs]
def get_list_supported_providers(
    self,
    team_id: int = None,
) -> List[dict]:
    """
    Request the list of supported providers (``remote-storage.supported_providers``).

    :param team_id: Team ID (to get cloud storages connected to the team).
        When omitted, the value from the environment is used if available.
    :type team_id: int
    :returns: List of supported providers
    :rtype: List[dict]

    :Usage Example:

     .. code-block:: python

        import os
        from dotenv import load_dotenv

        import supervisely as sly

        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()

        team_id = 123
        supported_providers = api.remote_storage.get_list_supported_providers(team_id)

        # Output example
        # [
        #     {
        #         "id": "google",
        #         "name": "Google Cloud Storage",
        #         "defaultProtocol": "google:",
        #         "protocols": [
        #             "google:",
        #             "gcs:"
        #         ]
        #     }
        # ]
    """
    team_id = team_id or env.team_id(raise_not_found=False)
    # The team is only sent when one is known.
    params = {} if team_id is None else {ApiField.GROUP_ID: team_id}
    return self._api.get("remote-storage.supported_providers", params).json()
[docs]
def is_path_exist(
    self,
    path: str,
    team_id: int = None,
) -> bool:
    """
    Tell whether the given file path exists in the remote storage.

    Queries ``remote-storage.exists`` and reads the ``exists`` field of the response.

    :param path: URL of the file in the bucket storage
    :type path: str
    :param team_id: Team ID (to get cloud storages connected to the team).
        When omitted, the value from the environment is used if available.
    :type team_id: int
    :returns: True if the file exists, False otherwise (including when there is no response)
    :rtype: bool
    :raises ValueError: if the path does not pass ``Provider.validate_path``.

    :Usage Example:

     .. code-block:: python

        import os
        from dotenv import load_dotenv

        import supervisely as sly

        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()

        path = "s3://bucket/lemons/ds1/img/IMG_444.jpeg"
        is_exist = api.remote_storage.is_path_exist(path)
    """
    team_id = team_id or env.team_id(raise_not_found=False)
    Provider.validate_path(path)
    query = {ApiField.PATH: path}
    if team_id is not None:
        query[ApiField.GROUP_ID] = team_id
    response = self._api.get("remote-storage.exists", query)
    if response is None:
        return False
    # Collapse a missing or falsy "exists" field to False.
    return bool(response.json().get("exists"))
[docs]
def get_path_stats(
    self,
    path: str,
    team_id: int = None,
) -> Optional[dict]:
    """
    Get the size and last modification date of a file in bucket storage.

    The existence of the file is checked first via ``is_path_exist``; only then is
    ``remote-storage.stat`` requested. For a missing file a warning is logged with
    the path components that follow the third ``/`` of the URL.

    :param path: URL of the file in the bucket storage
    :type path: str
    :param team_id: Team ID (to get cloud storages connected to the team).
        When omitted, the value from the environment is used if available.
    :type team_id: int
    :returns: File 'size' in bytes and 'lastModified' date if file exists, otherwise None
    :rtype: Optional[dict]

    :Usage Example:

     .. code-block:: python

        import os
        from dotenv import load_dotenv

        import supervisely as sly

        # Load secrets and create API object from .env file (recommended)
        # Learn more here: https://developer.supervisely.com/getting-started/basics-of-authentication
        if sly.is_development():
            load_dotenv(os.path.expanduser("~/supervisely.env"))
        api = sly.Api.from_env()

        path = "s3://bucket/lemons/ds1/img/IMG_444.jpeg"
        team_id = 123
        stats = api.remote_storage.get_path_stats(path, team_id)

        # Output example
        # {
        #     "size": 155790,
        #     "lastModified": "2023-01-26T09:20:27.000Z"
        # }
    """
    team_id = team_id or env.team_id(raise_not_found=False)
    query = {ApiField.PATH: path}
    if team_id is not None:
        query[ApiField.GROUP_ID] = team_id

    if not self.is_path_exist(path, team_id):
        # Skip the first three "/"-separated components ("<provider>:", "", "<bucket>"
        # for a URL such as s3://bucket/dir/file) and report the remainder.
        in_bucket_path = "/".join(path.split("/")[3:]).rstrip("/")
        logger.warning(f"The file doesn't exist! Check the path: {in_bucket_path}")
        return None

    return self._api.get("remote-storage.stat", query).json()
[docs]
@staticmethod
def is_bucket_url(url: str) -> bool:
    """
    Tell whether the URL starts with ``<provider>://`` for one of the known providers.

    :param url: URL
    :type url: str
    :returns: True if URL is a bucket URL, False otherwise
    :rtype: bool

    :Usage Example:

     .. code-block:: python

        from supervisely.api.remote_storage_api import RemoteStorageApi

        url = "s3://bucket/lemons/ds1/img/IMG_444.jpeg"
        RemoteStorageApi.is_bucket_url(url)
    """
    known_providers = (
        Provider.S3,
        Provider.MINIO,
        Provider.GOOGLE,
        Provider.GCS,
        Provider.AZURE,
        Provider.FS,
    )
    # str.startswith accepts a tuple and is True when any of the prefixes matches.
    schemes = tuple(f"{provider.value}://" for provider in known_providers)
    return url.startswith(schemes)