Remove unused code in both the cloud interfaces and environment models. Also add tests for some untested code in the cloud interface.
This commit is contained in:
parent
aaa4b401b5
commit
13aca270ca
@ -153,9 +153,9 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
creds = self._source_creds(payload.tenant_id)
|
||||
credentials = self._get_credential_obj(
|
||||
{
|
||||
"client_id": creds.root_sp_client_id,
|
||||
"secret_key": creds.root_sp_key,
|
||||
"tenant_id": creds.root_tenant_id,
|
||||
"client_id": creds.tenant_sp_client_id,
|
||||
"secret_key": creds.tenant_sp_key,
|
||||
"tenant_id": creds.tenant_id,
|
||||
},
|
||||
resource=self.sdk.cloud.endpoints.resource_manager,
|
||||
)
|
||||
@ -169,43 +169,6 @@ class AzureCloudProvider(CloudProviderInterface):
|
||||
|
||||
return EnvironmentCSPResult(**response)
|
||||
|
||||
def create_atat_admin_user(
|
||||
self, auth_credentials: Dict, csp_environment_id: str
|
||||
) -> Dict:
|
||||
root_creds = self._root_creds
|
||||
credentials = self._get_credential_obj(root_creds)
|
||||
|
||||
sub_client = self.sdk.subscription.SubscriptionClient(credentials)
|
||||
subscription = sub_client.subscriptions.get(csp_environment_id)
|
||||
|
||||
managment_principal = self._get_management_service_principal()
|
||||
|
||||
auth_client = self.sdk.authorization.AuthorizationManagementClient(
|
||||
credentials,
|
||||
# TODO: Determine which subscription this needs to point at
|
||||
# Once we're in a multi-sub environment
|
||||
subscription.id,
|
||||
)
|
||||
|
||||
# Create role assignment for
|
||||
role_assignment_id = str(uuid4())
|
||||
role_assignment_create_params = auth_client.role_assignments.models.RoleAssignmentCreateParameters(
|
||||
role_definition_id=REMOTE_ROOT_ROLE_DEF_ID,
|
||||
principal_id=managment_principal.id,
|
||||
)
|
||||
|
||||
auth_client.role_assignments.create(
|
||||
scope=f"/subscriptions/{subscription.id}/",
|
||||
role_assignment_name=role_assignment_id,
|
||||
parameters=role_assignment_create_params,
|
||||
)
|
||||
|
||||
return {
|
||||
"csp_user_id": managment_principal.object_id,
|
||||
"credentials": managment_principal.password_credentials,
|
||||
"role_name": role_assignment_id,
|
||||
}
|
||||
|
||||
def create_application(self, payload: ApplicationCSPPayload):
|
||||
creds = self._source_creds(payload.tenant_id)
|
||||
credentials = self._get_credential_obj(
|
||||
|
@ -31,33 +31,6 @@ class CloudProviderInterface:
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_atat_admin_user(
|
||||
self, auth_credentials: Dict, csp_environment_id: str
|
||||
) -> Dict:
|
||||
"""Creates a new, programmatic user in the CSP. Grants this user full permissions to administer
|
||||
the CSP.
|
||||
|
||||
Arguments:
|
||||
auth_credentials -- Object containing CSP account credentials
|
||||
csp_environment_id -- ID of the CSP Environment the admin user should be created in
|
||||
|
||||
Returns:
|
||||
object: Object representing new remote admin user, including credentials
|
||||
Something like:
|
||||
{
|
||||
"user_id": string,
|
||||
"credentials": dict, # structure TBD based on csp
|
||||
}
|
||||
|
||||
Raises:
|
||||
AuthenticationException: Problem with the credentials
|
||||
AuthorizationException: Credentials not authorized for current action(s)
|
||||
ConnectionException: Issue with the CSP API connection
|
||||
UnknownServerException: Unknown issue on the CSP side
|
||||
UserProvisioningException: Problem creating the root user
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_or_update_user(
|
||||
self, auth_credentials: Dict, user_info, csp_role_id: str
|
||||
) -> str:
|
||||
|
@ -117,23 +117,6 @@ class MockCloudProvider(CloudProviderInterface):
|
||||
subscription_id="subscriptions/60fbbb72-0516-4253-ab18-c92432ba3230"
|
||||
)
|
||||
|
||||
def create_atat_admin_user(self, auth_credentials, csp_environment_id):
|
||||
self._authorize(auth_credentials)
|
||||
|
||||
self._delay(1, 5)
|
||||
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
|
||||
self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION)
|
||||
self._maybe_raise(
|
||||
self.ATAT_ADMIN_CREATE_FAILURE_PCT,
|
||||
UserProvisioningException(
|
||||
csp_environment_id, "atat_admin", "Could not create admin user."
|
||||
),
|
||||
)
|
||||
|
||||
self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION)
|
||||
|
||||
return {"id": self._id(), "credentials": self._auth_credentials}
|
||||
|
||||
def create_tenant(self, payload: TenantCSPPayload):
|
||||
"""
|
||||
payload is an instance of TenantCSPPayload data class
|
||||
|
@ -106,7 +106,7 @@ class EnvironmentRoles(object):
|
||||
def disable(cls, environment_role_id):
|
||||
environment_role = EnvironmentRoles.get_by_id(environment_role_id)
|
||||
|
||||
if environment_role.csp_user_id and not environment_role.environment.is_pending:
|
||||
if environment_role.csp_user_id and not environment_role.environment.cloud_id:
|
||||
tenant_id = environment_role.environment.application.portfolio.csp_data.get(
|
||||
"tenant_id"
|
||||
)
|
||||
|
@ -126,7 +126,7 @@ class Environments(object):
|
||||
results = (
|
||||
cls.base_provision_query(now)
|
||||
.filter(Application.cloud_id != None)
|
||||
.filter(Environment.cloud_id == None)
|
||||
.filter(Environment.cloud_id.is_(None))
|
||||
.all()
|
||||
)
|
||||
return [id_ for id_, in results]
|
||||
|
@ -1,5 +1,3 @@
|
||||
from enum import Enum
|
||||
|
||||
from sqlalchemy import Column, ForeignKey, String, UniqueConstraint
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
@ -43,10 +41,6 @@ class Environment(
|
||||
),
|
||||
)
|
||||
|
||||
class ProvisioningStatus(Enum):
|
||||
PENDING = "pending"
|
||||
COMPLETED = "completed"
|
||||
|
||||
@property
|
||||
def users(self):
|
||||
return {r.application_role.user for r in self.roles}
|
||||
@ -67,17 +61,6 @@ class Environment(
|
||||
def portfolio_id(self):
|
||||
return self.application.portfolio_id
|
||||
|
||||
@property
|
||||
def provisioning_status(self) -> ProvisioningStatus:
|
||||
if self.cloud_id is None:
|
||||
return self.ProvisioningStatus.PENDING
|
||||
else:
|
||||
return self.ProvisioningStatus.COMPLETED
|
||||
|
||||
@property
|
||||
def is_pending(self):
|
||||
return self.provisioning_status == self.ProvisioningStatus.PENDING
|
||||
|
||||
def __repr__(self):
|
||||
return "<Environment(name='{}', num_users='{}', application='{}', portfolio='{}', id='{}')>".format(
|
||||
self.name,
|
||||
|
@ -39,7 +39,6 @@ def get_environments_obj_for_app(application):
|
||||
{
|
||||
"id": env.id,
|
||||
"name": env.name,
|
||||
"pending": env.is_pending,
|
||||
"edit_form": EditEnvironmentForm(obj=env),
|
||||
"member_count": len(env.roles),
|
||||
"members": sorted(
|
||||
|
@ -2,11 +2,11 @@ import json
|
||||
from unittest.mock import Mock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pendulum
|
||||
import pydantic
|
||||
import pytest
|
||||
from tests.factories import ApplicationFactory, EnvironmentFactory
|
||||
from tests.mock_azure import AUTH_CREDENTIALS, mock_azure
|
||||
import pendulum
|
||||
import pydantic
|
||||
|
||||
from atst.domain.csp.cloud import AzureCloudProvider
|
||||
from atst.domain.csp.cloud.models import (
|
||||
@ -52,6 +52,7 @@ from atst.domain.csp.cloud.models import (
|
||||
TenantPrincipalCSPResult,
|
||||
TenantPrincipalOwnershipCSPPayload,
|
||||
TenantPrincipalOwnershipCSPResult,
|
||||
UserCSPPayload,
|
||||
)
|
||||
|
||||
BILLING_ACCOUNT_NAME = "52865e4c-52e8-5a6c-da6b-c58f0814f06f:7ea5de9d-b8ce-4901-b1c5-d864320c7b03_2019-05-31"
|
||||
@ -107,20 +108,6 @@ def test_create_application_succeeds(mock_azure: AzureCloudProvider):
|
||||
assert result.id == "Test Id"
|
||||
|
||||
|
||||
def test_create_atat_admin_user_succeeds(mock_azure: AzureCloudProvider):
|
||||
environment_id = str(uuid4())
|
||||
|
||||
csp_user_id = str(uuid4)
|
||||
|
||||
mock_azure.sdk.graphrbac.GraphRbacManagementClient.return_value.service_principals.create.return_value.object_id = (
|
||||
csp_user_id
|
||||
)
|
||||
|
||||
result = mock_azure.create_atat_admin_user(AUTH_CREDENTIALS, environment_id)
|
||||
|
||||
assert result.get("csp_user_id") == csp_user_id
|
||||
|
||||
|
||||
def test_create_policy_definition_succeeds(mock_azure: AzureCloudProvider):
|
||||
subscription_id = str(uuid4())
|
||||
management_group_id = str(uuid4())
|
||||
@ -882,3 +869,72 @@ def test_set_secret(mock_azure: AzureCloudProvider):
|
||||
)
|
||||
|
||||
assert mock_azure.set_secret("secret key", "secret_value") == "my secret"
|
||||
|
||||
|
||||
def test_create_active_directory_user(mock_azure: AzureCloudProvider):
|
||||
mock_result = Mock()
|
||||
mock_result.ok = True
|
||||
mock_result.json.return_value = {"id": "id"}
|
||||
mock_azure.sdk.requests.post.return_value = mock_result
|
||||
|
||||
payload = UserCSPPayload(
|
||||
tenant_id=uuid4().hex,
|
||||
display_name="Test Testerson",
|
||||
tenant_host_name="testtenant",
|
||||
email="test@testerson.test",
|
||||
password="asdfghjkl", # pragma: allowlist secret
|
||||
)
|
||||
|
||||
result = mock_azure._create_active_directory_user("token", payload)
|
||||
|
||||
assert result.id == "id"
|
||||
|
||||
|
||||
def test_update_active_directory_user_email(mock_azure: AzureCloudProvider):
|
||||
mock_result = Mock()
|
||||
mock_result.ok = True
|
||||
mock_azure.sdk.requests.patch.return_value = mock_result
|
||||
|
||||
payload = UserCSPPayload(
|
||||
tenant_id=uuid4().hex,
|
||||
display_name="Test Testerson",
|
||||
tenant_host_name="testtenant",
|
||||
email="test@testerson.test",
|
||||
password="asdfghjkl", # pragma: allowlist secret
|
||||
)
|
||||
|
||||
result = mock_azure._update_active_directory_user_email(
|
||||
"token", uuid4().hex, payload
|
||||
)
|
||||
|
||||
assert result
|
||||
|
||||
|
||||
def test_create_user(mock_azure: AzureCloudProvider):
|
||||
with patch.object(
|
||||
AzureCloudProvider,
|
||||
"_get_tenant_principal_token",
|
||||
wraps=mock_azure._get_tenant_principal_token,
|
||||
) as _get_tenant_principal_token:
|
||||
_get_tenant_principal_token.return_value = "token"
|
||||
|
||||
mock_result_create = Mock()
|
||||
mock_result_create.ok = True
|
||||
mock_result_create.json.return_value = {"id": "id"}
|
||||
mock_azure.sdk.requests.post.return_value = mock_result_create
|
||||
|
||||
mock_result_update = Mock()
|
||||
mock_result_update.ok = True
|
||||
mock_azure.sdk.requests.patch.return_value = mock_result_update
|
||||
|
||||
payload = UserCSPPayload(
|
||||
tenant_id=uuid4().hex,
|
||||
display_name="Test Testerson",
|
||||
tenant_host_name="testtenant",
|
||||
email="test@testerson.test",
|
||||
password="asdfghjkl", # pragma: allowlist secret
|
||||
)
|
||||
|
||||
result = mock_azure.create_user(payload)
|
||||
|
||||
assert result.id == "id"
|
||||
|
@ -28,17 +28,6 @@ def test_create_environment(mock_csp: MockCloudProvider):
|
||||
assert isinstance(result, EnvironmentCSPResult)
|
||||
|
||||
|
||||
def test_create_admin_user(mock_csp: MockCloudProvider):
|
||||
admin_user = mock_csp.create_atat_admin_user(CREDENTIALS, "env_id")
|
||||
assert isinstance(admin_user["id"], str)
|
||||
assert isinstance(admin_user["credentials"], dict)
|
||||
|
||||
|
||||
def test_create_environment_baseline(mock_csp: MockCloudProvider):
|
||||
baseline = mock_csp.create_atat_admin_user(CREDENTIALS, "env_id")
|
||||
assert isinstance(baseline, dict)
|
||||
|
||||
|
||||
def test_create_or_update_user(mock_csp: MockCloudProvider):
|
||||
env_role = EnvironmentRoleFactory.create()
|
||||
csp_user_id = mock_csp.create_or_update_user(CREDENTIALS, env_role, "csp_role_id")
|
||||
|
@ -93,7 +93,7 @@ def test_disable_completed(application_role, environment):
|
||||
|
||||
def test_disable_checks_env_provisioning_status(session):
|
||||
environment = EnvironmentFactory.create()
|
||||
assert environment.is_pending
|
||||
assert not environment.cloud_id
|
||||
env_role1 = EnvironmentRoleFactory.create(environment=environment)
|
||||
env_role1 = EnvironmentRoles.disable(env_role1.id)
|
||||
assert env_role1.disabled
|
||||
@ -103,7 +103,7 @@ def test_disable_checks_env_provisioning_status(session):
|
||||
session.commit()
|
||||
session.refresh(environment)
|
||||
|
||||
assert not environment.is_pending
|
||||
assert environment.cloud_id
|
||||
env_role2 = EnvironmentRoleFactory.create(environment=environment)
|
||||
env_role2 = EnvironmentRoles.disable(env_role2.id)
|
||||
assert env_role2.disabled
|
||||
|
@ -51,18 +51,6 @@ def test_audit_event_for_environment_deletion(session):
|
||||
assert after
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"env_data,expected_status",
|
||||
[
|
||||
[{"cloud_id": None,}, Environment.ProvisioningStatus.PENDING],
|
||||
[{"cloud_id": 1}, Environment.ProvisioningStatus.COMPLETED],
|
||||
],
|
||||
)
|
||||
def test_environment_provisioning_status(env_data, expected_status):
|
||||
environment = EnvironmentFactory.create(**env_data)
|
||||
assert environment.provisioning_status == expected_status
|
||||
|
||||
|
||||
def test_environment_roles_do_not_include_deleted():
|
||||
member_list = [
|
||||
{"role_name": CSPRole.ADMIN},
|
||||
|
Loading…
x
Reference in New Issue
Block a user