Merge branch 'staging' into environment-role-creation

This commit is contained in:
dandds
2020-02-06 05:34:12 -05:00
46 changed files with 950 additions and 707 deletions

View File

@@ -2,6 +2,8 @@ import json
from unittest.mock import Mock, patch
from uuid import uuid4
import pendulum
import pydantic
import pytest
from tests.factories import ApplicationFactory, EnvironmentFactory
from tests.mock_azure import AUTH_CREDENTIALS, mock_azure
@@ -20,10 +22,16 @@ from atst.domain.csp.cloud.models import (
BillingProfileTenantAccessCSPResult,
BillingProfileVerificationCSPPayload,
BillingProfileVerificationCSPResult,
CostManagementQueryCSPResult,
EnvironmentCSPPayload,
EnvironmentCSPResult,
PrincipalAdminRoleCSPPayload,
PrincipalAdminRoleCSPResult,
ProductPurchaseCSPPayload,
ProductPurchaseCSPResult,
ProductPurchaseVerificationCSPPayload,
ProductPurchaseVerificationCSPResult,
ReportingCSPPayload,
SubscriptionCreationCSPPayload,
SubscriptionCreationCSPResult,
SubscriptionVerificationCSPPayload,
@@ -44,7 +52,10 @@ from atst.domain.csp.cloud.models import (
TenantPrincipalCSPResult,
TenantPrincipalOwnershipCSPPayload,
TenantPrincipalOwnershipCSPResult,
UserCSPPayload,
UserRoleCSPPayload,
)
from atst.domain.csp.cloud.exceptions import UserProvisioningException
BILLING_ACCOUNT_NAME = "52865e4c-52e8-5a6c-da6b-c58f0814f06f:7ea5de9d-b8ce-4901-b1c5-d864320c7b03_2019-05-31"
@@ -57,12 +68,14 @@ def mock_management_group_create(mock_azure, spec_dict):
def test_create_environment_succeeds(mock_azure: AzureCloudProvider):
environment = EnvironmentFactory.create()
mock_management_group_create(mock_azure, {"id": "Test Id"})
result = mock_azure.create_environment(
AUTH_CREDENTIALS, environment.creator, environment
mock_azure = mock_get_secret(mock_azure)
payload = EnvironmentCSPPayload(
tenant_id="1234", display_name=environment.name, parent_id=str(uuid4())
)
result = mock_azure.create_environment(payload)
assert result.id == "Test Id"
@@ -97,20 +110,6 @@ def test_create_application_succeeds(mock_azure: AzureCloudProvider):
assert result.id == "Test Id"
def test_create_atat_admin_user_succeeds(mock_azure: AzureCloudProvider):
environment_id = str(uuid4())
csp_user_id = str(uuid4)
mock_azure.sdk.graphrbac.GraphRbacManagementClient.return_value.service_principals.create.return_value.object_id = (
csp_user_id
)
result = mock_azure.create_atat_admin_user(AUTH_CREDENTIALS, environment_id)
assert result.get("csp_user_id") == csp_user_id
def test_create_policy_definition_succeeds(mock_azure: AzureCloudProvider):
subscription_id = str(uuid4())
management_group_id = str(uuid4())
@@ -162,6 +161,27 @@ def test_create_tenant(mock_azure: AzureCloudProvider):
assert body.tenant_id == "60ff9d34-82bf-4f21-b565-308ef0533435"
def test_create_tenant_fails(mock_azure: AzureCloudProvider):
mock_result = Mock()
mock_result.json.return_value = {"error": "body"}
mock_result.status_code = 403
mock_azure.sdk.requests.post.return_value = mock_result
payload = TenantCSPPayload(
**dict(
user_id="admin",
password="JediJan13$coot", # pragma: allowlist secret
domain_name="jediccpospawnedtenant2",
first_name="Tedry",
last_name="Tenet",
country_code="US",
password_recovery_email_address="thomas@promptworks.com",
)
)
mock_azure = mock_get_secret(mock_azure)
result = mock_azure.create_tenant(payload)
assert result.get("status") == "error"
def test_create_billing_profile_creation(mock_azure: AzureCloudProvider):
mock_azure.sdk.adal.AuthenticationContext.return_value.context.acquire_token_with_client_credentials.return_value = {
"accessToken": "TOKEN"
@@ -573,10 +593,10 @@ def test_create_tenant_principal_credential(mock_azure: AzureCloudProvider):
def test_create_admin_role_definition(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
"_get_elevated_management_token",
wraps=mock_azure._get_elevated_management_token,
) as get_elevated_management_token:
get_elevated_management_token.return_value = "my fake token"
"_get_tenant_admin_token",
wraps=mock_azure._get_tenant_admin_token,
) as get_tenant_admin_token:
get_tenant_admin_token.return_value = "my fake token"
mock_result = Mock()
mock_result.ok = True
@@ -657,6 +677,35 @@ def test_create_tenant_principal_ownership(mock_azure: AzureCloudProvider):
assert result.principal_owner_assignment_id == "id"
def test_create_principal_admin_role(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
"_get_tenant_admin_token",
wraps=mock_azure._get_tenant_admin_token,
) as get_tenant_admin_token:
get_tenant_admin_token.return_value = "my fake token"
mock_result = Mock()
mock_result.ok = True
mock_result.json.return_value = {"id": "id"}
mock_azure.sdk.requests.post.return_value = mock_result
payload = PrincipalAdminRoleCSPPayload(
**{
"tenant_id": uuid4().hex,
"principal_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
"admin_role_def_id": uuid4().hex,
}
)
result: PrincipalAdminRoleCSPResult = mock_azure.create_principal_admin_role(
payload
)
assert result.principal_assignment_id == "id"
def test_create_subscription_creation(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
@@ -718,3 +767,224 @@ def test_create_subscription_verification(mock_azure: AzureCloudProvider):
payload
)
assert result.subscription_id == "60fbbb72-0516-4253-ab18-c92432ba3230"
def test_get_reporting_data(mock_azure: AzureCloudProvider):
mock_result = Mock()
mock_result.json.return_value = {
"eTag": None,
"id": "providers/Microsoft.Billing/billingAccounts/52865e4c-52e8-5a6c-da6b-c58f0814f06f:7ea5de9d-b8ce-4901-b1c5-d864320c7b03_2019-05-31/billingProfiles/XQDJ-6LB4-BG7-TGB/invoiceSections/P73M-XC7J-PJA-TGB/providers/Microsoft.CostManagement/query/e82d0cda-2ffb-4476-a98a-425c83c216f9",
"location": None,
"name": "e82d0cda-2ffb-4476-a98a-425c83c216f9",
"properties": {
"columns": [
{"name": "PreTaxCost", "type": "Number"},
{"name": "UsageDate", "type": "Number"},
{"name": "InvoiceId", "type": "String"},
{"name": "Currency", "type": "String"},
],
"nextLink": None,
"rows": [],
},
"sku": None,
"type": "Microsoft.CostManagement/query",
}
mock_result.ok = True
mock_azure.sdk.requests.post.return_value = mock_result
mock_azure = mock_get_secret(mock_azure)
# Subset of a profile's CSP data that we care about for reporting
csp_data = {
"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
"billing_profile_properties": {
"invoice_sections": [
{
"invoice_section_id": "providers/Microsoft.Billing/billingAccounts/52865e4c-52e8-5a6c-da6b-c58f0814f06f:7ea5de9d-b8ce-4901-b1c5-d864320c7b03_2019-05-31/billingProfiles/XQDJ-6LB4-BG7-TGB/invoiceSections/P73M-XC7J-PJA-TGB",
}
],
},
}
data: CostManagementQueryCSPResult = mock_azure.get_reporting_data(
ReportingCSPPayload(
from_date=pendulum.now().subtract(years=1).add(days=1).format("YYYY-MM-DD"),
to_date=pendulum.now().format("YYYY-MM-DD"),
**csp_data,
)
)
assert isinstance(data, CostManagementQueryCSPResult)
assert data.name == "e82d0cda-2ffb-4476-a98a-425c83c216f9"
assert len(data.properties.columns) == 4
def test_get_reporting_data_malformed_payload(mock_azure: AzureCloudProvider):
mock_result = Mock()
mock_result.ok = True
mock_azure.sdk.requests.post.return_value = mock_result
mock_azure = mock_get_secret(mock_azure)
# Malformed csp_data payloads that should throw pydantic validation errors
index_error = {
"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
"billing_profile_properties": {"invoice_sections": [],},
}
key_error = {
"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
"billing_profile_properties": {"invoice_sections": [{}],},
}
for malformed_payload in [key_error, index_error]:
with pytest.raises(pydantic.ValidationError):
assert mock_azure.get_reporting_data(
ReportingCSPPayload(
from_date="foo", to_date="bar", **malformed_payload,
)
)
def test_get_secret(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
"_get_client_secret_credential_obj",
wraps=mock_azure._get_client_secret_credential_obj,
) as _get_client_secret_credential_obj:
_get_client_secret_credential_obj.return_value = {}
mock_azure.sdk.secrets.SecretClient.return_value.get_secret.return_value.value = (
"my secret"
)
assert mock_azure.get_secret("secret key") == "my secret"
def test_set_secret(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
"_get_client_secret_credential_obj",
wraps=mock_azure._get_client_secret_credential_obj,
) as _get_client_secret_credential_obj:
_get_client_secret_credential_obj.return_value = {}
mock_azure.sdk.secrets.SecretClient.return_value.set_secret.return_value = (
"my secret"
)
assert mock_azure.set_secret("secret key", "secret_value") == "my secret"
def test_create_active_directory_user(mock_azure: AzureCloudProvider):
mock_result = Mock()
mock_result.ok = True
mock_result.json.return_value = {"id": "id"}
mock_azure.sdk.requests.post.return_value = mock_result
payload = UserCSPPayload(
tenant_id=uuid4().hex,
display_name="Test Testerson",
tenant_host_name="testtenant",
email="test@testerson.test",
password="asdfghjkl", # pragma: allowlist secret
)
result = mock_azure._create_active_directory_user("token", payload)
assert result.id == "id"
def test_update_active_directory_user_email(mock_azure: AzureCloudProvider):
mock_result = Mock()
mock_result.ok = True
mock_azure.sdk.requests.patch.return_value = mock_result
payload = UserCSPPayload(
tenant_id=uuid4().hex,
display_name="Test Testerson",
tenant_host_name="testtenant",
email="test@testerson.test",
password="asdfghjkl", # pragma: allowlist secret
)
result = mock_azure._update_active_directory_user_email(
"token", uuid4().hex, payload
)
assert result
def test_create_user(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
"_get_tenant_principal_token",
wraps=mock_azure._get_tenant_principal_token,
) as _get_tenant_principal_token:
_get_tenant_principal_token.return_value = "token"
mock_result_create = Mock()
mock_result_create.ok = True
mock_result_create.json.return_value = {"id": "id"}
mock_azure.sdk.requests.post.return_value = mock_result_create
mock_result_update = Mock()
mock_result_update.ok = True
mock_azure.sdk.requests.patch.return_value = mock_result_update
payload = UserCSPPayload(
tenant_id=uuid4().hex,
display_name="Test Testerson",
tenant_host_name="testtenant",
email="test@testerson.test",
password="asdfghjkl", # pragma: allowlist secret
)
result = mock_azure.create_user(payload)
assert result.id == "id"
def test_create_user_role(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
"_get_tenant_principal_token",
wraps=mock_azure._get_tenant_principal_token,
) as _get_tenant_principal_token:
_get_tenant_principal_token.return_value = "token"
mock_result_create = Mock()
mock_result_create.ok = True
mock_result_create.json.return_value = {"id": "id"}
mock_azure.sdk.requests.put.return_value = mock_result_create
payload = UserRoleCSPPayload(
tenant_id=uuid4().hex,
user_object_id=str(uuid4()),
management_group_id=str(uuid4()),
role="owner",
)
result = mock_azure.create_user_role(payload)
assert result.id == "id"
def test_create_user_role_failure(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider,
"_get_tenant_principal_token",
wraps=mock_azure._get_tenant_principal_token,
) as _get_tenant_principal_token:
_get_tenant_principal_token.return_value = "token"
mock_result_create = Mock()
mock_result_create.ok = False
mock_azure.sdk.requests.put.return_value = mock_result_create
payload = UserRoleCSPPayload(
tenant_id=uuid4().hex,
user_object_id=str(uuid4()),
management_group_id=str(uuid4()),
role="owner",
)
with pytest.raises(UserProvisioningException):
mock_azure.create_user_role(payload)