diff --git a/atst/domain/csp/cloud/azure_cloud_provider.py b/atst/domain/csp/cloud/azure_cloud_provider.py index 9df8051a..5d6e91d8 100644 --- a/atst/domain/csp/cloud/azure_cloud_provider.py +++ b/atst/domain/csp/cloud/azure_cloud_provider.py @@ -779,7 +779,65 @@ class AzureCloudProvider(CloudProviderInterface): return PrincipalAdminRoleCSPResult(**response.json()) def create_billing_owner(self, payload: BillingOwnerCSPPayload): - pass + graph_token = self._get_tenant_principal_token( + payload.tenant_id, resource=self.graph_resource + ) + if graph_token is None: + raise AuthenticationException( + "Could not resolve graph token for tenant admin" + ) + + # Step 1: Create an AAD identity for the user + user_result = self._create_active_directory_user(graph_token, payload) + # Step 2: Set the recovery email + self._update_active_directory_user_email(graph_token, user_result.id, payload) + # Step 3: Find the Billing Administrator role ID + billing_admin_role_id = self._get_billing_owner_role(graph_token) + # Step 4: Assign the Billing Administrator role to the new user + self._assign_billing_owner_role( + graph_token, billing_admin_role_id, user_result.id + ) + + return BillingOwnerCSPResult(id=user_result.id) + + def _assign_billing_owner_role(self, graph_token, billing_admin_role_id, user_id): + request_body = { + "roleDefinitionId": billing_admin_role_id, + "principalId": user_id, + "resourceScope": "/", + } + + auth_header = { + "Authorization": f"Bearer {graph_token}", + } + + url = f"{self.graph_resource}/beta/roleManagement/directory/roleAssignments" + + response = self.sdk.requests.post(url, headers=auth_header, json=request_body) + + if response.ok: + return True + else: + raise UserProvisioningException("Could not assign billing admin role") + + def _get_billing_owner_role(self, graph_token): + auth_header = { + "Authorization": f"Bearer {graph_token}", + } + + url = f"{self.graph_resource}/v1.0/directoryRoles" + + response = self.sdk.requests.get(url, headers=auth_header) + + if response.ok: + result = response.json() + for role in result["value"]: + if role["displayName"] == "Billing Administrator": + return role["id"] + else: + raise UserProvisioningException( + "Could not find Billing Administrator role ID; role may not be enabled." + ) def force_tenant_admin_pw_update(self, creds, tenant_owner_id): # use creds to update to force password recovery? @@ -861,7 +919,7 @@ class AzureCloudProvider(CloudProviderInterface): return result - def _create_active_directory_user(self, graph_token, payload: UserCSPPayload): + def _create_active_directory_user(self, graph_token, payload): request_body = { "accountEnabled": True, "displayName": payload.display_name, @@ -886,9 +944,7 @@ class AzureCloudProvider(CloudProviderInterface): else: raise UserProvisioningException(f"Failed to create user: {response.json()}") - def _update_active_directory_user_email( - self, graph_token, user_id, payload: UserCSPPayload - ): + def _update_active_directory_user_email(self, graph_token, user_id, payload): request_body = {"otherMails": [payload.email]} auth_header = { diff --git a/atst/domain/csp/cloud/models.py b/atst/domain/csp/cloud/models.py index 140e39fc..b4920856 100644 --- a/atst/domain/csp/cloud/models.py +++ b/atst/domain/csp/cloud/models.py @@ -566,15 +566,10 @@ class BillingOwnerCSPPayload(BaseCSPPayload, UserMixin): needed for user provisioning. """ - first_name: str - last_name: str + display_name = "billing_admin" domain_name: str password_recovery_email_address: str - @property - def display_name(self): - return f"{self.first_name} {self.last_name}" - @property def tenant_host_name(self): return self.domain_name diff --git a/tests/domain/cloud/test_azure_csp.py b/tests/domain/cloud/test_azure_csp.py index 1d397257..d2ad9857 100644 --- a/tests/domain/cloud/test_azure_csp.py +++ b/tests/domain/cloud/test_azure_csp.py @@ -16,6 +16,7 @@ from atst.domain.csp.cloud.models import ( ApplicationCSPResult, BillingInstructionCSPPayload, BillingInstructionCSPResult, + BillingOwnerCSPPayload, BillingProfileCreationCSPPayload, BillingProfileCreationCSPResult, BillingProfileTenantAccessCSPPayload, @@ -988,3 +989,46 @@ def test_create_user_role_failure(mock_azure: AzureCloudProvider): with pytest.raises(UserProvisioningException): mock_azure.create_user_role(payload) + + +def test_create_billing_owner(mock_azure: AzureCloudProvider): + with patch.object( + AzureCloudProvider, + "_get_tenant_principal_token", + wraps=mock_azure._get_tenant_principal_token, + ) as _get_tenant_principal_token: + _get_tenant_principal_token.return_value = "token" + + final_result = "1-2-3" + + # create_billing_owner does: POST, PATCH, GET, POST + + def make_mock_result(return_value=None): + mock_result_create = Mock() + mock_result_create.ok = True + mock_result_create.json.return_value = return_value + + return mock_result_create + + post_results = [make_mock_result({"id": final_result}), make_mock_result()] + + mock_post = lambda *a, **k: post_results.pop(0) + + # mock POST so that it pops off results in the order we want + mock_azure.sdk.requests.post = mock_post + # return value for PATCH doesn't matter much + mock_azure.sdk.requests.patch.return_value = make_mock_result() + # return value for GET needs to be a JSON object with a list of role definitions + mock_azure.sdk.requests.get.return_value = make_mock_result( + {"value": [{"displayName": "Billing Administrator", "id": "4567"}]} + ) + + payload = BillingOwnerCSPPayload( + tenant_id=uuid4().hex, + domain_name="rebelalliance", + password_recovery_email_address="many@bothans.org", + ) + + result = mock_azure.create_billing_owner(payload) + + assert result.id == final_result diff --git a/tests/domain/cloud/test_models.py b/tests/domain/cloud/test_models.py index c0aedd50..ee352d0a 100644 --- a/tests/domain/cloud/test_models.py +++ b/tests/domain/cloud/test_models.py @@ -127,15 +127,13 @@ def test_UserCSPPayload_password(): class TestBillingOwnerCSPPayload: user_payload = { "tenant_id": "123", - "first_name": "Han", - "last_name": "Solo", "domain_name": "rebelalliance", "password_recovery_email_address": "han@moseisley.cantina", } def test_display_name(self): payload = BillingOwnerCSPPayload(**self.user_payload) - assert payload.display_name == "Han Solo" + assert payload.display_name == "billing_admin" def test_tenant_host_name(self): payload = BillingOwnerCSPPayload(**self.user_payload) @@ -143,7 +141,7 @@ class TestBillingOwnerCSPPayload: def test_mail_nickname(self): payload = BillingOwnerCSPPayload(**self.user_payload) - assert payload.mail_nickname == "han.solo" + assert payload.mail_nickname == "billing_admin" def test_password(self): payload = BillingOwnerCSPPayload(**self.user_payload) @@ -151,7 +149,10 @@ class TestBillingOwnerCSPPayload: def test_user_principal_name(self): payload = BillingOwnerCSPPayload(**self.user_payload) - assert payload.user_principal_name == f"han.solo@rebelalliance.onmicrosoft.com" + assert ( + payload.user_principal_name + == f"billing_admin@rebelalliance.onmicrosoft.com" + ) def test_email(self): payload = BillingOwnerCSPPayload(**self.user_payload)