Once the Cloud Service Provider is selected, this link will take you to the CSP's {{ text }}!
-diff --git a/.secrets.baseline b/.secrets.baseline
index 258ea89e..e343eb4f 100644
--- a/.secrets.baseline
+++ b/.secrets.baseline
@@ -3,7 +3,7 @@
"files": "^.secrets.baseline$|^.*pgsslrootcert.yml$",
"lines": null
},
- "generated_at": "2020-01-19T20:21:20Z",
+ "generated_at": "2020-01-29T16:40:16Z",
"plugins_used": [
{
"base64_limit": 4.5,
@@ -145,7 +145,7 @@
"hashed_secret": "e4f14805dfd1e6af030359090c535e149e6b4207",
"is_secret": false,
"is_verified": false,
- "line_number": 649,
+ "line_number": 647,
"type": "Hex High Entropy String"
}
]
diff --git a/README.md b/README.md
index 2681346e..d846d486 100644
--- a/README.md
+++ b/README.md
@@ -257,6 +257,7 @@ To generate coverage reports for the Javascript tests:
- `SESSION_COOKIE_DOMAIN`: String value specifying the name to use for the session cookie. This should be set to the root domain so that it is valid for both the main site and the authentication subdomain. https://flask.palletsprojects.com/en/1.1.x/config/#SESSION_COOKIE_DOMAIN
- `SESSION_KEY_PREFIX`: A prefix that is added before all session keys: https://pythonhosted.org/Flask-Session/#configuration
- `SESSION_TYPE`: String value specifying the cookie storage backend. https://pythonhosted.org/Flask-Session/
+- `SESSION_COOKIE_SECURE`: https://flask.palletsprojects.com/en/1.1.x/config/#SESSION_COOKIE_SECURE
- `SESSION_USE_SIGNER`: Boolean value specifying if the cookie sid should be signed.
- `SQLALCHEMY_ECHO`: Boolean value specifying if SQLAlchemy should log queries to stdout.
- `STATIC_URL`: URL specifying where static assets are hosted.
diff --git a/alembic/versions/cd7e3f9a5d64_admin_and_ownership_provisioning_steps.py b/alembic/versions/cd7e3f9a5d64_admin_and_ownership_provisioning_steps.py
new file mode 100644
index 00000000..08f16c3f
--- /dev/null
+++ b/alembic/versions/cd7e3f9a5d64_admin_and_ownership_provisioning_steps.py
@@ -0,0 +1,198 @@
+"""Admin and Ownership Provisioning Steps
+
+Revision ID: cd7e3f9a5d64
+Revises: 508957112ed6
+Create Date: 2020-01-30 10:38:04.182953
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+# revision identifiers, used by Alembic.
+revision = "cd7e3f9a5d64" # pragma: allowlist secret
+down_revision = "508957112ed6" # pragma: allowlist secret
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ # ### commands auto generated by Alembic - please adjust! ###
+ op.alter_column(
+ "portfolio_state_machines",
+ "state",
+ type_=sa.Enum(
+ "UNSTARTED",
+ "STARTING",
+ "STARTED",
+ "COMPLETED",
+ "FAILED",
+ "TENANT_CREATED",
+ "TENANT_IN_PROGRESS",
+ "TENANT_FAILED",
+ "BILLING_PROFILE_CREATION_CREATED",
+ "BILLING_PROFILE_CREATION_IN_PROGRESS",
+ "BILLING_PROFILE_CREATION_FAILED",
+ "BILLING_PROFILE_VERIFICATION_CREATED",
+ "BILLING_PROFILE_VERIFICATION_IN_PROGRESS",
+ "BILLING_PROFILE_VERIFICATION_FAILED",
+ "BILLING_PROFILE_TENANT_ACCESS_CREATED",
+ "BILLING_PROFILE_TENANT_ACCESS_IN_PROGRESS",
+ "BILLING_PROFILE_TENANT_ACCESS_FAILED",
+ "TASK_ORDER_BILLING_CREATION_CREATED",
+ "TASK_ORDER_BILLING_CREATION_IN_PROGRESS",
+ "TASK_ORDER_BILLING_CREATION_FAILED",
+ "TASK_ORDER_BILLING_VERIFICATION_CREATED",
+ "TASK_ORDER_BILLING_VERIFICATION_IN_PROGRESS",
+ "TASK_ORDER_BILLING_VERIFICATION_FAILED",
+ "BILLING_INSTRUCTION_CREATED",
+ "BILLING_INSTRUCTION_IN_PROGRESS",
+ "BILLING_INSTRUCTION_FAILED",
+ "TENANT_PRINCIPAL_APP_CREATED",
+ "TENANT_PRINCIPAL_APP_IN_PROGRESS",
+ "TENANT_PRINCIPAL_APP_FAILED",
+ "TENANT_PRINCIPAL_CREATED",
+ "TENANT_PRINCIPAL_IN_PROGRESS",
+ "TENANT_PRINCIPAL_FAILED",
+ "TENANT_PRINCIPAL_CREDENTIAL_CREATED",
+ "TENANT_PRINCIPAL_CREDENTIAL_IN_PROGRESS",
+ "TENANT_PRINCIPAL_CREDENTIAL_FAILED",
+ "ADMIN_ROLE_DEFINITION_CREATED",
+ "ADMIN_ROLE_DEFINITION_IN_PROGRESS",
+ "ADMIN_ROLE_DEFINITION_FAILED",
+ "PRINCIPAL_ADMIN_ROLE_CREATED",
+ "PRINCIPAL_ADMIN_ROLE_IN_PROGRESS",
+ "PRINCIPAL_ADMIN_ROLE_FAILED",
+ "TENANT_ADMIN_OWNERSHIP_CREATED",
+ "TENANT_ADMIN_OWNERSHIP_IN_PROGRESS",
+ "TENANT_ADMIN_OWNERSHIP_FAILED",
+ "TENANT_PRINCIPAL_OWNERSHIP_CREATED",
+ "TENANT_PRINCIPAL_OWNERSHIP_IN_PROGRESS",
+ "TENANT_PRINCIPAL_OWNERSHIP_FAILED",
+ name="fsmstates",
+ native_enum=False,
+ ),
+ existing_type=sa.Enum(
+ "UNSTARTED",
+ "STARTING",
+ "STARTED",
+ "COMPLETED",
+ "FAILED",
+ "TENANT_CREATED",
+ "TENANT_IN_PROGRESS",
+ "TENANT_FAILED",
+ "BILLING_PROFILE_CREATION_CREATED",
+ "BILLING_PROFILE_CREATION_IN_PROGRESS",
+ "BILLING_PROFILE_CREATION_FAILED",
+ "BILLING_PROFILE_VERIFICATION_CREATED",
+ "BILLING_PROFILE_VERIFICATION_IN_PROGRESS",
+ "BILLING_PROFILE_VERIFICATION_FAILED",
+ "BILLING_PROFILE_TENANT_ACCESS_CREATED",
+ "BILLING_PROFILE_TENANT_ACCESS_IN_PROGRESS",
+ "BILLING_PROFILE_TENANT_ACCESS_FAILED",
+ "TASK_ORDER_BILLING_CREATION_CREATED",
+ "TASK_ORDER_BILLING_CREATION_IN_PROGRESS",
+ "TASK_ORDER_BILLING_CREATION_FAILED",
+ "TASK_ORDER_BILLING_VERIFICATION_CREATED",
+ "TASK_ORDER_BILLING_VERIFICATION_IN_PROGRESS",
+ "TASK_ORDER_BILLING_VERIFICATION_FAILED",
+ "BILLING_INSTRUCTION_CREATED",
+ "BILLING_INSTRUCTION_IN_PROGRESS",
+ "BILLING_INSTRUCTION_FAILED",
+ name="fsmstates",
+ native_enum=False,
+ create_constraint=False,
+ ),
+ existing_nullable=False,
+ )
+ # ### end Alembic commands ###
+
+
+def downgrade():
+ # ### commands auto generated by Alembic - please adjust! ###
+ op.alter_column(
+ "portfolio_state_machines",
+ "state",
+ type_=sa.Enum(
+ "UNSTARTED",
+ "STARTING",
+ "STARTED",
+ "COMPLETED",
+ "FAILED",
+ "TENANT_CREATED",
+ "TENANT_IN_PROGRESS",
+ "TENANT_FAILED",
+ "BILLING_PROFILE_CREATION_CREATED",
+ "BILLING_PROFILE_CREATION_IN_PROGRESS",
+ "BILLING_PROFILE_CREATION_FAILED",
+ "BILLING_PROFILE_VERIFICATION_CREATED",
+ "BILLING_PROFILE_VERIFICATION_IN_PROGRESS",
+ "BILLING_PROFILE_VERIFICATION_FAILED",
+ "BILLING_PROFILE_TENANT_ACCESS_CREATED",
+ "BILLING_PROFILE_TENANT_ACCESS_IN_PROGRESS",
+ "BILLING_PROFILE_TENANT_ACCESS_FAILED",
+ "TASK_ORDER_BILLING_CREATION_CREATED",
+ "TASK_ORDER_BILLING_CREATION_IN_PROGRESS",
+ "TASK_ORDER_BILLING_CREATION_FAILED",
+ "TASK_ORDER_BILLING_VERIFICATION_CREATED",
+ "TASK_ORDER_BILLING_VERIFICATION_IN_PROGRESS",
+ "TASK_ORDER_BILLING_VERIFICATION_FAILED",
+ "BILLING_INSTRUCTION_CREATED",
+ "BILLING_INSTRUCTION_IN_PROGRESS",
+ "BILLING_INSTRUCTION_FAILED",
+ name="fsmstates",
+ native_enum=False,
+ ),
+ existing_type=sa.Enum(
+ "UNSTARTED",
+ "STARTING",
+ "STARTED",
+ "COMPLETED",
+ "FAILED",
+ "TENANT_CREATED",
+ "TENANT_IN_PROGRESS",
+ "TENANT_FAILED",
+ "BILLING_PROFILE_CREATION_CREATED",
+ "BILLING_PROFILE_CREATION_IN_PROGRESS",
+ "BILLING_PROFILE_CREATION_FAILED",
+ "BILLING_PROFILE_VERIFICATION_CREATED",
+ "BILLING_PROFILE_VERIFICATION_IN_PROGRESS",
+ "BILLING_PROFILE_VERIFICATION_FAILED",
+ "BILLING_PROFILE_TENANT_ACCESS_CREATED",
+ "BILLING_PROFILE_TENANT_ACCESS_IN_PROGRESS",
+ "BILLING_PROFILE_TENANT_ACCESS_FAILED",
+ "TASK_ORDER_BILLING_CREATION_CREATED",
+ "TASK_ORDER_BILLING_CREATION_IN_PROGRESS",
+ "TASK_ORDER_BILLING_CREATION_FAILED",
+ "TASK_ORDER_BILLING_VERIFICATION_CREATED",
+ "TASK_ORDER_BILLING_VERIFICATION_IN_PROGRESS",
+ "TASK_ORDER_BILLING_VERIFICATION_FAILED",
+ "BILLING_INSTRUCTION_CREATED",
+ "BILLING_INSTRUCTION_IN_PROGRESS",
+ "BILLING_INSTRUCTION_FAILED",
+ "TENANT_PRINCIPAL_APP_CREATED",
+ "TENANT_PRINCIPAL_APP_IN_PROGRESS",
+ "TENANT_PRINCIPAL_APP_FAILED",
+ "TENANT_PRINCIPAL_CREATED",
+ "TENANT_PRINCIPAL_IN_PROGRESS",
+ "TENANT_PRINCIPAL_FAILED",
+ "TENANT_PRINCIPAL_CREDENTIAL_CREATED",
+ "TENANT_PRINCIPAL_CREDENTIAL_IN_PROGRESS",
+ "TENANT_PRINCIPAL_CREDENTIAL_FAILED",
+ "ADMIN_ROLE_DEFINITION_CREATED",
+ "ADMIN_ROLE_DEFINITION_IN_PROGRESS",
+ "ADMIN_ROLE_DEFINITION_FAILED",
+ "PRINCIPAL_ADMIN_ROLE_CREATED",
+ "PRINCIPAL_ADMIN_ROLE_IN_PROGRESS",
+ "PRINCIPAL_ADMIN_ROLE_FAILED",
+ "TENANT_ADMIN_OWNERSHIP_CREATED",
+ "TENANT_ADMIN_OWNERSHIP_IN_PROGRESS",
+ "TENANT_ADMIN_OWNERSHIP_FAILED",
+ "TENANT_PRINCIPAL_OWNERSHIP_CREATED",
+ "TENANT_PRINCIPAL_OWNERSHIP_IN_PROGRESS",
+ "TENANT_PRINCIPAL_OWNERSHIP_FAILED",
+ name="fsmstates",
+ native_enum=False,
+ ),
+ existing_nullable=False,
+ )
+ # ### end Alembic commands ###
diff --git a/atst/domain/csp/cloud/azure_cloud_provider.py b/atst/domain/csp/cloud/azure_cloud_provider.py
index a83bef10..3074e221 100644
--- a/atst/domain/csp/cloud/azure_cloud_provider.py
+++ b/atst/domain/csp/cloud/azure_cloud_provider.py
@@ -1,12 +1,16 @@
import json
import re
from secrets import token_urlsafe
-from typing import Dict
+from typing import Any, Dict
from uuid import uuid4
+from atst.utils import sha256_hex
+
from .cloud_provider_interface import CloudProviderInterface
from .exceptions import AuthenticationException
from .models import (
+ AdminRoleDefinitionCSPPayload,
+ AdminRoleDefinitionCSPResult,
ApplicationCSPPayload,
ApplicationCSPResult,
BillingInstructionCSPPayload,
@@ -23,26 +27,36 @@ from .models import (
ProductPurchaseCSPResult,
ProductPurchaseVerificationCSPPayload,
ProductPurchaseVerificationCSPResult,
+ PrincipalAdminRoleCSPPayload,
+ PrincipalAdminRoleCSPResult,
TaskOrderBillingCreationCSPPayload,
TaskOrderBillingCreationCSPResult,
TaskOrderBillingVerificationCSPPayload,
TaskOrderBillingVerificationCSPResult,
+ TenantAdminOwnershipCSPPayload,
+ TenantAdminOwnershipCSPResult,
TenantCSPPayload,
TenantCSPResult,
+ TenantPrincipalAppCSPPayload,
+ TenantPrincipalAppCSPResult,
+ TenantPrincipalCredentialCSPPayload,
+ TenantPrincipalCredentialCSPResult,
+ TenantPrincipalCSPPayload,
+ TenantPrincipalCSPResult,
+ TenantPrincipalOwnershipCSPPayload,
+ TenantPrincipalOwnershipCSPResult,
)
from .policy import AzurePolicyManager
-from atst.utils import sha256_hex
-AZURE_ENVIRONMENT = "AZURE_PUBLIC_CLOUD" # TBD
-AZURE_SKU_ID = "?" # probably a static sku specific to ATAT/JEDI
SUBSCRIPTION_ID_REGEX = re.compile(
"subscriptions\/([0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})",
re.I,
)
# This needs to be a fully pathed role definition identifier, not just a UUID
+# TODO: Extract these from sdk msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
+AZURE_SKU_ID = "0001" # probably a static sku specific to ATAT/JEDI
REMOTE_ROOT_ROLE_DEF_ID = "/providers/Microsoft.Authorization/roleDefinitions/00000000-0000-4000-8000-000000000000"
-AZURE_MANAGEMENT_API = "https://management.azure.com"
class AzureSDKProvider(object):
@@ -54,8 +68,9 @@ class AzureSDKProvider(object):
import azure.identity as identity
from azure.keyvault import secrets
from azure.core import exceptions
-
- from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
+ from msrestazure.azure_cloud import (
+ AZURE_PUBLIC_CLOUD,
+ ) # TODO: choose cloud type from config
import adal
import requests
@@ -70,7 +85,6 @@ class AzureSDKProvider(object):
self.exceptions = exceptions
self.secrets = secrets
self.requests = requests
- # may change to a JEDI cloud
self.cloud = AZURE_PUBLIC_CLOUD
@@ -82,6 +96,9 @@ class AzureCloudProvider(CloudProviderInterface):
self.secret_key = config["AZURE_SECRET_KEY"]
self.tenant_id = config["AZURE_TENANT_ID"]
self.vault_url = config["AZURE_VAULT_URL"]
+ self.ps_client_id = config["POWERSHELL_CLIENT_ID"]
+ self.owner_role_def_id = config["AZURE_OWNER_ROLE_DEF_ID"]
+ self.graph_resource = config["AZURE_GRAPH_RESOURCE"]
if azure_sdk_provider is None:
self.sdk = AzureSDKProvider()
@@ -91,7 +108,7 @@ class AzureCloudProvider(CloudProviderInterface):
self.policy_manager = AzurePolicyManager(config["AZURE_POLICY_LOCATION"])
def set_secret(self, secret_key, secret_value):
- credential = self._get_client_secret_credential_obj({})
+ credential = self._get_client_secret_credential_obj()
secret_client = self.sdk.secrets.SecretClient(
vault_url=self.vault_url, credential=credential,
)
@@ -104,7 +121,7 @@ class AzureCloudProvider(CloudProviderInterface):
)
def get_secret(self, secret_key):
- credential = self._get_client_secret_credential_obj({})
+ credential = self._get_client_secret_credential_obj()
secret_client = self.sdk.secrets.SecretClient(
vault_url=self.vault_url, credential=credential,
)
@@ -180,7 +197,7 @@ class AzureCloudProvider(CloudProviderInterface):
"secret_key": creds.root_sp_key,
"tenant_id": creds.root_tenant_id,
},
- resource=AZURE_MANAGEMENT_API,
+ resource=self.sdk.cloud.endpoints.resource_manager,
)
response = self._create_management_group(
@@ -305,7 +322,7 @@ class AzureCloudProvider(CloudProviderInterface):
)
def create_tenant(self, payload: TenantCSPPayload):
- sp_token = self._get_sp_token(payload.creds)
+ sp_token = self._get_root_provisioning_token()
if sp_token is None:
raise AuthenticationException("Could not resolve token for tenant creation")
@@ -317,26 +334,33 @@ class AzureCloudProvider(CloudProviderInterface):
}
result = self.sdk.requests.post(
- "https://management.azure.com/providers/Microsoft.SignUp/createTenant?api-version=2020-01-01-preview",
+ f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.SignUp/createTenant?api-version=2020-01-01-preview",
json=create_tenant_body,
headers=create_tenant_headers,
)
if result.status_code == 200:
- return self._ok(
- TenantCSPResult(
- **result.json(),
- tenant_admin_password=payload.password,
- tenant_admin_username=payload.user_id,
- )
+ result_dict = result.json()
+ tenant_id = result_dict.get("tenantId")
+ tenant_admin_username = (
+ f"{payload.user_id}@{payload.domain_name}.onmicrosoft.com"
)
+ self.update_tenant_creds(
+ tenant_id,
+ KeyVaultCredentials(
+ tenant_id=tenant_id,
+ tenant_admin_username=tenant_admin_username,
+ tenant_admin_password=payload.password,
+ ),
+ )
+ return self._ok(TenantCSPResult(**result_dict))
else:
return self._error(result.json())
def create_billing_profile_creation(
self, payload: BillingProfileCreationCSPPayload
):
- sp_token = self._get_sp_token(payload.creds)
+ sp_token = self._get_root_provisioning_token()
if sp_token is None:
raise AuthenticationException(
"Could not resolve token for billing profile creation"
@@ -348,7 +372,7 @@ class AzureCloudProvider(CloudProviderInterface):
"Authorization": f"Bearer {sp_token}",
}
- billing_account_create_url = f"https://management.azure.com/providers/Microsoft.Billing/billingAccounts/{payload.billing_account_name}/billingProfiles?api-version=2019-10-01-preview"
+ billing_account_create_url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Billing/billingAccounts/{payload.billing_account_name}/billingProfiles?api-version=2019-10-01-preview"
result = self.sdk.requests.post(
billing_account_create_url,
@@ -368,7 +392,7 @@ class AzureCloudProvider(CloudProviderInterface):
def create_billing_profile_verification(
self, payload: BillingProfileVerificationCSPPayload
):
- sp_token = self._get_sp_token(payload.creds)
+ sp_token = self._get_root_provisioning_token()
if sp_token is None:
raise AuthenticationException(
"Could not resolve token for billing profile validation"
@@ -393,7 +417,7 @@ class AzureCloudProvider(CloudProviderInterface):
def create_billing_profile_tenant_access(
self, payload: BillingProfileTenantAccessCSPPayload
):
- sp_token = self._get_sp_token(payload.creds)
+ sp_token = self._get_root_provisioning_token()
request_body = {
"properties": {
"principalTenantId": payload.tenant_id, # from tenant creation
@@ -406,7 +430,7 @@ class AzureCloudProvider(CloudProviderInterface):
"Authorization": f"Bearer {sp_token}",
}
- url = f"https://management.azure.com/providers/Microsoft.Billing/billingAccounts/{payload.billing_account_name}/billingProfiles/{payload.billing_profile_name}/createBillingRoleAssignment?api-version=2019-10-01-preview"
+ url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Billing/billingAccounts/{payload.billing_account_name}/billingProfiles/{payload.billing_profile_name}/createBillingRoleAssignment?api-version=2019-10-01-preview"
result = self.sdk.requests.post(url, headers=headers, json=request_body)
if result.status_code == 201:
@@ -417,12 +441,12 @@ class AzureCloudProvider(CloudProviderInterface):
def create_task_order_billing_creation(
self, payload: TaskOrderBillingCreationCSPPayload
):
- sp_token = self._get_sp_token(payload.creds)
+ sp_token = self._get_root_provisioning_token()
request_body = [
{
"op": "replace",
"path": "/enabledAzurePlans",
- "value": [{"skuId": "0001"}],
+ "value": [{"skuId": AZURE_SKU_ID}],
}
]
@@ -430,7 +454,7 @@ class AzureCloudProvider(CloudProviderInterface):
"Authorization": f"Bearer {sp_token}",
}
- url = f"https://management.azure.com/providers/Microsoft.Billing/billingAccounts/{payload.billing_account_name}/billingProfiles/{payload.billing_profile_name}?api-version=2019-10-01-preview"
+ url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Billing/billingAccounts/{payload.billing_account_name}/billingProfiles/{payload.billing_profile_name}?api-version=2019-10-01-preview"
result = self.sdk.requests.patch(
url, headers=request_headers, json=request_body
@@ -447,7 +471,7 @@ class AzureCloudProvider(CloudProviderInterface):
def create_task_order_billing_verification(
self, payload: TaskOrderBillingVerificationCSPPayload
):
- sp_token = self._get_sp_token(payload.creds)
+ sp_token = self._get_root_provisioning_token()
if sp_token is None:
raise AuthenticationException(
"Could not resolve token for task order billing validation"
@@ -470,7 +494,7 @@ class AzureCloudProvider(CloudProviderInterface):
return self._error(result.json())
def create_billing_instruction(self, payload: BillingInstructionCSPPayload):
- sp_token = self._get_sp_token(payload.creds)
+ sp_token = self._get_root_provisioning_token()
if sp_token is None:
raise AuthenticationException(
"Could not resolve token for task order billing validation"
@@ -484,7 +508,7 @@ class AzureCloudProvider(CloudProviderInterface):
}
}
- url = f"https://management.azure.com/providers/Microsoft.Billing/billingAccounts/{payload.billing_account_name}/billingProfiles/{payload.billing_profile_name}/instructions/{payload.initial_task_order_id}:CLIN00{payload.initial_clin_type}?api-version=2019-10-01-preview"
+ url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Billing/billingAccounts/{payload.billing_account_name}/billingProfiles/{payload.billing_profile_name}/instructions/{payload.initial_task_order_id}:CLIN00{payload.initial_clin_type}?api-version=2019-10-01-preview"
auth_header = {
"Authorization": f"Bearer {sp_token}",
@@ -498,7 +522,7 @@ class AzureCloudProvider(CloudProviderInterface):
return self._error(result.json())
def create_product_purchase(self, payload: ProductPurchaseCSPPayload):
- sp_token = self._get_sp_token(payload.creds)
+ sp_token = self._get_root_provisioning_token()
if sp_token is None:
raise AuthenticationException(
"Could not resolve token for aad premium product purchase"
@@ -540,7 +564,7 @@ class AzureCloudProvider(CloudProviderInterface):
def create_product_purchase_verification(
self, payload: ProductPurchaseVerificationCSPPayload
):
- sp_token = self._get_sp_token(payload.creds)
+ sp_token = self._get_root_provisioning_token()
if sp_token is None:
raise AuthenticationException(
"Could not resolve token for aad premium product purchase validation"
@@ -567,21 +591,198 @@ class AzureCloudProvider(CloudProviderInterface):
else:
return self._error(result.json())
- def create_remote_admin(self, creds, tenant_details):
- # create app/service principal within tenant, with name constructed from tenant details
- # assign principal global admin
+ def create_tenant_admin_ownership(self, payload: TenantAdminOwnershipCSPPayload):
+ mgmt_token = self._get_elevated_management_token(payload.tenant_id)
- # needs to call out to CLI with tenant owner username/password, prototyping for that underway
+ role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{self.owner_role_def_id}"
- # return identifier and creds to consumer for storage
- response = {"clientId": "string", "secretKey": "string", "tenantId": "string"}
- return self._ok(
- {
- "client_id": response["clientId"],
- "secret_key": response["secret_key"],
- "tenant_id": response["tenantId"],
+ request_body = {
+ "properties": {
+ "roleDefinitionId": role_definition_id,
+ "principalId": payload.user_object_id,
}
+ }
+
+ auth_header = {
+ "Authorization": f"Bearer {mgmt_token}",
+ }
+
+ assignment_guid = str(uuid4())
+
+ url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleAssignments/{assignment_guid}?api-version=2015-07-01"
+
+ response = self.sdk.requests.put(url, headers=auth_header, json=request_body)
+
+ if response.ok:
+ return TenantAdminOwnershipCSPResult(**response.json())
+
+ def create_tenant_principal_ownership(
+ self, payload: TenantPrincipalOwnershipCSPPayload
+ ):
+ mgmt_token = self._get_elevated_management_token(payload.tenant_id)
+
+ # NOTE: the tenant_id is also the id of the root management group, once it is created
+ role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{self.owner_role_def_id}"
+
+ request_body = {
+ "properties": {
+ "roleDefinitionId": role_definition_id,
+ "principalId": payload.principal_id,
+ }
+ }
+
+ auth_header = {
+ "Authorization": f"Bearer {mgmt_token}",
+ }
+
+ assignment_guid = str(uuid4())
+
+ url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleAssignments/{assignment_guid}?api-version=2015-07-01"
+
+ response = self.sdk.requests.put(url, headers=auth_header, json=request_body)
+
+ if response.ok:
+ return TenantPrincipalOwnershipCSPResult(**response.json())
+
+ def create_tenant_principal_app(self, payload: TenantPrincipalAppCSPPayload):
+ graph_token = self._get_tenant_admin_token(
+ payload.tenant_id, self.graph_resource
)
+ if graph_token is None:
+ raise AuthenticationException(
+ "Could not resolve graph token for tenant admin"
+ )
+
+ request_body = {"displayName": "ATAT Remote Admin"}
+
+ auth_header = {
+ "Authorization": f"Bearer {graph_token}",
+ }
+
+ url = f"{self.graph_resource}/v1.0/applications"
+
+ response = self.sdk.requests.post(url, json=request_body, headers=auth_header)
+
+ if response.ok:
+ return TenantPrincipalAppCSPResult(**response.json())
+
+ def create_tenant_principal(self, payload: TenantPrincipalCSPPayload):
+ graph_token = self._get_tenant_admin_token(
+ payload.tenant_id, self.graph_resource
+ )
+ if graph_token is None:
+ raise AuthenticationException(
+ "Could not resolve graph token for tenant admin"
+ )
+
+ request_body = {"appId": payload.principal_app_id}
+
+ auth_header = {
+ "Authorization": f"Bearer {graph_token}",
+ }
+
+ url = f"{self.graph_resource}/beta/servicePrincipals"
+
+ response = self.sdk.requests.post(url, json=request_body, headers=auth_header)
+
+ if response.ok:
+ return TenantPrincipalCSPResult(**response.json())
+
+ def create_tenant_principal_credential(
+ self, payload: TenantPrincipalCredentialCSPPayload
+ ):
+ graph_token = self._get_tenant_admin_token(
+ payload.tenant_id, self.graph_resource
+ )
+ if graph_token is None:
+ raise AuthenticationException(
+ "Could not resolve graph token for tenant admin"
+ )
+
+ request_body = {
+ "passwordCredentials": [{"displayName": "ATAT Generated Password"}]
+ }
+
+ auth_header = {
+ "Authorization": f"Bearer {graph_token}",
+ }
+
+ url = f"{self.graph_resource}/v1.0/applications/{payload.principal_app_object_id}/addPassword"
+
+ response = self.sdk.requests.post(url, json=request_body, headers=auth_header)
+
+ if response.ok:
+ result = response.json()
+ self.update_tenant_creds(
+ payload.tenant_id,
+ KeyVaultCredentials(
+ tenant_id=payload.tenant_id,
+ tenant_sp_key=result.get("secretText"),
+ tenant_sp_client_id=payload.principal_app_id,
+ ),
+ )
+ return TenantPrincipalCredentialCSPResult(
+ principal_client_id=payload.principal_app_id,
+ principal_creds_established=True,
+ )
+
+ def create_admin_role_definition(self, payload: AdminRoleDefinitionCSPPayload):
+ graph_token = self._get_tenant_admin_token(
+ payload.tenant_id, self.graph_resource
+ )
+ if graph_token is None:
+ raise AuthenticationException(
+ "Could not resolve graph token for tenant admin"
+ )
+
+ auth_header = {
+ "Authorization": f"Bearer {graph_token}",
+ }
+
+ url = f"{self.graph_resource}/beta/roleManagement/directory/roleDefinitions"
+
+ response = self.sdk.requests.get(url, headers=auth_header)
+
+ result = response.json()
+ roleList = result.get("value")
+
+ DEFAULT_ADMIN_RD_ID = "794bb258-3e31-42ff-9ee4-731a72f62851"
+ admin_role_def_id = next(
+ (
+ role.get("id")
+ for role in roleList
+ if role.get("displayName") == "Company Administrator"
+ ),
+ DEFAULT_ADMIN_RD_ID,
+ )
+
+ return AdminRoleDefinitionCSPResult(admin_role_def_id=admin_role_def_id)
+
+ def create_principal_admin_role(self, payload: PrincipalAdminRoleCSPPayload):
+ graph_token = self._get_tenant_admin_token(
+ payload.tenant_id, self.graph_resource
+ )
+ if graph_token is None:
+ raise AuthenticationException(
+ "Could not resolve graph token for tenant admin"
+ )
+
+ request_body = {
+ "principalId": payload.principal_id,
+ "roleDefinitionId": payload.admin_role_def_id,
+ "resourceScope": "/",
+ }
+
+ auth_header = {
+ "Authorization": f"Bearer {graph_token}",
+ }
+
+ url = f"{self.graph_resource}/beta/roleManagement/directory/roleAssignments"
+
+ response = self.sdk.requests.post(url, headers=auth_header, json=request_body)
+
+ if response.ok:
+ return PrincipalAdminRoleCSPResult(**response.json())
def force_tenant_admin_pw_update(self, creds, tenant_owner_id):
# use creds to update to force password recovery?
@@ -651,22 +852,42 @@ class AzureCloudProvider(CloudProviderInterface):
if sub_id_match:
return sub_id_match.group(1)
- def _get_sp_token(self, creds):
- home_tenant_id = creds.get("home_tenant_id")
- client_id = creds.get("client_id")
- secret_key = creds.get("secret_key")
+ def _get_tenant_admin_token(self, tenant_id, resource):
+ creds = self._source_tenant_creds(tenant_id)
+ return self._get_up_token_for_resource(
+ creds.tenant_admin_username,
+ creds.tenant_admin_password,
+ tenant_id,
+ resource,
+ )
- # TODO: Make endpoints consts or configs
- authentication_endpoint = "https://login.microsoftonline.com/"
- resource = "https://management.azure.com/"
+ def _get_root_provisioning_token(self):
+ creds = self._source_creds()
+ return self._get_sp_token(
+ creds.tenant_id, creds.root_sp_client_id, creds.root_sp_key
+ )
+ def _get_sp_token(self, tenant_id, client_id, secret_key):
context = self.sdk.adal.AuthenticationContext(
- authentication_endpoint + home_tenant_id
+ f"{self.sdk.cloud.endpoints.active_directory}/{tenant_id}"
)
# TODO: handle failure states here
token_response = context.acquire_token_with_client_credentials(
- resource, client_id, secret_key
+ self.sdk.cloud.endpoints.resource_manager, client_id, secret_key
+ )
+
+ return token_response.get("accessToken", None)
+
+ def _get_up_token_for_resource(self, username, password, tenant_id, resource):
+
+ context = self.sdk.adal.AuthenticationContext(
+ f"{self.sdk.cloud.endpoints.active_directory}/{tenant_id}"
+ )
+
+ # TODO: handle failure states here
+ token_response = context.acquire_token_with_username_password(
+ resource, username, password, self.ps_client_id
)
return token_response.get("accessToken", None)
@@ -680,16 +901,14 @@ class AzureCloudProvider(CloudProviderInterface):
cloud_environment=self.sdk.cloud,
)
- def _get_client_secret_credential_obj(self, creds):
+ def _get_client_secret_credential_obj(self):
+ creds = self._source_creds()
return self.sdk.identity.ClientSecretCredential(
- tenant_id=creds.get("tenant_id"),
- client_id=creds.get("client_id"),
- client_secret=creds.get("secret_key"),
+ tenant_id=creds.tenant_id,
+ client_id=creds.root_sp_client_id,
+ client_secret=creds.root_sp_key,
)
- def _make_tenant_admin_cred_obj(self, username, password):
- return self.sdk.credentials.UserPassCredentials(username, password)
-
def _ok(self, body=None):
return self._make_response("ok", body)
@@ -716,6 +935,26 @@ class AzureCloudProvider(CloudProviderInterface):
"tenant_id": self.tenant_id,
}
+ def _get_elevated_management_token(self, tenant_id):
+ mgmt_token = self._get_tenant_admin_token(
+ tenant_id, self.sdk.cloud.endpoints.resource_manager
+ )
+ if mgmt_token is None:
+ raise AuthenticationException(
+ "Failed to resolve management token for tenant admin"
+ )
+
+ auth_header = {
+ "Authorization": f"Bearer {mgmt_token}",
+ }
+ url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Authorization/elevateAccess?api-version=2016-07-01"
+ result = self.sdk.requests.post(url, headers=auth_header)
+
+ if not result.ok:
+ raise AuthenticationException("Failed to elevate access")
+
+ return mgmt_token
+
def _source_creds(self, tenant_id=None) -> KeyVaultCredentials:
if tenant_id:
return self._source_tenant_creds(tenant_id)
@@ -726,13 +965,16 @@ class AzureCloudProvider(CloudProviderInterface):
root_sp_key=self._root_creds.get("secret_key"),
)
- def update_tenant_creds(self, tenant_id, secret):
+ def update_tenant_creds(self, tenant_id, secret: KeyVaultCredentials):
hashed = sha256_hex(tenant_id)
- self.set_secret(hashed, json.dumps(secret))
+ new_secrets = secret.dict()
+ curr_secrets = self._source_tenant_creds(tenant_id)
+ updated_secrets: Dict[str, Any] = {**curr_secrets.dict(), **new_secrets}
+ us = KeyVaultCredentials(**updated_secrets)
+ self.set_secret(hashed, json.dumps(us.dict()))
+ return us
- return secret
-
- def _source_tenant_creds(self, tenant_id):
+ def _source_tenant_creds(self, tenant_id) -> KeyVaultCredentials:
hashed = sha256_hex(tenant_id)
raw_creds = self.get_secret(hashed)
return KeyVaultCredentials(**json.loads(raw_creds))
diff --git a/atst/domain/csp/cloud/mock_cloud_provider.py b/atst/domain/csp/cloud/mock_cloud_provider.py
index ddb856dd..baf711d4 100644
--- a/atst/domain/csp/cloud/mock_cloud_provider.py
+++ b/atst/domain/csp/cloud/mock_cloud_provider.py
@@ -1,41 +1,52 @@
from uuid import uuid4
-from atst.domain.csp.cloud.exceptions import (
- BaselineProvisionException,
- EnvironmentCreationException,
- GeneralCSPException,
- UserProvisioningException,
- UserRemovalException,
-)
-from atst.domain.csp.cloud.models import BillingProfileTenantAccessCSPResult
-
from .cloud_provider_interface import CloudProviderInterface
from .exceptions import (
AuthenticationException,
AuthorizationException,
+ BaselineProvisionException,
ConnectionException,
+ EnvironmentCreationException,
+ GeneralCSPException,
UnknownServerException,
+ UserProvisioningException,
+ UserRemovalException,
)
from .models import (
AZURE_MGMNT_PATH,
+ AdminRoleDefinitionCSPPayload,
+ AdminRoleDefinitionCSPResult,
ApplicationCSPPayload,
ApplicationCSPResult,
BillingInstructionCSPPayload,
BillingInstructionCSPResult,
BillingProfileCreationCSPPayload,
BillingProfileCreationCSPResult,
+ BillingProfileTenantAccessCSPResult,
BillingProfileVerificationCSPPayload,
BillingProfileVerificationCSPResult,
ProductPurchaseCSPPayload,
ProductPurchaseCSPResult,
ProductPurchaseVerificationCSPPayload,
ProductPurchaseVerificationCSPResult,
+ PrincipalAdminRoleCSPPayload,
+ PrincipalAdminRoleCSPResult,
TaskOrderBillingCreationCSPPayload,
TaskOrderBillingCreationCSPResult,
TaskOrderBillingVerificationCSPPayload,
TaskOrderBillingVerificationCSPResult,
+ TenantAdminOwnershipCSPPayload,
+ TenantAdminOwnershipCSPResult,
TenantCSPPayload,
TenantCSPResult,
+ TenantPrincipalAppCSPPayload,
+ TenantPrincipalAppCSPResult,
+ TenantPrincipalCredentialCSPPayload,
+ TenantPrincipalCredentialCSPResult,
+ TenantPrincipalCSPPayload,
+ TenantPrincipalCSPResult,
+ TenantPrincipalOwnershipCSPPayload,
+ TenantPrincipalOwnershipCSPResult,
)
@@ -124,7 +135,7 @@ class MockCloudProvider(CloudProviderInterface):
payload is an instance of TenantCSPPayload data class
"""
- self._authorize(payload.creds)
+ self._authorize("admin")
self._delay(1, 5)
@@ -304,6 +315,70 @@ class MockCloudProvider(CloudProviderInterface):
**dict(premium_purchase_date="2020-01-30T18:57:05.981Z")
)
+ def create_tenant_admin_ownership(self, payload: TenantAdminOwnershipCSPPayload):
+ self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
+ self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
+ self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
+ return TenantAdminOwnershipCSPResult(**dict(id="admin_owner_assignment_id"))
+
+
+ def create_tenant_principal_ownership(
+ self, payload: TenantPrincipalOwnershipCSPPayload
+ ):
+ self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
+ self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
+ self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
+
+ return TenantPrincipalOwnershipCSPResult(
+ **dict(id="principal_owner_assignment_id")
+ )
+
+ def create_tenant_principal_app(self, payload: TenantPrincipalAppCSPPayload):
+ self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
+ self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
+ self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
+
+ return TenantPrincipalAppCSPResult(
+ **dict(appId="principal_app_id", id="principal_app_object_id")
+ )
+
+ def create_tenant_principal(self, payload: TenantPrincipalCSPPayload):
+ self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
+ self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
+ self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
+
+ return TenantPrincipalCSPResult(**dict(id="principal_id"))
+
+ def create_tenant_principal_credential(
+ self, payload: TenantPrincipalCredentialCSPPayload
+ ):
+ self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
+ self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
+ self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
+
+ return TenantPrincipalCredentialCSPResult(
+ **dict(
+ principal_client_id="principal_client_id",
+ principal_creds_established=True,
+ )
+ )
+
+ def create_admin_role_definition(self, payload: AdminRoleDefinitionCSPPayload):
+ self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
+ self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
+ self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
+
+ return AdminRoleDefinitionCSPResult(
+ **dict(admin_role_def_id="admin_role_def_id")
+ )
+
+ def create_principal_admin_role(self, payload: PrincipalAdminRoleCSPPayload):
+ self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
+ self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
+ self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
+
+ return PrincipalAdminRoleCSPResult(**dict(id="principal_assignment_id"))
+
def create_or_update_user(self, auth_credentials, user_info, csp_role_id):
self._authorize(auth_credentials)
diff --git a/atst/domain/csp/cloud/models.py b/atst/domain/csp/cloud/models.py
index d119b7eb..aa24d027 100644
--- a/atst/domain/csp/cloud/models.py
+++ b/atst/domain/csp/cloud/models.py
@@ -22,20 +22,10 @@ class AliasModel(BaseModel):
class BaseCSPPayload(AliasModel):
- # {"username": "mock-cloud", "pass": "shh"}
- creds: Dict
-
- def dict(self, *args, **kwargs):
- exclude = {"creds"}
- if "exclude" not in kwargs:
- kwargs["exclude"] = exclude
- else:
- kwargs["exclude"].update(exclude)
-
- return super().dict(*args, **kwargs)
+ tenant_id: str
-class TenantCSPPayload(BaseCSPPayload):
+class TenantCSPPayload(AliasModel):
user_id: str
password: Optional[str]
domain_name: str
@@ -236,6 +226,81 @@ class BillingInstructionCSPResult(AliasModel):
}
+class TenantAdminOwnershipCSPPayload(BaseCSPPayload):
+ user_object_id: str
+
+
+class TenantAdminOwnershipCSPResult(AliasModel):
+ admin_owner_assignment_id: str
+
+ class Config:
+ fields = {"admin_owner_assignment_id": "id"}
+
+
+class TenantPrincipalOwnershipCSPPayload(BaseCSPPayload):
+ principal_id: str
+
+
+class TenantPrincipalOwnershipCSPResult(AliasModel):
+ principal_owner_assignment_id: str
+
+ class Config:
+ fields = {"principal_owner_assignment_id": "id"}
+
+
+class TenantPrincipalAppCSPPayload(BaseCSPPayload):
+ pass
+
+
+class TenantPrincipalAppCSPResult(AliasModel):
+ principal_app_id: str
+ principal_app_object_id: str
+
+ class Config:
+ fields = {"principal_app_id": "appId", "principal_app_object_id": "id"}
+
+
+class TenantPrincipalCSPPayload(BaseCSPPayload):
+ principal_app_id: str
+
+
+class TenantPrincipalCSPResult(AliasModel):
+ principal_id: str
+
+ class Config:
+ fields = {"principal_id": "id"}
+
+
+class TenantPrincipalCredentialCSPPayload(BaseCSPPayload):
+ principal_app_id: str
+ principal_app_object_id: str
+
+
+class TenantPrincipalCredentialCSPResult(AliasModel):
+ principal_client_id: str
+ principal_creds_established: bool
+
+
+class AdminRoleDefinitionCSPPayload(BaseCSPPayload):
+ pass
+
+
+class AdminRoleDefinitionCSPResult(AliasModel):
+ admin_role_def_id: str
+
+
+class PrincipalAdminRoleCSPPayload(BaseCSPPayload):
+ principal_id: str
+ admin_role_def_id: str
+
+
+class PrincipalAdminRoleCSPResult(AliasModel):
+ principal_assignment_id: str
+
+ class Config:
+ fields = {"principal_assignment_id": "id"}
+
+
AZURE_MGMNT_PATH = "/providers/Microsoft.Management/managementGroups/"
MANAGEMENT_GROUP_NAME_REGEX = "^[a-zA-Z0-9\-_\(\)\.]+$"
diff --git a/atst/models/mixins/state_machines.py b/atst/models/mixins/state_machines.py
index 1db257df..a7edf268 100644
--- a/atst/models/mixins/state_machines.py
+++ b/atst/models/mixins/state_machines.py
@@ -19,6 +19,13 @@ class AzureStages(Enum):
BILLING_INSTRUCTION = "billing instruction"
PRODUCT_PURCHASE = "purchase aad premium product"
PRODUCT_PURCHASE_VERIFICATION = "purchase aad premium product verification"
+ TENANT_PRINCIPAL_APP = "tenant principal application"
+ TENANT_PRINCIPAL = "tenant principal"
+ TENANT_PRINCIPAL_CREDENTIAL = "tenant principal credential"
+ ADMIN_ROLE_DEFINITION = "admin role definition"
+ PRINCIPAL_ADMIN_ROLE = "tenant principal admin"
+ TENANT_ADMIN_OWNERSHIP = "tenant admin ownership"
+ TENANT_PRINCIPAL_OWNERSHIP = "tenant principial ownership"
def _build_csp_states(csp_stages):
diff --git a/atst/models/portfolio_state_machine.py b/atst/models/portfolio_state_machine.py
index be9324b1..4b14a087 100644
--- a/atst/models/portfolio_state_machine.py
+++ b/atst/models/portfolio_state_machine.py
@@ -168,14 +168,6 @@ class PortfolioStateMachine(
self.portfolio.csp_data.update(response.dict())
db.session.add(self.portfolio)
db.session.commit()
-
- if getattr(response, "get_creds", None) is not None:
- new_creds = response.get_creds()
- # TODO: one way salted hash of tenant_id to use as kv key name?
- tenant_id = new_creds.get("tenant_id")
- secret = self.csp.get_secret(tenant_id, new_creds)
- secret.update(new_creds)
- self.csp.update_tenant_creds(tenant_id, secret)
except PydanticValidationError as exc:
app.logger.error(
f"Failed to cast response to valid result class {self.__repr__()}:",
diff --git a/atst/routes/__init__.py b/atst/routes/__init__.py
index 78934400..5fb7e22f 100644
--- a/atst/routes/__init__.py
+++ b/atst/routes/__init__.py
@@ -14,7 +14,9 @@ from flask import (
from jinja2.exceptions import TemplateNotFound
import pendulum
import os
-from werkzeug.exceptions import NotFound
+from werkzeug.exceptions import NotFound, MethodNotAllowed
+from werkzeug.routing import RequestRedirect
+
from atst.domain.users import Users
from atst.domain.authnid import AuthenticationContext
@@ -61,17 +63,36 @@ def _make_authentication_context():
def redirect_after_login_url():
- if request.args.get("next"):
- returl = request.args.get("next")
- if request.args.get(app.form_cache.PARAM_NAME):
- returl += "?" + url.urlencode(
- {app.form_cache.PARAM_NAME: request.args.get(app.form_cache.PARAM_NAME)}
- )
+ returl = request.args.get("next")
+ if match_url_pattern(returl):
+ param_name = request.args.get(app.form_cache.PARAM_NAME)
+ if param_name:
+ returl += "?" + url.urlencode({app.form_cache.PARAM_NAME: param_name})
return returl
else:
return url_for("atst.home")
+def match_url_pattern(url, method="GET"):
+ """Ensure a url matches a url pattern in the flask app
+ inspired by https://stackoverflow.com/questions/38488134/get-the-flask-view-function-that-matches-a-url/38488506#38488506
+ """
+ server_name = app.config.get("SERVER_NAME") or "localhost"
+ adapter = app.url_map.bind(server_name=server_name)
+
+ try:
+ match = adapter.match(url, method=method)
+ except RequestRedirect as e:
+ # recursively match redirects
+ return match_url_pattern(e.new_url, method)
+ except (MethodNotAllowed, NotFound):
+ # no match
+ return None
+
+ if match[0] in app.view_functions:
+ return url
+
+
def current_user_setup(user):
session["user_id"] = user.id
session["last_login"] = user.last_login
@@ -109,13 +130,3 @@ def logout():
@bp.route("/about")
def about():
return render_template("about.html")
-
-
-@bp.route("/csp-environment-access")
-def csp_environment_access():
- return render_template("mock_csp.html", text="console for this environment")
-
-
-@bp.route("/jedi-csp-calculator")
-def jedi_csp_calculator():
- return redirect(app.csp.cloud.get_calculator_url())
diff --git a/atst/routes/applications/__init__.py b/atst/routes/applications/__init__.py
index 8e09c1ed..c8668479 100644
--- a/atst/routes/applications/__init__.py
+++ b/atst/routes/applications/__init__.py
@@ -22,9 +22,4 @@ def wrap_environment_role_lookup(user, environment_id=None, **kwargs):
@applications_bp.route("/environments/ Once the Cloud Service Provider is selected, this link will take you to the CSP's {{ text }}!Do you want to save this draft?
+ {{ 'task_orders.form.builder_base.cancel_modal' | translate }}
}
```
diff --git a/tests/domain/cloud/test_azure_csp.py b/tests/domain/cloud/test_azure_csp.py
index a59a51ea..22b3cf8f 100644
--- a/tests/domain/cloud/test_azure_csp.py
+++ b/tests/domain/cloud/test_azure_csp.py
@@ -1,15 +1,18 @@
-import pytest
import json
-from uuid import uuid4
from unittest.mock import Mock, patch
+from uuid import uuid4
+import pytest
from tests.factories import ApplicationFactory, EnvironmentFactory
from tests.mock_azure import AUTH_CREDENTIALS, mock_azure
from atst.domain.csp.cloud import AzureCloudProvider
from atst.domain.csp.cloud.models import (
+ AdminRoleDefinitionCSPPayload,
+ AdminRoleDefinitionCSPResult,
ApplicationCSPPayload,
ApplicationCSPResult,
+ BaseCSPPayload,
BillingInstructionCSPPayload,
BillingInstructionCSPResult,
BillingProfileCreationCSPPayload,
@@ -26,15 +29,20 @@ from atst.domain.csp.cloud.models import (
TaskOrderBillingCreationCSPResult,
TaskOrderBillingVerificationCSPPayload,
TaskOrderBillingVerificationCSPResult,
+ TenantAdminOwnershipCSPPayload,
+ TenantAdminOwnershipCSPResult,
TenantCSPPayload,
TenantCSPResult,
+ TenantPrincipalAppCSPPayload,
+ TenantPrincipalAppCSPResult,
+ TenantPrincipalCredentialCSPPayload,
+ TenantPrincipalCredentialCSPResult,
+ TenantPrincipalCSPPayload,
+ TenantPrincipalCSPResult,
+ TenantPrincipalOwnershipCSPPayload,
+ TenantPrincipalOwnershipCSPResult,
)
-creds = {
- "home_tenant_id": "tenant_id",
- "client_id": "client_id",
- "secret_key": "secret_key",
-}
BILLING_ACCOUNT_NAME = "52865e4c-52e8-5a6c-da6b-c58f0814f06f:7ea5de9d-b8ce-4901-b1c5-d864320c7b03_2019-05-31"
@@ -98,8 +106,10 @@ MOCK_CREDS = {
}
-def mock_get_secret(azure, func):
- azure.get_secret = func
+def mock_get_secret(azure, val=None):
+ if val is None:
+ val = json.dumps(MOCK_CREDS)
+ azure.get_secret = lambda *a, **k: val
return azure
@@ -107,12 +117,12 @@ def mock_get_secret(azure, func):
def test_create_application_succeeds(mock_azure: AzureCloudProvider):
application = ApplicationFactory.create()
mock_management_group_create(mock_azure, {"id": "Test Id"})
-
- mock_azure = mock_get_secret(mock_azure, lambda *a, **k: json.dumps(MOCK_CREDS))
+ mock_azure = mock_get_secret(mock_azure)
payload = ApplicationCSPPayload(
tenant_id="1234", display_name=application.name, parent_id=str(uuid4())
)
+
result = mock_azure.create_application(payload)
assert result.id == "Test Id"
@@ -158,10 +168,6 @@ def test_create_policy_definition_succeeds(mock_azure: AzureCloudProvider):
def test_create_tenant(mock_azure: AzureCloudProvider):
- mock_azure.sdk.adal.AuthenticationContext.return_value.context.acquire_token_with_client_credentials.return_value = {
- "accessToken": "TOKEN"
- }
-
mock_result = Mock()
mock_result.json.return_value = {
"objectId": "0a5f4926-e3ee-4f47-a6e3-8b0a30a40e3d",
@@ -172,7 +178,6 @@ def test_create_tenant(mock_azure: AzureCloudProvider):
mock_azure.sdk.requests.post.return_value = mock_result
payload = TenantCSPPayload(
**dict(
- creds=creds,
user_id="admin",
password="JediJan13$coot", # pragma: allowlist secret
domain_name="jediccpospawnedtenant2",
@@ -182,6 +187,7 @@ def test_create_tenant(mock_azure: AzureCloudProvider):
password_recovery_email_address="thomas@promptworks.com",
)
)
+ mock_azure = mock_get_secret(mock_azure)
result = mock_azure.create_tenant(payload)
body: TenantCSPResult = result.get("body")
assert body.tenant_id == "60ff9d34-82bf-4f21-b565-308ef0533435"
@@ -209,7 +215,6 @@ def test_create_billing_profile_creation(mock_azure: AzureCloudProvider):
country="US",
postal_code="19109",
),
- creds=creds,
tenant_id="60ff9d34-82bf-4f21-b565-308ef0533435",
billing_profile_display_name="Test Billing Profile",
billing_account_name=BILLING_ACCOUNT_NAME,
@@ -260,7 +265,7 @@ def test_validate_billing_profile_creation(mock_azure: AzureCloudProvider):
payload = BillingProfileVerificationCSPPayload(
**dict(
- creds=creds,
+ tenant_id="60ff9d34-82bf-4f21-b565-308ef0533435",
billing_profile_verify_url="https://management.azure.com/providers/Microsoft.Billing/billingAccounts/7c89b735-b22b-55c0-ab5a-c624843e8bf6:de4416ce-acc6-44b1-8122-c87c4e903c91_2019-05-31/operationResults/createBillingProfile_478d5706-71f9-4a8b-8d4e-2cbaca27a668?api-version=2019-10-01-preview",
)
)
@@ -299,7 +304,6 @@ def test_create_billing_profile_tenant_access(mock_azure: AzureCloudProvider):
payload = BillingProfileTenantAccessCSPPayload(
**dict(
- creds=creds,
tenant_id="60ff9d34-82bf-4f21-b565-308ef0533435",
user_object_id="0a5f4926-e3ee-4f47-a6e3-8b0a30a40e3d",
billing_account_name="7c89b735-b22b-55c0-ab5a-c624843e8bf6:de4416ce-acc6-44b1-8122-c87c4e903c91_2019-05-31",
@@ -331,7 +335,7 @@ def test_create_task_order_billing_creation(mock_azure: AzureCloudProvider):
payload = TaskOrderBillingCreationCSPPayload(
**dict(
- creds=creds,
+ tenant_id="60ff9d34-82bf-4f21-b565-308ef0533435",
billing_account_name="7c89b735-b22b-55c0-ab5a-c624843e8bf6:de4416ce-acc6-44b1-8122-c87c4e903c91_2019-05-31",
billing_profile_name="KQWI-W2SU-BG7-TGB",
)
@@ -392,7 +396,7 @@ def test_create_task_order_billing_verification(mock_azure):
payload = TaskOrderBillingVerificationCSPPayload(
**dict(
- creds=creds,
+ tenant_id="60ff9d34-82bf-4f21-b565-308ef0533435",
task_order_billing_verify_url="https://management.azure.com/providers/Microsoft.Billing/billingAccounts/7c89b735-b22b-55c0-ab5a-c624843e8bf6:de4416ce-acc6-44b1-8122-c87c4e903c91_2019-05-31/operationResults/createBillingProfile_478d5706-71f9-4a8b-8d4e-2cbaca27a668?api-version=2019-10-01-preview",
)
)
@@ -427,7 +431,7 @@ def test_create_billing_instruction(mock_azure: AzureCloudProvider):
payload = BillingInstructionCSPPayload(
**dict(
- creds=creds,
+ tenant_id="60ff9d34-82bf-4f21-b565-308ef0533435",
initial_clin_amount=1000.00,
initial_clin_start_date="2020/1/1",
initial_clin_end_date="2020/3/1",
@@ -441,7 +445,6 @@ def test_create_billing_instruction(mock_azure: AzureCloudProvider):
body: BillingInstructionCSPResult = result.get("body")
assert body.reported_clin_name == "TO1:CLIN001"
-
def test_create_product_purchase(mock_azure: AzureCloudProvider):
mock_azure.sdk.adal.AuthenticationContext.return_value.context.acquire_token_with_client_credentials.return_value = {
"accessToken": "TOKEN"
@@ -458,7 +461,7 @@ def test_create_product_purchase(mock_azure: AzureCloudProvider):
payload = ProductPurchaseCSPPayload(
**dict(
- creds=creds,
+ tenant_id="6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
type="AADPremium",
sku="AADP1",
productProperties={
@@ -519,7 +522,7 @@ def test_create_product_purchase_verification(mock_azure):
payload = ProductPurchaseVerificationCSPPayload(
**dict(
- creds=creds,
+ tenant_id="6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
product_purchase_verify_url="https://management.azure.com/providers/Microsoft.Billing/billingAccounts/7c89b735-b22b-55c0-ab5a-c624843e8bf6:de4416ce-acc6-44b1-8122-c87c4e903c91_2019-05-31/operationResults/createBillingProfile_478d5706-71f9-4a8b-8d4e-2cbaca27a668?api-version=2019-10-01-preview",
)
)
@@ -527,3 +530,172 @@ def test_create_product_purchase_verification(mock_azure):
result = mock_azure.create_product_purchase_verification(payload)
body: ProductPurchaseVerificationCSPResult = result.get("body")
assert body.premium_purchase_date == "2020-01-30T18:57:05.981Z"
+
+def test_create_tenant_principal_app(mock_azure: AzureCloudProvider):
+ with patch.object(
+ AzureCloudProvider,
+ "_get_elevated_management_token",
+ wraps=mock_azure._get_elevated_management_token,
+ ) as get_elevated_management_token:
+ get_elevated_management_token.return_value = "my fake token"
+
+ mock_result = Mock()
+ mock_result.ok = True
+ mock_result.json.return_value = {"appId": "appId", "id": "id"}
+
+ mock_azure.sdk.requests.post.return_value = mock_result
+ mock_azure = mock_get_secret(mock_azure)
+
+ payload = TenantPrincipalAppCSPPayload(
+ **{"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4"}
+ )
+ result: TenantPrincipalAppCSPResult = mock_azure.create_tenant_principal_app(
+ payload
+ )
+
+ assert result.principal_app_id == "appId"
+
+
+def test_create_tenant_principal(mock_azure: AzureCloudProvider):
+ with patch.object(
+ AzureCloudProvider,
+ "_get_elevated_management_token",
+ wraps=mock_azure._get_elevated_management_token,
+ ) as get_elevated_management_token:
+ get_elevated_management_token.return_value = "my fake token"
+
+ mock_result = Mock()
+ mock_result.ok = True
+ mock_result.json.return_value = {"id": "principal_id"}
+
+ mock_azure.sdk.requests.post.return_value = mock_result
+ mock_azure = mock_get_secret(mock_azure)
+
+ payload = TenantPrincipalCSPPayload(
+ **{
+ "tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
+ "principal_app_id": "appId",
+ }
+ )
+
+ result: TenantPrincipalCSPResult = mock_azure.create_tenant_principal(payload)
+
+ assert result.principal_id == "principal_id"
+
+
+def test_create_tenant_principal_credential(mock_azure: AzureCloudProvider):
+ with patch.object(
+ AzureCloudProvider,
+ "_get_elevated_management_token",
+ wraps=mock_azure._get_elevated_management_token,
+ ) as get_elevated_management_token:
+ get_elevated_management_token.return_value = "my fake token"
+
+ mock_result = Mock()
+ mock_result.ok = True
+ mock_result.json.return_value = {"secretText": "new secret key"}
+
+ mock_azure.sdk.requests.post.return_value = mock_result
+
+ mock_azure = mock_get_secret(mock_azure)
+
+ payload = TenantPrincipalCredentialCSPPayload(
+ **{
+ "tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
+ "principal_app_id": "appId",
+ "principal_app_object_id": "appObjId",
+ }
+ )
+
+ result: TenantPrincipalCredentialCSPResult = mock_azure.create_tenant_principal_credential(
+ payload
+ )
+
+ assert result.principal_creds_established == True
+
+
+def test_create_admin_role_definition(mock_azure: AzureCloudProvider):
+ with patch.object(
+ AzureCloudProvider,
+ "_get_elevated_management_token",
+ wraps=mock_azure._get_elevated_management_token,
+ ) as get_elevated_management_token:
+ get_elevated_management_token.return_value = "my fake token"
+
+ mock_result = Mock()
+ mock_result.ok = True
+ mock_result.json.return_value = {
+ "value": [
+ {"id": "wrongid", "displayName": "Wrong Role"},
+ {"id": "id", "displayName": "Company Administrator"},
+ ]
+ }
+
+ mock_azure.sdk.requests.get.return_value = mock_result
+ mock_azure = mock_get_secret(mock_azure)
+
+ payload = AdminRoleDefinitionCSPPayload(
+ **{"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4"}
+ )
+
+ result: AdminRoleDefinitionCSPResult = mock_azure.create_admin_role_definition(
+ payload
+ )
+
+ assert result.admin_role_def_id == "id"
+
+
+def test_create_tenant_admin_ownership(mock_azure: AzureCloudProvider):
+ with patch.object(
+ AzureCloudProvider,
+ "_get_elevated_management_token",
+ wraps=mock_azure._get_elevated_management_token,
+ ) as get_elevated_management_token:
+ get_elevated_management_token.return_value = "my fake token"
+
+ mock_result = Mock()
+ mock_result.ok = True
+ mock_result.json.return_value = {"id": "id"}
+
+ mock_azure.sdk.requests.put.return_value = mock_result
+
+ payload = TenantAdminOwnershipCSPPayload(
+ **{
+ "tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
+ "user_object_id": "971efe4d-1e80-4e39-b3b9-4e5c63ad446d",
+ }
+ )
+
+ result: TenantAdminOwnershipCSPResult = mock_azure.create_tenant_admin_ownership(
+ payload
+ )
+
+ assert result.admin_owner_assignment_id == "id"
+
+
+def test_create_tenant_principal_ownership(mock_azure: AzureCloudProvider):
+ with patch.object(
+ AzureCloudProvider,
+ "_get_elevated_management_token",
+ wraps=mock_azure._get_elevated_management_token,
+ ) as get_elevated_management_token:
+ get_elevated_management_token.return_value = "my fake token"
+
+ mock_result = Mock()
+ mock_result.ok = True
+ mock_result.json.return_value = {"id": "id"}
+
+ mock_azure.sdk.requests.put.return_value = mock_result
+
+ payload = TenantPrincipalOwnershipCSPPayload(
+ **{
+ "tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
+ "principal_id": "971efe4d-1e80-4e39-b3b9-4e5c63ad446d",
+ }
+ )
+
+ result: TenantPrincipalOwnershipCSPResult = mock_azure.create_tenant_principal_ownership(
+ payload
+ )
+
+ assert result.principal_owner_assignment_id == "id"
diff --git a/tests/domain/test_portfolio_state_machine.py b/tests/domain/test_portfolio_state_machine.py
index cdd206b8..2c27cfd8 100644
--- a/tests/domain/test_portfolio_state_machine.py
+++ b/tests/domain/test_portfolio_state_machine.py
@@ -106,10 +106,15 @@ def test_fsm_transition_start(mock_cloud_provider, portfolio: Portfolio):
FSMStates.BILLING_INSTRUCTION_CREATED,
FSMStates.PRODUCT_PURCHASE_CREATED,
FSMStates.PRODUCT_PURCHASE_VERIFICATION_CREATED,
+ FSMStates.TENANT_PRINCIPAL_APP_CREATED,
+ FSMStates.TENANT_PRINCIPAL_CREATED,
+ FSMStates.TENANT_PRINCIPAL_CREDENTIAL_CREATED,
+ FSMStates.ADMIN_ROLE_DEFINITION_CREATED,
+ FSMStates.PRINCIPAL_ADMIN_ROLE_CREATED,
+ FSMStates.TENANT_ADMIN_OWNERSHIP_CREATED,
+ FSMStates.TENANT_PRINCIPAL_OWNERSHIP_CREATED,
]
- # Should source all creds for portfolio? might be easier to manage than per-step specific ones
- creds = {"username": "mock-cloud", "password": "shh"} # pragma: allowlist secret
if portfolio.csp_data is not None:
csp_data = portfolio.csp_data
else:
@@ -152,7 +157,7 @@ def test_fsm_transition_start(mock_cloud_provider, portfolio: Portfolio):
collected_data = dict(
list(csp_data.items()) + list(portfolio_data.items()) + list(config.items())
)
- sm.trigger_next_transition(creds=creds, csp_data=collected_data)
+ sm.trigger_next_transition(csp_data=collected_data)
assert sm.state == expected_state
if portfolio.csp_data is not None:
csp_data = portfolio.csp_data
diff --git a/tests/mock_azure.py b/tests/mock_azure.py
index 4f37848e..438ae855 100644
--- a/tests/mock_azure.py
+++ b/tests/mock_azure.py
@@ -9,6 +9,9 @@ AZURE_CONFIG = {
"AZURE_TENANT_ID": "MOCK",
"AZURE_POLICY_LOCATION": "policies",
"AZURE_VAULT_URL": "http://vault",
+ "POWERSHELL_CLIENT_ID": "MOCK",
+ "AZURE_OWNER_ROLE_DEF_ID": "MOCK",
+ "AZURE_GRAPH_RESOURCE": "MOCK",
}
AUTH_CREDENTIALS = {
@@ -48,6 +51,12 @@ def mock_credentials():
return Mock(spec=credentials)
+def mock_identity():
+ import azure.identity as identity
+
+ return Mock(spec=identity)
+
+
def mock_policy():
from azure.mgmt.resource import policy
@@ -72,15 +81,14 @@ def mock_secrets():
return Mock(spec=secrets)
-def mock_identity():
- import azure.identity as identity
+def mock_cloud_details():
+ from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
- return Mock(spec=identity)
+ return AZURE_PUBLIC_CLOUD
class MockAzureSDK(object):
def __init__(self):
- from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
self.subscription = mock_subscription()
self.authorization = mock_authorization()
@@ -89,11 +97,11 @@ class MockAzureSDK(object):
self.managementgroups = mock_managementgroups()
self.graphrbac = mock_graphrbac()
self.credentials = mock_credentials()
+ self.identity = mock_identity()
self.policy = mock_policy()
self.secrets = mock_secrets()
self.requests = mock_requests()
- # may change to a JEDI cloud
- self.cloud = AZURE_PUBLIC_CLOUD
+ self.cloud = mock_cloud_details()
self.identity = mock_identity()
diff --git a/tests/routes/applications/test_init.py b/tests/routes/applications/test_init.py
index 6691d5d9..f2f29318 100644
--- a/tests/routes/applications/test_init.py
+++ b/tests/routes/applications/test_init.py
@@ -15,7 +15,7 @@ def test_environment_access_with_env_role(client, user_session):
url_for("applications.access_environment", environment_id=environment.id)
)
assert response.status_code == 302
- assert "csp-environment-access" in response.location
+ assert "portal.azure.com" in response.location
def test_environment_access_with_no_role(client, user_session):
diff --git a/tests/routes/task_orders/test_new.py b/tests/routes/task_orders/test_new.py
index 8390e187..9929a992 100644
--- a/tests/routes/task_orders/test_new.py
+++ b/tests/routes/task_orders/test_new.py
@@ -458,3 +458,61 @@ def test_task_order_form_shows_errors(client, user_session, task_order):
body = response.data.decode()
assert "There were some errors" in body
assert "Not a valid decimal" in body
+
+
+def test_update_and_render_next_handles_previous_valid_data(
+ client, user_session, task_order
+):
+ user_session(task_order.portfolio.owner)
+ form_data = {"number": "0000000000000"}
+ original_number = task_order.number
+ response = client.post(
+ url_for(
+ "task_orders.submit_form_step_two_add_number",
+ task_order_id=task_order.id,
+ previous=True,
+ ),
+ data=form_data,
+ )
+ assert response.status_code == 302
+ assert task_order.number == "0000000000000"
+ assert task_order.number != original_number
+
+
+def test_update_and_render_next_handles_previous_invalid_data(
+ client, user_session, task_order
+):
+ clin_list = [
+ {
+ "jedi_clin_type": "JEDI_CLIN_1",
+ "number": "12312",
+ "start_date": "01/01/2020",
+ "end_date": "01/01/2021",
+ "obligated_amount": "5000",
+ "total_amount": "10000",
+ },
+ ]
+ TaskOrders.create_clins(task_order.id, clin_list)
+ assert len(task_order.clins) == 2
+
+ user_session(task_order.portfolio.owner)
+ form_data = {
+ "clins-0-jedi_clin_type": "JEDI_CLIN_1",
+ "clins-0-number": "12312",
+ "clins-0-start_date": "01/01/2020",
+ "clins-0-end_date": "01/01/2021",
+ "clins-0-obligated_amount": "5000",
+ "clins-0-total_amount": "10000",
+ "clins-1-jedi_clin_type": "JEDI_CLIN_1",
+ "clins-1-number": "1212",
+ }
+ response = client.post(
+ url_for(
+ "task_orders.submit_form_step_three_add_clins",
+ task_order_id=task_order.id,
+ previous=True,
+ ),
+ data=form_data,
+ )
+
+ assert len(task_order.clins) == 2
diff --git a/tests/routes/test_root.py b/tests/routes/test_root.py
index b06befaf..b012e87d 100644
--- a/tests/routes/test_root.py
+++ b/tests/routes/test_root.py
@@ -1,7 +1,36 @@
-from tests.factories import UserFactory
+from tests.factories import UserFactory, PortfolioFactory
+from atst.routes import match_url_pattern
def test_root_redirects_if_user_is_logged_in(client, user_session):
user_session(UserFactory.create())
response = client.get("/", follow_redirects=False)
assert "home" in response.location
+
+
+def test_match_url_pattern(client):
+
+ assert not match_url_pattern(None)
+ assert match_url_pattern("/home") == "/home"
+
+ portfolio = PortfolioFactory()
+ # matches a URL with an argument
+ assert (
+ match_url_pattern(f"/portfolios/{portfolio.id}") # /portfolios/