From 9ae20b4a2a27cb072b31740876c2d04a3ec5326e Mon Sep 17 00:00:00 2001 From: dandds Date: Fri, 22 Nov 2019 15:53:49 -0500 Subject: [PATCH] JSON logging for Celery workers. This enables JSON logging for Celery workers if the LOG_JSON conig value is set. It uses the same JsonFormatter class used by the Flask applications. That class has been updated in two ways: - It takes a `source` kwarg to define the log source for the formatter. - The `msg` attribute of the log record is formatted with any arguments that may have been passed. This is necessary for Celery to render task type, completion time, etc. into the log output. --- atst/queue.py | 1 + atst/utils/logging.py | 19 +++++++++++++++---- celery_worker.py | 12 ++++++++++++ tests/utils/test_logging.py | 2 ++ 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/atst/queue.py b/atst/queue.py index 1cefbcbd..dfe9d894 100644 --- a/atst/queue.py +++ b/atst/queue.py @@ -1,5 +1,6 @@ from celery import Celery + celery = Celery(__name__) diff --git a/atst/utils/logging.py b/atst/utils/logging.py index f277ec4f..94e7aab6 100644 --- a/atst/utils/logging.py +++ b/atst/utils/logging.py @@ -32,14 +32,25 @@ class JsonFormatter(logging.Formatter): ("dod_edipi", lambda r: r.__dict__.get("dod_edipi")), ("severity", lambda r: r.levelname), ("tags", lambda r: r.__dict__.get("tags")), - ("message", lambda r: r.msg), ("audit_event", lambda r: r.__dict__.get("audit_event")), ] - def format(self, record): - message_dict = {"source": "atst"} + def __init__(self, *args, source="atst", **kwargs): + self.source = source + super().__init__(self) + + def format(self, record, *args, **kwargs): + message_dict = {"source": self.source} + for field, func in self._DEFAULT_RECORD_FIELDS: - message_dict[field] = func(record) + result = func(record) + if result: + message_dict[field] = result + + if record.args: + message_dict["message"] = record.msg % record.args + else: + message_dict["message"] = record.msg if record.__dict__.get("exc_info") is not None: message_dict["details"] = { diff --git a/celery_worker.py b/celery_worker.py index f1371182..d2a99b3a 100644 --- a/celery_worker.py +++ b/celery_worker.py @@ -1,7 +1,19 @@ #!/usr/bin/env python +import logging from atst.app import celery, make_app, make_config +from celery.signals import after_setup_task_logger + +from atst.utils.logging import JsonFormatter config = make_config() app = make_app(config) app.app_context().push() + + +@after_setup_task_logger.connect +def setup_task_logger(*args, **kwargs): + if app.config.get("LOG_JSON"): + logger = logging.getLogger() + for handler in logger.handlers: + handler.setFormatter(JsonFormatter(source="queue")) diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py index fd934d07..415c89d6 100644 --- a/tests/utils/test_logging.py +++ b/tests/utils/test_logging.py @@ -69,10 +69,12 @@ def test_request_context_filter(logger, log_stream_content, request_ctx, monkeyp user = Mock(spec=["id"]) user.id = user_uuid + user.dod_id = "5678901234" monkeypatch.setattr("atst.utils.logging.g", Mock(current_user=user)) request_ctx.request.environ["HTTP_X_REQUEST_ID"] = request_uuid logger.info("this user is doing something") log = json.loads(log_stream_content()) assert log["user_id"] == str(user_uuid) + assert log["dod_edipi"] == str(user.dod_id) assert log["request_id"] == request_uuid