lakefs.import_manager module
The import module provides a simpler interface to the lakeFS SDK import functionality.
- class lakefs.import_manager.ImportManager(repository_id, branch_id, commit_message='', commit_metadata=None, client=None)
Bases: _BaseLakeFSObject
ImportManager provides an easy-to-use interface for performing imports from multiple sources. It supports both synchronous and asynchronous use: the user can start an import process, continue executing other logic, and poll for the import's completion.
ImportManager usage example:
import lakefs

branch = lakefs.repository("<repository_name>").branch("<branch_name>")
mgr = branch.import_data(commit_message="my imported data", metadata={"foo": "bar"})

# add sources for import
mgr.prefix(object_store_uri="s3://import-bucket/data1/",
           destination="import-prefix/") \
    .object(object_store_uri="s3://import-bucket/data2/imported_file",
            destination="import-prefix/imported_file")

# start import and wait
mgr.run()
- cancel()
Cancel an ongoing import process
- Raises:
NotFoundException – if branch, repository or import id do not exist
NotAuthorizedException – if user is not authorized to perform this operation
ConflictException – if the import was already completed
ServerException – for any other errors
- Return type:
None
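One way cancel() might be used is to bound how long an import is allowed to run. The sketch below is illustrative and not part of the lakeFS SDK: the helper name run_with_deadline is hypothetical, and it assumes the object returned by status() exposes a boolean completed attribute.

```python
import time
from datetime import timedelta


def run_with_deadline(mgr, deadline, poll_interval=timedelta(seconds=2)):
    """Start an import and cancel it if it has not completed by the deadline.

    `mgr` is expected to behave like an ImportManager (start, status, cancel).
    Assumption: the status object has a boolean `completed` attribute.
    Returns the final status, or None if the import was cancelled.
    """
    mgr.start()
    give_up_at = time.monotonic() + deadline.total_seconds()
    while True:
        status = mgr.status()
        if status.completed:
            return status
        if time.monotonic() >= give_up_at:
            # May raise ConflictException if the import completed in the meantime.
            mgr.cancel()
            return None
        time.sleep(poll_interval.total_seconds())
```

A caller that needs to tell "cancelled" apart from other failures can check for the None return value; exceptions raised by start(), status() or cancel() propagate unchanged.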
- commit_message: str
- commit_metadata: Optional[Dict]
- property import_id: str
Returns the id of the current import process
- object(object_store_uri, destination)
Creates a new import source of type “object” and adds it to the list of sources
- Parameters:
object_store_uri (str) – The URI from which to import the object
destination (str) – The destination path for the object relative to the branch
- Return type:
ImportManager
- Returns:
The ImportManager instance (self) after update, to allow operations chaining
- prefix(object_store_uri, destination)
Creates a new import source of type “common_prefix” and adds it to the list of sources
- Parameters:
object_store_uri (str) – The URI from which to import the objects
destination (str) – The destination prefix relative to the branch
- Return type:
ImportManager
- Returns:
The ImportManager instance (self) after update, to allow operations chaining
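Because object() and prefix() both return the manager, sources can also be registered in a loop rather than in one long chain. A minimal sketch, assuming a hypothetical helper add_sources that takes mappings from object store URIs to destinations:

```python
def add_sources(mgr, prefixes=None, objects=None):
    """Register several import sources on an ImportManager-like object.

    `prefixes` and `objects` map an object store URI to its destination
    in the branch. Returns the manager so calls can keep chaining.
    """
    for uri, destination in (prefixes or {}).items():
        mgr = mgr.prefix(object_store_uri=uri, destination=destination)
    for uri, destination in (objects or {}).items():
        mgr = mgr.object(object_store_uri=uri, destination=destination)
    return mgr
```

For example, add_sources(mgr, prefixes={"s3://import-bucket/data1/": "import-prefix/"}).run() would register one prefix source and then start the import.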
- run(poll_interval=None)
Same as calling start() and then wait()
- Parameters:
poll_interval (Optional[timedelta]) – The interval for polling the import status.
- Return type:
- Returns:
Import status as returned by the lakeFS server
- Raises:
See start(), wait()
- sources: List[lakefs_sdk.ImportLocation]
- start()
Start import, reporting back (and storing) a process id
- Return type:
str
- Returns:
The import process identifier in lakeFS
- Raises:
ImportManagerException – if an import process is already in progress
NotFoundException – if branch or repository do not exist
NotAuthorizedException – if user is not authorized to perform this operation
ValidationError – if path_type is not one of the allowed values
ServerException – for any other errors
- status()
Get the current import status
- Return type:
- Returns:
Import status as returned by the lakeFS server
- Raises:
ImportManagerException – if no import is in progress
NotFoundException – if branch, repository or import id do not exist
NotAuthorizedException – if user is not authorized to perform this operation
ServerException – for any other errors
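For asynchronous use, status() can be polled manually once start() has been called. The following sketch is illustrative only: poll_until_done and the on_progress callback are hypothetical names, and the completed and ingested_objects attributes on the status object are assumptions about the server's response model.

```python
import time
from datetime import timedelta


def poll_until_done(mgr, on_progress, poll_interval=timedelta(seconds=2)):
    """Poll an already started import, reporting progress after each check.

    Assumption: the status object has a boolean `completed` attribute and,
    optionally, an `ingested_objects` counter.
    """
    while True:
        status = mgr.status()
        # Fall back to None if the assumed counter is not present.
        on_progress(getattr(status, "ingested_objects", None))
        if status.completed:
            return status
        time.sleep(poll_interval.total_seconds())
```

Passing print as on_progress gives a simple progress log; any callable that accepts one argument would work.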
- wait(poll_interval=datetime.timedelta(seconds=2))
Poll a started import task ID, blocking until completion
- Parameters:
poll_interval (Optional[timedelta]) – The interval for polling the import status.
- Return type:
- Returns:
Import status as returned by the lakeFS server
- Raises:
ImportManagerException – if no import is in progress
NotFoundException – if branch, repository or import id do not exist
NotAuthorizedException – if user is not authorized to perform this operation
ServerException – for any other errors
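Splitting run() into start() and wait() lets other work proceed while the import is running. A minimal sketch of that pattern, assuming a hypothetical helper import_alongside and a caller-supplied other_work callable:

```python
from datetime import timedelta


def import_alongside(mgr, other_work, poll_interval=timedelta(seconds=2)):
    """Start an import, run unrelated work while it proceeds, then block on it.

    `mgr` is expected to behave like an ImportManager (start, wait).
    Returns a tuple of (import id, result of other_work(), final import status).
    """
    import_id = mgr.start()  # returns the import process id without waiting
    result = other_work()    # runs while the import is in progress
    status = mgr.wait(poll_interval=poll_interval)
    return import_id, result, status
```

If other_work raises, the import is left running; a caller that wants to stop it in that case could wrap the call and invoke cancel().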