Merge branch 'staging' into to-builder-previous-button

This commit is contained in:
leigh-mil
2020-01-29 15:09:27 -05:00
committed by GitHub
114 changed files with 2845 additions and 859 deletions

View File

@@ -1,11 +1,15 @@
from unittest.mock import Mock
import pytest
import json
from uuid import uuid4
from unittest.mock import Mock, patch
from tests.factories import ApplicationFactory, EnvironmentFactory
from tests.mock_azure import AUTH_CREDENTIALS, mock_azure
from atst.domain.csp.cloud import AzureCloudProvider
from atst.domain.csp.cloud.models import (
ApplicationCSPPayload,
ApplicationCSPResult,
BillingInstructionCSPPayload,
BillingInstructionCSPResult,
BillingProfileCreationCSPPayload,
@@ -65,8 +69,8 @@ def test_create_subscription_succeeds(mock_azure: AzureCloudProvider):
def mock_management_group_create(mock_azure, spec_dict):
mock_azure.sdk.managementgroups.ManagementGroupsAPI.return_value.management_groups.create_or_update.return_value.result.return_value = Mock(
**spec_dict
mock_azure.sdk.managementgroups.ManagementGroupsAPI.return_value.management_groups.create_or_update.return_value.result.return_value = (
spec_dict
)
@@ -82,12 +86,30 @@ def test_create_environment_succeeds(mock_azure: AzureCloudProvider):
assert result.id == "Test Id"
# mock the get_secret so it returns a JSON string
MOCK_CREDS = {
"tenant_id": str(uuid4()),
"tenant_sp_client_id": str(uuid4()),
"tenant_sp_key": "1234",
}
def mock_get_secret(azure, func):
azure.get_secret = func
return azure
def test_create_application_succeeds(mock_azure: AzureCloudProvider):
application = ApplicationFactory.create()
mock_management_group_create(mock_azure, {"id": "Test Id"})
result = mock_azure._create_application(AUTH_CREDENTIALS, application)
mock_azure = mock_get_secret(mock_azure, lambda *a, **k: json.dumps(MOCK_CREDS))
payload = ApplicationCSPPayload(
tenant_id="1234", display_name=application.name, parent_id=str(uuid4())
)
result = mock_azure.create_application(payload)
assert result.id == "Test Id"

View File

@@ -0,0 +1,99 @@
import pytest
from pydantic import ValidationError
from atst.domain.csp.cloud.models import (
AZURE_MGMNT_PATH,
KeyVaultCredentials,
ManagementGroupCSPPayload,
ManagementGroupCSPResponse,
)
def test_ManagementGroupCSPPayload_management_group_name():
# supplies management_group_name when absent
payload = ManagementGroupCSPPayload(
tenant_id="any-old-id",
display_name="Council of Naboo",
parent_id="Galactic_Senate",
)
assert payload.management_group_name
# validates management_group_name
with pytest.raises(ValidationError):
payload = ManagementGroupCSPPayload(
tenant_id="any-old-id",
management_group_name="council of Naboo 1%^&",
display_name="Council of Naboo",
parent_id="Galactic_Senate",
)
# shortens management_group_name to fit
name = "council_of_naboo".ljust(95, "1")
assert len(name) > 90
payload = ManagementGroupCSPPayload(
tenant_id="any-old-id",
management_group_name=name,
display_name="Council of Naboo",
parent_id="Galactic_Senate",
)
assert len(payload.management_group_name) == 90
def test_ManagementGroupCSPPayload_display_name():
# shortens display_name to fit
name = "Council of Naboo".ljust(95, "1")
assert len(name) > 90
payload = ManagementGroupCSPPayload(
tenant_id="any-old-id", display_name=name, parent_id="Galactic_Senate"
)
assert len(payload.display_name) == 90
def test_ManagementGroupCSPPayload_parent_id():
full_path = f"{AZURE_MGMNT_PATH}Galactic_Senate"
# adds full path
payload = ManagementGroupCSPPayload(
tenant_id="any-old-id",
display_name="Council of Naboo",
parent_id="Galactic_Senate",
)
assert payload.parent_id == full_path
# keeps full path
payload = ManagementGroupCSPPayload(
tenant_id="any-old-id", display_name="Council of Naboo", parent_id=full_path
)
assert payload.parent_id == full_path
def test_ManagementGroupCSPResponse_id():
full_id = "/path/to/naboo-123"
response = ManagementGroupCSPResponse(
**{"id": "/path/to/naboo-123", "other": "stuff"}
)
assert response.id == full_id
def test_KeyVaultCredentials_enforce_admin_creds():
with pytest.raises(ValidationError):
KeyVaultCredentials(tenant_id="an id", tenant_admin_username="C3PO")
assert KeyVaultCredentials(
tenant_id="an id",
tenant_admin_username="C3PO",
tenant_admin_password="beep boop",
)
def test_KeyVaultCredentials_enforce_sp_creds():
with pytest.raises(ValidationError):
KeyVaultCredentials(tenant_id="an id", tenant_sp_client_id="C3PO")
assert KeyVaultCredentials(
tenant_id="an id", tenant_sp_client_id="C3PO", tenant_sp_key="beep boop"
)
def test_KeyVaultCredentials_enforce_root_creds():
with pytest.raises(ValidationError):
KeyVaultCredentials(root_tenant_id="an id", root_sp_client_id="C3PO")
assert KeyVaultCredentials(
root_tenant_id="an id", root_sp_client_id="C3PO", root_sp_key="beep boop"
)

View File

@@ -1,3 +1,4 @@
from datetime import datetime, timedelta
import pytest
from uuid import uuid4
@@ -196,3 +197,20 @@ def test_update_does_not_duplicate_names_within_portfolio():
with pytest.raises(AlreadyExistsError):
Applications.update(dupe_application, {"name": name})
def test_get_applications_pending_creation():
now = datetime.now()
later = now + timedelta(minutes=30)
portfolio1 = PortfolioFactory.create(state="COMPLETED")
app_ready = ApplicationFactory.create(portfolio=portfolio1)
app_done = ApplicationFactory.create(portfolio=portfolio1, cloud_id="123456")
portfolio2 = PortfolioFactory.create(state="UNSTARTED")
app_not_ready = ApplicationFactory.create(portfolio=portfolio2)
uuids = Applications.get_applications_pending_creation()
assert [app_ready.id] == uuids

View File

@@ -71,7 +71,7 @@ def test_update_adds_clins():
def test_update_does_not_duplicate_clins():
task_order = TaskOrderFactory.create(
number="3453453456", create_clins=[{"number": "123"}, {"number": "456"}]
number="3453453456123", create_clins=[{"number": "123"}, {"number": "456"}]
)
clins = [
{
@@ -93,7 +93,7 @@ def test_update_does_not_duplicate_clins():
]
task_order = TaskOrders.update(
task_order_id=task_order.id,
number="0000000000",
number="0000000000000",
clins=clins,
pdf={"filename": "sample.pdf", "object_name": "1234567"},
)
@@ -170,3 +170,11 @@ def test_update_enforces_unique_number():
dupe_task_order = TaskOrderFactory.create()
with pytest.raises(AlreadyExistsError):
TaskOrders.update(dupe_task_order.id, task_order.number, [], None)
def test_allows_alphanumeric_number():
portfolio = PortfolioFactory.create()
valid_to_numbers = ["1234567890123", "ABC1234567890"]
for number in valid_to_numbers:
assert TaskOrders.create(portfolio.id, number, [], None)

View File

@@ -7,6 +7,7 @@ import datetime
from atst.forms import data
from atst.models import *
from atst.models.mixins.state_machines import FSMStates
from atst.domain.invitations import PortfolioInvitations
from atst.domain.permission_sets import PermissionSets
@@ -121,6 +122,7 @@ class PortfolioFactory(Base):
owner = kwargs.pop("owner", UserFactory.create())
members = kwargs.pop("members", [])
with_task_orders = kwargs.pop("task_orders", [])
state = kwargs.pop("state", None)
portfolio = super()._create(model_class, *args, **kwargs)
@@ -161,6 +163,12 @@ class PortfolioFactory(Base):
permission_sets=perms_set,
)
if state:
state = getattr(FSMStates, state)
fsm = PortfolioStateMachineFactory.create(state=state, portfolio=portfolio)
# setting it in the factory is not working for some reason
fsm.state = state
portfolio.applications = applications
portfolio.task_orders = task_orders
return portfolio

View File

@@ -112,3 +112,37 @@ def test_no_number():
http_request_form_data = {}
form = TaskOrderForm(http_request_form_data)
assert form.data["number"] is None
def test_number_allows_alphanumeric():
valid_to_numbers = ["1234567890123", "ABC1234567890"]
for number in valid_to_numbers:
form = TaskOrderForm({"number": number})
assert form.validate()
def test_number_allows_between_13_and_17_characters():
valid_to_numbers = ["123456789012345", "ABCDEFG1234567890"]
for number in valid_to_numbers:
form = TaskOrderForm({"number": number})
assert form.validate()
def test_number_strips_dashes():
valid_to_numbers = ["123-456789-012345", "ABCD-EFG12345-67890"]
for number in valid_to_numbers:
form = TaskOrderForm({"number": number})
assert form.validate()
assert not "-" in form.number.data
def test_number_case_coerces_all_caps():
valid_to_numbers = ["12345678012345", "AbcEFg1234567890"]
for number in valid_to_numbers:
form = TaskOrderForm({"number": number})
assert form.validate()
assert form.number.data == number.upper()

View File

@@ -72,6 +72,12 @@ def mock_secrets():
return Mock(spec=secrets)
def mock_identity():
import azure.identity as identity
return Mock(spec=identity)
class MockAzureSDK(object):
def __init__(self):
from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
@@ -88,6 +94,7 @@ class MockAzureSDK(object):
self.requests = mock_requests()
# may change to a JEDI cloud
self.cloud = AZURE_PUBLIC_CLOUD
self.identity = mock_identity()
@pytest.fixture(scope="function")

View File

@@ -35,6 +35,7 @@ class TaskOrderPdfForm(Form):
class TaskOrderForm(Form):
pdf = FormField(TaskOrderPdfForm, label="task_order_pdf")
number = StringField(label="task_order_number", default="number")
@pytest.fixture
@@ -63,6 +64,12 @@ def multi_checkbox_input_macro(env):
return getattr(multi_checkbox_template.module, "MultiCheckboxInput")
@pytest.fixture
def text_input_macro(env):
text_input_template = env.get_template("components/text_input.html")
return getattr(text_input_template.module, "TextInput")
@pytest.fixture
def initial_value_form(scope="function"):
return InitialValueForm()
@@ -170,3 +177,10 @@ def test_make_pop_date_range(env, app):
index=1,
)
write_template(pop_date_range, "pop_date_range.html")
def test_make_text_input_template(text_input_macro, task_order_form):
text_input_to_number = text_input_macro(
task_order_form.number, validation="taskOrderNumber"
)
write_template(text_input_to_number, "text_input_to_number.html")

View File

@@ -158,7 +158,7 @@ def test_task_orders_form_step_two_add_number(client, user_session, task_order):
def test_task_orders_submit_form_step_two_add_number(client, user_session, task_order):
user_session(task_order.portfolio.owner)
form_data = {"number": "1234567890"}
form_data = {"number": "abc-1234567890"}
response = client.post(
url_for(
"task_orders.submit_form_step_two_add_number", task_order_id=task_order.id
@@ -167,7 +167,7 @@ def test_task_orders_submit_form_step_two_add_number(client, user_session, task_
)
assert response.status_code == 302
assert task_order.number == "1234567890"
assert task_order.number == "ABC1234567890" # pragma: allowlist secret
def test_task_orders_submit_form_step_two_enforces_unique_number(
@@ -194,7 +194,7 @@ def test_task_orders_submit_form_step_two_add_number_existing_to(
client, user_session, task_order
):
user_session(task_order.portfolio.owner)
form_data = {"number": "0000000000"}
form_data = {"number": "0000000000000"}
original_number = task_order.number
response = client.post(
url_for(
@@ -203,7 +203,7 @@ def test_task_orders_submit_form_step_two_add_number_existing_to(
data=form_data,
)
assert response.status_code == 302
assert task_order.number == "0000000000"
assert task_order.number == "0000000000000"
assert task_order.number != original_number

View File

@@ -663,7 +663,7 @@ def test_task_orders_new_get_routes(get_url_assert_status):
def test_task_orders_new_post_routes(post_url_assert_status):
post_routes = [
("task_orders.submit_form_step_one_add_pdf", {"pdf": ""}),
("task_orders.submit_form_step_two_add_number", {"number": "1234567890"}),
("task_orders.submit_form_step_two_add_number", {"number": "1234567890123"}),
(
"task_orders.submit_form_step_three_add_clins",
{

View File

@@ -8,9 +8,9 @@ from atst.domain.csp.cloud import MockCloudProvider
from atst.domain.portfolios import Portfolios
from atst.jobs import (
RecordEnvironmentFailure,
RecordEnvironmentRoleFailure,
RecordFailure,
dispatch_create_environment,
dispatch_create_application,
dispatch_create_atat_admin_user,
dispatch_provision_portfolio,
dispatch_provision_user,
@@ -18,6 +18,7 @@ from atst.jobs import (
do_provision_user,
do_provision_portfolio,
do_create_environment,
do_create_application,
do_create_atat_admin_user,
)
from atst.models.utils import claim_for_update
@@ -27,9 +28,10 @@ from tests.factories import (
EnvironmentRoleFactory,
PortfolioFactory,
PortfolioStateMachineFactory,
ApplicationFactory,
ApplicationRoleFactory,
)
from atst.models import CSPRole, EnvironmentRole, ApplicationRoleStatus
from atst.models import CSPRole, EnvironmentRole, ApplicationRoleStatus, JobFailure
@pytest.fixture(autouse=True, scope="function")
@@ -43,8 +45,17 @@ def portfolio():
return portfolio
def test_environment_job_failure(celery_app, celery_worker):
@celery_app.task(bind=True, base=RecordEnvironmentFailure)
def _find_failure(session, entity, id_):
return (
session.query(JobFailure)
.filter(JobFailure.entity == entity)
.filter(JobFailure.entity_id == id_)
.one()
)
def test_environment_job_failure(session, celery_app, celery_worker):
@celery_app.task(bind=True, base=RecordFailure)
def _fail_hard(self, environment_id=None):
raise ValueError("something bad happened")
@@ -56,13 +67,12 @@ def test_environment_job_failure(celery_app, celery_worker):
with pytest.raises(ValueError):
task.get()
assert environment.job_failures
job_failure = environment.job_failures[0]
job_failure = _find_failure(session, "environment", str(environment.id))
assert job_failure.task == task
def test_environment_role_job_failure(celery_app, celery_worker):
@celery_app.task(bind=True, base=RecordEnvironmentRoleFailure)
def test_environment_role_job_failure(session, celery_app, celery_worker):
@celery_app.task(bind=True, base=RecordFailure)
def _fail_hard(self, environment_role_id=None):
raise ValueError("something bad happened")
@@ -74,8 +84,7 @@ def test_environment_role_job_failure(celery_app, celery_worker):
with pytest.raises(ValueError):
task.get()
assert role.job_failures
job_failure = role.job_failures[0]
job_failure = _find_failure(session, "environment_role", str(role.id))
assert job_failure.task == task
@@ -99,6 +108,24 @@ def test_create_environment_job_is_idempotent(csp, session):
csp.create_environment.assert_not_called()
def test_create_application_job(session, csp):
portfolio = PortfolioFactory.create(
csp_data={"tenant_id": str(uuid4()), "root_management_group_id": str(uuid4())}
)
application = ApplicationFactory.create(portfolio=portfolio, cloud_id=None)
do_create_application(csp, application.id)
session.refresh(application)
assert application.cloud_id
def test_create_application_job_is_idempotent(csp):
application = ApplicationFactory.create(cloud_id=uuid4())
do_create_application(csp, application.id)
csp.create_application.assert_not_called()
def test_create_atat_admin_user(csp, session):
environment = EnvironmentFactory.create(cloud_id="something")
do_create_atat_admin_user(csp, environment.id)
@@ -139,6 +166,21 @@ def test_dispatch_create_environment(session, monkeypatch):
mock.delay.assert_called_once_with(environment_id=e1.id)
def test_dispatch_create_application(monkeypatch):
portfolio = PortfolioFactory.create(state="COMPLETED")
app = ApplicationFactory.create(portfolio=portfolio)
mock = Mock()
monkeypatch.setattr("atst.jobs.create_application", mock)
# When dispatch_create_application is called
dispatch_create_application.run()
# It should cause the create_application task to be called once
# with the application id
mock.delay.assert_called_once_with(application_id=app.id)
def test_dispatch_create_atat_admin_user(session, monkeypatch):
portfolio = PortfolioFactory.create(
applications=[

16
tests/utils/test_hash.py Normal file
View File

@@ -0,0 +1,16 @@
import random
import re
import string
from atst.utils import sha256_hex
def test_sha256_hex():
sample = "".join(
random.choices(string.ascii_uppercase + string.digits, k=random.randrange(200))
)
hashed = sha256_hex(sample)
assert re.match("^[a-zA-Z0-9]+$", hashed)
assert len(hashed) == 64
hashed_again = sha256_hex(sample)
assert hashed == hashed_again