diff --git a/atst/domain/csp/cloud/azure_cloud_provider.py b/atst/domain/csp/cloud/azure_cloud_provider.py index 703b5635..8af2580a 100644 --- a/atst/domain/csp/cloud/azure_cloud_provider.py +++ b/atst/domain/csp/cloud/azure_cloud_provider.py @@ -1,4 +1,5 @@ import re +import time from secrets import token_urlsafe from typing import Dict from uuid import uuid4 @@ -76,6 +77,8 @@ class AzureCloudProvider(CloudProviderInterface): self.secret_key = config["AZURE_SECRET_KEY"] self.tenant_id = config["AZURE_TENANT_ID"] self.vault_url = config["AZURE_VAULT_URL"] + # self.ps_client_id = config["POWERSHELL_CLIENT_ID"] + self.ps_client_id = "1950a258-227b-4e31-a9cf-717495945fc2" if azure_sdk_provider is None: self.sdk = AzureSDKProvider() @@ -479,21 +482,188 @@ class AzureCloudProvider(CloudProviderInterface): else: return self._error(result.json()) - def create_remote_admin(self, creds, tenant_details): - # create app/service principal within tenant, with name constructed from tenant details - # assign principal global admin - - # needs to call out to CLI with tenant owner username/password, prototyping for that underway - - # return identifier and creds to consumer for storage - response = {"clientId": "string", "secretKey": "string", "tenantId": "string"} - return self._ok( - { - "client_id": response["clientId"], - "secret_key": response["secret_key"], - "tenant_id": response["tenantId"], - } + def assign_root_mg_ownership(self, payload): + import ipdb; ipdb.set_trace() + # elevate + mgmt_token = self.get_tenant_admin_token( + payload.tenant_id, self.sdk.cloud.endpoints.resource_manager ) + if mgmt_token is None: + raise AuthenticationException( + "Could not resolve management token for tenant admin" + ) + + auth_header = { + "Authorization": f"Bearer {mgmt_token}", + } + url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Authorization/elevateAccess?api-version=2016-07-01" + result = self.sdk.requests.post(url, headers=auth_header) + + if not result.ok: + return False + + # ----------- NEXT STEP: Root MGMT Group Ownership (tenant admin) ------------- + time.sleep(20) + # HARD CODED, MOVE TO CONFIG + ownerRoleId = '8e3af657-a8ff-443c-a75c-2fe8c4bcb635' + + role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{ownerRoleId}" + + request_body = { + "properties": { + "roleDefinitionId": role_definition_id, + "principalId": payload.user_object_id + } + } + + auth_header = { + "Authorization": f"Bearer {mgmt_token}", + } + + assignment_guid = str(uuid4()) + + url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleAssignments/{assignment_guid}?api-version=2015-07-01" + + response = self.sdk.requests.put(url, headers=auth_header, json=request_body) + + if not response.ok: + return False + + # ----------- NEXT STEP: Root MGMT Group Ownership (remote admin SP) ------------- + time.sleep(20) + # HARD CODED, MOVE TO CONFIG + ownerRoleId = '8e3af657-a8ff-443c-a75c-2fe8c4bcb635' + + # NOTE: the tenant_id is also the id of the root management group, once it is created + role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{ownerRoleId}" + + request_body = { + "properties": { + "roleDefinitionId": role_definition_id, + "principalId": payload.admin_principal_id + } + } + + auth_header = { + "Authorization": f"Bearer {mgmt_token}", + } + + assignment_guid = str(uuid4()) + + url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleAssignments/{assignment_guid}?api-version=2015-07-01" + + response = self.sdk.requests.put(url, headers=auth_header, json=request_body) + + if not response.ok: + return False + + + + def create_remote_admin(self, payload): + import ipdb; ipdb.set_trace() + GRAPH_RESOURCE = "https://graph.microsoft.com" + graph_token = self.get_tenant_admin_token(payload.tenant_id, GRAPH_RESOURCE) + if graph_token is None: + raise AuthenticationException( + "Could not resolve graph token for tenant admin" + ) + + request_body = {"displayName": "ATAT Remote Admin"} + + auth_header = { + "Authorization": f"Bearer {graph_token}", + } + + url = f"{GRAPH_RESOURCE}/v1.0/applications" + + response = self.sdk.requests.post(url, json=request_body, headers=auth_header) + + res = response.json() + result1 = {} + if response.ok: + result1 = {"app_id": res.get("appId"), "object_id": res.get("id")} + + # ---- SEPARATE STEP (Create associated Service Principal) ---------- + time.sleep(20) + + request_body = {"appId": result1.get("app_id")} + + auth_header = { + "Authorization": f"Bearer {graph_token}", + } + + url = f"{GRAPH_RESOURCE}/beta/servicePrincipals" + + response = self.sdk.requests.post(url, json=request_body, headers=auth_header) + + res = response.json() + result2 = {} + if response.ok: + result2 = {"sp_id": res.get("id")} + + # ---- SEPARATE STEP - Generate Creds (Client Secret)---------- + time.sleep(20) + + request_body = { + "passwordCredentials": [{"displayName": "ATAT Generated Password"}] + } + + auth_header = { + "Authorization": f"Bearer {graph_token}", + } + + # Uses OBJECT_ID of App Registration + url = ( + f"{GRAPH_RESOURCE}/v1.0/applications/{result1.get('object_id')}/addPassword" + ) + + response = self.sdk.requests.post(url, json=request_body, headers=auth_header) + result3 = {} + res = response.json() + if response.ok: + result3 = {"client_secret": res.get("secretText")} + + # ---- SEPARATE STEP - Source Global Admin Role---------- + + auth_header = { + "Authorization": f"Bearer {graph_token}", + } + + # Uses OBJECT_ID of App Registration + url = f"{GRAPH_RESOURCE}/beta/roleManagement/directory/roleDefinitions" + + response = self.sdk.requests.get(url, headers=auth_header) + + result = response.json() + roleList = result.get("value") + + admin_role_id = "794bb258-3e31-42ff-9ee4-731a72f62851" # May be hard coded? use for fall back + for role in roleList: + if role.get("displayName") == "Company Administrator": + admin_role_id = role.get("id") + break + + # ---- SEPARATE STEP - Source Global Admin Role---------- + time.sleep(20) + + request_body = { + "principalId": result2.get("sp_id"), + "roleDefinitionId": admin_role_id, + "resourceScope": "/", + } + + auth_header = { + "Authorization": f"Bearer {graph_token}", + } + + url = f"{GRAPH_RESOURCE}/beta/roleManagement/directory/roleAssignments" + + response = self.sdk.requests.post(url, headers=auth_header, json=request_body) + + if response.ok: + return (result1, result2, result3) + + return False def force_tenant_admin_pw_update(self, creds, tenant_owner_id): # use creds to update to force password recovery? @@ -563,9 +733,21 @@ class AzureCloudProvider(CloudProviderInterface): if sub_id_match: return sub_id_match.group(1) - def get_tenant_principal_token(self, tenant_id): + def get_tenant_admin_token(self, tenant_id, resource): creds = self.get_secret(tenant_id) - return self._get_sp_token(creds) + return self._get_up_token_for_resource( + creds.get("admin_username"), creds.get("admin_password"), tenant_id, resource + ) + + def get_tenant_principal_token(self, tenant_id, resource): + # creds = self.get_secret(tenant_id) + # return self._get_up_token_for_resource( + # creds.get("admin_username"), + # creds.get("admin_password"), + # tenat_id, + # resource + # ) + pass def get_root_provisioning_token(self): return self._get_sp_token(self._root_creds) @@ -586,6 +768,19 @@ class AzureCloudProvider(CloudProviderInterface): return token_response.get("accessToken", None) + def _get_up_token_for_resource(self, username, password, tenant_id, resource): + + context = self.sdk.adal.AuthenticationContext( + f"{self.sdk.cloud.endpoints.active_directory}/{tenant_id}" + ) + + # TODO: handle failure states here + token_response = context.acquire_token_with_username_password( + resource, username, password, self.ps_client_id + ) + + return token_response.get("accessToken", None) + def _get_credential_obj(self, creds, resource=None): return self.sdk.credentials.ServicePrincipalCredentials( client_id=creds.get("client_id"), diff --git a/tests/domain/cloud/test_azure_csp.py b/tests/domain/cloud/test_azure_csp.py index 28a8dcf1..20476135 100644 --- a/tests/domain/cloud/test_azure_csp.py +++ b/tests/domain/cloud/test_azure_csp.py @@ -1,4 +1,4 @@ -from unittest.mock import Mock +from unittest.mock import Mock, patch from uuid import uuid4 from tests.factories import ApplicationFactory, EnvironmentFactory @@ -6,6 +6,7 @@ from tests.mock_azure import AUTH_CREDENTIALS, mock_azure from atst.domain.csp.cloud import AzureCloudProvider from atst.domain.csp.cloud.models import ( + BaseCSPPayload, BillingInstructionCSPPayload, BillingInstructionCSPResult, BillingProfileCreationCSPPayload, @@ -407,3 +408,47 @@ def test_create_billing_instruction(mock_azure: AzureCloudProvider): body: BillingInstructionCSPResult = result.get("body") assert body.reported_clin_name == "TO1:CLIN001" + +def test_admin_principal_creation(mock_azure: AzureCloudProvider): + # Auth As Tenant Admin + # Create App Registration + # Create Service Principal + # Create App Registration Password Credential + # Lookup global admin role + # Assign global admin role to Service Principal + with patch.object( + AzureCloudProvider, "get_secret", wraps=mock_azure.get_secret + ) as mock_get_secret: + mock_get_secret.return_value = { + "admin_username": "", + "admin_password": "", + } + payload = BaseCSPPayload( + **{"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4"} + ) + + result = mock_azure.create_remote_admin(payload) + + print(result) + + +def test_admin_mg_ownership(mock_azure: AzureCloudProvider): + with patch.object( + AzureCloudProvider, "get_secret", wraps=mock_azure.get_secret + ) as mock_get_secret: + mock_get_secret.return_value = { + "admin_username": "", + "admin_password": "", + } + payload = TenantCSPResult( + **{ + "user_id": "blach", + "tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4", + "user_object_id": "971efe4d-1e80-4e39-b3b9-4e5c63ad446d", + } + ) + + result = mock_azure.assign_root_mg_ownership(payload) + + print(result) + diff --git a/tests/mock_azure.py b/tests/mock_azure.py index 4a7aace3..9963f2d9 100644 --- a/tests/mock_azure.py +++ b/tests/mock_azure.py @@ -63,13 +63,15 @@ def mock_policy(): def mock_adal(): import adal - return Mock(spec=adal) + return adal + # return Mock(spec=adal) def mock_requests(): import requests - return Mock(spec=requests) + # return Mock(spec=requests) + return requests def mock_secrets():