diff --git a/atst/domain/csp/cloud/azure_cloud_provider.py b/atst/domain/csp/cloud/azure_cloud_provider.py index 8af2580a..73a99738 100644 --- a/atst/domain/csp/cloud/azure_cloud_provider.py +++ b/atst/domain/csp/cloud/azure_cloud_provider.py @@ -1,5 +1,4 @@ import re -import time from secrets import token_urlsafe from typing import Dict from uuid import uuid4 @@ -11,6 +10,8 @@ from atst.models.user import User from .cloud_provider_interface import CloudProviderInterface from .exceptions import AuthenticationException from .models import ( + AdminRoleDefinitionCSPPayload, + AdminRoleDefinitionCSPResult, BillingInstructionCSPPayload, BillingInstructionCSPResult, BillingProfileCreationCSPPayload, @@ -19,16 +20,27 @@ from .models import ( BillingProfileTenantAccessCSPResult, BillingProfileVerificationCSPPayload, BillingProfileVerificationCSPResult, + PrincipalAdminRoleCSPPayload, + PrincipalAdminRoleCSPResult, TaskOrderBillingCreationCSPPayload, TaskOrderBillingCreationCSPResult, TaskOrderBillingVerificationCSPPayload, TaskOrderBillingVerificationCSPResult, + TenantAdminOwnershipCSPPayload, + TenantAdminOwnershipCSPResult, TenantCSPPayload, TenantCSPResult, + TenantPrincipalAppCSPPayload, + TenantPrincipalAppCSPResult, + TenantPrincipalCredentialCSPPayload, + TenantPrincipalCredentialCSPResult, + TenantPrincipalCSPPayload, + TenantPrincipalCSPResult, + TenantPrincipalOwnershipCSPPayload, + TenantPrincipalOwnershipCSPResult, ) from .policy import AzurePolicyManager - SUBSCRIPTION_ID_REGEX = re.compile( "subscriptions\/([0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})", re.I, @@ -77,8 +89,9 @@ class AzureCloudProvider(CloudProviderInterface): self.secret_key = config["AZURE_SECRET_KEY"] self.tenant_id = config["AZURE_TENANT_ID"] self.vault_url = config["AZURE_VAULT_URL"] - # self.ps_client_id = config["POWERSHELL_CLIENT_ID"] - self.ps_client_id = "1950a258-227b-4e31-a9cf-717495945fc2" + self.ps_client_id = config["POWERSHELL_CLIENT_ID"] + self.owner_role_def_id = config["AZURE_OWNER_ROLE_DEF_ID"] + self.graph_resource = config["AZURE_GRAPH_RESOURCE"] if azure_sdk_provider is None: self.sdk = AzureSDKProvider() @@ -482,15 +495,13 @@ class AzureCloudProvider(CloudProviderInterface): else: return self._error(result.json()) - def assign_root_mg_ownership(self, payload): - import ipdb; ipdb.set_trace() - # elevate + def get_elevated_management_token(self, tenant_id): mgmt_token = self.get_tenant_admin_token( - payload.tenant_id, self.sdk.cloud.endpoints.resource_manager + tenant_id, self.sdk.cloud.endpoints.resource_manager ) if mgmt_token is None: raise AuthenticationException( - "Could not resolve management token for tenant admin" + "Failed to resolve management token for tenant admin" ) auth_header = { @@ -500,19 +511,19 @@ class AzureCloudProvider(CloudProviderInterface): result = self.sdk.requests.post(url, headers=auth_header) if not result.ok: - return False + raise AuthenticationException("Failed to elevate access") - # ----------- NEXT STEP: Root MGMT Group Ownership (tenant admin) ------------- - time.sleep(20) - # HARD CODED, MOVE TO CONFIG - ownerRoleId = '8e3af657-a8ff-443c-a75c-2fe8c4bcb635' + return mgmt_token - role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{ownerRoleId}" + def create_tenant_admin_ownership(self, payload: TenantAdminOwnershipCSPPayload): + mgmt_token = self.get_elevated_management_token(payload.tenant_id) + + role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{self.owner_role_def_id}" request_body = { "properties": { "roleDefinitionId": role_definition_id, - "principalId": payload.user_object_id + "principalId": payload.user_object_id, } } @@ -526,21 +537,21 @@ class AzureCloudProvider(CloudProviderInterface): response = self.sdk.requests.put(url, headers=auth_header, json=request_body) - if not response.ok: - return False + if response.ok: + return TenantAdminOwnershipCSPResult(**response.json()) - # ----------- NEXT STEP: Root MGMT Group Ownership (remote admin SP) ------------- - time.sleep(20) - # HARD CODED, MOVE TO CONFIG - ownerRoleId = '8e3af657-a8ff-443c-a75c-2fe8c4bcb635' + def create_tenant_principal_ownership( + self, payload: TenantPrincipalOwnershipCSPPayload + ): + mgmt_token = self.get_elevated_management_token(payload.tenant_id) # NOTE: the tenant_id is also the id of the root management group, once it is created - role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{ownerRoleId}" + role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{self.owner_role_def_id}" request_body = { "properties": { "roleDefinitionId": role_definition_id, - "principalId": payload.admin_principal_id + "principalId": payload.principal_id, } } @@ -554,15 +565,14 @@ class AzureCloudProvider(CloudProviderInterface): response = self.sdk.requests.put(url, headers=auth_header, json=request_body) - if not response.ok: - return False + if response.ok: + return TenantPrincipalOwnershipCSPResult(**response.json()) + def create_tenant_principal_app(self, payload: TenantPrincipalAppCSPPayload): - - def create_remote_admin(self, payload): - import ipdb; ipdb.set_trace() - GRAPH_RESOURCE = "https://graph.microsoft.com" - graph_token = self.get_tenant_admin_token(payload.tenant_id, GRAPH_RESOURCE) + graph_token = self.get_tenant_admin_token( + payload.tenant_id, self.graph_resource + ) if graph_token is None: raise AuthenticationException( "Could not resolve graph token for tenant admin" @@ -574,35 +584,45 @@ class AzureCloudProvider(CloudProviderInterface): "Authorization": f"Bearer {graph_token}", } - url = f"{GRAPH_RESOURCE}/v1.0/applications" + url = f"{self.graph_resource}/v1.0/applications" response = self.sdk.requests.post(url, json=request_body, headers=auth_header) - res = response.json() - result1 = {} if response.ok: - result1 = {"app_id": res.get("appId"), "object_id": res.get("id")} + return TenantPrincipalAppCSPResult(**response.json()) - # ---- SEPARATE STEP (Create associated Service Principal) ---------- - time.sleep(20) + def create_tenant_principal(self, payload: TenantPrincipalCSPPayload): + graph_token = self.get_tenant_admin_token( + payload.tenant_id, self.graph_resource + ) + if graph_token is None: + raise AuthenticationException( + "Could not resolve graph token for tenant admin" + ) - request_body = {"appId": result1.get("app_id")} + request_body = {"appId": payload.principal_app_id} auth_header = { "Authorization": f"Bearer {graph_token}", } - url = f"{GRAPH_RESOURCE}/beta/servicePrincipals" + url = f"{self.graph_resource}/beta/servicePrincipals" response = self.sdk.requests.post(url, json=request_body, headers=auth_header) - res = response.json() - result2 = {} if response.ok: - result2 = {"sp_id": res.get("id")} + return TenantPrincipalCSPResult(**response.json()) - # ---- SEPARATE STEP - Generate Creds (Client Secret)---------- - time.sleep(20) + def create_tenant_principal_credential( + self, payload: TenantPrincipalCredentialCSPPayload + ): + graph_token = self.get_tenant_admin_token( + payload.tenant_id, self.graph_resource + ) + if graph_token is None: + raise AuthenticationException( + "Could not resolve graph token for tenant admin" + ) request_body = { "passwordCredentials": [{"displayName": "ATAT Generated Password"}] @@ -612,43 +632,59 @@ class AzureCloudProvider(CloudProviderInterface): "Authorization": f"Bearer {graph_token}", } - # Uses OBJECT_ID of App Registration - url = ( - f"{GRAPH_RESOURCE}/v1.0/applications/{result1.get('object_id')}/addPassword" - ) + url = f"{self.graph_resource}/v1.0/applications/{payload.principal_app_object_id}/addPassword" response = self.sdk.requests.post(url, json=request_body, headers=auth_header) - result3 = {} - res = response.json() - if response.ok: - result3 = {"client_secret": res.get("secretText")} - # ---- SEPARATE STEP - Source Global Admin Role---------- + if response.ok: + return TenantPrincipalCredentialCSPResult( + principal_client_id=payload.principal_app_id, **response.json() + ) + + def create_admin_role_definition(self, payload: AdminRoleDefinitionCSPPayload): + graph_token = self.get_tenant_admin_token( + payload.tenant_id, self.graph_resource + ) + if graph_token is None: + raise AuthenticationException( + "Could not resolve graph token for tenant admin" + ) auth_header = { "Authorization": f"Bearer {graph_token}", } - # Uses OBJECT_ID of App Registration - url = f"{GRAPH_RESOURCE}/beta/roleManagement/directory/roleDefinitions" + url = f"{self.graph_resource}/beta/roleManagement/directory/roleDefinitions" response = self.sdk.requests.get(url, headers=auth_header) result = response.json() roleList = result.get("value") - admin_role_id = "794bb258-3e31-42ff-9ee4-731a72f62851" # May be hard coded? use for fall back - for role in roleList: - if role.get("displayName") == "Company Administrator": - admin_role_id = role.get("id") - break + DEFAULT_ADMIN_RD_ID = "794bb258-3e31-42ff-9ee4-731a72f62851" + admin_role_def_id = next( + ( + role.get("id") + for role in roleList + if role.get("displayName") == "Company Administrator" + ), + DEFAULT_ADMIN_RD_ID, + ) - # ---- SEPARATE STEP - Source Global Admin Role---------- - time.sleep(20) + return AdminRoleDefinitionCSPResult(admin_role_def_id=admin_role_def_id) + + def create_principal_admin_role(self, payload: PrincipalAdminRoleCSPPayload): + graph_token = self.get_tenant_admin_token( + payload.tenant_id, self.graph_resource + ) + if graph_token is None: + raise AuthenticationException( + "Could not resolve graph token for tenant admin" + ) request_body = { - "principalId": result2.get("sp_id"), - "roleDefinitionId": admin_role_id, + "principalId": payload.principal_id, + "roleDefinitionId": payload.admin_role_def_id, "resourceScope": "/", } @@ -656,14 +692,12 @@ class AzureCloudProvider(CloudProviderInterface): "Authorization": f"Bearer {graph_token}", } - url = f"{GRAPH_RESOURCE}/beta/roleManagement/directory/roleAssignments" + url = f"{self.graph_resource}/beta/roleManagement/directory/roleAssignments" response = self.sdk.requests.post(url, headers=auth_header, json=request_body) if response.ok: - return (result1, result2, result3) - - return False + return PrincipalAdminRoleCSPResult(**response.json()) def force_tenant_admin_pw_update(self, creds, tenant_owner_id): # use creds to update to force password recovery? @@ -736,7 +770,10 @@ class AzureCloudProvider(CloudProviderInterface): def get_tenant_admin_token(self, tenant_id, resource): creds = self.get_secret(tenant_id) return self._get_up_token_for_resource( - creds.get("admin_username"), creds.get("admin_password"), tenant_id, resource + creds.get("admin_username"), + creds.get("admin_password"), + tenant_id, + resource, ) def get_tenant_principal_token(self, tenant_id, resource): diff --git a/atst/domain/csp/cloud/mock_cloud_provider.py b/atst/domain/csp/cloud/mock_cloud_provider.py index a6c338b5..81147885 100644 --- a/atst/domain/csp/cloud/mock_cloud_provider.py +++ b/atst/domain/csp/cloud/mock_cloud_provider.py @@ -1,34 +1,46 @@ from uuid import uuid4 -from atst.domain.csp.cloud.exceptions import ( - BaselineProvisionException, - EnvironmentCreationException, - GeneralCSPException, - UserProvisioningException, - UserRemovalException, -) -from atst.domain.csp.cloud.models import BillingProfileTenantAccessCSPResult from .cloud_provider_interface import CloudProviderInterface from .exceptions import ( AuthenticationException, AuthorizationException, + BaselineProvisionException, ConnectionException, + EnvironmentCreationException, + GeneralCSPException, UnknownServerException, + UserProvisioningException, + UserRemovalException, ) from .models import ( + PrincipalAdminRoleCSPResult, + AdminRoleDefinitionCSPResult, + TenantAdminOwnershipCSPResult, + TenantPrincipalCSPResult, BillingInstructionCSPPayload, BillingInstructionCSPResult, BillingProfileCreationCSPPayload, BillingProfileCreationCSPResult, + BillingProfileTenantAccessCSPResult, BillingProfileVerificationCSPPayload, BillingProfileVerificationCSPResult, + PrincipalAdminRoleCSPPayload, + AdminRoleDefinitionCSPPayload, TaskOrderBillingCreationCSPPayload, TaskOrderBillingCreationCSPResult, TaskOrderBillingVerificationCSPPayload, TaskOrderBillingVerificationCSPResult, + TenantAdminOwnershipCSPPayload, TenantCSPPayload, TenantCSPResult, + TenantPrincipalAppCSPPayload, + TenantPrincipalAppCSPResult, + TenantPrincipalCredentialCSPPayload, + TenantPrincipalCredentialCSPResult, + TenantPrincipalCSPPayload, + TenantPrincipalOwnershipCSPPayload, + TenantPrincipalOwnershipCSPResult, ) @@ -117,7 +129,7 @@ class MockCloudProvider(CloudProviderInterface): payload is an instance of TenantCSPPayload data class """ - self._authorize(payload.creds) + self._authorize("admin") self._delay(1, 5) @@ -274,6 +286,70 @@ class MockCloudProvider(CloudProviderInterface): } ) + def create_tenant_admin_ownership(self, payload: TenantAdminOwnershipCSPPayload): + self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION) + self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION) + self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION) + + return TenantAdminOwnershipCSPResult(**dict(id="admin_owner_assignment_id")) + + def create_tenant_principal_ownership( + self, payload: TenantPrincipalOwnershipCSPPayload + ): + self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION) + self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION) + self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION) + + return TenantPrincipalOwnershipCSPResult( + **dict(id="principal_owner_assignment_id") + ) + + def create_tenant_principal_app(self, payload: TenantPrincipalAppCSPPayload): + self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION) + self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION) + self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION) + + return TenantPrincipalAppCSPResult( + **dict(appId="principal_app_id", id="principal_app_object_id") + ) + + def create_tenant_principal(self, payload: TenantPrincipalCSPPayload): + self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION) + self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION) + self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION) + + return TenantPrincipalCSPResult(**dict(id="principal_id")) + + def create_tenant_principal_credential( + self, payload: TenantPrincipalCredentialCSPPayload + ): + self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION) + self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION) + self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION) + + return TenantPrincipalCredentialCSPResult( + **dict( + secretText="principal_secret_key", + principal_client_id="principal_client_id", + ) + ) + + def create_admin_role_definition(self, payload: AdminRoleDefinitionCSPPayload): + self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION) + self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION) + self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION) + + return AdminRoleDefinitionCSPResult( + **dict(admin_role_def_id="admin_role_def_id") + ) + + def create_principal_admin_role(self, payload: PrincipalAdminRoleCSPPayload): + self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION) + self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION) + self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION) + + return PrincipalAdminRoleCSPResult(**dict(id="principal_assignment_id")) + def create_or_update_user(self, auth_credentials, user_info, csp_role_id): self._authorize(auth_credentials) diff --git a/atst/domain/csp/cloud/models.py b/atst/domain/csp/cloud/models.py index f6503445..9aa16883 100644 --- a/atst/domain/csp/cloud/models.py +++ b/atst/domain/csp/cloud/models.py @@ -223,3 +223,80 @@ class BillingInstructionCSPResult(AliasModel): "reported_clin_name": "name", } + +class TenantAdminOwnershipCSPPayload(BaseCSPPayload): + user_object_id: str + + +class TenantAdminOwnershipCSPResult(AliasModel): + admin_owner_assignment_id: str + + class Config: + fields = {"admin_owner_assignment_id": "id"} + + +class TenantPrincipalOwnershipCSPPayload(BaseCSPPayload): + principal_id: str + + +class TenantPrincipalOwnershipCSPResult(AliasModel): + principal_owner_assignment_id: str + + class Config: + fields = {"principal_owner_assignment_id": "id"} + + +class TenantPrincipalAppCSPPayload(BaseCSPPayload): + pass + + +class TenantPrincipalAppCSPResult(AliasModel): + principal_app_id: str + principal_app_object_id: str + + class Config: + fields = {"principal_app_id": "appId", "principal_app_object_id": "id"} + + +class TenantPrincipalCSPPayload(BaseCSPPayload): + principal_app_id: str + + +class TenantPrincipalCSPResult(AliasModel): + principal_id: str + + class Config: + fields = {"principal_id": "id"} + + +class TenantPrincipalCredentialCSPPayload(BaseCSPPayload): + principal_app_id: str + principal_app_object_id: str + + +class TenantPrincipalCredentialCSPResult(AliasModel): + principal_client_id: str + principal_secret_key: str + + class Config: + fields = {"principal_secret_key": "secretText"} + + +class AdminRoleDefinitionCSPPayload(BaseCSPPayload): + pass + + +class AdminRoleDefinitionCSPResult(AliasModel): + admin_role_def_id: str + + +class PrincipalAdminRoleCSPPayload(BaseCSPPayload): + principal_id: str + admin_role_def_id: str + + +class PrincipalAdminRoleCSPResult(AliasModel): + principal_assignment_id: str + + class Config: + fields = {"principal_assignment_id": "id"} diff --git a/atst/models/mixins/state_machines.py b/atst/models/mixins/state_machines.py index 37682e5b..889b5b8f 100644 --- a/atst/models/mixins/state_machines.py +++ b/atst/models/mixins/state_machines.py @@ -17,6 +17,13 @@ class AzureStages(Enum): TASK_ORDER_BILLING_CREATION = "task order billing creation" TASK_ORDER_BILLING_VERIFICATION = "task order billing verification" BILLING_INSTRUCTION = "billing instruction" + TENANT_PRINCIPAL_APP = "tenant principal application" + TENANT_PRINCIPAL = "tenant principal" + TENANT_PRINCIPAL_CREDENTIAL = "tenant principal credential" + ADMIN_ROLE_DEFINITION = "admin role definition" + PRINCIPAL_ADMIN_ROLE = "tenant principal admin" + TENANT_ADMIN_OWNERSHIP = "tenant admin ownership" + TENANT_PRINCIPAL_OWNERSHIP = "tenant principial ownership" def _build_csp_states(csp_stages): diff --git a/tests/domain/cloud/test_azure_csp.py b/tests/domain/cloud/test_azure_csp.py index 20476135..da40634d 100644 --- a/tests/domain/cloud/test_azure_csp.py +++ b/tests/domain/cloud/test_azure_csp.py @@ -6,6 +6,8 @@ from tests.mock_azure import AUTH_CREDENTIALS, mock_azure from atst.domain.csp.cloud import AzureCloudProvider from atst.domain.csp.cloud.models import ( + AdminRoleDefinitionCSPPayload, + AdminRoleDefinitionCSPResult, BaseCSPPayload, BillingInstructionCSPPayload, BillingInstructionCSPResult, @@ -19,8 +21,18 @@ from atst.domain.csp.cloud.models import ( TaskOrderBillingCreationCSPResult, TaskOrderBillingVerificationCSPPayload, TaskOrderBillingVerificationCSPResult, + TenantAdminOwnershipCSPPayload, + TenantAdminOwnershipCSPResult, TenantCSPPayload, TenantCSPResult, + TenantPrincipalAppCSPPayload, + TenantPrincipalAppCSPResult, + TenantPrincipalCredentialCSPPayload, + TenantPrincipalCredentialCSPResult, + TenantPrincipalCSPPayload, + TenantPrincipalCSPResult, + TenantPrincipalOwnershipCSPPayload, + TenantPrincipalOwnershipCSPResult, ) BILLING_ACCOUNT_NAME = "52865e4c-52e8-5a6c-da6b-c58f0814f06f:7ea5de9d-b8ce-4901-b1c5-d864320c7b03_2019-05-31" @@ -409,46 +421,167 @@ def test_create_billing_instruction(mock_azure: AzureCloudProvider): assert body.reported_clin_name == "TO1:CLIN001" -def test_admin_principal_creation(mock_azure: AzureCloudProvider): - # Auth As Tenant Admin - # Create App Registration - # Create Service Principal - # Create App Registration Password Credential - # Lookup global admin role - # Assign global admin role to Service Principal +def test_create_tenant_principal_app(mock_azure: AzureCloudProvider): with patch.object( - AzureCloudProvider, "get_secret", wraps=mock_azure.get_secret - ) as mock_get_secret: - mock_get_secret.return_value = { - "admin_username": "", - "admin_password": "", - } - payload = BaseCSPPayload( + AzureCloudProvider, + "get_elevated_management_token", + wraps=mock_azure.get_elevated_management_token, + ) as get_elevated_management_token: + get_elevated_management_token.return_value = "my fake token" + + mock_result = Mock() + mock_result.ok = True + mock_result.json.return_value = {"appId": "appId", "id": "id"} + + mock_azure.sdk.requests.post.return_value = mock_result + + payload = TenantPrincipalAppCSPPayload( **{"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4"} ) - result = mock_azure.create_remote_admin(payload) + result: TenantPrincipalAppCSPResult = mock_azure.create_tenant_principal_app( + payload + ) - print(result) + assert result.principal_app_id == "appId" -def test_admin_mg_ownership(mock_azure: AzureCloudProvider): +def test_create_tenant_principal(mock_azure: AzureCloudProvider): with patch.object( - AzureCloudProvider, "get_secret", wraps=mock_azure.get_secret - ) as mock_get_secret: - mock_get_secret.return_value = { - "admin_username": "", - "admin_password": "", - } - payload = TenantCSPResult( + AzureCloudProvider, + "get_elevated_management_token", + wraps=mock_azure.get_elevated_management_token, + ) as get_elevated_management_token: + get_elevated_management_token.return_value = "my fake token" + + mock_result = Mock() + mock_result.ok = True + mock_result.json.return_value = {"id": "principal_id"} + + mock_azure.sdk.requests.post.return_value = mock_result + + payload = TenantPrincipalCSPPayload( + **{ + "tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4", + "principal_app_id": "appId", + } + ) + + result: TenantPrincipalCSPResult = mock_azure.create_tenant_principal(payload) + + assert result.principal_id == "principal_id" + + +def test_create_tenant_principal_credential(mock_azure: AzureCloudProvider): + with patch.object( + AzureCloudProvider, + "get_elevated_management_token", + wraps=mock_azure.get_elevated_management_token, + ) as get_elevated_management_token: + get_elevated_management_token.return_value = "my fake token" + + mock_result = Mock() + mock_result.ok = True + mock_result.json.return_value = {"secretText": "new secret key"} + + mock_azure.sdk.requests.post.return_value = mock_result + + payload = TenantPrincipalCredentialCSPPayload( + **{ + "tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4", + "principal_app_id": "appId", + "principal_app_object_id": "appObjId", + } + ) + + result: TenantPrincipalCredentialCSPResult = mock_azure.create_tenant_principal_credential( + payload + ) + + assert result.principal_secret_key == "new secret key" + + +def test_create_admin_role_definition(mock_azure: AzureCloudProvider): + with patch.object( + AzureCloudProvider, + "get_elevated_management_token", + wraps=mock_azure.get_elevated_management_token, + ) as get_elevated_management_token: + get_elevated_management_token.return_value = "my fake token" + + mock_result = Mock() + mock_result.ok = True + mock_result.json.return_value = { + "value": [ + {"id": "wrongid", "displayName": "Wrong Role"}, + {"id": "id", "displayName": "Company Administrator"}, + ] + } + + mock_azure.sdk.requests.get.return_value = mock_result + + payload = AdminRoleDefinitionCSPPayload( + **{"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4"} + ) + + result: AdminRoleDefinitionCSPResult = mock_azure.create_admin_role_definition( + payload + ) + + assert result.admin_role_def_id == "id" + + +def test_create_tenant_admin_ownership(mock_azure: AzureCloudProvider): + with patch.object( + AzureCloudProvider, + "get_elevated_management_token", + wraps=mock_azure.get_elevated_management_token, + ) as get_elevated_management_token: + get_elevated_management_token.return_value = "my fake token" + + mock_result = Mock() + mock_result.ok = True + mock_result.json.return_value = {"id": "id"} + + mock_azure.sdk.requests.put.return_value = mock_result + + payload = TenantAdminOwnershipCSPPayload( **{ - "user_id": "blach", "tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4", "user_object_id": "971efe4d-1e80-4e39-b3b9-4e5c63ad446d", } ) - result = mock_azure.assign_root_mg_ownership(payload) + result: TenantAdminOwnershipCSPResult = mock_azure.create_tenant_admin_ownership( + payload + ) - print(result) + assert result.admin_owner_assignment_id == "id" + +def test_create_tenant_principal_ownership(mock_azure: AzureCloudProvider): + with patch.object( + AzureCloudProvider, + "get_elevated_management_token", + wraps=mock_azure.get_elevated_management_token, + ) as get_elevated_management_token: + get_elevated_management_token.return_value = "my fake token" + + mock_result = Mock() + mock_result.ok = True + mock_result.json.return_value = {"id": "id"} + + mock_azure.sdk.requests.put.return_value = mock_result + + payload = TenantPrincipalOwnershipCSPPayload( + **{ + "tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4", + "principal_id": "971efe4d-1e80-4e39-b3b9-4e5c63ad446d", + } + ) + + result: TenantPrincipalOwnershipCSPResult = mock_azure.create_tenant_principal_ownership( + payload + ) + + assert result.principal_owner_assignment_id == "id" diff --git a/tests/domain/test_portfolio_state_machine.py b/tests/domain/test_portfolio_state_machine.py index 44d1382b..9480c8a0 100644 --- a/tests/domain/test_portfolio_state_machine.py +++ b/tests/domain/test_portfolio_state_machine.py @@ -104,6 +104,13 @@ def test_fsm_transition_start(mock_cloud_provider, portfolio: Portfolio): FSMStates.TASK_ORDER_BILLING_CREATION_CREATED, FSMStates.TASK_ORDER_BILLING_VERIFICATION_CREATED, FSMStates.BILLING_INSTRUCTION_CREATED, + FSMStates.TENANT_PRINCIPAL_APP_CREATED, + FSMStates.TENANT_PRINCIPAL_CREATED, + FSMStates.TENANT_PRINCIPAL_CREDENTIAL_CREATED, + FSMStates.ADMIN_ROLE_DEFINITION_CREATED, + FSMStates.PRINCIPAL_ADMIN_ROLE_CREATED, + FSMStates.TENANT_ADMIN_OWNERSHIP_CREATED, + FSMStates.TENANT_PRINCIPAL_OWNERSHIP_CREATED, ] if portfolio.csp_data is not None: diff --git a/tests/mock_azure.py b/tests/mock_azure.py index 9963f2d9..6818925e 100644 --- a/tests/mock_azure.py +++ b/tests/mock_azure.py @@ -9,6 +9,9 @@ AZURE_CONFIG = { "AZURE_TENANT_ID": "MOCK", "AZURE_POLICY_LOCATION": "policies", "AZURE_VAULT_URL": "http://vault", + "POWERSHELL_CLIENT_ID": "MOCK", + "AZURE_OWNER_ROLE_DEF_ID": "MOCK", + "AZURE_GRAPH_RESOURCE": "MOCK", } AUTH_CREDENTIALS = { @@ -63,15 +66,13 @@ def mock_policy(): def mock_adal(): import adal - return adal - # return Mock(spec=adal) + return Mock(spec=adal) def mock_requests(): import requests - # return Mock(spec=requests) - return requests + return Mock(spec=requests) def mock_secrets():