Today, I've got a small FlowLogger
for you to review. The idea is not to log pure messages, but focus on the flow of the app that might contain such items as:
state
- json-snapshot of variables etcevent
- something worth logging has occuredstarted
- the flow has startedaltered
- the flow has been altered by some condition like anif
canceled
- the flow has been canceled before it run to completionfaulted
- the flow experienced an exceptioncompleted
- the flow has succesfuly run to completion
Implementation
The APIs are implemented by the FlowLogger
that is build on top of the standard logging without exposing the original methods:
import json import logging import functools import pyodbc from typing import Union, List from timeit import default_timer as timer from datetime import datetime from logging import Handler
class FlowLogger: def __init__(self, name: str, handlers: List[logging.Handler]): self._logger = logging.Logger(name) for h in handlers: self._logger.addHandler(h) def state(self, **kwargs): self._logger.info(json.dumps(kwargs, sort_keys=True), (), **dict(extra={"flow": FlowLogger.state.__name__})) def event(self, name: str): self._logger.info(name, (), **dict(extra={"flow": FlowLogger.event().__name__})) def started(self, name: Union[str | None] = None, **kwargs): self._logger.info(name, **dict(extra=dict(**kwargs, **{"flow": FlowLogger.started.__name__}))) def altered(self, reason: str): self._logger.info(reason, **dict(extra={"flow": FlowLogger.altered.__name__})) def canceled(self, reason: Union[str | None]): self._logger.warning(reason, **dict(extra={"flow": FlowLogger.canceled.__name__})) def faulted(self, **kwargs): self._logger.exception(None, extra=dict(**kwargs, **{"flow": FlowLogger.faulted.__name__})) def completed(self, **kwargs): self._logger.info(None, **dict(extra=dict(**kwargs, **{"flow": FlowLogger.completed.__name__})))
In order to not repeat myself when logging the flow of entire methods the telemetry
decorator takes over:
def telemetry(flow: FlowLogger): def decorate(decoratee): @functools.wraps(decoratee) def wrapper(*decoratee_args, **decoratee_kwargs): with FlowScope(flow, decoratee.__name__): return decoratee(*decoratee_args, **decoratee_kwargs) return wrapper return decorate
It's supported by the FlowScope
that handles the time measurement and exception handling:
class FlowScope: def __init__(self, flow: FlowLogger, api: str): self.flow = flow self.api = api self.start = timer() pass def __enter__(self): flow.started(api=self.api) def __exit__(self, exc_type, exc_val, exc_tb): if exc_type: flow.faulted(api=self.api, elapsed=timer() - self.start) else: flow.completed(api=self.api, elapsed=timer() - self.start)
The output is handled by the SqlServerHandler
:
create table log( [id] int identity(1,1) primary key, [timestamp] datetime2, [logger] nvarchar(50), [module] nvarchar(200), [api] nvarchar(200), [flow] nvarchar(50), [elapsed] float, [message] nvarchar(max), [line] int )
class SqlServerHandler(Handler): def __init__(self, server: str, database: str, username: str, password: str): super().__init__(logging.INFO) connection = pyodbc.connect(f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}") self.db = connection.cursor() def emit(self, record): if record.exc_info: record.exc_text = logging.Formatter().formatException(record.exc_info) self.db.execute( "INSERT INTO log([timestamp], [logger], [module], [api], [flow], [elapsed], [message], [line]) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", datetime.fromtimestamp(record.created), record.name, record.module, record.api if hasattr(record, "api") else record.funcName, record.flow if hasattr(record, "flow") else None, record.elapsed if hasattr(record, "elapsed") else None, record.exc_text if record.exc_text else record.msg, record.lineno ) self.db.commit()
Example
This is my dirty code for how I'm testing it:
def initialize_telemetry() -> FlowLogger: stream_handler = logging.StreamHandler() stream_handler.setFormatter(logging.Formatter(style="{", fmt="{asctime} | {module}.{funcName} | {flow} | {message}", defaults={"flow": "<flow>", "message": "<message>"})) sql_server_handler = SqlServerHandler(server="localhost,1433", database="master", username="sa", password="***") flow = FlowLogger("test-logger", [stream_handler, sql_server_handler]) try: return flow finally: flow.completed() flow = initialize_telemetry() @telemetry(flow) def flow_decorator_test(): raise ValueError pass def flow_test(): flow.started("Custom flow.") flow.state(foo="bar") if True: flow.altered("The value was true.") try: raise ValueError except: flow.faulted() flow.completed() if __name__ == "__main__": flow_test() flow_decorator_test()
What do you think? Would say I'm doing something terribly wrong here?