Orchestration for creating app management groups.

This adds:
- A Celery beat task for enqueuing application creation tasks
- A Celery task for creating the application
- Payload and Response dataclasses for creating management groups

It also does some incidental cleanup.
This commit is contained in:
dandds
2020-01-25 17:29:17 -05:00
parent bfc0692063
commit 8810a59e0a
7 changed files with 258 additions and 15 deletions

View File

@@ -10,6 +10,7 @@ from atst.domain.portfolios import Portfolios
from atst.jobs import (
RecordFailure,
dispatch_create_environment,
dispatch_create_application,
dispatch_create_atat_admin_user,
dispatch_provision_portfolio,
dispatch_provision_user,
@@ -17,6 +18,7 @@ from atst.jobs import (
do_provision_user,
do_provision_portfolio,
do_create_environment,
do_create_application,
do_create_atat_admin_user,
)
from atst.models.utils import claim_for_update
@@ -26,6 +28,7 @@ from tests.factories import (
EnvironmentRoleFactory,
PortfolioFactory,
PortfolioStateMachineFactory,
ApplicationFactory,
ApplicationRoleFactory,
)
from atst.models import CSPRole, EnvironmentRole, ApplicationRoleStatus, JobFailure
@@ -105,6 +108,24 @@ def test_create_environment_job_is_idempotent(csp, session):
csp.create_environment.assert_not_called()
def test_create_application_job(session, csp):
portfolio = PortfolioFactory.create(
csp_data={"tenant_id": str(uuid4()), "root_management_group_id": str(uuid4())}
)
application = ApplicationFactory.create(portfolio=portfolio, cloud_id=None)
do_create_application(csp, application.id)
session.refresh(application)
assert application.cloud_id
def test_create_application_job_is_idempotent(csp):
application = ApplicationFactory.create(cloud_id=uuid4())
do_create_application(csp, application.id)
csp.create_application.assert_not_called()
def test_create_atat_admin_user(csp, session):
environment = EnvironmentFactory.create(cloud_id="something")
do_create_atat_admin_user(csp, environment.id)
@@ -145,6 +166,21 @@ def test_dispatch_create_environment(session, monkeypatch):
mock.delay.assert_called_once_with(environment_id=e1.id)
def test_dispatch_create_application(monkeypatch):
portfolio = PortfolioFactory.create(state="COMPLETED")
app = ApplicationFactory.create(portfolio=portfolio)
mock = Mock()
monkeypatch.setattr("atst.jobs.create_application", mock)
# When dispatch_create_application is called
dispatch_create_application.run()
# It should cause the create_application task to be called once
# with the application id
mock.delay.assert_called_once_with(application_id=app.id)
def test_dispatch_create_atat_admin_user(session, monkeypatch):
portfolio = PortfolioFactory.create(
applications=[