feat(shared): load env config using jsonschema

The env loader is now capable of loading lists of objects, union types
or list of union types from the env variables.
They are some limitations: for example it doesn't support unions of
different shapes `list | dict` or `str | dict`.
This commit is contained in:
jo 2022-07-30 21:15:39 +02:00 committed by Kyle Robbertze
parent 6c449e3019
commit bcd877266f
3 changed files with 500 additions and 47 deletions

View file

@ -1,16 +1,14 @@
import sys
from os import environ
from itertools import zip_longest
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from loguru import logger
# pylint: disable=no-name-in-module
from pydantic import BaseModel, ValidationError
from pydantic.fields import ModelField
from pydantic.utils import deep_update
from yaml import YAMLError, safe_load
from ._env import EnvLoader
DEFAULT_ENV_PREFIX = "LIBRETIME"
DEFAULT_CONFIG_FILEPATH = Path("/etc/libretime/config.yml")
@ -36,54 +34,19 @@ class BaseConfig(BaseModel):
if filepath is not None:
filepath = Path(filepath)
file_values = self._load_file_values(filepath)
env_values = self._load_env_values(env_prefix, env_delimiter)
env_loader = EnvLoader(self.schema(), env_prefix, env_delimiter)
values = deep_merge_dict(
self._load_file_values(filepath),
env_loader.load(),
)
try:
super().__init__(**deep_update(file_values, env_values))
super().__init__(**values)
except ValidationError as error:
logger.critical(error)
sys.exit(1)
def _load_env_values(self, env_prefix: str, env_delimiter: str) -> Dict[str, Any]:
return self._get_fields_from_env(env_prefix, env_delimiter, self.__fields__)
def _get_fields_from_env(
self,
env_prefix: str,
env_delimiter: str,
fields: Dict[str, ModelField],
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
if env_prefix != "":
env_prefix += env_delimiter
for field in fields.values():
env_name = (env_prefix + field.name).upper()
if field.is_complex():
children: Union[List[Any], Dict[str, Any]] = []
if field.sub_fields:
if env_name in environ:
children = [v.strip() for v in environ[env_name].split(",")]
else:
children = self._get_fields_from_env(
env_name,
env_delimiter,
field.type_.__fields__,
)
if len(children) != 0:
result[field.name] = children
else:
if env_name in environ:
result[field.name] = environ[env_name]
return result
def _load_file_values(
self,
filepath: Optional[Path] = None,
@ -102,3 +65,38 @@ class BaseConfig(BaseModel):
logger.error(f"config file '{filepath}' is not a valid yaml file: {error}")
return {}
def deep_merge_dict(base: Dict[str, Any], next_: Dict[str, Any]) -> Dict[str, Any]:
result = base.copy()
for key, value in next_.items():
if key in result:
if isinstance(result[key], dict) and isinstance(value, dict):
result[key] = deep_merge_dict(result[key], value)
continue
if isinstance(result[key], list) and isinstance(value, list):
result[key] = deep_merge_list(result[key], value)
continue
if value:
result[key] = value
return result
def deep_merge_list(base: List[Any], next_: List[Any]) -> List[Any]:
result: List[Any] = []
for base_item, next_item in zip_longest(base, next_):
if isinstance(base_item, list) and isinstance(next_item, list):
result.append(deep_merge_list(base_item, next_item))
continue
if isinstance(base_item, dict) and isinstance(next_item, dict):
result.append(deep_merge_dict(base_item, next_item))
continue
if next_item:
result.append(next_item)
return result