"""
Exceptions module
"""
import http
import json
from contextlib import contextmanager
from typing import Optional, Callable
import logging
import lakefs_sdk.exceptions
from urllib3 import HTTPResponse
[docs]
class LakeFSException(Exception):
    """
    Root of the SDK exception hierarchy; every exception defined in this module derives from it
    """
[docs]
class ServerException(LakeFSException):
    """
    Generic exception when no other exception is applicable

    :param status: HTTP status code of the failed response, if any
    :param reason: HTTP reason phrase of the failed response, if any
    :param body: Raw response body; parsed as JSON when possible, otherwise replaced by an empty dict
    :param headers: Response headers; stored as an empty dict when omitted
    """
    # HTTP status code passed to the constructor (None when not provided)
    status_code: Optional[int]
    # HTTP reason phrase passed to the constructor (None when not provided)
    reason: Optional[str]
    # JSON-decoded response body, or {} when the body is missing or cannot be decoded
    body: dict
    # Response headers, or {} when none were provided
    headers: dict

    def __init__(self, status=None, reason=None, body=None, headers=None):
        self.status_code = status
        self.reason = reason
        self.headers = headers if headers is not None else {}
        # Default first so a failed decode below leaves a usable value in place
        self.body = {}
        if body is not None:
            try:  # Try to get message from body
                self.body = json.loads(body)
            except (ValueError, TypeError):
                # ValueError covers json.JSONDecodeError as well as the UnicodeDecodeError raised
                # for bytes bodies that are not valid UTF-8/16/32; TypeError covers bodies that
                # are neither str nor bytes. Constructing the exception must never raise itself,
                # otherwise the real server error would be masked.
                logging.debug("failed to decode response body: status (%s), reason (%s), body (%s)",
                              status, reason, body)

    def __str__(self):
        return f"code: {self.status_code}, reason: {self.reason}, body: {self.body}"
[docs]
class NotFoundException(ServerException):
    """
    The requested resource does not exist on the lakeFS server (mapped from HTTP 404)
    """
[docs]
class ForbiddenException(ServerException):
    """
    The operation is not permitted (mapped from HTTP 403)
    """
[docs]
class NoAuthenticationFound(LakeFSException):
    """
    No authentication method could be found while instantiating a Client
    """
[docs]
class UnsupportedCredentialsProviderType(LakeFSException):
    """
    The requested credentials provider type is not supported
    """
[docs]
class BadRequestException(ServerException):
    """
    The server rejected the request as malformed (mapped from HTTP 400)
    """
[docs]
class NotAuthorizedException(ServerException):
    """
    The user is not authorized to perform the operation (mapped from HTTP 401)
    """
[docs]
class UnsupportedOperationException(ServerException):
    """
    The operation is not supported by the lakeFS server or the SDK (mapped from HTTP 405)
    """
[docs]
class ConflictException(ServerException):
    """
    The request conflicts with the current state of a resource (mapped from HTTP 409)
    """
[docs]
class ObjectNotFoundException(NotFoundException, FileNotFoundError):
    """
    Raised when the currently used object no longer exists in the lakeFS server.
    Also derives from the builtin FileNotFoundError.
    """
[docs]
class ObjectExistsException(ServerException, FileExistsError):
    """
    Raised by Object('...').create(mode='x') when the object already exists.
    Also derives from the builtin FileExistsError.
    """
[docs]
class PermissionException(NotAuthorizedException, PermissionError):
    """
    Raised by Object.open() and Object.create() for compatibility with python.
    Also derives from the builtin PermissionError.
    """
[docs]
class InvalidRangeException(ServerException, OSError):
    """
    Raised when an object's read request start position exceeds file size

    After construction, ``size`` holds the integer parsed from a
    ``Content-Range: bytes */<size>`` response header, or None when that header
    is absent or not in that form.

    :param status: HTTP status code of the failed response, if any
    :param reason: HTTP reason phrase of the failed response, if any
    :param body: Raw response body, forwarded to ServerException
    :param headers: Response headers, forwarded to ServerException
    """
    def __init__(self, status=None, reason=None, body=None, headers=None):
        super().__init__(status, reason, body, headers)
        self.size: Optional[int] = None
        # HTTP field names are case-insensitive, and self.headers is a plain dict that keeps
        # whatever casing the caller supplied, so match the name case-insensitively.
        content_range = next(
            (value for name, value in self.headers.items()
             if isinstance(name, str) and name.lower() == 'content-range'),
            '',
        )
        # Format: "bytes */<size>" (e.g., "bytes */0" for empty object)
        if isinstance(content_range, str) and content_range.startswith('bytes */'):
            try:
                self.size = int(content_range.split('/', 1)[1])
            except ValueError:
                # Best effort: a non-numeric length (e.g. "bytes */*") leaves size as None
                pass
[docs]
class ImportManagerException(LakeFSException):
    """
    Import manager errors that do not originate from the SDK
    """
[docs]
class TransactionException(LakeFSException):
    """
    Errors raised during the transaction commit logic
    """
# Lookup table from HTTP status code to the exception class raised for it.
# Statuses that are not listed fall back to ServerException at the lookup sites.
_STATUS_CODE_TO_EXCEPTION: dict[int, type[ServerException]] = {
    http.HTTPStatus.BAD_REQUEST.value: BadRequestException,
    http.HTTPStatus.UNAUTHORIZED.value: NotAuthorizedException,
    http.HTTPStatus.FORBIDDEN.value: ForbiddenException,
    http.HTTPStatus.NOT_FOUND.value: NotFoundException,
    http.HTTPStatus.METHOD_NOT_ALLOWED.value: UnsupportedOperationException,
    http.HTTPStatus.CONFLICT.value: ConflictException,
    http.HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE.value: InvalidRangeException,
}
[docs]
@contextmanager
def api_exception_handler(
        custom_handler: Optional[Callable[[LakeFSException], Optional[LakeFSException]]] = None):
    """
    Context that converts lakefs_sdk API exceptions to LakeFS exceptions and handles them.

    :param custom_handler: Optional handler which can be used to provide custom behavior for specific exceptions.
        It receives the converted exception. If custom_handler returns an exception, this function will raise
        that exception at the end of the custom_handler invocation; if it returns None, the error is suppressed
        and execution continues after the ``with`` block.
    :raises LakeFSException: The converted exception (or the one returned by custom_handler), chained to the
        original lakefs_sdk.ApiException
    """
    try:
        yield
    except lakefs_sdk.ApiException as e:
        # Normalize the headers (when the exception carries any) to a plain dict for easier access
        headers = getattr(e, 'headers', None)
        headers_dict = dict(headers) if headers else None
        # dict.get already falls back to ServerException for any unmapped status, including None
        exc_class = _STATUS_CODE_TO_EXCEPTION.get(e.status, ServerException)
        lakefs_ex: Optional[LakeFSException] = exc_class(e.status, e.reason, e.body, headers_dict)
        if custom_handler is not None:
            lakefs_ex = custom_handler(lakefs_ex)
        if lakefs_ex is not None:
            raise lakefs_ex from e
[docs]
def handle_http_error(resp: HTTPResponse) -> None:
    """
    Inspect an http response and raise the matching lakeFS exception when its status is not 2xx

    :param resp: The response to parse
    :raises ServerException: Or the subclass mapped to the response status, for any status outside 200-299
    """
    # Guard clause: statuses in the 2xx range need no handling
    if http.HTTPStatus.OK <= resp.status < http.HTTPStatus.MULTIPLE_CHOICES:
        return

    # Hand the exception a plain dict (or None) so it sees the same header interface everywhere
    raw_headers = getattr(resp, 'headers', None)
    plain_headers = dict(raw_headers) if raw_headers else None

    error_type = _STATUS_CODE_TO_EXCEPTION.get(resp.status, ServerException)
    raise error_type(resp.status, resp.reason, resp.data, plain_headers)