Source code for lakefs.exceptions

"""
Exceptions module
"""
import http
import json
from contextlib import contextmanager
from typing import Optional, Callable

import lakefs_sdk.exceptions
from urllib3 import HTTPResponse


[docs] class LakeFSException(Exception): """ Base exception for all SDK exceptions """
[docs] class ServerException(LakeFSException): """ Generic exception when no other exception is applicable """ status_code: int reason: str body: dict def __init__(self, status=None, reason=None, body=None): self.status_code = status self.reason = reason if body is not None: try: # Try to get message from body self.body = json.loads(body) except json.JSONDecodeError: self.body = {} else: self.body = {} def __str__(self): return f"code: {self.status_code}, reason: {self.reason}, body: {self.body}"
[docs] class NotFoundException(ServerException): """ Resource could not be found on lakeFS server """
[docs] class ForbiddenException(ServerException): """ Operation not permitted """
[docs] class NoAuthenticationFound(LakeFSException): """ Raised when no authentication method could be found on Client instantiation """
[docs] class BadRequestException(ServerException): """ Bad Request """
[docs] class NotAuthorizedException(ServerException): """ User not authorized to perform operation """
[docs] class UnsupportedOperationException(ServerException): """ Operation not supported by lakeFS server or SDK """
[docs] class ConflictException(ServerException): """ Resource / request conflict """
[docs] class ObjectNotFoundException(NotFoundException, FileNotFoundError): """ Raised when the currently used object no longer exist in the lakeFS server """
[docs] class ObjectExistsException(ServerException, FileExistsError): """ Raised when Object('...').create(mode='x') and object exists """
[docs] class PermissionException(NotAuthorizedException, PermissionError): """ Raised by Object.open() and Object.create() for compatibility with python """
[docs] class InvalidRangeException(ServerException, OSError): """ Raised when the reference could not be found in the lakeFS server """
[docs] class ImportManagerException(LakeFSException): """ Import manager exceptions that are not originated from the SDK """
[docs] class TransactionException(LakeFSException): """ Exceptions during the transaction commit logic """
_STATUS_CODE_TO_EXCEPTION = { http.HTTPStatus.BAD_REQUEST.value: BadRequestException, http.HTTPStatus.UNAUTHORIZED.value: NotAuthorizedException, http.HTTPStatus.FORBIDDEN.value: ForbiddenException, http.HTTPStatus.NOT_FOUND.value: NotFoundException, http.HTTPStatus.METHOD_NOT_ALLOWED.value: UnsupportedOperationException, http.HTTPStatus.CONFLICT.value: ConflictException, http.HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE.value: InvalidRangeException }
[docs] @contextmanager def api_exception_handler(custom_handler: Optional[Callable[[LakeFSException], LakeFSException]] = None): """ Contexts which converts lakefs_sdk API exceptions to LakeFS exceptions and handles them. :param custom_handler: Optional handler which can be used to provide custom behavior for specific exceptions. If custom_handler returns an exception, this function will raise the exception at the end of the custom_handler invocation. """ try: yield except lakefs_sdk.ApiException as e: lakefs_ex = _STATUS_CODE_TO_EXCEPTION.get(e.status, ServerException)(e.status, e.reason, e.body) if custom_handler is not None: lakefs_ex = custom_handler(lakefs_ex) if lakefs_ex is not None: raise lakefs_ex from e
[docs] def handle_http_error(resp: HTTPResponse) -> None: """ Handles http response and raises the appropriate lakeFS exception if needed :param resp: The response to parse """ if not http.HTTPStatus.OK <= resp.status < http.HTTPStatus.MULTIPLE_CHOICES: lakefs_ex = _STATUS_CODE_TO_EXCEPTION.get(resp.status, ServerException)(resp.status, resp.reason, resp.data) raise lakefs_ex