Source code for lakefs.tag

"""
Module containing lakeFS tag implementation
"""

from __future__ import annotations
from typing import Optional

import lakefs_sdk

from lakefs.client import Client
from lakefs.exceptions import api_exception_handler, LakeFSException, ConflictException
from lakefs.reference import Reference, ReferenceType


class Tag(Reference):
    """
    A lakeFS tag, addressed by the repository it lives in and its tag ID.
    """

    def __init__(self, repository_id: str, tag_id: str, client: Optional[Client] = None):
        # The tag ID doubles as the reference ID handled by the Reference base class.
        super().__init__(repository_id, reference_id=tag_id, client=client)

    def create(self, source_ref: ReferenceType, exist_ok: Optional[bool] = False) -> Tag:
        """
        Create this tag on the lakeFS server, pointing at the given source_ref

        :param source_ref: The reference to tag - either a reference ID string or an object exposing ``id``
        :param exist_ok: When truthy, a conflict reported by the server is suppressed instead of raised
        :return: This Tag instance (``self``)
        :raise NotAuthorizedException: if user is not authorized to perform this operation
        :raise NotFoundException: if source_ref doesn't exist on the lakeFS server
        :raise ServerException: for any other errors.
        """
        # Accept either a plain reference ID or an object carrying one.
        if isinstance(source_ref, str):
            target_ref_id = source_ref
        else:
            target_ref_id = source_ref.id

        creation_request = lakefs_sdk.TagCreation(id=self.id, ref=target_ref_id)

        def suppress_conflict_if_allowed(e: LakeFSException):
            # Returning None asks the exception handler to swallow the error;
            # that only happens for a conflict when the caller passed exist_ok.
            if isinstance(e, ConflictException) and exist_ok:
                return None
            return e

        with api_exception_handler(suppress_conflict_if_allowed):
            self._client.sdk_client.tags_api.create_tag(self._repo_id, creation_request)

        return self

    def delete(self) -> None:
        """
        Delete this tag from the lakeFS server

        :raise NotAuthorizedException: if user is not authorized to perform this operation
        :raise NotFoundException: if the tag doesn't exist on the lakeFS server
        :raise ServerException: for any other errors
        """
        with api_exception_handler():
            tags_api = self._client.sdk_client.tags_api
            tags_api.delete_tag(self._repo_id, self.id)
            # Reset the commit held on this reference once the server-side tag is gone
            # (presumably a value cached by Reference, which is defined outside this file).
            self._commit = None