Use application_role_id on environment_roles.

In the future, an `application_invitation1 will not refer to a `user` until
someone accepts the invitation; they'll only reference an
`application_role`. When a user is invited to an application, the
inviter can specify the environments the invitee should have access to.
For this to be possible, an `environment_role` should reference an
`application_role`, because no `user` entity will be known at that time.

In addition to updating all the models and domain methods necessary for
this change, this commit deletes unused code and tests that were
dependent on `environment_roles` having a `user_id` foreign key.
This commit is contained in:
dandds 2019-05-29 16:11:58 -04:00
parent f6698b3880
commit df06d1b62f
26 changed files with 314 additions and 434 deletions

View File

@ -0,0 +1,54 @@
"""environment_roles has relation to application_roles
Revision ID: d2390c547dca
Revises: ab1167fc8260
Create Date: 2019-05-29 12:34:45.928219
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'd2390c547dca'
down_revision = 'ab1167fc8260'
branch_labels = None
depends_on = None
def upgrade():
conn = op.get_bind()
op.add_column('environment_roles', sa.Column('application_role_id', postgresql.UUID(as_uuid=True), nullable=True))
op.drop_index('environments_role_user_environment', table_name='environment_roles')
op.create_index('environments_role_user_environment', 'environment_roles', ['application_role_id', 'environment_id'], unique=True)
op.drop_constraint('environment_roles_user_id_fkey', 'environment_roles', type_='foreignkey')
op.create_foreign_key("environment_roles_application_roles_fkey", 'environment_roles', 'application_roles', ['application_role_id'], ['id'])
conn.execute("""
UPDATE environment_roles er
SET application_role_id = application_roles.id
FROM environments, applications, application_roles
WHERE
environment_id = environments.id AND
applications.id = environments.application_id AND
application_roles.application_id = applications.id AND
er.user_id = application_roles.user_id;
""")
op.alter_column('environment_roles', "application_role_id", nullable=False)
op.drop_column('environment_roles', 'user_id')
def downgrade():
conn = op.get_bind()
op.add_column('environment_roles', sa.Column('user_id', postgresql.UUID(), autoincrement=False, nullable=True))
op.drop_constraint("environment_roles_application_roles_fkey", 'environment_roles', type_='foreignkey')
op.create_foreign_key('environment_roles_user_id_fkey', 'environment_roles', 'users', ['user_id'], ['id'])
op.drop_index('environments_role_user_environment', table_name='environment_roles')
op.create_index('environments_role_user_environment', 'environment_roles', ['user_id', 'environment_id'], unique=True)
conn.execute("""
UPDATE environment_roles
SET user_id = application_roles.user_id
FROM application_roles
WHERE application_role_id = application_roles.id
""")
op.alter_column('environment_roles', "user_id", nullable=False)
op.drop_column('environment_roles', 'application_role_id')

View File

@ -98,7 +98,9 @@ class Applications(BaseDomainClass):
role = env_role_data.get("role")
if role:
environment = Environments.get(env_role_data.get("environment_id"))
Environments.add_member(environment, user, env_role_data.get("role"))
Environments.add_member(
environment, application_role, env_role_data.get("role")
)
return application_role
@ -110,8 +112,11 @@ class Applications(BaseDomainClass):
application_role.status = ApplicationRoleStatus.DISABLED
application_role.deleted = True
db.session.add(application_role)
db.session.commit()
for env in application.environments:
EnvironmentRoles.delete(user_id=user_id, environment_id=env.id)
EnvironmentRoles.delete(
application_role_id=application_role.id, environment_id=env.id
)
db.session.add(application_role)
db.session.commit()

View File

@ -75,7 +75,7 @@ class MockCloudProvider(CloudProviderInterface):
def get_access_token(self, environment_role):
# for now, just create a mock token using the user and environment
# cloud IDs and the name of the role in the environment
user_id = environment_role.user.cloud_id or ""
user_id = environment_role.application_role.user.cloud_id or ""
env_id = environment_role.environment.cloud_id or ""
role_details = environment_role.role
return "::".join([user_id, env_id, role_details])

View File

@ -1,24 +1,27 @@
from flask import current_app as app
from atst.database import db
from atst.models import EnvironmentRole, Application, Environment
from atst.models import EnvironmentRole, ApplicationRole
class EnvironmentRoles(object):
@classmethod
def create(cls, user, environment, role):
env_role = EnvironmentRole(user=user, environment=environment, role=role)
if not user.cloud_id:
user.cloud_id = app.csp.cloud.create_user(user)
def create(cls, application_role, environment, role):
env_role = EnvironmentRole(
application_role=application_role, environment=environment, role=role
)
# TODO: move cloud_id behavior to invitation acceptance
# if not user.cloud_id:
# user.cloud_id = app.csp.cloud.create_user(user)
app.csp.cloud.create_role(env_role)
return env_role
@classmethod
def get(cls, user_id, environment_id):
def get(cls, application_role_id, environment_id):
existing_env_role = (
db.session.query(EnvironmentRole)
.filter(
EnvironmentRole.user_id == user_id,
EnvironmentRole.application_role_id == application_role_id,
EnvironmentRole.environment_id == environment_id,
)
.one_or_none()
@ -26,8 +29,21 @@ class EnvironmentRoles(object):
return existing_env_role
@classmethod
def delete(cls, user_id, environment_id):
existing_env_role = EnvironmentRoles.get(user_id, environment_id)
def get_by_user_and_environment(cls, user_id, environment_id):
existing_env_role = (
db.session.query(EnvironmentRole)
.join(ApplicationRole)
.filter(
ApplicationRole.user_id == user_id,
EnvironmentRole.environment_id == environment_id,
)
.one_or_none()
)
return existing_env_role
@classmethod
def delete(cls, application_role_id, environment_id):
existing_env_role = EnvironmentRoles.get(application_role_id, environment_id)
if existing_env_role:
app.csp.cloud.delete_role(existing_env_role)
db.session.delete(existing_env_role)
@ -37,14 +53,10 @@ class EnvironmentRoles(object):
return False
@classmethod
def get_for_application_and_user(cls, user_id, application_id):
def get_for_application_member(cls, application_role_id):
return (
db.session.query(EnvironmentRole)
.join(Environment)
.join(Application, Environment.application_id == Application.id)
.filter(EnvironmentRole.user_id == user_id)
.filter(Application.id == application_id)
.filter(EnvironmentRole.environment_id == Environment.id)
.filter(EnvironmentRole.application_role_id == application_role_id)
.filter(EnvironmentRole.deleted != True)
.all()
)

View File

@ -3,8 +3,6 @@ from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole
from atst.models.application import Application
from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.application_roles import ApplicationRoles
@ -31,24 +29,13 @@ class Environments(object):
return environments
@classmethod
def add_member(cls, environment, user, role):
def add_member(cls, environment, application_role, role):
environment_user = EnvironmentRoles.create(
user=user, environment=environment, role=role
application_role=application_role, environment=environment, role=role
)
db.session.add(environment_user)
return environment
@classmethod
def for_user(cls, user, application):
return (
db.session.query(Environment)
.join(EnvironmentRole)
.join(Application)
.filter(EnvironmentRole.user_id == user.id)
.filter(Environment.application_id == application.id)
.all()
)
@classmethod
def update(cls, environment, name=None):
if name is not None:
@ -70,20 +57,22 @@ class Environments(object):
return env
@classmethod
def update_env_role(cls, environment, user, new_role):
def update_env_role(cls, environment, application_role, new_role):
updated = False
if new_role is None:
updated = EnvironmentRoles.delete(user.id, environment.id)
updated = EnvironmentRoles.delete(application_role.id, environment.id)
else:
env_role = EnvironmentRoles.get(user.id, environment.id)
env_role = EnvironmentRoles.get(application_role.id, environment.id)
if env_role and env_role.role != new_role:
env_role.role = new_role
updated = True
db.session.add(env_role)
elif not env_role:
env_role = EnvironmentRoles.create(
user=user, environment=environment, role=new_role
application_role=application_role,
environment=environment,
role=new_role,
)
updated = True
db.session.add(env_role)
@ -101,16 +90,7 @@ class Environments(object):
new_role = member["role_name"]
app_role = ApplicationRoles.get_by_id(member["application_role_id"])
Environments.update_env_role(
environment=environment, user=app_role.user, new_role=new_role
)
@classmethod
def update_env_roles_by_member(cls, member, env_roles):
for env_roles in env_roles:
new_role = env_roles["role"]
environment = Environments.get(env_roles["id"])
Environments.update_env_role(
environment=environment, user=member, new_role=new_role
environment=environment, application_role=app_role, new_role=new_role
)
@classmethod

View File

@ -1,14 +1,12 @@
from enum import Enum
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import object_session, relationship
from sqlalchemy.orm import relationship
from sqlalchemy.event import listen
from atst.utils import first_or_none
from atst.models import Base, mixins
from atst.models.mixins.auditable import record_permission_sets_updates
from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole
from .types import Id
@ -93,22 +91,6 @@ class ApplicationRole(
"portfolio": self.application.portfolio.name,
}
@property
def environment_roles(self):
if getattr(self, "_environment_roles", None) is None:
roles = (
object_session(self)
.query(EnvironmentRole)
.join(Environment, Environment.application_id == self.application_id)
.filter(EnvironmentRole.environment_id == Environment.id)
.filter(EnvironmentRole.user_id == self.user_id)
.all()
)
setattr(self, "_environment_roles", roles)
return self._environment_roles
Index(
"application_role_user_application",

View File

@ -21,7 +21,7 @@ class Environment(
@property
def users(self):
return {r.user for r in self.roles}
return {r.application_role.user for r in self.roles}
@property
def num_users(self):

View File

@ -26,12 +26,14 @@ class EnvironmentRole(
role = Column(String())
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
user = relationship("User", backref="environment_roles")
application_role_id = Column(
UUID(as_uuid=True), ForeignKey("application_roles.id"), nullable=False
)
application_role = relationship("ApplicationRole", backref="environment_roles")
def __repr__(self):
return "<EnvironmentRole(role='{}', user='{}', environment='{}', id='{}')>".format(
self.role, self.user.full_name, self.environment.name, self.id
self.role, self.application_role.user_name, self.environment.name, self.id
)
@property
@ -53,8 +55,8 @@ class EnvironmentRole(
@property
def event_details(self):
return {
"updated_user_name": self.user.displayname,
"updated_user_id": str(self.user_id),
"updated_user_name": self.application_role.user_name,
"updated_application_role_id": str(self.application_role_id),
"role": self.role,
"environment": self.environment.displayname,
"environment_id": str(self.environment_id),
@ -67,7 +69,7 @@ class EnvironmentRole(
Index(
"environments_role_user_environment",
EnvironmentRole.user_id,
EnvironmentRole.application_role_id,
EnvironmentRole.environment_id,
unique=True,
)

View File

@ -7,11 +7,7 @@ from sqlalchemy.event import listen
from atst.models import Base, mixins
from .types import Id
from atst.database import db
from atst.utils import first_or_none
from atst.models.environment_role import EnvironmentRole
from atst.models.application import Application
from atst.models.environment import Environment
from atst.models.mixins.auditable import record_permission_sets_updates
@ -126,34 +122,6 @@ class PortfolioRole(
def is_active(self):
return self.status == Status.ACTIVE
@property
def num_environment_roles(self):
return (
db.session.query(EnvironmentRole)
.join(EnvironmentRole.environment)
.join(Environment.application)
.join(Application.portfolio)
.filter(Application.portfolio_id == self.portfolio_id)
.filter(EnvironmentRole.user_id == self.user_id)
.count()
)
@property
def environment_roles(self):
return (
db.session.query(EnvironmentRole)
.join(EnvironmentRole.environment)
.join(Environment.application)
.join(Application.portfolio)
.filter(Application.portfolio_id == self.portfolio_id)
.filter(EnvironmentRole.user_id == self.user_id)
.all()
)
@property
def has_environment_roles(self):
return self.num_environment_roles > 0
@property
def can_resend_invitation(self):
return not self.is_active and (

View File

@ -17,7 +17,7 @@ applications_bp.context_processor(portfolio_context_processor)
def wrap_environment_role_lookup(user, environment_id=None, **kwargs):
env_role = EnvironmentRoles.get(user.id, environment_id)
env_role = EnvironmentRoles.get_by_user_and_environment(user.id, environment_id)
if not env_role:
raise UnauthorizedError(user, "access environment {}".format(environment_id))
@ -27,7 +27,9 @@ def wrap_environment_role_lookup(user, environment_id=None, **kwargs):
@applications_bp.route("/environments/<environment_id>/access")
@user_can(None, override=wrap_environment_role_lookup, message="access environment")
def access_environment(environment_id):
env_role = EnvironmentRoles.get(g.current_user.id, environment_id)
env_role = EnvironmentRoles.get_by_user_and_environment(
g.current_user.id, environment_id
)
token = app.csp.cloud.get_access_token(env_role)
return redirect(url_for("atst.csp_environment_access", token=token))

View File

@ -38,9 +38,7 @@ def get_team_form(application):
member, PermissionSets.DELETE_APPLICATION_ENVIRONMENTS
),
}
roles = EnvironmentRoles.get_for_application_and_user(
member.user.id, application.id
)
roles = EnvironmentRoles.get_for_application_member(member.id)
environment_roles = [
{
"environment_id": str(role.environment.id),
@ -110,7 +108,7 @@ def update_team(application_id):
environment_role_form.environment_id.data
)
Environments.update_env_role(
environment, app_role.user, environment_role_form.data.get("role")
environment, app_role, environment_role_form.data.get("role")
)
flash("updated_application_team_settings", application_name=application.name)

View File

@ -11,21 +11,6 @@ from atst.models.permissions import Permissions
from atst.utils.flash import formatted_flash as flash
def serialize_portfolio_role(portfolio_role):
return {
"name": portfolio_role.user_name,
"status": portfolio_role.display_status,
"id": portfolio_role.user_id,
"role": "admin",
"num_env": portfolio_role.num_environment_roles,
"edit_link": url_for(
"portfolios.view_member",
portfolio_id=portfolio_role.portfolio_id,
member_id=portfolio_role.user_id,
),
}
@portfolios_bp.route("/portfolios/<portfolio_id>/members/new", methods=["POST"])
@user_can(Permissions.CREATE_PORTFOLIO_USERS, message="create new portfolio member")
def create_member(portfolio_id):

View File

@ -225,7 +225,7 @@ def add_applications_to_portfolio(portfolio):
last_name=user_data["last_name"],
)
ApplicationRoles.create(
app_role = ApplicationRoles.create(
user=user,
application=application,
permission_set_names=[PermissionSets.EDIT_APPLICATION_TEAM],
@ -237,7 +237,9 @@ def add_applications_to_portfolio(portfolio):
)
for env in user_environments:
role = random.choice([e.value for e in CSPRole])
EnvironmentRoles.create(user=user, environment=env, role=role)
EnvironmentRoles.create(
application_role=app_role, environment=env, role=role
)
def create_demo_portfolio(name, data):

View File

@ -128,7 +128,7 @@ def test_create_member():
# view application AND edit application team
assert len(member_role.permission_sets) == 2
env_roles = member_role.user.environment_roles
env_roles = member_role.environment_roles
assert len(env_roles) == 1
assert env_roles[0].environment == env1
@ -165,7 +165,9 @@ def test_remove_member():
user = UserFactory.create()
member_role = ApplicationRoleFactory.create(application=application, user=user)
environment = EnvironmentFactory.create(application=application)
environment_role = EnvironmentRoleFactory.create(user=user, environment=environment)
environment_role = EnvironmentRoleFactory.create(
application_role=member_role, environment=environment
)
assert member_role == ApplicationRoles.get(
user_id=user.id, application_id=application.id
@ -181,4 +183,9 @@ def test_remove_member():
#
# TODO: Why does above raise NotFoundError and this returns None
#
assert EnvironmentRoles.get(user_id=user.id, environment_id=environment.id) == None
assert (
EnvironmentRoles.get(
application_role_id=member_role.id, environment_id=environment.id
)
is None
)

View File

@ -84,7 +84,7 @@ def test_get_portfolio_events_includes_app_and_env_events():
# add environment level events
env = EnvironmentFactory.create(application=application)
env_role = EnvironmentRoleFactory.create(environment=env, user=app_role.user)
env_role = EnvironmentRoleFactory.create(environment=env, application_role=app_role)
portfolio_app_and_env_events = AuditLog.get_portfolio_events(portfolio)
assert len(portfolio_and_app_events) < len(portfolio_app_and_env_events)
@ -106,7 +106,7 @@ def test_get_application_events():
app_role = ApplicationRoleFactory.create(application=application)
app_invite = ApplicationInvitationFactory.create(role=app_role)
env = EnvironmentFactory.create(application=application)
env_role = EnvironmentRoleFactory.create(environment=env, user=app_role.user)
env_role = EnvironmentRoleFactory.create(environment=env, application_role=app_role)
# add rando app
rando_app = ApplicationFactory.create(portfolio=portfolio)

View File

@ -1,26 +1,90 @@
import pytest
from unittest.mock import MagicMock
from atst.domain.environment_roles import EnvironmentRoles
from tests.factories import *
def test_get_for_application_and_user():
@pytest.fixture
def application_role():
user = UserFactory.create()
application = ApplicationFactory.create()
env1 = EnvironmentFactory.create(application=application)
EnvironmentFactory.create(application=application)
EnvironmentRoleFactory.create(user=user, environment=env1)
return ApplicationRoleFactory.create(application=application, user=user)
roles = EnvironmentRoles.get_for_application_and_user(user.id, application.id)
@pytest.fixture
def environment(application_role):
return EnvironmentFactory.create(application=application_role.application)
def test_create(application_role, environment, monkeypatch):
mock_create_role = MagicMock()
monkeypatch.setattr(
"atst.domain.environment_roles.app.csp.cloud.create_role", mock_create_role
)
environment_role = EnvironmentRoles.create(
application_role, environment, "network admin"
)
assert environment_role.application_role == application_role
assert environment_role.environment == environment
assert environment_role.role == "network admin"
mock_create_role.assert_called_with(environment_role)
def test_get(application_role, environment):
EnvironmentRoleFactory.create(
application_role=application_role, environment=environment
)
environment_role = EnvironmentRoles.get(application_role.id, environment.id)
assert environment_role
assert environment_role.application_role == application_role
assert environment_role.environment == environment
def test_get_by_user_and_environment(application_role, environment):
expected_role = EnvironmentRoleFactory.create(
application_role=application_role, environment=environment
)
actual_role = EnvironmentRoles.get_by_user_and_environment(
application_role.user.id, environment.id
)
assert expected_role == actual_role
def test_delete(application_role, environment, monkeypatch):
mock_delete_role = MagicMock()
monkeypatch.setattr(
"atst.domain.environment_roles.app.csp.cloud.delete_role", mock_delete_role
)
environment_role = EnvironmentRoleFactory.create(
application_role=application_role, environment=environment
)
assert EnvironmentRoles.delete(application_role.id, environment.id)
mock_delete_role.assert_called_with(environment_role)
assert not EnvironmentRoles.delete(application_role.id, environment.id)
def test_get_for_application_member(application_role, environment):
EnvironmentRoleFactory.create(
application_role=application_role, environment=environment
)
roles = EnvironmentRoles.get_for_application_member(application_role.id)
assert len(roles) == 1
assert roles[0].environment == env1
assert roles[0].user == user
assert roles[0].environment == environment
assert roles[0].application_role == application_role
def test_get_for_application_and_user_does_not_return_deleted():
user = UserFactory.create()
application = ApplicationFactory.create()
env1 = EnvironmentFactory.create(application=application)
EnvironmentRoleFactory.create(user=user, environment=env1, deleted=True)
def test_get_for_application_member_does_not_return_deleted(
application_role, environment
):
EnvironmentRoleFactory.create(
application_role=application_role, environment=environment, deleted=True
)
roles = EnvironmentRoles.get_for_application_and_user(user.id, application.id)
roles = EnvironmentRoles.get_for_application_member(application_role.id)
assert len(roles) == 0

View File

@ -25,22 +25,22 @@ def test_create_environments():
def test_update_env_role():
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
new_role = CSPRole.TECHNICAL_READ.value
ApplicationRoleFactory.create(
user=env_role.user, application=env_role.environment.application
)
assert Environments.update_env_role(env_role.environment, env_role.user, new_role)
assert Environments.update_env_role(
env_role.environment, env_role.application_role, new_role
)
assert env_role.role == new_role
def test_update_env_role_no_access():
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
ApplicationRoleFactory.create(
user=env_role.user, application=env_role.environment.application
)
assert Environments.update_env_role(env_role.environment, env_role.user, None)
assert not EnvironmentRoles.get(env_role.user.id, env_role.environment.id)
assert Environments.update_env_role(
env_role.environment, env_role.application_role, None
)
assert not EnvironmentRoles.get(
env_role.application_role.id, env_role.environment.id
)
def test_update_env_role_no_change():
@ -48,55 +48,45 @@ def test_update_env_role_no_change():
new_role = CSPRole.BASIC_ACCESS.value
assert not Environments.update_env_role(
env_role.environment, env_role.user, new_role
env_role.environment, env_role.application_role, new_role
)
def test_update_env_role_creates_cloud_id_for_new_member(session):
user = UserFactory.create()
env = EnvironmentFactory.create()
ApplicationRoleFactory.create(user=user, application=env.application)
assert not user.cloud_id
assert Environments.update_env_role(env, user, CSPRole.TECHNICAL_READ.value)
assert EnvironmentRoles.get(user.id, env.id)
assert user.cloud_id is not None
def test_update_env_roles_by_environment():
environment = EnvironmentFactory.create()
app_role_1 = ApplicationRoleFactory.create(application=environment.application)
env_role_1 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value
)
app_role_1 = ApplicationRoleFactory.create(
user=env_role_1.user, application=environment.application
application_role=app_role_1,
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
)
app_role_2 = ApplicationRoleFactory.create(application=environment.application)
env_role_2 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.NETWORK_ADMIN.value
)
app_role_2 = ApplicationRoleFactory.create(
user=env_role_2.user, application=environment.application
application_role=app_role_2,
environment=environment,
role=CSPRole.NETWORK_ADMIN.value,
)
app_role_3 = ApplicationRoleFactory.create(application=environment.application)
env_role_3 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.TECHNICAL_READ.value
)
app_role_3 = ApplicationRoleFactory.create(
user=env_role_3.user, application=environment.application
application_role=app_role_3,
environment=environment,
role=CSPRole.TECHNICAL_READ.value,
)
team_roles = [
{
"application_role_id": app_role_1.id,
"user_name": env_role_1.user.full_name,
"user_name": app_role_1.user_name,
"role_name": CSPRole.BUSINESS_READ.value,
},
{
"application_role_id": app_role_2.id,
"user_name": env_role_2.user.full_name,
"user_name": app_role_2.user_name,
"role_name": CSPRole.NETWORK_ADMIN.value,
},
{
"application_role_id": app_role_3.id,
"user_name": env_role_3.user.full_name,
"user_name": app_role_3.user_name,
"role_name": None,
},
]
@ -104,80 +94,7 @@ def test_update_env_roles_by_environment():
Environments.update_env_roles_by_environment(environment.id, team_roles)
assert env_role_1.role == CSPRole.BUSINESS_READ.value
assert env_role_2.role == CSPRole.NETWORK_ADMIN.value
assert not EnvironmentRoles.get(env_role_3.user.id, environment.id)
def test_update_env_roles_by_member():
user = UserFactory.create()
application = ApplicationFactory.create(
environments=[
{
"name": "dev",
"members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
},
{
"name": "staging",
"members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
},
{"name": "prod"},
{
"name": "testing",
"members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
},
]
)
dev, staging, prod, testing = application.environments
env_roles = [
{"id": dev.id, "role": CSPRole.NETWORK_ADMIN.value},
{"id": staging.id, "role": CSPRole.BUSINESS_READ.value},
{"id": prod.id, "role": CSPRole.TECHNICAL_READ.value},
{"id": testing.id, "role": None},
]
Environments.update_env_roles_by_member(user, env_roles)
assert EnvironmentRoles.get(user.id, dev.id).role == CSPRole.NETWORK_ADMIN.value
assert EnvironmentRoles.get(user.id, staging.id).role == CSPRole.BUSINESS_READ.value
assert EnvironmentRoles.get(user.id, prod.id).role == CSPRole.TECHNICAL_READ.value
assert not EnvironmentRoles.get(user.id, testing.id)
def test_get_scoped_environments(db):
developer = UserFactory.create()
portfolio = PortfolioFactory.create(
members=[{"user": developer, "role_name": "developer"}],
applications=[
{
"name": "application1",
"environments": [
{
"name": "application1 dev",
"members": [{"user": developer, "role_name": "developer"}],
},
{"name": "application1 staging"},
{"name": "application1 prod"},
],
},
{
"name": "application2",
"environments": [
{"name": "application2 dev"},
{
"name": "application2 staging",
"members": [{"user": developer, "role_name": "developer"}],
},
{"name": "application2 prod"},
],
},
],
)
application1_envs = Environments.for_user(developer, portfolio.applications[0])
assert [env.name for env in application1_envs] == ["application1 dev"]
application2_envs = Environments.for_user(developer, portfolio.applications[1])
assert [env.name for env in application2_envs] == ["application2 staging"]
assert not EnvironmentRoles.get(app_role_3.id, environment.id)
def test_get_excludes_deleted():
@ -190,7 +107,10 @@ def test_get_excludes_deleted():
def test_delete_environment(session):
env = EnvironmentFactory.create(application=ApplicationFactory.create())
env_role = EnvironmentRoleFactory.create(user=UserFactory.create(), environment=env)
env_role = EnvironmentRoleFactory.create(
application_role=ApplicationRoleFactory.create(application=env.application),
environment=env,
)
assert not env.deleted
assert not env_role.deleted
Environments.delete(env)

View File

@ -111,29 +111,6 @@ def test_scoped_portfolio_for_admin_missing_view_apps_perms(portfolio_owner, por
assert len(scoped_portfolio.applications) == 0
@pytest.mark.skip(reason="should be reworked pending application member changes")
def test_scoped_portfolio_only_returns_a_users_applications_and_environments(
portfolio, portfolio_owner
):
new_application = Applications.create(
portfolio, "My Application", "My application", ["dev", "staging", "prod"]
)
Applications.create(
portfolio, "My Application 2", "My application 2", ["dev", "staging", "prod"]
)
developer = UserFactory.create()
dev_environment = Environments.add_member(
new_application.environments[0], developer, "developer"
)
scoped_portfolio = Portfolios.get(developer, portfolio.id)
# Should only return the application and environment in which the user has an
# environment role.
assert scoped_portfolio.applications == [new_application]
assert scoped_portfolio.applications[0].environments == [dev_environment]
def test_scoped_portfolio_returns_all_applications_for_portfolio_admin(
portfolio, portfolio_owner
):

View File

@ -163,20 +163,6 @@ class ApplicationFactory(Base):
with_environments = kwargs.pop("environments", [])
application = super()._create(model_class, *args, **kwargs)
# need to create application roles for environment users
app_members_from_envs = set()
for env in with_environments:
with_members = env.get("members", [])
for member_data in with_members:
member = member_data.get("user", UserFactory.create())
app_members_from_envs.add(member)
# set for environments in case we just created the
# user for the application
member_data["user"] = member
for member in app_members_from_envs:
ApplicationRoleFactory.create(application=application, user=member)
environments = [
EnvironmentFactory.create(application=application, **e)
for e in with_environments
@ -200,9 +186,14 @@ class EnvironmentFactory(Base):
for member in with_members:
user = member.get("user", UserFactory.create())
application_role = ApplicationRoleFactory.create(
application=environment.application, user=user
)
role_name = member["role_name"]
EnvironmentRoleFactory.create(
environment=environment, role=role_name, user=user
environment=environment,
role=role_name,
application_role=application_role,
)
return environment
@ -234,7 +225,7 @@ class EnvironmentRoleFactory(Base):
environment = factory.SubFactory(EnvironmentFactory)
role = random.choice([e.value for e in CSPRole])
user = factory.SubFactory(UserFactory)
application_role = factory.SubFactory(ApplicationRoleFactory)
class PortfolioInvitationFactory(Base):

View File

@ -45,6 +45,8 @@ def test_environment_roles():
environment = EnvironmentFactory.create(application=application)
user = UserFactory.create()
application_role = ApplicationRoleFactory.create(application=application, user=user)
environment_role = EnvironmentRoleFactory.create(environment=environment, user=user)
environment_role = EnvironmentRoleFactory.create(
environment=environment, application_role=application_role
)
assert application_role.environment_roles == [environment_role]

View File

@ -3,13 +3,7 @@ from atst.models.environment_role import CSPRole
from atst.domain.environments import Environments
from atst.domain.applications import Applications
from tests.factories import (
PortfolioFactory,
UserFactory,
EnvironmentFactory,
ApplicationFactory,
ApplicationRoleFactory,
)
from tests.factories import *
def test_add_user_to_environment():
@ -22,9 +16,13 @@ def test_add_user_to_environment():
)
dev_environment = application.environments[0]
ApplicationRoleFactory.create(user=developer, application=application)
dev_environment = Environments.add_member(
dev_environment, developer, CSPRole.BASIC_ACCESS.value
application_role = ApplicationRoleFactory.create(
user=developer, application=application
)
EnvironmentRoleFactory.create(
application_role=application_role,
environment=dev_environment,
role=CSPRole.BASIC_ACCESS.value,
)
assert developer in dev_environment.users

View File

@ -8,16 +8,7 @@ from atst.domain.applications import Applications
from atst.domain.permission_sets import PermissionSets
from atst.models import AuditEvent, InvitationStatus, PortfolioRoleStatus, CSPRole
from tests.factories import (
UserFactory,
PortfolioInvitationFactory,
PortfolioRoleFactory,
EnvironmentFactory,
EnvironmentRoleFactory,
ApplicationFactory,
ApplicationRoleFactory,
PortfolioFactory,
)
from tests.factories import *
from atst.domain.portfolio_roles import PortfolioRoles
@ -97,8 +88,9 @@ def test_has_no_env_role_history(session):
application=application, name="new environment!"
)
app_role = ApplicationRoleFactory.create(user=user, application=application)
env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer"
application_role=app_role, environment=environment, role="developer"
)
create_event = (
session.query(AuditEvent)
@ -110,22 +102,24 @@ def test_has_no_env_role_history(session):
def test_has_env_role_history(session):
owner = UserFactory.create()
user = UserFactory.create()
portfolio = PortfolioFactory.create(owner=owner)
portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=user)
application = ApplicationFactory.create(portfolio=portfolio)
ApplicationRoleFactory.create(user=user, application=application)
application = ApplicationFactory.create()
app_role = ApplicationRoleFactory.create(user=user, application=application)
environment = EnvironmentFactory.create(
application=application, name="new environment!"
)
env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer"
)
Environments.update_env_roles_by_member(
user, [{"role": "admin", "id": environment.id}]
application_role=app_role, environment=environment, role="developer"
)
session.add(env_role)
session.commit()
session.refresh(env_role)
env_role.role = "admin"
session.add(env_role)
session.commit()
changed_events = (
session.query(AuditEvent)
.filter(AuditEvent.resource_id == env_role.id, AuditEvent.action == "update")
@ -147,44 +141,6 @@ def test_event_details():
assert portfolio_role.event_details["updated_user_id"] == str(user.id)
def test_has_no_environment_roles():
owner = UserFactory.create()
developer_data = {
"dod_id": "1234567890",
"first_name": "Test",
"last_name": "User",
"email": "test.user@mail.com",
"portfolio_role": "developer",
}
portfolio = PortfolioFactory.create(owner=owner)
portfolio_role = Portfolios.create_member(portfolio, developer_data)
assert not portfolio_role.has_environment_roles
def test_has_environment_roles():
owner = UserFactory.create()
developer_data = {
"dod_id": "1234567890",
"first_name": "Test",
"last_name": "User",
"email": "test.user@mail.com",
"portfolio_role": "developer",
}
portfolio = PortfolioFactory.create(owner=owner)
portfolio_role = Portfolios.create_member(portfolio, developer_data)
application = Applications.create(
portfolio, "my test application", "It's mine.", ["dev", "staging", "prod"]
)
ApplicationRoleFactory.create(user=portfolio_role.user, application=application)
Environments.add_member(
application.environments[0], portfolio_role.user, CSPRole.BASIC_ACCESS.value
)
assert portfolio_role.has_environment_roles
def test_status_when_member_is_active():
portfolio_role = PortfolioRoleFactory.create(status=PortfolioRoleStatus.ACTIVE)
assert portfolio_role.display_status == "Active"

View File

@ -1,33 +1,16 @@
from flask import url_for, get_flashed_messages
from tests.factories import (
UserFactory,
PortfolioFactory,
PortfolioRoleFactory,
EnvironmentRoleFactory,
EnvironmentFactory,
ApplicationFactory,
)
from atst.domain.applications import Applications
from atst.domain.portfolios import Portfolios
from atst.models.portfolio_role import Status as PortfolioRoleStatus
from tests.utils import captured_templates
def create_environment(user):
portfolio = PortfolioFactory.create()
portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=user)
application = ApplicationFactory.create(portfolio=portfolio)
return EnvironmentFactory.create(application=application, name="new environment!")
from tests.factories import *
def test_environment_access_with_env_role(client, user_session):
user = UserFactory.create()
environment = create_environment(user)
env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer"
environment = EnvironmentFactory.create()
app_role = ApplicationRoleFactory.create(
user=user, application=environment.application
)
EnvironmentRoleFactory.create(
application_role=app_role, environment=environment, role="developer"
)
user_session(user)
response = client.get(
@ -39,7 +22,7 @@ def test_environment_access_with_env_role(client, user_session):
def test_environment_access_with_no_role(client, user_session):
user = UserFactory.create()
environment = create_environment(user)
environment = EnvironmentFactory.create()
user_session(user)
response = client.get(
url_for("applications.access_environment", environment_id=environment.id)

View File

@ -98,15 +98,14 @@ def test_edit_application_environments_obj(app, client, user_session):
{"env"},
)
env = application.environments[0]
app_role = ApplicationRoleFactory.create(application=application)
app_role1 = ApplicationRoleFactory.create(application=application)
env_role1 = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.BASIC_ACCESS.value
application_role=app_role1, environment=env, role=CSPRole.BASIC_ACCESS.value
)
ApplicationRoleFactory.create(application=application, user=env_role1.user)
app_role2 = ApplicationRoleFactory.create(application=application)
env_role2 = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.NETWORK_ADMIN.value
application_role=app_role2, environment=env, role=CSPRole.NETWORK_ADMIN.value
)
ApplicationRoleFactory.create(application=application, user=env_role2.user)
user_session(portfolio.owner)
@ -125,7 +124,7 @@ def test_edit_application_environments_obj(app, client, user_session):
assert isinstance(env_obj["edit_form"], EditEnvironmentForm)
assert (
env_obj["members"].sort()
== [env_role1.user.full_name, env_role2.user.full_name].sort()
== [app_role1.user_name, app_role2.user_name].sort()
)
assert isinstance(context["audit_events"], Paginator)
@ -140,17 +139,13 @@ def test_data_for_app_env_roles_form(app, client, user_session):
)
env = application.environments[0]
app_role0 = ApplicationRoleFactory.create(application=application)
app_role1 = ApplicationRoleFactory.create(application=application)
env_role1 = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.BASIC_ACCESS.value
)
app_role1 = ApplicationRoleFactory.create(
application=application, user=env_role1.user
application_role=app_role1, environment=env, role=CSPRole.BASIC_ACCESS.value
)
app_role2 = ApplicationRoleFactory.create(application=application)
env_role2 = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.NETWORK_ADMIN.value
)
app_role2 = ApplicationRoleFactory.create(
application=application, user=env_role2.user
application_role=app_role2, environment=env, role=CSPRole.NETWORK_ADMIN.value
)
user_session(portfolio.owner)
@ -175,7 +170,7 @@ def test_data_for_app_env_roles_form(app, client, user_session):
"members": [
{
"application_role_id": str(app_role0.id),
"user_name": app_role0.user.full_name,
"user_name": app_role0.user_name,
"role_name": None,
}
],
@ -185,7 +180,7 @@ def test_data_for_app_env_roles_form(app, client, user_session):
"members": [
{
"application_role_id": str(app_role1.id),
"user_name": env_role1.user.full_name,
"user_name": app_role1.user_name,
"role_name": CSPRole.BASIC_ACCESS.value,
}
],
@ -195,7 +190,7 @@ def test_data_for_app_env_roles_form(app, client, user_session):
"members": [
{
"application_role_id": str(app_role2.id),
"user_name": env_role2.user.full_name,
"user_name": app_role2.user_name,
"role_name": CSPRole.NETWORK_ADMIN.value,
}
],
@ -268,15 +263,21 @@ def test_update_team_env_roles(client, user_session):
application = environment.application
app_role_1 = ApplicationRoleFactory.create(application=application)
env_role_1 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value, user=app_role_1.user
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
application_role=app_role_1,
)
app_role_2 = ApplicationRoleFactory.create(application=application)
env_role_2 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value, user=app_role_2.user
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
application_role=app_role_2,
)
app_role_3 = ApplicationRoleFactory.create(application=application)
env_role_3 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value, user=app_role_3.user
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
application_role=app_role_3,
)
app_role_4 = ApplicationRoleFactory.create(application=application)
@ -302,8 +303,8 @@ def test_update_team_env_roles(client, user_session):
assert response.status_code == 200
assert env_role_1.role == CSPRole.NETWORK_ADMIN.value
assert env_role_2.role == CSPRole.BASIC_ACCESS.value
assert not EnvironmentRoles.get(env_role_3.user.id, environment.id)
assert EnvironmentRoles.get(app_role_4.user.id, environment.id)
assert not EnvironmentRoles.get(app_role_3.id, environment.id)
assert EnvironmentRoles.get(app_role_4.id, environment.id)
def test_user_can_only_access_apps_in_their_portfolio(client, user_session):

View File

@ -94,7 +94,9 @@ def test_update_team_environment_roles(client, user_session):
)
environment = EnvironmentFactory.create(application=application)
env_role = EnvironmentRoleFactory.create(
user=app_role.user, environment=environment, role=CSPRole.NETWORK_ADMIN.value
application_role=app_role,
environment=environment,
role=CSPRole.NETWORK_ADMIN.value,
)
user_session(owner)
response = client.post(
@ -121,7 +123,9 @@ def test_update_team_revoke_environment_access(client, user_session, db, session
)
environment = EnvironmentFactory.create(application=application)
env_role = EnvironmentRoleFactory.create(
user=app_role.user, environment=environment, role=CSPRole.BASIC_ACCESS.value
application_role=app_role,
environment=environment,
role=CSPRole.BASIC_ACCESS.value,
)
user_session(owner)
response = client.post(
@ -177,8 +181,11 @@ def test_create_member(client, user_session):
assert response.location == expected_url
assert len(user.application_roles) == 1
assert user.application_roles[0].application == application
assert len(user.environment_roles) == 1
assert user.environment_roles[0].environment == env
environment_roles = [
er for ar in user.application_roles for er in ar.environment_roles
]
assert len(environment_roles) == 1
assert environment_roles[0].environment == env
def test_remove_member_success(client, user_session):

View File

@ -10,18 +10,7 @@ from atst.domain.auth import UNPROTECTED_ROUTES as _NO_LOGIN_REQUIRED
from atst.domain.permission_sets import PermissionSets
from atst.models import CSPRole, PortfolioRoleStatus, ApplicationRoleStatus
from tests.factories import (
AttachmentFactory,
ApplicationFactory,
ApplicationRoleFactory,
EnvironmentFactory,
EnvironmentRoleFactory,
PortfolioInvitationFactory,
PortfolioFactory,
PortfolioRoleFactory,
TaskOrderFactory,
UserFactory,
)
from tests.factories import *
_NO_ACCESS_CHECK_REQUIRED = _NO_LOGIN_REQUIRED + [
"task_orders.get_started", # all users can start a new TO
@ -265,11 +254,6 @@ def test_application_settings_access(get_url_assert_status):
applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}],
)
app = portfolio.applications[0]
env = EnvironmentFactory.create(application=app)
env_role = EnvironmentRoleFactory.create(
environment=env, role=CSPRole.NETWORK_ADMIN.value
)
ApplicationRoleFactory.create(application=app, user=env_role.user)
url = url_for("applications.settings", application_id=app.id)
get_url_assert_status(ccpo, url, 200)