Merge pull request #264 from dod-ccpo/view-pe-as-workspace-user

View projects and environments as a workspace user
This commit is contained in:
richard-dds 2018-09-11 10:16:18 -04:00 committed by GitHub
commit 6a8cbcf1c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 220 additions and 11 deletions

View File

@ -0,0 +1,49 @@
"""remove view project and environment permissions
Revision ID: dea6b8e09d63
Revises: ad30159ef19b
Create Date: 2018-09-10 11:06:00.017222
"""
from alembic import op
from sqlalchemy.orm.session import Session
from atst.models.role import Role
from atst.models.permissions import Permissions
# revision identifiers, used by Alembic.
revision = "dea6b8e09d63"
down_revision = "ad30159ef19b"
branch_labels = None
depends_on = None
def upgrade():
session = Session(bind=op.get_bind())
priveleged_role_names = ("owner", "admin", "ccpo")
non_priveleged_roles = (
session.query(Role).filter(Role.name.notin_(priveleged_role_names)).all()
)
for role in non_priveleged_roles:
role.remove_permission(Permissions.VIEW_APPLICATION_IN_WORKSPACE)
role.remove_permission(Permissions.VIEW_ENVIRONMENT_IN_APPLICATION)
session.add(role)
session.commit()
def downgrade():
session = Session(bind=op.get_bind())
priveleged_role_names = ("owner", "admin", "ccpo")
non_priveleged_roles = (
session.query(Role).filter(not Role.name.in_(priveleged_role_names)).all()
)
for role in non_priveleged_roles:
role.add_permission(Permissions.VIEW_APPLICATION_IN_WORKSPACE)
role.add_permission(Permissions.VIEW_ENVIRONMENT_IN_APPLICATION)
session.add(role)
session.commit()

View File

@ -1,6 +1,7 @@
from atst.database import db from atst.database import db
from atst.models.environment import Environment from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole, CSPRole from atst.models.environment_role import EnvironmentRole, CSPRole
from atst.models.project import Project
class Environments(object): class Environments(object):
@ -19,11 +20,22 @@ class Environments(object):
db.session.commit() db.session.commit()
@classmethod @classmethod
def add_member(cls, user, environment, member): def add_member(cls, user, environment, member, role=CSPRole.NONSENSE_ROLE):
environment_user = EnvironmentRole( environment_user = EnvironmentRole(
user=member, environment=environment, role=CSPRole.NONSENSE_ROLE.value user=member, environment=environment, role=role.value
) )
db.session.add(environment_user) db.session.add(environment_user)
db.session.commit() db.session.commit()
return environment return environment
@classmethod
def for_user(cls, user, project):
return (
db.session.query(Environment)
.join(EnvironmentRole)
.join(Project)
.filter(EnvironmentRole.user_id == user.id)
.filter(Project.id == Environment.project_id)
.all()
)

View File

@ -4,14 +4,19 @@ from atst.domain.environments import Environments
from atst.domain.exceptions import NotFoundError from atst.domain.exceptions import NotFoundError
from atst.models.permissions import Permissions from atst.models.permissions import Permissions
from atst.models.project import Project from atst.models.project import Project
from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole
class Projects(object): class Projects(object):
@classmethod @classmethod
def create(cls, workspace, name, description, environment_names): def create(cls, user, workspace, name, description, environment_names):
project = Project(workspace=workspace, name=name, description=description) project = Project(workspace=workspace, name=name, description=description)
Environments.create_many(project, environment_names) Environments.create_many(project, environment_names)
for environment in project.environments:
Environments.add_member(user, environment, user)
db.session.add(project) db.session.add(project)
db.session.commit() db.session.commit()
@ -33,3 +38,14 @@ class Projects(object):
raise NotFoundError("project") raise NotFoundError("project")
return project return project
@classmethod
def for_user(self, user, workspace):
return (
db.session.query(Project)
.join(Environment)
.join(EnvironmentRole)
.filter(Project.workspace_id == workspace.id)
.filter(EnvironmentRole.user_id == user.id)
.all()
)

View File

@ -0,0 +1 @@
from .workspaces import Workspaces

View File

@ -0,0 +1,65 @@
from atst.domain.authz import Authorization
from atst.models.permissions import Permissions
from atst.domain.projects import Projects
from atst.domain.environments import Environments
class ScopedResource(object):
"""
An abstract class that represents a resource that is restricted
in some way by the priveleges of the user viewing that resource.
"""
def __init__(self, user, resource):
self.user = user
self.resource = resource
def __getattr__(self, name):
return getattr(self.resource, name)
def __eq__(self, other):
return self.resource == other
class ScopedWorkspace(ScopedResource):
"""
An object that obeys the same API as a Workspace, but with the added
functionality that it only returns sub-resources (projects and environments)
that the given user is allowed to see.
"""
@property
def projects(self):
can_view_all_projects = Authorization.has_workspace_permission(
self.user, self.resource, Permissions.VIEW_APPLICATION_IN_WORKSPACE
)
if can_view_all_projects:
projects = self.resource.projects
else:
projects = Projects.for_user(self.user, self.resource)
return [ScopedProject(self.user, project) for project in projects]
class ScopedProject(ScopedResource):
"""
An object that obeys the same API as a Workspace, but with the added
functionality that it only returns sub-resources (environments)
that the given user is allowed to see.
"""
@property
def environments(self):
can_view_all_environments = Authorization.has_workspace_permission(
self.user,
self.resource.workspace,
Permissions.VIEW_ENVIRONMENT_IN_APPLICATION,
)
if can_view_all_environments:
environments = self.resource.environments
else:
environments = Environments.for_user(self.user, self.resource)
return environments

View File

@ -9,6 +9,7 @@ from atst.domain.authz import Authorization
from atst.models.permissions import Permissions from atst.models.permissions import Permissions
from atst.domain.users import Users from atst.domain.users import Users
from atst.domain.workspace_users import WorkspaceUsers from atst.domain.workspace_users import WorkspaceUsers
from .scopes import ScopedWorkspace
class Workspaces(object): class Workspaces(object):
@ -30,7 +31,7 @@ class Workspaces(object):
user, workspace, Permissions.VIEW_WORKSPACE, "get workspace" user, workspace, Permissions.VIEW_WORKSPACE, "get workspace"
) )
return workspace return ScopedWorkspace(user, workspace)
@classmethod @classmethod
def get_for_update(cls, user, workspace_id): def get_for_update(cls, user, workspace_id):
@ -87,9 +88,11 @@ class Workspaces(object):
last_name=data["last_name"], last_name=data["last_name"],
email=data["email"], email=data["email"],
) )
workspace_user = WorkspaceUsers.add( return Workspaces.add_member(workspace, new_user, data["workspace_role"])
new_user, workspace.id, data["workspace_role"]
) @classmethod
def add_member(cls, workspace, member, role_name):
workspace_user = WorkspaceUsers.add(member, workspace.id, role_name)
return workspace_user return workspace_user
@classmethod @classmethod

View File

@ -22,7 +22,8 @@ class Workspace(Base, TimestampsMixin):
def _is_workspace_owner(workspace_role): def _is_workspace_owner(workspace_role):
return workspace_role.role.name == "owner" return workspace_role.role.name == "owner"
return first_or_none(_is_workspace_owner, self.roles) owner = first_or_none(_is_workspace_owner, self.roles)
return owner.user if owner else None
@property @property
def users(self): def users(self):

View File

@ -112,6 +112,7 @@ def create_project(workspace_id):
if form.validate(): if form.validate():
project_data = form.data project_data = form.data
Projects.create( Projects.create(
g.current_user,
workspace, workspace,
project_data["name"], project_data["name"],
project_data["description"], project_data["description"],

View File

@ -68,6 +68,7 @@ def seed_db():
Workspaces.create_member(user, workspace, workspace_user) Workspaces.create_member(user, workspace, workspace_user)
Projects.create( Projects.create(
user,
workspace=workspace, workspace=workspace,
name="First Project", name="First Project",
description="This is our first project.", description="This is our first project.",

View File

@ -4,7 +4,9 @@ from tests.factories import WorkspaceFactory
def test_create_project_with_multiple_environments(): def test_create_project_with_multiple_environments():
workspace = WorkspaceFactory.create() workspace = WorkspaceFactory.create()
project = Projects.create(workspace, "My Test Project", "Test", ["dev", "prod"]) project = Projects.create(
workspace.owner, workspace, "My Test Project", "Test", ["dev", "prod"]
)
assert project.workspace == workspace assert project.workspace == workspace
assert project.name == "My Test Project" assert project.name == "My Test Project"

View File

@ -4,6 +4,8 @@ from uuid import uuid4
from atst.domain.exceptions import NotFoundError, UnauthorizedError from atst.domain.exceptions import NotFoundError, UnauthorizedError
from atst.domain.workspaces import Workspaces from atst.domain.workspaces import Workspaces
from atst.domain.workspace_users import WorkspaceUsers from atst.domain.workspace_users import WorkspaceUsers
from atst.domain.projects import Projects
from atst.domain.environments import Environments
from tests.factories import WorkspaceFactory, RequestFactory, UserFactory from tests.factories import WorkspaceFactory, RequestFactory, UserFactory
@ -179,3 +181,59 @@ def test_random_user_cannot_view_workspace_members():
with pytest.raises(UnauthorizedError): with pytest.raises(UnauthorizedError):
workspace = Workspaces.get_with_members(developer, workspace.id) workspace = Workspaces.get_with_members(developer, workspace.id)
def test_scoped_workspace_only_returns_a_users_projects_and_environments():
workspace = WorkspaceFactory.create()
new_project = Projects.create(
workspace.owner,
workspace,
"My Project",
"My project",
["dev", "staging", "prod"],
)
developer = UserFactory.from_atat_role("developer")
dev_environment = Environments.add_member(
workspace.owner, new_project.environments[0], developer
)
scoped_workspace = Workspaces.get(developer, workspace.id)
# Should only return the project and environment in which the user has an
# environment role.
assert scoped_workspace.projects == [new_project]
assert scoped_workspace.projects[0].environments == [dev_environment]
def test_scoped_workspace_returns_all_projects_for_workspace_admin():
workspace = Workspaces.create(RequestFactory.create())
for _ in range(5):
Projects.create(
workspace.owner,
workspace,
"My Project",
"My project",
["dev", "staging", "prod"],
)
admin = Workspaces.add_member(
workspace, UserFactory.from_atat_role("default"), "admin"
).user
scoped_workspace = Workspaces.get(admin, workspace.id)
assert len(scoped_workspace.projects) == 5
assert len(scoped_workspace.projects[0].environments) == 3
def test_scoped_workspace_returns_all_projects_for_workspace_owner():
workspace = Workspaces.create(RequestFactory.create())
owner = workspace.owner
for _ in range(5):
Projects.create(
owner, workspace, "My Project", "My project", ["dev", "staging", "prod"]
)
scoped_workspace = Workspaces.get(owner, workspace.id)
assert len(scoped_workspace.projects) == 5
assert len(scoped_workspace.projects[0].environments) == 3

View File

@ -10,7 +10,7 @@ def test_add_user_to_environment():
workspace = Workspaces.create(RequestFactory.create(creator=owner)) workspace = Workspaces.create(RequestFactory.create(creator=owner))
project = Projects.create( project = Projects.create(
workspace, "my test project", "It's mine.", ["dev", "staging", "prod"] owner, workspace, "my test project", "It's mine.", ["dev", "staging", "prod"]
) )
dev_environment = project.environments[0] dev_environment = project.environments[0]

View File

@ -33,7 +33,7 @@ def test_has_environment_roles():
workspace = Workspaces.create(RequestFactory.create(creator=owner)) workspace = Workspaces.create(RequestFactory.create(creator=owner))
workspace_user = Workspaces.create_member(owner, workspace, developer_data) workspace_user = Workspaces.create_member(owner, workspace, developer_data)
project = Projects.create( project = Projects.create(
workspace, "my test project", "It's mine.", ["dev", "staging", "prod"] owner, workspace, "my test project", "It's mine.", ["dev", "staging", "prod"]
) )
Environments.add_member(owner, project.environments[0], workspace_user.user) Environments.add_member(owner, project.environments[0], workspace_user.user)
assert workspace_user.has_environment_roles assert workspace_user.has_environment_roles