182 lines
4.7 KiB
Python
182 lines
4.7 KiB
Python
import json
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from mock import MagicMock, patch
|
|
from structured_logger import (
|
|
AutoSaveDict,
|
|
CSVStrategy,
|
|
JSONStrategy,
|
|
StructuredLogger,
|
|
YAMLStrategy,
|
|
)
|
|
|
|
|
|
@pytest.fixture(params=[CSVStrategy, JSONStrategy, YAMLStrategy])
|
|
def strategy(request):
|
|
return request.param
|
|
|
|
|
|
@pytest.fixture
|
|
def file_extension(strategy):
|
|
if strategy == CSVStrategy:
|
|
return "csv"
|
|
elif strategy == JSONStrategy:
|
|
return "json"
|
|
elif strategy == YAMLStrategy:
|
|
return "yaml"
|
|
|
|
|
|
@pytest.fixture
|
|
def tmp_file(tmp_path):
|
|
return tmp_path / "test.json"
|
|
|
|
|
|
def test_guess_strategy_from_file(tmp_path, strategy, file_extension):
|
|
file_name = tmp_path / f"test_guess.{file_extension}"
|
|
Path(file_name).touch()
|
|
guessed_strategy = StructuredLogger.guess_strategy_from_file(file_name)
|
|
assert isinstance(guessed_strategy, strategy)
|
|
|
|
|
|
def test_get_strategy(strategy, file_extension):
|
|
result = StructuredLogger.get_strategy(file_extension)
|
|
assert isinstance(result, strategy)
|
|
|
|
|
|
def test_invalid_file_extension(tmp_path):
|
|
file_name = tmp_path / "test_invalid.xyz"
|
|
Path(file_name).touch()
|
|
|
|
with pytest.raises(ValueError, match="Unknown strategy for: xyz"):
|
|
StructuredLogger.guess_strategy_from_file(file_name)
|
|
|
|
|
|
def test_non_existent_file(tmp_path, strategy, file_extension):
|
|
file_name = tmp_path / f"non_existent.{file_extension}"
|
|
logger = StructuredLogger(file_name, strategy())
|
|
|
|
assert logger.file_path.exists()
|
|
assert "_timestamp" in logger._data
|
|
|
|
|
|
@pytest.fixture
|
|
def structured_logger_module():
|
|
with patch.dict("sys.modules", {"polars": None, "ruamel.yaml": None}):
|
|
import importlib
|
|
|
|
import structured_logger
|
|
|
|
importlib.reload(structured_logger)
|
|
yield structured_logger
|
|
|
|
|
|
def test_missing_csv_library(tmp_path, structured_logger_module):
|
|
with pytest.raises(RuntimeError, match="Can't parse CSV files. Missing library"):
|
|
structured_logger_module.CSVStrategy()
|
|
|
|
|
|
def test_missing_yaml_library(tmp_path, structured_logger_module):
|
|
with pytest.raises(RuntimeError, match="Can't parse YAML files. Missing library"):
|
|
structured_logger_module.YAMLStrategy()
|
|
|
|
|
|
def test_autosavedict_setitem():
|
|
save_callback = MagicMock()
|
|
d = AutoSaveDict(save_callback=save_callback)
|
|
d["key"] = "value"
|
|
assert d["key"] == "value"
|
|
save_callback.assert_called_once()
|
|
|
|
|
|
def test_autosavedict_delitem():
|
|
save_callback = MagicMock()
|
|
d = AutoSaveDict({"key": "value"}, save_callback=save_callback)
|
|
del d["key"]
|
|
assert "key" not in d
|
|
save_callback.assert_called_once()
|
|
|
|
|
|
def test_autosavedict_pop():
|
|
save_callback = MagicMock()
|
|
d = AutoSaveDict({"key": "value"}, save_callback=save_callback)
|
|
result = d.pop("key")
|
|
assert result == "value"
|
|
assert "key" not in d
|
|
save_callback.assert_called_once()
|
|
|
|
|
|
def test_autosavedict_update():
|
|
save_callback = MagicMock()
|
|
d = AutoSaveDict({"key": "old_value"}, save_callback=save_callback)
|
|
d.update({"key": "new_value"})
|
|
assert d["key"] == "new_value"
|
|
save_callback.assert_called_once()
|
|
|
|
|
|
def test_structured_logger_setitem(tmp_file):
|
|
logger = StructuredLogger(tmp_file, JSONStrategy())
|
|
logger.data["field"] = "value"
|
|
|
|
with open(tmp_file, "r") as f:
|
|
data = json.load(f)
|
|
|
|
assert data["field"] == "value"
|
|
|
|
|
|
def test_structured_logger_set_recursive(tmp_file):
|
|
logger = StructuredLogger(tmp_file, JSONStrategy())
|
|
logger.data["field"] = {"test": True}
|
|
other = logger.data["field"]
|
|
other["late"] = True
|
|
|
|
with open(tmp_file, "r") as f:
|
|
data = json.load(f)
|
|
|
|
assert data["field"]["test"]
|
|
assert data["field"]["late"]
|
|
|
|
|
|
def test_structured_logger_set_list(tmp_file):
|
|
logger = StructuredLogger(tmp_file, JSONStrategy())
|
|
logger.data["field"] = [True]
|
|
other = logger.data["field"]
|
|
other.append(True)
|
|
|
|
with open(tmp_file, "r") as f:
|
|
data = json.load(f)
|
|
|
|
assert data["field"][0]
|
|
assert data["field"][1]
|
|
|
|
|
|
def test_structured_logger_delitem(tmp_file):
|
|
logger = StructuredLogger(tmp_file, JSONStrategy())
|
|
logger.data["field"] = "value"
|
|
del logger.data["field"]
|
|
|
|
with open(tmp_file, "r") as f:
|
|
data = json.load(f)
|
|
|
|
assert "field" not in data
|
|
|
|
|
|
def test_structured_logger_pop(tmp_file):
|
|
logger = StructuredLogger(tmp_file, JSONStrategy())
|
|
logger.data["field"] = "value"
|
|
logger.data.pop("field")
|
|
|
|
with open(tmp_file, "r") as f:
|
|
data = json.load(f)
|
|
|
|
assert "field" not in data
|
|
|
|
|
|
def test_structured_logger_update(tmp_file):
|
|
logger = StructuredLogger(tmp_file, JSONStrategy())
|
|
logger.data.update({"field": "value"})
|
|
|
|
with open(tmp_file, "r") as f:
|
|
data = json.load(f)
|
|
|
|
assert data["field"] == "value"
|