Check that all users with changed data are app users before updating env roles
This commit is contained in:
parent
0736b229bf
commit
60b4c50819
@ -7,7 +7,6 @@ from atst.models import EnvironmentRole
|
|||||||
class EnvironmentRoles(object):
|
class EnvironmentRoles(object):
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(cls, user, environment, role):
|
def create(cls, user, environment, role):
|
||||||
if environment.application.has_member(user.id):
|
|
||||||
env_role = EnvironmentRole(user=user, environment=environment, role=role)
|
env_role = EnvironmentRole(user=user, environment=environment, role=role)
|
||||||
if not user.cloud_id:
|
if not user.cloud_id:
|
||||||
user.cloud_id = app.csp.cloud.create_user(user)
|
user.cloud_id = app.csp.cloud.create_user(user)
|
||||||
|
@ -70,7 +70,6 @@ class Environments(object):
|
|||||||
def update_env_role(cls, environment, user, new_role):
|
def update_env_role(cls, environment, user, new_role):
|
||||||
updated = False
|
updated = False
|
||||||
|
|
||||||
if environment.application.has_member(user.id):
|
|
||||||
if new_role is None:
|
if new_role is None:
|
||||||
updated = EnvironmentRoles.delete(user.id, environment.id)
|
updated = EnvironmentRoles.delete(user.id, environment.id)
|
||||||
else:
|
else:
|
||||||
|
@ -7,6 +7,7 @@ from atst.domain.applications import Applications
|
|||||||
from atst.forms.application import ApplicationForm
|
from atst.forms.application import ApplicationForm
|
||||||
from atst.forms.app_settings import EnvironmentRolesForm
|
from atst.forms.app_settings import EnvironmentRolesForm
|
||||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||||
|
from atst.domain.exceptions import NotFoundError
|
||||||
|
|
||||||
from atst.models.permissions import Permissions
|
from atst.models.permissions import Permissions
|
||||||
from atst.utils.flash import formatted_flash as flash
|
from atst.utils.flash import formatted_flash as flash
|
||||||
@ -43,6 +44,13 @@ def serialize_env_member_form_data(application):
|
|||||||
return environments_list
|
return environments_list
|
||||||
|
|
||||||
|
|
||||||
|
def check_users_are_in_application(user_ids, application):
|
||||||
|
for user_id in user_ids:
|
||||||
|
if not application.has_member(user_id):
|
||||||
|
raise NotFoundError("application user {}".format(user_id))
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
@applications_bp.route("/applications/<application_id>/settings")
|
@applications_bp.route("/applications/<application_id>/settings")
|
||||||
@user_can(Permissions.VIEW_APPLICATION, message="view application edit form")
|
@user_can(Permissions.VIEW_APPLICATION, message="view application edit form")
|
||||||
def settings(application_id):
|
def settings(application_id):
|
||||||
@ -101,6 +109,19 @@ def update_env_roles(environment_id):
|
|||||||
env_roles_form = EnvironmentRolesForm(http_request.form)
|
env_roles_form = EnvironmentRolesForm(http_request.form)
|
||||||
|
|
||||||
if env_roles_form.validate():
|
if env_roles_form.validate():
|
||||||
|
|
||||||
|
try:
|
||||||
|
user_ids = [user["user_id"] for user in env_roles_form.data["team_roles"]]
|
||||||
|
check_users_are_in_application(user_ids, application)
|
||||||
|
except NotFoundError as err:
|
||||||
|
app.logger.warning(
|
||||||
|
"Security violation for user {}, {} {}".format(
|
||||||
|
g.current_user.id, request.method, request.path
|
||||||
|
),
|
||||||
|
extra={"tags": ["update", "failure"], "security_warning": True},
|
||||||
|
)
|
||||||
|
|
||||||
|
raise (err)
|
||||||
env_data = env_roles_form.data
|
env_data = env_roles_form.data
|
||||||
Environments.update_env_roles_by_environment(
|
Environments.update_env_roles_by_environment(
|
||||||
environment_id=environment_id, team_roles=env_data["team_roles"]
|
environment_id=environment_id, team_roles=env_data["team_roles"]
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
import pytest
|
||||||
from flask import url_for, get_flashed_messages
|
from flask import url_for, get_flashed_messages
|
||||||
|
|
||||||
from tests.factories import (
|
from tests.factories import (
|
||||||
@ -9,12 +10,15 @@ from tests.factories import (
|
|||||||
ApplicationFactory,
|
ApplicationFactory,
|
||||||
ApplicationRoleFactory,
|
ApplicationRoleFactory,
|
||||||
)
|
)
|
||||||
|
from atst.routes.applications.settings import check_users_are_in_application
|
||||||
|
|
||||||
from atst.domain.applications import Applications
|
from atst.domain.applications import Applications
|
||||||
from atst.domain.environment_roles import EnvironmentRoles
|
from atst.domain.environment_roles import EnvironmentRoles
|
||||||
from atst.domain.environments import Environments
|
from atst.domain.environments import Environments
|
||||||
from atst.domain.permission_sets import PermissionSets
|
from atst.domain.permission_sets import PermissionSets
|
||||||
from atst.domain.portfolios import Portfolios
|
from atst.domain.portfolios import Portfolios
|
||||||
|
from atst.domain.exceptions import NotFoundError
|
||||||
|
|
||||||
from atst.models.environment_role import CSPRole
|
from atst.models.environment_role import CSPRole
|
||||||
from atst.models.portfolio_role import Status as PortfolioRoleStatus
|
from atst.models.portfolio_role import Status as PortfolioRoleStatus
|
||||||
|
|
||||||
@ -166,6 +170,32 @@ def test_user_without_permission_cannot_update_application(client, user_session)
|
|||||||
assert application.description == "Cool stuff happening here!"
|
assert application.description == "Cool stuff happening here!"
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_users_are_in_application_raises_NotFoundError():
|
||||||
|
application = ApplicationFactory.create()
|
||||||
|
app_user_1 = UserFactory.create()
|
||||||
|
app_user_2 = UserFactory.create()
|
||||||
|
for user in [app_user_1, app_user_2]:
|
||||||
|
ApplicationRoleFactory.create(user=user, application=application)
|
||||||
|
|
||||||
|
non_app_user = UserFactory.create()
|
||||||
|
user_ids = [app_user_1.id, app_user_2.id, non_app_user.id]
|
||||||
|
with pytest.raises(NotFoundError):
|
||||||
|
check_users_are_in_application(user_ids, application)
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_users_are_in_application():
|
||||||
|
application = ApplicationFactory.create()
|
||||||
|
app_user_1 = UserFactory.create()
|
||||||
|
app_user_2 = UserFactory.create()
|
||||||
|
app_user_3 = UserFactory.create()
|
||||||
|
|
||||||
|
for user in [app_user_1, app_user_2, app_user_3]:
|
||||||
|
ApplicationRoleFactory.create(user=user, application=application)
|
||||||
|
|
||||||
|
user_ids = [app_user_1.id, app_user_2.id, app_user_3.id]
|
||||||
|
assert check_users_are_in_application(user_ids, application)
|
||||||
|
|
||||||
|
|
||||||
def test_update_team_env_roles(client, user_session):
|
def test_update_team_env_roles(client, user_session):
|
||||||
environment = EnvironmentFactory.create()
|
environment = EnvironmentFactory.create()
|
||||||
application = environment.application
|
application = environment.application
|
||||||
|
@ -196,7 +196,9 @@ def test_applications_update_team_env_roles(post_url_assert_status):
|
|||||||
]
|
]
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
ApplicationRoleFactory.create(user=app_member)
|
ApplicationRoleFactory.create(user=app_member, application=application)
|
||||||
|
ApplicationRoleFactory.create(user=ccpo, application=application)
|
||||||
|
ApplicationRoleFactory.create(user=owner, application=application)
|
||||||
|
|
||||||
url = url_for("applications.update_env_roles", environment_id=environment.id)
|
url = url_for("applications.update_env_roles", environment_id=environment.id)
|
||||||
post_url_assert_status(ccpo, url, 302)
|
post_url_assert_status(ccpo, url, 302)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user