"""
Module containing lakeFS repository implementation
"""
from __future__ import annotations
from typing import Optional, Generator
import lakefs_sdk
from lakefs.models import RepositoryProperties
from lakefs.tag import Tag
from lakefs.branch import Branch
from lakefs.client import Client, _BaseLakeFSObject
from lakefs.exceptions import api_exception_handler, ConflictException, LakeFSException
from lakefs.reference import Reference, generate_listing
[docs]
class Repository(_BaseLakeFSObject):
"""
Class representing a Repository in lakeFS.
The Repository object provides the context for the other objects that are found in it.
Access to these objects should be done from this class
"""
_id: str
_properties: RepositoryProperties = None
def __init__(self, repository_id: str, client: Optional[Client] = None) -> None:
self._id = repository_id
super().__init__(client)
[docs]
def create(self,
storage_namespace: str,
default_branch: str = "main",
include_samples: bool = False,
exist_ok: bool = False,
**kwargs) -> Repository:
"""
Create a new repository in lakeFS from this object
:param storage_namespace: Repository's storage namespace
:param default_branch: The default branch for the repository. If None, use server default name
:param include_samples: Whether to include sample data in repository creation
:param exist_ok: If False will throw an exception if a repository by this name already exists. Otherwise,
return the existing repository without creating a new one
:param kwargs: Additional Keyword Arguments to send to the server
:return: The lakeFS SDK object representing the repository
:raise NotAuthorizedException: if user is not authorized to perform this operation
:raise ServerException: for any other errors
"""
repository_creation = lakefs_sdk.RepositoryCreation(name=self._id,
storage_namespace=storage_namespace,
default_branch=default_branch,
sample_data=include_samples)
def handle_conflict(e: LakeFSException):
if isinstance(e, ConflictException) and exist_ok:
with api_exception_handler():
get_repo = self._client.sdk_client.repositories_api.get_repository(self._id)
self._properties = RepositoryProperties(**get_repo.dict())
return None
return e
with api_exception_handler(handle_conflict):
repo = self._client.sdk_client.repositories_api.create_repository(repository_creation, **kwargs)
self._properties = RepositoryProperties(**repo.dict())
return self
[docs]
def delete(self) -> None:
"""
Delete repository from lakeFS server
:raise NotFoundException: if repository by this id does not exist
:raise NotAuthorizedException: if user is not authorized to perform this operation
:raise ServerException: for any other errors
"""
with api_exception_handler():
self._client.sdk_client.repositories_api.delete_repository(self._id)
[docs]
def branch(self, branch_id: str) -> Branch:
"""
Return a branch object using the current repository id and client
:param branch_id: name of the branch
"""
return Branch(self._id, branch_id, self._client)
[docs]
def commit(self, commit_id: str) -> Reference:
"""
Return a reference object using the current repository id and client
:param commit_id: id of the commit reference
"""
return Reference(self._id, commit_id, self._client)
[docs]
def ref(self, ref_id: str) -> Reference:
"""
Return a reference object using the current repository id and client
:param ref_id: branch name, commit id or tag id
"""
return Reference(self._id, ref_id, self._client)
[docs]
def tag(self, tag_id: str) -> Tag:
"""
Return a tag object using the current repository id and client
:param tag_id: name of the tag
"""
return Tag(self._id, tag_id, self._client)
[docs]
def branches(self, max_amount: Optional[int] = None,
after: Optional[str] = None, prefix: Optional[str] = None, **kwargs) -> Generator[Branch]:
"""
Returns a generator listing for branches on the given repository
:param max_amount: Stop showing changes after this amount
:param after: Return items after this value
:param prefix: Return items prefixed with this value
:param kwargs: Additional Keyword Arguments to send to the server
:raise NotFoundException: if repository does not exist
:raise NotAuthorizedException: if user is not authorized to perform this operation
:raise ServerException: for any other errors
"""
for res in generate_listing(self._client.sdk_client.branches_api.list_branches, self._id,
max_amount=max_amount, after=after, prefix=prefix, **kwargs):
yield Branch(self._id, res.id, client=self._client)
@property
def metadata(self) -> dict[str, str]:
"""
Returns the repository metadata
"""
with api_exception_handler():
return self._client.sdk_client.repositories_api.get_repository_metadata(repository=self._id)
@property
def properties(self) -> RepositoryProperties:
"""
Return the repository's properties object
"""
if self._properties is None:
with api_exception_handler():
repo = self._client.sdk_client.repositories_api.get_repository(self._id)
self._properties = RepositoryProperties(**repo.dict())
return self._properties
return self._properties
@property
def id(self) -> str:
"""
Returns the repository's id
"""
return self._id
def __repr__(self) -> str:
return f'Repository(id="{self.id}")'
def __str__(self):
return str(self.properties)
[docs]
def repositories(client: Client = None,
prefix: Optional[str] = None,
after: Optional[str] = None,
**kwargs) -> Generator[Repository]:
"""
Creates a repositories object generator listing lakeFS repositories
:param client: The lakeFS client to use, if None, tries to use the default client
:param prefix: Return items prefixed with this value
:param after: Return items after this value
:return: A generator listing lakeFS repositories
"""
if client is None: # Try to get default client
client = Client()
for res in generate_listing(client.sdk_client.repositories_api.list_repositories,
prefix=prefix,
after=after,
**kwargs):
yield Repository(res.id, client)