This is my approach to "simplifying" MongoDB aggregation queries with a Pythonic syntax — or at least, that was my intent:
from datetime import datetime, timedelta


def _field_path(name):
    """Return *name* as a field path, prepending ``$`` only when it is missing."""
    return name if name.startswith("$") else "$" + name


def _as_expression(value):
    """Treat strings as field paths; pass any other value (literal/expression) through."""
    return _field_path(value) if isinstance(value, str) else value


class DeviceNotFoundError(AssertionError):
    """Raised when the ``devid`` filter in the request data matches no device.

    Subclasses ``AssertionError`` because this condition used to be signalled
    by an ``assert`` statement (which ``python -O`` strips), so existing
    ``except AssertionError`` handlers keep working.
    """


class MatchStage(list):
    """List of conditions that ``BasePipeline`` wraps as ``{"$match": {"$and": [...]}}``."""

    def _get_day_range(self):
        """Return ``(now - 1 day, now)`` as naive local datetimes."""
        current_time = datetime.now()
        return current_time - timedelta(days=1), current_time

    def different(self, **kwargs):
        """Append one ``{field: {"$ne": value}}`` condition per keyword argument."""
        for field, value in kwargs.items():
            self.append({field: {"$ne": value}})

    # Original (misspelled) name, kept so existing callers keep working.
    diferent = different

    def equals(self, **kwargs):
        """Append one ``{field: {"$eq": value}}`` condition per keyword argument."""
        for field, value in kwargs.items():
            self.append({field: {"$eq": value}})

    def set_interval(self, field, data):
        """Restrict *field* to the half-open window ``[starttime, endtime)``.

        When *data* has both ``"starttime"`` and ``"endtime"`` keys, their
        values are converted with ``int()`` and treated as Unix timestamps;
        otherwise the window is the last 24 hours. The bounds are also kept
        on ``self.starttime`` / ``self.endtime``.
        """
        if {"starttime", "endtime"} <= data.keys():
            self.starttime = datetime.fromtimestamp(int(data["starttime"]))
            self.endtime = datetime.fromtimestamp(int(data["endtime"]))
        else:
            self.starttime, self.endtime = self._get_day_range()
        # NOTE(review): these are naive *local* datetimes. PyMongo encodes
        # naive datetimes as if they were UTC, so on a non-UTC host the window
        # is shifted by the UTC offset — confirm the intended time zone.
        self.append({field: {"$gte": self.starttime, "$lt": self.endtime}})

    def set_devices(self, devices):
        """Match documents whose ``device`` is the ``id`` of one of *devices*."""
        self.devices = devices
        self.append({"device": {"$in": [device.id for device in devices]}})


class SortStage(dict):
    """Factories for ``$sort`` stages (only the static methods are used)."""

    @staticmethod
    def ascending_order(*fields):
        """Return a ``$sort`` stage ordering *fields* ascending (``1``)."""
        return {"$sort": {field: 1 for field in fields}}

    @staticmethod
    def descending_order(*fields):
        """Return a ``$sort`` stage ordering *fields* descending (``-1``)."""
        return {"$sort": {field: -1 for field in fields}}


class ReRootStage(dict):
    """Factory for the ``$replaceRoot`` stage (only the static method is used)."""

    @staticmethod
    def reset_root(*args):
        """Return a ``$replaceRoot`` that merges fields *args* with the ``_id`` sub-document."""
        return {"$replaceRoot": {"newRoot": {"$mergeObjects": [{key: "$" + key for key in args}, "$_id"]}}}


class GroupStage(dict):
    """Body of a ``$group`` stage: the ``_id`` key plus accumulator fields."""

    def sum_metrics(self, **kwargs):
        """Add one ``$sum`` accumulator per keyword argument.

        String values are field paths (``$`` added if missing) wrapped in
        ``$toLong``. Any other value (e.g. ``1`` to count documents, a float,
        or an expression dict) is summed as given.
        """
        for name, value in kwargs.items():
            if isinstance(value, str):
                value = {"$toLong": _field_path(value)}
            self[name] = {"$sum": value}

    def group_fields(self, *fields):
        """Add *fields* to a compound ``_id``, creating the ``_id`` dict if absent."""
        self.setdefault("_id", {}).update({field: _field_path(field) for field in fields})

    def _nest_field(self, key, field):
        """Store ``{"$push": field}`` under *key*."""
        self[key] = {"$push": field}

    def nest_field(self, key, field):
        """Collect the values of a single *field* into an array named *key*."""
        self._nest_field(key, _field_path(field))

    def nest_fields(self, *fields, key, **kfields):
        """Collect sub-documents into an array named *key*.

        Positional *fields* keep their own name. Each ``name=path`` keyword
        renames a field: string paths get ``$`` added if missing, and other
        values are used as given. Both forms may be combined.

        Raises:
            ValueError: if no field is given at all.
        """
        if not fields and not kfields:
            raise ValueError("No field specified")
        document = {field: _field_path(field) for field in fields}
        document.update({name: _as_expression(path) for name, path in kfields.items()})
        self._nest_field(key, document)


class BasePipeline(list):
    """A list of aggregation stages, starting with a ``$match`` on time and devices."""

    _match = MatchStage
    _group = GroupStage
    _sort = SortStage
    _reroot = ReRootStage

    def __init__(self, data, devices):
        """Start the pipeline with a ``$match`` on ``time`` and ``device``.

        Args:
            data: request mapping. Its optional keys are ``"starttime"`` and
                ``"endtime"`` (see ``MatchStage.set_interval``) and
                ``"devid"``, a comma-separated string of device ids.
            devices: an iterable of objects with an ``id`` attribute. When
                ``"devid"`` is present it must also support
                ``.filter(id__in=...)`` and ``.exists()`` (presumably a Django
                QuerySet — confirm).

        Raises:
            DeviceNotFoundError: ``"devid"`` matches none of *devices*.
        """
        super().__init__()
        self._set_match().set_interval("time", data)
        if "devid" in data:
            devices = devices.filter(id__in=data["devid"].split(","))
            # Explicit check instead of ``assert``, which ``-O`` would strip.
            if not devices.exists():
                raise DeviceNotFoundError("Device not found")
        self.match.set_devices(devices)

    def _set_group(self):
        """Append a new ``$group`` stage and make it ``self.group``."""
        self.group = self._group()
        self.append({"$group": self.group})
        return self.group

    def _set_match(self):
        """Append a new ``$match``/``$and`` stage and make its condition list ``self.match``."""
        self.match = self._match()
        # The stage holds a reference to the list, so later appends show up in it.
        self.append({"$match": {"$and": self.match}})
        return self.match

    def set_simple_group(self, field):
        """Group by a single *field*, then expose the group key as ``id`` instead of ``_id``."""
        self._set_group().update({"_id": _field_path(field)})
        self.extend([{"$addFields": {"id": "$_id"}}, {"$project": {"_id": 0}}])

    def set_composite_group(self, *fields, **kfields):
        """Group by a compound ``_id`` built from *fields* and ``name=value`` keywords.

        String keyword values are field paths (``$`` added if missing). Other
        values are used as given.

        Raises:
            ValueError: if neither positional nor keyword fields are given.
        """
        if not fields and not kfields:
            raise ValueError("No fields specified")
        group_id = {field: _field_path(field) for field in fields}
        group_id.update({name: _as_expression(value) for name, value in kfields.items()})
        self._set_group().update({"_id": group_id})

    def sort_documents(self, *fields, descending=False):
        """Append a ``$sort`` stage on *fields* (ascending unless *descending*)."""
        order = self._sort.descending_order if descending else self._sort.ascending_order
        self.append(order(*fields))


class WebFilterBlockedPipeline(BasePipeline):
    """Pipeline reporting blocked web-filter requests per URL and user."""

    def aggregate_root(self, *args):
        """Append a ``$replaceRoot`` that flattens ``_id`` and keeps fields *args*."""
        self.append(self._reroot.reset_root(*args))

    def aggregate_data(self, **kfields):
        """Add ``$sum`` metrics to the current group, then flatten its ``_id``."""
        self.group.sum_metrics(**kfields)
        self.aggregate_root(*kfields)

    @classmethod
    def create(cls, data, devices):
        """Build the blocked-URL report pipeline.

        Counts ``action == "blocked"`` documents per (url, source_ip, profile).
        It then regroups per url with a total ``count`` and the per-user
        breakdown in ``users``. Both levels are sorted by count, descending.
        """
        pipeline = cls(data, devices)
        pipeline.match.equals(action="blocked")
        pipeline.set_composite_group("url", "source_ip", "profile")
        pipeline.aggregate_data(count=1)
        pipeline.sort_documents("count", descending=True)
        pipeline.set_simple_group(field="url")
        pipeline.group.sum_metrics(count="count")
        # Several fields go into one sub-document, so use nest_fields.
        # nest_field(key, field) raises TypeError with these arguments.
        pipeline.group.nest_fields("source_ip", "profile", "count", key="users")
        pipeline.sort_documents("count", descending=True)
        return pipeline
Example output of `WebFilterBlockedPipeline.create()`,
serialized as JSON:
[{ "$match": { "$and": [{ "time": { "$gte": "2020-07-15T16:04:19", "$lt": "2020-07-16T16:04:19" } }, { "device": { "$in": ["FG100ETK18035573"] } }, { "action": { "$eq": "blocked" } }] } }, { "$group": { "_id": { "url": "$url", "source_ip": "$source_ip", "profile": "$profile" }, "count": { "$sum": 1 } } }, { "$replaceRoot": { "newRoot": { "$mergeObjects": [{ "count": "$count" }, "$_id"] } } }, { "$sort": { "count": -1 } }, { "$group": { "_id": "$url", "count": { "$sum": { "$toLong": "$count" } }, "users": { "$push": { "source_ip": "$source_ip", "profile": "$profile", "count": "$count" } } } }, { "$addFields": { "id": "$_id" } }, { "$project": { "_id": 0 } }, { "$sort": { "count": -1 } }]
I'd like to know whether this is an acceptable implementation and, if not, whether I should scrap it completely or follow another design pattern. Any constructive criticism is welcome.