\$\begingroup\$

Today, I've got a small FlowLogger for you to review. The idea is not to log plain messages but to focus on the flow of the app, which might contain items such as:

  • state - a JSON snapshot of variables etc.
  • event - something worth logging has occurred
  • started - the flow has started
  • altered - the flow has been altered by some condition, like an if
  • canceled - the flow has been canceled before it ran to completion
  • faulted - the flow experienced an exception
  • completed - the flow has successfully run to completion

Implementation

The APIs are implemented by the FlowLogger, which is built on top of the standard logging module without exposing the original methods:

    import json
    import logging
    import functools
    import pyodbc
    from typing import List, Optional
    from timeit import default_timer as timer
    from datetime import datetime
    from logging import Handler


    class FlowLogger:
        def __init__(self, name: str, handlers: List[logging.Handler]):
            self._logger = logging.Logger(name)
            for h in handlers:
                self._logger.addHandler(h)

        def state(self, **kwargs):
            # Snapshot of variables, serialized as JSON.
            self._logger.info(json.dumps(kwargs, sort_keys=True), extra={"flow": FlowLogger.state.__name__})

        def event(self, name: str):
            self._logger.info(name, extra={"flow": FlowLogger.event.__name__})

        def started(self, name: Optional[str] = None, **kwargs):
            self._logger.info(name, extra={**kwargs, "flow": FlowLogger.started.__name__})

        def altered(self, reason: str):
            self._logger.info(reason, extra={"flow": FlowLogger.altered.__name__})

        def canceled(self, reason: Optional[str]):
            self._logger.warning(reason, extra={"flow": FlowLogger.canceled.__name__})

        def faulted(self, **kwargs):
            # Logger.exception attaches the active exception info to the record.
            self._logger.exception(None, extra={**kwargs, "flow": FlowLogger.faulted.__name__})

        def completed(self, **kwargs):
            self._logger.info(None, extra={**kwargs, "flow": FlowLogger.completed.__name__})
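The class relies on the `extra` argument of the standard logging calls, which copies each key onto the `LogRecord` as an attribute. The following is a minimal sketch of that mechanism only, not of the FlowLogger itself; `ListHandler` and `log_with_flow` are hypothetical names introduced for illustration.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical helper: keeps records in memory so they can be inspected."""

    def __init__(self):
        super().__init__(logging.INFO)
        self.records = []

    def emit(self, record):
        self.records.append(record)


def log_with_flow(logger, message, flow, **fields):
    # Every key in `extra` becomes an attribute on the LogRecord.
    logger.info(message, extra={**fields, "flow": flow})


handler = ListHandler()
logger = logging.Logger("extra-demo")
logger.addHandler(handler)

log_with_flow(logger, "Custom flow.", "started", api="demo")

record = handler.records[0]
print(record.flow, record.api, record.getMessage())
```

Keys in `extra` must not collide with built-in record attributes such as `message` or `name`; the logging module raises a `KeyError` in that case.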

In order to not repeat myself when logging the flow of entire methods the telemetry decorator takes over:

    def telemetry(flow: FlowLogger):
        def decorate(decoratee):
            @functools.wraps(decoratee)
            def wrapper(*decoratee_args, **decoratee_kwargs):
                # The scope logs started/completed/faulted around the call.
                with FlowScope(flow, decoratee.__name__):
                    return decoratee(*decoratee_args, **decoratee_kwargs)

            return wrapper

        return decorate
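To make the three-layer shape of such a parameterized decorator easier to follow in isolation, here is a small sketch that does not need a logger or a database. It assumes a plain list as the sink instead of the FlowLogger and uses an inline try/except rather than a scope object; `traced` and `calls` are hypothetical names.

```python
import functools


def traced(sink):
    """Hypothetical decorator factory: `sink` is a list collecting (flow, api) tuples."""

    def decorate(decoratee):
        @functools.wraps(decoratee)  # keeps __name__ and __doc__ of the wrapped function
        def wrapper(*args, **kwargs):
            sink.append(("started", decoratee.__name__))
            try:
                result = decoratee(*args, **kwargs)
            except Exception:
                sink.append(("faulted", decoratee.__name__))
                raise  # the caller still sees the original exception
            sink.append(("completed", decoratee.__name__))
            return result

        return wrapper

    return decorate


calls = []


@traced(calls)
def add(a, b):
    """Add two numbers."""
    return a + b


@traced(calls)
def fail():
    raise ValueError("boom")


total = add(1, 2)
try:
    fail()
except ValueError:
    pass

print(calls)
```

The outer function receives the configuration, the middle one receives the function being decorated, and the innermost one runs on every call.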

It's supported by the FlowScope that handles the time measurement and exception handling:

    class FlowScope:
        def __init__(self, flow: FlowLogger, api: str):
            self.flow = flow
            self.api = api
            self.start = timer()

        def __enter__(self):
            # Use the logger passed to the scope, not a module-level variable.
            self.flow.started(api=self.api)

        def __exit__(self, exc_type, exc_val, exc_tb):
            if exc_type:
                self.flow.faulted(api=self.api, elapsed=timer() - self.start)
            else:
                self.flow.completed(api=self.api, elapsed=timer() - self.start)
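For comparison, the same started/faulted/completed bookkeeping with timing can be sketched with `contextlib.contextmanager` instead of a class. This is an alternative shape, not the code under review; it assumes a list of dicts as the sink, and `timed_scope` and `entries` are hypothetical names.

```python
from contextlib import contextmanager
from timeit import default_timer as timer


@contextmanager
def timed_scope(api, sink):
    """Hypothetical scope: appends one dict per flow item to `sink`."""
    start = timer()
    sink.append({"flow": "started", "api": api})
    try:
        yield
    except BaseException:
        sink.append({"flow": "faulted", "api": api, "elapsed": timer() - start})
        raise  # re-raise so the exception is not swallowed
    else:
        sink.append({"flow": "completed", "api": api, "elapsed": timer() - start})


entries = []

with timed_scope("ok", entries):
    pass

try:
    with timed_scope("bad", entries):
        raise ValueError("boom")
except ValueError:
    pass

flows = [(e["flow"], e["api"]) for e in entries]
print(flows)
```

In both variants the exception propagates to the caller, because the class-based `__exit__` returns `None` and the generator re-raises.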

The output is handled by the SqlServerHandler:

    create table log(
        [id] int identity(1,1) primary key,
        [timestamp] datetime2,
        [logger] nvarchar(50),
        [module] nvarchar(200),
        [api] nvarchar(200),
        [flow] nvarchar(50),
        [elapsed] float,
        [message] nvarchar(max),
        [line] int
    )

    class SqlServerHandler(Handler):
        def __init__(self, server: str, database: str, username: str, password: str):
            super().__init__(logging.INFO)
            connection = pyodbc.connect(f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}")
            self.db = connection.cursor()

        def emit(self, record):
            # Store the formatted traceback instead of the message for exceptions.
            if record.exc_info:
                record.exc_text = logging.Formatter().formatException(record.exc_info)

            self.db.execute(
                "INSERT INTO log([timestamp], [logger], [module], [api], [flow], [elapsed], [message], [line]) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                datetime.fromtimestamp(record.created),
                record.name,
                record.module,
                record.api if hasattr(record, "api") else record.funcName,
                record.flow if hasattr(record, "flow") else None,
                record.elapsed if hasattr(record, "elapsed") else None,
                record.exc_text if record.exc_text else record.msg,
                record.lineno
            )
            self.db.commit()
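To try the handler pattern without a SQL Server instance, a rough stand-in can be written against the standard library's `sqlite3` module. This sketch swaps SQLite for SQL Server and trims the table to a few columns; `SqliteHandler` is a hypothetical name and the schema is an assumption made for the example.

```python
import logging
import sqlite3
from datetime import datetime


class SqliteHandler(logging.Handler):
    """Hypothetical stand-in for SqlServerHandler backed by an in-memory SQLite database."""

    def __init__(self, path=":memory:"):
        super().__init__(logging.INFO)
        self.connection = sqlite3.connect(path)
        self.connection.execute(
            "CREATE TABLE IF NOT EXISTS log("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, logger TEXT, "
            "api TEXT, flow TEXT, elapsed REAL, message TEXT)"
        )

    def emit(self, record):
        try:
            self.connection.execute(
                "INSERT INTO log(timestamp, logger, api, flow, elapsed, message) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (
                    datetime.fromtimestamp(record.created).isoformat(),
                    record.name,
                    # Fall back to defaults when the extra fields are absent.
                    getattr(record, "api", record.funcName),
                    getattr(record, "flow", None),
                    getattr(record, "elapsed", None),
                    record.getMessage(),
                ),
            )
            self.connection.commit()
        except Exception:
            # Let the logging module report the failure instead of crashing the app.
            self.handleError(record)


handler = SqliteHandler()
logger = logging.Logger("sqlite-demo")
logger.addHandler(handler)

logger.info("done", extra={"flow": "completed", "api": "demo", "elapsed": 0.25})

rows = handler.connection.execute(
    "SELECT logger, api, flow, elapsed, message FROM log"
).fetchall()
print(rows)
```

Wrapping the insert in `try`/`except` with `handleError` follows the convention of the built-in handlers, so a database failure does not propagate into the application code that called the logger.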

Example

This is my dirty code for how I'm testing it:

    def initialize_telemetry() -> FlowLogger:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(logging.Formatter(
            style="{",
            fmt="{asctime} | {module}.{funcName} | {flow} | {message}",
            defaults={"flow": "<flow>", "message": "<message>"}))
        sql_server_handler = SqlServerHandler(server="localhost,1433", database="master", username="sa", password="***")
        flow = FlowLogger("test-logger", [stream_handler, sql_server_handler])
        try:
            return flow
        finally:
            flow.completed()


    flow = initialize_telemetry()


    @telemetry(flow)
    def flow_decorator_test():
        raise ValueError
        pass


    def flow_test():
        flow.started("Custom flow.")
        flow.state(foo="bar")
        if True:
            flow.altered("The value was true.")
        try:
            raise ValueError
        except:
            flow.faulted()
        flow.completed()


    if __name__ == "__main__":
        flow_test()
        flow_decorator_test()

What do you think? Would you say I'm doing something terribly wrong here?

\$\endgroup\$
  • There is a new version, a follow-up of this question here.
    – t3chb0t
    Commented Oct 25, 2023 at 10:43
