diff --git a/alembic/versions/691b04ecd85e_add_environment_claimed_at.py b/alembic/versions/691b04ecd85e_add_environment_claimed_until.py similarity index 74% rename from alembic/versions/691b04ecd85e_add_environment_claimed_at.py rename to alembic/versions/691b04ecd85e_add_environment_claimed_until.py index 1b47650d..6fa4e5a5 100644 --- a/alembic/versions/691b04ecd85e_add_environment_claimed_at.py +++ b/alembic/versions/691b04ecd85e_add_environment_claimed_until.py @@ -1,4 +1,4 @@ -"""add Environment claimed_at +"""add Environment claimed_until Revision ID: 691b04ecd85e Revises: cfab6c8243cb @@ -18,11 +18,11 @@ depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.add_column('environments', sa.Column('claimed_at', sa.TIMESTAMP(timezone=True), nullable=True)) + op.add_column('environments', sa.Column('claimed_until', sa.TIMESTAMP(timezone=True), nullable=True)) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.drop_column('environments', 'claimed_at') + op.drop_column('environments', 'claimed_until') # ### end Alembic commands ### diff --git a/atst/models/environment.py b/atst/models/environment.py index 9f86d198..fb30e6d4 100644 --- a/atst/models/environment.py +++ b/atst/models/environment.py @@ -29,7 +29,7 @@ class Environment( root_user_info = Column(JSONB) baseline_info = Column(JSONB) - claimed_at = Column(TIMESTAMP(timezone=True)) + claimed_until = Column(TIMESTAMP(timezone=True)) job_failures = relationship("EnvironmentJobFailure") diff --git a/atst/models/utils.py b/atst/models/utils.py index 4e515429..6fb6a08b 100644 --- a/atst/models/utils.py +++ b/atst/models/utils.py @@ -1,4 +1,4 @@ -from sqlalchemy import func, sql +from sqlalchemy import func, sql, Interval, and_, or_ from contextlib import contextmanager from atst.database import db @@ -6,13 +6,22 @@ from atst.domain.exceptions import ClaimFailedException @contextmanager -def claim_for_update(resource): +def claim_for_update(resource, minutes=30): Model = resource.__class__ + claim_until = func.now() + func.cast( + sql.functions.concat(minutes, " MINUTES"), Interval + ) + rows_updated = ( db.session.query(Model) - .filter_by(id=resource.id, claimed_at=None) - .update({"claimed_at": func.now()}, synchronize_session=False) + .filter( + and_( + Model.id == resource.id, + or_(Model.claimed_until == None, Model.claimed_until < func.now()), + ) + ) + .update({"claimed_until": claim_until}, synchronize_session=False) ) if rows_updated < 1: raise ClaimFailedException(resource) @@ -23,5 +32,5 @@ def claim_for_update(resource): yield claimed finally: db.session.query(Model).filter(Model.id == resource.id).filter( - Model.claimed_at != None - ).update({"claimed_at": sql.null()}, synchronize_session=False) + Model.claimed_until != None + ).update({"claimed_until": sql.null()}, synchronize_session=False) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 76f4f845..13ebb89e 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -235,7 +235,7 @@ def test_create_environment_no_dupes(session, celery_app, celery_worker): environment = session.query(Environment).get(environment.id) assert environment.cloud_id == first_cloud_id - assert environment.claimed_at == None + assert environment.claimed_until == None def test_claim_for_update(session):