Merge pull request #1062 from dod-ccpo/env-provisioning-task

Environment provisioning background jobs
This commit is contained in:
richard-dds 2019-09-16 09:58:18 -04:00 committed by GitHub
commit 79c8e4fc63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 613 additions and 31 deletions

3
.gitignore vendored
View File

@ -62,3 +62,6 @@ browserstacklocal
# configuration files
override.ini
atst-overrides.ini
# binary file created by celery beat
celerybeat-schedule

View File

@ -0,0 +1,30 @@
"""add Environment csp info
Revision ID: 502e79c55d2d
Revises: 4a3122ffe898
Create Date: 2019-09-05 13:54:23.840512
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '502e79c55d2d'
down_revision = '30ea1cb20807' # pragma: allowlist secret
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('environments', sa.Column('baseline_info', postgresql.JSONB(astext_type=sa.Text()), nullable=True))
op.add_column('environments', sa.Column('root_user_info', postgresql.JSONB(astext_type=sa.Text()), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('environments', 'root_user_info')
op.drop_column('environments', 'baseline_info')
# ### end Alembic commands ###

View File

@ -0,0 +1,30 @@
"""add Environment creator_role
Revision ID: cfab6c8243cb
Revises: 502e79c55d2d
Create Date: 2019-09-10 11:21:43.252592
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'cfab6c8243cb' # pragma: allowlist secret
down_revision = '502e79c55d2d'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('environments', sa.Column('creator_id', postgresql.UUID(as_uuid=True), nullable=False))
op.create_foreign_key("fk_users_id", 'environments', 'users', ['creator_id'], ['id'])
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint("fk_users_id", 'environments', type_='foreignkey')
op.drop_column('environments', 'creator_id')
# ### end Alembic commands ###

View File

@ -19,13 +19,13 @@ class Applications(BaseDomainClass):
resource_name = "application"
@classmethod
def create(cls, portfolio, name, description, environment_names):
def create(cls, user, portfolio, name, description, environment_names):
application = Application(
portfolio=portfolio, name=name, description=description
)
db.session.add(application)
Environments.create_many(application, environment_names)
Environments.create_many(user, application, environment_names)
db.session.commit()
return application

View File

@ -1,7 +1,10 @@
from sqlalchemy import text
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import load_only
from typing import List
from atst.database import db
from atst.models.environment import Environment
from atst.models import Environment, Application, Portfolio, TaskOrder, CLIN
from atst.domain.environment_roles import EnvironmentRoles
from .exceptions import NotFoundError
@ -9,17 +12,17 @@ from .exceptions import NotFoundError
class Environments(object):
@classmethod
def create(cls, application, name):
environment = Environment(application=application, name=name)
def create(cls, user, application, name):
environment = Environment(application=application, name=name, creator=user)
db.session.add(environment)
db.session.commit()
return environment
@classmethod
def create_many(cls, application, names):
def create_many(cls, user, application, names):
environments = []
for name in names:
environment = Environments.create(application, name)
environment = Environments.create(user, application, name)
environments.append(environment)
db.session.add_all(environments)
@ -90,3 +93,48 @@ class Environments(object):
# TODO: How do we work around environment deletion being a largely manual process in the CSPs
return environment
@classmethod
def base_provision_query(cls, now):
return (
db.session.query(Environment)
.join(Application)
.join(Portfolio)
.join(TaskOrder)
.join(CLIN)
.filter(CLIN.start_date <= now)
.filter(CLIN.end_date > now)
# select only these columns
.options(load_only("id", "creator_id"))
)
@classmethod
def get_environments_pending_creation(cls, now) -> List[Environment]:
"""
Any environment with an active CLIN that doesn't yet have a `cloud_id`.
"""
return cls.base_provision_query(now).filter(Environment.cloud_id == None).all()
@classmethod
def get_environments_pending_atat_user_creation(cls, now) -> List[Environment]:
"""
Any environment with an active CLIN that has a cloud_id but no `root_user_info`.
"""
return (
cls.base_provision_query(now)
.filter(Environment.cloud_id != None)
.filter(Environment.root_user_info == text("'null'"))
).all()
@classmethod
def get_environments_pending_baseline_creation(cls, now) -> List[Environment]:
"""
Any environment with an active CLIN that has a `cloud_id` and `root_user_info`
but no `baseline_info`.
"""
return (
cls.base_provision_query(now)
.filter(Environment.cloud_id != None)
.filter(Environment.root_user_info != text("'null'"))
.filter(Environment.baseline_info == text("'null'"))
).all()

View File

@ -1,8 +1,12 @@
from flask import current_app as app
import pendulum
from atst.queue import celery
from atst.database import db
from atst.queue import celery
from atst.models import EnvironmentJobFailure, EnvironmentRoleJobFailure
from atst.domain.csp.cloud import CloudProviderInterface, GeneralCSPException
from atst.domain.environments import Environments
from atst.domain.users import Users
class RecordEnvironmentFailure(celery.Task):
@ -38,3 +42,103 @@ def send_notification_mail(recipients, subject, body):
)
)
app.mailer.send(recipients, subject, body)
def do_create_environment(
csp: CloudProviderInterface, environment_id=None, atat_user_id=None
):
environment = Environments.get(environment_id)
if environment.cloud_id is not None:
# TODO: Return value for this?
return
user = Users.get(atat_user_id)
# we'll need to do some checking in this job for cases where it's retrying
# when a failure occured after some successful steps
# (e.g. if environment.cloud_id is not None, then we can skip first step)
# credentials either from a given user or pulled from config?
# if using global creds, do we need to log what user authorized action?
atat_root_creds = csp.root_creds()
# user is needed because baseline root account in the environment will
# be assigned to the requesting user, open question how to handle duplicate
# email addresses across new environments
csp_environment_id = csp.create_environment(atat_root_creds, user, environment)
environment.cloud_id = csp_environment_id
db.session.add(environment)
db.session.commit()
def do_create_atat_admin_user(csp: CloudProviderInterface, environment_id=None):
environment = Environments.get(environment_id)
atat_root_creds = csp.root_creds()
atat_remote_root_user = csp.create_atat_admin_user(
atat_root_creds, environment.cloud_id
)
environment.root_user_info = atat_remote_root_user
db.session.add(environment)
db.session.commit()
def do_create_environment_baseline(csp: CloudProviderInterface, environment_id=None):
environment = Environments.get(environment_id)
# ASAP switch to use remote root user for provisioning
atat_remote_root_creds = environment.root_user_info["credentials"]
baseline_info = csp.create_environment_baseline(
atat_remote_root_creds, environment.cloud_id
)
environment.baseline_info = baseline_info
db.session.add(environment)
db.session.commit()
def do_work(fn, task, csp, **kwargs):
try:
fn(csp, **kwargs)
except GeneralCSPException as e:
raise task.retry(exc=e)
@celery.task(bind=True)
def create_environment(self, environment_id=None, atat_user_id=None):
do_work(do_create_environment, self, app.csp.cloud, **kwargs)
@celery.task(bind=True)
def create_atat_admin_user(self, environment_id=None):
do_work(do_create_atat_admin_user, self, app.csp.cloud, **kwargs)
@celery.task(bind=True)
def create_environment_baseline(self, environment_id=None):
do_work(do_create_environment_baseline, self, app.csp.cloud, **kwargs)
@celery.task(bind=True)
def dispatch_create_environment(self):
for environment in Environments.get_environments_pending_creation(pendulum.now()):
create_environment.delay(
environment_id=environment.id, atat_user_id=environment.creator_id
)
@celery.task(bind=True)
def dispatch_create_atat_admin_user(self):
for environment in Environments.get_environments_pending_atat_user_creation(
pendulum.now()
):
create_atat_admin_user.delay(environment_id=environment.id)
@celery.task(bind=True)
def dispatch_create_environment_baseline(self):
for environment in Environments.get_environments_pending_baseline_creation(
pendulum.now()
):
create_environment_baseline.delay(environment_id=environment.id)

View File

@ -1,5 +1,7 @@
from sqlalchemy import Column, ForeignKey, String
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import JSONB
from enum import Enum
from atst.models import Base
from atst.models.types import Id
@ -17,10 +19,22 @@ class Environment(
application_id = Column(ForeignKey("applications.id"), nullable=False)
application = relationship("Application")
# User user.id as the foreign key here beacuse the Environment creator may
# not have an application role. We may need to revisit this if we receive any
# requirements around tracking an environment's custodian.
creator_id = Column(ForeignKey("users.id"), nullable=False)
creator = relationship("User")
cloud_id = Column(String)
root_user_info = Column(JSONB)
baseline_info = Column(JSONB)
job_failures = relationship("EnvironmentJobFailure")
class ProvisioningStatus(Enum):
PENDING = "pending"
COMPLETED = "completed"
@property
def users(self):
return {r.application_role.user for r in self.roles}
@ -41,6 +55,17 @@ class Environment(
def portfolio_id(self):
return self.application.portfolio_id
@property
def provisioning_status(self) -> ProvisioningStatus:
if (
self.cloud_id is None
or self.root_user_info is None
or self.baseline_info is None
):
return self.ProvisioningStatus.PENDING
else:
return self.ProvisioningStatus.COMPLETED
def __repr__(self):
return "<Environment(name='{}', num_users='{}', application='{}', portfolio='{}', id='{}')>".format(
self.name,

View File

@ -5,6 +5,7 @@ celery = Celery(__name__)
def update_celery(celery, app):
celery.conf.update(app.config)
celery.conf.CELERYBEAT_SCHEDULE = {}
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):

View File

@ -1,4 +1,4 @@
from flask import redirect, render_template, request as http_request, url_for
from flask import redirect, render_template, request as http_request, url_for, g
from . import applications_bp
from atst.domain.applications import Applications
@ -24,6 +24,7 @@ def create(portfolio_id):
if form.validate():
application_data = form.data
Applications.create(
g.current_user,
portfolio,
application_data["name"],
application_data["description"],

View File

@ -227,7 +227,9 @@ def new_environment(application_id):
env_form = EditEnvironmentForm(formdata=http_request.form)
if env_form.validate():
Environments.create(application=application, name=env_form.name.data)
Environments.create(
g.current_user, application=application, name=env_form.name.data
)
flash("environment_added", environment_name=env_form.data["name"])

View File

@ -4,7 +4,7 @@
set -e
WORKER="pipenv run celery -A celery_worker.celery worker --loglevel=info"
WORKER="pipenv run celery -A celery_worker.celery worker --loglevel=info -B"
if [[ `command -v entr` ]]; then
find atst | entr -r $WORKER

View File

@ -220,6 +220,7 @@ def add_applications_to_portfolio(portfolio):
applications = random_applications()
for application_data in applications:
application = Applications.create(
portfolio.owner,
portfolio=portfolio,
name=application_data["name"],
description=application_data["description"],
@ -278,7 +279,7 @@ def create_demo_portfolio(name, data):
portfolio=portfolio, name=mock_application.name, description=""
)
env_names = [env.name for env in mock_application.environments]
envs = Environments.create_many(application, env_names)
envs = Environments.create_many(portfolio.owner, application, env_names)
db.session.add(application)
db.session.commit()

View File

@ -21,7 +21,7 @@ from tests.factories import (
def test_create_application_with_multiple_environments():
portfolio = PortfolioFactory.create()
application = Applications.create(
portfolio, "My Test Application", "Test", ["dev", "prod"]
portfolio.owner, portfolio, "My Test Application", "Test", ["dev", "prod"]
)
assert application.portfolio == portfolio

View File

@ -1,4 +1,6 @@
import pytest
import pendulum
from uuid import uuid4
from atst.domain.environments import Environments
from atst.domain.environment_roles import EnvironmentRoles
@ -7,7 +9,6 @@ from atst.models.environment_role import CSPRole
from tests.factories import (
ApplicationFactory,
UserFactory,
PortfolioFactory,
EnvironmentFactory,
EnvironmentRoleFactory,
@ -15,12 +16,13 @@ from tests.factories import (
)
@pytest.mark.skip(reason="Reinstate and update once jobs api is up")
def test_create_environments():
application = ApplicationFactory.create()
environments = Environments.create_many(application, ["Staging", "Production"])
environments = Environments.create_many(
application.portfolio.owner, application, ["Staging", "Production"]
)
for env in environments:
assert env.cloud_id is not None
assert env.cloud_id is None
def test_update_env_role():
@ -87,3 +89,125 @@ def test_update_environment():
assert environment.name is not "name 2"
Environments.update(environment, name="name 2")
assert environment.name == "name 2"
class EnvQueryTest:
@property
def NOW(self):
return pendulum.now()
@property
def YESTERDAY(self):
return self.NOW.subtract(days=1)
@property
def TOMORROW(self):
return self.NOW.add(days=1)
def create_portfolio_with_clins(self, start_and_end_dates, env_data=None):
env_data = env_data or {}
return PortfolioFactory.create(
applications=[
{
"name": "Mos Eisley",
"description": "Where Han shot first",
"environments": [{"name": "thebar", **env_data}],
}
],
task_orders=[
{
"create_clins": [
{"start_date": start_date, "end_date": end_date}
for (start_date, end_date) in start_and_end_dates
]
}
],
)
class TestGetEnvironmentsPendingCreate(EnvQueryTest):
def test_with_expired_clins(self, session):
self.create_portfolio_with_clins([(self.YESTERDAY, self.YESTERDAY)])
assert len(Environments.get_environments_pending_creation(self.NOW)) == 0
def test_with_active_clins(self, session):
portfolio = self.create_portfolio_with_clins([(self.YESTERDAY, self.TOMORROW)])
Environments.get_environments_pending_creation(self.NOW) == [
portfolio.applications[0].environments[0].id
]
def test_with_future_clins(self, session):
self.create_portfolio_with_clins([(self.TOMORROW, self.TOMORROW)])
assert len(Environments.get_environments_pending_creation(self.NOW)) == 0
def test_with_already_provisioned_env(self, session):
self.create_portfolio_with_clins(
[(self.YESTERDAY, self.TOMORROW)], env_data={"cloud_id": uuid4().hex}
)
assert len(Environments.get_environments_pending_creation(self.NOW)) == 0
class TestGetEnvironmentsPendingAtatUserCreation(EnvQueryTest):
def test_with_provisioned_environment(self):
self.create_portfolio_with_clins(
[(self.YESTERDAY, self.TOMORROW)],
{"cloud_id": uuid4().hex, "root_user_info": {}},
)
assert (
len(Environments.get_environments_pending_atat_user_creation(self.NOW)) == 0
)
def test_with_unprovisioned_environment(self):
self.create_portfolio_with_clins(
[(self.YESTERDAY, self.TOMORROW)],
{"cloud_id": uuid4().hex, "root_user_info": None},
)
assert (
len(Environments.get_environments_pending_atat_user_creation(self.NOW)) == 1
)
def test_with_unprovisioned_expired_clins_environment(self):
self.create_portfolio_with_clins(
[(self.YESTERDAY, self.YESTERDAY)],
{"cloud_id": uuid4().hex, "root_user_info": None},
)
assert (
len(Environments.get_environments_pending_atat_user_creation(self.NOW)) == 0
)
class TestGetEnvironmentsPendingBaselineCreation(EnvQueryTest):
def test_with_provisioned_environment(self):
self.create_portfolio_with_clins(
[(self.YESTERDAY, self.TOMORROW)],
{
"cloud_id": uuid4().hex,
"root_user_info": {"foo": "bar"},
"baseline_info": {"foo": "bar"},
},
)
assert (
len(Environments.get_environments_pending_baseline_creation(self.NOW)) == 0
)
def test_with_unprovisioned_environment(self):
self.create_portfolio_with_clins(
[(self.YESTERDAY, self.TOMORROW)],
{
"cloud_id": uuid4().hex,
"root_user_info": {"foo": "bar"},
"baseline_info": None,
},
)
assert (
len(Environments.get_environments_pending_baseline_creation(self.NOW)) == 1
)
def test_with_unprovisioned_expired_clins_environment(self):
self.create_portfolio_with_clins(
[(self.YESTERDAY, self.YESTERDAY)],
{"cloud_id": uuid4().hex, "root_user_info": {"foo": "bar"}},
)
assert (
len(Environments.get_environments_pending_baseline_creation(self.NOW)) == 0
)

View File

@ -71,7 +71,11 @@ def test_update_portfolio_role_role(portfolio, portfolio_owner):
def test_scoped_portfolio_for_admin_missing_view_apps_perms(portfolio_owner, portfolio):
Applications.create(
portfolio, "My Application 2", "My application 2", ["dev", "staging", "prod"]
portfolio.owner,
portfolio,
"My Application 2",
"My application 2",
["dev", "staging", "prod"],
)
restricted_admin = UserFactory.create()
PortfolioRoleFactory.create(
@ -90,7 +94,11 @@ def test_scoped_portfolio_returns_all_applications_for_portfolio_admin(
):
for _ in range(5):
Applications.create(
portfolio, "My Application", "My application", ["dev", "staging", "prod"]
portfolio.owner,
portfolio,
"My Application",
"My application",
["dev", "staging", "prod"],
)
admin = UserFactory.create()
@ -109,7 +117,11 @@ def test_scoped_portfolio_returns_all_applications_for_portfolio_owner(
):
for _ in range(5):
Applications.create(
portfolio, "My Application", "My application", ["dev", "staging", "prod"]
portfolio.owner,
portfolio,
"My Application",
"My application",
["dev", "staging", "prod"],
)
scoped_portfolio = Portfolios.get(portfolio_owner, portfolio.id)

View File

@ -139,7 +139,7 @@ def test_update_adds_clins():
def test_update_does_not_duplicate_clins():
task_order = TaskOrderFactory.create(
number="3453453456", create_clins=["123", "456"]
number="3453453456", create_clins=[{"number": "123"}, {"number": "456"}]
)
clins = [
{
@ -171,7 +171,9 @@ def test_update_does_not_duplicate_clins():
def test_delete_task_order_with_clins(session):
task_order = TaskOrderFactory.create(create_clins=[1, 2, 3])
task_order = TaskOrderFactory.create(
create_clins=[{"number": 1}, {"number": 2}, {"number": 3}]
)
TaskOrders.delete(task_order.id)
assert not session.query(

View File

@ -118,6 +118,7 @@ class PortfolioFactory(Base):
with_applications = kwargs.pop("applications", [])
owner = kwargs.pop("owner", UserFactory.create())
members = kwargs.pop("members", [])
with_task_orders = kwargs.pop("task_orders", [])
portfolio = super()._create(model_class, *args, **kwargs)
@ -126,6 +127,11 @@ class PortfolioFactory(Base):
for p in with_applications
]
task_orders = [
TaskOrderFactory.create(portfolio=portfolio, **to)
for to in with_task_orders
]
PortfolioRoleFactory.create(
portfolio=portfolio,
user=owner,
@ -154,6 +160,7 @@ class PortfolioFactory(Base):
)
portfolio.applications = applications
portfolio.task_orders = task_orders
return portfolio
@ -185,6 +192,7 @@ class EnvironmentFactory(Base):
name = factory.Faker("domain_word")
application = factory.SubFactory(ApplicationFactory)
creator = factory.SubFactory(UserFactory)
@classmethod
def _create(cls, model_class, *args, **kwargs):
@ -278,7 +286,7 @@ class TaskOrderFactory(Base):
task_order = super()._create(model_class, *args, **kwargs)
for clin in create_clins:
CLINFactory.create(task_order=task_order, number=clin)
CLINFactory.create(task_order=task_order, **clin)
return task_order

View File

@ -1,6 +1,7 @@
import pytest
from atst.models import AuditEvent
from atst.models.environment_role import CSPRole
from atst.domain.environments import Environments
from atst.domain.applications import Applications
from tests.factories import *
@ -12,7 +13,11 @@ def test_add_user_to_environment():
portfolio = PortfolioFactory.create(owner=owner)
application = Applications.create(
portfolio, "my test application", "It's mine.", ["dev", "staging", "prod"]
portfolio.owner,
portfolio,
"my test application",
"It's mine.",
["dev", "staging", "prod"],
)
dev_environment = application.environments[0]
@ -42,3 +47,29 @@ def test_audit_event_for_environment_deletion(session):
before, after = update_event.changed_state["deleted"]
assert not before
assert after
@pytest.mark.parametrize(
"env_data,expected_status",
[
[
{"cloud_id": None, "root_user_info": None, "baseline_info": None},
Environment.ProvisioningStatus.PENDING,
],
[
{"cloud_id": 1, "root_user_info": None, "baseline_info": None},
Environment.ProvisioningStatus.PENDING,
],
[
{"cloud_id": 1, "root_user_info": {}, "baseline_info": None},
Environment.ProvisioningStatus.PENDING,
],
[
{"cloud_id": 1, "root_user_info": {}, "baseline_info": {}},
Environment.ProvisioningStatus.COMPLETED,
],
],
)
def test_environment_provisioning_status(env_data, expected_status):
environment = EnvironmentFactory.create(**env_data)
assert environment.provisioning_status == expected_status

View File

@ -70,6 +70,7 @@ def test_update_environment_failure(client, user_session):
def test_application_settings(client, user_session):
portfolio = PortfolioFactory.create()
application = Applications.create(
portfolio.owner,
portfolio,
"Snazzy Application",
"A new application for me and my friends",
@ -85,6 +86,7 @@ def test_application_settings(client, user_session):
def test_edit_application_environments_obj(app, client, user_session):
portfolio = PortfolioFactory.create()
application = Applications.create(
portfolio.owner,
portfolio,
"Snazzy Application",
"A new application for me and my friends",
@ -125,6 +127,7 @@ def test_edit_application_environments_obj(app, client, user_session):
def test_data_for_app_env_roles_form(app, client, user_session):
portfolio = PortfolioFactory.create()
application = Applications.create(
portfolio.owner,
portfolio,
"Snazzy Application",
"A new application for me and my friends",

View File

@ -29,7 +29,7 @@ def completed_task_order():
task_order = TaskOrderFactory.create(
creator=portfolio.owner,
portfolio=portfolio,
create_clins=["1234567890123456789012345678901234567890123"],
create_clins=[{"number": "1234567890123456789012345678901234567890123"}],
)
return task_order
@ -334,7 +334,7 @@ def test_task_orders_submit_task_order(client, user_session, task_order):
({"_pdf": None, "number": "", "clins": []}, "step_1"),
({"number": "", "clins": []}, "step_2"),
({"number": "1234567890123", "clins": []}, "step_3"),
({"number": "1234567890123", "create_clins": [1]}, "step_4"),
({"number": "1234567890123", "create_clins": [{"number": 1}]}, "step_4"),
],
)
def test_task_orders_edit_redirects_to_latest_incomplete_step(

View File

@ -504,7 +504,7 @@ def test_task_orders_new_get_routes(get_url_assert_status):
task_order = TaskOrderFactory.create(
creator=owner,
portfolio=portfolio,
create_clins=["1234567890123456789012345678901234567890123"],
create_clins=[{"number": "1234567890123456789012345678901234567890123"}],
)
for route in get_routes:

View File

@ -1,8 +1,26 @@
import pendulum
import pytest
from uuid import uuid4
from unittest.mock import Mock
from atst.jobs import RecordEnvironmentFailure, RecordEnvironmentRoleFailure
from tests.factories import EnvironmentFactory, EnvironmentRoleFactory
from atst.models import Environment
from atst.domain.csp.cloud import MockCloudProvider
from atst.jobs import (
RecordEnvironmentFailure,
RecordEnvironmentRoleFailure,
do_create_environment,
do_create_atat_admin_user,
do_create_environment_baseline,
dispatch_create_environment,
dispatch_create_atat_admin_user,
dispatch_create_environment_baseline,
)
from tests.factories import (
EnvironmentFactory,
EnvironmentRoleFactory,
UserFactory,
PortfolioFactory,
)
def test_environment_job_failure(celery_app, celery_worker):
@ -39,3 +57,142 @@ def test_environment_role_job_failure(celery_app, celery_worker):
assert role.job_failures
job_failure = role.job_failures[0]
assert job_failure.task == task
now = pendulum.now()
yesterday = now.subtract(days=1)
tomorrow = now.add(days=1)
from atst.domain.environments import Environments
@pytest.fixture(autouse=True, scope="function")
def csp():
return Mock(wraps=MockCloudProvider({}, with_delay=False, with_failure=False))
def test_create_environment_job(session, csp):
user = UserFactory.create()
environment = EnvironmentFactory.create()
do_create_environment(csp, environment.id, user.id)
environment_id = environment.id
del environment
updated_environment = session.query(Environment).get(environment_id)
assert updated_environment.cloud_id
def test_create_environment_job_is_idempotent(csp, session):
user = UserFactory.create()
environment = EnvironmentFactory.create(cloud_id=uuid4().hex)
do_create_environment(csp, environment.id, user.id)
csp.create_environment.assert_not_called()
def test_create_atat_admin_user(csp, session):
environment = EnvironmentFactory.create(cloud_id="something")
do_create_atat_admin_user(csp, environment.id)
environment_id = environment.id
del environment
updated_environment = session.query(Environment).get(environment_id)
assert updated_environment.root_user_info
def test_create_environment_baseline(csp, session):
environment = EnvironmentFactory.create(
root_user_info={"credentials": csp.root_creds()}
)
do_create_environment_baseline(csp, environment.id)
environment_id = environment.id
del environment
updated_environment = session.query(Environment).get(environment_id)
assert updated_environment.baseline_info
def test_dispatch_create_environment(session, monkeypatch):
portfolio = PortfolioFactory.create(
applications=[{"environments": [{}]}],
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
mock = Mock()
monkeypatch.setattr("atst.jobs.create_environment", mock)
environment = portfolio.applications[0].environments[0]
dispatch_create_environment.run()
mock.delay.assert_called_once_with(
environment_id=environment.id, atat_user_id=environment.creator_id
)
def test_dispatch_create_atat_admin_user(session, monkeypatch):
portfolio = PortfolioFactory.create(
applications=[
{"environments": [{"cloud_id": uuid4().hex, "root_user_info": None}]}
],
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
mock = Mock()
monkeypatch.setattr("atst.jobs.create_atat_admin_user", mock)
environment = portfolio.applications[0].environments[0]
dispatch_create_atat_admin_user.run()
mock.delay.assert_called_once_with(environment_id=environment.id)
def test_dispatch_create_environment_baseline(session, monkeypatch):
portfolio = PortfolioFactory.create(
applications=[
{
"environments": [
{
"cloud_id": uuid4().hex,
"root_user_info": {},
"baseline_info": None,
}
]
}
],
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
mock = Mock()
monkeypatch.setattr("atst.jobs.create_environment_baseline", mock)
environment = portfolio.applications[0].environments[0]
dispatch_create_environment_baseline.run()
mock.delay.assert_called_once_with(environment_id=environment.id)