From 0469e061dad553b64c890b5d61a42479a041b753 Mon Sep 17 00:00:00 2001 From: richard-dds Date: Tue, 21 Aug 2018 20:47:22 -0400 Subject: [PATCH] Check permissions when attempting to create a project --- atst/domain/authz.py | 8 ++++++++ atst/domain/workspace_users.py | 23 +++++++++++++++++++++++ atst/domain/workspaces.py | 11 +++++++++++ atst/routes/workspaces.py | 4 ++-- tests/domain/test_workspaces.py | 18 ++++++++++++++++++ 5 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 atst/domain/authz.py diff --git a/atst/domain/authz.py b/atst/domain/authz.py new file mode 100644 index 00000000..08f7ac21 --- /dev/null +++ b/atst/domain/authz.py @@ -0,0 +1,8 @@ +from atst.domain.workspace_users import WorkspaceUsers + + +class Authorization(object): + @classmethod + def has_workspace_permission(cls, user, workspace, permission): + workspace_user = WorkspaceUsers.get(workspace.id, user.id) + return permission in workspace_user.permissions() diff --git a/atst/domain/workspace_users.py b/atst/domain/workspace_users.py index a67c0cbb..e7e51077 100644 --- a/atst/domain/workspace_users.py +++ b/atst/domain/workspace_users.py @@ -31,6 +31,29 @@ class WorkspaceUsers(object): return WorkspaceUser(user, workspace_role) + @classmethod + def add(cls, user, workspace_id, role_name): + role = Roles.get(role_name) + try: + existing_workspace_role = ( + db.session.query(WorkspaceRole) + .filter( + WorkspaceRole.user == user, + WorkspaceRole.workspace_id == workspace_id, + ) + .one() + ) + new_workspace_role = existing_workspace_role + new_workspace_role.role = role + except NoResultFound: + new_workspace_role = WorkspaceRole( + user=user, role_id=role.id, workspace_id=workspace_id + ) + + user.workspace_roles.append(new_workspace_role) + db.session.add(user) + db.session.commit() + @classmethod def add_many(cls, workspace_id, workspace_user_dicts): workspace_users = [] diff --git a/atst/domain/workspaces.py b/atst/domain/workspaces.py index 985447f0..ebede1f1 100644 --- a/atst/domain/workspaces.py +++ b/atst/domain/workspaces.py @@ -7,6 +7,8 @@ from atst.models.project import Project from atst.models.environment import Environment from atst.domain.exceptions import NotFoundError, UnauthorizedError from atst.domain.roles import Roles +from atst.domain.authz import Authorization +from atst.models.permissions import Permissions class Workspaces(object): @@ -42,6 +44,15 @@ class Workspaces(object): return workspace + @classmethod + def get_for_update(cls, user, workspace_id): + workspace = Workspaces.get(user, workspace_id) + if not Authorization.has_workspace_permission( + user, workspace, Permissions.ADD_APPLICATION_IN_WORKSPACE + ): + raise UnauthorizedError(user, "add project") + return workspace + @classmethod def get_by_request(cls, request): try: diff --git a/atst/routes/workspaces.py b/atst/routes/workspaces.py index 0b5c9773..34950e45 100644 --- a/atst/routes/workspaces.py +++ b/atst/routes/workspaces.py @@ -42,14 +42,14 @@ def workspace_reports(workspace_id): @bp.route("/workspaces//projects/new") def new_project(workspace_id): - workspace = Workspaces.get(g.current_user, workspace_id) + workspace = Workspaces.get_for_update(g.current_user, workspace_id) form = NewProjectForm() return render_template("workspace_project_new.html", workspace=workspace, form=form) @bp.route("/workspaces//projects", methods=["POST"]) def update_project(workspace_id): - workspace = Workspaces.get(g.current_user, workspace_id) + workspace = Workspaces.get_for_update(g.current_user, workspace_id) form = NewProjectForm(http_request.form) if form.validate(): diff --git a/tests/domain/test_workspaces.py b/tests/domain/test_workspaces.py index 2a406f39..468536a8 100644 --- a/tests/domain/test_workspaces.py +++ b/tests/domain/test_workspaces.py @@ -3,6 +3,7 @@ from uuid import uuid4 from atst.domain.exceptions import NotFoundError, UnauthorizedError from atst.domain.workspaces import Workspaces +from atst.domain.workspace_users import WorkspaceUsers from tests.factories import WorkspaceFactory, RequestFactory, UserFactory @@ -69,3 +70,20 @@ def test_workspaces_get_many_returns_a_users_workspaces(): Workspaces.create(RequestFactory.create()) assert Workspaces.get_many(user) == [users_workspace] + + +def test_get_for_update_allows_owner(): + owner = UserFactory.create() + workspace = Workspaces.create(RequestFactory.create(creator=owner)) + Workspaces.get_for_update(owner, workspace.id) + + +def test_get_for_update_blocks_developer(): + owner = UserFactory.create() + developer = UserFactory.create() + + workspace = Workspaces.create(RequestFactory.create(creator=owner)) + WorkspaceUsers.add(developer, workspace.id, "developer") + + with pytest.raises(UnauthorizedError): + Workspaces.get_for_update(developer, workspace.id)