atst/tests/test_jobs.py

570 lines
18 KiB
Python

import pendulum
import pytest
from uuid import uuid4
from unittest.mock import Mock, MagicMock
from smtplib import SMTPException
from azure.core.exceptions import AzureError
from atst.domain.csp.cloud import MockCloudProvider
from atst.domain.csp.cloud.models import BillingInstructionCSPPayload, UserRoleCSPResult
from atst.domain.portfolios import Portfolios
from atst.models import ApplicationRoleStatus, Portfolio, FSMStates
from atst.jobs import (
RecordFailure,
dispatch_create_environment,
dispatch_create_application,
dispatch_create_user,
dispatch_create_environment_role,
dispatch_provision_portfolio,
create_billing_instruction,
create_environment,
do_create_user,
do_provision_portfolio,
do_create_environment,
do_create_environment_role,
do_create_application,
send_PPOC_email,
send_task_order_files,
)
from tests.factories import (
ApplicationFactory,
ApplicationRoleFactory,
CLINFactory,
EnvironmentFactory,
EnvironmentRoleFactory,
PortfolioFactory,
PortfolioStateMachineFactory,
TaskOrderFactory,
UserFactory,
)
from atst.models import CSPRole, EnvironmentRole, ApplicationRoleStatus, JobFailure
from atst.utils.localization import translate
@pytest.fixture(autouse=True, scope="function")
def csp():
return Mock(wraps=MockCloudProvider({}, with_delay=False, with_failure=False))
@pytest.fixture(scope="function")
def portfolio():
portfolio = PortfolioFactory.create()
return portfolio
def _find_failure(session, entity, id_):
return (
session.query(JobFailure)
.filter(JobFailure.entity == entity)
.filter(JobFailure.entity_id == id_)
.one()
)
def test_environment_job_failure(session, celery_app, celery_worker):
@celery_app.task(bind=True, base=RecordFailure)
def _fail_hard(self, environment_id=None):
raise ValueError("something bad happened")
environment = EnvironmentFactory.create()
celery_worker.reload()
# Use apply instead of delay since we are testing the on_failure hook only
task = _fail_hard.apply(kwargs={"environment_id": environment.id})
with pytest.raises(ValueError):
task.get()
job_failure = _find_failure(session, "environment", str(environment.id))
assert job_failure.task == task
def test_environment_role_job_failure(session, celery_app, celery_worker):
@celery_app.task(bind=True, base=RecordFailure)
def _fail_hard(self, environment_role_id=None):
raise ValueError("something bad happened")
role = EnvironmentRoleFactory.create()
celery_worker.reload()
# Use apply instead of delay since we are testing the on_failure hook only
task = _fail_hard.apply(kwargs={"environment_role_id": role.id})
with pytest.raises(ValueError):
task.get()
job_failure = _find_failure(session, "environment_role", str(role.id))
assert job_failure.task == task
now = pendulum.now()
yesterday = now.subtract(days=1)
tomorrow = now.add(days=1)
def test_create_environment_job(session, csp):
environment = EnvironmentFactory.create()
environment.application.cloud_id = "parentId"
environment.application.portfolio.csp_data = {"tenant_id": "fake"}
session.add(environment)
session.commit()
do_create_environment(csp, environment.id)
session.refresh(environment)
assert environment.cloud_id
def test_create_environment_job_is_idempotent(csp, session):
environment = EnvironmentFactory.create(cloud_id=uuid4().hex)
do_create_environment(csp, environment.id)
csp.create_environment.assert_not_called()
def test_create_application_job(session, csp):
portfolio = PortfolioFactory.create(
csp_data={"tenant_id": str(uuid4()), "root_management_group_id": str(uuid4())}
)
application = ApplicationFactory.create(portfolio=portfolio, cloud_id=None)
do_create_application(csp, application.id)
session.refresh(application)
assert application.cloud_id
def test_create_application_job_is_idempotent(csp):
application = ApplicationFactory.create(cloud_id=uuid4())
do_create_application(csp, application.id)
csp.create_application.assert_not_called()
class TestCreateUserJob:
@pytest.fixture
def portfolio(self, app):
return PortfolioFactory.create(
csp_data={
"tenant_id": str(uuid4()),
"domain_name": f"rebelalliance.{app.config.get('OFFICE_365_DOMAIN')}",
}
)
@pytest.fixture
def app_1(self, portfolio):
return ApplicationFactory.create(portfolio=portfolio, cloud_id="321")
@pytest.fixture
def app_2(self, portfolio):
return ApplicationFactory.create(portfolio=portfolio, cloud_id="123")
@pytest.fixture
def user(self):
return UserFactory.create(
first_name="Han", last_name="Solo", email="han@example.com"
)
@pytest.fixture
def app_role_1(self, app_1, user):
return ApplicationRoleFactory.create(
application=app_1,
user=user,
status=ApplicationRoleStatus.ACTIVE,
cloud_id=None,
)
@pytest.fixture
def app_role_2(self, app_2, user):
return ApplicationRoleFactory.create(
application=app_2,
user=user,
status=ApplicationRoleStatus.ACTIVE,
cloud_id=None,
)
def test_create_user_job(self, session, csp, app_role_1):
assert not app_role_1.cloud_id
session.begin_nested()
do_create_user(csp, [app_role_1.id])
session.rollback()
assert app_role_1.cloud_id
def test_create_user_sends_email(self, monkeypatch, csp, app_role_1, app_role_2):
mock = Mock()
monkeypatch.setattr("atst.jobs.send_mail", mock)
do_create_user(csp, [app_role_1.id, app_role_2.id])
assert mock.call_count == 1
def test_dispatch_create_environment(session, monkeypatch):
# Given that I have a portfolio with an active CLIN and two environments,
# one of which is deleted
portfolio = PortfolioFactory.create(
applications=[{"environments": [{}, {}], "cloud_id": uuid4().hex}],
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
[e1, e2] = portfolio.applications[0].environments
e2.deleted = True
session.add(e2)
session.commit()
mock = Mock()
monkeypatch.setattr("atst.jobs.create_environment", mock)
# When dispatch_create_environment is called
dispatch_create_environment.run()
# It should cause the create_environment task to be called once with the
# non-deleted environment
mock.delay.assert_called_once_with(environment_id=e1.id)
def test_dispatch_create_application(monkeypatch):
portfolio = PortfolioFactory.create(state="COMPLETED")
app = ApplicationFactory.create(portfolio=portfolio)
mock = Mock()
monkeypatch.setattr("atst.jobs.create_application", mock)
# When dispatch_create_application is called
dispatch_create_application.run()
# It should cause the create_application task to be called once
# with the application id
mock.delay.assert_called_once_with(application_id=app.id)
def test_dispatch_create_user(monkeypatch):
application = ApplicationFactory.create(cloud_id="123")
user = UserFactory.create(
first_name="Han", last_name="Solo", email="han@example.com"
)
app_role = ApplicationRoleFactory.create(
application=application,
user=user,
status=ApplicationRoleStatus.ACTIVE,
cloud_id=None,
)
mock = Mock()
monkeypatch.setattr("atst.jobs.create_user", mock)
# When dispatch_create_user is called
dispatch_create_user.run()
# It should cause the create_user task to be called once
# with the application id
mock.delay.assert_called_once_with(application_role_ids=[app_role.id])
def test_create_environment_no_dupes(session, celery_app, celery_worker):
portfolio = PortfolioFactory.create(
applications=[{"environments": [{"cloud_id": uuid4().hex}]}],
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
environment = portfolio.applications[0].environments[0]
# create_environment is run twice on the same environment
create_environment.run(environment_id=environment.id)
session.refresh(environment)
first_cloud_id = environment.cloud_id
create_environment.run(environment_id=environment.id)
session.refresh(environment)
# The environment's cloud_id was not overwritten in the second run
assert environment.cloud_id == first_cloud_id
# The environment's claim was released
assert environment.claimed_until == None
def test_dispatch_provision_portfolio(csp, monkeypatch):
portfolio = PortfolioFactory.create(
task_orders=[
{
"create_clins": [
{
"start_date": pendulum.now().subtract(days=1),
"end_date": pendulum.now().add(days=1),
}
]
}
],
)
sm = PortfolioStateMachineFactory.create(portfolio=portfolio)
mock = Mock()
monkeypatch.setattr("atst.jobs.provision_portfolio", mock)
dispatch_provision_portfolio.run()
mock.delay.assert_called_once_with(portfolio_id=portfolio.id)
class TestDoProvisionPortfolio:
def test_portfolio_has_state_machine(self, csp, session, portfolio):
do_provision_portfolio(csp=csp, portfolio_id=portfolio.id)
session.refresh(portfolio)
assert portfolio.state_machine
def test_sends_email_to_PPOC_on_completion(
self, monkeypatch, csp, portfolio: Portfolio
):
mock = Mock()
monkeypatch.setattr("atst.jobs.send_PPOC_email", mock)
csp._authorize.return_value = None
csp._maybe_raise.return_value = None
sm: PortfolioStateMachine = PortfolioStateMachineFactory.create(
portfolio=portfolio
)
# The stage before "COMPLETED"
sm.state = FSMStates.BILLING_OWNER_CREATED
do_provision_portfolio(csp=csp, portfolio_id=portfolio.id)
# send_PPOC_email was called
assert mock.assert_called_once
def test_send_ppoc_email(monkeypatch, app):
mock = Mock()
monkeypatch.setattr("atst.jobs.send_mail", mock)
ppoc_email = "example@example.com"
user_id = "user_id"
domain_name = "domain"
send_PPOC_email(
{
"password_recovery_email_address": ppoc_email,
"user_id": user_id,
"domain_name": domain_name,
}
)
mock.assert_called_once_with(
recipients=[ppoc_email],
subject=translate("email.portfolio_ready.subject"),
body=translate(
"email.portfolio_ready.body",
{
"password_reset_address": app.config.get("AZURE_LOGIN_URL"),
"username": f"{user_id}@{domain_name}.{app.config.get('OFFICE_365_DOMAIN')}",
},
),
)
def test_provision_portfolio_create_tenant(
csp, session, portfolio, celery_app, celery_worker, monkeypatch
):
sm = PortfolioStateMachineFactory.create(portfolio=portfolio)
# mock = Mock()
# monkeypatch.setattr("atst.jobs.provision_portfolio", mock)
# dispatch_provision_portfolio.run()
# mock.delay.assert_called_once_with(portfolio_id=portfolio.id)
def test_dispatch_create_environment_role(monkeypatch):
portfolio = PortfolioFactory.create(csp_data={"tenant_id": "123"})
app_role = ApplicationRoleFactory.create(
application=ApplicationFactory.create(portfolio=portfolio),
status=ApplicationRoleStatus.ACTIVE,
cloud_id="123",
)
env_role = EnvironmentRoleFactory.create(application_role=app_role)
mock = Mock()
monkeypatch.setattr("atst.jobs.create_environment_role", mock)
dispatch_create_environment_role.run()
mock.delay.assert_called_once_with(environment_role_id=env_role.id)
def test_create_environment_role():
portfolio = PortfolioFactory.create(csp_data={"tenant_id": "123"})
app = ApplicationFactory.create(portfolio=portfolio)
app_role = ApplicationRoleFactory.create(
application=app, status=ApplicationRoleStatus.ACTIVE, cloud_id="123",
)
env = EnvironmentFactory.create(application=app, cloud_id="123")
env_role = EnvironmentRoleFactory.create(
environment=env, application_role=app_role, cloud_id=None
)
csp = Mock()
result = UserRoleCSPResult(id="a-cloud-id")
csp.create_user_role = MagicMock(return_value=result)
do_create_environment_role(csp, environment_role_id=env_role.id)
assert env_role.cloud_id == "a-cloud-id"
class TestSendTaskOrderFiles:
@pytest.fixture(scope="function")
def send_mail(self, monkeypatch):
mock = Mock()
monkeypatch.setattr("atst.jobs.send_mail", mock)
return mock
@pytest.fixture(scope="function")
def download_task_order(self, monkeypatch):
def _download_task_order(MockFileService, object_name):
return {"name": object_name}
monkeypatch.setattr(
"atst.domain.csp.files.MockFileService.download_task_order",
_download_task_order,
)
def test_sends_multiple_emails(self, send_mail, download_task_order):
# Create 3 Task Orders
for i in range(3):
TaskOrderFactory.create(create_clins=[{"number": "0001"}])
send_task_order_files.run()
# Check that send_with_attachment was called once for each task order
assert send_mail.call_count == 3
def test_kwargs(self, send_mail, download_task_order, app):
task_order = TaskOrderFactory.create(create_clins=[{"number": "0001"}])
send_task_order_files.run()
# Check that send_with_attachment was called with correct kwargs
send_mail.assert_called_once_with(
recipients=[app.config.get("MICROSOFT_TASK_ORDER_EMAIL_ADDRESS")],
subject=translate(
"email.task_order_sent.subject", {"to_number": task_order.number}
),
body=translate(
"email.task_order_sent.body", {"to_number": task_order.number}
),
attachments=[
{
"name": task_order.pdf.object_name,
"maintype": "application",
"subtype": "pdf",
}
],
)
assert task_order.pdf_last_sent_at
def test_send_failure(self, monkeypatch):
def _raise_smtp_exception(**kwargs):
raise SMTPException
monkeypatch.setattr("atst.jobs.send_mail", _raise_smtp_exception)
task_order = TaskOrderFactory.create(create_clins=[{"number": "0001"}])
send_task_order_files.run()
# Check that pdf_last_sent_at has not been updated
assert not task_order.pdf_last_sent_at
def test_download_failure(self, send_mail, monkeypatch):
def _download_task_order(MockFileService, object_name):
raise AzureError("something went wrong")
monkeypatch.setattr(
"atst.domain.csp.files.MockFileService.download_task_order",
_download_task_order,
)
task_order = TaskOrderFactory.create(create_clins=[{"number": "0002"}])
send_task_order_files.run()
# Check that pdf_last_sent_at has not been updated
assert not task_order.pdf_last_sent_at
class TestCreateBillingInstructions:
@pytest.fixture
def unsent_clin(self):
start_date = pendulum.now().subtract(days=1)
portfolio = PortfolioFactory.create(
csp_data={
"tenant_id": str(uuid4()),
"billing_account_name": "fake",
"billing_profile_name": "fake",
},
task_orders=[{"create_clins": [{"start_date": start_date}]}],
)
return portfolio.task_orders[0].clins[0]
def test_update_clin_last_sent_at(self, session, unsent_clin):
assert not unsent_clin.last_sent_at
# The session needs to be nested to prevent detached SQLAlchemy instance
session.begin_nested()
create_billing_instruction()
# check that last_sent_at has been updated
assert unsent_clin.last_sent_at
session.rollback()
def test_failure(self, monkeypatch, session, unsent_clin):
def _create_billing_instruction(MockCloudProvider, object_name):
raise AzureError("something went wrong")
monkeypatch.setattr(
"atst.domain.csp.cloud.MockCloudProvider.create_billing_instruction",
_create_billing_instruction,
)
# The session needs to be nested to prevent detached SQLAlchemy instance
session.begin_nested()
create_billing_instruction()
# check that last_sent_at has not been updated
assert not unsent_clin.last_sent_at
session.rollback()
def test_task_order_with_multiple_clins(self, session):
start_date = pendulum.now(tz="UTC").subtract(days=1)
portfolio = PortfolioFactory.create(
csp_data={
"tenant_id": str(uuid4()),
"billing_account_name": "fake",
"billing_profile_name": "fake",
},
task_orders=[
{
"create_clins": [
{"start_date": start_date, "last_sent_at": start_date}
]
}
],
)
task_order = portfolio.task_orders[0]
sent_clin = task_order.clins[0]
# Add new CLIN to the Task Order
new_clin = CLINFactory.create(task_order=task_order)
assert not new_clin.last_sent_at
session.begin_nested()
create_billing_instruction()
session.add(sent_clin)
# check that last_sent_at has been update for the new clin only
assert new_clin.last_sent_at
assert sent_clin.last_sent_at != new_clin.last_sent_at
session.rollback()