Merge branch 'staging' into state-machine-unit-tests
This commit is contained in:
@@ -1523,3 +1523,23 @@ def test_update_tenant_creds(mock_azure: AzureCloudProvider):
|
||||
assert updated_secret == KeyVaultCredentials(
|
||||
**{**existing_secrets, **MOCK_CREDS}
|
||||
)
|
||||
|
||||
|
||||
def test_get_calculator_creds(mock_azure: AzureCloudProvider):
|
||||
mock_azure.sdk.adal.AuthenticationContext.return_value.acquire_token_with_client_credentials.return_value = {
|
||||
"accessToken": "TOKEN"
|
||||
}
|
||||
assert mock_azure._get_calculator_creds() == "TOKEN"
|
||||
|
||||
|
||||
def test_get_calculator_url(mock_azure: AzureCloudProvider):
|
||||
with patch.object(
|
||||
AzureCloudProvider,
|
||||
"_get_calculator_creds",
|
||||
wraps=mock_azure._get_calculator_creds,
|
||||
) as _get_calculator_creds:
|
||||
_get_calculator_creds.return_value = "TOKEN"
|
||||
assert (
|
||||
mock_azure.get_calculator_url()
|
||||
== f"{mock_azure.config.get('AZURE_CALC_URL')}?access_token=TOKEN"
|
||||
)
|
||||
|
@@ -9,6 +9,27 @@ from atst.models.task_order import TaskOrder, SORT_ORDERING, Status
|
||||
from tests.factories import TaskOrderFactory, CLINFactory, PortfolioFactory
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def new_task_order():
|
||||
return TaskOrderFactory.create(create_clins=[{}])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def updated_task_order():
|
||||
return TaskOrderFactory.create(
|
||||
create_clins=[{"last_sent_at": pendulum.date(2020, 2, 1)}],
|
||||
pdf_last_sent_at=pendulum.date(2020, 1, 1),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sent_task_order():
|
||||
return TaskOrderFactory.create(
|
||||
create_clins=[{"last_sent_at": pendulum.date(2020, 1, 1)}],
|
||||
pdf_last_sent_at=pendulum.date(2020, 1, 1),
|
||||
)
|
||||
|
||||
|
||||
def test_create_adds_clins():
|
||||
portfolio = PortfolioFactory.create()
|
||||
clins = [
|
||||
@@ -181,19 +202,18 @@ def test_allows_alphanumeric_number():
|
||||
assert TaskOrders.create(portfolio.id, number, [], None)
|
||||
|
||||
|
||||
def test_get_for_send_task_order_files():
|
||||
new_to = TaskOrderFactory.create(create_clins=[{}])
|
||||
updated_to = TaskOrderFactory.create(
|
||||
create_clins=[{"last_sent_at": pendulum.datetime(2020, 2, 1)}],
|
||||
pdf_last_sent_at=pendulum.datetime(2020, 1, 1),
|
||||
)
|
||||
sent_to = TaskOrderFactory.create(
|
||||
create_clins=[{"last_sent_at": pendulum.datetime(2020, 1, 1)}],
|
||||
pdf_last_sent_at=pendulum.datetime(2020, 1, 1),
|
||||
)
|
||||
|
||||
def test_get_for_send_task_order_files(
|
||||
new_task_order, updated_task_order, sent_task_order
|
||||
):
|
||||
updated_and_new_task_orders = TaskOrders.get_for_send_task_order_files()
|
||||
assert len(updated_and_new_task_orders) == 2
|
||||
assert sent_to not in updated_and_new_task_orders
|
||||
assert updated_to in updated_and_new_task_orders
|
||||
assert new_to in updated_and_new_task_orders
|
||||
assert sent_task_order not in updated_and_new_task_orders
|
||||
assert updated_task_order in updated_and_new_task_orders
|
||||
assert new_task_order in updated_and_new_task_orders
|
||||
|
||||
|
||||
def test_get_clins_for_create_billing_instructions(new_task_order, sent_task_order):
|
||||
new_clins = TaskOrders.get_clins_for_create_billing_instructions()
|
||||
assert len(new_clins) == 1
|
||||
assert new_task_order.clins[0] in new_clins
|
||||
assert sent_task_order.clins[0] not in new_clins
|
||||
|
@@ -4,6 +4,9 @@ from unittest.mock import Mock
|
||||
from atst.domain.csp.cloud import AzureCloudProvider
|
||||
|
||||
AZURE_CONFIG = {
|
||||
"AZURE_CALC_CLIENT_ID": "MOCK",
|
||||
"AZURE_CALC_SECRET": "MOCK", # pragma: allowlist secret
|
||||
"AZURE_CALC_RESOURCE": "http://calc",
|
||||
"AZURE_CLIENT_ID": "MOCK",
|
||||
"AZURE_SECRET_KEY": "MOCK",
|
||||
"AZURE_TENANT_ID": "MOCK",
|
||||
|
@@ -6,7 +6,8 @@ from smtplib import SMTPException
|
||||
from azure.core.exceptions import AzureError
|
||||
|
||||
from atst.domain.csp.cloud import MockCloudProvider
|
||||
from atst.domain.csp.cloud.models import UserRoleCSPResult
|
||||
from atst.domain.csp.cloud.models import BillingInstructionCSPPayload, UserRoleCSPResult
|
||||
from atst.domain.portfolios import Portfolios
|
||||
from atst.models import ApplicationRoleStatus, Portfolio, FSMStates
|
||||
|
||||
from atst.jobs import (
|
||||
@@ -16,7 +17,7 @@ from atst.jobs import (
|
||||
dispatch_create_user,
|
||||
dispatch_create_environment_role,
|
||||
dispatch_provision_portfolio,
|
||||
dispatch_send_task_order_files,
|
||||
create_billing_instruction,
|
||||
create_environment,
|
||||
do_create_user,
|
||||
do_provision_portfolio,
|
||||
@@ -24,10 +25,12 @@ from atst.jobs import (
|
||||
do_create_environment_role,
|
||||
do_create_application,
|
||||
send_PPOC_email,
|
||||
send_task_order_files,
|
||||
)
|
||||
from tests.factories import (
|
||||
ApplicationFactory,
|
||||
ApplicationRoleFactory,
|
||||
CLINFactory,
|
||||
EnvironmentFactory,
|
||||
EnvironmentRoleFactory,
|
||||
PortfolioFactory,
|
||||
@@ -135,28 +138,63 @@ def test_create_application_job_is_idempotent(csp):
|
||||
csp.create_application.assert_not_called()
|
||||
|
||||
|
||||
def test_create_user_job(session, csp, app):
|
||||
portfolio = PortfolioFactory.create(
|
||||
csp_data={
|
||||
"tenant_id": str(uuid4()),
|
||||
"domain_name": f"rebelalliance.{app.config.get('OFFICE_365_DOMAIN')}",
|
||||
}
|
||||
)
|
||||
application = ApplicationFactory.create(portfolio=portfolio, cloud_id="321")
|
||||
user = UserFactory.create(
|
||||
first_name="Han", last_name="Solo", email="han@example.com"
|
||||
)
|
||||
app_role = ApplicationRoleFactory.create(
|
||||
application=application,
|
||||
user=user,
|
||||
status=ApplicationRoleStatus.ACTIVE,
|
||||
cloud_id=None,
|
||||
)
|
||||
class TestCreateUserJob:
|
||||
@pytest.fixture
|
||||
def portfolio(self, app):
|
||||
return PortfolioFactory.create(
|
||||
csp_data={
|
||||
"tenant_id": str(uuid4()),
|
||||
"domain_name": f"rebelalliance.{app.config.get('OFFICE_365_DOMAIN')}",
|
||||
}
|
||||
)
|
||||
|
||||
do_create_user(csp, [app_role.id])
|
||||
session.refresh(app_role)
|
||||
@pytest.fixture
|
||||
def app_1(self, portfolio):
|
||||
return ApplicationFactory.create(portfolio=portfolio, cloud_id="321")
|
||||
|
||||
assert app_role.cloud_id
|
||||
@pytest.fixture
|
||||
def app_2(self, portfolio):
|
||||
return ApplicationFactory.create(portfolio=portfolio, cloud_id="123")
|
||||
|
||||
@pytest.fixture
|
||||
def user(self):
|
||||
return UserFactory.create(
|
||||
first_name="Han", last_name="Solo", email="han@example.com"
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def app_role_1(self, app_1, user):
|
||||
return ApplicationRoleFactory.create(
|
||||
application=app_1,
|
||||
user=user,
|
||||
status=ApplicationRoleStatus.ACTIVE,
|
||||
cloud_id=None,
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def app_role_2(self, app_2, user):
|
||||
return ApplicationRoleFactory.create(
|
||||
application=app_2,
|
||||
user=user,
|
||||
status=ApplicationRoleStatus.ACTIVE,
|
||||
cloud_id=None,
|
||||
)
|
||||
|
||||
def test_create_user_job(self, session, csp, app_role_1):
|
||||
assert not app_role_1.cloud_id
|
||||
|
||||
session.begin_nested()
|
||||
do_create_user(csp, [app_role_1.id])
|
||||
session.rollback()
|
||||
|
||||
assert app_role_1.cloud_id
|
||||
|
||||
def test_create_user_sends_email(self, monkeypatch, csp, app_role_1, app_role_2):
|
||||
mock = Mock()
|
||||
monkeypatch.setattr("atst.jobs.send_mail", mock)
|
||||
|
||||
do_create_user(csp, [app_role_1.id, app_role_2.id])
|
||||
assert mock.call_count == 1
|
||||
|
||||
|
||||
def test_dispatch_create_environment(session, monkeypatch):
|
||||
@@ -380,82 +418,152 @@ def test_create_environment_role():
|
||||
assert env_role.cloud_id == "a-cloud-id"
|
||||
|
||||
|
||||
# TODO: Refactor the tests related to dispatch_send_task_order_files() into a class
|
||||
# and separate the success test into two tests
|
||||
def test_dispatch_send_task_order_files(monkeypatch, app):
|
||||
mock = Mock()
|
||||
monkeypatch.setattr("atst.jobs.send_mail", mock)
|
||||
class TestSendTaskOrderFiles:
|
||||
@pytest.fixture(scope="function")
|
||||
def send_mail(self, monkeypatch):
|
||||
mock = Mock()
|
||||
monkeypatch.setattr("atst.jobs.send_mail", mock)
|
||||
return mock
|
||||
|
||||
def _download_task_order(MockFileService, object_name):
|
||||
return {"name": object_name}
|
||||
@pytest.fixture(scope="function")
|
||||
def download_task_order(self, monkeypatch):
|
||||
def _download_task_order(MockFileService, object_name):
|
||||
return {"name": object_name}
|
||||
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.csp.files.MockFileService.download_task_order",
|
||||
_download_task_order,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.csp.files.MockFileService.download_task_order",
|
||||
_download_task_order,
|
||||
)
|
||||
|
||||
# Create 3 new Task Orders
|
||||
for i in range(3):
|
||||
TaskOrderFactory.create(create_clins=[{"number": "0001"}])
|
||||
def test_sends_multiple_emails(self, send_mail, download_task_order):
|
||||
# Create 3 Task Orders
|
||||
for i in range(3):
|
||||
TaskOrderFactory.create(create_clins=[{"number": "0001"}])
|
||||
|
||||
dispatch_send_task_order_files.run()
|
||||
send_task_order_files.run()
|
||||
|
||||
# Check that send_with_attachment was called once for each task order
|
||||
assert mock.call_count == 3
|
||||
mock.reset_mock()
|
||||
# Check that send_with_attachment was called once for each task order
|
||||
assert send_mail.call_count == 3
|
||||
|
||||
# Create new TO
|
||||
task_order = TaskOrderFactory.create(create_clins=[{"number": "0001"}])
|
||||
assert not task_order.pdf_last_sent_at
|
||||
def test_kwargs(self, send_mail, download_task_order, app):
|
||||
task_order = TaskOrderFactory.create(create_clins=[{"number": "0001"}])
|
||||
send_task_order_files.run()
|
||||
|
||||
dispatch_send_task_order_files.run()
|
||||
# Check that send_with_attachment was called with correct kwargs
|
||||
send_mail.assert_called_once_with(
|
||||
recipients=[app.config.get("MICROSOFT_TASK_ORDER_EMAIL_ADDRESS")],
|
||||
subject=translate(
|
||||
"email.task_order_sent.subject", {"to_number": task_order.number}
|
||||
),
|
||||
body=translate(
|
||||
"email.task_order_sent.body", {"to_number": task_order.number}
|
||||
),
|
||||
attachments=[
|
||||
{
|
||||
"name": task_order.pdf.object_name,
|
||||
"maintype": "application",
|
||||
"subtype": "pdf",
|
||||
}
|
||||
],
|
||||
)
|
||||
assert task_order.pdf_last_sent_at
|
||||
|
||||
# Check that send_with_attachment was called with correct kwargs
|
||||
mock.assert_called_once_with(
|
||||
recipients=[app.config.get("MICROSOFT_TASK_ORDER_EMAIL_ADDRESS")],
|
||||
subject=translate(
|
||||
"email.task_order_sent.subject", {"to_number": task_order.number}
|
||||
),
|
||||
body=translate("email.task_order_sent.body", {"to_number": task_order.number}),
|
||||
attachments=[
|
||||
{
|
||||
"name": task_order.pdf.object_name,
|
||||
"maintype": "application",
|
||||
"subtype": "pdf",
|
||||
}
|
||||
],
|
||||
)
|
||||
def test_send_failure(self, monkeypatch):
|
||||
def _raise_smtp_exception(**kwargs):
|
||||
raise SMTPException
|
||||
|
||||
assert task_order.pdf_last_sent_at
|
||||
monkeypatch.setattr("atst.jobs.send_mail", _raise_smtp_exception)
|
||||
task_order = TaskOrderFactory.create(create_clins=[{"number": "0001"}])
|
||||
send_task_order_files.run()
|
||||
|
||||
# Check that pdf_last_sent_at has not been updated
|
||||
assert not task_order.pdf_last_sent_at
|
||||
|
||||
def test_download_failure(self, send_mail, monkeypatch):
|
||||
def _download_task_order(MockFileService, object_name):
|
||||
raise AzureError("something went wrong")
|
||||
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.csp.files.MockFileService.download_task_order",
|
||||
_download_task_order,
|
||||
)
|
||||
task_order = TaskOrderFactory.create(create_clins=[{"number": "0002"}])
|
||||
send_task_order_files.run()
|
||||
|
||||
# Check that pdf_last_sent_at has not been updated
|
||||
assert not task_order.pdf_last_sent_at
|
||||
|
||||
|
||||
def test_dispatch_send_task_order_files_send_failure(monkeypatch):
|
||||
def _raise_smtp_exception(**kwargs):
|
||||
raise SMTPException
|
||||
class TestCreateBillingInstructions:
|
||||
@pytest.fixture
|
||||
def unsent_clin(self):
|
||||
start_date = pendulum.now().subtract(days=1)
|
||||
portfolio = PortfolioFactory.create(
|
||||
csp_data={
|
||||
"tenant_id": str(uuid4()),
|
||||
"billing_account_name": "fake",
|
||||
"billing_profile_name": "fake",
|
||||
},
|
||||
task_orders=[{"create_clins": [{"start_date": start_date}]}],
|
||||
)
|
||||
return portfolio.task_orders[0].clins[0]
|
||||
|
||||
monkeypatch.setattr("atst.jobs.send_mail", _raise_smtp_exception)
|
||||
def test_update_clin_last_sent_at(self, session, unsent_clin):
|
||||
assert not unsent_clin.last_sent_at
|
||||
|
||||
task_order = TaskOrderFactory.create(create_clins=[{"number": "0001"}])
|
||||
dispatch_send_task_order_files.run()
|
||||
# The session needs to be nested to prevent detached SQLAlchemy instance
|
||||
session.begin_nested()
|
||||
create_billing_instruction()
|
||||
|
||||
# Check that pdf_last_sent_at has not been updated
|
||||
assert not task_order.pdf_last_sent_at
|
||||
# check that last_sent_at has been updated
|
||||
assert unsent_clin.last_sent_at
|
||||
session.rollback()
|
||||
|
||||
def test_failure(self, monkeypatch, session, unsent_clin):
|
||||
def _create_billing_instruction(MockCloudProvider, object_name):
|
||||
raise AzureError("something went wrong")
|
||||
|
||||
def test_dispatch_send_task_order_files_download_failure(monkeypatch):
|
||||
mock = Mock()
|
||||
monkeypatch.setattr("atst.jobs.send_mail", mock)
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.csp.cloud.MockCloudProvider.create_billing_instruction",
|
||||
_create_billing_instruction,
|
||||
)
|
||||
|
||||
def _download_task_order(MockFileService, object_name):
|
||||
raise AzureError("something went wrong")
|
||||
# The session needs to be nested to prevent detached SQLAlchemy instance
|
||||
session.begin_nested()
|
||||
create_billing_instruction()
|
||||
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.csp.files.MockFileService.download_task_order",
|
||||
_download_task_order,
|
||||
)
|
||||
# check that last_sent_at has not been updated
|
||||
assert not unsent_clin.last_sent_at
|
||||
session.rollback()
|
||||
|
||||
task_order = TaskOrderFactory.create(create_clins=[{"number": "0002"}])
|
||||
dispatch_send_task_order_files.run()
|
||||
def test_task_order_with_multiple_clins(self, session):
|
||||
start_date = pendulum.now(tz="UTC").subtract(days=1)
|
||||
portfolio = PortfolioFactory.create(
|
||||
csp_data={
|
||||
"tenant_id": str(uuid4()),
|
||||
"billing_account_name": "fake",
|
||||
"billing_profile_name": "fake",
|
||||
},
|
||||
task_orders=[
|
||||
{
|
||||
"create_clins": [
|
||||
{"start_date": start_date, "last_sent_at": start_date}
|
||||
]
|
||||
}
|
||||
],
|
||||
)
|
||||
task_order = portfolio.task_orders[0]
|
||||
sent_clin = task_order.clins[0]
|
||||
|
||||
# Check that pdf_last_sent_at has not been updated
|
||||
assert not task_order.pdf_last_sent_at
|
||||
# Add new CLIN to the Task Order
|
||||
new_clin = CLINFactory.create(task_order=task_order)
|
||||
assert not new_clin.last_sent_at
|
||||
|
||||
session.begin_nested()
|
||||
create_billing_instruction()
|
||||
session.add(sent_clin)
|
||||
|
||||
# check that last_sent_at has been update for the new clin only
|
||||
assert new_clin.last_sent_at
|
||||
assert sent_clin.last_sent_at != new_clin.last_sent_at
|
||||
session.rollback()
|
||||
|
Reference in New Issue
Block a user