From 945debe6eede888b159fc7a64aad4d0c4b320626 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 10:39:59 -0400 Subject: [PATCH] Move claim_for_update to models.utils --- atst/domain/exceptions.py | 9 +++++++++ atst/jobs.py | 31 +------------------------------ atst/models/utils.py | 27 +++++++++++++++++++++++++++ tests/test_jobs.py | 4 ++-- 4 files changed, 39 insertions(+), 32 deletions(-) create mode 100644 atst/models/utils.py diff --git a/atst/domain/exceptions.py b/atst/domain/exceptions.py index 8c2b0caf..a89aa986 100644 --- a/atst/domain/exceptions.py +++ b/atst/domain/exceptions.py @@ -44,3 +44,12 @@ class NoAccessError(Exception): @property def message(self): return "Route for {} cannot be accessed".format(self.resource_name) + + +class ClaimFailedException(Exception): + def __init__(self, resource): + self.resource = resource + message = ( + f"Could not acquire claim for {resource.__class__.__name__} {resource.id}." + ) + super().__init__(message) diff --git a/atst/jobs.py b/atst/jobs.py index 7bd098b2..36a33064 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -1,13 +1,12 @@ from flask import current_app as app import pendulum -from sqlalchemy import func, sql -from contextlib import contextmanager from atst.database import db from atst.queue import celery from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException from atst.domain.environments import Environments +from atst.models.utils import claim_for_update class RecordEnvironmentFailure(celery.Task): @@ -45,34 +44,6 @@ def send_notification_mail(recipients, subject, body): app.mailer.send(recipients, subject, body) -class ClaimFailedException(Exception): - pass - - -@contextmanager -def claim_for_update(resource): - Model = resource.__class__ - - rows_updated = ( - db.session.query(Model) - .filter_by(id=resource.id, claimed_at=None) - .update({"claimed_at": func.now()}, synchronize_session=False) - ) - if rows_updated < 1: - raise ClaimFailedException( - f"Could not acquire claim for {Model.__name__} {resource.id}." - ) - - claimed = db.session.query(Model).filter_by(id=resource.id).one() - - try: - yield claimed - finally: - db.session.query(Model).filter(Model.id == resource.id).filter( - Model.claimed_at != None - ).update({"claimed_at": sql.null()}, synchronize_session=False) - - def do_create_environment(csp: CloudProviderInterface, environment_id=None): environment = Environments.get(environment_id) diff --git a/atst/models/utils.py b/atst/models/utils.py new file mode 100644 index 00000000..4e515429 --- /dev/null +++ b/atst/models/utils.py @@ -0,0 +1,27 @@ +from sqlalchemy import func, sql +from contextlib import contextmanager + +from atst.database import db +from atst.domain.exceptions import ClaimFailedException + + +@contextmanager +def claim_for_update(resource): + Model = resource.__class__ + + rows_updated = ( + db.session.query(Model) + .filter_by(id=resource.id, claimed_at=None) + .update({"claimed_at": func.now()}, synchronize_session=False) + ) + if rows_updated < 1: + raise ClaimFailedException(resource) + + claimed = db.session.query(Model).filter_by(id=resource.id).one() + + try: + yield claimed + finally: + db.session.query(Model).filter(Model.id == resource.id).filter( + Model.claimed_at != None + ).update({"claimed_at": sql.null()}, synchronize_session=False) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 279d4e4b..76f4f845 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -16,9 +16,9 @@ from atst.jobs import ( dispatch_create_atat_admin_user, dispatch_create_environment_baseline, create_environment, - claim_for_update, - ClaimFailedException, ) +from atst.models.utils import claim_for_update +from atst.domain.exceptions import ClaimFailedException from tests.factories import ( EnvironmentFactory, EnvironmentRoleFactory,