From 2bbe974755d678e6506802e29af62607dcdce33d Mon Sep 17 00:00:00 2001 From: richard-dds Date: Fri, 13 Sep 2019 15:50:51 -0400 Subject: [PATCH 01/18] Implement simple locking system for environments --- ...691b04ecd85e_add_environment_claimed_at.py | 28 ++++++ atst/domain/environments.py | 3 +- atst/jobs.py | 91 ++++++++++++----- atst/models/environment.py | 6 +- tests/test_jobs.py | 98 ++++++++++++++++++- 5 files changed, 198 insertions(+), 28 deletions(-) create mode 100644 alembic/versions/691b04ecd85e_add_environment_claimed_at.py diff --git a/alembic/versions/691b04ecd85e_add_environment_claimed_at.py b/alembic/versions/691b04ecd85e_add_environment_claimed_at.py new file mode 100644 index 00000000..2c0a21d9 --- /dev/null +++ b/alembic/versions/691b04ecd85e_add_environment_claimed_at.py @@ -0,0 +1,28 @@ +"""add Environment claimed_at + +Revision ID: 691b04ecd85e +Revises: cfab6c8243cb +Create Date: 2019-09-13 11:51:24.677399 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '691b04ecd85e' # pragma: allowlist secret +down_revision = 'cfab6c8243cb' # pragma: allowlist secret +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('environments', sa.Column('claimed_at', sa.TIMESTAMP(timezone=True), nullable=True)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('environments', 'claimed_at') + # ### end Alembic commands ### diff --git a/atst/domain/environments.py b/atst/domain/environments.py index ace75a3a..2d9eef35 100644 --- a/atst/domain/environments.py +++ b/atst/domain/environments.py @@ -1,7 +1,8 @@ -from sqlalchemy import text +from sqlalchemy import text, func from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm import load_only from typing import List +from contextlib import contextmanager from atst.database import db from atst.models import Environment, Application, Portfolio, TaskOrder, CLIN diff --git a/atst/jobs.py b/atst/jobs.py index 050cb4ca..cad4877b 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -1,12 +1,16 @@ from flask import current_app as app import pendulum +from celery.utils.log import get_task_logger +from sqlalchemy import func, orm, sql +from sqlalchemy import update from atst.database import db from atst.queue import celery from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException from atst.domain.environments import Environments -from atst.domain.users import Users + +logger = get_task_logger(__name__) class RecordEnvironmentFailure(celery.Task): @@ -44,32 +48,64 @@ def send_notification_mail(recipients, subject, body): app.mailer.send(recipients, subject, body) -def do_create_environment( - csp: CloudProviderInterface, environment_id=None, atat_user_id=None -): +from contextlib import contextmanager + + +class ClaimFailedException(Exception): + pass + + +@contextmanager +def claim_for_update(resource): + rows_updated = ( + db.session.query(resource.__class__) + .filter_by(id=resource.id, claimed_at=None) + .update({"claimed_at": func.now()}, synchronize_session="fetch") + ) + if rows_updated < 1: + raise ClaimFailedException( + f"Could not acquire claim for {resource.__class__.__name__} {resource.id}." + ) + + claimed = db.session.query(resource.__class__).filter_by(id=resource.id).one() + + try: + yield claimed + finally: + db.session.query(resource.__class__).filter( + resource.__class__.id == resource.id + ).filter(resource.__class__.claimed_at != None).update( + {"claimed_at": sql.null()}, synchronize_session="fetch" + ) + + +def do_create_environment(csp: CloudProviderInterface, environment_id=None): + logger.info(environment_id) environment = Environments.get(environment_id) - if environment.cloud_id is not None: - # TODO: Return value for this? - return + with claim_for_update(environment) as environment: - user = Users.get(atat_user_id) + if environment.cloud_id is not None: + # TODO: Return value for this? + return - # we'll need to do some checking in this job for cases where it's retrying - # when a failure occured after some successful steps - # (e.g. if environment.cloud_id is not None, then we can skip first step) + user = environment.creator - # credentials either from a given user or pulled from config? - # if using global creds, do we need to log what user authorized action? - atat_root_creds = csp.root_creds() + # we'll need to do some checking in this job for cases where it's retrying + # when a failure occured after some successful steps + # (e.g. if environment.cloud_id is not None, then we can skip first step) - # user is needed because baseline root account in the environment will - # be assigned to the requesting user, open question how to handle duplicate - # email addresses across new environments - csp_environment_id = csp.create_environment(atat_root_creds, user, environment) - environment.cloud_id = csp_environment_id - db.session.add(environment) - db.session.commit() + # credentials either from a given user or pulled from config? + # if using global creds, do we need to log what user authorized action? + atat_root_creds = csp.root_creds() + + # user is needed because baseline root account in the environment will + # be assigned to the requesting user, open question how to handle duplicate + # email addresses across new environments + csp_environment_id = csp.create_environment(atat_root_creds, user, environment) + environment.cloud_id = csp_environment_id + db.session.add(environment) + db.session.commit() def do_create_atat_admin_user(csp: CloudProviderInterface, environment_id=None): @@ -107,17 +143,24 @@ def do_work(fn, task, csp, **kwargs): @celery.task(bind=True) def create_environment(self, environment_id=None, atat_user_id=None): - do_work(do_create_environment, self, app.csp.cloud, **kwargs) + do_work(do_create_environment, self, app.csp.cloud, environment_id=environment_id) @celery.task(bind=True) def create_atat_admin_user(self, environment_id=None): - do_work(do_create_atat_admin_user, self, app.csp.cloud, **kwargs) + do_work( + do_create_atat_admin_user, self, app.csp.cloud, environment_id=environment_id + ) @celery.task(bind=True) def create_environment_baseline(self, environment_id=None): - do_work(do_create_environment_baseline, self, app.csp.cloud, **kwargs) + do_work( + do_create_environment_baseline, + self, + app.csp.cloud, + environment_id=environment_id, + ) @celery.task(bind=True) diff --git a/atst/models/environment.py b/atst/models/environment.py index fcdea074..a6774434 100644 --- a/atst/models/environment.py +++ b/atst/models/environment.py @@ -1,11 +1,13 @@ -from sqlalchemy import Column, ForeignKey, String +from sqlalchemy import Column, ForeignKey, String, TIMESTAMP from sqlalchemy.orm import relationship from sqlalchemy.dialects.postgresql import JSONB from enum import Enum +import contextlib from atst.models import Base from atst.models.types import Id from atst.models import mixins +from atst.database import db class Environment( @@ -29,6 +31,8 @@ class Environment( root_user_info = Column(JSONB) baseline_info = Column(JSONB) + claimed_at = Column(TIMESTAMP(timezone=True)) + job_failures = relationship("EnvironmentJobFailure") class ProvisioningStatus(Enum): diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 749b5ad5..079114c8 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -14,6 +14,9 @@ from atst.jobs import ( dispatch_create_environment, dispatch_create_atat_admin_user, dispatch_create_environment_baseline, + create_environment, + claim_for_update, + ClaimFailedException, ) from tests.factories import ( EnvironmentFactory, @@ -22,6 +25,9 @@ from tests.factories import ( PortfolioFactory, ) +from threading import Thread +from time import sleep + def test_environment_job_failure(celery_app, celery_worker): @celery_app.task(bind=True, base=RecordEnvironmentFailure) @@ -73,7 +79,7 @@ def csp(): def test_create_environment_job(session, csp): user = UserFactory.create() environment = EnvironmentFactory.create() - do_create_environment(csp, environment.id, user.id) + do_create_environment(csp, environment.id) environment_id = environment.id del environment @@ -86,7 +92,7 @@ def test_create_environment_job(session, csp): def test_create_environment_job_is_idempotent(csp, session): user = UserFactory.create() environment = EnvironmentFactory.create(cloud_id=uuid4().hex) - do_create_environment(csp, environment.id, user.id) + do_create_environment(csp, environment.id) csp.create_environment.assert_not_called() @@ -196,3 +202,91 @@ def test_dispatch_create_environment_baseline(session, monkeypatch): dispatch_create_environment_baseline.run() mock.delay.assert_called_once_with(environment_id=environment.id) + + +def test_create_environment_no_dupes(session, celery_app, celery_worker): + portfolio = PortfolioFactory.create( + applications=[ + { + "environments": [ + { + "cloud_id": uuid4().hex, + "root_user_info": {}, + "baseline_info": None, + } + ] + } + ], + task_orders=[ + { + "create_clins": [ + { + "start_date": pendulum.now().subtract(days=1), + "end_date": pendulum.now().add(days=1), + } + ] + } + ], + ) + environment = portfolio.applications[0].environments[0] + + create_environment.run(environment_id=environment.id) + environment = session.query(Environment).get(environment.id) + first_cloud_id = environment.cloud_id + + create_environment.run(environment_id=environment.id) + environment = session.query(Environment).get(environment.id) + + assert environment.cloud_id == first_cloud_id + assert environment.claimed_at == None + + +def test_claim(session): + portfolio = PortfolioFactory.create( + applications=[ + { + "environments": [ + { + "cloud_id": uuid4().hex, + "root_user_info": {}, + "baseline_info": None, + } + ] + } + ], + task_orders=[ + { + "create_clins": [ + { + "start_date": pendulum.now().subtract(days=1), + "end_date": pendulum.now().add(days=1), + } + ] + } + ], + ) + environment = portfolio.applications[0].environments[0] + + events = [] + + class FirstThread(Thread): + def run(self): + with claim_for_update(environment): + events.append("first") + + class SecondThread(Thread): + def run(self): + try: + with claim_for_update(environment): + events.append("second") + except Exception: + pass + + t1 = FirstThread() + t2 = SecondThread() + t1.start() + t2.start() + t1.join() + t2.join() + + assert events == ["first"] From 030d67719b43d47bd42c168182ae7e72449e1057 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Mon, 16 Sep 2019 16:47:29 -0400 Subject: [PATCH 02/18] Remove unused imports --- atst/domain/environments.py | 3 +-- atst/jobs.py | 14 ++++---------- atst/models/environment.py | 2 -- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/atst/domain/environments.py b/atst/domain/environments.py index 2d9eef35..ace75a3a 100644 --- a/atst/domain/environments.py +++ b/atst/domain/environments.py @@ -1,8 +1,7 @@ -from sqlalchemy import text, func +from sqlalchemy import text from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm import load_only from typing import List -from contextlib import contextmanager from atst.database import db from atst.models import Environment, Application, Portfolio, TaskOrder, CLIN diff --git a/atst/jobs.py b/atst/jobs.py index cad4877b..fa07c9da 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -1,8 +1,7 @@ from flask import current_app as app import pendulum -from celery.utils.log import get_task_logger -from sqlalchemy import func, orm, sql -from sqlalchemy import update +from sqlalchemy import func, sql +from contextlib import contextmanager from atst.database import db from atst.queue import celery @@ -10,8 +9,6 @@ from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException from atst.domain.environments import Environments -logger = get_task_logger(__name__) - class RecordEnvironmentFailure(celery.Task): def on_failure(self, exc, task_id, args, kwargs, einfo): @@ -48,9 +45,6 @@ def send_notification_mail(recipients, subject, body): app.mailer.send(recipients, subject, body) -from contextlib import contextmanager - - class ClaimFailedException(Exception): pass @@ -60,7 +54,7 @@ def claim_for_update(resource): rows_updated = ( db.session.query(resource.__class__) .filter_by(id=resource.id, claimed_at=None) - .update({"claimed_at": func.now()}, synchronize_session="fetch") + .update({"claimed_at": func.now()}, synchronize_session=False) ) if rows_updated < 1: raise ClaimFailedException( @@ -75,7 +69,7 @@ def claim_for_update(resource): db.session.query(resource.__class__).filter( resource.__class__.id == resource.id ).filter(resource.__class__.claimed_at != None).update( - {"claimed_at": sql.null()}, synchronize_session="fetch" + {"claimed_at": sql.null()}, synchronize_session=False ) diff --git a/atst/models/environment.py b/atst/models/environment.py index a6774434..9f86d198 100644 --- a/atst/models/environment.py +++ b/atst/models/environment.py @@ -2,12 +2,10 @@ from sqlalchemy import Column, ForeignKey, String, TIMESTAMP from sqlalchemy.orm import relationship from sqlalchemy.dialects.postgresql import JSONB from enum import Enum -import contextlib from atst.models import Base from atst.models.types import Id from atst.models import mixins -from atst.database import db class Environment( From 5012c5a4d3338e02074acfd6362ffef507070c62 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Mon, 16 Sep 2019 16:53:03 -0400 Subject: [PATCH 03/18] Fix migration path --- alembic/versions/691b04ecd85e_add_environment_claimed_at.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alembic/versions/691b04ecd85e_add_environment_claimed_at.py b/alembic/versions/691b04ecd85e_add_environment_claimed_at.py index 2c0a21d9..1b47650d 100644 --- a/alembic/versions/691b04ecd85e_add_environment_claimed_at.py +++ b/alembic/versions/691b04ecd85e_add_environment_claimed_at.py @@ -11,7 +11,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '691b04ecd85e' # pragma: allowlist secret -down_revision = 'cfab6c8243cb' # pragma: allowlist secret +down_revision = '502e79c55d2d' # pragma: allowlist secret branch_labels = None depends_on = None From c0a629ae9a3e21c5a9d1b58a8e6c6a5e2afd7549 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Mon, 16 Sep 2019 16:54:49 -0400 Subject: [PATCH 04/18] Alias resource class to Model --- atst/jobs.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/atst/jobs.py b/atst/jobs.py index fa07c9da..78bbb2dc 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -51,24 +51,26 @@ class ClaimFailedException(Exception): @contextmanager def claim_for_update(resource): + Model = resource.__class__ + rows_updated = ( - db.session.query(resource.__class__) + db.session.query(Model) .filter_by(id=resource.id, claimed_at=None) .update({"claimed_at": func.now()}, synchronize_session=False) ) if rows_updated < 1: raise ClaimFailedException( - f"Could not acquire claim for {resource.__class__.__name__} {resource.id}." + f"Could not acquire claim for {Model.__name__} {resource.id}." ) - claimed = db.session.query(resource.__class__).filter_by(id=resource.id).one() + claimed = db.session.query(Model).filter_by(id=resource.id).one() try: yield claimed finally: - db.session.query(resource.__class__).filter( - resource.__class__.id == resource.id - ).filter(resource.__class__.claimed_at != None).update( + db.session.query(Model).filter( + Model.id == resource.id + ).filter(Model.claimed_at != None).update( {"claimed_at": sql.null()}, synchronize_session=False ) From 97cefc75013f00b18f941e5e788de73f0fac7679 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Mon, 16 Sep 2019 16:58:28 -0400 Subject: [PATCH 05/18] Formatting --- atst/jobs.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/atst/jobs.py b/atst/jobs.py index 78bbb2dc..7bd098b2 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -68,15 +68,12 @@ def claim_for_update(resource): try: yield claimed finally: - db.session.query(Model).filter( - Model.id == resource.id - ).filter(Model.claimed_at != None).update( - {"claimed_at": sql.null()}, synchronize_session=False - ) + db.session.query(Model).filter(Model.id == resource.id).filter( + Model.claimed_at != None + ).update({"claimed_at": sql.null()}, synchronize_session=False) def do_create_environment(csp: CloudProviderInterface, environment_id=None): - logger.info(environment_id) environment = Environments.get(environment_id) with claim_for_update(environment) as environment: From 4624acd1c5aa65350b7b4306232381ef700054f3 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Mon, 16 Sep 2019 16:59:18 -0400 Subject: [PATCH 06/18] Remove unused import --- tests/test_jobs.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 079114c8..37b96f1e 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -26,7 +26,6 @@ from tests.factories import ( ) from threading import Thread -from time import sleep def test_environment_job_failure(celery_app, celery_worker): @@ -68,7 +67,6 @@ def test_environment_role_job_failure(celery_app, celery_worker): now = pendulum.now() yesterday = now.subtract(days=1) tomorrow = now.add(days=1) -from atst.domain.environments import Environments @pytest.fixture(autouse=True, scope="function") From 67a2905d5107b1a572db860ae888af75787365c9 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Mon, 16 Sep 2019 17:03:57 -0400 Subject: [PATCH 07/18] Make claim_for_update easier to follow --- tests/test_jobs.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 37b96f1e..279d4e4b 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -2,6 +2,7 @@ import pendulum import pytest from uuid import uuid4 from unittest.mock import Mock +from threading import Thread from atst.models import Environment from atst.domain.csp.cloud import MockCloudProvider @@ -25,8 +26,6 @@ from tests.factories import ( PortfolioFactory, ) -from threading import Thread - def test_environment_job_failure(celery_app, celery_worker): @celery_app.task(bind=True, base=RecordEnvironmentFailure) @@ -239,7 +238,7 @@ def test_create_environment_no_dupes(session, celery_app, celery_worker): assert environment.claimed_at == None -def test_claim(session): +def test_claim_for_update(session): portfolio = PortfolioFactory.create( applications=[ { @@ -265,19 +264,21 @@ def test_claim(session): ) environment = portfolio.applications[0].environments[0] - events = [] + satisfied_claims = [] + # Two threads that race to acquire a claim on the same environment. + # SecondThread's claim will be rejected, which will result in a ClaimFailedException. class FirstThread(Thread): def run(self): with claim_for_update(environment): - events.append("first") + satisfied_claims.append("FirstThread") class SecondThread(Thread): def run(self): try: with claim_for_update(environment): - events.append("second") - except Exception: + satisfied_claims.append("SecondThread") + except ClaimFailedException: pass t1 = FirstThread() @@ -287,4 +288,4 @@ def test_claim(session): t1.join() t2.join() - assert events == ["first"] + assert satisfied_claims == ["FirstThread"] From 945debe6eede888b159fc7a64aad4d0c4b320626 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 10:39:59 -0400 Subject: [PATCH 08/18] Move claim_for_update to models.utils --- atst/domain/exceptions.py | 9 +++++++++ atst/jobs.py | 31 +------------------------------ atst/models/utils.py | 27 +++++++++++++++++++++++++++ tests/test_jobs.py | 4 ++-- 4 files changed, 39 insertions(+), 32 deletions(-) create mode 100644 atst/models/utils.py diff --git a/atst/domain/exceptions.py b/atst/domain/exceptions.py index 8c2b0caf..a89aa986 100644 --- a/atst/domain/exceptions.py +++ b/atst/domain/exceptions.py @@ -44,3 +44,12 @@ class NoAccessError(Exception): @property def message(self): return "Route for {} cannot be accessed".format(self.resource_name) + + +class ClaimFailedException(Exception): + def __init__(self, resource): + self.resource = resource + message = ( + f"Could not acquire claim for {resource.__class__.__name__} {resource.id}." + ) + super().__init__(message) diff --git a/atst/jobs.py b/atst/jobs.py index 7bd098b2..36a33064 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -1,13 +1,12 @@ from flask import current_app as app import pendulum -from sqlalchemy import func, sql -from contextlib import contextmanager from atst.database import db from atst.queue import celery from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException from atst.domain.environments import Environments +from atst.models.utils import claim_for_update class RecordEnvironmentFailure(celery.Task): @@ -45,34 +44,6 @@ def send_notification_mail(recipients, subject, body): app.mailer.send(recipients, subject, body) -class ClaimFailedException(Exception): - pass - - -@contextmanager -def claim_for_update(resource): - Model = resource.__class__ - - rows_updated = ( - db.session.query(Model) - .filter_by(id=resource.id, claimed_at=None) - .update({"claimed_at": func.now()}, synchronize_session=False) - ) - if rows_updated < 1: - raise ClaimFailedException( - f"Could not acquire claim for {Model.__name__} {resource.id}." - ) - - claimed = db.session.query(Model).filter_by(id=resource.id).one() - - try: - yield claimed - finally: - db.session.query(Model).filter(Model.id == resource.id).filter( - Model.claimed_at != None - ).update({"claimed_at": sql.null()}, synchronize_session=False) - - def do_create_environment(csp: CloudProviderInterface, environment_id=None): environment = Environments.get(environment_id) diff --git a/atst/models/utils.py b/atst/models/utils.py new file mode 100644 index 00000000..4e515429 --- /dev/null +++ b/atst/models/utils.py @@ -0,0 +1,27 @@ +from sqlalchemy import func, sql +from contextlib import contextmanager + +from atst.database import db +from atst.domain.exceptions import ClaimFailedException + + +@contextmanager +def claim_for_update(resource): + Model = resource.__class__ + + rows_updated = ( + db.session.query(Model) + .filter_by(id=resource.id, claimed_at=None) + .update({"claimed_at": func.now()}, synchronize_session=False) + ) + if rows_updated < 1: + raise ClaimFailedException(resource) + + claimed = db.session.query(Model).filter_by(id=resource.id).one() + + try: + yield claimed + finally: + db.session.query(Model).filter(Model.id == resource.id).filter( + Model.claimed_at != None + ).update({"claimed_at": sql.null()}, synchronize_session=False) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 279d4e4b..76f4f845 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -16,9 +16,9 @@ from atst.jobs import ( dispatch_create_atat_admin_user, dispatch_create_environment_baseline, create_environment, - claim_for_update, - ClaimFailedException, ) +from atst.models.utils import claim_for_update +from atst.domain.exceptions import ClaimFailedException from tests.factories import ( EnvironmentFactory, EnvironmentRoleFactory, From abeadee3f36d1d616bf05c7cc2bb54a161c81893 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 11:03:33 -0400 Subject: [PATCH 09/18] Claim resource only temporarily --- ...04ecd85e_add_environment_claimed_until.py} | 6 +++--- atst/models/environment.py | 2 +- atst/models/utils.py | 21 +++++++++++++------ tests/test_jobs.py | 2 +- 4 files changed, 20 insertions(+), 11 deletions(-) rename alembic/versions/{691b04ecd85e_add_environment_claimed_at.py => 691b04ecd85e_add_environment_claimed_until.py} (74%) diff --git a/alembic/versions/691b04ecd85e_add_environment_claimed_at.py b/alembic/versions/691b04ecd85e_add_environment_claimed_until.py similarity index 74% rename from alembic/versions/691b04ecd85e_add_environment_claimed_at.py rename to alembic/versions/691b04ecd85e_add_environment_claimed_until.py index 1b47650d..6fa4e5a5 100644 --- a/alembic/versions/691b04ecd85e_add_environment_claimed_at.py +++ b/alembic/versions/691b04ecd85e_add_environment_claimed_until.py @@ -1,4 +1,4 @@ -"""add Environment claimed_at +"""add Environment claimed_until Revision ID: 691b04ecd85e Revises: cfab6c8243cb @@ -18,11 +18,11 @@ depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.add_column('environments', sa.Column('claimed_at', sa.TIMESTAMP(timezone=True), nullable=True)) + op.add_column('environments', sa.Column('claimed_until', sa.TIMESTAMP(timezone=True), nullable=True)) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.drop_column('environments', 'claimed_at') + op.drop_column('environments', 'claimed_until') # ### end Alembic commands ### diff --git a/atst/models/environment.py b/atst/models/environment.py index 9f86d198..fb30e6d4 100644 --- a/atst/models/environment.py +++ b/atst/models/environment.py @@ -29,7 +29,7 @@ class Environment( root_user_info = Column(JSONB) baseline_info = Column(JSONB) - claimed_at = Column(TIMESTAMP(timezone=True)) + claimed_until = Column(TIMESTAMP(timezone=True)) job_failures = relationship("EnvironmentJobFailure") diff --git a/atst/models/utils.py b/atst/models/utils.py index 4e515429..6fb6a08b 100644 --- a/atst/models/utils.py +++ b/atst/models/utils.py @@ -1,4 +1,4 @@ -from sqlalchemy import func, sql +from sqlalchemy import func, sql, Interval, and_, or_ from contextlib import contextmanager from atst.database import db @@ -6,13 +6,22 @@ from atst.domain.exceptions import ClaimFailedException @contextmanager -def claim_for_update(resource): +def claim_for_update(resource, minutes=30): Model = resource.__class__ + claim_until = func.now() + func.cast( + sql.functions.concat(minutes, " MINUTES"), Interval + ) + rows_updated = ( db.session.query(Model) - .filter_by(id=resource.id, claimed_at=None) - .update({"claimed_at": func.now()}, synchronize_session=False) + .filter( + and_( + Model.id == resource.id, + or_(Model.claimed_until == None, Model.claimed_until < func.now()), + ) + ) + .update({"claimed_until": claim_until}, synchronize_session=False) ) if rows_updated < 1: raise ClaimFailedException(resource) @@ -23,5 +32,5 @@ def claim_for_update(resource): yield claimed finally: db.session.query(Model).filter(Model.id == resource.id).filter( - Model.claimed_at != None - ).update({"claimed_at": sql.null()}, synchronize_session=False) + Model.claimed_until != None + ).update({"claimed_until": sql.null()}, synchronize_session=False) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 76f4f845..13ebb89e 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -235,7 +235,7 @@ def test_create_environment_no_dupes(session, celery_app, celery_worker): environment = session.query(Environment).get(environment.id) assert environment.cloud_id == first_cloud_id - assert environment.claimed_at == None + assert environment.claimed_until == None def test_claim_for_update(session): From c1b87356ce5be50a0fac3307ed7ade751765c7aa Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 11:06:23 -0400 Subject: [PATCH 10/18] Update session while claiming Not totally sure if this is necessary, but I feel like it makes more sense to err on the side of more data correctness, rather than hypothesizing about performance --- atst/models/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/atst/models/utils.py b/atst/models/utils.py index 6fb6a08b..fe0fe4f7 100644 --- a/atst/models/utils.py +++ b/atst/models/utils.py @@ -21,7 +21,7 @@ def claim_for_update(resource, minutes=30): or_(Model.claimed_until == None, Model.claimed_until < func.now()), ) ) - .update({"claimed_until": claim_until}, synchronize_session=False) + .update({"claimed_until": claim_until}, synchronize_session="fetch") ) if rows_updated < 1: raise ClaimFailedException(resource) @@ -33,4 +33,4 @@ def claim_for_update(resource, minutes=30): finally: db.session.query(Model).filter(Model.id == resource.id).filter( Model.claimed_until != None - ).update({"claimed_until": sql.null()}, synchronize_session=False) + ).update({"claimed_until": sql.null()}, synchronize_session="fetch") From 23261da3af8c7a64f1655eee8afc7db5b19ca86e Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 11:15:25 -0400 Subject: [PATCH 11/18] Use None isntead of sql.null --- atst/models/utils.py | 2 +- tests/test_jobs.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/atst/models/utils.py b/atst/models/utils.py index fe0fe4f7..e7b69032 100644 --- a/atst/models/utils.py +++ b/atst/models/utils.py @@ -33,4 +33,4 @@ def claim_for_update(resource, minutes=30): finally: db.session.query(Model).filter(Model.id == resource.id).filter( Model.claimed_until != None - ).update({"claimed_until": sql.null()}, synchronize_session="fetch") + ).update({"claimed_until": None}, synchronize_session="fetch") diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 13ebb89e..7973936e 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -288,4 +288,10 @@ def test_claim_for_update(session): t1.join() t2.join() + session.refresh(environment) + + # Only FirstThread acquired a claim and wrote to satisfied_claims assert satisfied_claims == ["FirstThread"] + + # The claim is released as soon as work is done + assert environment.claimed_until is None From 7004f7d37e96d14d60257f73b4043317605633f0 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 11:30:07 -0400 Subject: [PATCH 12/18] Add a docstring and some comments --- atst/models/utils.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/atst/models/utils.py b/atst/models/utils.py index e7b69032..bde09c91 100644 --- a/atst/models/utils.py +++ b/atst/models/utils.py @@ -7,12 +7,22 @@ from atst.domain.exceptions import ClaimFailedException @contextmanager def claim_for_update(resource, minutes=30): + """ + Claim a mutually exclusive expiring hold on a resource. + Uses the database as a central source of time in case the server clocks have drifted. + + Args: + resource: A SQLAlchemy model instance with a `claimed_until` attribute. + minutes: The maximum amount of time, in minutes, to hold the claim. + """ Model = resource.__class__ claim_until = func.now() + func.cast( sql.functions.concat(minutes, " MINUTES"), Interval ) + # Optimistically query for and update the resource in question. If it's + # already claimed, `rows_updated` will be 0 and we can give up. rows_updated = ( db.session.query(Model) .filter( @@ -26,11 +36,14 @@ def claim_for_update(resource, minutes=30): if rows_updated < 1: raise ClaimFailedException(resource) + # Fetch the claimed resource claimed = db.session.query(Model).filter_by(id=resource.id).one() try: + # Give the resource to the caller. yield claimed finally: + # Release the claim. db.session.query(Model).filter(Model.id == resource.id).filter( Model.claimed_until != None ).update({"claimed_until": None}, synchronize_session="fetch") From 68ac7aecdfd752bd2e4a67eeb15c1b5e0ba5e6fd Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 11:47:06 -0400 Subject: [PATCH 13/18] More test cleanup --- tests/test_jobs.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 7973936e..ab36a6c9 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -227,14 +227,19 @@ def test_create_environment_no_dupes(session, celery_app, celery_worker): ) environment = portfolio.applications[0].environments[0] + # create_environment is run twice on the same environment create_environment.run(environment_id=environment.id) - environment = session.query(Environment).get(environment.id) + session.refresh(environment) + first_cloud_id = environment.cloud_id create_environment.run(environment_id=environment.id) - environment = session.query(Environment).get(environment.id) + session.refresh(environment) + # The environment's cloud_id was not overwritten in the second run assert environment.cloud_id == first_cloud_id + + # The environment's claim was released assert environment.claimed_until == None @@ -267,7 +272,7 @@ def test_claim_for_update(session): satisfied_claims = [] # Two threads that race to acquire a claim on the same environment. - # SecondThread's claim will be rejected, which will result in a ClaimFailedException. + # SecondThread's claim will be rejected, resulting in a ClaimFailedException. class FirstThread(Thread): def run(self): with claim_for_update(environment): @@ -293,5 +298,5 @@ def test_claim_for_update(session): # Only FirstThread acquired a claim and wrote to satisfied_claims assert satisfied_claims == ["FirstThread"] - # The claim is released as soon as work is done + # The claim is released assert environment.claimed_until is None From 5b7a544403b699857330abd9f7fd224ccba64a64 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 11:51:09 -0400 Subject: [PATCH 14/18] Claim environment in all three provisioning tasks --- atst/jobs.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/atst/jobs.py b/atst/jobs.py index 36a33064..017b98d4 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -74,28 +74,31 @@ def do_create_environment(csp: CloudProviderInterface, environment_id=None): def do_create_atat_admin_user(csp: CloudProviderInterface, environment_id=None): environment = Environments.get(environment_id) - atat_root_creds = csp.root_creds() - atat_remote_root_user = csp.create_atat_admin_user( - atat_root_creds, environment.cloud_id - ) - environment.root_user_info = atat_remote_root_user - db.session.add(environment) - db.session.commit() + with claim_for_update(environment) as environment: + atat_root_creds = csp.root_creds() + + atat_remote_root_user = csp.create_atat_admin_user( + atat_root_creds, environment.cloud_id + ) + environment.root_user_info = atat_remote_root_user + db.session.add(environment) + db.session.commit() def do_create_environment_baseline(csp: CloudProviderInterface, environment_id=None): environment = Environments.get(environment_id) - # ASAP switch to use remote root user for provisioning - atat_remote_root_creds = environment.root_user_info["credentials"] + with claim_for_update(environment) as environment: + # ASAP switch to use remote root user for provisioning + atat_remote_root_creds = environment.root_user_info["credentials"] - baseline_info = csp.create_environment_baseline( - atat_remote_root_creds, environment.cloud_id - ) - environment.baseline_info = baseline_info - db.session.add(environment) - db.session.commit() + baseline_info = csp.create_environment_baseline( + atat_remote_root_creds, environment.cloud_id + ) + environment.baseline_info = baseline_info + db.session.add(environment) + db.session.commit() def do_work(fn, task, csp, **kwargs): From 53e993ea34ade96698c4708e036934c49b60718c Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 11:54:18 -0400 Subject: [PATCH 15/18] Filter out claimed environments --- atst/domain/environments.py | 8 +++++++- atst/models/utils.py | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/atst/domain/environments.py b/atst/domain/environments.py index ace75a3a..41824dc5 100644 --- a/atst/domain/environments.py +++ b/atst/domain/environments.py @@ -1,4 +1,4 @@ -from sqlalchemy import text +from sqlalchemy import text, func, or_ from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm import load_only from typing import List @@ -104,6 +104,12 @@ class Environments(object): .join(CLIN) .filter(CLIN.start_date <= now) .filter(CLIN.end_date > now) + .filter( + or_( + Environment.claimed_until == None, + Environment.claimed_until <= func.now(), + ) + ) # select only these columns .options(load_only("id", "creator_id")) ) diff --git a/atst/models/utils.py b/atst/models/utils.py index bde09c91..a3df73e3 100644 --- a/atst/models/utils.py +++ b/atst/models/utils.py @@ -28,7 +28,7 @@ def claim_for_update(resource, minutes=30): .filter( and_( Model.id == resource.id, - or_(Model.claimed_until == None, Model.claimed_until < func.now()), + or_(Model.claimed_until == None, Model.claimed_until <= func.now()), ) ) .update({"claimed_until": claim_until}, synchronize_session="fetch") From ade7dc08fd3a7740d4fffb4ca3a2539905d6e02e Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 15:27:03 -0400 Subject: [PATCH 16/18] Only fetch environment_id in provisioning queries --- atst/domain/environments.py | 23 +++++++++++++---------- atst/jobs.py | 18 +++++++++--------- tests/test_jobs.py | 4 +--- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/atst/domain/environments.py b/atst/domain/environments.py index 41824dc5..829fe3fa 100644 --- a/atst/domain/environments.py +++ b/atst/domain/environments.py @@ -1,7 +1,7 @@ from sqlalchemy import text, func, or_ from sqlalchemy.orm.exc import NoResultFound -from sqlalchemy.orm import load_only from typing import List +from uuid import UUID from atst.database import db from atst.models import Environment, Application, Portfolio, TaskOrder, CLIN @@ -97,7 +97,7 @@ class Environments(object): @classmethod def base_provision_query(cls, now): return ( - db.session.query(Environment) + db.session.query(Environment.id) .join(Application) .join(Portfolio) .join(TaskOrder) @@ -110,37 +110,40 @@ class Environments(object): Environment.claimed_until <= func.now(), ) ) - # select only these columns - .options(load_only("id", "creator_id")) ) @classmethod - def get_environments_pending_creation(cls, now) -> List[Environment]: + def get_environments_pending_creation(cls, now) -> List[UUID]: """ Any environment with an active CLIN that doesn't yet have a `cloud_id`. """ - return cls.base_provision_query(now).filter(Environment.cloud_id == None).all() + results = ( + cls.base_provision_query(now).filter(Environment.cloud_id == None).all() + ) + return [id_ for id_, in results] @classmethod - def get_environments_pending_atat_user_creation(cls, now) -> List[Environment]: + def get_environments_pending_atat_user_creation(cls, now) -> List[UUID]: """ Any environment with an active CLIN that has a cloud_id but no `root_user_info`. """ - return ( + results = ( cls.base_provision_query(now) .filter(Environment.cloud_id != None) .filter(Environment.root_user_info == text("'null'")) ).all() + return [id_ for id_, in results] @classmethod - def get_environments_pending_baseline_creation(cls, now) -> List[Environment]: + def get_environments_pending_baseline_creation(cls, now) -> List[UUID]: """ Any environment with an active CLIN that has a `cloud_id` and `root_user_info` but no `baseline_info`. """ - return ( + results = ( cls.base_provision_query(now) .filter(Environment.cloud_id != None) .filter(Environment.root_user_info != text("'null'")) .filter(Environment.baseline_info == text("'null'")) ).all() + return [id_ for id_, in results] diff --git a/atst/jobs.py b/atst/jobs.py index 017b98d4..ec639306 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -109,7 +109,7 @@ def do_work(fn, task, csp, **kwargs): @celery.task(bind=True) -def create_environment(self, environment_id=None, atat_user_id=None): +def create_environment(self, environment_id=None): do_work(do_create_environment, self, app.csp.cloud, environment_id=environment_id) @@ -132,23 +132,23 @@ def create_environment_baseline(self, environment_id=None): @celery.task(bind=True) def dispatch_create_environment(self): - for environment in Environments.get_environments_pending_creation(pendulum.now()): - create_environment.delay( - environment_id=environment.id, atat_user_id=environment.creator_id - ) + for environment_id in Environments.get_environments_pending_creation( + pendulum.now() + ): + create_environment.delay(environment_id=environment_id) @celery.task(bind=True) def dispatch_create_atat_admin_user(self): - for environment in Environments.get_environments_pending_atat_user_creation( + for environment_id in Environments.get_environments_pending_atat_user_creation( pendulum.now() ): - create_atat_admin_user.delay(environment_id=environment.id) + create_atat_admin_user.delay(environment_id=environment_id) @celery.task(bind=True) def dispatch_create_environment_baseline(self): - for environment in Environments.get_environments_pending_baseline_creation( + for environment_id in Environments.get_environments_pending_baseline_creation( pendulum.now() ): - create_environment_baseline.delay(environment_id=environment.id) + create_environment_baseline.delay(environment_id=environment_id) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index ab36a6c9..537716e4 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -138,9 +138,7 @@ def test_dispatch_create_environment(session, monkeypatch): dispatch_create_environment.run() - mock.delay.assert_called_once_with( - environment_id=environment.id, atat_user_id=environment.creator_id - ) + mock.delay.assert_called_once_with(environment_id=environment.id) def test_dispatch_create_atat_admin_user(session, monkeypatch): From 3f072bac3cf9a9aa18c2395b647e46681852c163 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 15:28:07 -0400 Subject: [PATCH 17/18] Remove unused users --- tests/test_jobs.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 537716e4..fd6efc44 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -74,7 +74,6 @@ def csp(): def test_create_environment_job(session, csp): - user = UserFactory.create() environment = EnvironmentFactory.create() do_create_environment(csp, environment.id) @@ -87,7 +86,6 @@ def test_create_environment_job(session, csp): def test_create_environment_job_is_idempotent(csp, session): - user = UserFactory.create() environment = EnvironmentFactory.create(cloud_id=uuid4().hex) do_create_environment(csp, environment.id) From 343e5a52acbb40a62d66ea74822a2450596a9786 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 17 Sep 2019 15:29:33 -0400 Subject: [PATCH 18/18] Use session.refresh in tests --- tests/test_jobs.py | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index fd6efc44..09c3e645 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -76,13 +76,9 @@ def csp(): def test_create_environment_job(session, csp): environment = EnvironmentFactory.create() do_create_environment(csp, environment.id) + session.refresh(environment) - environment_id = environment.id - del environment - - updated_environment = session.query(Environment).get(environment_id) - - assert updated_environment.cloud_id + assert environment.cloud_id def test_create_environment_job_is_idempotent(csp, session): @@ -95,12 +91,9 @@ def test_create_environment_job_is_idempotent(csp, session): def test_create_atat_admin_user(csp, session): environment = EnvironmentFactory.create(cloud_id="something") do_create_atat_admin_user(csp, environment.id) + session.refresh(environment) - environment_id = environment.id - del environment - updated_environment = session.query(Environment).get(environment_id) - - assert updated_environment.root_user_info + assert environment.root_user_info def test_create_environment_baseline(csp, session): @@ -108,12 +101,9 @@ def test_create_environment_baseline(csp, session): root_user_info={"credentials": csp.root_creds()} ) do_create_environment_baseline(csp, environment.id) + session.refresh(environment) - environment_id = environment.id - del environment - updated_environment = session.query(Environment).get(environment_id) - - assert updated_environment.baseline_info + assert environment.baseline_info def test_dispatch_create_environment(session, monkeypatch):