Remove multiple job failure tables in favor of one.
We don't know yet how useful the job failue tables will be, and maintaining multiple failure tables--one for every entity involved in CSP provisioning--is burdensome. This collapses them all into a single table that track the entity type (environment, portfolio, etc.) and the entity ID. That way we can construct queries when needed to find task results.
This commit is contained in:
parent
02438dc39b
commit
bfc0692063
60
alembic/versions/508957112ed6_combine_job_failures.py
Normal file
60
alembic/versions/508957112ed6_combine_job_failures.py
Normal file
@ -0,0 +1,60 @@
|
||||
"""combine job failures
|
||||
|
||||
Revision ID: 508957112ed6
|
||||
Revises: 07e0598199f6
|
||||
Create Date: 2020-01-25 15:03:06.377442
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '508957112ed6' # pragma: allowlist secret
|
||||
down_revision = '07e0598199f6' # pragma: allowlist secret
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('job_failures',
|
||||
sa.Column('time_created', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False),
|
||||
sa.Column('time_updated', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False),
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('task_id', sa.String(), nullable=False),
|
||||
sa.Column('entity', sa.String(), nullable=False),
|
||||
sa.Column('entity_id', sa.String(), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.drop_table('environment_job_failures')
|
||||
op.drop_table('environment_role_job_failures')
|
||||
op.drop_table('portfolio_job_failures')
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('portfolio_job_failures',
|
||||
sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
|
||||
sa.Column('task_id', sa.VARCHAR(), autoincrement=False, nullable=False),
|
||||
sa.Column('portfolio_id', postgresql.UUID(), autoincrement=False, nullable=False),
|
||||
sa.ForeignKeyConstraint(['portfolio_id'], ['portfolios.id'], name='portfolio_job_failures_portfolio_id_fkey'),
|
||||
sa.PrimaryKeyConstraint('id', name='portfolio_job_failures_pkey')
|
||||
)
|
||||
op.create_table('environment_role_job_failures',
|
||||
sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
|
||||
sa.Column('task_id', sa.VARCHAR(), autoincrement=False, nullable=False),
|
||||
sa.Column('environment_role_id', postgresql.UUID(), autoincrement=False, nullable=False),
|
||||
sa.ForeignKeyConstraint(['environment_role_id'], ['environment_roles.id'], name='environment_role_job_failures_environment_role_id_fkey'),
|
||||
sa.PrimaryKeyConstraint('id', name='environment_role_job_failures_pkey')
|
||||
)
|
||||
op.create_table('environment_job_failures',
|
||||
sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
|
||||
sa.Column('task_id', sa.VARCHAR(), autoincrement=False, nullable=False),
|
||||
sa.Column('environment_id', postgresql.UUID(), autoincrement=False, nullable=False),
|
||||
sa.ForeignKeyConstraint(['environment_id'], ['environments.id'], name='environment_job_failures_environment_id_fkey'),
|
||||
sa.PrimaryKeyConstraint('id', name='environment_job_failures_pkey')
|
||||
)
|
||||
op.drop_table('job_failures')
|
||||
# ### end Alembic commands ###
|
57
atst/jobs.py
57
atst/jobs.py
@ -3,12 +3,7 @@ import pendulum
|
||||
|
||||
from atst.database import db
|
||||
from atst.queue import celery
|
||||
from atst.models import (
|
||||
EnvironmentJobFailure,
|
||||
EnvironmentRoleJobFailure,
|
||||
EnvironmentRole,
|
||||
PortfolioJobFailure,
|
||||
)
|
||||
from atst.models import EnvironmentRole, JobFailure
|
||||
from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException
|
||||
from atst.domain.environments import Environments
|
||||
from atst.domain.portfolios import Portfolios
|
||||
@ -17,32 +12,26 @@ from atst.models.utils import claim_for_update
|
||||
from atst.utils.localization import translate
|
||||
|
||||
|
||||
class RecordPortfolioFailure(celery.Task):
|
||||
class RecordFailure(celery.Task):
|
||||
_ENTITIES = [
|
||||
"portfolio_id",
|
||||
"application_id",
|
||||
"environment_id",
|
||||
"environment_role_id",
|
||||
]
|
||||
|
||||
def _derive_entity_info(self, kwargs):
|
||||
matches = [e for e in self._ENTITIES if e in kwargs.keys()]
|
||||
if matches:
|
||||
match = matches[0]
|
||||
return {"entity": match.replace("_id", ""), "entity_id": kwargs[match]}
|
||||
else:
|
||||
return None
|
||||
|
||||
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
||||
if "portfolio_id" in kwargs:
|
||||
failure = PortfolioJobFailure(
|
||||
portfolio_id=kwargs["portfolio_id"], task_id=task_id
|
||||
)
|
||||
db.session.add(failure)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
class RecordEnvironmentFailure(celery.Task):
|
||||
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
||||
if "environment_id" in kwargs:
|
||||
failure = EnvironmentJobFailure(
|
||||
environment_id=kwargs["environment_id"], task_id=task_id
|
||||
)
|
||||
db.session.add(failure)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
class RecordEnvironmentRoleFailure(celery.Task):
|
||||
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
||||
if "environment_role_id" in kwargs:
|
||||
failure = EnvironmentRoleJobFailure(
|
||||
environment_role_id=kwargs["environment_role_id"], task_id=task_id
|
||||
)
|
||||
info = self._derive_entity_info(kwargs)
|
||||
if info:
|
||||
failure = JobFailure(**info, task_id=task_id)
|
||||
db.session.add(failure)
|
||||
db.session.commit()
|
||||
|
||||
@ -143,17 +132,17 @@ def do_provision_portfolio(csp: CloudProviderInterface, portfolio_id=None):
|
||||
fsm.trigger_next_transition()
|
||||
|
||||
|
||||
@celery.task(bind=True, base=RecordPortfolioFailure)
|
||||
@celery.task(bind=True, base=RecordFailure)
|
||||
def provision_portfolio(self, portfolio_id=None):
|
||||
do_work(do_provision_portfolio, self, app.csp.cloud, portfolio_id=portfolio_id)
|
||||
|
||||
|
||||
@celery.task(bind=True, base=RecordEnvironmentFailure)
|
||||
@celery.task(bind=True, base=RecordFailure)
|
||||
def create_environment(self, environment_id=None):
|
||||
do_work(do_create_environment, self, app.csp.cloud, environment_id=environment_id)
|
||||
|
||||
|
||||
@celery.task(bind=True, base=RecordEnvironmentFailure)
|
||||
@celery.task(bind=True, base=RecordFailure)
|
||||
def create_atat_admin_user(self, environment_id=None):
|
||||
do_work(
|
||||
do_create_atat_admin_user, self, app.csp.cloud, environment_id=environment_id
|
||||
|
@ -7,11 +7,7 @@ from .audit_event import AuditEvent
|
||||
from .clin import CLIN, JEDICLINType
|
||||
from .environment import Environment
|
||||
from .environment_role import EnvironmentRole, CSPRole
|
||||
from .job_failure import (
|
||||
EnvironmentJobFailure,
|
||||
EnvironmentRoleJobFailure,
|
||||
PortfolioJobFailure,
|
||||
)
|
||||
from .job_failure import JobFailure
|
||||
from .notification_recipient import NotificationRecipient
|
||||
from .permissions import Permissions
|
||||
from .permission_set import PermissionSet
|
||||
|
@ -30,8 +30,6 @@ class Environment(
|
||||
|
||||
claimed_until = Column(TIMESTAMP(timezone=True))
|
||||
|
||||
job_failures = relationship("EnvironmentJobFailure")
|
||||
|
||||
roles = relationship(
|
||||
"EnvironmentRole",
|
||||
back_populates="environment",
|
||||
|
@ -32,8 +32,6 @@ class EnvironmentRole(
|
||||
)
|
||||
application_role = relationship("ApplicationRole")
|
||||
|
||||
job_failures = relationship("EnvironmentRoleJobFailure")
|
||||
|
||||
csp_user_id = Column(String())
|
||||
claimed_until = Column(TIMESTAMP(timezone=True))
|
||||
|
||||
|
@ -1,22 +1,21 @@
|
||||
from sqlalchemy import Column, ForeignKey
|
||||
from celery.result import AsyncResult
|
||||
from sqlalchemy import Column, String, Integer
|
||||
|
||||
from atst.models.base import Base
|
||||
import atst.models.mixins as mixins
|
||||
|
||||
|
||||
class EnvironmentJobFailure(Base, mixins.JobFailureMixin):
|
||||
__tablename__ = "environment_job_failures"
|
||||
class JobFailure(Base, mixins.TimestampsMixin):
|
||||
__tablename__ = "job_failures"
|
||||
|
||||
environment_id = Column(ForeignKey("environments.id"), nullable=False)
|
||||
id = Column(Integer(), primary_key=True)
|
||||
task_id = Column(String(), nullable=False)
|
||||
entity = Column(String(), nullable=False)
|
||||
entity_id = Column(String(), nullable=False)
|
||||
|
||||
@property
|
||||
def task(self):
|
||||
if not hasattr(self, "_task"):
|
||||
self._task = AsyncResult(self.task_id)
|
||||
|
||||
class EnvironmentRoleJobFailure(Base, mixins.JobFailureMixin):
|
||||
__tablename__ = "environment_role_job_failures"
|
||||
|
||||
environment_role_id = Column(ForeignKey("environment_roles.id"), nullable=False)
|
||||
|
||||
|
||||
class PortfolioJobFailure(Base, mixins.JobFailureMixin):
|
||||
__tablename__ = "portfolio_job_failures"
|
||||
|
||||
portfolio_id = Column(ForeignKey("portfolios.id"), nullable=False)
|
||||
return self._task
|
||||
|
@ -3,5 +3,4 @@ from .auditable import AuditableMixin
|
||||
from .permissions import PermissionsMixin
|
||||
from .deletable import DeletableMixin
|
||||
from .invites import InvitesMixin
|
||||
from .job_failure import JobFailureMixin
|
||||
from .state_machines import FSMMixin
|
||||
|
@ -1,14 +0,0 @@
|
||||
from celery.result import AsyncResult
|
||||
from sqlalchemy import Column, String, Integer
|
||||
|
||||
|
||||
class JobFailureMixin(object):
|
||||
id = Column(Integer(), primary_key=True)
|
||||
task_id = Column(String(), nullable=False)
|
||||
|
||||
@property
|
||||
def task(self):
|
||||
if not hasattr(self, "_task"):
|
||||
self._task = AsyncResult(self.task_id)
|
||||
|
||||
return self._task
|
@ -8,8 +8,7 @@ from atst.domain.csp.cloud import MockCloudProvider
|
||||
from atst.domain.portfolios import Portfolios
|
||||
|
||||
from atst.jobs import (
|
||||
RecordEnvironmentFailure,
|
||||
RecordEnvironmentRoleFailure,
|
||||
RecordFailure,
|
||||
dispatch_create_environment,
|
||||
dispatch_create_atat_admin_user,
|
||||
dispatch_provision_portfolio,
|
||||
@ -29,7 +28,7 @@ from tests.factories import (
|
||||
PortfolioStateMachineFactory,
|
||||
ApplicationRoleFactory,
|
||||
)
|
||||
from atst.models import CSPRole, EnvironmentRole, ApplicationRoleStatus
|
||||
from atst.models import CSPRole, EnvironmentRole, ApplicationRoleStatus, JobFailure
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope="function")
|
||||
@ -43,8 +42,17 @@ def portfolio():
|
||||
return portfolio
|
||||
|
||||
|
||||
def test_environment_job_failure(celery_app, celery_worker):
|
||||
@celery_app.task(bind=True, base=RecordEnvironmentFailure)
|
||||
def _find_failure(session, entity, id_):
|
||||
return (
|
||||
session.query(JobFailure)
|
||||
.filter(JobFailure.entity == entity)
|
||||
.filter(JobFailure.entity_id == id_)
|
||||
.one()
|
||||
)
|
||||
|
||||
|
||||
def test_environment_job_failure(session, celery_app, celery_worker):
|
||||
@celery_app.task(bind=True, base=RecordFailure)
|
||||
def _fail_hard(self, environment_id=None):
|
||||
raise ValueError("something bad happened")
|
||||
|
||||
@ -56,13 +64,12 @@ def test_environment_job_failure(celery_app, celery_worker):
|
||||
with pytest.raises(ValueError):
|
||||
task.get()
|
||||
|
||||
assert environment.job_failures
|
||||
job_failure = environment.job_failures[0]
|
||||
job_failure = _find_failure(session, "environment", str(environment.id))
|
||||
assert job_failure.task == task
|
||||
|
||||
|
||||
def test_environment_role_job_failure(celery_app, celery_worker):
|
||||
@celery_app.task(bind=True, base=RecordEnvironmentRoleFailure)
|
||||
def test_environment_role_job_failure(session, celery_app, celery_worker):
|
||||
@celery_app.task(bind=True, base=RecordFailure)
|
||||
def _fail_hard(self, environment_role_id=None):
|
||||
raise ValueError("something bad happened")
|
||||
|
||||
@ -74,8 +81,7 @@ def test_environment_role_job_failure(celery_app, celery_worker):
|
||||
with pytest.raises(ValueError):
|
||||
task.get()
|
||||
|
||||
assert role.job_failures
|
||||
job_failure = role.job_failures[0]
|
||||
job_failure = _find_failure(session, "environment_role", str(role.id))
|
||||
assert job_failure.task == task
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user