From 7010bdb09c28881e973376255da572a8280dd1b2 Mon Sep 17 00:00:00 2001 From: dandds Date: Thu, 5 Sep 2019 11:07:51 -0400 Subject: [PATCH] Record job failures with application context. AT-AT needs to be able to track which user tasks failed and why. To accomplish this we: - Enabled Celery's results backend, which logs task results to a data store; a Postgres table, in our case. (https://docs.celeryproject.org/en/latest/userguide/tasks.html#result-backends) - Created tables to track the relationships between the relevant models (Environment, EnvironmentRole) and their task failures. - Added an `on_failure` hook that tasks can use. The hook will add records to the job failure tables. Now a resource like an `Environment` has access to its task failures through the corresponding failure table. Notes: - It might prove useful to use a real foreign key to the Celery results table eventually. I did not do it here because it requires that we explicitly store the Celery results table schema as a migration and add a model for it. In the current implementation, AT-AT can be agnostic about where the results live. - We store the task results indefinitely, so it is important to specify tasks for which we do not care about the results (like `send_mail`) via the `ignore_result` kwarg. 
--- .../30ea1cb20807_add_job_failure_tables.py | 42 +++++++++++++++++++ atst/app.py | 6 +++ atst/jobs.py | 26 +++++++++++- atst/models/__init__.py | 1 + atst/models/environment.py | 2 + atst/models/environment_role.py | 2 + atst/models/job_failure.py | 16 +++++++ atst/models/mixins/__init__.py | 1 + atst/models/mixins/job_failure.py | 14 +++++++ tests/conftest.py | 9 ++++ tests/test_jobs.py | 41 ++++++++++++++++++ 11 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 alembic/versions/30ea1cb20807_add_job_failure_tables.py create mode 100644 atst/models/job_failure.py create mode 100644 atst/models/mixins/job_failure.py create mode 100644 tests/test_jobs.py diff --git a/alembic/versions/30ea1cb20807_add_job_failure_tables.py b/alembic/versions/30ea1cb20807_add_job_failure_tables.py new file mode 100644 index 00000000..48f5d117 --- /dev/null +++ b/alembic/versions/30ea1cb20807_add_job_failure_tables.py @@ -0,0 +1,42 @@ +"""add job failure tables + +Revision ID: 30ea1cb20807 +Revises: 4a3122ffe898 +Create Date: 2019-09-06 06:56:25.685805 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '30ea1cb20807' # pragma: allowlist secret +down_revision = '4a3122ffe898' # pragma: allowlist secret +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('environment_job_failures', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('task_id', sa.String(), nullable=False), + sa.Column('environment_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.ForeignKeyConstraint(['environment_id'], ['environments.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_table('environment_role_job_failures', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('task_id', sa.String(), nullable=False), + sa.Column('environment_role_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.ForeignKeyConstraint(['environment_role_id'], ['environment_roles.id'], ), + sa.PrimaryKeyConstraint('id') + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('environment_role_job_failures') + op.drop_table('environment_job_failures') + # ### end Alembic commands ### diff --git a/atst/app.py b/atst/app.py index 69c87cbf..488b57e9 100644 --- a/atst/app.py +++ b/atst/app.py @@ -169,6 +169,12 @@ def map_config(config): "LIMIT_CONCURRENT_SESSIONS": config.getboolean( "default", "LIMIT_CONCURRENT_SESSIONS" ), + # Store the celery task results in a database table (celery_taskmeta) + "CELERY_RESULT_BACKEND": "db+{}".format(config.get("default", "DATABASE_URI")), + # Do not automatically delete results (by default, Celery will do this + # with a Beat job once a day) + "CELERY_RESULT_EXPIRES": 0, + "CELERY_RESULT_EXTENDED": True, } diff --git a/atst/jobs.py b/atst/jobs.py index 09c35093..2d4440b1 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -1,14 +1,36 @@ from flask import current_app as app from atst.queue import celery +from atst.database import db +from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure -@celery.task() +class RecordEnvironmentFailure(celery.Task): + def on_failure(self, exc, task_id, args, kwargs, einfo): + if "environment_id" in kwargs: + failure = EnvironmentJobFailure( + 
environment_id=kwargs["environment_id"], task_id=task_id + ) + db.session.add(failure) + db.session.commit() + + +class RecordEnvironmentRoleFailure(celery.Task): + def on_failure(self, exc, task_id, args, kwargs, einfo): + if "environment_role_id" in kwargs: + failure = EnvironmentRoleJobFailure( + environment_role_id=kwargs["environment_role_id"], task_id=task_id + ) + db.session.add(failure) + db.session.commit() + + +@celery.task(ignore_result=True) def send_mail(recipients, subject, body): app.mailer.send(recipients, subject, body) -@celery.task() +@celery.task(ignore_result=True) def send_notification_mail(recipients, subject, body): app.logger.info( "Sending a notification to these recipients: {}\n\nSubject: {}\n\n{}".format( diff --git a/atst/models/__init__.py b/atst/models/__init__.py index c24bc152..7cee8de7 100644 --- a/atst/models/__init__.py +++ b/atst/models/__init__.py @@ -11,6 +11,7 @@ from .audit_event import AuditEvent from .clin import CLIN, JEDICLINType from .environment import Environment from .environment_role import EnvironmentRole, CSPRole +from .job_failure import EnvironmentJobFailure, EnvironmentRoleJobFailure from .notification_recipient import NotificationRecipient from .permissions import Permissions from .permission_set import PermissionSet diff --git a/atst/models/environment.py b/atst/models/environment.py index bbd34615..1c85484a 100644 --- a/atst/models/environment.py +++ b/atst/models/environment.py @@ -19,6 +19,8 @@ class Environment( cloud_id = Column(String) + job_failures = relationship("EnvironmentJobFailure") + @property def users(self): return {r.application_role.user for r in self.roles} diff --git a/atst/models/environment_role.py b/atst/models/environment_role.py index 371072f3..9f6754f8 100644 --- a/atst/models/environment_role.py +++ b/atst/models/environment_role.py @@ -31,6 +31,8 @@ class EnvironmentRole( ) application_role = relationship("ApplicationRole") + job_failures = relationship("EnvironmentRoleJobFailure") 
+ def __repr__(self): return "".format( self.role, self.application_role.user_name, self.environment.name, self.id diff --git a/atst/models/job_failure.py b/atst/models/job_failure.py new file mode 100644 index 00000000..50d553a1 --- /dev/null +++ b/atst/models/job_failure.py @@ -0,0 +1,16 @@ +from sqlalchemy import Column, ForeignKey + +from atst.models import Base +from atst.models import mixins + + +class EnvironmentJobFailure(Base, mixins.JobFailureMixin): + __tablename__ = "environment_job_failures" + + environment_id = Column(ForeignKey("environments.id"), nullable=False) + + +class EnvironmentRoleJobFailure(Base, mixins.JobFailureMixin): + __tablename__ = "environment_role_job_failures" + + environment_role_id = Column(ForeignKey("environment_roles.id"), nullable=False) diff --git a/atst/models/mixins/__init__.py b/atst/models/mixins/__init__.py index dd6d0faf..fbc5e448 100644 --- a/atst/models/mixins/__init__.py +++ b/atst/models/mixins/__init__.py @@ -3,3 +3,4 @@ from .auditable import AuditableMixin from .permissions import PermissionsMixin from .deletable import DeletableMixin from .invites import InvitesMixin +from .job_failure import JobFailureMixin diff --git a/atst/models/mixins/job_failure.py b/atst/models/mixins/job_failure.py new file mode 100644 index 00000000..c4f4cfa4 --- /dev/null +++ b/atst/models/mixins/job_failure.py @@ -0,0 +1,14 @@ +from celery.result import AsyncResult +from sqlalchemy import Column, String, Integer + + +class JobFailureMixin(object): + id = Column(Integer(), primary_key=True) + task_id = Column(String(), nullable=False) + + @property + def task(self): + if not hasattr(self, "_task"): + self._task = AsyncResult(self.task_id) + + return self._task diff --git a/tests/conftest.py b/tests/conftest.py index 3535e3a4..3bc9c2b6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -320,3 +320,12 @@ def notification_sender(app): yield app.notification_sender app.notification_sender = real_notification_sender + + +# This is 
the only effective means I could find to disable logging. Setting a +# `celery_enable_logging` fixture to return False should work according to the +# docs, but doesn't: +# https://docs.celeryproject.org/en/latest/userguide/testing.html#celery-enable-logging-override-to-enable-logging-in-embedded-workers +@pytest.fixture(scope="function") +def celery_worker_parameters(): + return {"loglevel": "FATAL"} diff --git a/tests/test_jobs.py b/tests/test_jobs.py new file mode 100644 index 00000000..4ad5220c --- /dev/null +++ b/tests/test_jobs.py @@ -0,0 +1,41 @@ +import pytest + +from atst.jobs import RecordEnvironmentFailure, RecordEnvironmentRoleFailure + +from tests.factories import EnvironmentFactory, EnvironmentRoleFactory + + +def test_environment_job_failure(celery_app, celery_worker): + @celery_app.task(bind=True, base=RecordEnvironmentFailure) + def _fail_hard(self, environment_id=None): + raise ValueError("something bad happened") + + environment = EnvironmentFactory.create() + celery_worker.reload() + + # Use apply instead of delay since we are testing the on_failure hook only + task = _fail_hard.apply(kwargs={"environment_id": environment.id}) + with pytest.raises(ValueError): + task.get() + + assert environment.job_failures + job_failure = environment.job_failures[0] + assert job_failure.task == task + + +def test_environment_role_job_failure(celery_app, celery_worker): + @celery_app.task(bind=True, base=RecordEnvironmentRoleFailure) + def _fail_hard(self, environment_role_id=None): + raise ValueError("something bad happened") + + role = EnvironmentRoleFactory.create() + celery_worker.reload() + + # Use apply instead of delay since we are testing the on_failure hook only + task = _fail_hard.apply(kwargs={"environment_role_id": role.id}) + with pytest.raises(ValueError): + task.get() + + assert role.job_failures + job_failure = role.job_failures[0] + assert job_failure.task == task