resolve merge conflict

This commit is contained in:
2020-02-11 15:40:15 -05:00
201 changed files with 1667 additions and 3959 deletions

View File

@@ -1,58 +1,47 @@
from atst.domain.csp.reports import MockReportingProvider
from atst.domain.csp.reports import prepare_azure_reporting_data
from tests.factories import PortfolioFactory
from decimal import Decimal
import pendulum
def test_get_environment_monthly_totals():
environment = {
"name": "Test Environment",
"spending": {
"this_month": {"JEDI_CLIN_1": 100, "JEDI_CLIN_2": 100},
"last_month": {"JEDI_CLIN_1": 200, "JEDI_CLIN_2": 200},
"total": {"JEDI_CLIN_1": 1000, "JEDI_CLIN_2": 1000},
},
}
totals = MockReportingProvider._get_environment_monthly_totals(environment)
assert totals == {
"name": "Test Environment",
"this_month": 200,
"last_month": 400,
"total": 2000,
}
class TestPrepareAzureData:
start_of_month = pendulum.today(tz="utc").start_of("month").replace(tzinfo=None)
next_month = start_of_month.add(months=1).to_atom_string()
this_month = start_of_month.to_atom_string()
last_month = start_of_month.subtract(months=1).to_atom_string()
two_months_ago = last_month = start_of_month.subtract(months=2).to_atom_string()
def test_estimated_and_invoiced(self):
rows = [
[150.0, self.two_months_ago, "", "USD"],
[100.0, self.last_month, "e0500a4qhw", "USD"],
[50.0, self.this_month, "", "USD"],
[50.0, self.next_month, "", "USD"],
]
output = prepare_azure_reporting_data(rows)
def test_get_application_monthly_totals():
portfolio = PortfolioFactory.create(
applications=[
{"name": "Test Application", "environments": [{"name": "Z"}, {"name": "A"}]}
],
)
application = {
"name": "Test Application",
"environments": [
{
"name": "Z",
"spending": {
"this_month": {"JEDI_CLIN_1": 50, "JEDI_CLIN_2": 50},
"last_month": {"JEDI_CLIN_1": 150, "JEDI_CLIN_2": 150},
"total": {"JEDI_CLIN_1": 250, "JEDI_CLIN_2": 250},
},
},
{
"name": "A",
"spending": {
"this_month": {"JEDI_CLIN_1": 100, "JEDI_CLIN_2": 100},
"last_month": {"JEDI_CLIN_1": 200, "JEDI_CLIN_2": 200},
"total": {"JEDI_CLIN_1": 1000, "JEDI_CLIN_2": 1000},
},
},
],
}
assert output.get("invoiced") == Decimal(250.0)
assert output.get("estimated") == Decimal(100.0)
totals = MockReportingProvider._get_application_monthly_totals(
portfolio, application
)
assert totals["name"] == "Test Application"
assert totals["this_month"] == 300
assert totals["last_month"] == 700
assert totals["total"] == 2500
assert [env["name"] for env in totals["environments"]] == ["A", "Z"]
def test_just_estimated(self):
rows = [
[100.0, self.this_month, "", "USD"],
]
output = prepare_azure_reporting_data(rows)
assert output.get("invoiced") == Decimal(0.0)
assert output.get("estimated") == Decimal(100.0)
def test_just_invoiced(self):
rows = [
[100.0, self.last_month, "", "USD"],
]
output = prepare_azure_reporting_data(rows)
assert output.get("invoiced") == Decimal(100.0)
assert output.get("estimated") == Decimal(0.0)
def test_no_rows(self):
output = prepare_azure_reporting_data([])
assert output.get("invoiced") == Decimal(0.0)
assert output.get("estimated") == Decimal(0.0)

View File

@@ -29,9 +29,14 @@ from atst.domain.csp.cloud.models import (
BillingProfileTenantAccessCSPResult,
BillingProfileVerificationCSPPayload,
BillingProfileVerificationCSPResult,
InitialMgmtGroupCSPPayload,
InitialMgmtGroupCSPResult,
InitialMgmtGroupVerificationCSPPayload,
InitialMgmtGroupVerificationCSPResult,
CostManagementQueryCSPResult,
EnvironmentCSPPayload,
EnvironmentCSPResult,
KeyVaultCredentials,
PrincipalAdminRoleCSPPayload,
PrincipalAdminRoleCSPResult,
ProductPurchaseCSPPayload,
@@ -60,7 +65,9 @@ from atst.domain.csp.cloud.models import (
TenantPrincipalOwnershipCSPPayload,
TenantPrincipalOwnershipCSPResult,
UserCSPPayload,
UserRoleCSPPayload,
)
from atst.domain.csp.cloud.exceptions import UserProvisioningException
BILLING_ACCOUNT_NAME = "52865e4c-52e8-5a6c-da6b-c58f0814f06f:7ea5de9d-b8ce-4901-b1c5-d864320c7b03_2019-05-31"
@@ -71,6 +78,12 @@ def mock_management_group_create(mock_azure, spec_dict):
)
def mock_management_group_get(mock_azure, spec_dict):
mock_azure.sdk.managementgroups.ManagementGroupsAPI.return_value.management_groups.get.return_value.result.return_value = (
spec_dict
)
def test_create_environment_succeeds(mock_azure: AzureCloudProvider):
environment = EnvironmentFactory.create()
mock_management_group_create(mock_azure, {"id": "Test Id"})
@@ -115,6 +128,41 @@ def test_create_application_succeeds(mock_azure: AzureCloudProvider):
assert result.id == "Test Id"
def test_create_initial_mgmt_group_succeeds(mock_azure: AzureCloudProvider):
application = ApplicationFactory.create()
mock_management_group_create(mock_azure, {"id": "Test Id"})
mock_azure = mock_get_secret(mock_azure)
payload = InitialMgmtGroupCSPPayload(
tenant_id="1234",
display_name=application.name,
management_group_name=str(uuid4()),
)
result: InitialMgmtGroupCSPResult = mock_azure.create_initial_mgmt_group(payload)
assert result.id == "Test Id"
def test_create_initial_mgmt_group_verification_succeeds(
mock_azure: AzureCloudProvider,
):
application = ApplicationFactory.create()
mock_management_group_get(mock_azure, {"id": "Test Id"})
mock_azure = mock_get_secret(mock_azure)
management_group_name = str(uuid4())
payload = InitialMgmtGroupVerificationCSPPayload(
tenant_id="1234", management_group_name=management_group_name
)
result: InitialMgmtGroupVerificationCSPResult = mock_azure.create_initial_mgmt_group_verification(
payload
)
assert result.id == "Test Id"
# assert result.name == management_group_name
def test_create_policy_definition_succeeds(mock_azure: AzureCloudProvider):
subscription_id = str(uuid4())
management_group_id = str(uuid4())
@@ -1178,3 +1226,71 @@ def test_create_user(mock_azure: AzureCloudProvider):
result = mock_azure.create_user(payload)
assert result.id == "id"
def test_create_user_role(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
"_get_tenant_principal_token",
wraps=mock_azure._get_tenant_principal_token,
) as _get_tenant_principal_token:
_get_tenant_principal_token.return_value = "token"
mock_result_create = Mock()
mock_result_create.ok = True
mock_result_create.json.return_value = {"id": "id"}
mock_azure.sdk.requests.put.return_value = mock_result_create
payload = UserRoleCSPPayload(
tenant_id=uuid4().hex,
user_object_id=str(uuid4()),
management_group_id=str(uuid4()),
role="owner",
)
result = mock_azure.create_user_role(payload)
assert result.id == "id"
def test_create_user_role_failure(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
"_get_tenant_principal_token",
wraps=mock_azure._get_tenant_principal_token,
) as _get_tenant_principal_token:
_get_tenant_principal_token.return_value = "token"
mock_result_create = Mock()
mock_result_create.ok = False
mock_azure.sdk.requests.put.return_value = mock_result_create
payload = UserRoleCSPPayload(
tenant_id=uuid4().hex,
user_object_id=str(uuid4()),
management_group_id=str(uuid4()),
role="owner",
)
with pytest.raises(UserProvisioningException):
mock_azure.create_user_role(payload)
def test_update_tenant_creds(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider, "set_secret", wraps=mock_azure.set_secret,
) as set_secret:
set_secret.return_value = None
existing_secrets = {
"tenant_id": "mytenant",
"tenant_admin_username": "admin",
"tenant_admin_password": "foo", # pragma: allowlist secret
}
mock_azure = mock_get_secret(mock_azure, json.dumps(existing_secrets))
mock_new_secrets = KeyVaultCredentials(**MOCK_CREDS)
updated_secret = mock_azure.update_tenant_creds("mytenant", mock_new_secrets)
assert updated_secret == KeyVaultCredentials(
**{**existing_secrets, **MOCK_CREDS}
)

View File

@@ -100,6 +100,26 @@ def test_KeyVaultCredentials_enforce_root_creds():
)
def test_KeyVaultCredentials_merge_credentials():
old_secret = KeyVaultCredentials(
tenant_id="foo",
tenant_admin_username="bar",
tenant_admin_password="baz", # pragma: allowlist secret
)
new_secret = KeyVaultCredentials(
tenant_id="foo", tenant_sp_client_id="bip", tenant_sp_key="bop"
)
expected_update = KeyVaultCredentials(
tenant_id="foo",
tenant_admin_username="bar",
tenant_admin_password="baz", # pragma: allowlist secret
tenant_sp_client_id="bip",
tenant_sp_key="bop",
)
assert old_secret.merge_credentials(new_secret) == expected_update
user_payload = {
"tenant_id": "123",
"display_name": "Han Solo",

View File

@@ -1,7 +1,7 @@
import pytest
from atst.domain.environment_roles import EnvironmentRoles
from atst.models.environment_role import EnvironmentRole
from atst.models import EnvironmentRole, ApplicationRoleStatus
from tests.factories import *
@@ -113,14 +113,14 @@ def test_disable_checks_env_role_provisioning_status():
environment = EnvironmentFactory.create(cloud_id="cloud-id")
environment.application.portfolio.csp_data = {"tenant_id": uuid4().hex}
env_role1 = EnvironmentRoleFactory.create(environment=environment)
assert not env_role1.csp_user_id
assert not env_role1.cloud_id
env_role1 = EnvironmentRoles.disable(env_role1.id)
assert env_role1.disabled
env_role2 = EnvironmentRoleFactory.create(
environment=environment, csp_user_id="123456"
environment=environment, cloud_id="123456"
)
assert env_role2.csp_user_id
assert env_role2.cloud_id
env_role2 = EnvironmentRoles.disable(env_role2.id)
assert env_role2.disabled
@@ -159,3 +159,34 @@ def test_for_user(application_role):
assert len(env_roles) == 3
assert env_roles == [env_role_1, env_role_2, env_role_3]
assert not rando_env_role in env_roles
class TestPendingCreation:
def test_pending_role(self):
appr = ApplicationRoleFactory.create(cloud_id="123")
envr = EnvironmentRoleFactory.create(application_role=appr)
assert EnvironmentRoles.get_pending_creation() == [envr.id]
def test_deleted_role(self):
appr = ApplicationRoleFactory.create(cloud_id="123")
envr = EnvironmentRoleFactory.create(application_role=appr, deleted=True)
assert EnvironmentRoles.get_pending_creation() == []
def test_not_ready_role(self):
appr = ApplicationRoleFactory.create(cloud_id=None)
envr = EnvironmentRoleFactory.create(application_role=appr)
assert EnvironmentRoles.get_pending_creation() == []
def test_disabled_app_role(self):
appr = ApplicationRoleFactory.create(
cloud_id="123", status=ApplicationRoleStatus.DISABLED
)
envr = EnvironmentRoleFactory.create(application_role=appr)
assert EnvironmentRoles.get_pending_creation() == []
def test_disabled_env_role(self):
appr = ApplicationRoleFactory.create(cloud_id="123")
envr = EnvironmentRoleFactory.create(
application_role=appr, status=EnvironmentRole.Status.DISABLED
)
assert EnvironmentRoles.get_pending_creation() == []

View File

@@ -213,6 +213,8 @@ def test_fsm_transition_start(mock_cloud_provider, portfolio: Portfolio):
FSMStates.TENANT_PRINCIPAL_CREDENTIAL_CREATED,
FSMStates.ADMIN_ROLE_DEFINITION_CREATED,
FSMStates.PRINCIPAL_ADMIN_ROLE_CREATED,
FSMStates.INITIAL_MGMT_GROUP_CREATED,
FSMStates.INITIAL_MGMT_GROUP_VERIFICATION_CREATED,
FSMStates.TENANT_ADMIN_OWNERSHIP_CREATED,
FSMStates.TENANT_PRINCIPAL_OWNERSHIP_CREATED,
]
@@ -233,10 +235,12 @@ def test_fsm_transition_start(mock_cloud_provider, portfolio: Portfolio):
"user_id": user_id,
"password": "jklfsdNCVD83nklds2#202", # pragma: allowlist secret
"domain_name": domain_name,
"display_name": "mgmt group display name",
"management_group_name": "mgmt-group-uuid",
"first_name": ppoc.first_name,
"last_name": ppoc.last_name,
"country_code": "US",
"password_recovery_email_address": ppoc.email,
"password_recovery_email_address": "email@example.com", # ppoc.email,
"address": { # TODO: TBD if we're sourcing this from data or config
"company_name": "",
"address_line_1": "",

View File

@@ -1,8 +1,31 @@
# TODO: Implement when we get real reporting data
def test_expired_task_orders():
pass
import pytest
from atst.domain.reports import Reports
from tests.factories import PortfolioFactory
from decimal import Decimal
# TODO: Implement when we get real reporting data
def test_obligated_funds_by_JEDI_clin():
pass
@pytest.fixture(scope="function")
def portfolio():
portfolio = PortfolioFactory.create()
return portfolio
class TestGetPortfolioSpending:
csp_data = {
"tenant_id": "",
"billing_profile_properties": {
"invoice_sections": [{"invoice_section_id": "",}]
},
}
def test_with_csp_data(self, portfolio):
portfolio.csp_data = self.csp_data
data = Reports.get_portfolio_spending(portfolio)
assert data["invoiced"] == Decimal(1551.0)
assert data["estimated"] == Decimal(500.0)
def test_without_csp_data(self, portfolio):
data = Reports.get_portfolio_spending(portfolio)
assert data["invoiced"] == Decimal(0)
assert data["estimated"] == Decimal(0)

View File

@@ -1,5 +1,5 @@
import pytest
from datetime import date, timedelta
from datetime import date, datetime, timedelta
from decimal import Decimal
from atst.domain.exceptions import AlreadyExistsError
@@ -149,11 +149,12 @@ def test_task_order_sort_by_status():
]
sorted_by_status = TaskOrders.sort_by_status(initial_to_list)
assert len(sorted_by_status["Draft"]) == 3
assert len(sorted_by_status["Draft"]) == 4
assert len(sorted_by_status["Active"]) == 1
assert len(sorted_by_status["Upcoming"]) == 1
assert len(sorted_by_status["Expired"]) == 2
assert len(sorted_by_status["Unsigned"]) == 1
with pytest.raises(KeyError):
sorted_by_status["Unsigned"]
assert list(sorted_by_status.keys()) == [status.value for status in SORT_ORDERING]
@@ -178,3 +179,21 @@ def test_allows_alphanumeric_number():
for number in valid_to_numbers:
assert TaskOrders.create(portfolio.id, number, [], None)
def test_get_for_send_task_order_files():
new_to = TaskOrderFactory.create(create_clins=[{}])
updated_to = TaskOrderFactory.create(
create_clins=[{"last_sent_at": datetime(2020, 2, 1)}],
pdf_last_sent_at=datetime(2020, 1, 1),
)
sent_to = TaskOrderFactory.create(
create_clins=[{"last_sent_at": datetime(2020, 1, 1)}],
pdf_last_sent_at=datetime(2020, 1, 1),
)
updated_and_new_task_orders = TaskOrders.get_for_send_task_order_files()
assert len(updated_and_new_task_orders) == 2
assert sent_to not in updated_and_new_task_orders
assert updated_to in updated_and_new_task_orders
assert new_to in updated_and_new_task_orders

View File

@@ -322,6 +322,7 @@ class TaskOrderFactory(Base):
number = factory.LazyFunction(random_task_order_number)
signed_at = None
_pdf = factory.SubFactory(AttachmentFactory)
pdf_last_sent_at = None
@classmethod
def _create(cls, model_class, *args, **kwargs):
@@ -347,6 +348,7 @@ class CLINFactory(Base):
jedi_clin_type = factory.LazyFunction(
lambda *args: random.choice(list(clin.JEDICLINType))
)
last_sent_at = None
class NotificationRecipientFactory(Base):

View File

@@ -9,8 +9,10 @@ AZURE_CONFIG = {
"AZURE_TENANT_ID": "MOCK",
"AZURE_POLICY_LOCATION": "policies",
"AZURE_VAULT_URL": "http://vault",
"POWERSHELL_CLIENT_ID": "MOCK",
"AZURE_OWNER_ROLE_DEF_ID": "MOCK",
"AZURE_POWERSHELL_CLIENT_ID": "MOCK",
"AZURE_ROLE_DEF_ID_OWNER": "MOCK",
"AZURE_ROLE_DEF_ID_CONTRIBUTOR": "MOCK",
"AZURE_ROLE_DEF_ID_BILLING_READER": "MOCK",
"AZURE_GRAPH_RESOURCE": "MOCK",
"AZURE_AADP_QTY": 5,
}

View File

@@ -7,6 +7,51 @@ from tests.factories import (
random_past_date,
)
import datetime
import pendulum
from decimal import Decimal
import pytest
@pytest.fixture(scope="function")
def upcoming_task_order():
return dict(
signed_at=pendulum.today().subtract(days=3),
create_clins=[
dict(
start_date=pendulum.today().add(days=2),
end_date=pendulum.today().add(days=3),
obligated_amount=Decimal(700.0),
)
],
)
@pytest.fixture(scope="function")
def current_task_order():
return dict(
signed_at=pendulum.today().subtract(days=3),
create_clins=[
dict(
start_date=pendulum.today().subtract(days=1),
end_date=pendulum.today().add(days=1),
obligated_amount=Decimal(1000.0),
)
],
)
@pytest.fixture(scope="function")
def past_task_order():
return dict(
signed_at=pendulum.today().subtract(days=3),
create_clins=[
dict(
start_date=pendulum.today().subtract(days=3),
end_date=pendulum.today().subtract(days=2),
obligated_amount=Decimal(500.0),
)
],
)
def test_portfolio_applications_excludes_deleted():
@@ -85,3 +130,53 @@ def test_active_task_orders(session):
portfolio=portfolio, signed_at=random_past_date(), clins=[CLINFactory.create()]
)
assert len(portfolio.active_task_orders) == 1
class TestCurrentObligatedFunds:
"""
Tests the current_obligated_funds property
"""
def test_no_task_orders(self):
portfolio = PortfolioFactory()
assert portfolio.total_obligated_funds == Decimal(0)
def test_with_current(self, current_task_order):
portfolio = PortfolioFactory(
task_orders=[current_task_order, current_task_order]
)
assert portfolio.total_obligated_funds == Decimal(2000.0)
def test_with_others(
self, past_task_order, current_task_order, upcoming_task_order
):
portfolio = PortfolioFactory(
task_orders=[past_task_order, current_task_order, upcoming_task_order,]
)
# Only sums the current task order
assert portfolio.total_obligated_funds == Decimal(1000.0)
class TestUpcomingObligatedFunds:
"""
Tests the upcoming_obligated_funds property
"""
def test_no_task_orders(self):
portfolio = PortfolioFactory()
assert portfolio.upcoming_obligated_funds == Decimal(0)
def test_with_upcoming(self, upcoming_task_order):
portfolio = PortfolioFactory(
task_orders=[upcoming_task_order, upcoming_task_order]
)
assert portfolio.upcoming_obligated_funds == Decimal(1400.0)
def test_with_others(
self, past_task_order, current_task_order, upcoming_task_order
):
portfolio = PortfolioFactory(
task_orders=[past_task_order, current_task_order, upcoming_task_order]
)
# Only sums the upcoming task order
assert portfolio.upcoming_obligated_funds == Decimal(700.0)

View File

@@ -7,6 +7,7 @@ from atst.app import (
make_crl_validator,
apply_config_from_directory,
apply_config_from_environment,
make_config,
)
@@ -67,3 +68,18 @@ def test_apply_config_from_environment_skips_unknown_settings(
monkeypatch.setenv("FLARF", "MAYO")
apply_config_from_environment(config_object)
assert "FLARF" not in config_object.options("default")
class TestMakeConfig:
def test_redis_ssl_connection(self):
config = make_config({"REDIS_TLS": True})
uri = config.get("REDIS_URI")
assert "rediss" in uri
assert "ssl_cert_reqs" in uri
def test_non_redis_ssl_connection(self):
config = make_config({"REDIS_TLS": False})
uri = config.get("REDIS_URI")
assert "rediss" not in uri
assert "redis" in uri
assert "ssl_cert_reqs" not in uri

View File

@@ -1,9 +1,12 @@
import pendulum
import pytest
from uuid import uuid4
from unittest.mock import Mock
from unittest.mock import Mock, MagicMock
from smtplib import SMTPException
from azure.core.exceptions import AzureError
from atst.domain.csp.cloud import MockCloudProvider
from atst.domain.csp.cloud.models import UserRoleCSPResult
from atst.domain.portfolios import Portfolios
from atst.models import ApplicationRoleStatus
@@ -12,23 +15,28 @@ from atst.jobs import (
dispatch_create_environment,
dispatch_create_application,
dispatch_create_user,
dispatch_create_environment_role,
dispatch_provision_portfolio,
dispatch_send_task_order_files,
create_environment,
do_create_user,
do_provision_portfolio,
do_create_environment,
do_create_environment_role,
do_create_application,
)
from tests.factories import (
ApplicationFactory,
ApplicationRoleFactory,
EnvironmentFactory,
EnvironmentRoleFactory,
PortfolioFactory,
PortfolioStateMachineFactory,
ApplicationFactory,
ApplicationRoleFactory,
TaskOrderFactory,
UserFactory,
)
from atst.models import CSPRole, EnvironmentRole, ApplicationRoleStatus, JobFailure
from atst.utils.localization import translate
@pytest.fixture(autouse=True, scope="function")
@@ -287,3 +295,120 @@ def test_provision_portfolio_create_tenant(
# monkeypatch.setattr("atst.jobs.provision_portfolio", mock)
# dispatch_provision_portfolio.run()
# mock.delay.assert_called_once_with(portfolio_id=portfolio.id)
def test_dispatch_create_environment_role(monkeypatch):
portfolio = PortfolioFactory.create(csp_data={"tenant_id": "123"})
app_role = ApplicationRoleFactory.create(
application=ApplicationFactory.create(portfolio=portfolio),
status=ApplicationRoleStatus.ACTIVE,
cloud_id="123",
)
env_role = EnvironmentRoleFactory.create(application_role=app_role)
mock = Mock()
monkeypatch.setattr("atst.jobs.create_environment_role", mock)
dispatch_create_environment_role.run()
mock.delay.assert_called_once_with(environment_role_id=env_role.id)
def test_create_environment_role():
portfolio = PortfolioFactory.create(csp_data={"tenant_id": "123"})
app = ApplicationFactory.create(portfolio=portfolio)
app_role = ApplicationRoleFactory.create(
application=app, status=ApplicationRoleStatus.ACTIVE, cloud_id="123",
)
env = EnvironmentFactory.create(application=app, cloud_id="123")
env_role = EnvironmentRoleFactory.create(
environment=env, application_role=app_role, cloud_id=None
)
csp = Mock()
result = UserRoleCSPResult(id="a-cloud-id")
csp.create_user_role = MagicMock(return_value=result)
do_create_environment_role(csp, environment_role_id=env_role.id)
assert env_role.cloud_id == "a-cloud-id"
# TODO: Refactor the tests related to dispatch_send_task_order_files() into a class
# and separate the success test into two tests
def test_dispatch_send_task_order_files(monkeypatch, app):
mock = Mock()
monkeypatch.setattr("atst.jobs.send_mail", mock)
def _download_task_order(MockFileService, object_name):
return {"name": object_name}
monkeypatch.setattr(
"atst.domain.csp.files.MockFileService.download_task_order",
_download_task_order,
)
# Create 3 new Task Orders
for i in range(3):
TaskOrderFactory.create(create_clins=[{"number": "0001"}])
dispatch_send_task_order_files.run()
# Check that send_with_attachment was called once for each task order
assert mock.call_count == 3
mock.reset_mock()
# Create new TO
task_order = TaskOrderFactory.create(create_clins=[{"number": "0001"}])
assert not task_order.pdf_last_sent_at
dispatch_send_task_order_files.run()
# Check that send_with_attachment was called with correct kwargs
mock.assert_called_once_with(
recipients=[app.config.get("MICROSOFT_TASK_ORDER_EMAIL_ADDRESS")],
subject=translate(
"email.task_order_sent.subject", {"to_number": task_order.number}
),
body=translate("email.task_order_sent.body", {"to_number": task_order.number}),
attachments=[
{
"name": task_order.pdf.object_name,
"maintype": "application",
"subtype": "pdf",
}
],
)
assert task_order.pdf_last_sent_at
def test_dispatch_send_task_order_files_send_failure(monkeypatch):
def _raise_smtp_exception(**kwargs):
raise SMTPException
monkeypatch.setattr("atst.jobs.send_mail", _raise_smtp_exception)
task_order = TaskOrderFactory.create(create_clins=[{"number": "0001"}])
dispatch_send_task_order_files.run()
# Check that pdf_last_sent_at has not been updated
assert not task_order.pdf_last_sent_at
def test_dispatch_send_task_order_files_download_failure(monkeypatch):
mock = Mock()
monkeypatch.setattr("atst.jobs.send_mail", mock)
def _download_task_order(MockFileService, object_name):
raise AzureError("something went wrong")
monkeypatch.setattr(
"atst.domain.csp.files.MockFileService.download_task_order",
_download_task_order,
)
task_order = TaskOrderFactory.create(create_clins=[{"number": "0002"}])
dispatch_send_task_order_files.run()
# Check that pdf_last_sent_at has not been updated
assert not task_order.pdf_last_sent_at