create billing owner
This commit is contained in:
parent
45dbf9454e
commit
23aeb77821
@ -779,7 +779,65 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
return PrincipalAdminRoleCSPResult(**response.json())
|
||||
|
||||
def create_billing_owner(self, payload: BillingOwnerCSPPayload):
|
||||
pass
|
||||
graph_token = self._get_tenant_principal_token(
|
||||
payload.tenant_id, resource=self.graph_resource
|
||||
)
|
||||
if graph_token is None:
|
||||
raise AuthenticationException(
|
||||
"Could not resolve graph token for tenant admin"
|
||||
)
|
||||
|
||||
# Step 1: Create an AAD identity for the user
|
||||
user_result = self._create_active_directory_user(graph_token, payload)
|
||||
# Step 2: Set the recovery email
|
||||
self._update_active_directory_user_email(graph_token, user_result.id, payload)
|
||||
# Step 3: Find the Billing Administrator role ID
|
||||
billing_admin_role_id = self._get_billing_owner_role(graph_token)
|
||||
# Step 4: Assign the Billing Administrator role to the new user
|
||||
self._assign_billing_owner_role(
|
||||
graph_token, billing_admin_role_id, user_result.id
|
||||
)
|
||||
|
||||
return BillingOwnerCSPResult(id=user_result.id)
|
||||
|
||||
def _assign_billing_owner_role(self, graph_token, billing_admin_role_id, user_id):
|
||||
request_body = {
|
||||
"roleDefinitionId": billing_admin_role_id,
|
||||
"principalId": user_id,
|
||||
"resourceScope": "/",
|
||||
}
|
||||
|
||||
auth_header = {
|
||||
"Authorization": f"Bearer {graph_token}",
|
||||
}
|
||||
|
||||
url = f"{self.graph_resource}/beta/roleManagement/directory/roleAssignments"
|
||||
|
||||
response = self.sdk.requests.post(url, headers=auth_header, json=request_body)
|
||||
|
||||
if response.ok:
|
||||
return True
|
||||
else:
|
||||
raise UserProvisioningException("Could not assign billing admin role")
|
||||
|
||||
def _get_billing_owner_role(self, graph_token):
|
||||
auth_header = {
|
||||
"Authorization": f"Bearer {graph_token}",
|
||||
}
|
||||
|
||||
url = f"{self.graph_resource}/v1.0/directoryRoles"
|
||||
|
||||
response = self.sdk.requests.get(url, headers=auth_header)
|
||||
|
||||
if response.ok:
|
||||
result = response.json()
|
||||
for role in result["value"]:
|
||||
if role["displayName"] == "Billing Administrator":
|
||||
return role["id"]
|
||||
else:
|
||||
raise UserProvisioningException(
|
||||
"Could not find Billing Administrator role ID; role may not be enabled."
|
||||
)
|
||||
|
||||
def force_tenant_admin_pw_update(self, creds, tenant_owner_id):
|
||||
# use creds to update to force password recovery?
|
||||
@ -861,7 +919,7 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
|
||||
return result
|
||||
|
||||
def _create_active_directory_user(self, graph_token, payload: UserCSPPayload):
|
||||
def _create_active_directory_user(self, graph_token, payload):
|
||||
request_body = {
|
||||
"accountEnabled": True,
|
||||
"displayName": payload.display_name,
|
||||
@ -886,9 +944,7 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
else:
|
||||
raise UserProvisioningException(f"Failed to create user: {response.json()}")
|
||||
|
||||
def _update_active_directory_user_email(
|
||||
self, graph_token, user_id, payload: UserCSPPayload
|
||||
):
|
||||
def _update_active_directory_user_email(self, graph_token, user_id, payload):
|
||||
request_body = {"otherMails": [payload.email]}
|
||||
|
||||
auth_header = {
|
||||
|
@ -566,15 +566,10 @@ class BillingOwnerCSPPayload(BaseCSPPayload, UserMixin):
|
||||
needed for user provisioning.
|
||||
"""
|
||||
|
||||
first_name: str
|
||||
last_name: str
|
||||
display_name = "billing_admin"
|
||||
domain_name: str
|
||||
password_recovery_email_address: str
|
||||
|
||||
@property
|
||||
def display_name(self):
|
||||
return f"{self.first_name} {self.last_name}"
|
||||
|
||||
@property
|
||||
def tenant_host_name(self):
|
||||
return self.domain_name
|
||||
|
@ -16,6 +16,7 @@ from atst.domain.csp.cloud.models import (
|
||||
ApplicationCSPResult,
|
||||
BillingInstructionCSPPayload,
|
||||
BillingInstructionCSPResult,
|
||||
BillingOwnerCSPPayload,
|
||||
BillingProfileCreationCSPPayload,
|
||||
BillingProfileCreationCSPResult,
|
||||
BillingProfileTenantAccessCSPPayload,
|
||||
@ -988,3 +989,46 @@ def test_create_user_role_failure(mock_azure: AzureCloudProvider):
|
||||
|
||||
with pytest.raises(UserProvisioningException):
|
||||
mock_azure.create_user_role(payload)
|
||||
|
||||
|
||||
def test_create_billing_owner(mock_azure: AzureCloudProvider):
|
||||
with patch.object(
|
||||
AzureCloudProvider,
|
||||
"_get_tenant_principal_token",
|
||||
wraps=mock_azure._get_tenant_principal_token,
|
||||
) as _get_tenant_principal_token:
|
||||
_get_tenant_principal_token.return_value = "token"
|
||||
|
||||
final_result = "1-2-3"
|
||||
|
||||
# create_billing_owner does: POST, PATCH, GET, POST
|
||||
|
||||
def make_mock_result(return_value=None):
|
||||
mock_result_create = Mock()
|
||||
mock_result_create.ok = True
|
||||
mock_result_create.json.return_value = return_value
|
||||
|
||||
return mock_result_create
|
||||
|
||||
post_results = [make_mock_result({"id": final_result}), make_mock_result()]
|
||||
|
||||
mock_post = lambda *a, **k: post_results.pop(0)
|
||||
|
||||
# mock POST so that it pops off results in the order we want
|
||||
mock_azure.sdk.requests.post = mock_post
|
||||
# return value for PATCH doesn't matter much
|
||||
mock_azure.sdk.requests.patch.return_value = make_mock_result()
|
||||
# return value for GET needs to be a JSON object with a list of role definitions
|
||||
mock_azure.sdk.requests.get.return_value = make_mock_result(
|
||||
{"value": [{"displayName": "Billing Administrator", "id": "4567"}]}
|
||||
)
|
||||
|
||||
payload = BillingOwnerCSPPayload(
|
||||
tenant_id=uuid4().hex,
|
||||
domain_name="rebelalliance",
|
||||
password_recovery_email_address="many@bothans.org",
|
||||
)
|
||||
|
||||
result = mock_azure.create_billing_owner(payload)
|
||||
|
||||
assert result.id == final_result
|
||||
|
@ -127,15 +127,13 @@ def test_UserCSPPayload_password():
|
||||
class TestBillingOwnerCSPPayload:
|
||||
user_payload = {
|
||||
"tenant_id": "123",
|
||||
"first_name": "Han",
|
||||
"last_name": "Solo",
|
||||
"domain_name": "rebelalliance",
|
||||
"password_recovery_email_address": "han@moseisley.cantina",
|
||||
}
|
||||
|
||||
def test_display_name(self):
|
||||
payload = BillingOwnerCSPPayload(**self.user_payload)
|
||||
assert payload.display_name == "Han Solo"
|
||||
assert payload.display_name == "billing_admin"
|
||||
|
||||
def test_tenant_host_name(self):
|
||||
payload = BillingOwnerCSPPayload(**self.user_payload)
|
||||
@ -143,7 +141,7 @@ class TestBillingOwnerCSPPayload:
|
||||
|
||||
def test_mail_nickname(self):
|
||||
payload = BillingOwnerCSPPayload(**self.user_payload)
|
||||
assert payload.mail_nickname == "han.solo"
|
||||
assert payload.mail_nickname == "billing_admin"
|
||||
|
||||
def test_password(self):
|
||||
payload = BillingOwnerCSPPayload(**self.user_payload)
|
||||
@ -151,7 +149,10 @@ class TestBillingOwnerCSPPayload:
|
||||
|
||||
def test_user_principal_name(self):
|
||||
payload = BillingOwnerCSPPayload(**self.user_payload)
|
||||
assert payload.user_principal_name == f"han.solo@rebelalliance.onmicrosoft.com"
|
||||
assert (
|
||||
payload.user_principal_name
|
||||
== f"billing_admin@rebelalliance.onmicrosoft.com"
|
||||
)
|
||||
|
||||
def test_email(self):
|
||||
payload = BillingOwnerCSPPayload(**self.user_payload)
|
||||
|
Loading…
x
Reference in New Issue
Block a user