feat: replace loguru with logging

This commit is contained in:
jo 2023-02-26 01:27:00 +01:00 committed by Kyle Robbertze
parent cced09f1ac
commit c6940db289
34 changed files with 138 additions and 245 deletions

View File

@ -4,7 +4,7 @@ from typing import Optional
import click
from libretime_shared.cli import cli_config_options, cli_logging_options
from libretime_shared.config import DEFAULT_ENV_PREFIX
from libretime_shared.logging import level_from_name, setup_logger
from libretime_shared.logging import setup_logger
from .config import Config
from .message_listener import MessageListener
@ -33,7 +33,7 @@ def cli(
"""
Run analyzer.
"""
setup_logger(level_from_name(log_level), log_filepath)
setup_logger(log_level, log_filepath)
config = Config(config_filepath)
# Start up the StatusReporter process

View File

@ -1,15 +1,17 @@
import json
import logging
import signal
import time
from queue import Queue
import pika
from loguru import logger
from .config import Config
from .pipeline import Pipeline, PipelineOptions, PipelineStatus
from .status_reporter import StatusReporter
logger = logging.getLogger(__name__)
EXCHANGE = "airtime-uploads"
EXCHANGE_TYPE = "topic"
ROUTING_KEY = ""

View File

@ -1,6 +1,7 @@
import logging
from subprocess import CalledProcessError, CompletedProcess, run
from loguru import logger
logger = logging.getLogger(__name__)
def run_(*args, **kwargs) -> CompletedProcess:

View File

@ -1,12 +1,13 @@
import logging
from datetime import timedelta
from math import isclose
from subprocess import CalledProcessError
from typing import Any, Dict
from loguru import logger
from ._ffmpeg import compute_silences, probe_duration
logger = logging.getLogger(__name__)
def analyze_duration(filepath: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""

View File

@ -1,10 +1,12 @@
import logging
from datetime import timedelta
from pathlib import Path
from typing import Any, Dict
import mutagen
from libretime_shared.files import compute_md5
from loguru import logger
logger = logging.getLogger(__name__)
def analyze_metadata(filepath_: str, metadata: Dict[str, Any]):

View File

@ -1,10 +1,11 @@
import logging
from subprocess import CalledProcessError
from typing import Any, Dict
from loguru import logger
from ._liquidsoap import _liquidsoap
logger = logging.getLogger(__name__)
class UnplayableFileError(Exception):
pass

View File

@ -1,8 +1,9 @@
import logging
import shutil
from pathlib import Path
from uuid import uuid4
from loguru import logger
logger = logging.getLogger(__name__)
MAX_DIR_LEN = 48
MAX_FILE_LEN = 48

View File

@ -1,8 +1,8 @@
import logging
from enum import Enum
from queue import Queue
from typing import Any, Dict, Protocol
from loguru import logger
from pydantic import BaseModel
from .analyze_cuepoint import analyze_cuepoint, analyze_duration
@ -11,6 +11,8 @@ from .analyze_playability import UnplayableFileError, analyze_playability
from .analyze_replaygain import analyze_replaygain
from .organise_file import organise_file
logger = logging.getLogger(__name__)
class Step(Protocol):
@staticmethod

View File

@ -1,5 +1,6 @@
import collections
import json
import logging
import pickle
import queue
import threading
@ -7,9 +8,10 @@ import time
from urllib.parse import urlparse
import requests
from loguru import logger
from requests.exceptions import HTTPError
logger = logging.getLogger(__name__)
class PicklableHttpRequest:
def __init__(self, method, url, api_key, data):

View File

@ -2,11 +2,11 @@ import shutil
from pathlib import Path
import pytest
from libretime_shared.logging import TRACE, setup_logger
from libretime_shared.logging import setup_logger
from .fixtures import fixtures_path
setup_logger(TRACE)
setup_logger("debug")
AUDIO_FILENAME = "s1-stereo-tagged.mp3"
AUDIO_FILE = fixtures_path / AUDIO_FILENAME

View File

@ -1,11 +1,13 @@
import logging
from typing import Optional
from loguru import logger
from requests import Response, Session as BaseSession
from requests.adapters import HTTPAdapter
from requests.exceptions import RequestException
from urllib3.util import Retry
logger = logging.getLogger(__name__)
DEFAULT_TIMEOUT = 5

View File

@ -8,6 +8,8 @@ from libretime_shared.config import BaseConfig, GeneralConfig
from ._utils import ApiRequest, RequestProvider
logger = logging.getLogger(__name__)
class Config(BaseConfig):
general: GeneralConfig
@ -72,9 +74,7 @@ class ApiClient:
UPLOAD_RETRIES = 3
UPLOAD_WAIT = 60
def __init__(self, logger=None, config_path="/etc/libretime/config.yml"):
self.logger = logger or logging
def __init__(self, config_path="/etc/libretime/config.yml"):
config = Config(config_path)
self.base_url = config.general.public_url
self.api_key = config.general.api_key
@ -89,27 +89,27 @@ class ApiClient:
try:
return self.services.version_url()["api_version"]
except Exception as exception:
self.logger.exception(exception)
logger.exception(exception)
return -1
def is_server_compatible(self, verbose=True):
api_version = self.__get_api_version()
if api_version == -1:
if verbose:
self.logger.info("Unable to get Airtime API version number.\n")
logger.info("Unable to get Airtime API version number.\n")
return False
if api_version[0:3] != AIRTIME_API_VERSION[0:3]:
if verbose:
self.logger.info("Airtime API version found: " + str(api_version))
self.logger.info(
logger.info("Airtime API version found: " + str(api_version))
logger.info(
"pypo is only compatible with API version: " + AIRTIME_API_VERSION
)
return False
if verbose:
self.logger.info("Airtime API version found: " + str(api_version))
self.logger.info(
logger.info("Airtime API version found: " + str(api_version))
logger.info(
"pypo is only compatible with API version: " + AIRTIME_API_VERSION
)
return True
@ -118,7 +118,7 @@ class ApiClient:
try:
self.services.notify_liquidsoap_started()
except Exception as exception:
self.logger.exception(exception)
logger.exception(exception)
def notify_media_item_start_playing(self, media_id):
"""
@ -129,14 +129,14 @@ class ApiClient:
try:
return self.services.update_start_playing_url(media_id=media_id)
except Exception as exception:
self.logger.exception(exception)
logger.exception(exception)
return None
def get_shows_to_record(self):
try:
return self.services.show_schedule_url()
except Exception as exception:
self.logger.exception(exception)
logger.exception(exception)
return None
def upload_recorded_show(self, files, show_id):
@ -147,19 +147,19 @@ class ApiClient:
url = self.construct_rest_url("upload_file_url")
self.logger.debug(url)
logger.debug(url)
for i in range(0, retries):
self.logger.debug("Upload attempt: %s", i + 1)
self.logger.debug(files)
self.logger.debug(ApiRequest.API_HTTP_REQUEST_TIMEOUT)
logger.debug("Upload attempt: %s", i + 1)
logger.debug(files)
logger.debug(ApiRequest.API_HTTP_REQUEST_TIMEOUT)
try:
request = requests.post(
url, files=files, timeout=float(ApiRequest.API_HTTP_REQUEST_TIMEOUT)
)
response = request.json()
self.logger.debug(response)
logger.debug(response)
# FIXME: We need to tell LibreTime that the uploaded track was recorded
# for a specific show
@ -183,14 +183,14 @@ class ApiClient:
break
except requests.exceptions.HTTPError as exception:
self.logger.error(f"Http error code: {exception.response.status_code}")
self.logger.exception(exception)
logger.error(f"Http error code: {exception.response.status_code}")
logger.exception(exception)
except requests.exceptions.ConnectionError as exception:
self.logger.exception(f"Server is down: {exception}")
logger.exception(f"Server is down: {exception}")
except Exception as exception:
self.logger.exception(exception)
logger.exception(exception)
# wait some time before next retry
time.sleep(retries_wait)
@ -203,7 +203,7 @@ class ApiClient:
username=username, password=password, djtype=dj_type
)
except Exception as exception:
self.logger.exception(exception)
logger.exception(exception)
return {}
def construct_rest_url(self, action_key):
@ -239,7 +239,7 @@ class ApiClient:
boot_time=time,
).retry(5)
except Exception as exception:
self.logger.exception(exception)
logger.exception(exception)
def notify_source_status(self, sourcename, status):
try:
@ -247,7 +247,7 @@ class ApiClient:
sourcename=sourcename, status=status
).retry(5)
except Exception as exception:
self.logger.exception(exception)
logger.exception(exception)
def get_bootstrap_info(self):
"""
@ -260,7 +260,7 @@ class ApiClient:
Update the server with the latest metadata we've received from the
external webstream
"""
self.logger.info(
logger.info(
self.services.notify_webstream_data.req(
_post_data={"data": data}, media_id=str(media_id)
).retry(5)
@ -268,7 +268,7 @@ class ApiClient:
def get_stream_parameters(self):
response = self.services.get_stream_parameters()
self.logger.debug(response)
logger.debug(response)
return response
def push_stream_stats(self, data):
@ -285,7 +285,7 @@ class ApiClient:
)
return response
except Exception as exception:
self.logger.exception(exception)
logger.exception(exception)
def update_metadata_on_tunein(self):
self.services.update_metadata_on_tunein()

View File

@ -1,3 +1,4 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from threading import Thread
@ -6,7 +7,6 @@ from typing import Any, Dict, List, Optional, Union
from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_shared.config import IcecastOutput, ShoutcastOutput
from loguru import logger
from lxml import etree
from requests import Session
from requests.exceptions import ( # pylint: disable=redefined-builtin
@ -17,6 +17,8 @@ from requests.exceptions import ( # pylint: disable=redefined-builtin
from ..config import Config
logger = logging.getLogger(__name__)
AnyOutput = Union[IcecastOutput, ShoutcastOutput]

View File

@ -1,15 +1,16 @@
import logging
from pathlib import Path
from subprocess import CalledProcessError, check_output, run
from time import sleep
from typing import Any, Literal, Optional, Tuple
from loguru import logger
from ..models import MessageFormatKind
from ..utils import quote
from ..version import parse_liquidsoap_version
from ._connection import LiquidsoapConnection
logger = logging.getLogger(__name__)
class LiquidsoapClientError(Exception):
"""

View File

@ -1,9 +1,10 @@
import logging
from pathlib import Path
from socket import AF_UNIX, SOCK_STREAM, create_connection, socket
from threading import Lock
from typing import Optional
from loguru import logger
logger = logging.getLogger(__name__)
class InvalidConnection(Exception):
@ -61,7 +62,7 @@ class LiquidsoapConnection:
self.close()
def connect(self):
logger.trace("trying to acquire lock")
logger.debug("trying to acquire lock")
# pylint: disable=consider-using-with
self._lock.acquire()
logger.debug(f"connecting to {self.address()}")

View File

@ -1,3 +1,4 @@
import logging
import os
from pathlib import Path
from typing import Optional
@ -6,14 +7,15 @@ import click
from libretime_api_client.v2 import ApiClient
from libretime_shared.cli import cli_config_options, cli_logging_options
from libretime_shared.config import DEFAULT_ENV_PREFIX
from libretime_shared.logging import level_from_name, setup_logger
from loguru import logger
from libretime_shared.logging import setup_logger
from ..config import Config
from .entrypoint import generate_entrypoint
from .models import Info, StreamPreferences
from .version import get_liquidsoap_version
logger = logging.getLogger(__name__)
here = Path(__file__).parent
@ -24,7 +26,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
"""
Run liquidsoap.
"""
logger_level, _ = setup_logger(level_from_name(log_level), log_filepath)
setup_logger(log_level, log_filepath)
config = Config(config_filepath)
api_client = ApiClient(
@ -53,7 +55,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
"--verbose",
str(entrypoint_filepath),
]
if logger_level.is_debug():
if log_level == "debug":
exec_args.append("--debug")
logger.debug(f"liquidsoap {version} using script: {entrypoint_filepath}")

View File

@ -2,6 +2,7 @@
Python part of radio playout (pypo)
"""
import logging
import sys
import time
from datetime import datetime
@ -14,8 +15,7 @@ from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_api_client.v2 import ApiClient
from libretime_shared.cli import cli_config_options, cli_logging_options
from libretime_shared.config import DEFAULT_ENV_PREFIX
from libretime_shared.logging import level_from_name, setup_logger
from loguru import logger
from libretime_shared.logging import setup_logger
from .config import CACHE_DIR, RECORD_DIR, Config
from .history.stats import StatsCollectorThread
@ -28,6 +28,8 @@ from .player.liquidsoap import PypoLiquidsoap
from .player.push import PypoPush
from .recorder import Recorder
logger = logging.getLogger(__name__)
@click.command(context_settings={"auto_envvar_prefix": DEFAULT_ENV_PREFIX})
@cli_logging_options()
@ -36,7 +38,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
"""
Run playout.
"""
setup_logger(level_from_name(log_level), log_filepath)
setup_logger(log_level, log_filepath)
config = Config(config_filepath)
try:

View File

@ -1,4 +1,5 @@
import json
import logging
from queue import Queue as ThreadQueue
from signal import SIGTERM, signal
from time import sleep
@ -8,10 +9,11 @@ from typing import Any, Dict
from kombu.connection import Connection
from kombu.messaging import Exchange, Queue
from kombu.mixins import ConsumerMixin
from loguru import logger
from .config import Config
logger = logging.getLogger(__name__)
class MessageHandler(ConsumerMixin):
def __init__(

View File

@ -12,6 +12,7 @@ Main case:
media id from it, and then calls back to the API to tell about it about it.
"""
import logging
from pathlib import Path
from typing import Optional
@ -19,12 +20,13 @@ import click
from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_shared.cli import cli_logging_options
from libretime_shared.config import DEFAULT_ENV_PREFIX
from libretime_shared.logging import level_from_name, setup_logger
from loguru import logger
from libretime_shared.logging import setup_logger
logger = logging.getLogger(__name__)
def api_client():
return LegacyClient(logger=logger)
return LegacyClient()
@click.group(context_settings={"auto_envvar_prefix": DEFAULT_ENV_PREFIX})
@ -33,7 +35,7 @@ def cli(log_level: str, log_filepath: Optional[Path]):
"""
A gateway between Liquidsoap and the API.
"""
setup_logger(level_from_name(log_level), log_filepath, rotate=False)
setup_logger(log_level, log_filepath, rotate=False)
@cli.command()

View File

@ -1,5 +1,6 @@
import copy
import json
import logging
import mimetypes
import os
import time
@ -11,7 +12,6 @@ from typing import Any, Dict
from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_api_client.v2 import ApiClient
from loguru import logger
from requests import RequestException
from ..config import CACHE_DIR, POLL_INTERVAL, Config
@ -21,6 +21,8 @@ from ..timeout import ls_timeout
from .liquidsoap import PypoLiquidsoap
from .schedule import get_schedule
logger = logging.getLogger(__name__)
class PypoFetch(Thread):
name = "fetch"

View File

@ -1,4 +1,5 @@
import hashlib
import logging
import os
import stat
import time
@ -7,9 +8,10 @@ from threading import Thread
from typing import Any, Dict
from libretime_api_client.v2 import ApiClient
from loguru import logger
from requests.exceptions import ConnectionError, HTTPError, Timeout
logger = logging.getLogger(__name__)
class PypoFile(Thread):
name = "file"

View File

@ -1,13 +1,14 @@
import logging
import time
from datetime import datetime, timedelta
from loguru import logger
from ..liquidsoap.client import LiquidsoapClient
from ..utils import seconds_between
from .events import EventKind
from .liquidsoap_gateway import TelnetLiquidsoap
logger = logging.getLogger(__name__)
class PypoLiquidsoap:
def __init__(self, liq_client: LiquidsoapClient):

View File

@ -1,10 +1,11 @@
import logging
from typing import List
from loguru import logger
from ..liquidsoap.client import LiquidsoapClient
from ..timeout import ls_timeout
logger = logging.getLogger(__name__)
def create_liquidsoap_annotation(media):
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade.

View File

@ -1,3 +1,4 @@
import logging
import math
import time
from datetime import datetime
@ -5,12 +6,12 @@ from queue import Queue
from threading import Thread
from typing import Any, Dict
from loguru import logger
from ..config import PUSH_INTERVAL, Config
from .liquidsoap import PypoLiquidsoap
from .queue import PypoLiqQueue
logger = logging.getLogger(__name__)
def is_stream(media_item):
return media_item["type"] == "stream_output_start"

View File

@ -1,14 +1,15 @@
import logging
from collections import deque
from datetime import datetime
from queue import Empty, Queue
from threading import Thread
from typing import Any, Dict
from loguru import logger
from ..utils import seconds_between
from .liquidsoap import PypoLiquidsoap
logger = logging.getLogger(__name__)
class PypoLiqQueue(Thread):
name = "liquidsoap_queue"

View File

@ -1,5 +1,6 @@
import datetime
import json
import logging
import math
import os
import re
@ -12,7 +13,6 @@ from threading import Thread
import mutagen
from libretime_api_client.v1 import ApiClient as LegacyClient
from loguru import logger
from libretime_playout.config import PUSH_INTERVAL, RECORD_DIR, Config
@ -21,6 +21,7 @@ if sys.version_info < (3, 9):
else:
from zoneinfo import ZoneInfo
logger = logging.getLogger(__name__)
# TODO : add docstrings everywhere in this module

View File

@ -1,3 +1,4 @@
import logging
from pathlib import Path
from random import randint
from subprocess import PIPE, STDOUT, Popen
@ -5,13 +6,14 @@ from textwrap import dedent
from time import sleep
import pytest
from libretime_shared.logging import TRACE, setup_logger
from loguru import logger
from libretime_shared.logging import setup_logger
from libretime_playout.liquidsoap.client import LiquidsoapConnection
from libretime_playout.liquidsoap.version import get_liquidsoap_version
setup_logger(TRACE)
logger = logging.getLogger(__name__)
setup_logger("debug")
LIQ_VERSION = get_liquidsoap_version()
LIQ_VERSION_STR = ".".join(map(str, LIQ_VERSION))

View File

@ -7,7 +7,6 @@ The `libretime_shared` package contains reusable functions and classes for the L
This library assumes that:
- You will use [`Click`](https://github.com/pallets/click) to build a CLI for your app.
- You will use [`Loguru`](https://github.com/delgan/loguru) to log messages from your app.
- You will use [`Pydantic`](https://github.com/samuelcolvin/pydantic/) to validate objects in your app.
### Configuration

View File

@ -3,8 +3,6 @@ from typing import Any, Callable, Optional
import click
from .logging import INFO, LOG_LEVEL_MAP
def cli_logging_options() -> Callable:
def decorator(func: Callable) -> Callable:
@ -18,8 +16,8 @@ def cli_logging_options() -> Callable:
func = click.option(
"--log-level",
"log_level",
type=click.Choice(list(LOG_LEVEL_MAP.keys())),
default=INFO.name,
type=click.Choice(["error", "warning", "info", "debug"]),
default="info",
help="Name of the logging level.",
)(func)

View File

@ -1,14 +1,16 @@
import logging
import sys
from itertools import zip_longest
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from loguru import logger
from pydantic import BaseModel, ValidationError
from yaml import YAMLError, safe_load
from ._env import EnvLoader
logger = logging.getLogger(__name__)
DEFAULT_ENV_PREFIX = "LIBRETIME"
DEFAULT_CONFIG_FILEPATH = Path("/etc/libretime/config.yml")

View File

@ -1,127 +1,38 @@
import sys
from copy import deepcopy
import logging
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple, Optional, Tuple
from typing import List, Optional, Tuple
from loguru import logger
if TYPE_CHECKING:
from loguru import Logger
logger.remove()
class LogLevel(NamedTuple):
name: str
no: int
def is_debug(self) -> bool:
return self.no <= 10
# See https://loguru.readthedocs.io/en/stable/api/logger.html#levels
ERROR = LogLevel(name="error", no=40)
WARNING = LogLevel(name="warning", no=30)
INFO = LogLevel(name="info", no=20)
DEBUG = LogLevel(name="debug", no=10)
TRACE = LogLevel(name="trace", no=5)
LOG_LEVEL_MAP = {
ERROR.name: ERROR,
WARNING.name: WARNING,
INFO.name: INFO,
DEBUG.name: DEBUG,
TRACE.name: TRACE,
}
def level_from_name(name: str) -> LogLevel:
"""
Find logging level, depending on the name provided.
:param name: name (one of "error", "warning", "info", "debug", "trace") of the log level
:returns: log level guessed from the name
:raises ValueError: on invalid level name
"""
name = name.lower()
if name not in LOG_LEVEL_MAP:
raise ValueError(f"invalid level name '{name}'")
return LOG_LEVEL_MAP[name]
logger = logging.getLogger(__name__)
def setup_logger(
level: LogLevel,
level: str,
filepath: Optional[Path] = None,
serialize: bool = False,
serialize: bool = False, # pylint: disable=unused-argument
rotate: bool = True,
) -> Tuple[LogLevel, Optional[Path]]:
) -> Tuple[str, Optional[Path]]:
"""
Configure the logger and return the computed log level.
See https://loguru.readthedocs.io/en/stable/overview.html
:param verbosity: verbosity (between -1 and 3) of the logger
:param filepath: write logs to filepath
:param serialize: generate JSON formatted log records
:param rotate: enable log rotation and retention
:returns: log level guessed from the verbosity
Configure the logger and return the log level and log filepath.
"""
handlers = [{"sink": sys.stderr, "level": level.no, "serialize": serialize}]
level = level.upper()
root = logging.getLogger()
root.setLevel(level)
formatter = logging.Formatter(
"%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)s - %(message)s"
)
handlers: List[logging.Handler] = [logging.StreamHandler()]
if filepath is not None:
file_handler = {
"sink": filepath,
"enqueue": True,
"level": level.no,
"serialize": serialize,
"encoding": "utf-8",
}
if rotate:
file_handler.update(
{
"rotation": "12:00",
"retention": "7 days",
"compression": "gz",
}
)
handlers.append(TimedRotatingFileHandler(filepath, when="midnight"))
else:
handlers.append(logging.FileHandler(filepath))
handlers.append(file_handler)
logger.configure(handlers=handlers)
for handler in handlers:
handler.setFormatter(formatter)
root.addHandler(handler)
return level, filepath
_empty_logger = deepcopy(logger)
def create_task_logger(
level: LogLevel,
filepath: Path,
serialize: bool = False,
) -> "Logger":
"""
Create and configure an independent logger for a task, return the new logger.
See #creating-independent-loggers-with-separate-set-of-handlers in
https://loguru.readthedocs.io/en/stable/resources/recipes.html
:returns: new logger
"""
task_logger = deepcopy(_empty_logger)
task_logger.configure(
handlers=[
{
"sink": filepath,
"enqueue": True,
"level": level.no,
"serialize": serialize,
"rotation": "12:00",
"retention": "7 days",
"encoding": "utf-8",
}
],
)
return task_logger

View File

@ -2,6 +2,5 @@
# This file is auto-generated by tools/extract_requirements.py.
backports.zoneinfo>=0.2.1,<0.3;python_version<'3.9'
click>=8.0.4,<8.2
loguru==0.6.0
pydantic>=1.7.4,<1.11
pyyaml>=5.3.1,<6.1

View File

@ -12,7 +12,6 @@ setup(
install_requires=[
"backports.zoneinfo>=0.2.1,<0.3;python_version<'3.9'",
"click>=8.0.4,<8.2",
"loguru==0.6.0",
"pydantic>=1.7.4,<1.11",
"pyyaml>=5.3.1,<6.1",
],

View File

@ -1,52 +0,0 @@
from pathlib import Path
import pytest
from loguru import logger
from libretime_shared.logging import (
DEBUG,
INFO,
create_task_logger,
level_from_name,
setup_logger,
)
@pytest.mark.parametrize(
"name,level_name,level_no",
[
("error", "error", 40),
("warning", "warning", 30),
("info", "info", 20),
("debug", "debug", 10),
("trace", "trace", 5),
],
)
def test_level_from_name(name, level_name, level_no):
level = level_from_name(name)
assert level.name == level_name
assert level.no == level_no
def test_level_from_name_invalid():
with pytest.raises(ValueError):
level_from_name("invalid")
def test_setup_logger(tmp_path: Path):
log_filepath = tmp_path / "test.log"
extra_log_filepath = tmp_path / "extra.log"
setup_logger(INFO, log_filepath)
extra_logger = create_task_logger(DEBUG, extra_log_filepath, True)
logger.info("test info")
extra_logger.info("extra info")
logger.debug("test debug")
extra_logger.complete()
logger.complete()
assert len(log_filepath.read_text(encoding="utf-8").splitlines()) == 1
assert len(extra_log_filepath.read_text(encoding="utf-8").splitlines()) == 1