Claim resource only temporarily

This commit is contained in:
richard-dds 2019-09-17 11:03:33 -04:00
parent 945debe6ee
commit abeadee3f3
4 changed files with 20 additions and 11 deletions

View File

@ -1,4 +1,4 @@
"""add Environment claimed_at """add Environment claimed_until
Revision ID: 691b04ecd85e Revision ID: 691b04ecd85e
Revises: cfab6c8243cb Revises: cfab6c8243cb
@ -18,11 +18,11 @@ depends_on = None
def upgrade(): def upgrade():
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.add_column('environments', sa.Column('claimed_at', sa.TIMESTAMP(timezone=True), nullable=True)) op.add_column('environments', sa.Column('claimed_until', sa.TIMESTAMP(timezone=True), nullable=True))
# ### end Alembic commands ### # ### end Alembic commands ###
def downgrade(): def downgrade():
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.drop_column('environments', 'claimed_at') op.drop_column('environments', 'claimed_until')
# ### end Alembic commands ### # ### end Alembic commands ###

View File

@ -29,7 +29,7 @@ class Environment(
root_user_info = Column(JSONB) root_user_info = Column(JSONB)
baseline_info = Column(JSONB) baseline_info = Column(JSONB)
claimed_at = Column(TIMESTAMP(timezone=True)) claimed_until = Column(TIMESTAMP(timezone=True))
job_failures = relationship("EnvironmentJobFailure") job_failures = relationship("EnvironmentJobFailure")

View File

@ -1,4 +1,4 @@
from sqlalchemy import func, sql from sqlalchemy import func, sql, Interval, and_, or_
from contextlib import contextmanager from contextlib import contextmanager
from atst.database import db from atst.database import db
@ -6,13 +6,22 @@ from atst.domain.exceptions import ClaimFailedException
@contextmanager @contextmanager
def claim_for_update(resource): def claim_for_update(resource, minutes=30):
Model = resource.__class__ Model = resource.__class__
claim_until = func.now() + func.cast(
sql.functions.concat(minutes, " MINUTES"), Interval
)
rows_updated = ( rows_updated = (
db.session.query(Model) db.session.query(Model)
.filter_by(id=resource.id, claimed_at=None) .filter(
.update({"claimed_at": func.now()}, synchronize_session=False) and_(
Model.id == resource.id,
or_(Model.claimed_until == None, Model.claimed_until < func.now()),
)
)
.update({"claimed_until": claim_until}, synchronize_session=False)
) )
if rows_updated < 1: if rows_updated < 1:
raise ClaimFailedException(resource) raise ClaimFailedException(resource)
@ -23,5 +32,5 @@ def claim_for_update(resource):
yield claimed yield claimed
finally: finally:
db.session.query(Model).filter(Model.id == resource.id).filter( db.session.query(Model).filter(Model.id == resource.id).filter(
Model.claimed_at != None Model.claimed_until != None
).update({"claimed_at": sql.null()}, synchronize_session=False) ).update({"claimed_until": sql.null()}, synchronize_session=False)

View File

@ -235,7 +235,7 @@ def test_create_environment_no_dupes(session, celery_app, celery_worker):
environment = session.query(Environment).get(environment.id) environment = session.query(Environment).get(environment.id)
assert environment.cloud_id == first_cloud_id assert environment.cloud_id == first_cloud_id
assert environment.claimed_at == None assert environment.claimed_until == None
def test_claim_for_update(session): def test_claim_for_update(session):