diff --git a/atst/domain/environment_roles.py b/atst/domain/environment_roles.py index dab6d8b8..99728467 100644 --- a/atst/domain/environment_roles.py +++ b/atst/domain/environment_roles.py @@ -7,12 +7,11 @@ from atst.models import EnvironmentRole class EnvironmentRoles(object): @classmethod def create(cls, user, environment, role): - if environment.application.has_member(user.id): - env_role = EnvironmentRole(user=user, environment=environment, role=role) - if not user.cloud_id: - user.cloud_id = app.csp.cloud.create_user(user) - app.csp.cloud.create_role(env_role) - return env_role + env_role = EnvironmentRole(user=user, environment=environment, role=role) + if not user.cloud_id: + user.cloud_id = app.csp.cloud.create_user(user) + app.csp.cloud.create_role(env_role) + return env_role @classmethod def get(cls, user_id, environment_id): diff --git a/atst/domain/environments.py b/atst/domain/environments.py index 0ad704a3..aeba40d2 100644 --- a/atst/domain/environments.py +++ b/atst/domain/environments.py @@ -70,24 +70,23 @@ class Environments(object): def update_env_role(cls, environment, user, new_role): updated = False - if environment.application.has_member(user.id): - if new_role is None: - updated = EnvironmentRoles.delete(user.id, environment.id) - else: - env_role = EnvironmentRoles.get(user.id, environment.id) - if env_role and env_role.role != new_role: - env_role.role = new_role - updated = True - db.session.add(env_role) - elif not env_role: - env_role = EnvironmentRoles.create( - user=user, environment=environment, role=new_role - ) - updated = True - db.session.add(env_role) + if new_role is None: + updated = EnvironmentRoles.delete(user.id, environment.id) + else: + env_role = EnvironmentRoles.get(user.id, environment.id) + if env_role and env_role.role != new_role: + env_role.role = new_role + updated = True + db.session.add(env_role) + elif not env_role: + env_role = EnvironmentRoles.create( + user=user, environment=environment, role=new_role + ) + updated = True + db.session.add(env_role) - if updated: - db.session.commit() + if updated: + db.session.commit() return updated diff --git a/atst/routes/applications/settings.py b/atst/routes/applications/settings.py index 8adc4a02..ad68a8d4 100644 --- a/atst/routes/applications/settings.py +++ b/atst/routes/applications/settings.py @@ -7,6 +7,7 @@ from atst.domain.applications import Applications from atst.forms.application import ApplicationForm from atst.forms.app_settings import EnvironmentRolesForm from atst.domain.authz.decorator import user_can_access_decorator as user_can +from atst.domain.exceptions import NotFoundError from atst.models.permissions import Permissions from atst.utils.flash import formatted_flash as flash @@ -43,6 +44,13 @@ def serialize_env_member_form_data(application): return environments_list +def check_users_are_in_application(user_ids, application): + for user_id in user_ids: + if not application.has_member(user_id): + raise NotFoundError("application user {}".format(user_id)) + return True + + @applications_bp.route("/applications//settings") @user_can(Permissions.VIEW_APPLICATION, message="view application edit form") def settings(application_id): @@ -101,6 +109,19 @@ def update_env_roles(environment_id): env_roles_form = EnvironmentRolesForm(http_request.form) if env_roles_form.validate(): + + try: + user_ids = [user["user_id"] for user in env_roles_form.data["team_roles"]] + check_users_are_in_application(user_ids, application) + except NotFoundError as err: + app.logger.warning( + "Security violation for user {}, {} {}".format( + g.current_user.id, request.method, request.path + ), + extra={"tags": ["update", "failure"], "security_warning": True}, + ) + + raise (err) env_data = env_roles_form.data Environments.update_env_roles_by_environment( environment_id=environment_id, team_roles=env_data["team_roles"] diff --git a/tests/routes/applications/test_settings.py b/tests/routes/applications/test_settings.py index 88e97c84..354fcbb6 100644 --- a/tests/routes/applications/test_settings.py +++ b/tests/routes/applications/test_settings.py @@ -1,3 +1,4 @@ +import pytest from flask import url_for, get_flashed_messages from tests.factories import ( @@ -9,12 +10,15 @@ from tests.factories import ( ApplicationFactory, ApplicationRoleFactory, ) +from atst.routes.applications.settings import check_users_are_in_application from atst.domain.applications import Applications from atst.domain.environment_roles import EnvironmentRoles from atst.domain.environments import Environments from atst.domain.permission_sets import PermissionSets from atst.domain.portfolios import Portfolios +from atst.domain.exceptions import NotFoundError + from atst.models.environment_role import CSPRole from atst.models.portfolio_role import Status as PortfolioRoleStatus @@ -166,6 +170,32 @@ def test_user_without_permission_cannot_update_application(client, user_session) assert application.description == "Cool stuff happening here!" +def test_check_users_are_in_application_raises_NotFoundError(): + application = ApplicationFactory.create() + app_user_1 = UserFactory.create() + app_user_2 = UserFactory.create() + for user in [app_user_1, app_user_2]: + ApplicationRoleFactory.create(user=user, application=application) + + non_app_user = UserFactory.create() + user_ids = [app_user_1.id, app_user_2.id, non_app_user.id] + with pytest.raises(NotFoundError): + check_users_are_in_application(user_ids, application) + + +def test_check_users_are_in_application(): + application = ApplicationFactory.create() + app_user_1 = UserFactory.create() + app_user_2 = UserFactory.create() + app_user_3 = UserFactory.create() + + for user in [app_user_1, app_user_2, app_user_3]: + ApplicationRoleFactory.create(user=user, application=application) + + user_ids = [app_user_1.id, app_user_2.id, app_user_3.id] + assert check_users_are_in_application(user_ids, application) + + def test_update_team_env_roles(client, user_session): environment = EnvironmentFactory.create() application = environment.application diff --git a/tests/test_access.py b/tests/test_access.py index 9839902c..2f47db0d 100644 --- a/tests/test_access.py +++ b/tests/test_access.py @@ -196,7 +196,9 @@ def test_applications_update_team_env_roles(post_url_assert_status): ] ), ) - ApplicationRoleFactory.create(user=app_member) + ApplicationRoleFactory.create(user=app_member, application=application) + ApplicationRoleFactory.create(user=ccpo, application=application) + ApplicationRoleFactory.create(user=owner, application=application) url = url_for("applications.update_env_roles", environment_id=environment.id) post_url_assert_status(ccpo, url, 302)