Record job failures with application context.

AT-AT needs to track which user tasks failed and why. To accomplish
this, we:

- Enabled Celery's result backend, which logs task results to a data
  store (a Postgres table, in our case).
  (https://docs.celeryproject.org/en/latest/userguide/tasks.html#result-backends)
- Created tables to track the relationships between the relevant models
  (Environment, EnvironmentRole) and their task failures.
- Added an `on_failure` hook that tasks can use. The hook adds records
  to the job failure tables (see the sketch below).
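
For example, a task opts into failure tracking by declaring one of
these classes as its Celery base class. A minimal sketch (the task name
`do_provision` and its body are hypothetical; the pattern follows the
tests in this commit):

    @celery.task(bind=True, base=RecordEnvironmentFailure)
    def do_provision(self, environment_id=None):
        # Any exception raised here triggers on_failure, which records
        # an EnvironmentJobFailure row pointing back at the environment.
        ...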

Now a resource like an `Environment` has access to its task failures
through the corresponding failure table.
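
Concretely, a sketch of the lookup (assumes a failed task has already
been recorded for this environment):

    failure = environment.job_failures[0]  # EnvironmentJobFailure record
    failure.task_id  # the Celery task ID of the failed job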

Notes:
- It might eventually be useful to add a real foreign key to the Celery
  results table. I did not do that here because it would require
  explicitly storing the Celery results table schema as a migration and
  adding a model for it. As implemented, AT-AT can remain agnostic
  about where the results live.
- Because we store task results indefinitely, it is important to flag
  tasks whose results we do not care about (like `send_mail`) with the
  `ignore_result` kwarg.
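
  As applied in the diff below:

      @celery.task(ignore_result=True)
      def send_mail(recipients, subject, body):
          app.mailer.send(recipients, subject, body)
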
dandds 2019-09-05 11:07:51 -04:00
parent 1cbefb099b
commit 7010bdb09c
11 changed files with 158 additions and 2 deletions


@@ -0,0 +1,42 @@
"""add job failure tables

Revision ID: 30ea1cb20807
Revises: 4a3122ffe898
Create Date: 2019-09-06 06:56:25.685805

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '30ea1cb20807' # pragma: allowlist secret
down_revision = '4a3122ffe898' # pragma: allowlist secret
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('environment_job_failures',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('task_id', sa.String(), nullable=False),
        sa.Column('environment_id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.ForeignKeyConstraint(['environment_id'], ['environments.id'], ),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_table('environment_role_job_failures',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('task_id', sa.String(), nullable=False),
        sa.Column('environment_role_id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.ForeignKeyConstraint(['environment_role_id'], ['environment_roles.id'], ),
        sa.PrimaryKeyConstraint('id')
    )
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('environment_role_job_failures')
    op.drop_table('environment_job_failures')
    # ### end Alembic commands ###


@@ -169,6 +169,12 @@ def map_config(config):
        "LIMIT_CONCURRENT_SESSIONS": config.getboolean(
            "default", "LIMIT_CONCURRENT_SESSIONS"
        ),
        # Store the celery task results in a database table (celery_taskmeta)
        "CELERY_RESULT_BACKEND": "db+{}".format(config.get("default", "DATABASE_URI")),
        # Do not automatically delete results (by default, Celery will do this
        # with a Beat job once a day)
        "CELERY_RESULT_EXPIRES": 0,
        "CELERY_RESULT_EXTENDED": True,
    }
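
Note: `CELERY_RESULT_EXTENDED` asks the backend to also store extended
task metadata (the task's name, args, and kwargs) with each result,
provided the installed Celery version supports the setting.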


@@ -1,14 +1,36 @@
from flask import current_app as app

from atst.queue import celery
from atst.database import db
from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure


class RecordEnvironmentFailure(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if "environment_id" in kwargs:
            failure = EnvironmentJobFailure(
                environment_id=kwargs["environment_id"], task_id=task_id
            )
            db.session.add(failure)
            db.session.commit()


class RecordEnvironmentRoleFailure(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if "environment_role_id" in kwargs:
            failure = EnvironmentRoleJobFailure(
                environment_role_id=kwargs["environment_role_id"], task_id=task_id
            )
            db.session.add(failure)
            db.session.commit()


@celery.task(ignore_result=True)
def send_mail(recipients, subject, body):
    app.mailer.send(recipients, subject, body)


@celery.task(ignore_result=True)
def send_notification_mail(recipients, subject, body):
    app.logger.info(
        "Sending a notification to these recipients: {}\n\nSubject: {}\n\n{}".format(


@@ -11,6 +11,7 @@ from .audit_event import AuditEvent
from .clin import CLIN, JEDICLINType
from .environment import Environment
from .environment_role import EnvironmentRole, CSPRole
from .job_failure import EnvironmentJobFailure, EnvironmentRoleJobFailure
from .notification_recipient import NotificationRecipient
from .permissions import Permissions
from .permission_set import PermissionSet


@@ -19,6 +19,8 @@ class Environment(
    cloud_id = Column(String)

    job_failures = relationship("EnvironmentJobFailure")

    @property
    def users(self):
        return {r.application_role.user for r in self.roles}


@@ -31,6 +31,8 @@ class EnvironmentRole(
    )
    application_role = relationship("ApplicationRole")

    job_failures = relationship("EnvironmentRoleJobFailure")

    def __repr__(self):
        return "<EnvironmentRole(role='{}', user='{}', environment='{}', id='{}')>".format(
            self.role, self.application_role.user_name, self.environment.name, self.id


@@ -0,0 +1,16 @@
from sqlalchemy import Column, ForeignKey

from atst.models import Base
from atst.models import mixins


class EnvironmentJobFailure(Base, mixins.JobFailureMixin):
    __tablename__ = "environment_job_failures"

    environment_id = Column(ForeignKey("environments.id"), nullable=False)


class EnvironmentRoleJobFailure(Base, mixins.JobFailureMixin):
    __tablename__ = "environment_role_job_failures"

    environment_role_id = Column(ForeignKey("environment_roles.id"), nullable=False)


@@ -3,3 +3,4 @@ from .auditable import AuditableMixin
from .permissions import PermissionsMixin
from .deletable import DeletableMixin
from .invites import InvitesMixin
from .job_failure import JobFailureMixin


@@ -0,0 +1,14 @@
from celery.result import AsyncResult
from sqlalchemy import Column, String, Integer


class JobFailureMixin(object):
    id = Column(Integer(), primary_key=True)
    task_id = Column(String(), nullable=False)

    @property
    def task(self):
        if not hasattr(self, "_task"):
            self._task = AsyncResult(self.task_id)

        return self._task
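
Because `task` wraps the stored task ID in a Celery `AsyncResult`, a
failure record exposes whatever the result backend captured. A sketch
using standard `AsyncResult` attributes:

    failure.task.state      # e.g. "FAILURE"
    failure.task.traceback  # traceback string saved by the result backend
    failure.task.result     # for a failed task, the raised exception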


@@ -320,3 +320,12 @@ def notification_sender(app):
    yield app.notification_sender

    app.notification_sender = real_notification_sender


# This is the only effective means I could find to disable logging. Setting a
# `celery_enable_logging` fixture to return False should work according to the
# docs, but doesn't:
# https://docs.celeryproject.org/en/latest/userguide/testing.html#celery-enable-logging-override-to-enable-logging-in-embedded-workers
@pytest.fixture(scope="function")
def celery_worker_parameters():
    return {"loglevel": "FATAL"}

tests/test_jobs.py

@@ -0,0 +1,41 @@
import pytest

from atst.jobs import RecordEnvironmentFailure, RecordEnvironmentRoleFailure
from tests.factories import EnvironmentFactory, EnvironmentRoleFactory


def test_environment_job_failure(celery_app, celery_worker):
    @celery_app.task(bind=True, base=RecordEnvironmentFailure)
    def _fail_hard(self, environment_id=None):
        raise ValueError("something bad happened")

    environment = EnvironmentFactory.create()
    celery_worker.reload()

    # Use apply instead of delay since we are testing the on_failure hook only
    task = _fail_hard.apply(kwargs={"environment_id": environment.id})

    with pytest.raises(ValueError):
        task.get()

    assert environment.job_failures
    job_failure = environment.job_failures[0]
    assert job_failure.task == task


def test_environment_role_job_failure(celery_app, celery_worker):
    @celery_app.task(bind=True, base=RecordEnvironmentRoleFailure)
    def _fail_hard(self, environment_role_id=None):
        raise ValueError("something bad happened")

    role = EnvironmentRoleFactory.create()
    celery_worker.reload()

    # Use apply instead of delay since we are testing the on_failure hook only
    task = _fail_hard.apply(kwargs={"environment_role_id": role.id})

    with pytest.raises(ValueError):
        task.get()

    assert role.job_failures
    job_failure = role.job_failures[0]
    assert job_failure.task == task