Merge pull request #532 from dod-ccpo/to-access-control

Task Order access control
This commit is contained in:
dandds 2019-01-10 10:56:22 -05:00 committed by GitHub
commit 66ada94dcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 148 additions and 55 deletions

View File

@ -33,3 +33,24 @@ class Authorization(object):
@classmethod @classmethod
def is_ccpo(cls, user): def is_ccpo(cls, user):
return user.atat_role.name == "ccpo" return user.atat_role.name == "ccpo"
@classmethod
def check_task_order_permission(cls, user, task_order, permission, message):
if Authorization._check_is_task_order_officer(task_order, user):
return True
Authorization.check_workspace_permission(
user, task_order.workspace, permission, message
)
@classmethod
def _check_is_task_order_officer(cls, task_order, user):
for officer in [
"contracting_officer",
"contracting_officer_representative",
"security_officer",
]:
if getattr(task_order, officer, None) == user:
return True
return False

View File

@ -86,6 +86,9 @@ WORKSPACE_ROLES = [
Permissions.VIEW_ENVIRONMENT_IN_APPLICATION, Permissions.VIEW_ENVIRONMENT_IN_APPLICATION,
Permissions.RENAME_ENVIRONMENT_IN_APPLICATION, Permissions.RENAME_ENVIRONMENT_IN_APPLICATION,
Permissions.VIEW_WORKSPACE_AUDIT_LOG, Permissions.VIEW_WORKSPACE_AUDIT_LOG,
Permissions.VIEW_TASK_ORDER,
Permissions.UPDATE_TASK_ORDER,
Permissions.ADD_TASK_ORDER_OFFICER,
], ],
}, },
{ {
@ -114,6 +117,9 @@ WORKSPACE_ROLES = [
Permissions.VIEW_ENVIRONMENT_IN_APPLICATION, Permissions.VIEW_ENVIRONMENT_IN_APPLICATION,
Permissions.RENAME_ENVIRONMENT_IN_APPLICATION, Permissions.RENAME_ENVIRONMENT_IN_APPLICATION,
Permissions.VIEW_WORKSPACE_AUDIT_LOG, Permissions.VIEW_WORKSPACE_AUDIT_LOG,
Permissions.VIEW_TASK_ORDER,
Permissions.UPDATE_TASK_ORDER,
Permissions.ADD_TASK_ORDER_OFFICER,
], ],
}, },
{ {

View File

@ -2,7 +2,9 @@ from sqlalchemy.orm.exc import NoResultFound
from atst.database import db from atst.database import db
from atst.models.task_order import TaskOrder from atst.models.task_order import TaskOrder
from atst.models.permissions import Permissions
from atst.domain.workspaces import Workspaces from atst.domain.workspaces import Workspaces
from atst.domain.authz import Authorization
from .exceptions import NotFoundError from .exceptions import NotFoundError
@ -46,27 +48,35 @@ class TaskOrders(object):
} }
@classmethod @classmethod
def get(cls, task_order_id): def get(cls, user, task_order_id):
try: try:
task_order = db.session.query(TaskOrder).filter_by(id=task_order_id).one() task_order = db.session.query(TaskOrder).filter_by(id=task_order_id).one()
Authorization.check_task_order_permission(
user, task_order, Permissions.VIEW_TASK_ORDER, "view task order"
)
return task_order return task_order
except NoResultFound: except NoResultFound:
raise NotFoundError("task_order") raise NotFoundError("task_order")
@classmethod @classmethod
def create(cls, workspace, creator, commit=False): def create(cls, creator, workspace):
Authorization.check_workspace_permission(
creator, workspace, Permissions.UPDATE_TASK_ORDER, "add task order"
)
task_order = TaskOrder(workspace=workspace, creator=creator) task_order = TaskOrder(workspace=workspace, creator=creator)
db.session.add(task_order) db.session.add(task_order)
if commit:
db.session.commit() db.session.commit()
return task_order return task_order
@classmethod @classmethod
def update(cls, task_order, **kwargs): def update(cls, user, task_order, **kwargs):
Authorization.check_task_order_permission(
user, task_order, Permissions.UPDATE_TASK_ORDER, "update task order"
)
for key, value in kwargs.items(): for key, value in kwargs.items():
setattr(task_order, key, value) setattr(task_order, key, value)
@ -103,6 +113,13 @@ class TaskOrders(object):
@classmethod @classmethod
def add_officer(cls, user, task_order, officer_type, officer_data): def add_officer(cls, user, task_order, officer_type, officer_data):
Authorization.check_workspace_permission(
user,
task_order.workspace,
Permissions.ADD_TASK_ORDER_OFFICER,
"add task order officer",
)
if officer_type in TaskOrders.OFFICERS: if officer_type in TaskOrders.OFFICERS:
workspace = task_order.workspace workspace = task_order.workspace

View File

@ -43,3 +43,7 @@ class Permissions(object):
ADD_TAG_TO_WORKSPACE = "add_tag_to_workspace" ADD_TAG_TO_WORKSPACE = "add_tag_to_workspace"
REMOVE_TAG_FROM_WORKSPACE = "remove_tag_from_workspace" REMOVE_TAG_FROM_WORKSPACE = "remove_tag_from_workspace"
VIEW_TASK_ORDER = "view_task_order"
UPDATE_TASK_ORDER = "update_task_order"
ADD_TASK_ORDER_OFFICER = "add_task_order_officers"

View File

@ -1,5 +1,5 @@
from io import BytesIO from io import BytesIO
from flask import Response from flask import g, Response
from . import task_orders_bp from . import task_orders_bp
from atst.domain.task_orders import TaskOrders from atst.domain.task_orders import TaskOrders
@ -8,7 +8,7 @@ from atst.utils.docx import Docx
@task_orders_bp.route("/task_orders/download_summary/<task_order_id>") @task_orders_bp.route("/task_orders/download_summary/<task_order_id>")
def download_summary(task_order_id): def download_summary(task_order_id):
task_order = TaskOrders.get(task_order_id) task_order = TaskOrders.get(g.current_user, task_order_id)
byte_str = BytesIO() byte_str = BytesIO()
Docx.render(byte_str, data=task_order.to_dictionary()) Docx.render(byte_str, data=task_order.to_dictionary())
filename = "{}.docx".format(task_order.portfolio_name) filename = "{}.docx".format(task_order.portfolio_name)

View File

@ -47,7 +47,8 @@ TASK_ORDER_SECTIONS = [
class ShowTaskOrderWorkflow: class ShowTaskOrderWorkflow:
def __init__(self, screen=1, task_order_id=None): def __init__(self, user, screen=1, task_order_id=None):
self.user = user
self.screen = screen self.screen = screen
self.task_order_id = task_order_id self.task_order_id = task_order_id
self._section = TASK_ORDER_SECTIONS[screen - 1] self._section = TASK_ORDER_SECTIONS[screen - 1]
@ -57,7 +58,7 @@ class ShowTaskOrderWorkflow:
@property @property
def task_order(self): def task_order(self):
if not self._task_order and self.task_order_id: if not self._task_order and self.task_order_id:
self._task_order = TaskOrders.get(self.task_order_id) self._task_order = TaskOrders.get(self.user, self.task_order_id)
return self._task_order return self._task_order
@ -99,9 +100,9 @@ class ShowTaskOrderWorkflow:
class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow): class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow):
def __init__(self, form_data, user, screen=1, task_order_id=None): def __init__(self, user, form_data, screen=1, task_order_id=None):
self.form_data = form_data
self.user = user self.user = user
self.form_data = form_data
self.screen = screen self.screen = screen
self.task_order_id = task_order_id self.task_order_id = task_order_id
self._task_order = None self._task_order = None
@ -122,13 +123,13 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow):
def _update_task_order(self): def _update_task_order(self):
if self.task_order: if self.task_order:
TaskOrders.update(self.task_order, **self.form.data) TaskOrders.update(self.user, self.task_order, **self.form.data)
else: else:
ws = Workspaces.create(self.user, self.form.portfolio_name.data) ws = Workspaces.create(self.user, self.form.portfolio_name.data)
to_data = self.form.data.copy() to_data = self.form.data.copy()
to_data.pop("portfolio_name") to_data.pop("portfolio_name")
self._task_order = TaskOrders.create(workspace=ws, creator=self.user) self._task_order = TaskOrders.create(self.user, ws)
TaskOrders.update(self.task_order, **to_data) TaskOrders.update(self.user, self.task_order, **to_data)
OFFICER_INVITATIONS = [ OFFICER_INVITATIONS = [
{ {
@ -189,7 +190,7 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow):
@task_orders_bp.route("/task_orders/new/<int:screen>") @task_orders_bp.route("/task_orders/new/<int:screen>")
@task_orders_bp.route("/task_orders/new/<int:screen>/<task_order_id>") @task_orders_bp.route("/task_orders/new/<int:screen>/<task_order_id>")
def new(screen, task_order_id=None): def new(screen, task_order_id=None):
workflow = ShowTaskOrderWorkflow(screen, task_order_id) workflow = ShowTaskOrderWorkflow(g.current_user, screen, task_order_id)
return render_template( return render_template(
workflow.template, workflow.template,
current=screen, current=screen,
@ -204,7 +205,7 @@ def new(screen, task_order_id=None):
@task_orders_bp.route("/task_orders/new/<int:screen>/<task_order_id>", methods=["POST"]) @task_orders_bp.route("/task_orders/new/<int:screen>/<task_order_id>", methods=["POST"])
def update(screen, task_order_id=None): def update(screen, task_order_id=None):
workflow = UpdateTaskOrderWorkflow( workflow = UpdateTaskOrderWorkflow(
http_request.form, g.current_user, screen, task_order_id g.current_user, http_request.form, screen, task_order_id
) )
if workflow.validate(): if workflow.validate():

View File

@ -1,8 +1,14 @@
import pytest import pytest
from atst.domain.task_orders import TaskOrders, TaskOrderError from atst.domain.task_orders import TaskOrders, TaskOrderError
from atst.domain.exceptions import UnauthorizedError
from tests.factories import TaskOrderFactory, UserFactory from tests.factories import (
TaskOrderFactory,
UserFactory,
WorkspaceRoleFactory,
WorkspaceFactory,
)
def test_is_section_complete(): def test_is_section_complete():
@ -60,3 +66,37 @@ def test_add_officer_who_is_already_workspace_member():
assert task_order.contracting_officer == owner assert task_order.contracting_officer == owner
member = task_order.workspace.members[0] member = task_order.workspace.members[0]
assert member.user == owner and member.role_name == "owner" assert member.user == owner and member.role_name == "owner"
def test_task_order_access():
creator = UserFactory.create()
member = UserFactory.create()
rando = UserFactory.create()
officer = UserFactory.create()
def check_access(can, cannot, method_name, method_args):
method = getattr(TaskOrders, method_name)
for user in can:
assert method(user, *method_args)
for user in cannot:
with pytest.raises(UnauthorizedError):
method(user, *method_args)
workspace = WorkspaceFactory.create(owner=creator)
task_order = TaskOrderFactory.create(creator=creator, workspace=workspace)
WorkspaceRoleFactory.create(user=member, workspace=task_order.workspace)
TaskOrders.add_officer(
creator, task_order, "contracting_officer", officer.to_dictionary()
)
check_access([creator, officer], [member, rando], "get", [task_order.id])
check_access([creator], [officer, member, rando], "create", [workspace])
check_access([creator, officer], [member, rando], "update", [task_order])
check_access(
[creator],
[officer, member, rando],
"add_officer",
[task_order, "contracting_officer", rando.to_dictionary()],
)

View File

@ -5,7 +5,7 @@ from zipfile import ZipFile
from atst.utils.docx import Docx from atst.utils.docx import Docx
from tests.factories import TaskOrderFactory from tests.factories import TaskOrderFactory, WorkspaceFactory, UserFactory
def xml_translated(val): def xml_translated(val):
@ -13,8 +13,10 @@ def xml_translated(val):
def test_download_summary(client, user_session): def test_download_summary(client, user_session):
user_session() user = UserFactory.create()
task_order = TaskOrderFactory.create() workspace = WorkspaceFactory.create(owner=user)
task_order = TaskOrderFactory.create(creator=user, workspace=workspace)
user_session(user)
response = client.get( response = client.get(
url_for("task_orders.download_summary", task_order_id=task_order.id) url_for("task_orders.download_summary", task_order_id=task_order.id)
) )

View File

@ -92,35 +92,48 @@ def test_task_order_form_shows_errors(client, user_session):
assert "Not a valid integer" in body assert "Not a valid integer" in body
def test_show_task_order(): @pytest.fixture
workflow = ShowTaskOrderWorkflow() def task_order():
user = UserFactory.create()
workspace = WorkspaceFactory.create(owner=user)
return TaskOrderFactory.create(creator=user, workspace=workspace)
def test_show_task_order(task_order):
workflow = ShowTaskOrderWorkflow(task_order.creator)
assert workflow.task_order is None assert workflow.task_order is None
task_order = TaskOrderFactory.create() another_workflow = ShowTaskOrderWorkflow(
another_workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id) task_order.creator, task_order_id=task_order.id
)
assert another_workflow.task_order == task_order assert another_workflow.task_order == task_order
def test_show_task_order_form_list_data(): def test_show_task_order_form_list_data():
complexity = ["oconus", "tactical_edge"] complexity = ["oconus", "tactical_edge"]
task_order = TaskOrderFactory.create(complexity=complexity) user = UserFactory.create()
workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id) workspace = WorkspaceFactory.create(owner=user)
task_order = TaskOrderFactory.create(
creator=user, workspace=workspace, complexity=complexity
)
workflow = ShowTaskOrderWorkflow(user, task_order_id=task_order.id)
assert workflow.form.complexity.data == complexity assert workflow.form.complexity.data == complexity
def test_show_task_order_form(): def test_show_task_order_form(task_order):
workflow = ShowTaskOrderWorkflow() workflow = ShowTaskOrderWorkflow(task_order.creator)
assert not workflow.form.data["app_migration"] assert not workflow.form.data["app_migration"]
task_order = TaskOrderFactory.create() another_workflow = ShowTaskOrderWorkflow(
another_workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id) task_order.creator, task_order_id=task_order.id
)
assert ( assert (
another_workflow.form.data["defense_component"] == task_order.defense_component another_workflow.form.data["defense_component"] == task_order.defense_component
) )
def test_show_task_order_display_screen(): def test_show_task_order_display_screen(task_order):
task_order = TaskOrderFactory.create() workflow = ShowTaskOrderWorkflow(task_order.creator, task_order_id=task_order.id)
workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id)
screens = workflow.display_screens screens = workflow.display_screens
# every form section is complete # every form section is complete
for i in range(2): for i in range(2):
@ -132,29 +145,24 @@ def test_show_task_order_display_screen():
def test_update_task_order_with_no_task_order(): def test_update_task_order_with_no_task_order():
user = UserFactory.create() user = UserFactory.create()
to_data = TaskOrderFactory.dictionary() to_data = TaskOrderFactory.dictionary()
workflow = UpdateTaskOrderWorkflow(to_data, user) workflow = UpdateTaskOrderWorkflow(user, to_data)
assert workflow.task_order is None assert workflow.task_order is None
workflow.update() workflow.update()
assert workflow.task_order assert workflow.task_order
assert workflow.task_order.scope == to_data["scope"] assert workflow.task_order.scope == to_data["scope"]
def test_update_task_order_with_existing_task_order(): def test_update_task_order_with_existing_task_order(task_order):
user = UserFactory.create()
task_order = TaskOrderFactory.create()
to_data = serialize_dates(TaskOrderFactory.dictionary()) to_data = serialize_dates(TaskOrderFactory.dictionary())
workflow = UpdateTaskOrderWorkflow( workflow = UpdateTaskOrderWorkflow(
to_data, user, screen=2, task_order_id=task_order.id task_order.creator, to_data, screen=2, task_order_id=task_order.id
) )
assert workflow.task_order.start_date != to_data["start_date"] assert workflow.task_order.start_date != to_data["start_date"]
workflow.update() workflow.update()
assert workflow.task_order.start_date.strftime("%m/%d/%Y") == to_data["start_date"] assert workflow.task_order.start_date.strftime("%m/%d/%Y") == to_data["start_date"]
def test_invite_officers_to_task_order(queue): def test_invite_officers_to_task_order(task_order, queue):
user = UserFactory.create()
workspace = WorkspaceFactory.create(owner=user)
task_order = TaskOrderFactory.create(creator=user, workspace=workspace)
to_data = { to_data = {
**TaskOrderFactory.dictionary(), **TaskOrderFactory.dictionary(),
"ko_invite": True, "ko_invite": True,
@ -162,7 +170,7 @@ def test_invite_officers_to_task_order(queue):
"so_invite": True, "so_invite": True,
} }
workflow = UpdateTaskOrderWorkflow( workflow = UpdateTaskOrderWorkflow(
to_data, user, screen=3, task_order_id=task_order.id task_order.creator, to_data, screen=3, task_order_id=task_order.id
) )
workflow.update() workflow.update()
workspace = task_order.workspace workspace = task_order.workspace
@ -179,10 +187,7 @@ def test_invite_officers_to_task_order(queue):
assert task_order.security_officer.dod_id == to_data["so_dod_id"] assert task_order.security_officer.dod_id == to_data["so_dod_id"]
def test_add_officer_but_do_not_invite(queue): def test_add_officer_but_do_not_invite(task_order, queue):
user = UserFactory.create()
workspace = WorkspaceFactory.create(owner=user)
task_order = TaskOrderFactory.create(creator=user, workspace=workspace)
to_data = { to_data = {
**TaskOrderFactory.dictionary(), **TaskOrderFactory.dictionary(),
"ko_invite": False, "ko_invite": False,
@ -190,7 +195,7 @@ def test_add_officer_but_do_not_invite(queue):
"so_invite": False, "so_invite": False,
} }
workflow = UpdateTaskOrderWorkflow( workflow = UpdateTaskOrderWorkflow(
to_data, user, screen=3, task_order_id=task_order.id task_order.creator, to_data, screen=3, task_order_id=task_order.id
) )
workflow.update() workflow.update()
workspace = task_order.workspace workspace = task_order.workspace
@ -213,18 +218,15 @@ def test_update_does_not_resend_invitation():
) )
to_data = {**task_order.to_dictionary(), "ko_invite": True} to_data = {**task_order.to_dictionary(), "ko_invite": True}
workflow = UpdateTaskOrderWorkflow( workflow = UpdateTaskOrderWorkflow(
to_data, user, screen=3, task_order_id=task_order.id user, to_data, screen=3, task_order_id=task_order.id
) )
for i in range(2): for i in range(2):
workflow.update() workflow.update()
assert len(contracting_officer.invitations) == 1 assert len(contracting_officer.invitations) == 1
def test_review_task_order_form(client, user_session): def test_review_task_order_form(client, user_session, task_order):
creator = UserFactory.create() user_session(task_order.creator)
user_session(creator)
task_order = TaskOrderFactory.create(creator=creator)
for idx, section in enumerate(TaskOrders.SECTIONS): for idx, section in enumerate(TaskOrders.SECTIONS):
response = client.get( response = client.get(