azure initial management group creation

This commit is contained in:
Philip Kalinsky 2020-02-04 08:09:24 -05:00
parent f79af3ad62
commit 8c5f532ca1
6 changed files with 135 additions and 5 deletions

View File

@ -25,7 +25,10 @@ from .models import (
BillingProfileVerificationCSPPayload,
BillingProfileVerificationCSPResult,
KeyVaultCredentials,
ManagementGroupCSPPayload,
ManagementGroupCSPResponse,
ManagementGroupGetCSPPayload,
ManagementGroupGetCSPResponse,
ProductPurchaseCSPPayload,
ProductPurchaseCSPResult,
ProductPurchaseVerificationCSPPayload,
@ -209,6 +212,40 @@ class AzureCloudProvider(CloudProviderInterface):
return ApplicationCSPResult(**response)
def create_initial_mgmt_group(self, payload: ManagementGroupCSPPayload):
creds = self._source_creds(payload.tenant_id)
credentials = self._get_credential_obj(
{
"client_id": creds.root_sp_client_id,
"secret_key": creds.root_sp_key,
"tenant_id": creds.root_tenant_id,
},
resource=self.sdk.cloud.endpoints.resource_manager,
)
response = self._create_management_group(
credentials, payload.management_group_name, payload.display_name,
)
return ManagementGroupCSPResponse(**response)
def create_initial_mgmt_group_verification(
self, payload: ManagementGroupGetCSPPayload
):
creds = self._source_creds(payload.tenant_id)
credentials = self._get_credential_obj(
{
"client_id": creds.root_sp_client_id,
"secret_key": creds.root_sp_key,
"tenant_id": creds.root_tenant_id,
},
resource=self.sdk.cloud.endpoints.resource_manager,
)
response = self._get_management_group(
credentials, payload.management_group_name,
)
return ManagementGroupGetCSPResponse(**response.result())
def _create_management_group(
self, credentials, management_group_id, display_name, parent_id=None,
):
@ -235,6 +272,11 @@ class AzureCloudProvider(CloudProviderInterface):
# instead?
return create_request.result()
def _get_management_group(self, credentials, management_group_id):
mgmgt_group_client = self.sdk.managementgroups.ManagementGroupsAPI(credentials)
response = mgmgt_group_client.management_groups.get(management_group_id)
return response
def _create_policy_definition(
self, credentials, subscription_id, management_group_id, properties,
):

View File

@ -25,6 +25,10 @@ from .models import (
BillingProfileTenantAccessCSPResult,
BillingProfileVerificationCSPPayload,
BillingProfileVerificationCSPResult,
ManagementGroupCSPPayload,
ManagementGroupCSPResponse,
ManagementGroupGetCSPPayload,
ManagementGroupGetCSPResponse,
ProductPurchaseCSPPayload,
ProductPurchaseCSPResult,
ProductPurchaseVerificationCSPPayload,
@ -319,6 +323,29 @@ class MockCloudProvider(CloudProviderInterface):
}
)
def create_initial_mgmt_group(self, payload: ManagementGroupCSPPayload):
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
return ManagementGroupCSPResponse(
id=f"{AZURE_MGMNT_PATH}{payload.management_group_name}"
)
def create_initial_mgmt_group_verification(
self, payload: ManagementGroupGetCSPPayload
):
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
return ManagementGroupGetCSPResponse(
**dict(
id="Test Id"
# id=f"{AZURE_MGMNT_PATH}{payload.management_group_name}"
)
)
def create_product_purchase(self, payload: ProductPurchaseCSPPayload):
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)

View File

@ -318,7 +318,7 @@ class ManagementGroupCSPPayload(AliasModel):
tenant_id: str
management_group_name: Optional[str]
display_name: str
parent_id: str
parent_id: Optional[str]
@validator("management_group_name", pre=True, always=True)
def supply_management_group_name_default(cls, name):
@ -336,18 +336,28 @@ class ManagementGroupCSPPayload(AliasModel):
def enforce_display_name_length(cls, name):
return name[0:90]
@validator("parent_id", pre=True, always=True)
def enforce_parent_id_pattern(cls, id_):
if AZURE_MGMNT_PATH not in id_:
return f"{AZURE_MGMNT_PATH}{id_}"
else:
return id_
if id_:
if AZURE_MGMNT_PATH not in id_:
return f"{AZURE_MGMNT_PATH}{id_}"
else:
return id_
class ManagementGroupCSPResponse(AliasModel):
id: str
class ManagementGroupGetCSPPayload(BaseCSPPayload):
management_group_name: str
class ManagementGroupGetCSPResponse(AliasModel):
id: str
class ApplicationCSPPayload(ManagementGroupCSPPayload):
pass

View File

@ -24,6 +24,8 @@ class AzureStages(Enum):
TENANT_PRINCIPAL_CREDENTIAL = "tenant principal credential"
ADMIN_ROLE_DEFINITION = "admin role definition"
PRINCIPAL_ADMIN_ROLE = "tenant principal admin"
INITIAL_MGMT_GROUP = "initial management group"
INITIAL_MGMT_GROUP_VERIFICATION = "initial management group verification"
TENANT_ADMIN_OWNERSHIP = "tenant admin ownership"
TENANT_PRINCIPAL_OWNERSHIP = "tenant principial ownership"

View File

@ -20,6 +20,10 @@ from atst.domain.csp.cloud.models import (
BillingProfileTenantAccessCSPResult,
BillingProfileVerificationCSPPayload,
BillingProfileVerificationCSPResult,
ManagementGroupCSPPayload,
ManagementGroupCSPResponse,
ManagementGroupGetCSPPayload,
ManagementGroupGetCSPResponse,
ProductPurchaseCSPPayload,
ProductPurchaseCSPResult,
ProductPurchaseVerificationCSPPayload,
@ -55,6 +59,12 @@ def mock_management_group_create(mock_azure, spec_dict):
)
def mock_management_group_get(mock_azure, spec_dict):
mock_azure.sdk.managementgroups.ManagementGroupsAPI.return_value.management_groups.get.return_value.result.return_value = (
spec_dict
)
def test_create_environment_succeeds(mock_azure: AzureCloudProvider):
environment = EnvironmentFactory.create()
@ -97,6 +107,43 @@ def test_create_application_succeeds(mock_azure: AzureCloudProvider):
assert result.id == "Test Id"
def test_create_initial_mgmt_group_succeeds(mock_azure: AzureCloudProvider):
application = ApplicationFactory.create()
mock_management_group_create(mock_azure, {"id": "Test Id"})
mock_azure = mock_get_secret(mock_azure)
payload = ManagementGroupCSPPayload(
tenant_id="1234",
display_name=application.name,
management_group_name=str(uuid4()),
)
result: ManagementGroupCSPResponse = mock_azure.create_initial_mgmt_group(payload)
assert result.id == "Test Id"
def test_create_initial_mgmt_group_verification_succeeds(
mock_azure: AzureCloudProvider,
):
application = ApplicationFactory.create()
mock_management_group_get(mock_azure, {"id": "Test Id"})
mock_azure = mock_get_secret(mock_azure)
management_group_name = str(uuid4())
payload = ManagementGroupGetCSPPayload(
tenant_id="1234", management_group_name=management_group_name
)
result: ManagementGroupGetCSPResponse = mock_azure.create_initial_mgmt_group_verification(
payload
)
assert result.id == "Test Id"
# assert result.name == management_group_name
def test_create_atat_admin_user_succeeds(mock_azure: AzureCloudProvider):
environment_id = str(uuid4())

View File

@ -111,6 +111,8 @@ def test_fsm_transition_start(mock_cloud_provider, portfolio: Portfolio):
FSMStates.TENANT_PRINCIPAL_CREDENTIAL_CREATED,
FSMStates.ADMIN_ROLE_DEFINITION_CREATED,
FSMStates.PRINCIPAL_ADMIN_ROLE_CREATED,
FSMStates.INITIAL_MGMT_GROUP_CREATED,
FSMStates.PRODUCT_PURCHASE_VERIFICATION_CREATED,
FSMStates.TENANT_ADMIN_OWNERSHIP_CREATED,
FSMStates.TENANT_PRINCIPAL_OWNERSHIP_CREATED,
]