Update env roles by environment

This commit is contained in:
leigh-mil 2019-04-22 16:06:13 -04:00
parent f6577c0cd6
commit c085f27af8
4 changed files with 129 additions and 145 deletions

View File

@ -6,6 +6,7 @@ from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole from atst.models.environment_role import EnvironmentRole
from atst.models.application import Application from atst.models.application import Application
from atst.domain.environment_roles import EnvironmentRoles from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.users import Users
from .exceptions import NotFoundError from .exceptions import NotFoundError
@ -62,39 +63,51 @@ class Environments(object):
return env return env
@classmethod @classmethod
def update_environment_roles(cls, portfolio_role, ids_and_roles): def update_env_role(cls, environment, user, new_role):
updated = False updated = False
for id_and_role in ids_and_roles: if new_role is None or new_role == "No access":
new_role = id_and_role["role"] role_deleted = EnvironmentRoles.delete(user.id, environment.id)
environment = Environments.get(id_and_role["id"]) if role_deleted:
updated = True
if new_role is None: else:
role_deleted = EnvironmentRoles.delete( env_role = EnvironmentRoles.get(user.id, environment.id)
portfolio_role.user.id, environment.id if env_role and env_role.role != new_role:
env_role.role = new_role
updated = True
db.session.add(env_role)
elif not env_role:
env_role = EnvironmentRoles.create(
user=user, environment=environment, role=new_role
) )
if role_deleted: updated = True
updated = True db.session.add(env_role)
else:
env_role = EnvironmentRoles.get(
portfolio_role.user.id, id_and_role["id"]
)
if env_role and env_role.role != new_role:
env_role.role = new_role
updated = True
db.session.add(env_role)
elif not env_role:
env_role = EnvironmentRoles.create(
user=portfolio_role.user, environment=environment, role=new_role
)
updated = True
db.session.add(env_role)
if updated: if updated:
db.session.commit() db.session.commit()
return updated return updated
@classmethod
def update_env_roles_by_environment(cls, environment_id, team_roles):
environment = Environments.get(environment_id)
for member in team_roles:
new_role = member["role"]
user = Users.get(member["user_id"])
Environments.update_env_role(
environment=environment, user=user, new_role=new_role
)
@classmethod
def update_env_roles_by_member(cls, member, env_roles):
for env_roles in env_roles:
new_role = env_roles["role"]
environment = Environments.get(env_roles["id"])
Environments.update_env_role(
environment=environment, user=member, new_role=new_role
)
@classmethod @classmethod
def revoke_access(cls, environment, target_user): def revoke_access(cls, environment, target_user):
EnvironmentRoles.delete(environment.id, target_user.id) EnvironmentRoles.delete(environment.id, target_user.id)

View File

@ -1,9 +1,11 @@
import pytest import pytest
import random
from atst.domain.environments import Environments from atst.domain.environments import Environments
from atst.domain.environment_roles import EnvironmentRoles from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.portfolio_roles import PortfolioRoles from atst.domain.portfolio_roles import PortfolioRoles
from atst.domain.exceptions import NotFoundError from atst.domain.exceptions import NotFoundError
from atst.models.environment_role import CSPRole
from tests.factories import ( from tests.factories import (
ApplicationFactory, ApplicationFactory,
@ -21,143 +23,111 @@ def test_create_environments():
assert env.cloud_id is not None assert env.cloud_id is not None
def test_create_environment_role_creates_cloud_id(session): def test_update_env_role():
owner = UserFactory.create() env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
developer = UserFactory.create() new_role = CSPRole.TECHNICAL_READ.value
portfolio = PortfolioFactory.create( assert Environments.update_env_role(env_role.environment, env_role.user, new_role)
owner=owner, assert env_role.role == new_role
members=[{"user": developer, "role_name": "developer"}],
applications=[
{"name": "application1", "environments": [{"name": "application1 prod"}]} def test_update_env_role_no_access():
], env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
assert Environments.update_env_role(
env_role.environment, env_role.user, "No access"
)
assert not EnvironmentRoles.get(env_role.user.id, env_role.environment.id)
def test_update_env_role_no_change():
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
new_role = CSPRole.BASIC_ACCESS.value
assert not Environments.update_env_role(
env_role.environment, env_role.user, new_role
) )
env = portfolio.applications[0].environments[0]
new_role = [{"id": env.id, "role": "developer"}]
portfolio_role = portfolio.members[0] def test_update_env_role_creates_cloud_id_for_new_member(session):
assert not portfolio_role.user.cloud_id user = UserFactory.create()
assert Environments.update_environment_roles(portfolio_role, new_role) env = EnvironmentFactory.create()
assert not user.cloud_id
assert portfolio_role.user.cloud_id is not None assert Environments.update_env_role(env, user, CSPRole.TECHNICAL_READ.value)
assert EnvironmentRoles.get(user.id, env.id)
assert user.cloud_id is not None
def test_update_environment_roles(): def test_update_env_roles_by_environment():
owner = UserFactory.create() environment = EnvironmentFactory.create()
developer = UserFactory.create() env_role_1 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value
portfolio = PortfolioFactory.create( )
owner=owner, env_role_2 = EnvironmentRoleFactory.create(
members=[{"user": developer, "role_name": "developer"}], environment=environment, role=CSPRole.NETWORK_ADMIN.value
applications=[ )
{ env_role_3 = EnvironmentRoleFactory.create(
"name": "application1", environment=environment, role=CSPRole.TECHNICAL_READ.value
"environments": [
{
"name": "application1 dev",
"members": [{"user": developer, "role_name": "devlops"}],
},
{
"name": "application1 staging",
"members": [{"user": developer, "role_name": "developer"}],
},
{"name": "application1 prod"},
],
}
],
) )
dev_env = portfolio.applications[0].environments[0] team_roles = [
staging_env = portfolio.applications[0].environments[1] {
new_ids_and_roles = [ "user_id": env_role_1.user.id,
{"id": dev_env.id, "role": "billing_admin"}, "name": env_role_1.user.full_name,
{"id": staging_env.id, "role": "developer"}, "role": CSPRole.BUSINESS_READ.value,
},
{
"user_id": env_role_2.user.id,
"name": env_role_2.user.full_name,
"role": CSPRole.NETWORK_ADMIN.value,
},
{
"user_id": env_role_3.user.id,
"name": env_role_3.user.full_name,
"role": "No access",
},
] ]
portfolio_role = portfolio.members[0] Environments.update_env_roles_by_environment(environment.id, team_roles)
assert Environments.update_environment_roles(portfolio_role, new_ids_and_roles) assert env_role_1.role == CSPRole.BUSINESS_READ.value
new_dev_env_role = EnvironmentRoles.get(portfolio_role.user.id, dev_env.id) assert env_role_2.role == CSPRole.NETWORK_ADMIN.value
staging_env_role = EnvironmentRoles.get(portfolio_role.user.id, staging_env.id) assert not EnvironmentRoles.get(env_role_3.user.id, environment.id)
assert new_dev_env_role.role == "billing_admin"
assert staging_env_role.role == "developer"
def test_remove_environment_role(): def test_update_env_roles_by_member():
owner = UserFactory.create() user = UserFactory.create()
developer = UserFactory.create() application = ApplicationFactory.create(
portfolio = PortfolioFactory.create( environments=[
owner=owner,
members=[{"user": developer, "role_name": "developer"}],
applications=[
{ {
"name": "application1", "name": "dev",
"environments": [ "members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
{ },
"name": "application1 dev", {
"members": [{"user": developer, "role_name": "devops"}], "name": "staging",
}, "members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
{ },
"name": "application1 staging", {"name": "prod"},
"members": [{"user": developer, "role_name": "developer"}], {
}, "name": "testing",
{ "members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
"name": "application1 uat", },
"members": [ ]
{"user": developer, "role_name": "financial_auditor"}
],
},
{"name": "application1 prod"},
],
}
],
) )
application = portfolio.applications[0] dev, staging, prod, testing = application.environments
now_ba = application.environments[0].id env_roles = [
now_none = application.environments[1].id {"id": dev.id, "role": CSPRole.NETWORK_ADMIN.value},
still_fa = application.environments[2].id {"id": staging.id, "role": CSPRole.BUSINESS_READ.value},
{"id": prod.id, "role": CSPRole.TECHNICAL_READ.value},
new_environment_roles = [ {"id": testing.id, "role": "No access"},
{"id": now_ba, "role": "billing_auditor"},
{"id": now_none, "role": None},
] ]
portfolio_role = PortfolioRoles.get(portfolio.id, developer.id) Environments.update_env_roles_by_member(user, env_roles)
assert Environments.update_environment_roles(portfolio_role, new_environment_roles)
assert portfolio_role.num_environment_roles == 2 assert EnvironmentRoles.get(user.id, dev.id).role == CSPRole.NETWORK_ADMIN.value
assert EnvironmentRoles.get(developer.id, now_ba).role == "billing_auditor" assert EnvironmentRoles.get(user.id, staging.id).role == CSPRole.BUSINESS_READ.value
assert EnvironmentRoles.get(developer.id, now_none) is None assert EnvironmentRoles.get(user.id, prod.id).role == CSPRole.TECHNICAL_READ.value
assert EnvironmentRoles.get(developer.id, still_fa).role == "financial_auditor" assert not EnvironmentRoles.get(user.id, testing.id)
def test_no_update_to_environment_roles():
owner = UserFactory.create()
developer = UserFactory.create()
portfolio = PortfolioFactory.create(
owner=owner,
members=[{"user": developer, "role_name": "developer"}],
applications=[
{
"name": "application1",
"environments": [
{
"name": "application1 dev",
"members": [{"user": developer, "role_name": "devops"}],
}
],
}
],
)
dev_env = portfolio.applications[0].environments[0]
new_ids_and_roles = [{"id": dev_env.id, "role": "devops"}]
portfolio_role = PortfolioRoles.get(portfolio.id, developer.id)
assert not Environments.update_environment_roles(portfolio_role, new_ids_and_roles)
def test_get_scoped_environments(db): def test_get_scoped_environments(db):

View File

@ -194,6 +194,7 @@ class EnvironmentFactory(Base):
model = Environment model = Environment
name = factory.Faker("domain_word") name = factory.Faker("domain_word")
application = factory.SubFactory(ApplicationFactory)
@classmethod @classmethod
def _create(cls, model_class, *args, **kwargs): def _create(cls, model_class, *args, **kwargs):

View File

@ -123,8 +123,8 @@ def test_has_env_role_history(session):
env_role = EnvironmentRoleFactory.create( env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer" user=user, environment=environment, role="developer"
) )
Environments.update_environment_roles( Environments.update_env_roles_by_member(
portfolio_role, [{"role": "admin", "id": environment.id}] user, [{"role": "admin", "id": environment.id}]
) )
changed_events = ( changed_events = (
session.query(AuditEvent) session.query(AuditEvent)