Source code for lakefs.config

"""
Client configuration module
"""

from __future__ import annotations

import dataclasses
import json
import os
from enum import Enum
from json import JSONDecodeError
from pathlib import Path
from typing import Optional, Dict

import yaml
from lakefs_sdk import Configuration
from lakefs.exceptions import NoAuthenticationFound, UnsupportedCredentialsProviderType, InvalidEnvVarFormat
from lakefs.namedtuple import LenientNamedTuple

# Location of the lakectl configuration file inside the current user's home directory
_LAKECTL_YAML_PATH = str(Path.home() / ".lakectl.yaml")

# Environment variable names for the server endpoint and the basic credentials
_LAKECTL_ENDPOINT_URL_ENV = "LAKECTL_SERVER_ENDPOINT_URL"
_LAKECTL_ACCESS_KEY_ID_ENV = "LAKECTL_CREDENTIALS_ACCESS_KEY_ID"
_LAKECTL_SECRET_ACCESS_KEY_ENV = "LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"

# lakefs access token, used for authentication when logging in with an IAM role
_LAKECTL_CREDENTIALS_SESSION_TOKEN = "LAKECTL_CREDENTIALS_SESSION_TOKEN"

# Environment variable names for the credentials provider and its AWS IAM settings
_LAKECTL_CREDENTIALS_PROVIDER_TYPE = "LAKECTL_CREDENTIALS_PROVIDER_TYPE"
_LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_TOKEN_TTL_SECONDS = (
    "LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_TOKEN_TTL_SECONDS"
)
_LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_PRESIGNED_URL_TTL_SECONDS = (
    "LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_PRESIGNED_URL_TTL_SECONDS"
)
_LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_TOKEN_REQUEST_HEADERS = (
    "LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_TOKEN_REQUEST_HEADERS"
)

# Defaults applied when a TTL is not configured
_DEFAULT_IAM_TOKEN_TTL_SECONDS = 3600
_DEFAULT_IAM_URL_PRESIGN_TTL_SECONDS = 60

# Keys of the AWS IAM provider section in the configuration data
TOKEN_TTL_SECONDS_CONFIG = "token_ttl_seconds"
URL_PRESIGN_TTL_SECONDS_CONFIG = "url_presign_ttl_seconds"
TOKEN_REQUEST_HEADERS_CONFIG = "token_request_headers"

# Provider type identifiers; only the listed providers are accepted
AWS_IAM_PROVIDER_TYPE = "aws_iam"
SUPPORTED_IAM_PROVIDERS = [AWS_IAM_PROVIDER_TYPE]

class ClientConfig(Configuration):
    """
    Configuration class for the SDK Client.

    Instantiation will try to get authentication methods using the following chain:

    1. Provided kwargs to __init__ func (should contain necessary credentials as defined in
       lakefs_sdk.Configuration). When kwargs are given, no other source is consulted.
    2. Environment variables: LAKECTL_SERVER_ENDPOINT_URL for the endpoint, then
       LAKECTL_CREDENTIALS_SESSION_TOKEN, or LAKECTL_CREDENTIALS_ACCESS_KEY_ID together with
       LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY
    3. Try to read ~/.lakectl.yaml if exists
    4. Use IAM role from current machine (using AWS IAM role will work only with enterprise/cloud)

    This class also encapsulates the required lakectl configuration for authentication and used to unmarshall the
    lakectl yaml file.
    """

    class Server(LenientNamedTuple):
        """
        lakectl configuration's server block
        """
        endpoint_url: str

    class Credentials(LenientNamedTuple):
        """
        lakectl configuration's credentials block
        """
        access_key_id: str
        secret_access_key: str

    class AuthType(Enum):
        """
        Enum for the supported authentication types
        """
        SESSION_TOKEN = 1
        CREDENTIALS = 2
        IAM = 3

    class ProviderType(Enum):
        """
        Enum for the supported authentication provider types
        """
        AWS_IAM = "aws_iam"
        UNKNOWN = "unknown"

    @dataclasses.dataclass
    class AWSIAMProviderConfig:
        """
        Settings of the AWS IAM credentials provider (token TTL, presigned URL TTL and
        optional token request headers)
        """
        token_ttl_seconds: int
        url_presign_ttl_seconds: int
        token_request_headers: Optional[Dict]

    @dataclasses.dataclass
    class IAMProvider:
        """
        An IAM authentication provider
        """
        type: ClientConfig.ProviderType
        aws_iam: Optional[ClientConfig.AWSIAMProviderConfig]

    # Class-level defaults; instances override them while loading the configuration
    server = Server(endpoint_url="")
    credentials = Credentials(access_key_id="", secret_access_key="")
    username = None
    password = None
    access_token = None
    _iam_provider = None

    def __init__(self, verify_ssl: Optional[bool] = None, proxy: Optional[str] = None, **kwargs):
        """
        Build the client configuration.

        :param verify_ssl: When not None, overrides the SSL verification setting
        :param proxy: When not None, sets the proxy to use
        :param kwargs: Passed as-is to lakefs_sdk.Configuration. When any kwarg is given, the configuration
            file and the environment variables are not consulted
        :raise NoAuthenticationFound: if no kwargs were given and no authentication method was found
        """
        super().__init__(**kwargs)
        if verify_ssl is not None:
            self.verify_ssl = verify_ssl
        if proxy is not None:
            self.proxy = proxy

        # Explicit kwargs win: skip the file/environment discovery chain entirely
        if kwargs:
            return

        # File first, environment second, so environment variables override the file
        self._load_from_config_file()
        self._load_from_environment()

        if not self._has_valid_authentication():
            raise NoAuthenticationFound

    def _load_from_config_file(self):
        """Load configuration from .lakectl.yaml file if it exists"""
        try:
            with open(_LAKECTL_YAML_PATH, encoding="utf-8") as fd:
                # NOTE(review): yaml.Loader can construct arbitrary Python objects; consider
                # yaml.safe_load if this file can ever come from an untrusted source.
                # An empty file parses to None - treat it as an empty configuration.
                config_data = yaml.load(fd, Loader=yaml.Loader) or {}

                # A key present with an empty value (e.g. "server:") parses to None; skip it
                if "server" in config_data and config_data["server"]:
                    self.server = ClientConfig.Server(**config_data["server"])

                if "credentials" in config_data:
                    credentials_data = config_data["credentials"] or {}
                    if ("access_key_id" in credentials_data and
                            "secret_access_key" in credentials_data):
                        self.credentials = ClientConfig.Credentials(**credentials_data)
                        self.username = self.credentials.access_key_id
                        self.password = self.credentials.secret_access_key

                # Without a full key pair, fall back to an IAM provider declared in the file
                if self.username is None or self.password is None:
                    self._set_iam_provider_from_config_file(config_data)
        except FileNotFoundError:
            # The configuration file is optional
            pass

    def _load_from_environment(self):
        """Load configuration from environment variables, which take precedence"""
        endpoint_env = os.getenv(_LAKECTL_ENDPOINT_URL_ENV)
        if endpoint_env is not None:
            self.host = endpoint_env
        elif hasattr(self, 'server') and self.server:
            self.host = self.server.endpoint_url

        # Session token takes precedence over basic credentials and IAM provider. If specified, set it and override
        # all others. Currently, session token setting is only available through environment variables.
        token_env = os.getenv(_LAKECTL_CREDENTIALS_SESSION_TOKEN)
        if token_env is not None:
            self.access_token = token_env
            self.username = None
            self.password = None
            self.credentials = None
            self._iam_provider = None
            return

        # Credentials take precedence over IAM provider. If specified, set it and override the IAM provider
        key_env = os.getenv(_LAKECTL_ACCESS_KEY_ID_ENV)
        secret_env = os.getenv(_LAKECTL_SECRET_ACCESS_KEY_ENV)
        if key_env is not None and secret_env is not None:
            self.username = key_env
            self.password = secret_env
            self.credentials = ClientConfig.Credentials(
                access_key_id=key_env,
                secret_access_key=secret_env
            )
            self._iam_provider = None
            return

        # If no other method was specified, try to set IAM provider from env vars
        if self.username is None or self.password is None:
            self._set_iam_provider_from_env_vars()

    def _has_valid_authentication(self) -> bool:
        """Check if we have valid authentication credentials"""
        if self.access_token is not None:
            return True
        # Both username and password must be non-empty
        if self.username and self.password:
            return True
        return self._iam_provider is not None

    def get_auth_type(self) -> Optional[ClientConfig.AuthType]:
        """
        Returns the type of authentication used: either SessionToken, Credentials, or IAMProvider
        ORDER MATTERS! SessionToken > Credentials > IAMProvider.
        self._iam_provider will be none if Session Token auth is used.
        self.access_token will be populated for both Session Token and IAMProvider auth, therefore it's tested
        after self._iam_provider.

        :return: ClientConfig.AuthType, or None when no authentication method is set
        """
        if self._iam_provider is not None:
            return ClientConfig.AuthType.IAM
        if self.access_token is not None:
            return ClientConfig.AuthType.SESSION_TOKEN
        if self.credentials is not None:
            return ClientConfig.AuthType.CREDENTIALS
        return None

    @property
    def iam_provider(self) -> Optional[ClientConfig.IAMProvider]:
        """
        Returns the IAM provider used for authentication.

        :return: ClientConfig.IAMProvider, or None when no IAM provider is configured
        """
        return self._iam_provider

    def _set_iam_provider_from_config_file(self, config_data: Dict):
        """
        Set the IAM provider from the configuration file.

        :param config_data: The parsed content of the configuration file
        :raise UnsupportedCredentialsProviderType: if the file declares a provider type that is not supported
        """
        provider_type = _get_provider_type_from_config_file(config_data)
        if provider_type is None:
            self._iam_provider = None
            return
        if provider_type not in SUPPORTED_IAM_PROVIDERS:
            raise UnsupportedCredentialsProviderType(provider_type)

        provider_config = _get_provider_config_from_config_data(config_data, provider_type)
        # The provider is only set when the file also carries a settings section for it
        if provider_config is not None and provider_type == AWS_IAM_PROVIDER_TYPE:
            aws_iam_provider_config = _generate_aws_iam_provider_config(provider_config)
            self._iam_provider = ClientConfig.IAMProvider(
                type=ClientConfig.ProviderType.AWS_IAM,
                aws_iam=aws_iam_provider_config
            )

    def _set_iam_provider_from_env_vars(self):
        """
        Set the IAM provider from environment variables.

        Values found in the environment override the ones already set on the provider (for example by the
        configuration file); missing values keep the current setting.

        :raise UnsupportedCredentialsProviderType: if the provider type is not supported
        :raise InvalidEnvVarFormat: if the token request headers variable does not hold valid JSON
        """
        provider_type = _get_iam_provider_type_from_env_vars()
        if provider_type is None:
            return
        if provider_type != AWS_IAM_PROVIDER_TYPE:
            raise UnsupportedCredentialsProviderType(provider_type)

        if self._iam_provider is None:
            # Nothing came from the configuration file - start from the defaults
            self._iam_provider = ClientConfig.IAMProvider(
                type=ClientConfig.ProviderType.AWS_IAM,
                aws_iam=ClientConfig.AWSIAMProviderConfig(
                    token_ttl_seconds=_DEFAULT_IAM_TOKEN_TTL_SECONDS,
                    url_presign_ttl_seconds=_DEFAULT_IAM_URL_PRESIGN_TTL_SECONDS,
                    token_request_headers=None
                )
            )

        aws_iam = self._iam_provider.aws_iam
        env_token_ttl = os.getenv(_LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_TOKEN_TTL_SECONDS,
                                  aws_iam.token_ttl_seconds)
        aws_iam.token_ttl_seconds = int(env_token_ttl)
        env_presign_url_ttl = os.getenv(_LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_PRESIGNED_URL_TTL_SECONDS,
                                        aws_iam.url_presign_ttl_seconds)
        aws_iam.url_presign_ttl_seconds = int(env_presign_url_ttl)

        env_headers = os.getenv(_LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_TOKEN_REQUEST_HEADERS, None)
        if env_headers is not None:
            try:
                aws_iam.token_request_headers = json.loads(env_headers)
            except JSONDecodeError as e:
                # Report the variable's name, not its content, which may hold sensitive headers
                raise InvalidEnvVarFormat(
                    f"Invalid format for {_LAKECTL_CREDENTIALS_PROVIDER_AWS_IAM_TOKEN_REQUEST_HEADERS} "
                    f"environment variable. Expected JSON format."
                ) from e
def _get_provider_type_from_config_file(data: Optional[Dict] = None) -> Optional[str]:
    """
    Extract the credentials provider type from the parsed configuration data.

    :param data: The parsed configuration data, or None
    :return: The value of credentials.provider.type, or None if it is not present
    """
    if data is None:
        return None
    try:
        return data['credentials']['provider']['type']
    except (KeyError, TypeError):
        # TypeError covers sections that exist but are empty (None) or not a mapping
        return None


def _get_iam_provider_type_from_env_vars() -> Optional[str]:
    """
    Get the IAM provider type from environment variables.

    :return: The provider type name if configured, None otherwise
    :raise UnsupportedCredentialsProviderType: if the configured type is not a supported provider
    """
    provider_type = os.getenv(_LAKECTL_CREDENTIALS_PROVIDER_TYPE)
    if provider_type is not None and provider_type not in SUPPORTED_IAM_PROVIDERS:
        raise UnsupportedCredentialsProviderType(provider_type)
    return provider_type


def _safe_int_or_default(value: Optional[str], default: int) -> int:
    """
    Safely convert a value to an int, returning a default if conversion fails.

    :param value: The value to convert; None and non-numeric values fall back to the default
    :param default: The value returned when the conversion fails
    :return: The converted int, or the default
    """
    try:
        return int(value)
    except (ValueError, TypeError):
        return default


def _get_provider_config_from_config_data(data: Optional[Dict], provider: str) -> Optional[Dict]:
    """
    Extract the settings section of the given provider from the parsed configuration data.

    :param data: The parsed configuration data, or None
    :param provider: The provider name, used as the key under credentials.provider
    :return: The value of credentials.provider.<provider>, or None if it is not present
    """
    if data is None:
        return None
    try:
        return data['credentials']['provider'][provider]
    except (KeyError, TypeError):
        # TypeError covers sections that exist but are empty (None) or not a mapping
        return None


def _generate_aws_iam_provider_config(aws_config: Dict) -> ClientConfig.AWSIAMProviderConfig:
    """
    Build the AWS IAM provider configuration from its configuration section.

    Missing or non-integer TTL values are replaced by the defaults.

    :param aws_config: The AWS IAM provider section of the configuration data
    :return: The populated ClientConfig.AWSIAMProviderConfig
    """
    token_ttl_seconds = _safe_int_or_default(
        aws_config.get(TOKEN_TTL_SECONDS_CONFIG), _DEFAULT_IAM_TOKEN_TTL_SECONDS)
    url_presign_ttl_seconds = _safe_int_or_default(
        aws_config.get(URL_PRESIGN_TTL_SECONDS_CONFIG), _DEFAULT_IAM_URL_PRESIGN_TTL_SECONDS)
    return ClientConfig.AWSIAMProviderConfig(
        token_ttl_seconds=token_ttl_seconds,
        url_presign_ttl_seconds=url_presign_ttl_seconds,
        token_request_headers=aws_config.get(TOKEN_REQUEST_HEADERS_CONFIG)
    )