Merge pull request #854 from dod-ccpo/environment-application-roles

Use application_role_id on environment_roles.
This commit is contained in:
dandds 2019-05-31 12:59:20 -04:00 committed by GitHub
commit d3ba997106
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 314 additions and 434 deletions

View File

@ -0,0 +1,54 @@
"""environment_roles has relation to application_roles
Revision ID: d2390c547dca
Revises: ab1167fc8260
Create Date: 2019-05-29 12:34:45.928219
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'd2390c547dca'
down_revision = 'ab1167fc8260'
branch_labels = None
depends_on = None
def upgrade():
conn = op.get_bind()
op.add_column('environment_roles', sa.Column('application_role_id', postgresql.UUID(as_uuid=True), nullable=True))
op.drop_index('environments_role_user_environment', table_name='environment_roles')
op.create_index('environments_role_user_environment', 'environment_roles', ['application_role_id', 'environment_id'], unique=True)
op.drop_constraint('environment_roles_user_id_fkey', 'environment_roles', type_='foreignkey')
op.create_foreign_key("environment_roles_application_roles_fkey", 'environment_roles', 'application_roles', ['application_role_id'], ['id'])
conn.execute("""
UPDATE environment_roles er
SET application_role_id = application_roles.id
FROM environments, applications, application_roles
WHERE
environment_id = environments.id AND
applications.id = environments.application_id AND
application_roles.application_id = applications.id AND
er.user_id = application_roles.user_id;
""")
op.alter_column('environment_roles', "application_role_id", nullable=False)
op.drop_column('environment_roles', 'user_id')
def downgrade():
conn = op.get_bind()
op.add_column('environment_roles', sa.Column('user_id', postgresql.UUID(), autoincrement=False, nullable=True))
op.drop_constraint("environment_roles_application_roles_fkey", 'environment_roles', type_='foreignkey')
op.create_foreign_key('environment_roles_user_id_fkey', 'environment_roles', 'users', ['user_id'], ['id'])
op.drop_index('environments_role_user_environment', table_name='environment_roles')
op.create_index('environments_role_user_environment', 'environment_roles', ['user_id', 'environment_id'], unique=True)
conn.execute("""
UPDATE environment_roles
SET user_id = application_roles.user_id
FROM application_roles
WHERE application_role_id = application_roles.id
""")
op.alter_column('environment_roles', "user_id", nullable=False)
op.drop_column('environment_roles', 'application_role_id')

View File

@ -98,7 +98,9 @@ class Applications(BaseDomainClass):
role = env_role_data.get("role")
if role:
environment = Environments.get(env_role_data.get("environment_id"))
Environments.add_member(environment, user, env_role_data.get("role"))
Environments.add_member(
environment, application_role, env_role_data.get("role")
)
return application_role
@ -110,8 +112,11 @@ class Applications(BaseDomainClass):
application_role.status = ApplicationRoleStatus.DISABLED
application_role.deleted = True
db.session.add(application_role)
db.session.commit()
for env in application.environments:
EnvironmentRoles.delete(user_id=user_id, environment_id=env.id)
EnvironmentRoles.delete(
application_role_id=application_role.id, environment_id=env.id
)
db.session.add(application_role)
db.session.commit()

View File

@ -75,7 +75,7 @@ class MockCloudProvider(CloudProviderInterface):
def get_access_token(self, environment_role):
# for now, just create a mock token using the user and environment
# cloud IDs and the name of the role in the environment
user_id = environment_role.user.cloud_id or ""
user_id = environment_role.application_role.user.cloud_id or ""
env_id = environment_role.environment.cloud_id or ""
role_details = environment_role.role
return "::".join([user_id, env_id, role_details])

View File

@ -1,24 +1,27 @@
from flask import current_app as app
from atst.database import db
from atst.models import EnvironmentRole, Application, Environment
from atst.models import EnvironmentRole, ApplicationRole
class EnvironmentRoles(object):
@classmethod
def create(cls, user, environment, role):
env_role = EnvironmentRole(user=user, environment=environment, role=role)
if not user.cloud_id:
user.cloud_id = app.csp.cloud.create_user(user)
def create(cls, application_role, environment, role):
env_role = EnvironmentRole(
application_role=application_role, environment=environment, role=role
)
# TODO: move cloud_id behavior to invitation acceptance
# if not user.cloud_id:
# user.cloud_id = app.csp.cloud.create_user(user)
app.csp.cloud.create_role(env_role)
return env_role
@classmethod
def get(cls, user_id, environment_id):
def get(cls, application_role_id, environment_id):
existing_env_role = (
db.session.query(EnvironmentRole)
.filter(
EnvironmentRole.user_id == user_id,
EnvironmentRole.application_role_id == application_role_id,
EnvironmentRole.environment_id == environment_id,
)
.one_or_none()
@ -26,8 +29,21 @@ class EnvironmentRoles(object):
return existing_env_role
@classmethod
def delete(cls, user_id, environment_id):
existing_env_role = EnvironmentRoles.get(user_id, environment_id)
def get_by_user_and_environment(cls, user_id, environment_id):
existing_env_role = (
db.session.query(EnvironmentRole)
.join(ApplicationRole)
.filter(
ApplicationRole.user_id == user_id,
EnvironmentRole.environment_id == environment_id,
)
.one_or_none()
)
return existing_env_role
@classmethod
def delete(cls, application_role_id, environment_id):
existing_env_role = EnvironmentRoles.get(application_role_id, environment_id)
if existing_env_role:
app.csp.cloud.delete_role(existing_env_role)
db.session.delete(existing_env_role)
@ -37,14 +53,10 @@ class EnvironmentRoles(object):
return False
@classmethod
def get_for_application_and_user(cls, user_id, application_id):
def get_for_application_member(cls, application_role_id):
return (
db.session.query(EnvironmentRole)
.join(Environment)
.join(Application, Environment.application_id == Application.id)
.filter(EnvironmentRole.user_id == user_id)
.filter(Application.id == application_id)
.filter(EnvironmentRole.environment_id == Environment.id)
.filter(EnvironmentRole.application_role_id == application_role_id)
.filter(EnvironmentRole.deleted != True)
.all()
)

View File

@ -3,8 +3,6 @@ from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole
from atst.models.application import Application
from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.application_roles import ApplicationRoles
@ -31,24 +29,13 @@ class Environments(object):
return environments
@classmethod
def add_member(cls, environment, user, role):
def add_member(cls, environment, application_role, role):
environment_user = EnvironmentRoles.create(
user=user, environment=environment, role=role
application_role=application_role, environment=environment, role=role
)
db.session.add(environment_user)
return environment
@classmethod
def for_user(cls, user, application):
return (
db.session.query(Environment)
.join(EnvironmentRole)
.join(Application)
.filter(EnvironmentRole.user_id == user.id)
.filter(Environment.application_id == application.id)
.all()
)
@classmethod
def update(cls, environment, name=None):
if name is not None:
@ -70,20 +57,22 @@ class Environments(object):
return env
@classmethod
def update_env_role(cls, environment, user, new_role):
def update_env_role(cls, environment, application_role, new_role):
updated = False
if new_role is None:
updated = EnvironmentRoles.delete(user.id, environment.id)
updated = EnvironmentRoles.delete(application_role.id, environment.id)
else:
env_role = EnvironmentRoles.get(user.id, environment.id)
env_role = EnvironmentRoles.get(application_role.id, environment.id)
if env_role and env_role.role != new_role:
env_role.role = new_role
updated = True
db.session.add(env_role)
elif not env_role:
env_role = EnvironmentRoles.create(
user=user, environment=environment, role=new_role
application_role=application_role,
environment=environment,
role=new_role,
)
updated = True
db.session.add(env_role)
@ -101,16 +90,7 @@ class Environments(object):
new_role = member["role_name"]
app_role = ApplicationRoles.get_by_id(member["application_role_id"])
Environments.update_env_role(
environment=environment, user=app_role.user, new_role=new_role
)
@classmethod
def update_env_roles_by_member(cls, member, env_roles):
for env_roles in env_roles:
new_role = env_roles["role"]
environment = Environments.get(env_roles["id"])
Environments.update_env_role(
environment=environment, user=member, new_role=new_role
environment=environment, application_role=app_role, new_role=new_role
)
@classmethod

View File

@ -1,14 +1,12 @@
from enum import Enum
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import object_session, relationship
from sqlalchemy.orm import relationship
from sqlalchemy.event import listen
from atst.utils import first_or_none
from atst.models import Base, mixins
from atst.models.mixins.auditable import record_permission_sets_updates
from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole
from .types import Id
@ -93,22 +91,6 @@ class ApplicationRole(
"portfolio": self.application.portfolio.name,
}
@property
def environment_roles(self):
if getattr(self, "_environment_roles", None) is None:
roles = (
object_session(self)
.query(EnvironmentRole)
.join(Environment, Environment.application_id == self.application_id)
.filter(EnvironmentRole.environment_id == Environment.id)
.filter(EnvironmentRole.user_id == self.user_id)
.all()
)
setattr(self, "_environment_roles", roles)
return self._environment_roles
Index(
"application_role_user_application",

View File

@ -21,7 +21,7 @@ class Environment(
@property
def users(self):
return {r.user for r in self.roles}
return {r.application_role.user for r in self.roles}
@property
def num_users(self):

View File

@ -26,12 +26,14 @@ class EnvironmentRole(
role = Column(String())
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
user = relationship("User", backref="environment_roles")
application_role_id = Column(
UUID(as_uuid=True), ForeignKey("application_roles.id"), nullable=False
)
application_role = relationship("ApplicationRole", backref="environment_roles")
def __repr__(self):
return "<EnvironmentRole(role='{}', user='{}', environment='{}', id='{}')>".format(
self.role, self.user.full_name, self.environment.name, self.id
self.role, self.application_role.user_name, self.environment.name, self.id
)
@property
@ -53,8 +55,8 @@ class EnvironmentRole(
@property
def event_details(self):
return {
"updated_user_name": self.user.displayname,
"updated_user_id": str(self.user_id),
"updated_user_name": self.application_role.user_name,
"updated_application_role_id": str(self.application_role_id),
"role": self.role,
"environment": self.environment.displayname,
"environment_id": str(self.environment_id),
@ -67,7 +69,7 @@ class EnvironmentRole(
Index(
"environments_role_user_environment",
EnvironmentRole.user_id,
EnvironmentRole.application_role_id,
EnvironmentRole.environment_id,
unique=True,
)

View File

@ -7,11 +7,7 @@ from sqlalchemy.event import listen
from atst.models import Base, mixins
from .types import Id
from atst.database import db
from atst.utils import first_or_none
from atst.models.environment_role import EnvironmentRole
from atst.models.application import Application
from atst.models.environment import Environment
from atst.models.mixins.auditable import record_permission_sets_updates
@ -126,34 +122,6 @@ class PortfolioRole(
def is_active(self):
return self.status == Status.ACTIVE
@property
def num_environment_roles(self):
return (
db.session.query(EnvironmentRole)
.join(EnvironmentRole.environment)
.join(Environment.application)
.join(Application.portfolio)
.filter(Application.portfolio_id == self.portfolio_id)
.filter(EnvironmentRole.user_id == self.user_id)
.count()
)
@property
def environment_roles(self):
return (
db.session.query(EnvironmentRole)
.join(EnvironmentRole.environment)
.join(Environment.application)
.join(Application.portfolio)
.filter(Application.portfolio_id == self.portfolio_id)
.filter(EnvironmentRole.user_id == self.user_id)
.all()
)
@property
def has_environment_roles(self):
return self.num_environment_roles > 0
@property
def can_resend_invitation(self):
return not self.is_active and (

View File

@ -17,7 +17,7 @@ applications_bp.context_processor(portfolio_context_processor)
def wrap_environment_role_lookup(user, environment_id=None, **kwargs):
env_role = EnvironmentRoles.get(user.id, environment_id)
env_role = EnvironmentRoles.get_by_user_and_environment(user.id, environment_id)
if not env_role:
raise UnauthorizedError(user, "access environment {}".format(environment_id))
@ -27,7 +27,9 @@ def wrap_environment_role_lookup(user, environment_id=None, **kwargs):
@applications_bp.route("/environments/<environment_id>/access")
@user_can(None, override=wrap_environment_role_lookup, message="access environment")
def access_environment(environment_id):
env_role = EnvironmentRoles.get(g.current_user.id, environment_id)
env_role = EnvironmentRoles.get_by_user_and_environment(
g.current_user.id, environment_id
)
token = app.csp.cloud.get_access_token(env_role)
return redirect(url_for("atst.csp_environment_access", token=token))

View File

@ -38,9 +38,7 @@ def get_team_form(application):
member, PermissionSets.DELETE_APPLICATION_ENVIRONMENTS
),
}
roles = EnvironmentRoles.get_for_application_and_user(
member.user.id, application.id
)
roles = EnvironmentRoles.get_for_application_member(member.id)
environment_roles = [
{
"environment_id": str(role.environment.id),
@ -110,7 +108,7 @@ def update_team(application_id):
environment_role_form.environment_id.data
)
Environments.update_env_role(
environment, app_role.user, environment_role_form.data.get("role")
environment, app_role, environment_role_form.data.get("role")
)
flash("updated_application_team_settings", application_name=application.name)

View File

@ -11,21 +11,6 @@ from atst.models.permissions import Permissions
from atst.utils.flash import formatted_flash as flash
def serialize_portfolio_role(portfolio_role):
return {
"name": portfolio_role.user_name,
"status": portfolio_role.display_status,
"id": portfolio_role.user_id,
"role": "admin",
"num_env": portfolio_role.num_environment_roles,
"edit_link": url_for(
"portfolios.view_member",
portfolio_id=portfolio_role.portfolio_id,
member_id=portfolio_role.user_id,
),
}
@portfolios_bp.route("/portfolios/<portfolio_id>/members/new", methods=["POST"])
@user_can(Permissions.CREATE_PORTFOLIO_USERS, message="create new portfolio member")
def create_member(portfolio_id):

View File

@ -225,7 +225,7 @@ def add_applications_to_portfolio(portfolio):
last_name=user_data["last_name"],
)
ApplicationRoles.create(
app_role = ApplicationRoles.create(
user=user,
application=application,
permission_set_names=[PermissionSets.EDIT_APPLICATION_TEAM],
@ -237,7 +237,9 @@ def add_applications_to_portfolio(portfolio):
)
for env in user_environments:
role = random.choice([e.value for e in CSPRole])
EnvironmentRoles.create(user=user, environment=env, role=role)
EnvironmentRoles.create(
application_role=app_role, environment=env, role=role
)
def create_demo_portfolio(name, data):

View File

@ -128,7 +128,7 @@ def test_create_member():
# view application AND edit application team
assert len(member_role.permission_sets) == 2
env_roles = member_role.user.environment_roles
env_roles = member_role.environment_roles
assert len(env_roles) == 1
assert env_roles[0].environment == env1
@ -165,7 +165,9 @@ def test_remove_member():
user = UserFactory.create()
member_role = ApplicationRoleFactory.create(application=application, user=user)
environment = EnvironmentFactory.create(application=application)
environment_role = EnvironmentRoleFactory.create(user=user, environment=environment)
environment_role = EnvironmentRoleFactory.create(
application_role=member_role, environment=environment
)
assert member_role == ApplicationRoles.get(
user_id=user.id, application_id=application.id
@ -181,4 +183,9 @@ def test_remove_member():
#
# TODO: Why does above raise NotFoundError and this returns None
#
assert EnvironmentRoles.get(user_id=user.id, environment_id=environment.id) == None
assert (
EnvironmentRoles.get(
application_role_id=member_role.id, environment_id=environment.id
)
is None
)

View File

@ -84,7 +84,7 @@ def test_get_portfolio_events_includes_app_and_env_events():
# add environment level events
env = EnvironmentFactory.create(application=application)
env_role = EnvironmentRoleFactory.create(environment=env, user=app_role.user)
env_role = EnvironmentRoleFactory.create(environment=env, application_role=app_role)
portfolio_app_and_env_events = AuditLog.get_portfolio_events(portfolio)
assert len(portfolio_and_app_events) < len(portfolio_app_and_env_events)
@ -106,7 +106,7 @@ def test_get_application_events():
app_role = ApplicationRoleFactory.create(application=application)
app_invite = ApplicationInvitationFactory.create(role=app_role)
env = EnvironmentFactory.create(application=application)
env_role = EnvironmentRoleFactory.create(environment=env, user=app_role.user)
env_role = EnvironmentRoleFactory.create(environment=env, application_role=app_role)
# add rando app
rando_app = ApplicationFactory.create(portfolio=portfolio)

View File

@ -1,26 +1,90 @@
import pytest
from unittest.mock import MagicMock
from atst.domain.environment_roles import EnvironmentRoles
from tests.factories import *
def test_get_for_application_and_user():
@pytest.fixture
def application_role():
user = UserFactory.create()
application = ApplicationFactory.create()
env1 = EnvironmentFactory.create(application=application)
EnvironmentFactory.create(application=application)
EnvironmentRoleFactory.create(user=user, environment=env1)
return ApplicationRoleFactory.create(application=application, user=user)
roles = EnvironmentRoles.get_for_application_and_user(user.id, application.id)
@pytest.fixture
def environment(application_role):
return EnvironmentFactory.create(application=application_role.application)
def test_create(application_role, environment, monkeypatch):
mock_create_role = MagicMock()
monkeypatch.setattr(
"atst.domain.environment_roles.app.csp.cloud.create_role", mock_create_role
)
environment_role = EnvironmentRoles.create(
application_role, environment, "network admin"
)
assert environment_role.application_role == application_role
assert environment_role.environment == environment
assert environment_role.role == "network admin"
mock_create_role.assert_called_with(environment_role)
def test_get(application_role, environment):
EnvironmentRoleFactory.create(
application_role=application_role, environment=environment
)
environment_role = EnvironmentRoles.get(application_role.id, environment.id)
assert environment_role
assert environment_role.application_role == application_role
assert environment_role.environment == environment
def test_get_by_user_and_environment(application_role, environment):
expected_role = EnvironmentRoleFactory.create(
application_role=application_role, environment=environment
)
actual_role = EnvironmentRoles.get_by_user_and_environment(
application_role.user.id, environment.id
)
assert expected_role == actual_role
def test_delete(application_role, environment, monkeypatch):
mock_delete_role = MagicMock()
monkeypatch.setattr(
"atst.domain.environment_roles.app.csp.cloud.delete_role", mock_delete_role
)
environment_role = EnvironmentRoleFactory.create(
application_role=application_role, environment=environment
)
assert EnvironmentRoles.delete(application_role.id, environment.id)
mock_delete_role.assert_called_with(environment_role)
assert not EnvironmentRoles.delete(application_role.id, environment.id)
def test_get_for_application_member(application_role, environment):
EnvironmentRoleFactory.create(
application_role=application_role, environment=environment
)
roles = EnvironmentRoles.get_for_application_member(application_role.id)
assert len(roles) == 1
assert roles[0].environment == env1
assert roles[0].user == user
assert roles[0].environment == environment
assert roles[0].application_role == application_role
def test_get_for_application_and_user_does_not_return_deleted():
user = UserFactory.create()
application = ApplicationFactory.create()
env1 = EnvironmentFactory.create(application=application)
EnvironmentRoleFactory.create(user=user, environment=env1, deleted=True)
def test_get_for_application_member_does_not_return_deleted(
application_role, environment
):
EnvironmentRoleFactory.create(
application_role=application_role, environment=environment, deleted=True
)
roles = EnvironmentRoles.get_for_application_and_user(user.id, application.id)
roles = EnvironmentRoles.get_for_application_member(application_role.id)
assert len(roles) == 0

View File

@ -25,22 +25,22 @@ def test_create_environments():
def test_update_env_role():
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
new_role = CSPRole.TECHNICAL_READ.value
ApplicationRoleFactory.create(
user=env_role.user, application=env_role.environment.application
)
assert Environments.update_env_role(env_role.environment, env_role.user, new_role)
assert Environments.update_env_role(
env_role.environment, env_role.application_role, new_role
)
assert env_role.role == new_role
def test_update_env_role_no_access():
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
ApplicationRoleFactory.create(
user=env_role.user, application=env_role.environment.application
)
assert Environments.update_env_role(env_role.environment, env_role.user, None)
assert not EnvironmentRoles.get(env_role.user.id, env_role.environment.id)
assert Environments.update_env_role(
env_role.environment, env_role.application_role, None
)
assert not EnvironmentRoles.get(
env_role.application_role.id, env_role.environment.id
)
def test_update_env_role_no_change():
@ -48,55 +48,45 @@ def test_update_env_role_no_change():
new_role = CSPRole.BASIC_ACCESS.value
assert not Environments.update_env_role(
env_role.environment, env_role.user, new_role
env_role.environment, env_role.application_role, new_role
)
def test_update_env_role_creates_cloud_id_for_new_member(session):
user = UserFactory.create()
env = EnvironmentFactory.create()
ApplicationRoleFactory.create(user=user, application=env.application)
assert not user.cloud_id
assert Environments.update_env_role(env, user, CSPRole.TECHNICAL_READ.value)
assert EnvironmentRoles.get(user.id, env.id)
assert user.cloud_id is not None
def test_update_env_roles_by_environment():
environment = EnvironmentFactory.create()
app_role_1 = ApplicationRoleFactory.create(application=environment.application)
env_role_1 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value
)
app_role_1 = ApplicationRoleFactory.create(
user=env_role_1.user, application=environment.application
application_role=app_role_1,
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
)
app_role_2 = ApplicationRoleFactory.create(application=environment.application)
env_role_2 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.NETWORK_ADMIN.value
)
app_role_2 = ApplicationRoleFactory.create(
user=env_role_2.user, application=environment.application
application_role=app_role_2,
environment=environment,
role=CSPRole.NETWORK_ADMIN.value,
)
app_role_3 = ApplicationRoleFactory.create(application=environment.application)
env_role_3 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.TECHNICAL_READ.value
)
app_role_3 = ApplicationRoleFactory.create(
user=env_role_3.user, application=environment.application
application_role=app_role_3,
environment=environment,
role=CSPRole.TECHNICAL_READ.value,
)
team_roles = [
{
"application_role_id": app_role_1.id,
"user_name": env_role_1.user.full_name,
"user_name": app_role_1.user_name,
"role_name": CSPRole.BUSINESS_READ.value,
},
{
"application_role_id": app_role_2.id,
"user_name": env_role_2.user.full_name,
"user_name": app_role_2.user_name,
"role_name": CSPRole.NETWORK_ADMIN.value,
},
{
"application_role_id": app_role_3.id,
"user_name": env_role_3.user.full_name,
"user_name": app_role_3.user_name,
"role_name": None,
},
]
@ -104,80 +94,7 @@ def test_update_env_roles_by_environment():
Environments.update_env_roles_by_environment(environment.id, team_roles)
assert env_role_1.role == CSPRole.BUSINESS_READ.value
assert env_role_2.role == CSPRole.NETWORK_ADMIN.value
assert not EnvironmentRoles.get(env_role_3.user.id, environment.id)
def test_update_env_roles_by_member():
user = UserFactory.create()
application = ApplicationFactory.create(
environments=[
{
"name": "dev",
"members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
},
{
"name": "staging",
"members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
},
{"name": "prod"},
{
"name": "testing",
"members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
},
]
)
dev, staging, prod, testing = application.environments
env_roles = [
{"id": dev.id, "role": CSPRole.NETWORK_ADMIN.value},
{"id": staging.id, "role": CSPRole.BUSINESS_READ.value},
{"id": prod.id, "role": CSPRole.TECHNICAL_READ.value},
{"id": testing.id, "role": None},
]
Environments.update_env_roles_by_member(user, env_roles)
assert EnvironmentRoles.get(user.id, dev.id).role == CSPRole.NETWORK_ADMIN.value
assert EnvironmentRoles.get(user.id, staging.id).role == CSPRole.BUSINESS_READ.value
assert EnvironmentRoles.get(user.id, prod.id).role == CSPRole.TECHNICAL_READ.value
assert not EnvironmentRoles.get(user.id, testing.id)
def test_get_scoped_environments(db):
developer = UserFactory.create()
portfolio = PortfolioFactory.create(
members=[{"user": developer, "role_name": "developer"}],
applications=[
{
"name": "application1",
"environments": [
{
"name": "application1 dev",
"members": [{"user": developer, "role_name": "developer"}],
},
{"name": "application1 staging"},
{"name": "application1 prod"},
],
},
{
"name": "application2",
"environments": [
{"name": "application2 dev"},
{
"name": "application2 staging",
"members": [{"user": developer, "role_name": "developer"}],
},
{"name": "application2 prod"},
],
},
],
)
application1_envs = Environments.for_user(developer, portfolio.applications[0])
assert [env.name for env in application1_envs] == ["application1 dev"]
application2_envs = Environments.for_user(developer, portfolio.applications[1])
assert [env.name for env in application2_envs] == ["application2 staging"]
assert not EnvironmentRoles.get(app_role_3.id, environment.id)
def test_get_excludes_deleted():
@ -190,7 +107,10 @@ def test_get_excludes_deleted():
def test_delete_environment(session):
env = EnvironmentFactory.create(application=ApplicationFactory.create())
env_role = EnvironmentRoleFactory.create(user=UserFactory.create(), environment=env)
env_role = EnvironmentRoleFactory.create(
application_role=ApplicationRoleFactory.create(application=env.application),
environment=env,
)
assert not env.deleted
assert not env_role.deleted
Environments.delete(env)

View File

@ -111,29 +111,6 @@ def test_scoped_portfolio_for_admin_missing_view_apps_perms(portfolio_owner, por
assert len(scoped_portfolio.applications) == 0
@pytest.mark.skip(reason="should be reworked pending application member changes")
def test_scoped_portfolio_only_returns_a_users_applications_and_environments(
portfolio, portfolio_owner
):
new_application = Applications.create(
portfolio, "My Application", "My application", ["dev", "staging", "prod"]
)
Applications.create(
portfolio, "My Application 2", "My application 2", ["dev", "staging", "prod"]
)
developer = UserFactory.create()
dev_environment = Environments.add_member(
new_application.environments[0], developer, "developer"
)
scoped_portfolio = Portfolios.get(developer, portfolio.id)
# Should only return the application and environment in which the user has an
# environment role.
assert scoped_portfolio.applications == [new_application]
assert scoped_portfolio.applications[0].environments == [dev_environment]
def test_scoped_portfolio_returns_all_applications_for_portfolio_admin(
portfolio, portfolio_owner
):

View File

@ -163,20 +163,6 @@ class ApplicationFactory(Base):
with_environments = kwargs.pop("environments", [])
application = super()._create(model_class, *args, **kwargs)
# need to create application roles for environment users
app_members_from_envs = set()
for env in with_environments:
with_members = env.get("members", [])
for member_data in with_members:
member = member_data.get("user", UserFactory.create())
app_members_from_envs.add(member)
# set for environments in case we just created the
# user for the application
member_data["user"] = member
for member in app_members_from_envs:
ApplicationRoleFactory.create(application=application, user=member)
environments = [
EnvironmentFactory.create(application=application, **e)
for e in with_environments
@ -200,9 +186,14 @@ class EnvironmentFactory(Base):
for member in with_members:
user = member.get("user", UserFactory.create())
application_role = ApplicationRoleFactory.create(
application=environment.application, user=user
)
role_name = member["role_name"]
EnvironmentRoleFactory.create(
environment=environment, role=role_name, user=user
environment=environment,
role=role_name,
application_role=application_role,
)
return environment
@ -234,7 +225,7 @@ class EnvironmentRoleFactory(Base):
environment = factory.SubFactory(EnvironmentFactory)
role = random.choice([e.value for e in CSPRole])
user = factory.SubFactory(UserFactory)
application_role = factory.SubFactory(ApplicationRoleFactory)
class PortfolioInvitationFactory(Base):

View File

@ -45,6 +45,8 @@ def test_environment_roles():
environment = EnvironmentFactory.create(application=application)
user = UserFactory.create()
application_role = ApplicationRoleFactory.create(application=application, user=user)
environment_role = EnvironmentRoleFactory.create(environment=environment, user=user)
environment_role = EnvironmentRoleFactory.create(
environment=environment, application_role=application_role
)
assert application_role.environment_roles == [environment_role]

View File

@ -3,13 +3,7 @@ from atst.models.environment_role import CSPRole
from atst.domain.environments import Environments
from atst.domain.applications import Applications
from tests.factories import (
PortfolioFactory,
UserFactory,
EnvironmentFactory,
ApplicationFactory,
ApplicationRoleFactory,
)
from tests.factories import *
def test_add_user_to_environment():
@ -22,9 +16,13 @@ def test_add_user_to_environment():
)
dev_environment = application.environments[0]
ApplicationRoleFactory.create(user=developer, application=application)
dev_environment = Environments.add_member(
dev_environment, developer, CSPRole.BASIC_ACCESS.value
application_role = ApplicationRoleFactory.create(
user=developer, application=application
)
EnvironmentRoleFactory.create(
application_role=application_role,
environment=dev_environment,
role=CSPRole.BASIC_ACCESS.value,
)
assert developer in dev_environment.users

View File

@ -8,16 +8,7 @@ from atst.domain.applications import Applications
from atst.domain.permission_sets import PermissionSets
from atst.models import AuditEvent, InvitationStatus, PortfolioRoleStatus, CSPRole
from tests.factories import (
UserFactory,
PortfolioInvitationFactory,
PortfolioRoleFactory,
EnvironmentFactory,
EnvironmentRoleFactory,
ApplicationFactory,
ApplicationRoleFactory,
PortfolioFactory,
)
from tests.factories import *
from atst.domain.portfolio_roles import PortfolioRoles
@ -97,8 +88,9 @@ def test_has_no_env_role_history(session):
application=application, name="new environment!"
)
app_role = ApplicationRoleFactory.create(user=user, application=application)
env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer"
application_role=app_role, environment=environment, role="developer"
)
create_event = (
session.query(AuditEvent)
@ -110,22 +102,24 @@ def test_has_no_env_role_history(session):
def test_has_env_role_history(session):
owner = UserFactory.create()
user = UserFactory.create()
portfolio = PortfolioFactory.create(owner=owner)
portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=user)
application = ApplicationFactory.create(portfolio=portfolio)
ApplicationRoleFactory.create(user=user, application=application)
application = ApplicationFactory.create()
app_role = ApplicationRoleFactory.create(user=user, application=application)
environment = EnvironmentFactory.create(
application=application, name="new environment!"
)
env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer"
)
Environments.update_env_roles_by_member(
user, [{"role": "admin", "id": environment.id}]
application_role=app_role, environment=environment, role="developer"
)
session.add(env_role)
session.commit()
session.refresh(env_role)
env_role.role = "admin"
session.add(env_role)
session.commit()
changed_events = (
session.query(AuditEvent)
.filter(AuditEvent.resource_id == env_role.id, AuditEvent.action == "update")
@ -147,44 +141,6 @@ def test_event_details():
assert portfolio_role.event_details["updated_user_id"] == str(user.id)
def test_has_no_environment_roles():
owner = UserFactory.create()
developer_data = {
"dod_id": "1234567890",
"first_name": "Test",
"last_name": "User",
"email": "test.user@mail.com",
"portfolio_role": "developer",
}
portfolio = PortfolioFactory.create(owner=owner)
portfolio_role = Portfolios.create_member(portfolio, developer_data)
assert not portfolio_role.has_environment_roles
def test_has_environment_roles():
owner = UserFactory.create()
developer_data = {
"dod_id": "1234567890",
"first_name": "Test",
"last_name": "User",
"email": "test.user@mail.com",
"portfolio_role": "developer",
}
portfolio = PortfolioFactory.create(owner=owner)
portfolio_role = Portfolios.create_member(portfolio, developer_data)
application = Applications.create(
portfolio, "my test application", "It's mine.", ["dev", "staging", "prod"]
)
ApplicationRoleFactory.create(user=portfolio_role.user, application=application)
Environments.add_member(
application.environments[0], portfolio_role.user, CSPRole.BASIC_ACCESS.value
)
assert portfolio_role.has_environment_roles
def test_status_when_member_is_active():
portfolio_role = PortfolioRoleFactory.create(status=PortfolioRoleStatus.ACTIVE)
assert portfolio_role.display_status == "Active"

View File

@ -1,33 +1,16 @@
from flask import url_for, get_flashed_messages
from tests.factories import (
UserFactory,
PortfolioFactory,
PortfolioRoleFactory,
EnvironmentRoleFactory,
EnvironmentFactory,
ApplicationFactory,
)
from atst.domain.applications import Applications
from atst.domain.portfolios import Portfolios
from atst.models.portfolio_role import Status as PortfolioRoleStatus
from tests.utils import captured_templates
def create_environment(user):
portfolio = PortfolioFactory.create()
portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=user)
application = ApplicationFactory.create(portfolio=portfolio)
return EnvironmentFactory.create(application=application, name="new environment!")
from tests.factories import *
def test_environment_access_with_env_role(client, user_session):
user = UserFactory.create()
environment = create_environment(user)
env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer"
environment = EnvironmentFactory.create()
app_role = ApplicationRoleFactory.create(
user=user, application=environment.application
)
EnvironmentRoleFactory.create(
application_role=app_role, environment=environment, role="developer"
)
user_session(user)
response = client.get(
@ -39,7 +22,7 @@ def test_environment_access_with_env_role(client, user_session):
def test_environment_access_with_no_role(client, user_session):
user = UserFactory.create()
environment = create_environment(user)
environment = EnvironmentFactory.create()
user_session(user)
response = client.get(
url_for("applications.access_environment", environment_id=environment.id)

View File

@ -98,15 +98,14 @@ def test_edit_application_environments_obj(app, client, user_session):
{"env"},
)
env = application.environments[0]
app_role = ApplicationRoleFactory.create(application=application)
app_role1 = ApplicationRoleFactory.create(application=application)
env_role1 = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.BASIC_ACCESS.value
application_role=app_role1, environment=env, role=CSPRole.BASIC_ACCESS.value
)
ApplicationRoleFactory.create(application=application, user=env_role1.user)
app_role2 = ApplicationRoleFactory.create(application=application)
env_role2 = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.NETWORK_ADMIN.value
application_role=app_role2, environment=env, role=CSPRole.NETWORK_ADMIN.value
)
ApplicationRoleFactory.create(application=application, user=env_role2.user)
user_session(portfolio.owner)
@ -125,7 +124,7 @@ def test_edit_application_environments_obj(app, client, user_session):
assert isinstance(env_obj["edit_form"], EditEnvironmentForm)
assert (
env_obj["members"].sort()
== [env_role1.user.full_name, env_role2.user.full_name].sort()
== [app_role1.user_name, app_role2.user_name].sort()
)
assert isinstance(context["audit_events"], Paginator)
@ -140,17 +139,13 @@ def test_data_for_app_env_roles_form(app, client, user_session):
)
env = application.environments[0]
app_role0 = ApplicationRoleFactory.create(application=application)
app_role1 = ApplicationRoleFactory.create(application=application)
env_role1 = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.BASIC_ACCESS.value
)
app_role1 = ApplicationRoleFactory.create(
application=application, user=env_role1.user
application_role=app_role1, environment=env, role=CSPRole.BASIC_ACCESS.value
)
app_role2 = ApplicationRoleFactory.create(application=application)
env_role2 = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.NETWORK_ADMIN.value
)
app_role2 = ApplicationRoleFactory.create(
application=application, user=env_role2.user
application_role=app_role2, environment=env, role=CSPRole.NETWORK_ADMIN.value
)
user_session(portfolio.owner)
@ -175,7 +170,7 @@ def test_data_for_app_env_roles_form(app, client, user_session):
"members": [
{
"application_role_id": str(app_role0.id),
"user_name": app_role0.user.full_name,
"user_name": app_role0.user_name,
"role_name": None,
}
],
@ -185,7 +180,7 @@ def test_data_for_app_env_roles_form(app, client, user_session):
"members": [
{
"application_role_id": str(app_role1.id),
"user_name": env_role1.user.full_name,
"user_name": app_role1.user_name,
"role_name": CSPRole.BASIC_ACCESS.value,
}
],
@ -195,7 +190,7 @@ def test_data_for_app_env_roles_form(app, client, user_session):
"members": [
{
"application_role_id": str(app_role2.id),
"user_name": env_role2.user.full_name,
"user_name": app_role2.user_name,
"role_name": CSPRole.NETWORK_ADMIN.value,
}
],
@ -268,15 +263,21 @@ def test_update_team_env_roles(client, user_session):
application = environment.application
app_role_1 = ApplicationRoleFactory.create(application=application)
env_role_1 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value, user=app_role_1.user
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
application_role=app_role_1,
)
app_role_2 = ApplicationRoleFactory.create(application=application)
env_role_2 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value, user=app_role_2.user
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
application_role=app_role_2,
)
app_role_3 = ApplicationRoleFactory.create(application=application)
env_role_3 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value, user=app_role_3.user
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
application_role=app_role_3,
)
app_role_4 = ApplicationRoleFactory.create(application=application)
@ -302,8 +303,8 @@ def test_update_team_env_roles(client, user_session):
assert response.status_code == 200
assert env_role_1.role == CSPRole.NETWORK_ADMIN.value
assert env_role_2.role == CSPRole.BASIC_ACCESS.value
assert not EnvironmentRoles.get(env_role_3.user.id, environment.id)
assert EnvironmentRoles.get(app_role_4.user.id, environment.id)
assert not EnvironmentRoles.get(app_role_3.id, environment.id)
assert EnvironmentRoles.get(app_role_4.id, environment.id)
def test_user_can_only_access_apps_in_their_portfolio(client, user_session):

View File

@ -94,7 +94,9 @@ def test_update_team_environment_roles(client, user_session):
)
environment = EnvironmentFactory.create(application=application)
env_role = EnvironmentRoleFactory.create(
user=app_role.user, environment=environment, role=CSPRole.NETWORK_ADMIN.value
application_role=app_role,
environment=environment,
role=CSPRole.NETWORK_ADMIN.value,
)
user_session(owner)
response = client.post(
@ -121,7 +123,9 @@ def test_update_team_revoke_environment_access(client, user_session, db, session
)
environment = EnvironmentFactory.create(application=application)
env_role = EnvironmentRoleFactory.create(
user=app_role.user, environment=environment, role=CSPRole.BASIC_ACCESS.value
application_role=app_role,
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
)
user_session(owner)
response = client.post(
@ -177,8 +181,11 @@ def test_create_member(client, user_session):
assert response.location == expected_url
assert len(user.application_roles) == 1
assert user.application_roles[0].application == application
assert len(user.environment_roles) == 1
assert user.environment_roles[0].environment == env
environment_roles = [
er for ar in user.application_roles for er in ar.environment_roles
]
assert len(environment_roles) == 1
assert environment_roles[0].environment == env
def test_remove_member_success(client, user_session):

View File

@ -10,18 +10,7 @@ from atst.domain.auth import UNPROTECTED_ROUTES as _NO_LOGIN_REQUIRED
from atst.domain.permission_sets import PermissionSets
from atst.models import CSPRole, PortfolioRoleStatus, ApplicationRoleStatus
from tests.factories import (
AttachmentFactory,
ApplicationFactory,
ApplicationRoleFactory,
EnvironmentFactory,
EnvironmentRoleFactory,
PortfolioInvitationFactory,
PortfolioFactory,
PortfolioRoleFactory,
TaskOrderFactory,
UserFactory,
)
from tests.factories import *
_NO_ACCESS_CHECK_REQUIRED = _NO_LOGIN_REQUIRED + [
"task_orders.get_started", # all users can start a new TO
@ -265,11 +254,6 @@ def test_application_settings_access(get_url_assert_status):
applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}],
)
app = portfolio.applications[0]
env = EnvironmentFactory.create(application=app)
env_role = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.NETWORK_ADMIN.value
)
ApplicationRoleFactory.create(application=app, user=env_role.user)
url = url_for("applications.settings", application_id=app.id)
get_url_assert_status(ccpo, url, 200)