Merge pull request #281 from dod-ccpo/refactory-query

Query refactors
This commit is contained in:
richard-dds 2018-09-13 14:52:47 -04:00 committed by GitHub
commit 7877b4bdef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 248 additions and 243 deletions

View File

@ -0,0 +1 @@
from .query import Query

View File

@ -0,0 +1,37 @@
from sqlalchemy.exc import DataError
from sqlalchemy.orm.exc import NoResultFound
from atst.domain.exceptions import NotFoundError
from atst.database import db
class Query(object):
model = None
@property
def resource_name(cls):
return cls.model.__class__.lower()
@classmethod
def create(cls, **kwargs):
# pylint: disable=E1102
return cls.model(**kwargs)
@classmethod
def get(cls, id_):
try:
resource = db.session.query(cls.model).filter_by(id=id_).one()
return resource
except (NoResultFound, DataError):
raise NotFoundError(cls.resource_name)
@classmethod
def get_all(cls):
return db.session.query(cls.model).all()
@classmethod
def add_and_commit(cls, resource):
db.session.add(resource)
db.session.commit()
return resource

View File

@ -0,0 +1 @@
from .requests import Requests, create_revision_from_request_body

View File

@ -0,0 +1,71 @@
from sqlalchemy import exists, and_, exc, text
from atst.database import db
from atst.domain.common import Query
from atst.models.request import Request
class RequestsQuery(Query):
model = Request
@classmethod
def exists(cls, request_id, creator):
try:
return db.session.query(
exists().where(
and_(Request.id == request_id, Request.creator == creator)
)
).scalar()
except exc.DataError:
return False
@classmethod
def get_many(cls, creator=None):
filters = []
if creator:
filters.append(Request.creator == creator)
requests = (
db.session.query(Request)
.filter(*filters)
.order_by(Request.time_created.desc())
.all()
)
return requests
@classmethod
def get_with_lock(cls, request_id):
try:
# Query for request matching id, acquiring a row-level write lock.
# https://www.postgresql.org/docs/10/static/sql-select.html#SQL-FOR-UPDATE-SHARE
return (
db.session.query(Request)
.filter_by(id=request_id)
.with_for_update(of=Request)
.one()
)
except NoResultFound:
raise NotFoundError("requests")
@classmethod
def status_count(cls, status, creator=None):
bindings = {"status": status.name}
raw = """
SELECT count(requests_with_status.id)
FROM (
SELECT DISTINCT ON (rse.request_id) r.*, rse.new_status as status
FROM request_status_events rse JOIN requests r ON r.id = rse.request_id
ORDER BY rse.request_id, rse.sequence DESC
) as requests_with_status
WHERE requests_with_status.status = :status
"""
if creator:
raw += " AND requests_with_status.user_id = :user_id"
bindings["user_id"] = creator.id
results = db.session.execute(text(raw), bindings).fetchone()
(count,) = results
return count

View File

@ -1,22 +1,18 @@
from enum import Enum
from sqlalchemy import exists, and_, exc
from sqlalchemy.sql import text
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.datastructures import FileStorage from werkzeug.datastructures import FileStorage
import dateutil import dateutil
from atst.database import db
from atst.domain.authz import Authorization from atst.domain.authz import Authorization
from atst.domain.task_orders import TaskOrders from atst.domain.task_orders import TaskOrders
from atst.domain.workspaces import Workspaces from atst.domain.workspaces import Workspaces
from atst.models.request import Request
from atst.models.request_revision import RequestRevision from atst.models.request_revision import RequestRevision
from atst.models.request_status_event import RequestStatusEvent, RequestStatus from atst.models.request_status_event import RequestStatusEvent, RequestStatus
from atst.models.request_review import RequestReview from atst.models.request_review import RequestReview
from atst.models.request_internal_comment import RequestInternalComment from atst.models.request_internal_comment import RequestInternalComment
from atst.utils import deep_merge from atst.utils import deep_merge
from .exceptions import NotFoundError, UnauthorizedError from atst.domain.exceptions import UnauthorizedError
from .query import RequestsQuery
def create_revision_from_request_body(body): def create_revision_from_request_body(body):
@ -38,38 +34,19 @@ class Requests(object):
@classmethod @classmethod
def create(cls, creator, body): def create(cls, creator, body):
revision = create_revision_from_request_body(body) revision = create_revision_from_request_body(body)
request = Request(creator=creator, revisions=[revision]) request = RequestsQuery.create(creator=creator, revisions=[revision])
request = Requests.set_status(request, RequestStatus.STARTED) request = Requests.set_status(request, RequestStatus.STARTED)
request = RequestsQuery.add_and_commit(request)
db.session.add(request)
db.session.commit()
return request return request
@classmethod @classmethod
def exists(cls, request_id, creator): def exists(cls, request_id, creator):
try: return RequestsQuery.exists(request_id, creator)
return db.session.query(
exists().where(
and_(Request.id == request_id, Request.creator == creator)
)
).scalar()
except exc.DataError:
return False
@classmethod
def _get(cls, user, request_id):
try:
request = db.session.query(Request).filter_by(id=request_id).one()
except (NoResultFound, exc.DataError):
raise NotFoundError("request")
return request
@classmethod @classmethod
def get(cls, user, request_id): def get(cls, user, request_id):
request = Requests._get(user, request_id) request = RequestsQuery.get(request_id)
if not Authorization.can_view_request(user, request): if not Authorization.can_view_request(user, request):
raise UnauthorizedError(user, "get request") raise UnauthorizedError(user, "get request")
@ -78,7 +55,7 @@ class Requests(object):
@classmethod @classmethod
def get_for_approval(cls, user, request_id): def get_for_approval(cls, user, request_id):
request = Requests._get(user, request_id) request = RequestsQuery.get(request_id)
Authorization.check_can_approve_request(user) Authorization.check_can_approve_request(user)
@ -86,17 +63,7 @@ class Requests(object):
@classmethod @classmethod
def get_many(cls, creator=None): def get_many(cls, creator=None):
filters = [] return RequestsQuery.get_many(creator)
if creator:
filters.append(Request.creator == creator)
requests = (
db.session.query(Request)
.filter(*filters)
.order_by(Request.time_created.desc())
.all()
)
return requests
@classmethod @classmethod
def submit(cls, request): def submit(cls, request):
@ -109,52 +76,33 @@ class Requests(object):
new_status = RequestStatus.PENDING_CCPO_ACCEPTANCE new_status = RequestStatus.PENDING_CCPO_ACCEPTANCE
request = Requests.set_status(request, new_status) request = Requests.set_status(request, new_status)
request = RequestsQuery.add_and_commit(request)
db.session.add(request)
db.session.commit()
return request return request
@classmethod @classmethod
def update(cls, request_id, request_delta): def update(cls, request_id, request_delta):
request = Requests._get_with_lock(request_id) request = RequestsQuery.get_with_lock(request_id)
new_body = deep_merge(request_delta, request.body) new_body = deep_merge(request_delta, request.body)
revision = create_revision_from_request_body(new_body) revision = create_revision_from_request_body(new_body)
request.revisions.append(revision) request.revisions.append(revision)
db.session.add(request) request = RequestsQuery.add_and_commit(request)
db.session.commit()
return request return request
@classmethod
def _get_with_lock(cls, request_id):
try:
# Query for request matching id, acquiring a row-level write lock.
# https://www.postgresql.org/docs/10/static/sql-select.html#SQL-FOR-UPDATE-SHARE
return (
db.session.query(Request)
.filter_by(id=request_id)
.with_for_update(of=Request)
.one()
)
except NoResultFound:
raise NotFoundError()
@classmethod @classmethod
def approve_and_create_workspace(cls, request): def approve_and_create_workspace(cls, request):
approved_request = Requests.set_status(request, RequestStatus.APPROVED) approved_request = Requests.set_status(request, RequestStatus.APPROVED)
workspace = Workspaces.create(approved_request) workspace = Workspaces.create(approved_request)
db.session.add(approved_request) RequestsQuery.add_and_commit(approved_request)
db.session.commit()
return workspace return workspace
@classmethod @classmethod
def set_status(cls, request: Request, status: RequestStatus): def set_status(cls, request, status: RequestStatus):
status_event = RequestStatusEvent( status_event = RequestStatusEvent(
new_status=status, revision=request.latest_revision new_status=status, revision=request.latest_revision
) )
@ -205,26 +153,7 @@ class Requests(object):
@classmethod @classmethod
def status_count(cls, status, creator=None): def status_count(cls, status, creator=None):
if isinstance(status, Enum): return RequestsQuery.status_count(status, creator)
status = status.name
bindings = {"status": status}
raw = """
SELECT count(requests_with_status.id)
FROM (
SELECT DISTINCT ON (rse.request_id) r.*, rse.new_status as status
FROM request_status_events rse JOIN requests r ON r.id = rse.request_id
ORDER BY rse.request_id, rse.sequence DESC
) as requests_with_status
WHERE requests_with_status.status = :status
"""
if creator:
raw += " AND requests_with_status.user_id = :user_id"
bindings["user_id"] = creator.id
results = db.session.execute(text(raw), bindings).fetchone()
(count,) = results
return count
@classmethod @classmethod
def in_progress_count(cls): def in_progress_count(cls):
@ -251,7 +180,7 @@ WHERE requests_with_status.status = :status
@classmethod @classmethod
def update_financial_verification(cls, request_id, financial_data): def update_financial_verification(cls, request_id, financial_data):
request = Requests._get_with_lock(request_id) request = RequestsQuery.get_with_lock(request_id)
request_data = financial_data.copy() request_data = financial_data.copy()
task_order_data = { task_order_data = {
@ -283,20 +212,14 @@ WHERE requests_with_status.status = :status
@classmethod @classmethod
def submit_financial_verification(cls, request): def submit_financial_verification(cls, request):
Requests.set_status(request, RequestStatus.PENDING_CCPO_APPROVAL) request = Requests.set_status(request, RequestStatus.PENDING_CCPO_APPROVAL)
request = RequestsQuery.add_and_commit(request)
db.session.add(request)
db.session.commit()
return request return request
@classmethod @classmethod
def _add_review(cls, user, request, review_data): def _add_review(cls, user, request, review_data):
request.latest_status.review = RequestReview(reviewer=user, **review_data) request.latest_status.review = RequestReview(reviewer=user, **review_data)
request = RequestsQuery.add_and_commit(request)
db.session.add(request)
db.session.commit()
return request return request
@classmethod @classmethod
@ -320,9 +243,6 @@ WHERE requests_with_status.status = :status
@classmethod @classmethod
def update_internal_comments(cls, user, request, comment_text): def update_internal_comments(cls, user, request, comment_text):
Authorization.check_can_approve_request(user) Authorization.check_can_approve_request(user)
request.internal_comments = RequestInternalComment(text=comment_text, user=user) request.internal_comments = RequestInternalComment(text=comment_text, user=user)
db.session.add(request) request = RequestsQuery.add_and_commit(request)
db.session.commit()
return request return request

View File

@ -0,0 +1,33 @@
from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
from atst.domain.common import Query
from atst.domain.exceptions import NotFoundError
from atst.models.workspace import Workspace
from atst.models.workspace_role import WorkspaceRole
class WorkspacesQuery(Query):
model = Workspace
@classmethod
def get_by_request(cls, request):
try:
workspace = db.session.query(Workspace).filter_by(request=request).one()
except NoResultFound:
raise NotFoundError("workspace")
return workspace
@classmethod
def get_for_user(cls, user):
return (
db.session.query(Workspace)
.join(WorkspaceRole)
.filter(WorkspaceRole.user == user)
.all()
)
@classmethod
def create_workspace_role(cls, user, role, workspace):
return WorkspaceRole(user=user, role=role, workspace=workspace)

View File

@ -1,14 +1,10 @@
from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
from atst.models.workspace import Workspace
from atst.models.workspace_role import WorkspaceRole
from atst.domain.exceptions import NotFoundError
from atst.domain.roles import Roles from atst.domain.roles import Roles
from atst.domain.authz import Authorization from atst.domain.authz import Authorization
from atst.models.permissions import Permissions from atst.models.permissions import Permissions
from atst.domain.users import Users from atst.domain.users import Users
from atst.domain.workspace_users import WorkspaceUsers from atst.domain.workspace_users import WorkspaceUsers
from .query import WorkspacesQuery
from .scopes import ScopedWorkspace from .scopes import ScopedWorkspace
@ -16,17 +12,14 @@ class Workspaces(object):
@classmethod @classmethod
def create(cls, request, name=None): def create(cls, request, name=None):
name = name or request.id name = name or request.id
workspace = Workspace(request=request, name=name) workspace = WorkspacesQuery.create(request=request, name=name)
Workspaces._create_workspace_role(request.creator, workspace, "owner") Workspaces._create_workspace_role(request.creator, workspace, "owner")
WorkspacesQuery.add_and_commit(workspace)
db.session.add(workspace)
db.session.commit()
return workspace return workspace
@classmethod @classmethod
def get(cls, user, workspace_id): def get(cls, user, workspace_id):
workspace = Workspaces._get(workspace_id) workspace = WorkspacesQuery.get(workspace_id)
Authorization.check_workspace_permission( Authorization.check_workspace_permission(
user, workspace, Permissions.VIEW_WORKSPACE, "get workspace" user, workspace, Permissions.VIEW_WORKSPACE, "get workspace"
) )
@ -35,7 +28,7 @@ class Workspaces(object):
@classmethod @classmethod
def get_for_update(cls, user, workspace_id): def get_for_update(cls, user, workspace_id):
workspace = Workspaces._get(workspace_id) workspace = WorkspacesQuery.get(workspace_id)
Authorization.check_workspace_permission( Authorization.check_workspace_permission(
user, workspace, Permissions.ADD_APPLICATION_IN_WORKSPACE, "add project" user, workspace, Permissions.ADD_APPLICATION_IN_WORKSPACE, "add project"
) )
@ -44,16 +37,11 @@ class Workspaces(object):
@classmethod @classmethod
def get_by_request(cls, request): def get_by_request(cls, request):
try: return WorkspacesQuery.get_by_request(request)
workspace = db.session.query(Workspace).filter_by(request=request).one()
except NoResultFound:
raise NotFoundError("workspace")
return workspace
@classmethod @classmethod
def get_with_members(cls, user, workspace_id): def get_with_members(cls, user, workspace_id):
workspace = Workspaces._get(workspace_id) workspace = WorkspacesQuery.get(workspace_id)
Authorization.check_workspace_permission( Authorization.check_workspace_permission(
user, user,
workspace, workspace,
@ -63,27 +51,12 @@ class Workspaces(object):
return workspace return workspace
@classmethod
def get_many(cls, user):
workspaces = (
db.session.query(Workspace)
.join(WorkspaceRole)
.filter(WorkspaceRole.user == user)
.all()
)
return workspaces
@classmethod @classmethod
def for_user(cls, user): def for_user(cls, user):
if Authorization.has_atat_permission(user, Permissions.VIEW_WORKSPACE): if Authorization.has_atat_permission(user, Permissions.VIEW_WORKSPACE):
workspaces = db.session.query(Workspace).all() workspaces = WorkspacesQuery.get_all()
else: else:
workspaces = ( workspaces = WorkspacesQuery.get_for_user(user)
db.session.query(Workspace)
.join(WorkspaceRole)
.filter(WorkspaceRole.user == user)
.all()
)
return workspaces return workspaces
@classmethod @classmethod
@ -122,15 +95,6 @@ class Workspaces(object):
@classmethod @classmethod
def _create_workspace_role(cls, user, workspace, role_name): def _create_workspace_role(cls, user, workspace, role_name):
role = Roles.get(role_name) role = Roles.get(role_name)
workspace_role = WorkspaceRole(user=user, role=role, workspace=workspace) workspace_role = WorkspacesQuery.create_workspace_role(user, role, workspace)
db.session.add(workspace_role) WorkspacesQuery.add_and_commit(workspace_role)
return workspace_role return workspace_role
@classmethod
def _get(cls, workspace_id):
try:
workspace = db.session.query(Workspace).filter_by(id=workspace_id).one()
except NoResultFound:
raise NotFoundError("workspace")
return workspace

View File

@ -7,20 +7,36 @@ from atst.domain.workspace_users import WorkspaceUsers
from atst.domain.projects import Projects from atst.domain.projects import Projects
from atst.domain.environments import Environments from atst.domain.environments import Environments
from tests.factories import WorkspaceFactory, RequestFactory, UserFactory from tests.factories import RequestFactory, UserFactory
def test_can_create_workspace(): @pytest.fixture(scope="function")
request = RequestFactory.create() def workspace_owner():
workspace = Workspaces.create(request, name="frugal-whale") return UserFactory.create()
@pytest.fixture(scope="function")
def request_(workspace_owner):
return RequestFactory.create(creator=workspace_owner)
@pytest.fixture(scope="function")
def workspace(request_):
workspace = Workspaces.create(request_)
return workspace
def test_can_create_workspace(request_):
workspace = Workspaces.create(request_, name="frugal-whale")
assert workspace.name == "frugal-whale" assert workspace.name == "frugal-whale"
assert workspace.request == request
def test_default_workspace_name_is_request_id(): def test_request_is_associated_with_workspace(workspace, request_):
request = RequestFactory.create() assert workspace.request == request_
workspace = Workspaces.create(request)
assert workspace.name == str(request.id)
def test_default_workspace_name_is_request_id(workspace, request_):
assert workspace.name == str(request_.id)
def test_get_nonexistent_workspace_raises(): def test_get_nonexistent_workspace_raises():
@ -28,73 +44,38 @@ def test_get_nonexistent_workspace_raises():
Workspaces.get(UserFactory.build(), uuid4()) Workspaces.get(UserFactory.build(), uuid4())
def test_can_get_workspace_by_request(): def test_can_get_workspace_by_request(workspace):
workspace = WorkspaceFactory.create()
found = Workspaces.get_by_request(workspace.request) found = Workspaces.get_by_request(workspace.request)
assert workspace == found assert workspace == found
def test_creating_workspace_adds_owner(): def test_creating_workspace_adds_owner(workspace, workspace_owner):
user = UserFactory.create() assert workspace.roles[0].user == workspace_owner
request = RequestFactory.create(creator=user)
workspace = Workspaces.create(request)
assert workspace.roles[0].user == user
def test_workspace_has_timestamps(): def test_workspace_has_timestamps(workspace):
request = RequestFactory.create()
workspace = Workspaces.create(request)
assert workspace.time_created == workspace.time_updated assert workspace.time_created == workspace.time_updated
def test_workspaces_get_ensures_user_is_in_workspace(): def test_workspaces_get_ensures_user_is_in_workspace(workspace, workspace_owner):
owner = UserFactory.create()
outside_user = UserFactory.create() outside_user = UserFactory.create()
workspace = Workspaces.create(RequestFactory.create(creator=owner))
workspace_ = Workspaces.get(owner, workspace.id)
assert workspace_ == workspace
with pytest.raises(UnauthorizedError): with pytest.raises(UnauthorizedError):
Workspaces.get(outside_user, workspace.id) Workspaces.get(outside_user, workspace.id)
def test_workspaces_get_many_with_no_workspaces(): def test_get_for_update_allows_owner(workspace, workspace_owner):
workspaces = Workspaces.get_many(UserFactory.build()) Workspaces.get_for_update(workspace_owner, workspace.id)
assert workspaces == []
def test_workspaces_get_many_returns_a_users_workspaces(): def test_get_for_update_blocks_developer(workspace):
user = UserFactory.create()
users_workspace = Workspaces.create(RequestFactory.create(creator=user))
# random workspace
Workspaces.create(RequestFactory.create())
assert Workspaces.get_many(user) == [users_workspace]
def test_get_for_update_allows_owner():
owner = UserFactory.create()
workspace = Workspaces.create(RequestFactory.create(creator=owner))
Workspaces.get_for_update(owner, workspace.id)
def test_get_for_update_blocks_developer():
owner = UserFactory.create()
developer = UserFactory.create() developer = UserFactory.create()
workspace = Workspaces.create(RequestFactory.create(creator=owner))
WorkspaceUsers.add(developer, workspace.id, "developer") WorkspaceUsers.add(developer, workspace.id, "developer")
with pytest.raises(UnauthorizedError): with pytest.raises(UnauthorizedError):
Workspaces.get_for_update(developer, workspace.id) Workspaces.get_for_update(developer, workspace.id)
def test_can_create_workspace_user(): def test_can_create_workspace_user(workspace, workspace_owner):
owner = UserFactory.create()
workspace = Workspaces.create(RequestFactory.create(creator=owner))
user_data = { user_data = {
"first_name": "New", "first_name": "New",
"last_name": "User", "last_name": "User",
@ -103,12 +84,11 @@ def test_can_create_workspace_user():
"dod_id": "1234567890", "dod_id": "1234567890",
} }
new_member = Workspaces.create_member(owner, workspace, user_data) new_member = Workspaces.create_member(workspace_owner, workspace, user_data)
assert new_member.workspace == workspace assert new_member.workspace == workspace
def test_need_permission_to_create_workspace_user(): def test_need_permission_to_create_workspace_user(workspace, workspace_owner):
workspace = Workspaces.create(request=RequestFactory.create())
random_user = UserFactory.create() random_user = UserFactory.create()
user_data = { user_data = {
@ -123,9 +103,7 @@ def test_need_permission_to_create_workspace_user():
Workspaces.create_member(random_user, workspace, user_data) Workspaces.create_member(random_user, workspace, user_data)
def test_update_workspace_user_role(): def test_update_workspace_user_role(workspace, workspace_owner):
owner = UserFactory.create()
workspace = Workspaces.create(RequestFactory.create(creator=owner))
user_data = { user_data = {
"first_name": "New", "first_name": "New",
"last_name": "User", "last_name": "User",
@ -133,17 +111,17 @@ def test_update_workspace_user_role():
"workspace_role": "developer", "workspace_role": "developer",
"dod_id": "1234567890", "dod_id": "1234567890",
} }
member = Workspaces.create_member(owner, workspace, user_data) member = Workspaces.create_member(workspace_owner, workspace, user_data)
role_name = "admin" role_name = "admin"
updated_member = Workspaces.update_member(owner, workspace, member, role_name) updated_member = Workspaces.update_member(
workspace_owner, workspace, member, role_name
)
assert updated_member.workspace == workspace assert updated_member.workspace == workspace
assert updated_member.role == role_name assert updated_member.role == role_name
def test_need_permission_to_update_workspace_user_role(): def test_need_permission_to_update_workspace_user_role(workspace, workspace_owner):
owner = UserFactory.create()
workspace = Workspaces.create(RequestFactory.create(creator=owner))
random_user = UserFactory.create() random_user = UserFactory.create()
user_data = { user_data = {
"first_name": "New", "first_name": "New",
@ -152,41 +130,38 @@ def test_need_permission_to_update_workspace_user_role():
"workspace_role": "developer", "workspace_role": "developer",
"dod_id": "1234567890", "dod_id": "1234567890",
} }
member = Workspaces.create_member(owner, workspace, user_data) member = Workspaces.create_member(workspace_owner, workspace, user_data)
role_name = "developer" role_name = "developer"
with pytest.raises(UnauthorizedError): with pytest.raises(UnauthorizedError):
Workspaces.update_member(random_user, workspace, member, role_name) Workspaces.update_member(random_user, workspace, member, role_name)
def test_owner_can_view_workspace_members(): def test_owner_can_view_workspace_members(workspace, workspace_owner):
owner = UserFactory.create() workspace_owner = UserFactory.create()
workspace = Workspaces.create(RequestFactory.create(creator=owner)) workspace = Workspaces.create(RequestFactory.create(creator=workspace_owner))
workspace = Workspaces.get_with_members(owner, workspace.id) workspace = Workspaces.get_with_members(workspace_owner, workspace.id)
assert workspace assert workspace
def test_ccpo_can_view_workspace_members(): def test_ccpo_can_view_workspace_members(workspace, workspace_owner):
workspace = Workspaces.create(RequestFactory.create(creator=UserFactory.create()))
ccpo = UserFactory.from_atat_role("ccpo") ccpo = UserFactory.from_atat_role("ccpo")
workspace = Workspaces.get_with_members(ccpo, workspace.id) assert Workspaces.get_with_members(ccpo, workspace.id)
assert workspace
def test_random_user_cannot_view_workspace_members(): def test_random_user_cannot_view_workspace_members(workspace):
workspace = Workspaces.create(RequestFactory.create(creator=UserFactory.create()))
developer = UserFactory.from_atat_role("developer") developer = UserFactory.from_atat_role("developer")
with pytest.raises(UnauthorizedError): with pytest.raises(UnauthorizedError):
workspace = Workspaces.get_with_members(developer, workspace.id) workspace = Workspaces.get_with_members(developer, workspace.id)
def test_scoped_workspace_only_returns_a_users_projects_and_environments(): def test_scoped_workspace_only_returns_a_users_projects_and_environments(
workspace = WorkspaceFactory.create() workspace, workspace_owner
):
new_project = Projects.create( new_project = Projects.create(
workspace.owner, workspace_owner,
workspace, workspace,
"My Project", "My Project",
"My project", "My project",
@ -194,7 +169,7 @@ def test_scoped_workspace_only_returns_a_users_projects_and_environments():
) )
developer = UserFactory.from_atat_role("developer") developer = UserFactory.from_atat_role("developer")
dev_environment = Environments.add_member( dev_environment = Environments.add_member(
workspace.owner, new_project.environments[0], developer workspace_owner, new_project.environments[0], developer
) )
scoped_workspace = Workspaces.get(developer, workspace.id) scoped_workspace = Workspaces.get(developer, workspace.id)
@ -205,11 +180,12 @@ def test_scoped_workspace_only_returns_a_users_projects_and_environments():
assert scoped_workspace.projects[0].environments == [dev_environment] assert scoped_workspace.projects[0].environments == [dev_environment]
def test_scoped_workspace_returns_all_projects_for_workspace_admin(): def test_scoped_workspace_returns_all_projects_for_workspace_admin(
workspace = Workspaces.create(RequestFactory.create()) workspace, workspace_owner
):
for _ in range(5): for _ in range(5):
Projects.create( Projects.create(
workspace.owner, workspace_owner,
workspace, workspace,
"My Project", "My Project",
"My project", "My project",
@ -225,34 +201,35 @@ def test_scoped_workspace_returns_all_projects_for_workspace_admin():
assert len(scoped_workspace.projects[0].environments) == 3 assert len(scoped_workspace.projects[0].environments) == 3
def test_scoped_workspace_returns_all_projects_for_workspace_owner(): def test_scoped_workspace_returns_all_projects_for_workspace_owner(
workspace = Workspaces.create(RequestFactory.create()) workspace, workspace_owner
owner = workspace.owner ):
for _ in range(5): for _ in range(5):
Projects.create( Projects.create(
owner, workspace, "My Project", "My project", ["dev", "staging", "prod"] workspace_owner,
workspace,
"My Project",
"My project",
["dev", "staging", "prod"],
) )
scoped_workspace = Workspaces.get(owner, workspace.id) scoped_workspace = Workspaces.get(workspace_owner, workspace.id)
assert len(scoped_workspace.projects) == 5 assert len(scoped_workspace.projects) == 5
assert len(scoped_workspace.projects[0].environments) == 3 assert len(scoped_workspace.projects[0].environments) == 3
def test_for_user_workspace_member(): def test_for_user_returns_assigned_workspaces_for_user(workspace, workspace_owner):
bob = UserFactory.from_atat_role("default") bob = UserFactory.from_atat_role("default")
workspace = Workspaces.create(RequestFactory.create())
Workspaces.add_member(workspace, bob, "developer") Workspaces.add_member(workspace, bob, "developer")
Workspaces.create(RequestFactory.create()) Workspaces.create(RequestFactory.create())
bobs_workspaces = Workspaces.for_user(bob) bobs_workspaces = Workspaces.for_user(bob)
assert len(bobs_workspaces) == 1 assert len(bobs_workspaces) == 1
def test_for_user_ccpo(): def test_for_user_returns_all_workspaces_for_ccpo(workspace, workspace_owner):
sam = UserFactory.from_atat_role("ccpo") sam = UserFactory.from_atat_role("ccpo")
workspace = Workspaces.create(RequestFactory.create())
Workspaces.create(RequestFactory.create()) Workspaces.create(RequestFactory.create())
sams_workspaces = Workspaces.for_user(sam) sams_workspaces = Workspaces.for_user(sam)

View File

@ -4,7 +4,8 @@ from tests.factories import (
RequestStatusEventFactory, RequestStatusEventFactory,
RequestReviewFactory, RequestReviewFactory,
) )
from atst.domain.requests import Requests, RequestStatus from atst.domain.requests import Requests
from atst.models.request_status_event import RequestStatus
def test_pending_financial_requires_mo_action(): def test_pending_financial_requires_mo_action():