Ensure workspace access can be revoked
This commit is contained in:
@@ -1 +1 @@
|
|||||||
from .workspaces import Workspaces
|
from .workspaces import Workspaces, WorkspaceError
|
||||||
|
@@ -10,6 +10,10 @@ from .query import WorkspacesQuery
|
|||||||
from .scopes import ScopedWorkspace
|
from .scopes import ScopedWorkspace
|
||||||
|
|
||||||
|
|
||||||
|
class WorkspaceError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class Workspaces(object):
|
class Workspaces(object):
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(cls, request, name=None):
|
def create(cls, request, name=None):
|
||||||
@@ -140,6 +144,10 @@ class Workspaces(object):
|
|||||||
|
|
||||||
WorkspacesQuery.add_and_commit(workspace)
|
WorkspacesQuery.add_and_commit(workspace)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def can_revoke_access(cls, workspace, workspace_role):
|
||||||
|
return workspace_role.user != workspace.owner
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def revoke_access(cls, user, workspace_id, workspace_role_id):
|
def revoke_access(cls, user, workspace_id, workspace_role_id):
|
||||||
workspace = WorkspacesQuery.get(workspace_id)
|
workspace = WorkspacesQuery.get(workspace_id)
|
||||||
@@ -150,8 +158,14 @@ class Workspaces(object):
|
|||||||
"revoke workspace access",
|
"revoke workspace access",
|
||||||
)
|
)
|
||||||
workspace_role = WorkspaceRoles.get_by_id(workspace_role_id)
|
workspace_role = WorkspaceRoles.get_by_id(workspace_role_id)
|
||||||
|
|
||||||
|
if not Workspaces.can_revoke_access(workspace, workspace_role):
|
||||||
|
raise WorkspaceError()
|
||||||
|
|
||||||
workspace_role.status = WorkspaceRoleStatus.DISABLED
|
workspace_role.status = WorkspaceRoleStatus.DISABLED
|
||||||
for environment in workspace.all_environments:
|
for environment in workspace.all_environments:
|
||||||
# TODO: Implement Environments.revoke_access
|
# TODO: Implement Environments.revoke_access
|
||||||
Environments.revoke_access(user, environment, workspace_role.user)
|
Environments.revoke_access(user, environment, workspace_role.user)
|
||||||
return WorkspacesQuery.add_and_commit(workspace_role)
|
WorkspacesQuery.add_and_commit(workspace_role)
|
||||||
|
|
||||||
|
return workspace_role
|
||||||
|
@@ -8,6 +8,7 @@ from atst.domain.invitations import (
|
|||||||
ExpiredError as InvitationExpiredError,
|
ExpiredError as InvitationExpiredError,
|
||||||
WrongUserError as InvitationWrongUserError,
|
WrongUserError as InvitationWrongUserError,
|
||||||
)
|
)
|
||||||
|
from atst.domain.workspaces import WorkspaceError
|
||||||
|
|
||||||
|
|
||||||
def log_error(e):
|
def log_error(e):
|
||||||
@@ -24,6 +25,7 @@ def make_error_pages(app):
|
|||||||
@app.errorhandler(werkzeug_exceptions.NotFound)
|
@app.errorhandler(werkzeug_exceptions.NotFound)
|
||||||
@app.errorhandler(exceptions.NotFoundError)
|
@app.errorhandler(exceptions.NotFoundError)
|
||||||
@app.errorhandler(exceptions.UnauthorizedError)
|
@app.errorhandler(exceptions.UnauthorizedError)
|
||||||
|
@app.errorhandler(WorkspaceError)
|
||||||
# pylint: disable=unused-variable
|
# pylint: disable=unused-variable
|
||||||
def not_found(e):
|
def not_found(e):
|
||||||
return handle_error(e)
|
return handle_error(e)
|
||||||
|
@@ -53,12 +53,14 @@
|
|||||||
confirm_msg="Are you sure? This will send an email to invite the user to join this workspace."
|
confirm_msg="Are you sure? This will send an email to invite the user to join this workspace."
|
||||||
)}}
|
)}}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
{% if can_revoke_access %}
|
||||||
{{ ConfirmationButton (
|
{{ ConfirmationButton (
|
||||||
"Remove Workspace Access",
|
"Remove Workspace Access",
|
||||||
url_for("workspaces.revoke_access", workspace_id=workspace.id, member_id=member.id),
|
url_for("workspaces.revoke_access", workspace_id=workspace.id, member_id=member.id),
|
||||||
form.csrf_token,
|
form.csrf_token,
|
||||||
confirm_msg="Are you sure? This will remove this user from the workspace.",
|
confirm_msg="Are you sure? This will remove this user from the workspace.",
|
||||||
)}}
|
)}}
|
||||||
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
@@ -2,7 +2,7 @@ import pytest
|
|||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from atst.domain.exceptions import NotFoundError, UnauthorizedError
|
from atst.domain.exceptions import NotFoundError, UnauthorizedError
|
||||||
from atst.domain.workspaces import Workspaces
|
from atst.domain.workspaces import Workspaces, WorkspaceError
|
||||||
from atst.domain.workspace_roles import WorkspaceRoles
|
from atst.domain.workspace_roles import WorkspaceRoles
|
||||||
from atst.domain.projects import Projects
|
from atst.domain.projects import Projects
|
||||||
from atst.domain.environments import Environments
|
from atst.domain.environments import Environments
|
||||||
@@ -304,13 +304,30 @@ def test_can_create_workspaces_with_matching_names():
|
|||||||
Workspaces.create(RequestFactory.create(), name=workspace_name)
|
Workspaces.create(RequestFactory.create(), name=workspace_name)
|
||||||
|
|
||||||
|
|
||||||
def test_can_remove_workspace_access():
|
def test_can_revoke_workspace_access():
|
||||||
workspace = WorkspaceFactory.create()
|
workspace = WorkspaceFactory.create()
|
||||||
workspace_role = WorkspaceRoleFactory.create(workspace=workspace)
|
workspace_role = WorkspaceRoleFactory.create(workspace=workspace)
|
||||||
Workspaces.revoke_access(workspace.owner, workspace.id, workspace_role.id)
|
Workspaces.revoke_access(workspace.owner, workspace.id, workspace_role.id)
|
||||||
assert Workspaces.for_user(workspace_role.user) == []
|
assert Workspaces.for_user(workspace_role.user) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_can_revoke_access():
|
||||||
|
workspace = WorkspaceFactory.create()
|
||||||
|
owner_role = workspace.roles[0]
|
||||||
|
workspace_role = WorkspaceRoleFactory.create(workspace=workspace)
|
||||||
|
|
||||||
|
assert Workspaces.can_revoke_access(workspace, workspace_role)
|
||||||
|
assert not Workspaces.can_revoke_access(workspace, owner_role)
|
||||||
|
|
||||||
|
|
||||||
|
def test_cant_revoke_owner_workspace_access():
|
||||||
|
workspace = WorkspaceFactory.create()
|
||||||
|
owner_workspace_role = workspace.roles[0]
|
||||||
|
|
||||||
|
with pytest.raises(WorkspaceError):
|
||||||
|
Workspaces.revoke_access(workspace.owner, workspace.id, owner_workspace_role.id)
|
||||||
|
|
||||||
|
|
||||||
def test_disabled_members_dont_show_up(session):
|
def test_disabled_members_dont_show_up(session):
|
||||||
workspace = WorkspaceFactory.create()
|
workspace = WorkspaceFactory.create()
|
||||||
WorkspaceRoleFactory.create(workspace=workspace, status=WorkspaceRoleStatus.ACTIVE)
|
WorkspaceRoleFactory.create(workspace=workspace, status=WorkspaceRoleStatus.ACTIVE)
|
||||||
|
Reference in New Issue
Block a user