authorization checks for task orders
This commit is contained in:
parent
a3bac44371
commit
ccf1ff2024
@ -33,3 +33,24 @@ class Authorization(object):
|
||||
@classmethod
|
||||
def is_ccpo(cls, user):
|
||||
return user.atat_role.name == "ccpo"
|
||||
|
||||
@classmethod
|
||||
def check_task_order_permission(cls, user, task_order, permission, message):
|
||||
if Authorization._check_is_task_order_officer(task_order, user):
|
||||
return True
|
||||
|
||||
Authorization.check_workspace_permission(
|
||||
user, task_order.workspace, permission, message
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _check_is_task_order_officer(cls, task_order, user):
|
||||
for officer in [
|
||||
"contracting_officer",
|
||||
"contracting_officer_representative",
|
||||
"security_officer",
|
||||
]:
|
||||
if getattr(task_order, officer, None) == user:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
@ -86,6 +86,9 @@ WORKSPACE_ROLES = [
|
||||
Permissions.VIEW_ENVIRONMENT_IN_APPLICATION,
|
||||
Permissions.RENAME_ENVIRONMENT_IN_APPLICATION,
|
||||
Permissions.VIEW_WORKSPACE_AUDIT_LOG,
|
||||
Permissions.VIEW_TASK_ORDER,
|
||||
Permissions.UPDATE_TASK_ORDER,
|
||||
Permissions.ADD_TASK_ORDER_OFFICER,
|
||||
],
|
||||
},
|
||||
{
|
||||
@ -114,6 +117,9 @@ WORKSPACE_ROLES = [
|
||||
Permissions.VIEW_ENVIRONMENT_IN_APPLICATION,
|
||||
Permissions.RENAME_ENVIRONMENT_IN_APPLICATION,
|
||||
Permissions.VIEW_WORKSPACE_AUDIT_LOG,
|
||||
Permissions.VIEW_TASK_ORDER,
|
||||
Permissions.UPDATE_TASK_ORDER,
|
||||
Permissions.ADD_TASK_ORDER_OFFICER,
|
||||
],
|
||||
},
|
||||
{
|
||||
|
@ -2,7 +2,9 @@ from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from atst.database import db
|
||||
from atst.models.task_order import TaskOrder
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.domain.workspaces import Workspaces
|
||||
from atst.domain.authz import Authorization
|
||||
from .exceptions import NotFoundError
|
||||
|
||||
|
||||
@ -46,27 +48,35 @@ class TaskOrders(object):
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get(cls, task_order_id):
|
||||
def get(cls, user, task_order_id):
|
||||
try:
|
||||
task_order = db.session.query(TaskOrder).filter_by(id=task_order_id).one()
|
||||
Authorization.check_task_order_permission(
|
||||
user, task_order, Permissions.VIEW_TASK_ORDER, "view task order"
|
||||
)
|
||||
|
||||
return task_order
|
||||
except NoResultFound:
|
||||
raise NotFoundError("task_order")
|
||||
|
||||
@classmethod
|
||||
def create(cls, workspace, creator, commit=False):
|
||||
def create(cls, creator, workspace):
|
||||
Authorization.check_workspace_permission(
|
||||
creator, workspace, Permissions.UPDATE_TASK_ORDER, "add task order"
|
||||
)
|
||||
task_order = TaskOrder(workspace=workspace, creator=creator)
|
||||
|
||||
db.session.add(task_order)
|
||||
|
||||
if commit:
|
||||
db.session.commit()
|
||||
|
||||
return task_order
|
||||
|
||||
@classmethod
|
||||
def update(cls, task_order, **kwargs):
|
||||
def update(cls, user, task_order, **kwargs):
|
||||
Authorization.check_task_order_permission(
|
||||
user, task_order, Permissions.UPDATE_TASK_ORDER, "update task order"
|
||||
)
|
||||
|
||||
for key, value in kwargs.items():
|
||||
setattr(task_order, key, value)
|
||||
|
||||
@ -103,6 +113,13 @@ class TaskOrders(object):
|
||||
|
||||
@classmethod
|
||||
def add_officer(cls, user, task_order, officer_type, officer_data):
|
||||
Authorization.check_workspace_permission(
|
||||
user,
|
||||
task_order.workspace,
|
||||
Permissions.ADD_TASK_ORDER_OFFICER,
|
||||
"add task order officer",
|
||||
)
|
||||
|
||||
if officer_type in TaskOrders.OFFICERS:
|
||||
workspace = task_order.workspace
|
||||
|
||||
|
@ -43,3 +43,7 @@ class Permissions(object):
|
||||
|
||||
ADD_TAG_TO_WORKSPACE = "add_tag_to_workspace"
|
||||
REMOVE_TAG_FROM_WORKSPACE = "remove_tag_from_workspace"
|
||||
|
||||
VIEW_TASK_ORDER = "view_task_order"
|
||||
UPDATE_TASK_ORDER = "update_task_order"
|
||||
ADD_TASK_ORDER_OFFICER = "add_task_order_officers"
|
||||
|
@ -1,5 +1,5 @@
|
||||
from io import BytesIO
|
||||
from flask import Response
|
||||
from flask import g, Response
|
||||
|
||||
from . import task_orders_bp
|
||||
from atst.domain.task_orders import TaskOrders
|
||||
@ -8,7 +8,7 @@ from atst.utils.docx import Docx
|
||||
|
||||
@task_orders_bp.route("/task_orders/download_summary/<task_order_id>")
|
||||
def download_summary(task_order_id):
|
||||
task_order = TaskOrders.get(task_order_id)
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
byte_str = BytesIO()
|
||||
Docx.render(byte_str, data=task_order.to_dictionary())
|
||||
filename = "{}.docx".format(task_order.portfolio_name)
|
||||
|
@ -47,7 +47,8 @@ TASK_ORDER_SECTIONS = [
|
||||
|
||||
|
||||
class ShowTaskOrderWorkflow:
|
||||
def __init__(self, screen=1, task_order_id=None):
|
||||
def __init__(self, user, screen=1, task_order_id=None):
|
||||
self.user = user
|
||||
self.screen = screen
|
||||
self.task_order_id = task_order_id
|
||||
self._section = TASK_ORDER_SECTIONS[screen - 1]
|
||||
@ -57,7 +58,7 @@ class ShowTaskOrderWorkflow:
|
||||
@property
|
||||
def task_order(self):
|
||||
if not self._task_order and self.task_order_id:
|
||||
self._task_order = TaskOrders.get(self.task_order_id)
|
||||
self._task_order = TaskOrders.get(self.user, self.task_order_id)
|
||||
|
||||
return self._task_order
|
||||
|
||||
@ -122,13 +123,13 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow):
|
||||
|
||||
def _update_task_order(self):
|
||||
if self.task_order:
|
||||
TaskOrders.update(self.task_order, **self.form.data)
|
||||
TaskOrders.update(self.user, self.task_order, **self.form.data)
|
||||
else:
|
||||
ws = Workspaces.create(self.user, self.form.portfolio_name.data)
|
||||
to_data = self.form.data.copy()
|
||||
to_data.pop("portfolio_name")
|
||||
self._task_order = TaskOrders.create(workspace=ws, creator=self.user)
|
||||
TaskOrders.update(self.task_order, **to_data)
|
||||
self._task_order = TaskOrders.create(self.user, ws)
|
||||
TaskOrders.update(self.user, self.task_order, **to_data)
|
||||
|
||||
OFFICER_INVITATIONS = [
|
||||
{
|
||||
@ -189,7 +190,7 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow):
|
||||
@task_orders_bp.route("/task_orders/new/<int:screen>")
|
||||
@task_orders_bp.route("/task_orders/new/<int:screen>/<task_order_id>")
|
||||
def new(screen, task_order_id=None):
|
||||
workflow = ShowTaskOrderWorkflow(screen, task_order_id)
|
||||
workflow = ShowTaskOrderWorkflow(g.current_user, screen, task_order_id)
|
||||
return render_template(
|
||||
workflow.template,
|
||||
current=screen,
|
||||
|
@ -1,8 +1,14 @@
|
||||
import pytest
|
||||
|
||||
from atst.domain.task_orders import TaskOrders, TaskOrderError
|
||||
from atst.domain.exceptions import UnauthorizedError
|
||||
|
||||
from tests.factories import TaskOrderFactory, UserFactory
|
||||
from tests.factories import (
|
||||
TaskOrderFactory,
|
||||
UserFactory,
|
||||
WorkspaceRoleFactory,
|
||||
WorkspaceFactory,
|
||||
)
|
||||
|
||||
|
||||
def test_is_section_complete():
|
||||
@ -60,3 +66,37 @@ def test_add_officer_who_is_already_workspace_member():
|
||||
assert task_order.contracting_officer == owner
|
||||
member = task_order.workspace.members[0]
|
||||
assert member.user == owner and member.role_name == "owner"
|
||||
|
||||
|
||||
def test_task_order_access():
|
||||
creator = UserFactory.create()
|
||||
member = UserFactory.create()
|
||||
rando = UserFactory.create()
|
||||
officer = UserFactory.create()
|
||||
|
||||
def check_access(can, cannot, method_name, method_args):
|
||||
method = getattr(TaskOrders, method_name)
|
||||
|
||||
for user in can:
|
||||
assert method(user, *method_args)
|
||||
|
||||
for user in cannot:
|
||||
with pytest.raises(UnauthorizedError):
|
||||
method(user, *method_args)
|
||||
|
||||
workspace = WorkspaceFactory.create(owner=creator)
|
||||
task_order = TaskOrderFactory.create(creator=creator, workspace=workspace)
|
||||
WorkspaceRoleFactory.create(user=member, workspace=task_order.workspace)
|
||||
TaskOrders.add_officer(
|
||||
creator, task_order, "contracting_officer", officer.to_dictionary()
|
||||
)
|
||||
|
||||
check_access([creator, officer], [member, rando], "get", [task_order.id])
|
||||
check_access([creator], [officer, member, rando], "create", [workspace])
|
||||
check_access([creator, officer], [member, rando], "update", [task_order])
|
||||
check_access(
|
||||
[creator],
|
||||
[officer, member, rando],
|
||||
"add_officer",
|
||||
[task_order, "contracting_officer", rando.to_dictionary()],
|
||||
)
|
||||
|
@ -5,7 +5,7 @@ from zipfile import ZipFile
|
||||
|
||||
from atst.utils.docx import Docx
|
||||
|
||||
from tests.factories import TaskOrderFactory
|
||||
from tests.factories import TaskOrderFactory, WorkspaceFactory, UserFactory
|
||||
|
||||
|
||||
def xml_translated(val):
|
||||
@ -13,8 +13,10 @@ def xml_translated(val):
|
||||
|
||||
|
||||
def test_download_summary(client, user_session):
|
||||
user_session()
|
||||
task_order = TaskOrderFactory.create()
|
||||
user = UserFactory.create()
|
||||
workspace = WorkspaceFactory.create(owner=user)
|
||||
task_order = TaskOrderFactory.create(creator=user, workspace=workspace)
|
||||
user_session(user)
|
||||
response = client.get(
|
||||
url_for("task_orders.download_summary", task_order_id=task_order.id)
|
||||
)
|
||||
|
@ -92,11 +92,20 @@ def test_task_order_form_shows_errors(client, user_session):
|
||||
assert "Not a valid integer" in body
|
||||
|
||||
|
||||
def test_show_task_order():
|
||||
workflow = ShowTaskOrderWorkflow()
|
||||
@pytest.fixture
|
||||
def task_order():
|
||||
user = UserFactory.create()
|
||||
workspace = WorkspaceFactory.create(owner=user)
|
||||
|
||||
return TaskOrderFactory.create(creator=user, workspace=workspace)
|
||||
|
||||
|
||||
def test_show_task_order(task_order):
|
||||
workflow = ShowTaskOrderWorkflow(task_order.creator)
|
||||
assert workflow.task_order is None
|
||||
task_order = TaskOrderFactory.create()
|
||||
another_workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id)
|
||||
another_workflow = ShowTaskOrderWorkflow(
|
||||
task_order.creator, task_order_id=task_order.id
|
||||
)
|
||||
assert another_workflow.task_order == task_order
|
||||
|
||||
|
||||
@ -108,19 +117,19 @@ def test_show_task_order_form_list_data():
|
||||
assert workflow.form.complexity.data == complexity
|
||||
|
||||
|
||||
def test_show_task_order_form():
|
||||
workflow = ShowTaskOrderWorkflow()
|
||||
def test_show_task_order_form(task_order):
|
||||
workflow = ShowTaskOrderWorkflow(task_order.creator)
|
||||
assert not workflow.form.data["app_migration"]
|
||||
task_order = TaskOrderFactory.create()
|
||||
another_workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id)
|
||||
another_workflow = ShowTaskOrderWorkflow(
|
||||
task_order.creator, task_order_id=task_order.id
|
||||
)
|
||||
assert (
|
||||
another_workflow.form.data["defense_component"] == task_order.defense_component
|
||||
)
|
||||
|
||||
|
||||
def test_show_task_order_display_screen():
|
||||
task_order = TaskOrderFactory.create()
|
||||
workflow = ShowTaskOrderWorkflow(task_order_id=task_order.id)
|
||||
def test_show_task_order_display_screen(task_order):
|
||||
workflow = ShowTaskOrderWorkflow(task_order.creator, task_order_id=task_order.id)
|
||||
screens = workflow.display_screens
|
||||
# every form section is complete
|
||||
for i in range(2):
|
||||
@ -139,22 +148,17 @@ def test_update_task_order_with_no_task_order():
|
||||
assert workflow.task_order.scope == to_data["scope"]
|
||||
|
||||
|
||||
def test_update_task_order_with_existing_task_order():
|
||||
user = UserFactory.create()
|
||||
task_order = TaskOrderFactory.create()
|
||||
def test_update_task_order_with_existing_task_order(task_order):
|
||||
to_data = serialize_dates(TaskOrderFactory.dictionary())
|
||||
workflow = UpdateTaskOrderWorkflow(
|
||||
to_data, user, screen=2, task_order_id=task_order.id
|
||||
to_data, task_order.creator, screen=2, task_order_id=task_order.id
|
||||
)
|
||||
assert workflow.task_order.start_date != to_data["start_date"]
|
||||
workflow.update()
|
||||
assert workflow.task_order.start_date.strftime("%m/%d/%Y") == to_data["start_date"]
|
||||
|
||||
|
||||
def test_invite_officers_to_task_order(queue):
|
||||
user = UserFactory.create()
|
||||
workspace = WorkspaceFactory.create(owner=user)
|
||||
task_order = TaskOrderFactory.create(creator=user, workspace=workspace)
|
||||
def test_invite_officers_to_task_order(task_order, queue):
|
||||
to_data = {
|
||||
**TaskOrderFactory.dictionary(),
|
||||
"ko_invite": True,
|
||||
@ -162,7 +166,7 @@ def test_invite_officers_to_task_order(queue):
|
||||
"so_invite": True,
|
||||
}
|
||||
workflow = UpdateTaskOrderWorkflow(
|
||||
to_data, user, screen=3, task_order_id=task_order.id
|
||||
to_data, task_order.creator, screen=3, task_order_id=task_order.id
|
||||
)
|
||||
workflow.update()
|
||||
workspace = task_order.workspace
|
||||
@ -179,10 +183,7 @@ def test_invite_officers_to_task_order(queue):
|
||||
assert task_order.security_officer.dod_id == to_data["so_dod_id"]
|
||||
|
||||
|
||||
def test_add_officer_but_do_not_invite(queue):
|
||||
user = UserFactory.create()
|
||||
workspace = WorkspaceFactory.create(owner=user)
|
||||
task_order = TaskOrderFactory.create(creator=user, workspace=workspace)
|
||||
def test_add_officer_but_do_not_invite(task_order, queue):
|
||||
to_data = {
|
||||
**TaskOrderFactory.dictionary(),
|
||||
"ko_invite": False,
|
||||
@ -190,7 +191,7 @@ def test_add_officer_but_do_not_invite(queue):
|
||||
"so_invite": False,
|
||||
}
|
||||
workflow = UpdateTaskOrderWorkflow(
|
||||
to_data, user, screen=3, task_order_id=task_order.id
|
||||
to_data, task_order.creator, screen=3, task_order_id=task_order.id
|
||||
)
|
||||
workflow.update()
|
||||
workspace = task_order.workspace
|
||||
|
Loading…
x
Reference in New Issue
Block a user