sync with OpenBSD -current
This commit is contained in:
parent
5d45cd7ee8
commit
155eb8555e
5506 changed files with 1786257 additions and 1416034 deletions
294
lib/mesa/bin/ci/structured_logger.py
Normal file
294
lib/mesa/bin/ci/structured_logger.py
Normal file
|
@ -0,0 +1,294 @@
|
|||
"""
|
||||
A structured logging utility supporting multiple data formats such as CSV, JSON,
|
||||
and YAML.
|
||||
|
||||
The main purpose of this script, besides having relevant information available
|
||||
in a condensed and deserialized.
|
||||
|
||||
This script defines a protocol for different file handling strategies and provides
|
||||
implementations for CSV, JSON, and YAML formats. The main class, StructuredLogger,
|
||||
allows for easy interaction with log data, enabling users to load, save, increment,
|
||||
set, and append fields in the log. The script also includes context managers for
|
||||
file locking and editing log data to ensure data integrity and avoid race conditions.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from collections.abc import MutableMapping, MutableSequence
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Protocol
|
||||
|
||||
import fire
|
||||
from filelock import FileLock
|
||||
|
||||
try:
|
||||
import polars as pl
|
||||
|
||||
CSV_LIB_EXCEPTION = None
|
||||
except ImportError as e:
|
||||
CSV_LIB_EXCEPTION: ImportError = e
|
||||
|
||||
try:
|
||||
from ruamel.yaml import YAML
|
||||
|
||||
YAML_LIB_EXCEPTION = None
|
||||
except ImportError as e:
|
||||
YAML_LIB_EXCEPTION: ImportError = e
|
||||
|
||||
|
||||
class ContainerProxy:
|
||||
"""
|
||||
A proxy class that wraps a mutable container object (such as a dictionary or
|
||||
a list) and calls a provided save_callback function whenever the container
|
||||
or its contents
|
||||
are changed.
|
||||
"""
|
||||
def __init__(self, container, save_callback):
|
||||
self.container = container
|
||||
self.save_callback = save_callback
|
||||
|
||||
def __getitem__(self, key):
|
||||
value = self.container[key]
|
||||
if isinstance(value, (MutableMapping, MutableSequence)):
|
||||
return ContainerProxy(value, self.save_callback)
|
||||
return value
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.container[key] = value
|
||||
self.save_callback()
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.container[key]
|
||||
self.save_callback()
|
||||
|
||||
def __getattr__(self, name):
|
||||
attr = getattr(self.container, name)
|
||||
|
||||
if callable(attr):
|
||||
def wrapper(*args, **kwargs):
|
||||
result = attr(*args, **kwargs)
|
||||
self.save_callback()
|
||||
return result
|
||||
|
||||
return wrapper
|
||||
return attr
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.container)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.container)
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.container)
|
||||
|
||||
|
||||
class AutoSaveDict(dict):
|
||||
"""
|
||||
A subclass of the built-in dict class with additional functionality to
|
||||
automatically save changes to the dictionary. It maintains a timestamp of
|
||||
the last modification and automatically wraps nested mutable containers
|
||||
using ContainerProxy.
|
||||
"""
|
||||
timestamp_key = "_timestamp"
|
||||
|
||||
def __init__(self, *args, save_callback, register_timestamp=True, **kwargs):
|
||||
self.save_callback = save_callback
|
||||
self.__register_timestamp = register_timestamp
|
||||
self.__heartbeat()
|
||||
super().__init__(*args, **kwargs)
|
||||
self.__wrap_dictionaries()
|
||||
|
||||
def __heartbeat(self):
|
||||
if self.__register_timestamp:
|
||||
self[AutoSaveDict.timestamp_key] = datetime.now().isoformat()
|
||||
|
||||
def __save(self):
|
||||
self.__heartbeat()
|
||||
self.save_callback()
|
||||
|
||||
def __wrap_dictionaries(self):
|
||||
for key, value in self.items():
|
||||
if isinstance(value, MutableMapping) and not isinstance(
|
||||
value, AutoSaveDict
|
||||
):
|
||||
self[key] = AutoSaveDict(
|
||||
value, save_callback=self.save_callback, register_timestamp=False
|
||||
)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if isinstance(value, MutableMapping) and not isinstance(value, AutoSaveDict):
|
||||
value = AutoSaveDict(
|
||||
value, save_callback=self.save_callback, register_timestamp=False
|
||||
)
|
||||
super().__setitem__(key, value)
|
||||
|
||||
if self.__register_timestamp and key == AutoSaveDict.timestamp_key:
|
||||
return
|
||||
self.__save()
|
||||
|
||||
def __getitem__(self, key):
|
||||
value = super().__getitem__(key)
|
||||
if isinstance(value, (MutableMapping, MutableSequence)):
|
||||
return ContainerProxy(value, self.__save)
|
||||
return value
|
||||
|
||||
def __delitem__(self, key):
|
||||
super().__delitem__(key)
|
||||
self.__save()
|
||||
|
||||
def pop(self, *args, **kwargs):
|
||||
result = super().pop(*args, **kwargs)
|
||||
self.__save()
|
||||
return result
|
||||
|
||||
def update(self, *args, **kwargs):
|
||||
super().update(*args, **kwargs)
|
||||
self.__wrap_dictionaries()
|
||||
self.__save()
|
||||
|
||||
|
||||
class StructuredLoggerStrategy(Protocol):
|
||||
def load_data(self, file_path: Path) -> dict:
|
||||
pass
|
||||
|
||||
def save_data(self, file_path: Path, data: dict) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class CSVStrategy:
|
||||
def __init__(self) -> None:
|
||||
if CSV_LIB_EXCEPTION:
|
||||
raise RuntimeError(
|
||||
"Can't parse CSV files. Missing library"
|
||||
) from CSV_LIB_EXCEPTION
|
||||
|
||||
def load_data(self, file_path: Path) -> dict:
|
||||
dicts: list[dict[str, Any]] = pl.read_csv(
|
||||
file_path, try_parse_dates=True
|
||||
).to_dicts()
|
||||
data = {}
|
||||
for d in dicts:
|
||||
for k, v in d.items():
|
||||
if k != AutoSaveDict.timestamp_key and k in data:
|
||||
if isinstance(data[k], list):
|
||||
data[k].append(v)
|
||||
continue
|
||||
data[k] = [data[k], v]
|
||||
else:
|
||||
data[k] = v
|
||||
return data
|
||||
|
||||
def save_data(self, file_path: Path, data: dict) -> None:
|
||||
pl.DataFrame(data).write_csv(file_path)
|
||||
|
||||
|
||||
class JSONStrategy:
|
||||
def load_data(self, file_path: Path) -> dict:
|
||||
return json.loads(file_path.read_text())
|
||||
|
||||
def save_data(self, file_path: Path, data: dict) -> None:
|
||||
with open(file_path, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
|
||||
class YAMLStrategy:
|
||||
def __init__(self):
|
||||
if YAML_LIB_EXCEPTION:
|
||||
raise RuntimeError(
|
||||
"Can't parse YAML files. Missing library"
|
||||
) from YAML_LIB_EXCEPTION
|
||||
self.yaml = YAML()
|
||||
self.yaml.indent(sequence=4, offset=2)
|
||||
self.yaml.default_flow_style = False
|
||||
self.yaml.representer.add_representer(AutoSaveDict, self.represent_dict)
|
||||
|
||||
@classmethod
|
||||
def represent_dict(cls, dumper, data):
|
||||
return dumper.represent_mapping("tag:yaml.org,2002:map", data)
|
||||
|
||||
def load_data(self, file_path: Path) -> dict:
|
||||
return self.yaml.load(file_path.read_text())
|
||||
|
||||
def save_data(self, file_path: Path, data: dict) -> None:
|
||||
with open(file_path, "w") as f:
|
||||
self.yaml.dump(data, f)
|
||||
|
||||
|
||||
class StructuredLogger:
|
||||
def __init__(
|
||||
self, file_name: str, strategy: StructuredLoggerStrategy = None, truncate=False
|
||||
):
|
||||
self.file_name: str = file_name
|
||||
self.file_path = Path(self.file_name)
|
||||
self._data: AutoSaveDict = AutoSaveDict(save_callback=self.save_data)
|
||||
|
||||
if strategy is None:
|
||||
self.strategy: StructuredLoggerStrategy = self.guess_strategy_from_file(
|
||||
self.file_path
|
||||
)
|
||||
else:
|
||||
self.strategy = strategy
|
||||
|
||||
if not self.file_path.exists():
|
||||
Path.mkdir(self.file_path.parent, exist_ok=True)
|
||||
self.save_data()
|
||||
return
|
||||
|
||||
if truncate:
|
||||
with self.get_lock():
|
||||
os.truncate(self.file_path, 0)
|
||||
self.save_data()
|
||||
|
||||
def load_data(self):
|
||||
self._data = self.strategy.load_data(self.file_path)
|
||||
|
||||
def save_data(self):
|
||||
self.strategy.save_data(self.file_path, self._data)
|
||||
|
||||
@property
|
||||
def data(self) -> AutoSaveDict:
|
||||
return self._data
|
||||
|
||||
@contextmanager
|
||||
def get_lock(self):
|
||||
with FileLock(f"{self.file_path}.lock", timeout=10):
|
||||
yield
|
||||
|
||||
@contextmanager
|
||||
def edit_context(self):
|
||||
"""
|
||||
Context manager that ensures proper loading and saving of log data when
|
||||
performing multiple modifications.
|
||||
"""
|
||||
with self.get_lock():
|
||||
try:
|
||||
self.load_data()
|
||||
yield
|
||||
finally:
|
||||
self.save_data()
|
||||
|
||||
@staticmethod
|
||||
def guess_strategy_from_file(file_path: Path) -> StructuredLoggerStrategy:
|
||||
file_extension = file_path.suffix.lower().lstrip(".")
|
||||
return StructuredLogger.get_strategy(file_extension)
|
||||
|
||||
@staticmethod
|
||||
def get_strategy(strategy_name: str) -> StructuredLoggerStrategy:
|
||||
strategies = {
|
||||
"csv": CSVStrategy,
|
||||
"json": JSONStrategy,
|
||||
"yaml": YAMLStrategy,
|
||||
"yml": YAMLStrategy,
|
||||
}
|
||||
|
||||
try:
|
||||
return strategies[strategy_name]()
|
||||
except KeyError as e:
|
||||
raise ValueError(f"Unknown strategy for: {strategy_name}") from e
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
fire.Fire(StructuredLogger)
|
Loading…
Add table
Add a link
Reference in a new issue