"""
Module containing lakeFS reference implementation
"""
from __future__ import annotations
from typing import Optional, Generator, Union
import lakefs_sdk
from lakefs.models import Commit, Change, CommonPrefix, ObjectInfo, _OBJECT
from lakefs.client import Client, _BaseLakeFSObject
from lakefs.exceptions import api_exception_handler
from lakefs.object import StoredObject
class Reference(_BaseLakeFSObject):
    """
    Class representing a reference in lakeFS.

    A reference pairs a repository id with any lakeFS reference expression
    (see ``__init__`` for examples) and offers listing, diffing, logging and
    merging operations on it.
    """
    _repo_id: str
    _id: str
    # Filled in by get_commit() on first use and returned as-is afterwards.
    _commit: Optional[Commit] = None

    def __init__(self, repository_id: str, reference_id: str, client: Optional[Client] = None) -> None:
        """Return a reference to a lakeFS commit.

        :param repository_id: the repository holding the commit
        :param reference_id: a reference expression to the commit
        :param client: optional client, handed unchanged to the base class constructor

        Any reference expression can be used as a reference_id, for example:

        - 'main' (head of 'main' branch)
        - 'main@' (head of 'main' branch, only committed objects)
        - 'my_tag~3' (3 commits before 'my_tag')

        See https://docs.lakefs.io/understand/model.html#ref-expressions for
        details.
        """
        self._repo_id = repository_id
        self._id = reference_id
        super().__init__(client)

    @property
    def repo_id(self) -> str:
        """
        Return the repository id for this reference
        """
        return self._repo_id

    @property
    def id(self) -> str:
        """
        Returns the reference id (the reference expression given at construction)
        """
        return self._id

    def objects(self,
                max_amount: Optional[int] = None,
                after: Optional[str] = None,
                prefix: Optional[str] = None,
                delimiter: Optional[str] = None,
                **kwargs) -> Generator[ObjectInfo | CommonPrefix]:
        """
        Returns an object generator for this reference. Each listed entry is yielded as an ObjectInfo when the
        server reports its path type as an object, and as a CommonPrefix otherwise, depending on the listing
        parameters provided.

        :param max_amount: Stop listing after this many entries
        :param after: Return items after this value
        :param prefix: Return items prefixed with this value
        :param delimiter: Group common prefixes by this delimiter
        :param kwargs: Additional Keyword Arguments to send to the server
        :raise NotFoundException: if this reference does not exist
        :raise NotAuthorizedException: if user is not authorized to perform this operation
        :raise ServerException: for any other errors
        """
        for res in generate_listing(self._client.sdk_client.objects_api.list_objects,
                                    repository=self._repo_id,
                                    ref=self._id,
                                    max_amount=max_amount,
                                    after=after,
                                    prefix=prefix,
                                    delimiter=delimiter,
                                    **kwargs):
            # The server tags every listed entry with its path type; anything that is not an
            # object is surfaced as a common prefix.
            type_class = ObjectInfo if res.path_type == _OBJECT else CommonPrefix
            yield type_class(**res.dict())

    def log(self, max_amount: Optional[int] = None, **kwargs) -> Generator[Commit]:
        """
        Returns a generator of commits starting with this reference id

        :param max_amount: (Optional) limits the amount of results to return from the server
        :param kwargs: Additional Keyword Arguments to send to the server
        :raise NotFoundException: if reference by this id does not exist
        :raise NotAuthorizedException: if user is not authorized to perform this operation
        :raise ServerException: for any other errors
        """
        for res in generate_listing(self._client.sdk_client.refs_api.log_commits, self._repo_id, self._id,
                                    max_amount=max_amount, **kwargs):
            yield Commit(**res.dict())

    def get_commit(self) -> Commit:
        """
        Returns the underlying commit referenced by this reference id.

        The commit is fetched from the server on the first call and cached on this instance; later calls return
        the cached value without contacting the server. For a reference that can move (e.g. a branch name such
        as 'main'), the returned commit therefore reflects the state at the time of the first call.

        :raise NotFoundException: if this reference does not exist
        :raise NotAuthorizedException: if user is not authorized to perform this operation
        :raise ServerException: for any other errors
        """
        if self._commit is None:
            with api_exception_handler():
                commit = self._client.sdk_client.commits_api.get_commit(self._repo_id, self._id)
                self._commit = Commit(**commit.dict())
        return self._commit

    def diff(self,
             other_ref: ReferenceType,
             max_amount: Optional[int] = None,
             after: Optional[str] = None,
             prefix: Optional[str] = None,
             delimiter: Optional[str] = None,
             **kwargs) -> Generator[Change]:
        """
        Returns a diff generator of changes between this reference and other_ref

        :param other_ref: The other ref to diff against, either a ref expression string or an object exposing ``id``
        :param max_amount: Stop showing changes after this amount
        :param after: Return items after this value
        :param prefix: Return items prefixed with this value
        :param delimiter: Group common prefixes by this delimiter
        :param kwargs: Additional Keyword Arguments to send to the server
        :raise NotFoundException: if this reference or other_ref does not exist
        :raise NotAuthorizedException: if user is not authorized to perform this operation
        :raise ServerException: for any other errors
        """
        other_ref_id = other_ref if isinstance(other_ref, str) else other_ref.id
        for diff in generate_listing(self._client.sdk_client.refs_api.diff_refs,
                                     repository=self._repo_id,
                                     left_ref=self._id,
                                     right_ref=other_ref_id,
                                     after=after,
                                     max_amount=max_amount,
                                     prefix=prefix,
                                     delimiter=delimiter,
                                     **kwargs):
            yield Change(**diff.dict())

    def merge_into(self, destination_branch: ReferenceType, **kwargs) -> str:
        """
        Merge this reference into destination branch

        :param destination_branch: The merge destination (either ID or branch object)
        :param kwargs: Additional Keyword Arguments used to build the ``lakefs_sdk.Merge`` request body
        :return: The reference id of the merge commit
        :raise NotFoundException: if reference by this id does not exist, or branch doesn't exist
        :raise NotAuthorizedException: if user is not authorized to perform this operation
        :raise ServerException: for any other errors
        """
        branch_id = destination_branch if isinstance(destination_branch, str) else destination_branch.id
        with api_exception_handler():
            merge = lakefs_sdk.Merge(**kwargs)
            res = self._client.sdk_client.refs_api.merge_into_branch(self._repo_id,
                                                                     self._id,
                                                                     branch_id,
                                                                     merge=merge)
            return res.reference

    def object(self, path: str) -> StoredObject:  # pylint: disable=C0103
        """
        Returns an Object class representing a lakeFS object with this repo id, reference id and path

        :param path: The object's path
        """
        return StoredObject(self._repo_id, self._id, path, self._client)

    def __repr__(self):
        class_name = self.__class__.__name__
        return f'{class_name}(repository="{self.repo_id}", id="{self.id}")'


def generate_listing(func, *args, max_amount: Optional[int] = None, **kwargs):
    """
    Generic generator function, for lakefs-sdk listings functionality.

    Repeatedly calls ``func`` and yields every entry of each returned page's ``results``.
    Before each follow-up call, the previous page's ``pagination.next_offset`` is passed as
    the ``after`` keyword argument. Iteration stops when a page reports
    ``pagination.has_more`` as false or when ``max_amount`` entries have been yielded.

    :param func: The listing function
    :param args: The function args
    :param max_amount: The max amount of objects to generate. ``None`` means no limit; a value
        ``<= 0`` yields nothing and does not call ``func`` at all.
    :param kwargs: The function kwargs
    :return: A generator based on the listing function
    """
    # Checked up front: the per-item countdown below only runs after a yield, so on its own it
    # could never stop before the first item.
    if max_amount is not None and max_amount <= 0:
        return
    remaining = max_amount
    has_more = True
    with api_exception_handler():
        while has_more:
            page = func(*args, **kwargs)
            has_more = page.pagination.has_more
            # Resume the next request where this page ended.
            kwargs["after"] = page.pagination.next_offset
            for res in page.results:
                yield res
                if remaining is not None:
                    remaining -= 1
                    if remaining <= 0:
                        return


# Anything accepted where a reference is expected: a raw reference-expression string, a Reference,
# or a Commit. Reference.diff and Reference.merge_into use a string as-is and otherwise read ``.id``.
ReferenceType = Union[str, Reference, Commit]