Merge pull request #1074 from dod-ccpo/lock-environments

Implement simple locking system for environments
This commit is contained in:
richard-dds
2019-09-17 15:44:25 -04:00
committed by GitHub
7 changed files with 281 additions and 84 deletions

View File

@@ -0,0 +1,28 @@
"""add Environment claimed_until
Revision ID: 691b04ecd85e
Revises: cfab6c8243cb
Create Date: 2019-09-13 11:51:24.677399
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '691b04ecd85e' # pragma: allowlist secret
down_revision = '502e79c55d2d' # pragma: allowlist secret
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('environments', sa.Column('claimed_until', sa.TIMESTAMP(timezone=True), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('environments', 'claimed_until')
# ### end Alembic commands ###

View File

@@ -1,7 +1,7 @@
from sqlalchemy import text from sqlalchemy import text, func, or_
from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import load_only
from typing import List from typing import List
from uuid import UUID
from atst.database import db from atst.database import db
from atst.models import Environment, Application, Portfolio, TaskOrder, CLIN from atst.models import Environment, Application, Portfolio, TaskOrder, CLIN
@@ -97,44 +97,53 @@ class Environments(object):
@classmethod @classmethod
def base_provision_query(cls, now): def base_provision_query(cls, now):
return ( return (
db.session.query(Environment) db.session.query(Environment.id)
.join(Application) .join(Application)
.join(Portfolio) .join(Portfolio)
.join(TaskOrder) .join(TaskOrder)
.join(CLIN) .join(CLIN)
.filter(CLIN.start_date <= now) .filter(CLIN.start_date <= now)
.filter(CLIN.end_date > now) .filter(CLIN.end_date > now)
# select only these columns .filter(
.options(load_only("id", "creator_id")) or_(
Environment.claimed_until == None,
Environment.claimed_until <= func.now(),
)
)
) )
@classmethod @classmethod
def get_environments_pending_creation(cls, now) -> List[Environment]: def get_environments_pending_creation(cls, now) -> List[UUID]:
""" """
Any environment with an active CLIN that doesn't yet have a `cloud_id`. Any environment with an active CLIN that doesn't yet have a `cloud_id`.
""" """
return cls.base_provision_query(now).filter(Environment.cloud_id == None).all() results = (
cls.base_provision_query(now).filter(Environment.cloud_id == None).all()
)
return [id_ for id_, in results]
@classmethod @classmethod
def get_environments_pending_atat_user_creation(cls, now) -> List[Environment]: def get_environments_pending_atat_user_creation(cls, now) -> List[UUID]:
""" """
Any environment with an active CLIN that has a cloud_id but no `root_user_info`. Any environment with an active CLIN that has a cloud_id but no `root_user_info`.
""" """
return ( results = (
cls.base_provision_query(now) cls.base_provision_query(now)
.filter(Environment.cloud_id != None) .filter(Environment.cloud_id != None)
.filter(Environment.root_user_info == text("'null'")) .filter(Environment.root_user_info == text("'null'"))
).all() ).all()
return [id_ for id_, in results]
@classmethod @classmethod
def get_environments_pending_baseline_creation(cls, now) -> List[Environment]: def get_environments_pending_baseline_creation(cls, now) -> List[UUID]:
""" """
Any environment with an active CLIN that has a `cloud_id` and `root_user_info` Any environment with an active CLIN that has a `cloud_id` and `root_user_info`
but no `baseline_info`. but no `baseline_info`.
""" """
return ( results = (
cls.base_provision_query(now) cls.base_provision_query(now)
.filter(Environment.cloud_id != None) .filter(Environment.cloud_id != None)
.filter(Environment.root_user_info != text("'null'")) .filter(Environment.root_user_info != text("'null'"))
.filter(Environment.baseline_info == text("'null'")) .filter(Environment.baseline_info == text("'null'"))
).all() ).all()
return [id_ for id_, in results]

View File

@@ -44,3 +44,12 @@ class NoAccessError(Exception):
@property @property
def message(self): def message(self):
return "Route for {} cannot be accessed".format(self.resource_name) return "Route for {} cannot be accessed".format(self.resource_name)
class ClaimFailedException(Exception):
def __init__(self, resource):
self.resource = resource
message = (
f"Could not acquire claim for {resource.__class__.__name__} {resource.id}."
)
super().__init__(message)

View File

@@ -6,7 +6,7 @@ from atst.queue import celery
from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure
from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException
from atst.domain.environments import Environments from atst.domain.environments import Environments
from atst.domain.users import Users from atst.models.utils import claim_for_update
class RecordEnvironmentFailure(celery.Task): class RecordEnvironmentFailure(celery.Task):
@@ -44,58 +44,61 @@ def send_notification_mail(recipients, subject, body):
app.mailer.send(recipients, subject, body) app.mailer.send(recipients, subject, body)
def do_create_environment( def do_create_environment(csp: CloudProviderInterface, environment_id=None):
csp: CloudProviderInterface, environment_id=None, atat_user_id=None
):
environment = Environments.get(environment_id) environment = Environments.get(environment_id)
if environment.cloud_id is not None: with claim_for_update(environment) as environment:
# TODO: Return value for this?
return
user = Users.get(atat_user_id) if environment.cloud_id is not None:
# TODO: Return value for this?
return
# we'll need to do some checking in this job for cases where it's retrying user = environment.creator
# when a failure occured after some successful steps
# (e.g. if environment.cloud_id is not None, then we can skip first step)
# credentials either from a given user or pulled from config? # we'll need to do some checking in this job for cases where it's retrying
# if using global creds, do we need to log what user authorized action? # when a failure occured after some successful steps
atat_root_creds = csp.root_creds() # (e.g. if environment.cloud_id is not None, then we can skip first step)
# user is needed because baseline root account in the environment will # credentials either from a given user or pulled from config?
# be assigned to the requesting user, open question how to handle duplicate # if using global creds, do we need to log what user authorized action?
# email addresses across new environments atat_root_creds = csp.root_creds()
csp_environment_id = csp.create_environment(atat_root_creds, user, environment)
environment.cloud_id = csp_environment_id # user is needed because baseline root account in the environment will
db.session.add(environment) # be assigned to the requesting user, open question how to handle duplicate
db.session.commit() # email addresses across new environments
csp_environment_id = csp.create_environment(atat_root_creds, user, environment)
environment.cloud_id = csp_environment_id
db.session.add(environment)
db.session.commit()
def do_create_atat_admin_user(csp: CloudProviderInterface, environment_id=None): def do_create_atat_admin_user(csp: CloudProviderInterface, environment_id=None):
environment = Environments.get(environment_id) environment = Environments.get(environment_id)
atat_root_creds = csp.root_creds()
atat_remote_root_user = csp.create_atat_admin_user( with claim_for_update(environment) as environment:
atat_root_creds, environment.cloud_id atat_root_creds = csp.root_creds()
)
environment.root_user_info = atat_remote_root_user atat_remote_root_user = csp.create_atat_admin_user(
db.session.add(environment) atat_root_creds, environment.cloud_id
db.session.commit() )
environment.root_user_info = atat_remote_root_user
db.session.add(environment)
db.session.commit()
def do_create_environment_baseline(csp: CloudProviderInterface, environment_id=None): def do_create_environment_baseline(csp: CloudProviderInterface, environment_id=None):
environment = Environments.get(environment_id) environment = Environments.get(environment_id)
# ASAP switch to use remote root user for provisioning with claim_for_update(environment) as environment:
atat_remote_root_creds = environment.root_user_info["credentials"] # ASAP switch to use remote root user for provisioning
atat_remote_root_creds = environment.root_user_info["credentials"]
baseline_info = csp.create_environment_baseline( baseline_info = csp.create_environment_baseline(
atat_remote_root_creds, environment.cloud_id atat_remote_root_creds, environment.cloud_id
) )
environment.baseline_info = baseline_info environment.baseline_info = baseline_info
db.session.add(environment) db.session.add(environment)
db.session.commit() db.session.commit()
def do_work(fn, task, csp, **kwargs): def do_work(fn, task, csp, **kwargs):
@@ -106,39 +109,46 @@ def do_work(fn, task, csp, **kwargs):
@celery.task(bind=True) @celery.task(bind=True)
def create_environment(self, environment_id=None, atat_user_id=None): def create_environment(self, environment_id=None):
do_work(do_create_environment, self, app.csp.cloud, **kwargs) do_work(do_create_environment, self, app.csp.cloud, environment_id=environment_id)
@celery.task(bind=True) @celery.task(bind=True)
def create_atat_admin_user(self, environment_id=None): def create_atat_admin_user(self, environment_id=None):
do_work(do_create_atat_admin_user, self, app.csp.cloud, **kwargs) do_work(
do_create_atat_admin_user, self, app.csp.cloud, environment_id=environment_id
)
@celery.task(bind=True) @celery.task(bind=True)
def create_environment_baseline(self, environment_id=None): def create_environment_baseline(self, environment_id=None):
do_work(do_create_environment_baseline, self, app.csp.cloud, **kwargs) do_work(
do_create_environment_baseline,
self,
app.csp.cloud,
environment_id=environment_id,
)
@celery.task(bind=True) @celery.task(bind=True)
def dispatch_create_environment(self): def dispatch_create_environment(self):
for environment in Environments.get_environments_pending_creation(pendulum.now()): for environment_id in Environments.get_environments_pending_creation(
create_environment.delay( pendulum.now()
environment_id=environment.id, atat_user_id=environment.creator_id ):
) create_environment.delay(environment_id=environment_id)
@celery.task(bind=True) @celery.task(bind=True)
def dispatch_create_atat_admin_user(self): def dispatch_create_atat_admin_user(self):
for environment in Environments.get_environments_pending_atat_user_creation( for environment_id in Environments.get_environments_pending_atat_user_creation(
pendulum.now() pendulum.now()
): ):
create_atat_admin_user.delay(environment_id=environment.id) create_atat_admin_user.delay(environment_id=environment_id)
@celery.task(bind=True) @celery.task(bind=True)
def dispatch_create_environment_baseline(self): def dispatch_create_environment_baseline(self):
for environment in Environments.get_environments_pending_baseline_creation( for environment_id in Environments.get_environments_pending_baseline_creation(
pendulum.now() pendulum.now()
): ):
create_environment_baseline.delay(environment_id=environment.id) create_environment_baseline.delay(environment_id=environment_id)

View File

@@ -1,4 +1,4 @@
from sqlalchemy import Column, ForeignKey, String from sqlalchemy import Column, ForeignKey, String, TIMESTAMP
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.dialects.postgresql import JSONB
from enum import Enum from enum import Enum
@@ -29,6 +29,8 @@ class Environment(
root_user_info = Column(JSONB) root_user_info = Column(JSONB)
baseline_info = Column(JSONB) baseline_info = Column(JSONB)
claimed_until = Column(TIMESTAMP(timezone=True))
job_failures = relationship("EnvironmentJobFailure") job_failures = relationship("EnvironmentJobFailure")
class ProvisioningStatus(Enum): class ProvisioningStatus(Enum):

49
atst/models/utils.py Normal file
View File

@@ -0,0 +1,49 @@
from sqlalchemy import func, sql, Interval, and_, or_
from contextlib import contextmanager
from atst.database import db
from atst.domain.exceptions import ClaimFailedException
@contextmanager
def claim_for_update(resource, minutes=30):
"""
Claim a mutually exclusive expiring hold on a resource.
Uses the database as a central source of time in case the server clocks have drifted.
Args:
resource: A SQLAlchemy model instance with a `claimed_until` attribute.
minutes: The maximum amount of time, in minutes, to hold the claim.
"""
Model = resource.__class__
claim_until = func.now() + func.cast(
sql.functions.concat(minutes, " MINUTES"), Interval
)
# Optimistically query for and update the resource in question. If it's
# already claimed, `rows_updated` will be 0 and we can give up.
rows_updated = (
db.session.query(Model)
.filter(
and_(
Model.id == resource.id,
or_(Model.claimed_until == None, Model.claimed_until <= func.now()),
)
)
.update({"claimed_until": claim_until}, synchronize_session="fetch")
)
if rows_updated < 1:
raise ClaimFailedException(resource)
# Fetch the claimed resource
claimed = db.session.query(Model).filter_by(id=resource.id).one()
try:
# Give the resource to the caller.
yield claimed
finally:
# Release the claim.
db.session.query(Model).filter(Model.id == resource.id).filter(
Model.claimed_until != None
).update({"claimed_until": None}, synchronize_session="fetch")

View File

@@ -2,6 +2,7 @@ import pendulum
import pytest import pytest
from uuid import uuid4 from uuid import uuid4
from unittest.mock import Mock from unittest.mock import Mock
from threading import Thread
from atst.models import Environment from atst.models import Environment
from atst.domain.csp.cloud import MockCloudProvider from atst.domain.csp.cloud import MockCloudProvider
@@ -14,7 +15,10 @@ from atst.jobs import (
dispatch_create_environment, dispatch_create_environment,
dispatch_create_atat_admin_user, dispatch_create_atat_admin_user,
dispatch_create_environment_baseline, dispatch_create_environment_baseline,
create_environment,
) )
from atst.models.utils import claim_for_update
from atst.domain.exceptions import ClaimFailedException
from tests.factories import ( from tests.factories import (
EnvironmentFactory, EnvironmentFactory,
EnvironmentRoleFactory, EnvironmentRoleFactory,
@@ -62,7 +66,6 @@ def test_environment_role_job_failure(celery_app, celery_worker):
now = pendulum.now() now = pendulum.now()
yesterday = now.subtract(days=1) yesterday = now.subtract(days=1)
tomorrow = now.add(days=1) tomorrow = now.add(days=1)
from atst.domain.environments import Environments
@pytest.fixture(autouse=True, scope="function") @pytest.fixture(autouse=True, scope="function")
@@ -71,22 +74,16 @@ def csp():
def test_create_environment_job(session, csp): def test_create_environment_job(session, csp):
user = UserFactory.create()
environment = EnvironmentFactory.create() environment = EnvironmentFactory.create()
do_create_environment(csp, environment.id, user.id) do_create_environment(csp, environment.id)
session.refresh(environment)
environment_id = environment.id assert environment.cloud_id
del environment
updated_environment = session.query(Environment).get(environment_id)
assert updated_environment.cloud_id
def test_create_environment_job_is_idempotent(csp, session): def test_create_environment_job_is_idempotent(csp, session):
user = UserFactory.create()
environment = EnvironmentFactory.create(cloud_id=uuid4().hex) environment = EnvironmentFactory.create(cloud_id=uuid4().hex)
do_create_environment(csp, environment.id, user.id) do_create_environment(csp, environment.id)
csp.create_environment.assert_not_called() csp.create_environment.assert_not_called()
@@ -94,12 +91,9 @@ def test_create_environment_job_is_idempotent(csp, session):
def test_create_atat_admin_user(csp, session): def test_create_atat_admin_user(csp, session):
environment = EnvironmentFactory.create(cloud_id="something") environment = EnvironmentFactory.create(cloud_id="something")
do_create_atat_admin_user(csp, environment.id) do_create_atat_admin_user(csp, environment.id)
session.refresh(environment)
environment_id = environment.id assert environment.root_user_info
del environment
updated_environment = session.query(Environment).get(environment_id)
assert updated_environment.root_user_info
def test_create_environment_baseline(csp, session): def test_create_environment_baseline(csp, session):
@@ -107,12 +101,9 @@ def test_create_environment_baseline(csp, session):
root_user_info={"credentials": csp.root_creds()} root_user_info={"credentials": csp.root_creds()}
) )
do_create_environment_baseline(csp, environment.id) do_create_environment_baseline(csp, environment.id)
session.refresh(environment)
environment_id = environment.id assert environment.baseline_info
del environment
updated_environment = session.query(Environment).get(environment_id)
assert updated_environment.baseline_info
def test_dispatch_create_environment(session, monkeypatch): def test_dispatch_create_environment(session, monkeypatch):
@@ -135,9 +126,7 @@ def test_dispatch_create_environment(session, monkeypatch):
dispatch_create_environment.run() dispatch_create_environment.run()
mock.delay.assert_called_once_with( mock.delay.assert_called_once_with(environment_id=environment.id)
environment_id=environment.id, atat_user_id=environment.creator_id
)
def test_dispatch_create_atat_admin_user(session, monkeypatch): def test_dispatch_create_atat_admin_user(session, monkeypatch):
@@ -196,3 +185,104 @@ def test_dispatch_create_environment_baseline(session, monkeypatch):
dispatch_create_environment_baseline.run() dispatch_create_environment_baseline.run()
mock.delay.assert_called_once_with(environment_id=environment.id) mock.delay.assert_called_once_with(environment_id=environment.id)
def test_create_environment_no_dupes(session, celery_app, celery_worker):
portfolio = PortfolioFactory.create(
applications=[
{
"environments": [
{
"cloud_id": uuid4().hex,
"root_user_info": {},
"baseline_info": None,
}
]
}
],
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
environment = portfolio.applications[0].environments[0]
# create_environment is run twice on the same environment
create_environment.run(environment_id=environment.id)
session.refresh(environment)
first_cloud_id = environment.cloud_id
create_environment.run(environment_id=environment.id)
session.refresh(environment)
# The environment's cloud_id was not overwritten in the second run
assert environment.cloud_id == first_cloud_id
# The environment's claim was released
assert environment.claimed_until == None
def test_claim_for_update(session):
portfolio = PortfolioFactory.create(
applications=[
{
"environments": [
{
"cloud_id": uuid4().hex,
"root_user_info": {},
"baseline_info": None,
}
]
}
],
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
environment = portfolio.applications[0].environments[0]
satisfied_claims = []
# Two threads that race to acquire a claim on the same environment.
# SecondThread's claim will be rejected, resulting in a ClaimFailedException.
class FirstThread(Thread):
def run(self):
with claim_for_update(environment):
satisfied_claims.append("FirstThread")
class SecondThread(Thread):
def run(self):
try:
with claim_for_update(environment):
satisfied_claims.append("SecondThread")
except ClaimFailedException:
pass
t1 = FirstThread()
t2 = SecondThread()
t1.start()
t2.start()
t1.join()
t2.join()
session.refresh(environment)
# Only FirstThread acquired a claim and wrote to satisfied_claims
assert satisfied_claims == ["FirstThread"]
# The claim is released
assert environment.claimed_until is None