Source code for lakefs.exceptions

"""
Exceptions module
"""
import http
import json
from contextlib import contextmanager
from typing import Optional, Callable
import logging

import lakefs_sdk.exceptions
from urllib3 import HTTPResponse


class LakeFSException(Exception):
    """
    Common ancestor of every exception defined by the SDK
    """
class ServerException(LakeFSException):
    """
    Generic exception when no other exception is applicable

    :param status: Status code of the failed response; stored as ``status_code``
    :param reason: Reason phrase of the failed response
    :param body: Raw response body. It is parsed as JSON on a best-effort basis; when it is missing or cannot
        be parsed, ``body`` is set to an empty dict
    :param headers: Response headers; an empty dict is stored when omitted
    """
    status_code: Optional[int]
    reason: Optional[str]
    body: dict
    headers: dict

    def __init__(self, status=None, reason=None, body=None, headers=None):
        self.status_code = status
        self.reason = reason
        self.headers = headers if headers is not None else {}
        self.body = {}
        if body is None:
            return
        try:
            # Try to get message from body
            self.body = json.loads(body)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError as well as UnicodeDecodeError (bytes payload that is not
            # valid UTF-8/16/32); TypeError covers a body that is not str/bytes/bytearray. Building the
            # exception must never fail, otherwise the original server error would be masked.
            logging.debug("failed to decode response body: status (%s), reason (%s), body (%s)",
                          status, reason, body)

    def __str__(self):
        return f"code: {self.status_code}, reason: {self.reason}, body: {self.body}"
class NotFoundException(ServerException):
    """
    The requested resource does not exist on the lakeFS server
    """
class ForbiddenException(ServerException):
    """
    The operation is not permitted
    """
class NoAuthenticationFound(LakeFSException):
    """
    Raised on Client instantiation when no authentication method could be found
    """
class UnsupportedCredentialsProviderType(LakeFSException):
    """
    Raised for a credentials provider type that is not supported
    """
class InvalidEnvVarFormat(LakeFSException):
    """
    Raised when the passed env var does not have the expected format
    """
class BadRequestException(ServerException):
    """
    The server rejected the request as a Bad Request
    """
class NotAuthorizedException(ServerException):
    """
    The user is not authorized to perform the operation
    """
class UnsupportedOperationException(ServerException):
    """
    The operation is not supported by the lakeFS server or by the SDK
    """
class ConflictException(ServerException):
    """
    The request conflicts with a resource or with another request
    """
class ObjectNotFoundException(NotFoundException, FileNotFoundError):
    """
    Raised when the object currently in use no longer exists on the lakeFS server.

    Also derives from the built-in FileNotFoundError, so handlers written for that type catch it too.
    """
class ObjectExistsException(ServerException, FileExistsError):
    """
    Raised when Object('...').create(mode='x') is called and the object already exists.

    Also derives from the built-in FileExistsError, so handlers written for that type catch it too.
    """
class PermissionException(NotAuthorizedException, PermissionError):
    """
    Raised by Object.open() and Object.create() for compatibility with python.

    Also derives from the built-in PermissionError, so handlers written for that type catch it too.
    """
class InvalidRangeException(ServerException, OSError):
    """
    Raised when an object's read request start position exceeds file size

    After construction, ``size`` holds the object size parsed from the response's Content-Range header,
    or None when the header is absent or does not have the ``bytes */<size>`` form.

    :param status: Status code of the failed response
    :param reason: Reason phrase of the failed response
    :param body: Raw response body
    :param headers: Response headers; the Content-Range entry is looked up regardless of letter case
    """

    def __init__(self, status=None, reason=None, body=None, headers=None):
        super().__init__(status, reason, body, headers)
        self.size: Optional[int] = None

        # Parse size from Content-Range header if present
        # Format: "bytes */<size>" (e.g., "bytes */0" for empty object)
        if not self.headers:
            return

        content_range = self.headers.get('Content-Range')
        if content_range is None:
            # HTTP field names are case-insensitive, and a plain dict built from the response headers
            # keeps whatever casing was received (e.g. "content-range"), so fall back to a
            # case-insensitive scan when the canonical spelling is missing.
            content_range = next(
                (value for name, value in self.headers.items()
                 if isinstance(name, str) and name.lower() == 'content-range'),
                '',
            )

        if isinstance(content_range, str) and content_range.startswith('bytes */'):
            try:
                # The prefix check guarantees at least one '/', so index 1 always exists
                self.size = int(content_range.split('/')[1])
            except ValueError:
                # Size part is not an integer (e.g. "bytes */*"); leave size unset
                pass
class ImportManagerException(LakeFSException):
    """
    Import manager errors that do not originate from the SDK
    """
class TransactionException(LakeFSException):
    """
    Errors raised during the transaction commit logic
    """
# Lookup table from an HTTP status code to the exception class raised for it.
# Codes without an entry are reported as a plain ServerException by the code that consults this table.
_STATUS_CODE_TO_EXCEPTION: dict[int, type[ServerException]] = {
    http.HTTPStatus.BAD_REQUEST.value: BadRequestException,
    http.HTTPStatus.UNAUTHORIZED.value: NotAuthorizedException,
    http.HTTPStatus.FORBIDDEN.value: ForbiddenException,
    http.HTTPStatus.NOT_FOUND.value: NotFoundException,
    http.HTTPStatus.METHOD_NOT_ALLOWED.value: UnsupportedOperationException,
    http.HTTPStatus.CONFLICT.value: ConflictException,
    http.HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE.value: InvalidRangeException,
}
@contextmanager
def api_exception_handler(custom_handler: Optional[Callable[[LakeFSException], LakeFSException]] = None):
    """
    Context manager that translates a lakefs_sdk ApiException raised inside the ``with`` body into the
    matching LakeFS exception.

    The exception class is chosen by the API error's status code, falling back to ServerException when the
    status is None or has no dedicated class. The translated exception is chained to the original API error.

    :param custom_handler: Optional callable that receives the translated exception and may return a
        different exception to raise in its place. If it returns None, nothing is raised and the error
        is suppressed.
    """
    try:
        yield
    except lakefs_sdk.ApiException as api_err:
        # The headers may be any value dict() accepts (e.g. pairs); normalise to a dict for easier access
        raw_headers = getattr(api_err, 'headers', None)
        header_map = dict(raw_headers) if raw_headers else None

        if api_err.status is None:
            error_type = ServerException
        else:
            error_type = _STATUS_CODE_TO_EXCEPTION.get(api_err.status, ServerException)

        converted: LakeFSException = error_type(api_err.status, api_err.reason, api_err.body, header_map)

        if custom_handler is not None:
            converted = custom_handler(converted)

        # A None result from the handler means the error is deliberately swallowed
        if converted is not None:
            raise converted from api_err
def handle_http_error(resp: HTTPResponse) -> None:
    """
    Inspect an http response and raise the matching lakeFS exception unless its status is in the 2xx range

    :param resp: The response to parse
    :raises ServerException: or the subclass registered for the response status code
    """
    if http.HTTPStatus.OK <= resp.status < http.HTTPStatus.MULTIPLE_CHOICES:
        return

    # Convert headers to dict for consistent interface
    raw_headers = getattr(resp, 'headers', None)
    header_map = dict(raw_headers) if raw_headers else None

    error_type = _STATUS_CODE_TO_EXCEPTION.get(resp.status, ServerException)
    raise error_type(resp.status, resp.reason, resp.data, header_map)