diff --git a/atst/domain/authz.py b/atst/domain/authz.py index 4c2db2ed..ab596154 100644 --- a/atst/domain/authz.py +++ b/atst/domain/authz.py @@ -33,3 +33,24 @@ class Authorization(object): @classmethod def is_ccpo(cls, user): return user.atat_role.name == "ccpo" + + @classmethod + def check_task_order_permission(cls, user, task_order, permission, message): + if Authorization._check_is_task_order_officer(task_order, user): + return True + + Authorization.check_workspace_permission( + user, task_order.workspace, permission, message + ) + + @classmethod + def _check_is_task_order_officer(cls, task_order, user): + for officer in [ + "contracting_officer", + "contracting_officer_representative", + "security_officer", + ]: + if getattr(task_order, officer, None) == user: + return True + + return False diff --git a/atst/domain/roles.py b/atst/domain/roles.py index 5f7e1ebb..469126a8 100644 --- a/atst/domain/roles.py +++ b/atst/domain/roles.py @@ -86,6 +86,9 @@ WORKSPACE_ROLES = [ Permissions.VIEW_ENVIRONMENT_IN_APPLICATION, Permissions.RENAME_ENVIRONMENT_IN_APPLICATION, Permissions.VIEW_WORKSPACE_AUDIT_LOG, + Permissions.VIEW_TASK_ORDER, + Permissions.UPDATE_TASK_ORDER, + Permissions.ADD_TASK_ORDER_OFFICER, ], }, { @@ -114,6 +117,9 @@ WORKSPACE_ROLES = [ Permissions.VIEW_ENVIRONMENT_IN_APPLICATION, Permissions.RENAME_ENVIRONMENT_IN_APPLICATION, Permissions.VIEW_WORKSPACE_AUDIT_LOG, + Permissions.VIEW_TASK_ORDER, + Permissions.UPDATE_TASK_ORDER, + Permissions.ADD_TASK_ORDER_OFFICER, ], }, { diff --git a/atst/domain/task_orders.py b/atst/domain/task_orders.py index 9acd1107..865b5b05 100644 --- a/atst/domain/task_orders.py +++ b/atst/domain/task_orders.py @@ -2,7 +2,9 @@ from sqlalchemy.orm.exc import NoResultFound from atst.database import db from atst.models.task_order import TaskOrder +from atst.models.permissions import Permissions from atst.domain.workspaces import Workspaces +from atst.domain.authz import Authorization from .exceptions import NotFoundError @@ -46,27 +48,35 @@ class TaskOrders(object): } @classmethod - def get(cls, task_order_id): + def get(cls, user, task_order_id): try: task_order = db.session.query(TaskOrder).filter_by(id=task_order_id).one() + Authorization.check_task_order_permission( + user, task_order, Permissions.VIEW_TASK_ORDER, "view task order" + ) return task_order except NoResultFound: raise NotFoundError("task_order") @classmethod - def create(cls, workspace, creator, commit=False): + def create(cls, creator, workspace): + Authorization.check_workspace_permission( + creator, workspace, Permissions.UPDATE_TASK_ORDER, "add task order" + ) task_order = TaskOrder(workspace=workspace, creator=creator) db.session.add(task_order) - - if commit: - db.session.commit() + db.session.commit() return task_order @classmethod - def update(cls, task_order, **kwargs): + def update(cls, user, task_order, **kwargs): + Authorization.check_task_order_permission( + user, task_order, Permissions.UPDATE_TASK_ORDER, "update task order" + ) + for key, value in kwargs.items(): setattr(task_order, key, value) @@ -103,6 +113,13 @@ class TaskOrders(object): @classmethod def add_officer(cls, user, task_order, officer_type, officer_data): + Authorization.check_workspace_permission( + user, + task_order.workspace, + Permissions.ADD_TASK_ORDER_OFFICER, + "add task order officer", + ) + if officer_type in TaskOrders.OFFICERS: workspace = task_order.workspace diff --git a/atst/models/permissions.py b/atst/models/permissions.py index 231d65a2..15d103c5 100644 --- a/atst/models/permissions.py +++ b/atst/models/permissions.py @@ -43,3 +43,7 @@ class Permissions(object): ADD_TAG_TO_WORKSPACE = "add_tag_to_workspace" REMOVE_TAG_FROM_WORKSPACE = "remove_tag_from_workspace" + + VIEW_TASK_ORDER = "view_task_order" + UPDATE_TASK_ORDER = "update_task_order" + ADD_TASK_ORDER_OFFICER = "add_task_order_officers" diff --git a/atst/routes/task_orders/index.py b/atst/routes/task_orders/index.py index 97364476..74393e75 100644 --- a/atst/routes/task_orders/index.py +++ b/atst/routes/task_orders/index.py @@ -1,5 +1,5 @@ from io import BytesIO -from flask import Response +from flask import g, Response from . import task_orders_bp from atst.domain.task_orders import TaskOrders @@ -8,7 +8,7 @@ from atst.utils.docx import Docx @task_orders_bp.route("/task_orders/download_summary/") def download_summary(task_order_id): - task_order = TaskOrders.get(task_order_id) + task_order = TaskOrders.get(g.current_user, task_order_id) byte_str = BytesIO() Docx.render(byte_str, data=task_order.to_dictionary()) filename = "{}.docx".format(task_order.portfolio_name) diff --git a/atst/routes/task_orders/new.py b/atst/routes/task_orders/new.py index 08f89690..96c1eb77 100644 --- a/atst/routes/task_orders/new.py +++ b/atst/routes/task_orders/new.py @@ -47,7 +47,8 @@ TASK_ORDER_SECTIONS = [ class ShowTaskOrderWorkflow: - def __init__(self, screen=1, task_order_id=None): + def __init__(self, user, screen=1, task_order_id=None): + self.user = user self.screen = screen self.task_order_id = task_order_id self._section = TASK_ORDER_SECTIONS[screen - 1] @@ -57,7 +58,7 @@ class ShowTaskOrderWorkflow: @property def task_order(self): if not self._task_order and self.task_order_id: - self._task_order = TaskOrders.get(self.task_order_id) + self._task_order = TaskOrders.get(self.user, self.task_order_id) return self._task_order @@ -122,13 +123,13 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow): def _update_task_order(self): if self.task_order: - TaskOrders.update(self.task_order, **self.form.data) + TaskOrders.update(self.user, self.task_order, **self.form.data) else: ws = Workspaces.create(self.user, self.form.portfolio_name.data) to_data = self.form.data.copy() to_data.pop("portfolio_name") - self._task_order = TaskOrders.create(workspace=ws, creator=self.user) - TaskOrders.update(self.task_order, **to_data) + self._task_order = TaskOrders.create(self.user, ws) + TaskOrders.update(self.user, self.task_order, **to_data) OFFICER_INVITATIONS = [ { @@ -189,7 +190,7 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow): @task_orders_bp.route("/task_orders/new/") @task_orders_bp.route("/task_orders/new//") def new(screen, task_order_id=None): - workflow = ShowTaskOrderWorkflow(screen, task_order_id) + workflow = ShowTaskOrderWorkflow(g.current_user, screen, task_order_id) return render_template( workflow.template, current=screen, diff --git a/tests/domain/test_task_orders.py b/tests/domain/test_task_orders.py index 8966d59b..ab485586 100644 --- a/tests/domain/test_task_orders.py +++ b/tests/domain/test_task_orders.py @@ -1,8 +1,14 @@ import pytest from atst.domain.task_orders import TaskOrders, TaskOrderError +from atst.domain.exceptions import UnauthorizedError -from tests.factories import TaskOrderFactory, UserFactory +from tests.factories import ( + TaskOrderFactory, + UserFactory, + WorkspaceRoleFactory, + WorkspaceFactory, +) def test_is_section_complete(): @@ -60,3 +66,37 @@ def test_add_officer_who_is_already_workspace_member(): assert task_order.contracting_officer == owner member = task_order.workspace.members[0] assert member.user == owner and member.role_name == "owner" + + +def test_task_order_access(): + creator = UserFactory.create() + member = UserFactory.create() + rando = UserFactory.create() + officer = UserFactory.create() + + def check_access(can, cannot, method_name, method_args): + method = getattr(TaskOrders, method_name) + + for user in can: + assert method(user, *method_args) + + for user in cannot: + with pytest.raises(UnauthorizedError): + method(user, *method_args) + + workspace = WorkspaceFactory.create(owner=creator) + task_order = TaskOrderFactory.create(creator=creator, workspace=workspace) + WorkspaceRoleFactory.create(user=member, workspace=task_order.workspace) + TaskOrders.add_officer( + creator, task_order, "contracting_officer", officer.to_dictionary() + ) + + check_access([creator, officer], [member, rando], "get", [task_order.id]) + check_access([creator], [officer, member, rando], "create", [workspace]) + check_access([creator, officer], [member, rando], "update", [task_order]) + check_access( + [creator], + [officer, member, rando], + "add_officer", + [task_order, "contracting_officer", rando.to_dictionary()], + ) diff --git a/tests/routes/task_orders/test_index.py b/tests/routes/task_orders/test_index.py index a28981da..f5d81938 100644 --- a/tests/routes/task_orders/test_index.py +++ b/tests/routes/task_orders/test_index.py @@ -5,7 +5,7 @@ from zipfile import ZipFile from atst.utils.docx import Docx -from tests.factories import TaskOrderFactory +from tests.factories import TaskOrderFactory, WorkspaceFactory, UserFactory def xml_translated(val): @@ -13,8 +13,10 @@ def xml_translated(val): def test_download_summary(client, user_session): - user_session() - task_order = TaskOrderFactory.create() + user = UserFactory.create() + workspace = WorkspaceFactory.create(owner=user) + task_order = TaskOrderFactory.create(creator=user, workspace=workspace) + user_session(user) response = client.get( url_for("task_orders.download_summary", task_order_id=task_order.id) ) diff --git a/tests/routes/task_orders/test_new_task_order.py b/tests/routes/task_orders/test_new_task_order.py index a59f1341..2287566f 100644 --- a/tests/routes/task_orders/test_new_task_order.py +++ b/tests/routes/task_orders/test_new_task_order.py @@ -92,11 +92,20 @@ def test_task_order_form_shows_errors(client, user_session): assert "Not a valid integer" in body -def test_show_task_order(): - workflow = ShowTaskOrderWorkflow() +@pytest.fixture +def task_order(): + user = UserFactory.create() + workspace = WorkspaceFactory.create(owner=user) + + return TaskOrderFactory.create(creator=user, workspace=workspace) + + +def test_show_task_order(task_order): + workflow = ShowTaskOrderWorkflow(task_order.creator) assert workflow.task_order is None - task_order = TaskOrderFactory.create() - another_workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id) + another_workflow = ShowTaskOrderWorkflow( + task_order.creator, task_order_id=task_order.id + ) assert another_workflow.task_order == task_order @@ -108,19 +117,19 @@ def test_show_task_order_form_list_data(): assert workflow.form.complexity.data == complexity -def test_show_task_order_form(): - workflow = ShowTaskOrderWorkflow() +def test_show_task_order_form(task_order): + workflow = ShowTaskOrderWorkflow(task_order.creator) assert not workflow.form.data["app_migration"] - task_order = TaskOrderFactory.create() - another_workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id) + another_workflow = ShowTaskOrderWorkflow( + task_order.creator, task_order_id=task_order.id + ) assert ( another_workflow.form.data["defense_component"] == task_order.defense_component ) -def test_show_task_order_display_screen(): - task_order = TaskOrderFactory.create() - workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id) +def test_show_task_order_display_screen(task_order): + workflow = ShowTaskOrderWorkflow(task_order.creator, task_order_id=task_order.id) screens = workflow.display_screens # every form section is complete for i in range(2): @@ -139,22 +148,17 @@ def test_update_task_order_with_no_task_order(): assert workflow.task_order.scope == to_data["scope"] -def test_update_task_order_with_existing_task_order(): - user = UserFactory.create() - task_order = TaskOrderFactory.create() +def test_update_task_order_with_existing_task_order(task_order): to_data = serialize_dates(TaskOrderFactory.dictionary()) workflow = UpdateTaskOrderWorkflow( - to_data, user, screen=2, task_order_id=task_order.id + to_data, task_order.creator, screen=2, task_order_id=task_order.id ) assert workflow.task_order.start_date != to_data["start_date"] workflow.update() assert workflow.task_order.start_date.strftime("%m/%d/%Y") == to_data["start_date"] -def test_invite_officers_to_task_order(queue): - user = UserFactory.create() - workspace = WorkspaceFactory.create(owner=user) - task_order = TaskOrderFactory.create(creator=user, workspace=workspace) +def test_invite_officers_to_task_order(task_order, queue): to_data = { **TaskOrderFactory.dictionary(), "ko_invite": True, @@ -162,7 +166,7 @@ def test_invite_officers_to_task_order(queue): "so_invite": True, } workflow = UpdateTaskOrderWorkflow( - to_data, user, screen=3, task_order_id=task_order.id + to_data, task_order.creator, screen=3, task_order_id=task_order.id ) workflow.update() workspace = task_order.workspace @@ -179,10 +183,7 @@ def test_invite_officers_to_task_order(queue): assert task_order.security_officer.dod_id == to_data["so_dod_id"] -def test_add_officer_but_do_not_invite(queue): - user = UserFactory.create() - workspace = WorkspaceFactory.create(owner=user) - task_order = TaskOrderFactory.create(creator=user, workspace=workspace) +def test_add_officer_but_do_not_invite(task_order, queue): to_data = { **TaskOrderFactory.dictionary(), "ko_invite": False, @@ -190,7 +191,7 @@ def test_add_officer_but_do_not_invite(queue): "so_invite": False, } workflow = UpdateTaskOrderWorkflow( - to_data, user, screen=3, task_order_id=task_order.id + to_data, task_order.creator, screen=3, task_order_id=task_order.id ) workflow.update() workspace = task_order.workspace