diff --git a/alembic/versions/691b04ecd85e_add_environment_claimed_at.py b/alembic/versions/691b04ecd85e_add_environment_claimed_at.py new file mode 100644 index 00000000..2c0a21d9 --- /dev/null +++ b/alembic/versions/691b04ecd85e_add_environment_claimed_at.py @@ -0,0 +1,28 @@ +"""add Environment claimed_at + +Revision ID: 691b04ecd85e +Revises: cfab6c8243cb +Create Date: 2019-09-13 11:51:24.677399 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '691b04ecd85e' # pragma: allowlist secret +down_revision = 'cfab6c8243cb' # pragma: allowlist secret +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('environments', sa.Column('claimed_at', sa.TIMESTAMP(timezone=True), nullable=True)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('environments', 'claimed_at') + # ### end Alembic commands ### diff --git a/atst/domain/environments.py b/atst/domain/environments.py index ace75a3a..2d9eef35 100644 --- a/atst/domain/environments.py +++ b/atst/domain/environments.py @@ -1,7 +1,8 @@ -from sqlalchemy import text +from sqlalchemy import text, func from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm import load_only from typing import List +from contextlib import contextmanager from atst.database import db from atst.models import Environment, Application, Portfolio, TaskOrder, CLIN diff --git a/atst/jobs.py b/atst/jobs.py index 050cb4ca..cad4877b 100644 --- a/atst/jobs.py +++ b/atst/jobs.py @@ -1,12 +1,16 @@ from flask import current_app as app import pendulum +from celery.utils.log import get_task_logger +from sqlalchemy import func, orm, sql +from sqlalchemy import update from atst.database import db from atst.queue import celery from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException from atst.domain.environments import Environments -from atst.domain.users import Users + +logger = get_task_logger(__name__) class RecordEnvironmentFailure(celery.Task): @@ -44,32 +48,64 @@ def send_notification_mail(recipients, subject, body): app.mailer.send(recipients, subject, body) -def do_create_environment( - csp: CloudProviderInterface, environment_id=None, atat_user_id=None -): +from contextlib import contextmanager + + +class ClaimFailedException(Exception): + pass + + +@contextmanager +def claim_for_update(resource): + rows_updated = ( + db.session.query(resource.__class__) + .filter_by(id=resource.id, claimed_at=None) + .update({"claimed_at": func.now()}, synchronize_session="fetch") + ) + if rows_updated < 1: + raise ClaimFailedException( + f"Could not acquire claim for {resource.__class__.__name__} {resource.id}." + ) + + claimed = db.session.query(resource.__class__).filter_by(id=resource.id).one() + + try: + yield claimed + finally: + db.session.query(resource.__class__).filter( + resource.__class__.id == resource.id + ).filter(resource.__class__.claimed_at != None).update( + {"claimed_at": sql.null()}, synchronize_session="fetch" + ) + + +def do_create_environment(csp: CloudProviderInterface, environment_id=None): + logger.info(environment_id) environment = Environments.get(environment_id) - if environment.cloud_id is not None: - # TODO: Return value for this? - return + with claim_for_update(environment) as environment: - user = Users.get(atat_user_id) + if environment.cloud_id is not None: + # TODO: Return value for this? + return - # we'll need to do some checking in this job for cases where it's retrying - # when a failure occured after some successful steps - # (e.g. if environment.cloud_id is not None, then we can skip first step) + user = environment.creator - # credentials either from a given user or pulled from config? - # if using global creds, do we need to log what user authorized action? - atat_root_creds = csp.root_creds() + # we'll need to do some checking in this job for cases where it's retrying + # when a failure occured after some successful steps + # (e.g. if environment.cloud_id is not None, then we can skip first step) - # user is needed because baseline root account in the environment will - # be assigned to the requesting user, open question how to handle duplicate - # email addresses across new environments - csp_environment_id = csp.create_environment(atat_root_creds, user, environment) - environment.cloud_id = csp_environment_id - db.session.add(environment) - db.session.commit() + # credentials either from a given user or pulled from config? + # if using global creds, do we need to log what user authorized action? + atat_root_creds = csp.root_creds() + + # user is needed because baseline root account in the environment will + # be assigned to the requesting user, open question how to handle duplicate + # email addresses across new environments + csp_environment_id = csp.create_environment(atat_root_creds, user, environment) + environment.cloud_id = csp_environment_id + db.session.add(environment) + db.session.commit() def do_create_atat_admin_user(csp: CloudProviderInterface, environment_id=None): @@ -107,17 +143,24 @@ def do_work(fn, task, csp, **kwargs): @celery.task(bind=True) def create_environment(self, environment_id=None, atat_user_id=None): - do_work(do_create_environment, self, app.csp.cloud, **kwargs) + do_work(do_create_environment, self, app.csp.cloud, environment_id=environment_id) @celery.task(bind=True) def create_atat_admin_user(self, environment_id=None): - do_work(do_create_atat_admin_user, self, app.csp.cloud, **kwargs) + do_work( + do_create_atat_admin_user, self, app.csp.cloud, environment_id=environment_id + ) @celery.task(bind=True) def create_environment_baseline(self, environment_id=None): - do_work(do_create_environment_baseline, self, app.csp.cloud, **kwargs) + do_work( + do_create_environment_baseline, + self, + app.csp.cloud, + environment_id=environment_id, + ) @celery.task(bind=True) diff --git a/atst/models/environment.py b/atst/models/environment.py index fcdea074..a6774434 100644 --- a/atst/models/environment.py +++ b/atst/models/environment.py @@ -1,11 +1,13 @@ -from sqlalchemy import Column, ForeignKey, String +from sqlalchemy import Column, ForeignKey, String, TIMESTAMP from sqlalchemy.orm import relationship from sqlalchemy.dialects.postgresql import JSONB from enum import Enum +import contextlib from atst.models import Base from atst.models.types import Id from atst.models import mixins +from atst.database import db class Environment( @@ -29,6 +31,8 @@ class Environment( root_user_info = Column(JSONB) baseline_info = Column(JSONB) + claimed_at = Column(TIMESTAMP(timezone=True)) + job_failures = relationship("EnvironmentJobFailure") class ProvisioningStatus(Enum): diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 749b5ad5..079114c8 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -14,6 +14,9 @@ from atst.jobs import ( dispatch_create_environment, dispatch_create_atat_admin_user, dispatch_create_environment_baseline, + create_environment, + claim_for_update, + ClaimFailedException, ) from tests.factories import ( EnvironmentFactory, @@ -22,6 +25,9 @@ from tests.factories import ( PortfolioFactory, ) +from threading import Thread +from time import sleep + def test_environment_job_failure(celery_app, celery_worker): @celery_app.task(bind=True, base=RecordEnvironmentFailure) @@ -73,7 +79,7 @@ def csp(): def test_create_environment_job(session, csp): user = UserFactory.create() environment = EnvironmentFactory.create() - do_create_environment(csp, environment.id, user.id) + do_create_environment(csp, environment.id) environment_id = environment.id del environment @@ -86,7 +92,7 @@ def test_create_environment_job(session, csp): def test_create_environment_job_is_idempotent(csp, session): user = UserFactory.create() environment = EnvironmentFactory.create(cloud_id=uuid4().hex) - do_create_environment(csp, environment.id, user.id) + do_create_environment(csp, environment.id) csp.create_environment.assert_not_called() @@ -196,3 +202,91 @@ def test_dispatch_create_environment_baseline(session, monkeypatch): dispatch_create_environment_baseline.run() mock.delay.assert_called_once_with(environment_id=environment.id) + + +def test_create_environment_no_dupes(session, celery_app, celery_worker): + portfolio = PortfolioFactory.create( + applications=[ + { + "environments": [ + { + "cloud_id": uuid4().hex, + "root_user_info": {}, + "baseline_info": None, + } + ] + } + ], + task_orders=[ + { + "create_clins": [ + { + "start_date": pendulum.now().subtract(days=1), + "end_date": pendulum.now().add(days=1), + } + ] + } + ], + ) + environment = portfolio.applications[0].environments[0] + + create_environment.run(environment_id=environment.id) + environment = session.query(Environment).get(environment.id) + first_cloud_id = environment.cloud_id + + create_environment.run(environment_id=environment.id) + environment = session.query(Environment).get(environment.id) + + assert environment.cloud_id == first_cloud_id + assert environment.claimed_at == None + + +def test_claim(session): + portfolio = PortfolioFactory.create( + applications=[ + { + "environments": [ + { + "cloud_id": uuid4().hex, + "root_user_info": {}, + "baseline_info": None, + } + ] + } + ], + task_orders=[ + { + "create_clins": [ + { + "start_date": pendulum.now().subtract(days=1), + "end_date": pendulum.now().add(days=1), + } + ] + } + ], + ) + environment = portfolio.applications[0].environments[0] + + events = [] + + class FirstThread(Thread): + def run(self): + with claim_for_update(environment): + events.append("first") + + class SecondThread(Thread): + def run(self): + try: + with claim_for_update(environment): + events.append("second") + except Exception: + pass + + t1 = FirstThread() + t2 = SecondThread() + t1.start() + t2.start() + t1.join() + t2.join() + + assert events == ["first"]