"""
.. _loggers:
fl_sim.utils.loggers
----------------------
This module contains various loggers.
"""
import json
import logging
import re
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
from numbers import Real
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import torch
import yaml
from torch_ecg.utils import ReprMixin, add_docstring, get_date_str, get_kwargs, init_logger
from .const import LOG_DIR as DEFAULT_LOG_DIR
from .const import NAME as LOG_NAME
from .misc import default_dict_to_dict, make_serializable
# Public API of this module: the abstract logger, its two concrete
# implementations, and the manager that fans calls out to several loggers.
__all__ = [
    "BaseLogger",
    "TxtLogger",
    "JsonLogger",
    "LoggerManager",
]
class BaseLogger(ReprMixin, ABC):
    """Abstract base class of all loggers.

    Concrete loggers are expected to set ``self._log_dir``
    (typically via :meth:`set_log_dir`), which backs the
    :attr:`log_dir` property.
    """

    __name__ = "BaseLogger"
    # time format shared by all loggers when (de)serializing timestamps
    __time_fmt__ = "%Y-%m-%d %H:%M:%S"

    @staticmethod
    def set_log_dir(log_dir: Optional[Union[str, Path]] = None) -> Path:
        """Resolve the log directory and create it if necessary.

        Parameters
        ----------
        log_dir : str or pathlib.Path, optional
            The log directory.
            If ``None``, the default log directory is used.
            If not absolute, it is taken relative to the default log directory.

        Returns
        -------
        pathlib.Path
            The resolved log directory (created, along with its parents,
            if it did not exist).

        """
        # wrap in ``Path`` so that the return type is a ``Path`` in every branch
        default_dir = Path(DEFAULT_LOG_DIR)
        if log_dir is None:
            resolved = default_dir
        elif Path(log_dir).is_absolute():
            resolved = Path(log_dir)
        else:
            resolved = default_dir / log_dir
        resolved.mkdir(exist_ok=True, parents=True)
        return resolved

    @abstractmethod
    def log_metrics(
        self,
        client_id: Optional[int],
        metrics: Dict[str, Union[Real, torch.Tensor]],
        step: Optional[int] = None,
        epoch: Optional[int] = None,
        part: str = "val",
    ) -> None:
        """Log metrics.

        Parameters
        ----------
        client_id : int
            Index of the client, ``None`` for the server.
        metrics : dict
            The metrics to be logged.
        step : int, optional
            The current number of (global) steps of training.
        epoch : int, optional
            The current epoch number of training.
        part : str, default "val"
            The part of the training data the metrics computed from,
            can be ``"train"`` or ``"val"`` or ``"test"``, etc.

        Returns
        -------
        None

        """
        raise NotImplementedError

    @abstractmethod
    def log_message(self, msg: str, level: int = logging.INFO) -> None:
        """Log a message.

        Parameters
        ----------
        msg : str
            The message to be logged.
        level : int, optional
            The level of the message, can be one of
            ``logging.DEBUG``, ``logging.INFO``, ``logging.WARNING``,
            ``logging.ERROR``, ``logging.CRITICAL``

        Returns
        -------
        None

        """
        raise NotImplementedError

    @abstractmethod
    def flush(self) -> None:
        """Flush the message buffer."""
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Close the logger."""
        raise NotImplementedError

    @abstractmethod
    def reset(self) -> None:
        """Reset the logger."""
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def from_config(cls, config: Dict[str, Any]) -> Any:
        """Create a logger instance from a configuration."""
        raise NotImplementedError

    def epoch_start(self, epoch: int) -> None:
        """Actions to be performed at the start of each epoch.

        The base implementation does nothing.

        Parameters
        ----------
        epoch : int
            The number of the current epoch.

        Returns
        -------
        None

        """
        pass

    def epoch_end(self, epoch: int) -> None:
        """Actions to be performed at the end of each epoch.

        The base implementation does nothing.

        Parameters
        ----------
        epoch : int
            The number of the current epoch.

        Returns
        -------
        None

        """
        pass

    @property
    def log_dir(self) -> Path:
        """Directory to save the log file."""
        # ``_log_dir`` is assigned by the concrete subclasses
        return self._log_dir

    @property
    @abstractmethod
    def filename(self) -> str:
        """Name of the log file."""
        raise NotImplementedError
class TxtLogger(BaseLogger):
    """Logger that logs to a text file.

    Parameters
    ----------
    algorithm, dataset, model : str
        Used to form the prefix of the log file.
    log_dir : str or pathlib.Path, optional
        Directory to save the log file.
        If ``None``, use the default log directory.
        If not absolute, use ``DEFAULT_LOG_DIR/log_dir``.
    log_suffix : str, optional
        Suffix of the log file.
    verbose : int, default 1
        The verbosity level, forwarded to ``init_logger``.

    """

    __name__ = "TxtLogger"

    def __init__(
        self,
        algorithm: str,
        dataset: str,
        model: str,
        log_dir: Optional[Union[str, Path]] = None,
        log_suffix: Optional[str] = None,
        verbose: int = 1,
    ) -> None:
        assert all(isinstance(x, str) for x in [algorithm, dataset, model]), "algorithm, dataset, model must be str"
        # collapse every run of whitespace so the prefix is a clean file name part
        self.log_prefix = re.sub("[\\s]+", "_", f"{algorithm}-{dataset}-{model}")
        self._log_dir = self.set_log_dir(log_dir)
        self.log_suffix = "" if log_suffix is None else f"_{log_suffix}"
        self.verbose = verbose
        self._open_log_file()
        # -1 so that the first auto-incremented step is 0
        self.step = -1

    def _open_log_file(self) -> None:
        """Choose a new timestamped file name and create the logger for it.

        Shared by ``__init__`` and ``reset`` so that both build the logger
        with exactly the same arguments.
        """
        self.log_file = f"{self.log_prefix}_{get_date_str()}{self.log_suffix}.txt"
        self.logger = init_logger(
            self.log_dir,
            self.log_file,
            log_name=LOG_NAME,
            verbose=self.verbose,
        )

    def log_metrics(
        self,
        client_id: Optional[int],
        metrics: Dict[str, Union[Real, torch.Tensor]],
        step: Optional[int] = None,
        epoch: Optional[int] = None,
        part: str = "val",
    ) -> None:
        """Log metrics as an aligned, human-readable text block.

        Parameters
        ----------
        client_id : int
            Index of the client, ``None`` for the server.
        metrics : dict
            The metrics to be logged. Tensor values are converted
            with ``.item()``; every value is formatted with ``.4f``.
        step : int, optional
            The current number of (global) steps of training.
            If ``None``, the internal step counter is incremented by one.
        epoch : int, optional
            The current epoch number of training.
        part : str, default "val"
            The part of the training data the metrics computed from,
            can be ``"train"`` or ``"val"`` or ``"test"``, etc.

        Returns
        -------
        None

        """
        if step is not None:
            self.step = step
        else:
            self.step += 1
        # use the tracked step, not the raw argument, which may be ``None``
        prefix = f"Step {self.step}: "
        if epoch is not None:
            prefix = f"Epoch {epoch} / {prefix}"
        _metrics = {k: v.item() if isinstance(v, torch.Tensor) else v for k, v in metrics.items()}
        # width of the longest metric name, used to align the values;
        # ``default=0`` keeps an empty ``metrics`` dict from raising
        spaces = max((len(k) for k in _metrics), default=0)
        node = "Server" if client_id is None else f"Client {client_id}"
        msg = (
            f"{node} {part.capitalize()} Metrics:\n{self.short_sep}\n"
            + "\n".join([f"{prefix}{part}/{k} : {' '*(spaces-len(k))}{v:.4f}" for k, v in _metrics.items()])
            + f"\n{self.short_sep}"
        )
        self.log_message(msg)

    def log_message(self, msg: str, level: int = logging.INFO) -> None:
        """Log a message at the given ``logging`` level.

        Parameters
        ----------
        msg : str
            The message to be logged.
        level : int, optional
            The level of the message, e.g. ``logging.INFO``.

        Returns
        -------
        None

        """
        self.logger.log(level, msg)

    @property
    def long_sep(self) -> str:
        """Long separator for logging messages."""
        return "-" * 110

    @property
    def short_sep(self) -> str:
        """Short separator for logging messages."""
        return "-" * 50

    def epoch_start(self, epoch: int) -> None:
        """Log a header line marking the start of an epoch."""
        self.logger.info(f"Train epoch_{epoch}:\n{self.long_sep}")

    def epoch_end(self, epoch: int) -> None:
        """Log a separator line marking the end of an epoch."""
        self.logger.info(f"{self.long_sep}\n")

    def flush(self) -> None:
        """Flush every handler of the underlying logger."""
        for h in self.logger.handlers:
            if hasattr(h, "flush"):
                h.flush()

    def close(self) -> None:
        """Detach and close every handler of the underlying logger."""
        # iterate over a copy: ``removeHandler`` mutates ``logger.handlers``,
        # and removing while iterating the live list skips handlers
        for h in list(self.logger.handlers):
            self.logger.removeHandler(h)
            h.close()

    def reset(self) -> None:
        """Reset the logger.

        Close the current logger and create a new one,
        with new log file name.
        """
        self.close()
        self._open_log_file()
        self.step = -1

    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "TxtLogger":
        """Create a :class:`TxtLogger` instance from a configuration.

        Parameters
        ----------
        config : dict
            Configuration for the logger, passed to the constructor
            as keyword arguments. The following keys are used:

                - ``"algorithm"``: :obj:`str`,
                  name of the algorithm.
                - ``"dataset"``: :obj:`str`,
                  name of the dataset.
                - ``"model"``: :obj:`str`,
                  name of the model.
                - ``"log_dir"``: :obj:`str` or :class:`pathlib.Path`, optional,
                  directory to save the log file.
                - ``"log_suffix"``: :obj:`str`, optional,
                  suffix of the log file.
                - ``"verbose"``: :obj:`int`, optional,
                  verbosity level.

        Returns
        -------
        TxtLogger
            A :class:`TxtLogger` instance.

        """
        return cls(**config)

    @property
    def filename(self) -> str:
        """Full path of the log file, as a string."""
        return str(self.log_dir / self.log_file)
class JsonLogger(BaseLogger):
    """Logger that logs to a JSON file,
    or a yaml file.

    Records are accumulated in memory and written to disk by :meth:`flush`.
    The structure is as follows for example:

    .. dropdown::
        :animate: fade-in-slide-down

        .. code-block:: json

            {
                "train": {
                    "Client0": [
                        {
                            "epoch": 1,
                            "step": 1,
                            "time": "2020-01-01 00:00:00",
                            "loss": 0.1,
                            "acc": 0.2,
                            "top3_acc": 0.3,
                            "top5_acc": 0.4,
                            "num_samples": 100
                        }
                    ]
                },
                "val": {
                    "Client0": [
                        {
                            "epoch": 1,
                            "step": 1,
                            "time": "2020-01-01 00:00:00",
                            "loss": 0.1,
                            "acc": 0.2,
                            "top3_acc": 0.3,
                            "top5_acc": 0.4,
                            "num_samples": 100
                        }
                    ]
                }
            }

    Parameters
    ----------
    algorithm, dataset, model : str
        Used to form the prefix of the log file.
    fmt : {"json", "yaml"}, optional
        Format of the log file (case-insensitive).
    log_dir : str or pathlib.Path, optional
        Directory to save the log file
    log_suffix : str, optional
        Suffix of the log file.
    verbose : int, default 1
        The verbosity level.
        Not used in this logger,
        but is kept for compatibility with other loggers.

    """

    __name__ = "JsonLogger"

    def __init__(
        self,
        algorithm: str,
        dataset: str,
        model: str,
        fmt: str = "json",
        log_dir: Optional[Union[str, Path]] = None,
        log_suffix: Optional[str] = None,
        verbose: int = 1,
    ) -> None:
        assert all(isinstance(x, str) for x in [algorithm, dataset, model]), "algorithm, dataset, model must be str"
        # normalize and validate the format first, so that an invalid ``fmt``
        # fails before the log directory is created on disk
        self.fmt = fmt.lower()
        assert self.fmt in ["json", "yaml"], "fmt must be json or yaml"
        self.log_prefix = re.sub("[\\s]+", "_", f"{algorithm}-{dataset}-{model}")
        self._log_dir = self.set_log_dir(log_dir)
        self.log_suffix = "" if log_suffix is None else f"_{log_suffix}"
        # use the normalized format for the extension, consistent with ``reset``
        self.log_file = f"{self.log_prefix}_{get_date_str()}{self.log_suffix}.{self.fmt}"
        # in-memory buffer: part -> node -> list of records
        self.logger = defaultdict(lambda: defaultdict(list))
        # -1 so that the first auto-incremented step is 0
        self.step = -1
        self._flushed = True

    def log_metrics(
        self,
        client_id: Optional[int],
        metrics: Dict[str, Union[Real, torch.Tensor]],
        step: Optional[int] = None,
        epoch: Optional[int] = None,
        part: str = "val",
    ) -> None:
        """Append one metrics record to the in-memory buffer.

        Parameters
        ----------
        client_id : int
            Index of the client, ``None`` for the server.
        metrics : dict
            The metrics to be logged. Tensor values are converted
            with ``.item()``.
        step : int, optional
            The current number of (global) steps of training.
            If ``None``, the internal step counter is incremented by one.
        epoch : int, optional
            The current epoch number of training.
        part : str, default "val"
            The part of the training data the metrics computed from,
            can be ``"train"`` or ``"val"`` or ``"test"``, etc.

        Returns
        -------
        None

        """
        if step is not None:
            self.step = step
        else:
            self.step += 1
        node = "Server" if client_id is None else f"Client{client_id}"
        append_item = {
            "step": self.step,
            "time": self.strftime(datetime.now()),
        }
        if epoch is not None:
            append_item.update({"epoch": epoch})
        append_item.update({k: v.item() if isinstance(v, torch.Tensor) else v for k, v in metrics.items()})
        self.logger[part][node].append(append_item)
        self._flushed = False

    def log_message(self, msg: str, level: int = logging.INFO) -> None:
        """Do nothing: this logger only records metrics, not free-form messages."""
        pass

    def flush(self) -> None:
        """Write all buffered records to the log file.

        The file is rewritten in full on every flush. The buffer is therefore
        kept (it is only discarded by :meth:`reset`); clearing it here would
        make a later flush overwrite, and thereby lose, earlier records.
        Nothing is written if no record was added since the last flush.
        """
        if self._flushed:
            return
        # convert the nested defaultdicts to plain, serializable containers
        flush_buffer = make_serializable(default_dict_to_dict(self.logger))
        if self.fmt == "json":
            content = json.dumps(flush_buffer, indent=4, ensure_ascii=False)
        else:  # yaml
            content = yaml.dump(flush_buffer, allow_unicode=True)
        # non-ASCII characters are emitted verbatim above, so the encoding
        # must not depend on the platform's locale default
        Path(self.filename).write_text(content, encoding="utf-8")
        print(f"{self.fmt} log file saved to {self.filename}")
        self._flushed = True

    def close(self) -> None:
        """Close the logger, i.e. flush pending records to disk."""
        self.flush()

    def reset(self) -> None:
        """Reset the logger.

        Close the current logger and create a new one,
        with new log file name.
        """
        self.close()
        self.log_file = f"{self.log_prefix}_{get_date_str()}{self.log_suffix}.{self.fmt}"
        self.logger = defaultdict(lambda: defaultdict(list))
        self.step = -1
        self._flushed = True

    def __del__(self):
        # ``__init__`` may have raised before ``_flushed`` was assigned;
        # in that case there is nothing to write
        if not getattr(self, "_flushed", True):
            self.flush()

    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "JsonLogger":
        """Create a :class:`JsonLogger` instance from a configuration.

        Parameters
        ----------
        config : dict
            Configuration for the logger, passed to the constructor
            as keyword arguments. The following keys are used:

                - ``"algorithm"``: :obj:`str`,
                  name of the algorithm.
                - ``"dataset"``: :obj:`str`,
                  name of the dataset.
                - ``"model"``: :obj:`str`,
                  name of the model.
                - ``"fmt"``: {"json", "yaml"}, optional,
                  format of the log file, default: ``"json"``.
                - ``"log_dir"``: :obj:`str` or :class:`pathlib.Path`, optional,
                  directory to save the log file.
                - ``"log_suffix"``: :obj:`str`, optional,
                  suffix of the log file.

        Returns
        -------
        JsonLogger
            A :class:`JsonLogger` instance.

        """
        return cls(**config)

    @property
    def filename(self) -> str:
        """Full path of the log file, as a string."""
        return str(self.log_dir / self.log_file)

    @staticmethod
    def strftime(time: datetime) -> str:
        """Format a datetime with the loggers' shared time format."""
        return time.strftime(JsonLogger.__time_fmt__)

    @staticmethod
    def strptime(time: str) -> datetime:
        """Parse a string written by :meth:`strftime` back to a datetime."""
        return datetime.strptime(time, JsonLogger.__time_fmt__)
class LoggerManager(ReprMixin):
    """Manager for loggers.

    Holds a list of loggers and forwards every logging call to each of them.

    Parameters
    ----------
    algorithm, dataset, model : str
        Used to form the prefix of the log file.
    log_dir : str or pathlib.Path, optional
        Directory to save the log file
    log_suffix : str, optional
        Suffix of the log file.
    verbose : int, default 1
        The verbosity level.

    """

    __name__ = "LoggerManager"

    def __init__(
        self,
        algorithm: str,
        dataset: str,
        model: str,
        log_dir: Optional[Union[str, Path]] = None,
        log_suffix: Optional[str] = None,
        verbose: int = 1,
    ) -> None:
        # NOTE: the attribute name ``_algorith`` (sic) is kept as is,
        # since code outside this class may read it
        self._algorith = algorithm
        self._dataset = dataset
        self._model = model
        self._log_dir = BaseLogger.set_log_dir(log_dir)
        self._log_suffix = log_suffix
        self._verbose = verbose
        self._loggers = []

    def _add_txt_logger(self) -> None:
        """Add a :class:`TxtLogger` instance to the manager."""
        self.loggers.append(
            TxtLogger(
                algorithm=self._algorith,
                dataset=self._dataset,
                model=self._model,
                log_dir=self._log_dir,
                log_suffix=self._log_suffix,
                verbose=self._verbose,
            )
        )

    def _add_json_logger(self, fmt: str = "json") -> None:
        """Add a :class:`JsonLogger` instance to the manager.

        Parameters
        ----------
        fmt : {"json", "yaml"}, default "json"
            Format of the log file.

        """
        self.loggers.append(
            JsonLogger(
                algorithm=self._algorith,
                dataset=self._dataset,
                model=self._model,
                fmt=fmt,
                log_dir=self._log_dir,
                log_suffix=self._log_suffix,
                verbose=self._verbose,
            )
        )

    # the docstring is taken from ``BaseLogger.log_metrics``
    # (not ``log_message``), which documents exactly these parameters
    @add_docstring(BaseLogger.log_metrics.__doc__)
    def log_metrics(
        self,
        client_id: Optional[int],
        metrics: Dict[str, Union[Real, torch.Tensor]],
        step: Optional[int] = None,
        epoch: Optional[int] = None,
        part: str = "val",
    ) -> None:
        for lgs in self.loggers:
            lgs.log_metrics(client_id, metrics, step, epoch, part)

    @add_docstring(BaseLogger.log_message.__doc__)
    def log_message(self, msg: str, level: int = logging.INFO) -> None:
        for lgs in self.loggers:
            lgs.log_message(msg, level)

    @add_docstring(BaseLogger.epoch_start.__doc__)
    def epoch_start(self, epoch: int) -> None:
        for lgs in self.loggers:
            lgs.epoch_start(epoch)

    @add_docstring(BaseLogger.epoch_end.__doc__)
    def epoch_end(self, epoch: int) -> None:
        for lgs in self.loggers:
            lgs.epoch_end(epoch)

    @add_docstring(BaseLogger.flush.__doc__)
    def flush(self) -> None:
        for lgs in self.loggers:
            lgs.flush()

    @add_docstring(BaseLogger.close.__doc__)
    def close(self) -> None:
        for lgs in self.loggers:
            lgs.close()

    @add_docstring(BaseLogger.reset.__doc__)
    def reset(self) -> None:
        for lgs in self.loggers:
            lgs.reset()

    @property
    def loggers(self) -> List[BaseLogger]:
        """The list of loggers."""
        return self._loggers

    @property
    def log_dir(self) -> Path:
        """Directory to save the log files."""
        return self._log_dir

    @property
    def log_suffix(self) -> Optional[str]:
        """Suffix of the log files, ``None`` if no suffix was given."""
        return self._log_suffix

    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "LoggerManager":
        """Create a :class:`LoggerManager` instance from a configuration.

        Parameters
        ----------
        config : dict
            Configuration of the logger manager. The following keys are used:

                - ``"algorithm"``: :obj:`str`,
                  algorithm name.
                - ``"dataset"``: :obj:`str`,
                  dataset name.
                - ``"model"``: :obj:`str`,
                  model name.
                - ``"log_dir"``: :obj:`str` or :class:`pathlib.Path`, optional,
                  directory to save the log files.
                - ``"log_suffix"``: :obj:`str`, optional,
                  suffix of the log files.
                - ``"txt_logger"``: :obj:`bool`, optional,
                  whether to add a :class:`TxtLogger` instance,
                  default: ``True``.
                - ``"json_logger"``: :obj:`bool`, optional,
                  whether to add a :class:`JsonLogger` instance,
                  default: ``True``.
                - ``"fmt"``: {"json", "yaml"}, optional,
                  format of the json log file, default: ``"json"``,
                  valid when ``"json_logger"`` is ``True``.
                - ``"verbose"``: :obj:`int`, optional,
                  verbosity level of the logger manager, default: 1.

        Returns
        -------
        LoggerManager
            A :class:`LoggerManager` instance.

        """
        lm = cls(
            config["algorithm"],
            config["dataset"],
            config["model"],
            config.get("log_dir", None),
            config.get("log_suffix", None),
            config.get("verbose", 1),
        )
        if config.get("txt_logger", True):
            lm._add_txt_logger()
        if config.get("json_logger", True):
            # fall back to the default ``fmt`` declared by ``JsonLogger`` itself
            lm._add_json_logger(fmt=config.get("fmt", get_kwargs(JsonLogger)["fmt"]))
        return lm