Implement principal creation and admin elevation provisioning features.
This commit is contained in:
parent
144312863c
commit
d4dd581b7a
@ -1,5 +1,4 @@
|
||||
import re
|
||||
import time
|
||||
from secrets import token_urlsafe
|
||||
from typing import Dict
|
||||
from uuid import uuid4
|
||||
@ -11,6 +10,8 @@ from atst.models.user import User
|
||||
from .cloud_provider_interface import CloudProviderInterface
|
||||
from .exceptions import AuthenticationException
|
||||
from .models import (
|
||||
AdminRoleDefinitionCSPPayload,
|
||||
AdminRoleDefinitionCSPResult,
|
||||
BillingInstructionCSPPayload,
|
||||
BillingInstructionCSPResult,
|
||||
BillingProfileCreationCSPPayload,
|
||||
@ -19,16 +20,27 @@ from .models import (
|
||||
BillingProfileTenantAccessCSPResult,
|
||||
BillingProfileVerificationCSPPayload,
|
||||
BillingProfileVerificationCSPResult,
|
||||
PrincipalAdminRoleCSPPayload,
|
||||
PrincipalAdminRoleCSPResult,
|
||||
TaskOrderBillingCreationCSPPayload,
|
||||
TaskOrderBillingCreationCSPResult,
|
||||
TaskOrderBillingVerificationCSPPayload,
|
||||
TaskOrderBillingVerificationCSPResult,
|
||||
TenantAdminOwnershipCSPPayload,
|
||||
TenantAdminOwnershipCSPResult,
|
||||
TenantCSPPayload,
|
||||
TenantCSPResult,
|
||||
TenantPrincipalAppCSPPayload,
|
||||
TenantPrincipalAppCSPResult,
|
||||
TenantPrincipalCredentialCSPPayload,
|
||||
TenantPrincipalCredentialCSPResult,
|
||||
TenantPrincipalCSPPayload,
|
||||
TenantPrincipalCSPResult,
|
||||
TenantPrincipalOwnershipCSPPayload,
|
||||
TenantPrincipalOwnershipCSPResult,
|
||||
)
|
||||
from .policy import AzurePolicyManager
|
||||
|
||||
|
||||
SUBSCRIPTION_ID_REGEX = re.compile(
|
||||
"subscriptions\/([0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})",
|
||||
re.I,
|
||||
@ -77,8 +89,9 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
self.secret_key = config["AZURE_SECRET_KEY"]
|
||||
self.tenant_id = config["AZURE_TENANT_ID"]
|
||||
self.vault_url = config["AZURE_VAULT_URL"]
|
||||
# self.ps_client_id = config["POWERSHELL_CLIENT_ID"]
|
||||
self.ps_client_id = "1950a258-227b-4e31-a9cf-717495945fc2"
|
||||
self.ps_client_id = config["POWERSHELL_CLIENT_ID"]
|
||||
self.owner_role_def_id = config["AZURE_OWNER_ROLE_DEF_ID"]
|
||||
self.graph_resource = config["AZURE_GRAPH_RESOURCE"]
|
||||
|
||||
if azure_sdk_provider is None:
|
||||
self.sdk = AzureSDKProvider()
|
||||
@ -482,15 +495,13 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
else:
|
||||
return self._error(result.json())
|
||||
|
||||
def assign_root_mg_ownership(self, payload):
|
||||
import ipdb; ipdb.set_trace()
|
||||
# elevate
|
||||
def get_elevated_management_token(self, tenant_id):
|
||||
mgmt_token = self.get_tenant_admin_token(
|
||||
payload.tenant_id, self.sdk.cloud.endpoints.resource_manager
|
||||
tenant_id, self.sdk.cloud.endpoints.resource_manager
|
||||
)
|
||||
if mgmt_token is None:
|
||||
raise AuthenticationException(
|
||||
"Could not resolve management token for tenant admin"
|
||||
"Failed to resolve management token for tenant admin"
|
||||
)
|
||||
|
||||
auth_header = {
|
||||
@ -500,19 +511,19 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
result = self.sdk.requests.post(url, headers=auth_header)
|
||||
|
||||
if not result.ok:
|
||||
return False
|
||||
raise AuthenticationException("Failed to elevate access")
|
||||
|
||||
# ----------- NEXT STEP: Root MGMT Group Ownership (tenant admin) -------------
|
||||
time.sleep(20)
|
||||
# HARD CODED, MOVE TO CONFIG
|
||||
ownerRoleId = '8e3af657-a8ff-443c-a75c-2fe8c4bcb635'
|
||||
return mgmt_token
|
||||
|
||||
role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{ownerRoleId}"
|
||||
def create_tenant_admin_ownership(self, payload: TenantAdminOwnershipCSPPayload):
|
||||
mgmt_token = self.get_elevated_management_token(payload.tenant_id)
|
||||
|
||||
role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{self.owner_role_def_id}"
|
||||
|
||||
request_body = {
|
||||
"properties": {
|
||||
"roleDefinitionId": role_definition_id,
|
||||
"principalId": payload.user_object_id
|
||||
"principalId": payload.user_object_id,
|
||||
}
|
||||
}
|
||||
|
||||
@ -526,21 +537,21 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
|
||||
response = self.sdk.requests.put(url, headers=auth_header, json=request_body)
|
||||
|
||||
if not response.ok:
|
||||
return False
|
||||
if response.ok:
|
||||
return TenantAdminOwnershipCSPResult(**response.json())
|
||||
|
||||
# ----------- NEXT STEP: Root MGMT Group Ownership (remote admin SP) -------------
|
||||
time.sleep(20)
|
||||
# HARD CODED, MOVE TO CONFIG
|
||||
ownerRoleId = '8e3af657-a8ff-443c-a75c-2fe8c4bcb635'
|
||||
def create_tenant_principal_ownership(
|
||||
self, payload: TenantPrincipalOwnershipCSPPayload
|
||||
):
|
||||
mgmt_token = self.get_elevated_management_token(payload.tenant_id)
|
||||
|
||||
# NOTE: the tenant_id is also the id of the root management group, once it is created
|
||||
role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{ownerRoleId}"
|
||||
role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{self.owner_role_def_id}"
|
||||
|
||||
request_body = {
|
||||
"properties": {
|
||||
"roleDefinitionId": role_definition_id,
|
||||
"principalId": payload.admin_principal_id
|
||||
"principalId": payload.principal_id,
|
||||
}
|
||||
}
|
||||
|
||||
@ -554,15 +565,14 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
|
||||
response = self.sdk.requests.put(url, headers=auth_header, json=request_body)
|
||||
|
||||
if not response.ok:
|
||||
return False
|
||||
if response.ok:
|
||||
return TenantPrincipalOwnershipCSPResult(**response.json())
|
||||
|
||||
def create_tenant_principal_app(self, payload: TenantPrincipalAppCSPPayload):
|
||||
|
||||
|
||||
def create_remote_admin(self, payload):
|
||||
import ipdb; ipdb.set_trace()
|
||||
GRAPH_RESOURCE = "https://graph.microsoft.com"
|
||||
graph_token = self.get_tenant_admin_token(payload.tenant_id, GRAPH_RESOURCE)
|
||||
graph_token = self.get_tenant_admin_token(
|
||||
payload.tenant_id, self.graph_resource
|
||||
)
|
||||
if graph_token is None:
|
||||
raise AuthenticationException(
|
||||
"Could not resolve graph token for tenant admin"
|
||||
@ -574,35 +584,45 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
"Authorization": f"Bearer {graph_token}",
|
||||
}
|
||||
|
||||
url = f"{GRAPH_RESOURCE}/v1.0/applications"
|
||||
url = f"{self.graph_resource}/v1.0/applications"
|
||||
|
||||
response = self.sdk.requests.post(url, json=request_body, headers=auth_header)
|
||||
|
||||
res = response.json()
|
||||
result1 = {}
|
||||
if response.ok:
|
||||
result1 = {"app_id": res.get("appId"), "object_id": res.get("id")}
|
||||
return TenantPrincipalAppCSPResult(**response.json())
|
||||
|
||||
# ---- SEPARATE STEP (Create associated Service Principal) ----------
|
||||
time.sleep(20)
|
||||
def create_tenant_principal(self, payload: TenantPrincipalCSPPayload):
|
||||
graph_token = self.get_tenant_admin_token(
|
||||
payload.tenant_id, self.graph_resource
|
||||
)
|
||||
if graph_token is None:
|
||||
raise AuthenticationException(
|
||||
"Could not resolve graph token for tenant admin"
|
||||
)
|
||||
|
||||
request_body = {"appId": result1.get("app_id")}
|
||||
request_body = {"appId": payload.principal_app_id}
|
||||
|
||||
auth_header = {
|
||||
"Authorization": f"Bearer {graph_token}",
|
||||
}
|
||||
|
||||
url = f"{GRAPH_RESOURCE}/beta/servicePrincipals"
|
||||
url = f"{self.graph_resource}/beta/servicePrincipals"
|
||||
|
||||
response = self.sdk.requests.post(url, json=request_body, headers=auth_header)
|
||||
|
||||
res = response.json()
|
||||
result2 = {}
|
||||
if response.ok:
|
||||
result2 = {"sp_id": res.get("id")}
|
||||
return TenantPrincipalCSPResult(**response.json())
|
||||
|
||||
# ---- SEPARATE STEP - Generate Creds (Client Secret)----------
|
||||
time.sleep(20)
|
||||
def create_tenant_principal_credential(
|
||||
self, payload: TenantPrincipalCredentialCSPPayload
|
||||
):
|
||||
graph_token = self.get_tenant_admin_token(
|
||||
payload.tenant_id, self.graph_resource
|
||||
)
|
||||
if graph_token is None:
|
||||
raise AuthenticationException(
|
||||
"Could not resolve graph token for tenant admin"
|
||||
)
|
||||
|
||||
request_body = {
|
||||
"passwordCredentials": [{"displayName": "ATAT Generated Password"}]
|
||||
@ -612,43 +632,59 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
"Authorization": f"Bearer {graph_token}",
|
||||
}
|
||||
|
||||
# Uses OBJECT_ID of App Registration
|
||||
url = (
|
||||
f"{GRAPH_RESOURCE}/v1.0/applications/{result1.get('object_id')}/addPassword"
|
||||
)
|
||||
url = f"{self.graph_resource}/v1.0/applications/{payload.principal_app_object_id}/addPassword"
|
||||
|
||||
response = self.sdk.requests.post(url, json=request_body, headers=auth_header)
|
||||
result3 = {}
|
||||
res = response.json()
|
||||
if response.ok:
|
||||
result3 = {"client_secret": res.get("secretText")}
|
||||
|
||||
# ---- SEPARATE STEP - Source Global Admin Role----------
|
||||
if response.ok:
|
||||
return TenantPrincipalCredentialCSPResult(
|
||||
principal_client_id=payload.principal_app_id, **response.json()
|
||||
)
|
||||
|
||||
def create_admin_role_definition(self, payload: AdminRoleDefinitionCSPPayload):
|
||||
graph_token = self.get_tenant_admin_token(
|
||||
payload.tenant_id, self.graph_resource
|
||||
)
|
||||
if graph_token is None:
|
||||
raise AuthenticationException(
|
||||
"Could not resolve graph token for tenant admin"
|
||||
)
|
||||
|
||||
auth_header = {
|
||||
"Authorization": f"Bearer {graph_token}",
|
||||
}
|
||||
|
||||
# Uses OBJECT_ID of App Registration
|
||||
url = f"{GRAPH_RESOURCE}/beta/roleManagement/directory/roleDefinitions"
|
||||
url = f"{self.graph_resource}/beta/roleManagement/directory/roleDefinitions"
|
||||
|
||||
response = self.sdk.requests.get(url, headers=auth_header)
|
||||
|
||||
result = response.json()
|
||||
roleList = result.get("value")
|
||||
|
||||
admin_role_id = "794bb258-3e31-42ff-9ee4-731a72f62851" # May be hard coded? use for fall back
|
||||
for role in roleList:
|
||||
if role.get("displayName") == "Company Administrator":
|
||||
admin_role_id = role.get("id")
|
||||
break
|
||||
DEFAULT_ADMIN_RD_ID = "794bb258-3e31-42ff-9ee4-731a72f62851"
|
||||
admin_role_def_id = next(
|
||||
(
|
||||
role.get("id")
|
||||
for role in roleList
|
||||
if role.get("displayName") == "Company Administrator"
|
||||
),
|
||||
DEFAULT_ADMIN_RD_ID,
|
||||
)
|
||||
|
||||
# ---- SEPARATE STEP - Source Global Admin Role----------
|
||||
time.sleep(20)
|
||||
return AdminRoleDefinitionCSPResult(admin_role_def_id=admin_role_def_id)
|
||||
|
||||
def create_principal_admin_role(self, payload: PrincipalAdminRoleCSPPayload):
|
||||
graph_token = self.get_tenant_admin_token(
|
||||
payload.tenant_id, self.graph_resource
|
||||
)
|
||||
if graph_token is None:
|
||||
raise AuthenticationException(
|
||||
"Could not resolve graph token for tenant admin"
|
||||
)
|
||||
|
||||
request_body = {
|
||||
"principalId": result2.get("sp_id"),
|
||||
"roleDefinitionId": admin_role_id,
|
||||
"principalId": payload.principal_id,
|
||||
"roleDefinitionId": payload.admin_role_def_id,
|
||||
"resourceScope": "/",
|
||||
}
|
||||
|
||||
@ -656,14 +692,12 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
"Authorization": f"Bearer {graph_token}",
|
||||
}
|
||||
|
||||
url = f"{GRAPH_RESOURCE}/beta/roleManagement/directory/roleAssignments"
|
||||
url = f"{self.graph_resource}/beta/roleManagement/directory/roleAssignments"
|
||||
|
||||
response = self.sdk.requests.post(url, headers=auth_header, json=request_body)
|
||||
|
||||
if response.ok:
|
||||
return (result1, result2, result3)
|
||||
|
||||
return False
|
||||
return PrincipalAdminRoleCSPResult(**response.json())
|
||||
|
||||
def force_tenant_admin_pw_update(self, creds, tenant_owner_id):
|
||||
# use creds to update to force password recovery?
|
||||
@ -736,7 +770,10 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
def get_tenant_admin_token(self, tenant_id, resource):
|
||||
creds = self.get_secret(tenant_id)
|
||||
return self._get_up_token_for_resource(
|
||||
creds.get("admin_username"), creds.get("admin_password"), tenant_id, resource
|
||||
creds.get("admin_username"),
|
||||
creds.get("admin_password"),
|
||||
tenant_id,
|
||||
resource,
|
||||
)
|
||||
|
||||
def get_tenant_principal_token(self, tenant_id, resource):
|
||||
|
@ -1,34 +1,46 @@
|
||||
from uuid import uuid4
|
||||
|
||||
from atst.domain.csp.cloud.exceptions import (
|
||||
BaselineProvisionException,
|
||||
EnvironmentCreationException,
|
||||
GeneralCSPException,
|
||||
UserProvisioningException,
|
||||
UserRemovalException,
|
||||
)
|
||||
from atst.domain.csp.cloud.models import BillingProfileTenantAccessCSPResult
|
||||
|
||||
from .cloud_provider_interface import CloudProviderInterface
|
||||
from .exceptions import (
|
||||
AuthenticationException,
|
||||
AuthorizationException,
|
||||
BaselineProvisionException,
|
||||
ConnectionException,
|
||||
EnvironmentCreationException,
|
||||
GeneralCSPException,
|
||||
UnknownServerException,
|
||||
UserProvisioningException,
|
||||
UserRemovalException,
|
||||
)
|
||||
from .models import (
|
||||
PrincipalAdminRoleCSPResult,
|
||||
AdminRoleDefinitionCSPResult,
|
||||
TenantAdminOwnershipCSPResult,
|
||||
TenantPrincipalCSPResult,
|
||||
BillingInstructionCSPPayload,
|
||||
BillingInstructionCSPResult,
|
||||
BillingProfileCreationCSPPayload,
|
||||
BillingProfileCreationCSPResult,
|
||||
BillingProfileTenantAccessCSPResult,
|
||||
BillingProfileVerificationCSPPayload,
|
||||
BillingProfileVerificationCSPResult,
|
||||
PrincipalAdminRoleCSPPayload,
|
||||
AdminRoleDefinitionCSPPayload,
|
||||
TaskOrderBillingCreationCSPPayload,
|
||||
TaskOrderBillingCreationCSPResult,
|
||||
TaskOrderBillingVerificationCSPPayload,
|
||||
TaskOrderBillingVerificationCSPResult,
|
||||
TenantAdminOwnershipCSPPayload,
|
||||
TenantCSPPayload,
|
||||
TenantCSPResult,
|
||||
TenantPrincipalAppCSPPayload,
|
||||
TenantPrincipalAppCSPResult,
|
||||
TenantPrincipalCredentialCSPPayload,
|
||||
TenantPrincipalCredentialCSPResult,
|
||||
TenantPrincipalCSPPayload,
|
||||
TenantPrincipalOwnershipCSPPayload,
|
||||
TenantPrincipalOwnershipCSPResult,
|
||||
)
|
||||
|
||||
|
||||
@ -117,7 +129,7 @@ class MockCloudProvider(CloudProviderInterface):
|
||||
payload is an instance of TenantCSPPayload data class
|
||||
"""
|
||||
|
||||
self._authorize(payload.creds)
|
||||
self._authorize("admin")
|
||||
|
||||
self._delay(1, 5)
|
||||
|
||||
@ -274,6 +286,70 @@ class MockCloudProvider(CloudProviderInterface):
|
||||
}
|
||||
)
|
||||
|
||||
def create_tenant_admin_ownership(self, payload: TenantAdminOwnershipCSPPayload):
|
||||
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
|
||||
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
|
||||
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
|
||||
|
||||
return TenantAdminOwnershipCSPResult(**dict(id="admin_owner_assignment_id"))
|
||||
|
||||
def create_tenant_principal_ownership(
|
||||
self, payload: TenantPrincipalOwnershipCSPPayload
|
||||
):
|
||||
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
|
||||
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
|
||||
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
|
||||
|
||||
return TenantPrincipalOwnershipCSPResult(
|
||||
**dict(id="principal_owner_assignment_id")
|
||||
)
|
||||
|
||||
def create_tenant_principal_app(self, payload: TenantPrincipalAppCSPPayload):
|
||||
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
|
||||
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
|
||||
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
|
||||
|
||||
return TenantPrincipalAppCSPResult(
|
||||
**dict(appId="principal_app_id", id="principal_app_object_id")
|
||||
)
|
||||
|
||||
def create_tenant_principal(self, payload: TenantPrincipalCSPPayload):
|
||||
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
|
||||
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
|
||||
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
|
||||
|
||||
return TenantPrincipalCSPResult(**dict(id="principal_id"))
|
||||
|
||||
def create_tenant_principal_credential(
|
||||
self, payload: TenantPrincipalCredentialCSPPayload
|
||||
):
|
||||
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
|
||||
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
|
||||
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
|
||||
|
||||
return TenantPrincipalCredentialCSPResult(
|
||||
**dict(
|
||||
secretText="principal_secret_key",
|
||||
principal_client_id="principal_client_id",
|
||||
)
|
||||
)
|
||||
|
||||
def create_admin_role_definition(self, payload: AdminRoleDefinitionCSPPayload):
|
||||
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
|
||||
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
|
||||
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
|
||||
|
||||
return AdminRoleDefinitionCSPResult(
|
||||
**dict(admin_role_def_id="admin_role_def_id")
|
||||
)
|
||||
|
||||
def create_principal_admin_role(self, payload: PrincipalAdminRoleCSPPayload):
|
||||
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
|
||||
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
|
||||
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
|
||||
|
||||
return PrincipalAdminRoleCSPResult(**dict(id="principal_assignment_id"))
|
||||
|
||||
def create_or_update_user(self, auth_credentials, user_info, csp_role_id):
|
||||
self._authorize(auth_credentials)
|
||||
|
||||
|
@ -223,3 +223,80 @@ class BillingInstructionCSPResult(AliasModel):
|
||||
"reported_clin_name": "name",
|
||||
}
|
||||
|
||||
|
||||
class TenantAdminOwnershipCSPPayload(BaseCSPPayload):
|
||||
user_object_id: str
|
||||
|
||||
|
||||
class TenantAdminOwnershipCSPResult(AliasModel):
|
||||
admin_owner_assignment_id: str
|
||||
|
||||
class Config:
|
||||
fields = {"admin_owner_assignment_id": "id"}
|
||||
|
||||
|
||||
class TenantPrincipalOwnershipCSPPayload(BaseCSPPayload):
|
||||
principal_id: str
|
||||
|
||||
|
||||
class TenantPrincipalOwnershipCSPResult(AliasModel):
|
||||
principal_owner_assignment_id: str
|
||||
|
||||
class Config:
|
||||
fields = {"principal_owner_assignment_id": "id"}
|
||||
|
||||
|
||||
class TenantPrincipalAppCSPPayload(BaseCSPPayload):
|
||||
pass
|
||||
|
||||
|
||||
class TenantPrincipalAppCSPResult(AliasModel):
|
||||
principal_app_id: str
|
||||
principal_app_object_id: str
|
||||
|
||||
class Config:
|
||||
fields = {"principal_app_id": "appId", "principal_app_object_id": "id"}
|
||||
|
||||
|
||||
class TenantPrincipalCSPPayload(BaseCSPPayload):
|
||||
principal_app_id: str
|
||||
|
||||
|
||||
class TenantPrincipalCSPResult(AliasModel):
|
||||
principal_id: str
|
||||
|
||||
class Config:
|
||||
fields = {"principal_id": "id"}
|
||||
|
||||
|
||||
class TenantPrincipalCredentialCSPPayload(BaseCSPPayload):
|
||||
principal_app_id: str
|
||||
principal_app_object_id: str
|
||||
|
||||
|
||||
class TenantPrincipalCredentialCSPResult(AliasModel):
|
||||
principal_client_id: str
|
||||
principal_secret_key: str
|
||||
|
||||
class Config:
|
||||
fields = {"principal_secret_key": "secretText"}
|
||||
|
||||
|
||||
class AdminRoleDefinitionCSPPayload(BaseCSPPayload):
|
||||
pass
|
||||
|
||||
|
||||
class AdminRoleDefinitionCSPResult(AliasModel):
|
||||
admin_role_def_id: str
|
||||
|
||||
|
||||
class PrincipalAdminRoleCSPPayload(BaseCSPPayload):
|
||||
principal_id: str
|
||||
admin_role_def_id: str
|
||||
|
||||
|
||||
class PrincipalAdminRoleCSPResult(AliasModel):
|
||||
principal_assignment_id: str
|
||||
|
||||
class Config:
|
||||
fields = {"principal_assignment_id": "id"}
|
||||
|
@ -17,6 +17,13 @@ class AzureStages(Enum):
|
||||
TASK_ORDER_BILLING_CREATION = "task order billing creation"
|
||||
TASK_ORDER_BILLING_VERIFICATION = "task order billing verification"
|
||||
BILLING_INSTRUCTION = "billing instruction"
|
||||
TENANT_PRINCIPAL_APP = "tenant principal application"
|
||||
TENANT_PRINCIPAL = "tenant principal"
|
||||
TENANT_PRINCIPAL_CREDENTIAL = "tenant principal credential"
|
||||
ADMIN_ROLE_DEFINITION = "admin role definition"
|
||||
PRINCIPAL_ADMIN_ROLE = "tenant principal admin"
|
||||
TENANT_ADMIN_OWNERSHIP = "tenant admin ownership"
|
||||
TENANT_PRINCIPAL_OWNERSHIP = "tenant principial ownership"
|
||||
|
||||
|
||||
def _build_csp_states(csp_stages):
|
||||
|
@ -6,6 +6,8 @@ from tests.mock_azure import AUTH_CREDENTIALS, mock_azure
|
||||
|
||||
from atst.domain.csp.cloud import AzureCloudProvider
|
||||
from atst.domain.csp.cloud.models import (
|
||||
AdminRoleDefinitionCSPPayload,
|
||||
AdminRoleDefinitionCSPResult,
|
||||
BaseCSPPayload,
|
||||
BillingInstructionCSPPayload,
|
||||
BillingInstructionCSPResult,
|
||||
@ -19,8 +21,18 @@ from atst.domain.csp.cloud.models import (
|
||||
TaskOrderBillingCreationCSPResult,
|
||||
TaskOrderBillingVerificationCSPPayload,
|
||||
TaskOrderBillingVerificationCSPResult,
|
||||
TenantAdminOwnershipCSPPayload,
|
||||
TenantAdminOwnershipCSPResult,
|
||||
TenantCSPPayload,
|
||||
TenantCSPResult,
|
||||
TenantPrincipalAppCSPPayload,
|
||||
TenantPrincipalAppCSPResult,
|
||||
TenantPrincipalCredentialCSPPayload,
|
||||
TenantPrincipalCredentialCSPResult,
|
||||
TenantPrincipalCSPPayload,
|
||||
TenantPrincipalCSPResult,
|
||||
TenantPrincipalOwnershipCSPPayload,
|
||||
TenantPrincipalOwnershipCSPResult,
|
||||
)
|
||||
|
||||
BILLING_ACCOUNT_NAME = "52865e4c-52e8-5a6c-da6b-c58f0814f06f:7ea5de9d-b8ce-4901-b1c5-d864320c7b03_2019-05-31"
|
||||
@ -409,46 +421,167 @@ def test_create_billing_instruction(mock_azure: AzureCloudProvider):
|
||||
assert body.reported_clin_name == "TO1:CLIN001"
|
||||
|
||||
|
||||
def test_admin_principal_creation(mock_azure: AzureCloudProvider):
|
||||
# Auth As Tenant Admin
|
||||
# Create App Registration
|
||||
# Create Service Principal
|
||||
# Create App Registration Password Credential
|
||||
# Lookup global admin role
|
||||
# Assign global admin role to Service Principal
|
||||
def test_create_tenant_principal_app(mock_azure: AzureCloudProvider):
|
||||
with patch.object(
|
||||
AzureCloudProvider, "get_secret", wraps=mock_azure.get_secret
|
||||
) as mock_get_secret:
|
||||
mock_get_secret.return_value = {
|
||||
"admin_username": "",
|
||||
"admin_password": "",
|
||||
}
|
||||
payload = BaseCSPPayload(
|
||||
AzureCloudProvider,
|
||||
"get_elevated_management_token",
|
||||
wraps=mock_azure.get_elevated_management_token,
|
||||
) as get_elevated_management_token:
|
||||
get_elevated_management_token.return_value = "my fake token"
|
||||
|
||||
mock_result = Mock()
|
||||
mock_result.ok = True
|
||||
mock_result.json.return_value = {"appId": "appId", "id": "id"}
|
||||
|
||||
mock_azure.sdk.requests.post.return_value = mock_result
|
||||
|
||||
payload = TenantPrincipalAppCSPPayload(
|
||||
**{"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4"}
|
||||
)
|
||||
|
||||
result = mock_azure.create_remote_admin(payload)
|
||||
result: TenantPrincipalAppCSPResult = mock_azure.create_tenant_principal_app(
|
||||
payload
|
||||
)
|
||||
|
||||
print(result)
|
||||
assert result.principal_app_id == "appId"
|
||||
|
||||
|
||||
def test_admin_mg_ownership(mock_azure: AzureCloudProvider):
|
||||
def test_create_tenant_principal(mock_azure: AzureCloudProvider):
|
||||
with patch.object(
|
||||
AzureCloudProvider, "get_secret", wraps=mock_azure.get_secret
|
||||
) as mock_get_secret:
|
||||
mock_get_secret.return_value = {
|
||||
"admin_username": "",
|
||||
"admin_password": "",
|
||||
}
|
||||
payload = TenantCSPResult(
|
||||
AzureCloudProvider,
|
||||
"get_elevated_management_token",
|
||||
wraps=mock_azure.get_elevated_management_token,
|
||||
) as get_elevated_management_token:
|
||||
get_elevated_management_token.return_value = "my fake token"
|
||||
|
||||
mock_result = Mock()
|
||||
mock_result.ok = True
|
||||
mock_result.json.return_value = {"id": "principal_id"}
|
||||
|
||||
mock_azure.sdk.requests.post.return_value = mock_result
|
||||
|
||||
payload = TenantPrincipalCSPPayload(
|
||||
**{
|
||||
"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
|
||||
"principal_app_id": "appId",
|
||||
}
|
||||
)
|
||||
|
||||
result: TenantPrincipalCSPResult = mock_azure.create_tenant_principal(payload)
|
||||
|
||||
assert result.principal_id == "principal_id"
|
||||
|
||||
|
||||
def test_create_tenant_principal_credential(mock_azure: AzureCloudProvider):
|
||||
with patch.object(
|
||||
AzureCloudProvider,
|
||||
"get_elevated_management_token",
|
||||
wraps=mock_azure.get_elevated_management_token,
|
||||
) as get_elevated_management_token:
|
||||
get_elevated_management_token.return_value = "my fake token"
|
||||
|
||||
mock_result = Mock()
|
||||
mock_result.ok = True
|
||||
mock_result.json.return_value = {"secretText": "new secret key"}
|
||||
|
||||
mock_azure.sdk.requests.post.return_value = mock_result
|
||||
|
||||
payload = TenantPrincipalCredentialCSPPayload(
|
||||
**{
|
||||
"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
|
||||
"principal_app_id": "appId",
|
||||
"principal_app_object_id": "appObjId",
|
||||
}
|
||||
)
|
||||
|
||||
result: TenantPrincipalCredentialCSPResult = mock_azure.create_tenant_principal_credential(
|
||||
payload
|
||||
)
|
||||
|
||||
assert result.principal_secret_key == "new secret key"
|
||||
|
||||
|
||||
def test_create_admin_role_definition(mock_azure: AzureCloudProvider):
|
||||
with patch.object(
|
||||
AzureCloudProvider,
|
||||
"get_elevated_management_token",
|
||||
wraps=mock_azure.get_elevated_management_token,
|
||||
) as get_elevated_management_token:
|
||||
get_elevated_management_token.return_value = "my fake token"
|
||||
|
||||
mock_result = Mock()
|
||||
mock_result.ok = True
|
||||
mock_result.json.return_value = {
|
||||
"value": [
|
||||
{"id": "wrongid", "displayName": "Wrong Role"},
|
||||
{"id": "id", "displayName": "Company Administrator"},
|
||||
]
|
||||
}
|
||||
|
||||
mock_azure.sdk.requests.get.return_value = mock_result
|
||||
|
||||
payload = AdminRoleDefinitionCSPPayload(
|
||||
**{"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4"}
|
||||
)
|
||||
|
||||
result: AdminRoleDefinitionCSPResult = mock_azure.create_admin_role_definition(
|
||||
payload
|
||||
)
|
||||
|
||||
assert result.admin_role_def_id == "id"
|
||||
|
||||
|
||||
def test_create_tenant_admin_ownership(mock_azure: AzureCloudProvider):
|
||||
with patch.object(
|
||||
AzureCloudProvider,
|
||||
"get_elevated_management_token",
|
||||
wraps=mock_azure.get_elevated_management_token,
|
||||
) as get_elevated_management_token:
|
||||
get_elevated_management_token.return_value = "my fake token"
|
||||
|
||||
mock_result = Mock()
|
||||
mock_result.ok = True
|
||||
mock_result.json.return_value = {"id": "id"}
|
||||
|
||||
mock_azure.sdk.requests.put.return_value = mock_result
|
||||
|
||||
payload = TenantAdminOwnershipCSPPayload(
|
||||
**{
|
||||
"user_id": "blach",
|
||||
"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
|
||||
"user_object_id": "971efe4d-1e80-4e39-b3b9-4e5c63ad446d",
|
||||
}
|
||||
)
|
||||
|
||||
result = mock_azure.assign_root_mg_ownership(payload)
|
||||
result: TenantAdminOwnershipCSPResult = mock_azure.create_tenant_admin_ownership(
|
||||
payload
|
||||
)
|
||||
|
||||
print(result)
|
||||
assert result.admin_owner_assignment_id == "id"
|
||||
|
||||
|
||||
def test_create_tenant_principal_ownership(mock_azure: AzureCloudProvider):
|
||||
with patch.object(
|
||||
AzureCloudProvider,
|
||||
"get_elevated_management_token",
|
||||
wraps=mock_azure.get_elevated_management_token,
|
||||
) as get_elevated_management_token:
|
||||
get_elevated_management_token.return_value = "my fake token"
|
||||
|
||||
mock_result = Mock()
|
||||
mock_result.ok = True
|
||||
mock_result.json.return_value = {"id": "id"}
|
||||
|
||||
mock_azure.sdk.requests.put.return_value = mock_result
|
||||
|
||||
payload = TenantPrincipalOwnershipCSPPayload(
|
||||
**{
|
||||
"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
|
||||
"principal_id": "971efe4d-1e80-4e39-b3b9-4e5c63ad446d",
|
||||
}
|
||||
)
|
||||
|
||||
result: TenantPrincipalOwnershipCSPResult = mock_azure.create_tenant_principal_ownership(
|
||||
payload
|
||||
)
|
||||
|
||||
assert result.principal_owner_assignment_id == "id"
|
||||
|
@ -104,6 +104,13 @@ def test_fsm_transition_start(mock_cloud_provider, portfolio: Portfolio):
|
||||
FSMStates.TASK_ORDER_BILLING_CREATION_CREATED,
|
||||
FSMStates.TASK_ORDER_BILLING_VERIFICATION_CREATED,
|
||||
FSMStates.BILLING_INSTRUCTION_CREATED,
|
||||
FSMStates.TENANT_PRINCIPAL_APP_CREATED,
|
||||
FSMStates.TENANT_PRINCIPAL_CREATED,
|
||||
FSMStates.TENANT_PRINCIPAL_CREDENTIAL_CREATED,
|
||||
FSMStates.ADMIN_ROLE_DEFINITION_CREATED,
|
||||
FSMStates.PRINCIPAL_ADMIN_ROLE_CREATED,
|
||||
FSMStates.TENANT_ADMIN_OWNERSHIP_CREATED,
|
||||
FSMStates.TENANT_PRINCIPAL_OWNERSHIP_CREATED,
|
||||
]
|
||||
|
||||
if portfolio.csp_data is not None:
|
||||
|
@ -9,6 +9,9 @@ AZURE_CONFIG = {
|
||||
"AZURE_TENANT_ID": "MOCK",
|
||||
"AZURE_POLICY_LOCATION": "policies",
|
||||
"AZURE_VAULT_URL": "http://vault",
|
||||
"POWERSHELL_CLIENT_ID": "MOCK",
|
||||
"AZURE_OWNER_ROLE_DEF_ID": "MOCK",
|
||||
"AZURE_GRAPH_RESOURCE": "MOCK",
|
||||
}
|
||||
|
||||
AUTH_CREDENTIALS = {
|
||||
@ -63,15 +66,13 @@ def mock_policy():
|
||||
def mock_adal():
|
||||
import adal
|
||||
|
||||
return adal
|
||||
# return Mock(spec=adal)
|
||||
return Mock(spec=adal)
|
||||
|
||||
|
||||
def mock_requests():
|
||||
import requests
|
||||
|
||||
# return Mock(spec=requests)
|
||||
return requests
|
||||
return Mock(spec=requests)
|
||||
|
||||
|
||||
def mock_secrets():
|
||||
|
Loading…
x
Reference in New Issue
Block a user