Implement simple locking system for environments

This commit is contained in:
richard-dds 2019-09-13 15:50:51 -04:00
parent 25a78964df
commit 2bbe974755
5 changed files with 198 additions and 28 deletions

View File

@ -0,0 +1,28 @@
"""add Environment claimed_at
Revision ID: 691b04ecd85e
Revises: cfab6c8243cb
Create Date: 2019-09-13 11:51:24.677399
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '691b04ecd85e' # pragma: allowlist secret
down_revision = 'cfab6c8243cb' # pragma: allowlist secret
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('environments', sa.Column('claimed_at', sa.TIMESTAMP(timezone=True), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('environments', 'claimed_at')
# ### end Alembic commands ###

View File

@ -1,7 +1,8 @@
from sqlalchemy import text from sqlalchemy import text, func
from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import load_only from sqlalchemy.orm import load_only
from typing import List from typing import List
from contextlib import contextmanager
from atst.database import db from atst.database import db
from atst.models import Environment, Application, Portfolio, TaskOrder, CLIN from atst.models import Environment, Application, Portfolio, TaskOrder, CLIN

View File

@ -1,12 +1,16 @@
from flask import current_app as app from flask import current_app as app
import pendulum import pendulum
from celery.utils.log import get_task_logger
from sqlalchemy import func, orm, sql
from sqlalchemy import update
from atst.database import db from atst.database import db
from atst.queue import celery from atst.queue import celery
from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure
from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException
from atst.domain.environments import Environments from atst.domain.environments import Environments
from atst.domain.users import Users
logger = get_task_logger(__name__)
class RecordEnvironmentFailure(celery.Task): class RecordEnvironmentFailure(celery.Task):
@ -44,32 +48,64 @@ def send_notification_mail(recipients, subject, body):
app.mailer.send(recipients, subject, body) app.mailer.send(recipients, subject, body)
def do_create_environment( from contextlib import contextmanager
csp: CloudProviderInterface, environment_id=None, atat_user_id=None
):
class ClaimFailedException(Exception):
pass
@contextmanager
def claim_for_update(resource):
rows_updated = (
db.session.query(resource.__class__)
.filter_by(id=resource.id, claimed_at=None)
.update({"claimed_at": func.now()}, synchronize_session="fetch")
)
if rows_updated < 1:
raise ClaimFailedException(
f"Could not acquire claim for {resource.__class__.__name__} {resource.id}."
)
claimed = db.session.query(resource.__class__).filter_by(id=resource.id).one()
try:
yield claimed
finally:
db.session.query(resource.__class__).filter(
resource.__class__.id == resource.id
).filter(resource.__class__.claimed_at != None).update(
{"claimed_at": sql.null()}, synchronize_session="fetch"
)
def do_create_environment(csp: CloudProviderInterface, environment_id=None):
logger.info(environment_id)
environment = Environments.get(environment_id) environment = Environments.get(environment_id)
if environment.cloud_id is not None: with claim_for_update(environment) as environment:
# TODO: Return value for this?
return
user = Users.get(atat_user_id) if environment.cloud_id is not None:
# TODO: Return value for this?
return
# we'll need to do some checking in this job for cases where it's retrying user = environment.creator
# when a failure occured after some successful steps
# (e.g. if environment.cloud_id is not None, then we can skip first step)
# credentials either from a given user or pulled from config? # we'll need to do some checking in this job for cases where it's retrying
# if using global creds, do we need to log what user authorized action? # when a failure occured after some successful steps
atat_root_creds = csp.root_creds() # (e.g. if environment.cloud_id is not None, then we can skip first step)
# user is needed because baseline root account in the environment will # credentials either from a given user or pulled from config?
# be assigned to the requesting user, open question how to handle duplicate # if using global creds, do we need to log what user authorized action?
# email addresses across new environments atat_root_creds = csp.root_creds()
csp_environment_id = csp.create_environment(atat_root_creds, user, environment)
environment.cloud_id = csp_environment_id # user is needed because baseline root account in the environment will
db.session.add(environment) # be assigned to the requesting user, open question how to handle duplicate
db.session.commit() # email addresses across new environments
csp_environment_id = csp.create_environment(atat_root_creds, user, environment)
environment.cloud_id = csp_environment_id
db.session.add(environment)
db.session.commit()
def do_create_atat_admin_user(csp: CloudProviderInterface, environment_id=None): def do_create_atat_admin_user(csp: CloudProviderInterface, environment_id=None):
@ -107,17 +143,24 @@ def do_work(fn, task, csp, **kwargs):
@celery.task(bind=True) @celery.task(bind=True)
def create_environment(self, environment_id=None, atat_user_id=None): def create_environment(self, environment_id=None, atat_user_id=None):
do_work(do_create_environment, self, app.csp.cloud, **kwargs) do_work(do_create_environment, self, app.csp.cloud, environment_id=environment_id)
@celery.task(bind=True) @celery.task(bind=True)
def create_atat_admin_user(self, environment_id=None): def create_atat_admin_user(self, environment_id=None):
do_work(do_create_atat_admin_user, self, app.csp.cloud, **kwargs) do_work(
do_create_atat_admin_user, self, app.csp.cloud, environment_id=environment_id
)
@celery.task(bind=True) @celery.task(bind=True)
def create_environment_baseline(self, environment_id=None): def create_environment_baseline(self, environment_id=None):
do_work(do_create_environment_baseline, self, app.csp.cloud, **kwargs) do_work(
do_create_environment_baseline,
self,
app.csp.cloud,
environment_id=environment_id,
)
@celery.task(bind=True) @celery.task(bind=True)

View File

@ -1,11 +1,13 @@
from sqlalchemy import Column, ForeignKey, String from sqlalchemy import Column, ForeignKey, String, TIMESTAMP
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.dialects.postgresql import JSONB
from enum import Enum from enum import Enum
import contextlib
from atst.models import Base from atst.models import Base
from atst.models.types import Id from atst.models.types import Id
from atst.models import mixins from atst.models import mixins
from atst.database import db
class Environment( class Environment(
@ -29,6 +31,8 @@ class Environment(
root_user_info = Column(JSONB) root_user_info = Column(JSONB)
baseline_info = Column(JSONB) baseline_info = Column(JSONB)
claimed_at = Column(TIMESTAMP(timezone=True))
job_failures = relationship("EnvironmentJobFailure") job_failures = relationship("EnvironmentJobFailure")
class ProvisioningStatus(Enum): class ProvisioningStatus(Enum):

View File

@ -14,6 +14,9 @@ from atst.jobs import (
dispatch_create_environment, dispatch_create_environment,
dispatch_create_atat_admin_user, dispatch_create_atat_admin_user,
dispatch_create_environment_baseline, dispatch_create_environment_baseline,
create_environment,
claim_for_update,
ClaimFailedException,
) )
from tests.factories import ( from tests.factories import (
EnvironmentFactory, EnvironmentFactory,
@ -22,6 +25,9 @@ from tests.factories import (
PortfolioFactory, PortfolioFactory,
) )
from threading import Thread
from time import sleep
def test_environment_job_failure(celery_app, celery_worker): def test_environment_job_failure(celery_app, celery_worker):
@celery_app.task(bind=True, base=RecordEnvironmentFailure) @celery_app.task(bind=True, base=RecordEnvironmentFailure)
@ -73,7 +79,7 @@ def csp():
def test_create_environment_job(session, csp): def test_create_environment_job(session, csp):
user = UserFactory.create() user = UserFactory.create()
environment = EnvironmentFactory.create() environment = EnvironmentFactory.create()
do_create_environment(csp, environment.id, user.id) do_create_environment(csp, environment.id)
environment_id = environment.id environment_id = environment.id
del environment del environment
@ -86,7 +92,7 @@ def test_create_environment_job(session, csp):
def test_create_environment_job_is_idempotent(csp, session): def test_create_environment_job_is_idempotent(csp, session):
user = UserFactory.create() user = UserFactory.create()
environment = EnvironmentFactory.create(cloud_id=uuid4().hex) environment = EnvironmentFactory.create(cloud_id=uuid4().hex)
do_create_environment(csp, environment.id, user.id) do_create_environment(csp, environment.id)
csp.create_environment.assert_not_called() csp.create_environment.assert_not_called()
@ -196,3 +202,91 @@ def test_dispatch_create_environment_baseline(session, monkeypatch):
dispatch_create_environment_baseline.run() dispatch_create_environment_baseline.run()
mock.delay.assert_called_once_with(environment_id=environment.id) mock.delay.assert_called_once_with(environment_id=environment.id)
def test_create_environment_no_dupes(session, celery_app, celery_worker):
portfolio = PortfolioFactory.create(
applications=[
{
"environments": [
{
"cloud_id": uuid4().hex,
"root_user_info": {},
"baseline_info": None,
}
]
}
],
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
environment = portfolio.applications[0].environments[0]
create_environment.run(environment_id=environment.id)
environment = session.query(Environment).get(environment.id)
first_cloud_id = environment.cloud_id
create_environment.run(environment_id=environment.id)
environment = session.query(Environment).get(environment.id)
assert environment.cloud_id == first_cloud_id
assert environment.claimed_at == None
def test_claim(session):
portfolio = PortfolioFactory.create(
applications=[
{
"environments": [
{
"cloud_id": uuid4().hex,
"root_user_info": {},
"baseline_info": None,
}
]
}
],
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
environment = portfolio.applications[0].environments[0]
events = []
class FirstThread(Thread):
def run(self):
with claim_for_update(environment):
events.append("first")
class SecondThread(Thread):
def run(self):
try:
with claim_for_update(environment):
events.append("second")
except Exception:
pass
t1 = FirstThread()
t2 = SecondThread()
t1.start()
t2.start()
t1.join()
t2.join()
assert events == ["first"]