Merge pull request #449 from dod-ccpo/remove-workspace-access

Remove workspace access
This commit is contained in:
richard-dds 2018-11-29 11:08:39 -05:00 committed by GitHub
commit 589e2fac07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 198 additions and 9 deletions

View File

@ -85,3 +85,13 @@ class Environments(object):
db.session.add(env_role)
db.session.commit()
@classmethod
def revoke_access(cls, user, environment, target_user):
Authorization.check_workspace_permission(
user,
environment.workspace,
Permissions.REMOVE_CSP_ROLES,
"revoke environment access",
)
EnvironmentRoles.delete(environment.id, target_user.id)

View File

@ -33,6 +33,13 @@ class WorkspaceRoles(object):
return workspace_role
@classmethod
def get_by_id(cls, id_):
try:
return db.session.query(WorkspaceRole).filter(WorkspaceRole.id == id_).one()
except NoResultFound:
raise NotFoundError("workspace_role")
@classmethod
def _get_active_workspace_role(cls, workspace_id, user_id):
try:

View File

@ -1 +1 @@
from .workspaces import Workspaces
from .workspaces import Workspaces, WorkspaceError

View File

@ -3,12 +3,17 @@ from atst.domain.authz import Authorization
from atst.models.permissions import Permissions
from atst.domain.users import Users
from atst.domain.workspace_roles import WorkspaceRoles
from atst.domain.environments import Environments
from atst.models.workspace_role import Status as WorkspaceRoleStatus
from .query import WorkspacesQuery
from .scopes import ScopedWorkspace
class WorkspaceError(Exception):
pass
class Workspaces(object):
@classmethod
def create(cls, request, name=None):
@ -138,3 +143,28 @@ class Workspaces(object):
workspace.name = new_data["name"]
WorkspacesQuery.add_and_commit(workspace)
@classmethod
def can_revoke_access_for(cls, workspace, workspace_role):
return workspace_role.user != workspace.owner
@classmethod
def revoke_access(cls, user, workspace_id, workspace_role_id):
workspace = WorkspacesQuery.get(workspace_id)
Authorization.check_workspace_permission(
user,
workspace,
Permissions.ASSIGN_AND_UNASSIGN_ATAT_ROLE,
"revoke workspace access",
)
workspace_role = WorkspaceRoles.get_by_id(workspace_role_id)
if not Workspaces.can_revoke_access_for(workspace, workspace_role):
raise WorkspaceError("cannot revoke workspace access for this user")
workspace_role.status = WorkspaceRoleStatus.DISABLED
for environment in workspace.all_environments:
Environments.revoke_access(user, environment, workspace_role.user)
WorkspacesQuery.add_and_commit(workspace_role)
return workspace_role

View File

@ -27,6 +27,10 @@ class Environment(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
def displayname(self):
return self.name
@property
def workspace(self):
return self.project.workspace
def auditable_workspace_id(self):
return self.project.workspace_id

View File

@ -1,16 +1,17 @@
from sqlalchemy import Column, ForeignKey, String
from sqlalchemy.orm import relationship
from itertools import chain
from atst.models import Base
from atst.models.types import Id
from atst.models import mixins
from atst.models import Base, mixins, types
from atst.models.workspace_role import WorkspaceRole, Status as WorkspaceRoleStatus
from atst.utils import first_or_none
from atst.database import db
class Workspace(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
__tablename__ = "workspaces"
id = Id()
id = types.Id()
name = Column(String)
request_id = Column(ForeignKey("requests.id"), nullable=False)
projects = relationship("Project", back_populates="workspace")
@ -38,12 +39,21 @@ class Workspace(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
@property
def members(self):
return self.roles
return (
db.session.query(WorkspaceRole)
.filter(WorkspaceRole.workspace_id == self.id)
.filter(WorkspaceRole.status != WorkspaceRoleStatus.DISABLED)
.all()
)
@property
def displayname(self):
return self.name
@property
def all_environments(self):
return list(chain.from_iterable(p.environments for p in self.projects))
def auditable_workspace_id(self):
return self.id

View File

@ -20,6 +20,7 @@ MEMBER_STATUSES = {
"error": "Error on invite",
"pending": "Pending",
"unknown": "Unknown errors",
"disabled": "Disabled",
}
@ -83,6 +84,8 @@ class WorkspaceRole(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
def display_status(self):
if self.status == Status.ACTIVE:
return MEMBER_STATUSES["active"]
elif self.status == Status.DISABLED:
return MEMBER_STATUSES["disabled"]
elif self.latest_invitation:
if self.latest_invitation.is_revoked:
return MEMBER_STATUSES["revoked"]

View File

@ -8,6 +8,7 @@ from atst.domain.invitations import (
ExpiredError as InvitationExpiredError,
WrongUserError as InvitationWrongUserError,
)
from atst.domain.workspaces import WorkspaceError
def log_error(e):
@ -24,6 +25,7 @@ def make_error_pages(app):
@app.errorhandler(werkzeug_exceptions.NotFound)
@app.errorhandler(exceptions.NotFoundError)
@app.errorhandler(exceptions.UnauthorizedError)
@app.errorhandler(WorkspaceError)
# pylint: disable=unused-variable
def not_found(e):
return handle_error(e)

View File

@ -168,3 +168,17 @@ def update_member(workspace_id, member_id):
workspace=workspace,
member=member,
)
@workspaces_bp.route(
"/workspaces/<workspace_id>/members/<member_id>/revoke_access", methods=["POST"]
)
def revoke_access(workspace_id, member_id):
revoked_role = Workspaces.revoke_access(g.current_user, workspace_id, member_id)
return redirect(
url_for(
"workspaces.workspace_members",
workspace_id=workspace_id,
revokedMemberName=revoked_role.user_name,
)
)

View File

@ -53,6 +53,14 @@
confirm_msg="Are you sure? This will send an email to invite the user to join this workspace."
)}}
{% endif %}
{% if can_revoke_access %}
{{ ConfirmationButton (
"Remove Workspace Access",
url_for("workspaces.revoke_access", workspace_id=workspace.id, member_id=member.id),
form.csrf_token,
confirm_msg="Are you sure? This will remove this user from the workspace.",
)}}
{% endif %}
</div>
</div>
</div>

View File

@ -44,6 +44,17 @@
) }}
{% endif %}
{% if revoked_member_name %}
{% set message -%}
<p>Removed {{ revoked_member_name }} from this workspace.</p>
{%- endset %}
{{ Alert('Removed workspace access',
message=message,
level='success'
) }}
{% endif %}
{% set member_name = request.args.get("memberName") %}
{% set updated_role = request.args.get("updatedRole") %}
{% if updated_role %}

View File

@ -2,7 +2,7 @@ import pytest
from uuid import uuid4
from atst.domain.exceptions import NotFoundError, UnauthorizedError
from atst.domain.workspaces import Workspaces
from atst.domain.workspaces import Workspaces, WorkspaceError
from atst.domain.workspace_roles import WorkspaceRoles
from atst.domain.projects import Projects
from atst.domain.environments import Environments
@ -11,8 +11,8 @@ from atst.models.workspace_role import Status as WorkspaceRoleStatus
from tests.factories import (
RequestFactory,
UserFactory,
InvitationFactory,
WorkspaceRoleFactory,
WorkspaceFactory,
)
@ -302,3 +302,38 @@ def test_can_create_workspaces_with_matching_names():
workspace_name = "Great Workspace"
Workspaces.create(RequestFactory.create(), name=workspace_name)
Workspaces.create(RequestFactory.create(), name=workspace_name)
def test_can_revoke_workspace_access():
workspace = WorkspaceFactory.create()
workspace_role = WorkspaceRoleFactory.create(workspace=workspace)
Workspaces.revoke_access(workspace.owner, workspace.id, workspace_role.id)
assert Workspaces.for_user(workspace_role.user) == []
def test_can_revoke_access():
workspace = WorkspaceFactory.create()
owner_role = workspace.roles[0]
workspace_role = WorkspaceRoleFactory.create(workspace=workspace)
assert Workspaces.can_revoke_access_for(workspace, workspace_role)
assert not Workspaces.can_revoke_access_for(workspace, owner_role)
def test_cant_revoke_owner_workspace_access():
workspace = WorkspaceFactory.create()
owner_workspace_role = workspace.roles[0]
with pytest.raises(WorkspaceError):
Workspaces.revoke_access(workspace.owner, workspace.id, owner_workspace_role.id)
def test_disabled_members_dont_show_up(session):
workspace = WorkspaceFactory.create()
WorkspaceRoleFactory.create(workspace=workspace, status=WorkspaceRoleStatus.ACTIVE)
WorkspaceRoleFactory.create(
workspace=workspace, status=WorkspaceRoleStatus.DISABLED
)
# should only return workspace owner and ACTIVE member
assert len(workspace.members) == 2

View File

@ -3,7 +3,6 @@ import datetime
from atst.domain.environments import Environments
from atst.domain.workspaces import Workspaces
from atst.domain.projects import Projects
from atst.domain.workspace_roles import WorkspaceRoles
from atst.models.workspace_role import Status
from atst.models.role import Role
from atst.models.invitation import Status as InvitationStatus
@ -16,7 +15,9 @@ from tests.factories import (
EnvironmentFactory,
EnvironmentRoleFactory,
ProjectFactory,
WorkspaceFactory,
)
from atst.domain.workspace_roles import WorkspaceRoles
def test_has_no_ws_role_history(session):
@ -234,3 +235,36 @@ def test_can_resend_invitation_if_expired():
invitations=[InvitationFactory.create(status=InvitationStatus.REJECTED_EXPIRED)]
)
assert workspace_role.can_resend_invitation
def test_can_list_all_environments():
workspace = WorkspaceFactory.create(
projects=[
{
"name": "project1",
"environments": [
{"name": "dev"},
{"name": "staging"},
{"name": "prod"},
],
},
{
"name": "project2",
"environments": [
{"name": "dev"},
{"name": "staging"},
{"name": "prod"},
],
},
{
"name": "project3",
"environments": [
{"name": "dev"},
{"name": "staging"},
{"name": "prod"},
],
},
]
)
assert len(workspace.all_environments) == 9

View File

@ -168,3 +168,24 @@ def test_update_member_environment_role_with_no_data(client, user_session):
)
assert response.status_code == 200
assert EnvironmentRoles.get(user.id, env1_id).role == "developer"
def test_revoke_member_access(client, user_session):
workspace = WorkspaceFactory.create()
user = UserFactory.create()
member = WorkspaceRoles.add(user, workspace.id, "developer")
Projects.create(
workspace.owner,
workspace,
"Snazzy Project",
"A new project for me and my friends",
{"env1"},
)
user_session(workspace.owner)
response = client.post(
url_for(
"workspaces.revoke_access", workspace_id=workspace.id, member_id=member.id
)
)
assert response.status_code == 302
assert WorkspaceRoles.get_by_id(member.id).num_environment_roles == 0