Merge pull request #1056 from dod-ccpo/mock-csp

Mock implementation of CSP interface
This commit is contained in:
richard-dds 2019-09-10 12:46:52 -04:00 committed by GitHub
commit c9dcacddb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 147 additions and 13 deletions

View File

@ -4,22 +4,24 @@ from .reports import MockReportingProvider
class MockCSP:
def __init__(self, app):
self.cloud = MockCloudProvider()
def __init__(self, app, test_mode=False):
self.cloud = MockCloudProvider(
app.config, with_delay=(not test_mode), with_failure=(not test_mode)
)
self.files = MockUploader(app)
self.reports = MockReportingProvider()
class AzureCSP:
def __init__(self, app):
self.cloud = MockCloudProvider()
self.cloud = MockCloudProvider(app.config)
self.files = AzureUploader(app.config)
self.reports = MockReportingProvider()
class AwsCSP:
def __init__(self, app):
self.cloud = MockCloudProvider()
self.cloud = MockCloudProvider(app.config)
self.files = AwsUploader(app.config)
self.reports = MockReportingProvider()
@ -29,5 +31,7 @@ def make_csp_provider(app, csp=None):
app.csp = AwsCSP(app)
elif csp == "azure":
app.csp = AzureCSP(app)
elif csp == "mock-test":
app.csp = MockCSP(app, test_mode=True)
else:
app.csp = MockCSP(app)

View File

@ -7,7 +7,14 @@ from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole
class GeneralCSPException(Exception):
pass
class CloudProviderInterface:
def root_creds() -> Dict:
raise NotImplementedError()
def create_environment(
self, auth_credentials: Dict, user: User, environment: Environment
) -> str:
@ -115,28 +122,87 @@ class CloudProviderInterface:
class MockCloudProvider(CloudProviderInterface):
# TODO: All of these constants
AUTH_EXCEPTION = GeneralCSPException("Authentication failure.")
NETWORK_EXCEPTION = GeneralCSPException("Network failure.")
NETWORK_FAILURE_PCT = 7
ENV_CREATE_FAILURE_PCT = 12
ATAT_ADMIN_CREATE_FAILURE_PCT = 12
def __init__(self, config, with_delay=True, with_failure=True):
from time import sleep
import random
self._with_delay = with_delay
self._with_failure = with_failure
self._sleep = sleep
self._random = random
def root_creds(self):
return self._auth_credentials
def create_environment(self, auth_credentials, user, environment):
return uuid4().hex
self._authorize(auth_credentials)
self._delay(1, 5)
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
self._maybe_raise(
self.ENV_CREATE_FAILURE_PCT,
GeneralCSPException("Could not create environment."),
)
return self._id()
def create_atat_admin_user(self, auth_credentials, csp_environment_id):
return {"id": uuid4().hex, "credentials": {}}
self._authorize(auth_credentials)
self._delay(1, 5)
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
self._maybe_raise(
self.ATAT_ADMIN_CREATE_FAILURE_PCT,
GeneralCSPException("Could not create admin user."),
)
return {"id": self._id(), "credentials": self._auth_credentials}
def create_environment_baseline(self, auth_credentials, csp_environment_id):
self._authorize(auth_credentials)
self._delay(1, 5)
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
self._maybe_raise(
self.ATAT_ADMIN_CREATE_FAILURE_PCT,
GeneralCSPException("Could not create environment baseline."),
)
return {
CSPRole.BASIC_ACCESS: uuid4().hex,
CSPRole.NETWORK_ADMIN: uuid4().hex,
CSPRole.BUSINESS_READ: uuid4().hex,
CSPRole.TECHNICAL_READ: uuid4().hex,
CSPRole.BASIC_ACCESS.value: self._id(),
CSPRole.NETWORK_ADMIN.value: self._id(),
CSPRole.BUSINESS_READ.value: self._id(),
CSPRole.TECHNICAL_READ.value: self._id(),
}
def create_or_update_user(self, auth_credentials, user_info, csp_role_id):
return {"id": uuid4().hex}
self._authorize(auth_credentials)
self._delay(1, 5)
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
self._maybe_raise(
self.ATAT_ADMIN_CREATE_FAILURE_PCT,
GeneralCSPException("Could not create user."),
)
return {"id": self._id()}
def suspend_user(self, auth_credentials, csp_user_id):
pass
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
return self._maybe(12)
def delete_user(self, auth_credentials, csp_user_id):
pass
self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION)
return self._maybe(12)
def get_calculator_url(self):
return "https://www.rackspace.com/en-us/calculator"
@ -145,3 +211,27 @@ class MockCloudProvider(CloudProviderInterface):
"""Returns the login url for a given environment
"""
return "https://www.mycloud.com/my-env-login"
def _id(self):
return uuid4().hex
def _delay(self, min_secs, max_secs):
if self._with_delay:
duration = self._random.randrange(min_secs, max_secs)
self._sleep(duration)
def _maybe(self, pct):
return not self._with_failure or self._random.randrange(0, 100) < pct
def _maybe_raise(self, pct, exc):
if self._with_failure and self._maybe(pct):
raise exc
@property
def _auth_credentials(self):
return {"username": "mock-cloud", "pass": "shh"}
def _authorize(self, credentials):
self._delay(1, 5)
if credentials != self._auth_credentials:
raise self.AUTH_EXCEPTION

View File

@ -6,3 +6,4 @@ CRL_STORAGE_CONTAINER = tests/fixtures/crl
WTF_CSRF_ENABLED = false
STORAGE_PROVIDER=LOCAL
PRESERVE_CONTEXT_ON_EXCEPTION = false
CSP=mock-test

View File

@ -0,0 +1,39 @@
import pytest
from atst.domain.csp import MockCloudProvider
CREDENTIALS = MockCloudProvider(config={})._auth_credentials
@pytest.fixture
def mock_csp():
return MockCloudProvider(config={}, with_delay=False, with_failure=False)
def test_create_environment(mock_csp: MockCloudProvider):
environment_id = mock_csp.create_environment(CREDENTIALS, {}, {})
assert isinstance(environment_id, str)
def test_create_admin_user(mock_csp: MockCloudProvider):
admin_user = mock_csp.create_atat_admin_user(CREDENTIALS, "env_id")
assert isinstance(admin_user["id"], str)
assert isinstance(admin_user["credentials"], dict)
def test_create_environment_baseline(mock_csp: MockCloudProvider):
baseline = mock_csp.create_atat_admin_user(CREDENTIALS, "env_id")
assert isinstance(baseline, dict)
def test_create_or_update_user(mock_csp: MockCloudProvider):
user_dict = mock_csp.create_or_update_user(CREDENTIALS, {}, "csp_role_id")
assert isinstance(user_dict["id"], str)
def test_suspend_user(mock_csp: MockCloudProvider):
assert mock_csp.suspend_user(CREDENTIALS, "csp_user_id")
def test_delete_user(mock_csp: MockCloudProvider):
assert mock_csp.delete_user(CREDENTIALS, "csp_user_id")