From 99e306e6025571f9d050c9400eaf48c419c627be Mon Sep 17 00:00:00 2001 From: tomdds Date: Mon, 28 Oct 2019 15:05:44 -0400 Subject: [PATCH] First pass at mocking and testing azure integration --- atst/domain/csp/cloud.py | 56 ++++++++++++++----------- tests/domain/cloud/test_azure_csp.py | 16 ++++++++ tests/mock_azure.py | 61 ++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 24 deletions(-) create mode 100644 tests/domain/cloud/test_azure_csp.py create mode 100644 tests/mock_azure.py diff --git a/atst/domain/csp/cloud.py b/atst/domain/csp/cloud.py index 1593914a..d4ea39d1 100644 --- a/atst/domain/csp/cloud.py +++ b/atst/domain/csp/cloud.py @@ -397,31 +397,39 @@ SUBSCRIPTION_ID_REGEX = re.compile( REMOTE_ROOT_ROLE_DEF_ID = "/providers/Microsoft.Authorization/roleDefinitions/00000000-0000-4000-8000-000000000000" +class AzureSDKProvider(object): + def __init__(self): + from azure.mgmt import subscription, authorization + import azure.graphrbac as graphrbac + import azure.common.credentials as credentials + from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD + + self.subscription = subscription + self.authorization = authorization + self.graphrbac = graphrbac + self.credentials = credentials + # may change to a JEDI cloud + self.cloud = AZURE_PUBLIC_CLOUD + + class AzureCloudProvider(CloudProviderInterface): - def __init__(self, config): + def __init__(self, config, azure_sdk_provider=None): self.config = config self.client_id = config["AZURE_CLIENT_ID"] self.secret_key = config["AZURE_SECRET_KEY"] self.tenant_id = config["AZURE_TENANT_ID"] - from azure.mgmt import subscription, authorization - import azure.graphrbac as graphrbac - import azure.common.credentials as credentials - from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD - - self.azure_subscription = subscription - self.azure_authorization = authorization - self.azure_graph = graphrbac - self.azure_credentials = credentials - # may change to a JEDI cloud - self.azure_cloud = AZURE_PUBLIC_CLOUD + if azure_sdk_provider is None: + self.sdk = AzureSDKProvider() + else: + self.sdk = azure_sdk_provider def create_environment( self, auth_credentials: Dict, user: User, environment: Environment ): credentials = self._get_credential_obj(self._root_creds) - sub_client = self.azure_mgmt.subscription.SubscriptionClient(credentials) + sub_client = self.sdk.subscription.SubscriptionClient(credentials) display_name = ( f"{environment.application.name}_{environment.name}_{environment.id}" @@ -431,7 +439,7 @@ class AzureCloudProvider(CloudProviderInterface): sku_id = AZURE_SKU_ID # we want to set AT-AT as an owner here # we could potentially associate subscriptions with "management groups" per DOD component - body = self.azure_mgmt.subscription.models.ModernSubscriptionCreationParameters( + body = self.sdk.subscription.models.ModernSubscriptionCreationParameters( display_name, billing_profile_id, sku_id, @@ -465,14 +473,14 @@ class AzureCloudProvider(CloudProviderInterface): root_creds = self._root_creds credentials = self._get_credential_obj(root_creds) - sub_client = self.azure_subscription.SubscriptionClient(credentials) - subscription: self.azure_subscription.models.Subscription = sub_client.subscriptions.get( + sub_client = self.sdk.subscription.SubscriptionClient(credentials) + subscription: self.sdk.subscription.models.Subscription = sub_client.subscriptions.get( csp_environment_id ) managment_principal = self._get_management_service_principal() - auth_client = self.azure_authorization.AuthorizationManagementClient( + auth_client = self.sdk.authorization.AuthorizationManagementClient( credentials, # TODO: Determine which subscription this needs to point at # Once we're in a multi-sub environment @@ -486,7 +494,7 @@ class AzureCloudProvider(CloudProviderInterface): principal_id=managment_principal.id, ) - self.azure_authorization.models.RoleAssignment = auth_client.role_assignments.create( + self.sdk.authorization.models.RoleAssignment = auth_client.role_assignments.create( scope=f"/subscriptions/{subscription.id}/", role_assignment_name=role_assignment_id, parameters=role_assignment_create_params, @@ -511,7 +519,7 @@ class AzureCloudProvider(CloudProviderInterface): # how do we scope the graph client to the new subscription rather than # the cloud0 subscription? tenant id seems to be separate from subscription id - graph_client = self.azure_graph.GraphRbacManagementClient( + graph_client = self.sdk.graphrbac.GraphRbacManagementClient( graph_creds, self._root_creds.get("tenant_id") ) @@ -519,7 +527,7 @@ class AzureCloudProvider(CloudProviderInterface): # or should we manage access to each subscription from a single service # principal with multiple role assignments? app_display_name = "?" # name should reflect the subscription it exists - app_create_param = self.azure_graph.models.ApplicationCreateParameters( + app_create_param = self.sdk.graphrbac.models.ApplicationCreateParameters( display_name=app_display_name ) @@ -528,14 +536,14 @@ class AzureCloudProvider(CloudProviderInterface): # https://docs.microsoft.com/en-us/graph/permissions-reference#microsoft-graph-permission-names # set app perms in app registration portal # https://docs.microsoft.com/en-us/graph/auth-v2-service#2-configure-permissions-for-microsoft-graph - app: self.azure_graph.models.Application = graph_client.applications.create( + app: self.sdk.graphrbac.models.Application = graph_client.applications.create( app_create_param ) # create a new service principle for the new application, which should be scoped # to the new subscription app_id = app.app_id - sp_create_params = self.azure_graph.models.ServicePrincipalCreateParameters( + sp_create_params = self.sdk.graphrbac.models.ServicePrincipalCreateParameters( app_id=app_id, account_enabled=True ) @@ -550,12 +558,12 @@ class AzureCloudProvider(CloudProviderInterface): return sub_id_match.group(1) def _get_credential_obj(self, creds, resource=None): - return self.azure_credentials.ServicePrincipalCredentials( + return self.sdk.credentials.ServicePrincipalCredentials( client_id=creds.get("client_id"), secret=creds.get("secret_key"), tenant=creds.get("tenant_id"), resource=resource, - cloud_environment=self.azure_cloud, + cloud_environment=self.sdk.cloud, ) @property diff --git a/tests/domain/cloud/test_azure_csp.py b/tests/domain/cloud/test_azure_csp.py new file mode 100644 index 00000000..95d6e013 --- /dev/null +++ b/tests/domain/cloud/test_azure_csp.py @@ -0,0 +1,16 @@ +import pytest +from unittest.mock import Mock + +from atst.domain.csp.cloud import EnvironmentCreationException, AzureCloudProvider +from atst.jobs import ( + do_create_environment, + do_create_atat_admin_user, + do_create_environment_baseline, +) + +from tests.mock_azure import mock_azure, AUTH_CREDENTIALS +from tests.factories import EnvironmentFactory + + +def test_create_environment_succeeds(mock_azure: AzureCloudProvider): + print(mock_azure._get_credential_obj(mock_azure._root_creds)) diff --git a/tests/mock_azure.py b/tests/mock_azure.py new file mode 100644 index 00000000..a94aad62 --- /dev/null +++ b/tests/mock_azure.py @@ -0,0 +1,61 @@ +import pytest +from unittest.mock import Mock + +from atst.domain.csp.cloud import AzureCloudProvider + +AZURE_CONFIG = { + "AZURE_CLIENT_ID": "MOCK", + "AZURE_SECRET_KEY": "MOCK", + "AZURE_TENANT_ID": "MOCK", +} + +AUTH_CREDENTIALS = { + "CLIENT_ID": AZURE_CONFIG["AZURE_CLIENT_ID"], + "SECRET_KEY": AZURE_CONFIG["AZURE_SECRET_KEY"], + "TENANT_ID": AZURE_CONFIG["AZURE_TENANT_ID"], +} + + +def mock_subscription(): + from azure.mgmt import subscription + + sub_mock = Mock(spec=subscription) + + return sub_mock + + +def mock_authorization(): + from azure.mgmt import authorization + + return Mock(spec=authorization) + + +def mock_graphrbac(): + import azure.graphrbac as graphrbac + + return Mock(spec=graphrbac) + + +def mock_credentials(): + import azure.common.credentials as credentials + + cred_mock = Mock(spec=credentials) + return cred_mock + + +class MockAzureSDK(object): + def __init__(self): + from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD + + self.subscription = mock_subscription() + self.authorization = mock_authorization() + self.graphrbac = mock_graphrbac() + self.credentials = mock_credentials() + # may change to a JEDI cloud + self.cloud = AZURE_PUBLIC_CLOUD + + +@pytest.fixture(scope="function") +def mock_azure(): + return AzureCloudProvider(AZURE_CONFIG, azure_sdk_provider=MockAzureSDK()) +