Move claim_for_update to models.utils

This commit is contained in:
richard-dds 2019-09-17 10:39:59 -04:00
parent 67a2905d51
commit 945debe6ee
4 changed files with 39 additions and 32 deletions

View File

@ -44,3 +44,12 @@ class NoAccessError(Exception):
@property @property
def message(self): def message(self):
return "Route for {} cannot be accessed".format(self.resource_name) return "Route for {} cannot be accessed".format(self.resource_name)
class ClaimFailedException(Exception):
def __init__(self, resource):
self.resource = resource
message = (
f"Could not acquire claim for {resource.__class__.__name__} {resource.id}."
)
super().__init__(message)

View File

@ -1,13 +1,12 @@
from flask import current_app as app from flask import current_app as app
import pendulum import pendulum
from sqlalchemy import func, sql
from contextlib import contextmanager
from atst.database import db from atst.database import db
from atst.queue import celery from atst.queue import celery
from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure
from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException
from atst.domain.environments import Environments from atst.domain.environments import Environments
from atst.models.utils import claim_for_update
class RecordEnvironmentFailure(celery.Task): class RecordEnvironmentFailure(celery.Task):
@ -45,34 +44,6 @@ def send_notification_mail(recipients, subject, body):
app.mailer.send(recipients, subject, body) app.mailer.send(recipients, subject, body)
class ClaimFailedException(Exception):
pass
@contextmanager
def claim_for_update(resource):
Model = resource.__class__
rows_updated = (
db.session.query(Model)
.filter_by(id=resource.id, claimed_at=None)
.update({"claimed_at": func.now()}, synchronize_session=False)
)
if rows_updated < 1:
raise ClaimFailedException(
f"Could not acquire claim for {Model.__name__} {resource.id}."
)
claimed = db.session.query(Model).filter_by(id=resource.id).one()
try:
yield claimed
finally:
db.session.query(Model).filter(Model.id == resource.id).filter(
Model.claimed_at != None
).update({"claimed_at": sql.null()}, synchronize_session=False)
def do_create_environment(csp: CloudProviderInterface, environment_id=None): def do_create_environment(csp: CloudProviderInterface, environment_id=None):
environment = Environments.get(environment_id) environment = Environments.get(environment_id)

27
atst/models/utils.py Normal file
View File

@ -0,0 +1,27 @@
from sqlalchemy import func, sql
from contextlib import contextmanager
from atst.database import db
from atst.domain.exceptions import ClaimFailedException
@contextmanager
def claim_for_update(resource):
Model = resource.__class__
rows_updated = (
db.session.query(Model)
.filter_by(id=resource.id, claimed_at=None)
.update({"claimed_at": func.now()}, synchronize_session=False)
)
if rows_updated < 1:
raise ClaimFailedException(resource)
claimed = db.session.query(Model).filter_by(id=resource.id).one()
try:
yield claimed
finally:
db.session.query(Model).filter(Model.id == resource.id).filter(
Model.claimed_at != None
).update({"claimed_at": sql.null()}, synchronize_session=False)

View File

@ -16,9 +16,9 @@ from atst.jobs import (
dispatch_create_atat_admin_user, dispatch_create_atat_admin_user,
dispatch_create_environment_baseline, dispatch_create_environment_baseline,
create_environment, create_environment,
claim_for_update,
ClaimFailedException,
) )
from atst.models.utils import claim_for_update
from atst.domain.exceptions import ClaimFailedException
from tests.factories import ( from tests.factories import (
EnvironmentFactory, EnvironmentFactory,
EnvironmentRoleFactory, EnvironmentRoleFactory,