"""
Module containing lakeFS reference implementation
"""
from __future__ import annotations
import base64
import binascii
import http
import io
import json
import os
import tempfile
import urllib.parse
from abc import abstractmethod
from typing import AnyStr, IO, Iterator, List, Literal, Optional, Union, get_args
import lakefs_sdk
from lakefs_sdk import StagingMetadata
from lakefs.client import Client, _BaseLakeFSObject, SINGLE_STORAGE_ID
from lakefs.exceptions import (
    api_exception_handler,
    handle_http_error,
    LakeFSException,
    NotFoundException,
    ObjectNotFoundException,
    NotAuthorizedException,
    ForbiddenException,
    PermissionException,
    ObjectExistsException,
    InvalidRangeException,
)
from lakefs.models import ObjectInfo
_LAKEFS_METADATA_PREFIX = "x-lakefs-meta-"
# _BUFFER_SIZE - Writer buffer size. While buffer size not exceed, data will be maintained in memory and file will
#                not be created.
_WRITER_BUFFER_SIZE = 32 * 1024 * 1024
ReadModes = Literal['r', 'rb']
WriteModes = Literal['x', 'xb', 'w', 'wb']
AllModes = Union[ReadModes, WriteModes]
[docs]
class LakeFSIOBase(_BaseLakeFSObject, IO):
    """
    Base class for the lakeFS Reader and Writer classes
    """
    _obj: StoredObject
    _mode: AllModes
    _pos: int
    _pre_sign: Optional[bool] = None
    def __init__(self, obj: StoredObject, mode: AllModes, pre_sign: Optional[bool] = None,
                 client: Optional[Client] = None) -> None:
        self._obj = obj
        self._mode = mode
        self._pos = 0
        super().__init__(client)
        # must be set after super().__init__ to ensure the client is properly initialized.
        self._pre_sign = pre_sign if pre_sign is not None \
            
else self._client.storage_config_by_id(obj.storage_id()).pre_sign_support
    @property
    def mode(self) -> str:
        """
        Returns the open mode for this object
        """
        return self._mode
    @property
    def name(self) -> str:
        """
        Returns the name of the object relative to the repo and reference
        """
        return self._obj.path
[docs]
    def close(self) -> None:
        """
        Finalizes any existing operations on object and close open descriptors
        Inheriting classes need to override close() as needed.
        """ 
    @abstractmethod
    def _abort(self) -> None:
        """
        Specific implementation for when the IO object should be discarded
        """
        raise NotImplementedError
[docs]
    def fileno(self) -> int:
        """
        The file descriptor number as defined by the operating system. In the context of lakeFS it has no meaning
        :raise io.UnsupportedOperation: Always, since fileno is not supported for lakeFS objects
        """
        raise io.UnsupportedOperation 
[docs]
    @abstractmethod
    def flush(self) -> None:
        raise NotImplementedError 
[docs]
    def isatty(self) -> bool:
        """
        Irrelevant for the lakeFS implementation
        """
        return False 
[docs]
    @abstractmethod
    def readable(self) -> bool:
        raise NotImplementedError 
[docs]
    def readline(self, limit: int = -1):
        """
        Must be explicitly implemented by inheriting class
        """
        raise io.UnsupportedOperation 
[docs]
    def readlines(self, hint: int = -1):
        """
        Must be explicitly implemented by inheriting class
        """
        raise io.UnsupportedOperation 
[docs]
    @abstractmethod
    def seekable(self) -> bool:
        raise NotImplementedError 
[docs]
    def truncate(self, size: int = None) -> int:
        """
        Unsupported by lakeFS implementation
        """
        raise io.UnsupportedOperation 
[docs]
    @abstractmethod
    def writable(self) -> bool:
        raise NotImplementedError 
[docs]
    @abstractmethod
    def write(self, s: AnyStr) -> int:
        raise NotImplementedError 
[docs]
    def writelines(self, lines: List[AnyStr]) -> None:
        """
        Unsupported by lakeFS implementation
        """
        raise io.UnsupportedOperation 
    def __next__(self) -> AnyStr:
        line = self.readline()
        if len(line) == 0:
            raise StopIteration
        return line
    def __iter__(self) -> Iterator[AnyStr]:
        return self
    def __enter__(self) -> LakeFSIOBase:
        return self
    def __exit__(self, typ, value, traceback) -> bool:
        if typ is None:  # Perform logic in case no exception was raised
            self.close()
        else:
            self._abort()  # perform logic regardless of exception
        return False  # Don't suppress an exception
[docs]
    @abstractmethod
    def seek(self, offset: int, whence: int = 0) -> int:
        raise NotImplementedError 
[docs]
    @abstractmethod
    def read(self, n: int = None) -> str | bytes:
        raise NotImplementedError 
[docs]
    def tell(self) -> int:
        """
        For readers - read position, for writers can be used as an indication for bytes written
        """
        return self._pos 
 
[docs]
class ObjectReader(LakeFSIOBase):
    """
    ObjectReader provides read-only functionality for lakeFS objects with IO semantics.
    This Object is instantiated and returned for immutable reference types (Commit, Tag...)
    """
    _readlines_buf: io.BytesIO
    def __init__(self, obj: StoredObject, mode: ReadModes, pre_sign: Optional[bool] = None,
                 client: Optional[Client] = None, **kwargs) -> None:
        if mode not in get_args(ReadModes):
            raise ValueError(f"invalid read mode: '{mode}'. ReadModes: {ReadModes}")
        super().__init__(obj, mode, pre_sign, client)
        self._readlines_buf = io.BytesIO(b"")
        self._is_closed = False
        self._kwargs = kwargs
    @property
    def pre_sign(self):
        """
        Returns whether the pre_sign mode is enabled
        """
        if self._pre_sign is None:
            self._pre_sign = self._client.storage_config_by_id(self._obj.storage_id()).pre_sign_support
        return self._pre_sign
    @pre_sign.setter
    def pre_sign(self, value: bool) -> None:
        """
        Set the pre_sign mode to value
        :param value: The new value for pre_sign mode
        """
        self._pre_sign = value
    @property
    def closed(self) -> bool:
        """
        Returns True after the object is closed
        """
        return self._is_closed
[docs]
    def readable(self) -> bool:
        """
        Returns True always
        """
        return True 
[docs]
    def write(self, s: AnyStr) -> int:
        """
        Unsupported for reader object
        """
        raise io.UnsupportedOperation 
[docs]
    def seekable(self) -> bool:
        """
        Returns True always
        """
        return True 
[docs]
    def writable(self) -> bool:
        """
        Unsupported - read only object
        """
        return False 
[docs]
    def seek(self, offset: int, whence: int = 0) -> int:
        """
        Move the object's reading position
        :param offset: The offset from the beginning of the file
        :param whence: Optional. The whence argument is optional and defaults to
            os.SEEK_SET or 0 (absolute file positioning);
            other values are os.SEEK_CUR or 1 (seek relative to the current position) and os.SEEK_END or 2
            (seek relative to the file’s end)
        :raise ValueError: if reader is closed
        :raise OSError: if calculated new position is negative
        :raise io.UnsupportedOperation: If whence value is unsupported
        """
        if self._is_closed:
            raise ValueError("I/O operation on closed file")
        if whence == os.SEEK_SET:
            pos = offset
        elif whence == os.SEEK_CUR:
            pos = self._pos + offset
        elif whence == os.SEEK_END:
            size = self._obj.stat().size_bytes  # Seek end requires us to know the size of the file
            pos = size + offset
        else:
            raise io.UnsupportedOperation(f"whence={whence} is not supported")
        if pos < 0:
            raise OSError("position must be a non-negative integer")
        self._pos = pos
        return pos 
    def _cast_by_mode(self, retval):
        if 'b' not in self.mode:
            return retval.decode('utf-8')
        return retval
    def _read(self, read_range: str) -> str | bytes:
        try:
            with api_exception_handler(_io_exception_handler):
                return self._client.sdk_client.objects_api.get_object(self._obj.repo,
                                                                      self._obj.ref,
                                                                      self._obj.path,
                                                                      range=read_range,
                                                                      presign=self.pre_sign,
                                                                      **self._kwargs)
        except InvalidRangeException:
            # This is done in order to behave like the built-in open() function
            return b''
[docs]
    def read(self, n: int = None) -> str | bytes:
        """
        Read object data
        :param n: How many bytes to read. If read_bytes is None, will read from current position to end.
            If current position + read_bytes > object size.
        :return: The bytes read
        :raise ValueError: if reader is closed
        :raise OSError: if read_bytes is non-positive
        :raise ObjectNotFoundException: if repository id, reference id or object path does not exist
        :raise PermissionException: if user is not authorized to perform this operation, or operation is forbidden
        :raise ServerException: for any other errors
        """
        if self._is_closed:
            raise ValueError("I/O operation on closed file")
        if n and n <= 0:
            raise OSError("read_bytes must be a positive integer")
        read_range = self._get_range_string(start=self._pos, read_bytes=n)
        contents = self._read(read_range)
        self._pos += len(contents)  # Update pointer position
        return self._cast_by_mode(contents) 
[docs]
    def readline(self, limit: int = -1):
        """
        Read and return a line from the stream.
        :param limit: If limit > -1 returns at most limit bytes
        :raise ValueError: if reader is closed
        :raise ObjectNotFoundException: if repository id, reference id or object path does not exist
        :raise PermissionException: if user is not authorized to perform this operation, or operation is forbidden
        :raise ServerException: for any other errors
        """
        if self._is_closed:
            raise ValueError("I/O operation on closed file")
        if self._readlines_buf.getbuffer().nbytes == 0:
            self._readlines_buf = io.BytesIO(self._read(self._get_range_string(0)))
        self._readlines_buf.seek(self._pos)
        line = self._readlines_buf.readline(limit)
        self._pos = self._readlines_buf.tell()
        return self._cast_by_mode(line) 
[docs]
    def flush(self) -> None:
        """
        Nothing to do for reader
        :raise ValueError: if reader is closed
        """
        if self._is_closed:
            raise ValueError("I/O operation on closed file") 
[docs]
    def close(self) -> None:
        """
        Close open descriptors
        """
        if self._is_closed:
            return
        self._is_closed = True
        self._readlines_buf.close() 
    def _abort(self):
        """
        Closes reader
        """
        self.close()
    @staticmethod
    def _get_range_string(start, read_bytes=None):
        if start == 0 and read_bytes is None:
            return None
        if read_bytes is None:
            return f"bytes={start}-"
        return f"bytes={start}-{start + read_bytes - 1}"
    def __str__(self):
        return self._obj.path
    def __repr__(self):
        return f'ObjectReader(path="{self._obj.path}")' 
[docs]
class ObjectWriter(LakeFSIOBase):
    """
    ObjectWriter provides write-only functionality for lakeFS objects with IO semantics.
    This Object is instantiated and returned from the WriteableObject writer method.
    For the data to be actually written to the lakeFS server the close() method must be invoked explicitly or
    implicitly when using writer as a context.
    """
    _fd: tempfile.SpooledTemporaryFile
    _obj_stats: ObjectInfo = None
    def __init__(self,
                 obj: StoredObject,
                 mode: WriteModes,
                 pre_sign: Optional[bool] = None,
                 content_type: Optional[str] = None,
                 metadata: Optional[dict[str, str]] = None,
                 client: Optional[Client] = None,
                 **kwargs) -> None:
        if 'x' in mode and obj.exists():  # Requires explicit create
            raise ObjectExistsException
        if mode not in get_args(WriteModes):
            raise ValueError(f"invalid write mode: '{mode}'. WriteModes: {WriteModes}")
        self.content_type = content_type
        self.metadata = metadata
        open_kwargs = {
            # TODO: Once the upstream urllib3 < 2.0 is out of support,
            # pass the specified write mode and conditional encoding (None, utf-8).
            "encoding": None,  # Must be none for binary write modes. "utf-8" otherwise
            "mode": 'wb+',  # Always write to file in binary mode (bug in urllib3 < 2.0,
            "max_size": _WRITER_BUFFER_SIZE,
        }
        self._fd = tempfile.SpooledTemporaryFile(**open_kwargs)  # pylint: disable=consider-using-with
        super().__init__(obj, mode, pre_sign, client)
        self._kwargs = kwargs
    @property
    def pre_sign(self) -> bool:
        """
        Returns whether the pre_sign mode is enabled
        """
        if self._pre_sign is None:
            self._pre_sign = self._client.storage_config_by_id(self._obj.storage_id()).pre_sign_support
        return self._pre_sign
    @pre_sign.setter
    def pre_sign(self, value: bool) -> None:
        """
        Set the pre_sign mode to value
        :param value: The new value for pre_sign mode
        """
        self._pre_sign = value
    @property
    def closed(self) -> bool:
        """
        Returns True after the object is closed
        """
        return self._fd.closed
[docs]
    def flush(self) -> None:
        """
        Flush buffer to file. Prevent flush if total write size is still smaller than _BUFFER_SIZE so that we avoid
        unnecessary write to disk.
        :raise ValueError: if writer is closed
        """
        if self._fd.closed:
            raise ValueError("I/O operation on closed file")
        # Don't flush buffer to file if we didn't exceed buffer size
        # We want to avoid using the file if possible
        if self._pos > _WRITER_BUFFER_SIZE:
            self._fd.flush() 
[docs]
    def write(self, s: AnyStr) -> int:
        """
        Write data to buffer
        :param s: The data to write
        :return: The number of bytes written to buffer
        :raise ValueError: if writer is closed
        """
        contents = s.encode('utf-8') if isinstance(s, str) else s
        count = self._fd.write(contents)
        self._pos += count
        return count 
[docs]
    def discard(self) -> None:
        """
        Discards of the write buffer and closes writer
        """
        self._abort() 
    def _upload(self) -> ObjectInfo:
        stats = self._upload_presign() if self.pre_sign else self._upload_raw()
        return ObjectInfo(**stats.dict())
[docs]
    def close(self) -> None:
        """
        Write the data to the lakeFS server
        """
        if self._fd.closed:
            return
        try:
            self._obj_stats = self._upload()
        finally:
            self._fd.close() 
    def _abort(self) -> None:
        """
        Close open descriptors but create nothing on lakeFS.
        """
        if not self._fd.closed:
            self._fd.close()
    @staticmethod
    def _extract_etag_from_response(headers) -> str:
        # prefer Content-MD5 if exists
        content_md5 = headers.get("Content-MD5")
        if content_md5 is not None and len(content_md5) > 0:
            try:  # decode base64, return as hex
                decode_md5 = base64.b64decode(content_md5)
                return binascii.hexlify(decode_md5).decode("utf-8")
            except binascii.Error:
                pass
        # fallback to ETag
        etag = headers.get("ETag", "").strip(' "')
        return etag
    def _upload_raw(self) -> lakefs_sdk.ObjectStats:
        """
        Use raw upload API call to bypass validation of content parameter
        """
        auth_settings = ['basic_auth', 'cookie_auth', 'oidc_auth', 'saml_auth', 'jwt_token']
        headers = {
            "Accept": "application/json",
            "Content-Type": self.content_type if self.content_type is not None else "application/octet-stream"
        }
        if self._mode.startswith("x"):
            headers["If-None-Match"] = "*"
        # Create user metadata headers
        if self.metadata is not None:
            for k, v in self.metadata.items():
                headers[_LAKEFS_METADATA_PREFIX + k] = v
        self._fd.seek(0)
        resource_path = urllib.parse.quote(f"/repositories/{self._obj.repo}/branches/{self._obj.ref}/objects",
                                           encoding="utf-8")
        query_params = urllib.parse.urlencode({"path": self._obj.path}, encoding="utf-8")
        url = self._client.config.host + resource_path + f"?{query_params}"
        self._client.sdk_client.objects_api.api_client.update_params_for_auth(headers, None, auth_settings,
                                                                              resource_path, "POST", self._fd)
        resp = self._client.sdk_client.objects_api.api_client.rest_client.pool_manager.request(url=url,
                                                                                               method="POST",
                                                                                               headers=headers,
                                                                                               body=self._fd)
        if self._mode.startswith("x") and resp.status == http.HTTPStatus.PRECONDITION_FAILED:
            raise ObjectExistsException(resp.status, resp.reason, resp.data)
        handle_http_error(resp)
        return lakefs_sdk.ObjectStats(**json.loads(resp.data))
    def _upload_presign(self) -> lakefs_sdk.ObjectStats:
        # Extract staging-specific kwargs
        staging_kwargs = {k: v for k, v in self._kwargs.items() if k in ['_request_timeout', 'async_req']}
        staging_location = self._client.sdk_client.staging_api.get_physical_address(self._obj.repo,
                                                                                    self._obj.ref,
                                                                                    self._obj.path,
                                                                                    True,
                                                                                    **staging_kwargs)
        url = staging_location.presigned_url
        headers = {"Content-Length": self._pos}
        if self.content_type:
            headers["Content-Type"] = self.content_type
        if self._client.storage_config_by_id(self._obj.storage_id()).blockstore_type == "azure":
            headers["x-ms-blob-type"] = "BlockBlob"
        self._fd.seek(0)
        resp = self._client.sdk_client.staging_api.api_client.rest_client.pool_manager.request(method="PUT",
                                                                                               url=url,
                                                                                               body=self._fd,
                                                                                               headers=headers)
        handle_http_error(resp)
        etag = ObjectWriter._extract_etag_from_response(resp.headers)
        size_bytes = self._pos
        staging_metadata = StagingMetadata(staging=staging_location,
                                           size_bytes=size_bytes,
                                           checksum=etag,
                                           user_metadata=self.metadata,
                                           content_type=self.content_type)
        if_none_match = "*" if self._mode.startswith("x") else None
        try:
            return self._client.sdk_client.staging_api.link_physical_address(self._obj.repo,
                                                                             self._obj.ref,
                                                                             self._obj.path,
                                                                             staging_metadata=staging_metadata,
                                                                             if_none_match=if_none_match,
                                                                             **staging_kwargs)
        except lakefs_sdk.ApiException as e:
            if self._mode.startswith("x") and e.status == http.HTTPStatus.PRECONDITION_FAILED:
                raise ObjectExistsException(e.status, e.reason, e.body) from e
            raise
[docs]
    def readable(self) -> bool:
        """
        ObjectWriter is write-only - return False always
        """
        return False 
[docs]
    def seekable(self) -> bool:
        """
        ObjectWriter is not seekable. Returns False always
        """
        return False 
[docs]
    def writable(self) -> bool:
        """
        Returns True always
        """
        return True 
[docs]
    def seek(self, offset: int, whence: int = 0) -> int:
        """
        Unsupported for writer class
        """
        raise io.UnsupportedOperation 
[docs]
    def read(self, n: int = None) -> str | bytes:
        """
        Unsupported for writer class
        """
        raise io.UnsupportedOperation 
    def __repr__(self):
        return f'ObjectWriter(path="{self._obj.path}")' 
[docs]
class StoredObject(_BaseLakeFSObject):
    """
    Class representing an object in lakeFS.
    """
    _repo_id: str
    _ref_id: str
    _path: str
    _stats: Optional[ObjectInfo] = None
    _storage_id: Optional[str] = None
    def __init__(self, repository_id: str, reference_id: str, path: str, client: Optional[Client] = None):
        self._repo_id = repository_id
        self._ref_id = reference_id
        self._path = path
        super().__init__(client)
    def __str__(self) -> str:
        return self.path
    def __repr__(self):
        return f'StoredObject(repository="{self.repo}", reference="{self.ref}", path="{self.path}")'
    @property
    def repo(self) -> str:
        """
        Returns the object's repository id
        """
        return self._repo_id
    @property
    def ref(self) -> str:
        """
        Returns the object's reference id
        """
        return self._ref_id
    @property
    def path(self) -> str:
        """
        Returns the object's path relative to repository and reference ids
        """
        return self._path
[docs]
    def reader(self, mode: ReadModes = 'rb', pre_sign: Optional[bool] = None, **kwargs) -> ObjectReader:
        """
        Context manager which provide a file-descriptor like object that allow reading the given object.
        Usage Example:
        .. code-block:: python
            import lakefs
            obj = lakefs.repository("<repository_name>").branch("<branch_name>").object("file.txt")
            file_size = obj.stat().size_bytes
            with obj.reader(mode='r', pre_sign=True) as fd:
                # print every other 10 chars
                while fd.tell() < file_size
                    print(fd.read(10))
                    fd.seek(10, os.SEEK_CUR)
        :param mode: Read mode - as supported by ReadModes
        :param pre_sign: (Optional), enforce the pre_sign mode on the lakeFS server. If not set, will probe server for
            information.
        :return: A Reader object
        """
        return ObjectReader(self, mode=mode, pre_sign=pre_sign, client=self._client, **kwargs) 
[docs]
    def stat(self, pre_sign: Optional[bool] = None, **kwargs) -> ObjectInfo:
        """
        Return the Stat object representing this object
        """
        # don't cache pre-signed stats
        if self._stats is None or pre_sign:
            with api_exception_handler(_io_exception_handler):
                stat = self._client.sdk_client.objects_api.stat_object(
                    self._repo_id, self._ref_id, self._path, presign=pre_sign, **kwargs)
                self._stats = ObjectInfo(**stat.dict())
        return self._stats 
[docs]
    def storage_id(self) -> str:
        """
        Return the storage ID associated with this object
        """
        if self._storage_id is None:
            with api_exception_handler(_io_exception_handler):
                repo = self._client.sdk_client.repositories_api.get_repository(self._repo_id)
                self._storage_id = repo.storage_id if repo.storage_id else SINGLE_STORAGE_ID
        return self._storage_id 
[docs]
    def exists(self, **kwargs) -> bool:
        """
        Returns True if object exists in lakeFS, False otherwise
        """
        exists = False
        def exist_handler(e: LakeFSException):
            if isinstance(e, NotFoundException):
                return None  # exists = False
            return _io_exception_handler(e)
        with api_exception_handler(exist_handler):
            self._client.sdk_client.objects_api.head_object(self._repo_id, self._ref_id, self._path, **kwargs)
            exists = True
        return exists 
[docs]
    def copy(self, destination_branch_id: str, destination_path: str, **kwargs) -> WriteableObject:
        """
        Copy the object to a destination branch
        :param destination_branch_id: The destination branch to copy the object to
        :param destination_path: The path of the copied object in the destination branch
        :return: The newly copied Object
        :raise ObjectNotFoundException: if repo id,reference id, destination branch id or object path does not exist
        :raise PermissionException: if user is not authorized to perform this operation, or operation is forbidden
        :raise ServerException: for any other errors
        """
        with api_exception_handler():
            object_copy_creation = lakefs_sdk.ObjectCopyCreation(src_ref=self._ref_id, src_path=self._path)
            self._client.sdk_client.objects_api.copy_object(repository=self._repo_id,
                                                            branch=destination_branch_id,
                                                            dest_path=destination_path,
                                                            object_copy_creation=object_copy_creation,
                                                            **kwargs)
        return WriteableObject(repository_id=self._repo_id, reference_id=destination_branch_id, path=destination_path,
                               client=self._client) 
 
[docs]
class WriteableObject(StoredObject):
    """
    WriteableObject inherits from ReadableObject and provides read/write functionality for lakeFS objects
    using IO semantics.
    This Object is instantiated and returned upon invoking writer() on Branch reference type.
    """
    def __init__(self, repository_id: str, reference_id: str, path: str,
                 client: Optional[Client] = None) -> None:
        super().__init__(repository_id, reference_id, path, client=client)
    def __repr__(self):
        return f'WriteableObject(repository="{self.repo}", reference="{self.ref}", path="{self.path}")'
[docs]
    def upload(self,
               data: str | bytes,
               mode: WriteModes = 'w',
               pre_sign: Optional[bool] = None,
               content_type: Optional[str] = None,
               metadata: Optional[dict[str, str]] = None,
               **kwargs) -> WriteableObject:
        """
        Upload a new object or overwrites an existing object
        :param data: The contents of the object to write (can be bytes or string)
        :param mode: Write mode:
            'x'     - Open for exclusive creation
            'xb'    - Open for exclusive creation in binary mode
            'w'     - Create a new object or truncate if exists
            'wb'    - Create or truncate in binary mode
        :param pre_sign: (Optional) Explicitly state whether to use pre_sign mode when uploading the object.
            If None, will be taken from pre_sign property.
        :param content_type: (Optional) Explicitly set the object Content-Type
        :param metadata: (Optional) User metadata
        :return: The Stat object representing the newly created object
        :raise ObjectExistsException: if object exists and mode is exclusive ('x')
        :raise ObjectNotFoundException: if repo id, reference id or object path does not exist
        :raise PermissionException: if user is not authorized to perform this operation, or operation is forbidden
        :raise ServerException: for any other errors
        """
        with ObjectWriter(self, mode, pre_sign, content_type, metadata, self._client, **kwargs) as writer:
            writer.write(data)
        return self 
[docs]
    def delete(self, **kwargs) -> None:
        """
        Delete object from lakeFS
        :raise ObjectNotFoundException: if repo id, reference id or object path does not exist
        :raise PermissionException: if user is not authorized to perform this operation, or operation is forbidden
        :raise ServerException: for any other errors
        """
        with api_exception_handler(_io_exception_handler):
            self._client.sdk_client.objects_api.delete_object(self._repo_id, self._ref_id, self._path, **kwargs)
            self._stats = None 
[docs]
    def writer(self,
               mode: WriteModes = 'wb',
               pre_sign: Optional[bool] = None,
               content_type: Optional[str] = None,
               metadata: Optional[dict[str, str]] = None,
               **kwargs) -> ObjectWriter:
        """
        Context manager which provide a file-descriptor like object that allow writing the given object to lakeFS
        The writes are saved in a buffer as long as the writer is open. Only when it closes it writes the data into
        lakeFS. The optional parameters can be modified by accessing the respective fields as long as the writer is
        still open.
        Usage example of reading a file from local file system and writing it to lakeFS:
        .. code-block:: python
            import lakefs
            obj = lakefs.repository("<repository_name>").branch("<branch_name>").object("my_image")
            with open("my_local_image", mode='rb') as reader, obj.writer("wb") as writer:
                writer.write(reader.read())
        :param mode: Write mode - as supported by WriteModes
        :param pre_sign: (Optional), enforce the pre_sign mode on the lakeFS server. If not set, will probe server for
            information.
        :param content_type: (Optional) Specify the data media type
        :param metadata: (Optional) User defined metadata to save on the object
        :return: A Writer object
        """
        return ObjectWriter(self,
                            mode=mode,
                            pre_sign=pre_sign,
                            content_type=content_type,
                            metadata=metadata,
                            client=self._client,
                            **kwargs) 
 
def _io_exception_handler(e: LakeFSException):
    if isinstance(e, NotFoundException):
        return ObjectNotFoundException(e.status_code, e.reason)
    if isinstance(e, (NotAuthorizedException, ForbiddenException)):
        return PermissionException(e.status_code, e.reason)
    return e