apply access decorator to routes
This commit is contained in:
parent
0ea21fbb9b
commit
de7c69bde7
@ -1,7 +1,6 @@
|
||||
from atst.database import db
|
||||
from atst.domain.environments import Environments
|
||||
from atst.domain.exceptions import NotFoundError
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.models.application import Application
|
||||
from atst.models.environment import Environment
|
||||
from atst.models.environment_role import EnvironmentRole
|
||||
|
@ -4,6 +4,15 @@ from flask import g
|
||||
|
||||
from . import user_can_access
|
||||
from atst.domain.portfolios import Portfolios
|
||||
from atst.domain.task_orders import TaskOrders
|
||||
|
||||
|
||||
def evaluate_exceptions(user, permission, exceptions, **kwargs):
|
||||
return (
|
||||
True
|
||||
if True in [exc(g.current_user, permission, **kwargs) for exc in exceptions]
|
||||
else False
|
||||
)
|
||||
|
||||
|
||||
def user_can_access_decorator(permission, message=None, exceptions=None):
|
||||
@ -16,13 +25,14 @@ def user_can_access_decorator(permission, message=None, exceptions=None):
|
||||
access_args["portfolio"] = Portfolios.get(
|
||||
g.current_user, kwargs["portfolio_id"]
|
||||
)
|
||||
elif "task_order_id" in kwargs:
|
||||
task_order = TaskOrders.get(g.current_user, kwargs["task_order_id"])
|
||||
access_args["portfolio"] = task_order.portfolio
|
||||
|
||||
if exceptions:
|
||||
evaluated = [
|
||||
exc(g.current_user, permission, **access_args) for exc in exceptions
|
||||
]
|
||||
if True in evaluated:
|
||||
return True
|
||||
if exceptions and evaluate_exceptions(
|
||||
g.current_user, permission, exceptions, **access_args, **kwargs
|
||||
):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
user_can_access(g.current_user, permission, **access_args)
|
||||
|
||||
|
@ -4,7 +4,6 @@ from sqlalchemy.orm.exc import NoResultFound
|
||||
from atst.database import db
|
||||
from atst.models.invitation import Invitation, Status as InvitationStatus
|
||||
from atst.domain.portfolio_roles import PortfolioRoles
|
||||
from atst.domain.portfolios import Portfolios
|
||||
|
||||
from .exceptions import NotFoundError
|
||||
|
||||
@ -118,7 +117,6 @@ class Invitations(object):
|
||||
|
||||
@classmethod
|
||||
def resend(cls, user, portfolio_id, token):
|
||||
portfolio = Portfolios.get(user, portfolio_id)
|
||||
previous_invitation = Invitations._get(token)
|
||||
Invitations._update_status(previous_invitation, InvitationStatus.REVOKED)
|
||||
|
||||
|
@ -22,6 +22,8 @@ from atst.domain.audit_log import AuditLog
|
||||
from atst.domain.auth import logout as _logout
|
||||
from atst.domain.common import Paginator
|
||||
from atst.domain.portfolios import Portfolios
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
|
||||
|
||||
@ -143,6 +145,7 @@ def logout():
|
||||
|
||||
|
||||
@bp.route("/activity-history")
|
||||
@user_can(Permissions.VIEW_AUDIT_LOG, message="view activity log")
|
||||
def activity_history():
|
||||
pagination_opts = Paginator.get_pagination_opts(request)
|
||||
audit_events = AuditLog.get_all_events(g.current_user, pagination_opts)
|
||||
|
@ -13,15 +13,19 @@ from atst.domain.exceptions import UnauthorizedError
|
||||
from atst.domain.applications import Applications
|
||||
from atst.domain.portfolios import Portfolios
|
||||
from atst.forms.application import NewApplicationForm, ApplicationForm
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
from atst.models.permissions import Permissions
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/applications")
|
||||
@user_can(Permissions.VIEW_APPLICATION)
|
||||
def portfolio_applications(portfolio_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
return render_template("portfolios/applications/index.html", portfolio=portfolio)
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/applications/new")
|
||||
@user_can(Permissions.CREATE_APPLICATION)
|
||||
def new_application(portfolio_id):
|
||||
portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id)
|
||||
form = NewApplicationForm()
|
||||
@ -31,6 +35,7 @@ def new_application(portfolio_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/applications/new", methods=["POST"])
|
||||
@user_can(Permissions.CREATE_APPLICATION)
|
||||
def create_application(portfolio_id):
|
||||
portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id)
|
||||
form = NewApplicationForm(http_request.form)
|
||||
@ -54,6 +59,7 @@ def create_application(portfolio_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/applications/<application_id>/edit")
|
||||
@user_can(Permissions.EDIT_APPLICATION)
|
||||
def edit_application(portfolio_id, application_id):
|
||||
portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id)
|
||||
application = Applications.get(g.current_user, portfolio, application_id)
|
||||
@ -70,6 +76,7 @@ def edit_application(portfolio_id, application_id):
|
||||
@portfolios_bp.route(
|
||||
"/portfolios/<portfolio_id>/applications/<application_id>/edit", methods=["POST"]
|
||||
)
|
||||
@user_can(Permissions.EDIT_APPLICATION)
|
||||
def update_application(portfolio_id, application_id):
|
||||
portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id)
|
||||
application = Applications.get(g.current_user, portfolio, application_id)
|
||||
@ -91,6 +98,8 @@ def update_application(portfolio_id, application_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/environments/<environment_id>/access")
|
||||
# TODO: we probably need a different permission for this
|
||||
@user_can(Permissions.VIEW_ENVIRONMENT)
|
||||
def access_environment(portfolio_id, environment_id):
|
||||
env_role = EnvironmentRoles.get(g.current_user.id, environment_id)
|
||||
if not env_role:
|
||||
|
@ -8,8 +8,9 @@ from atst.domain.portfolios import Portfolios
|
||||
from atst.domain.audit_log import AuditLog
|
||||
from atst.domain.common import Paginator
|
||||
from atst.forms.portfolio import PortfolioForm
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.domain.permission_sets import PermissionSets
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
from atst.models.permissions import Permissions
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios")
|
||||
@ -37,6 +38,7 @@ def serialize_member(member):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/admin")
|
||||
@user_can(Permissions.VIEW_PORTFOLIO_ADMIN)
|
||||
def portfolio_admin(portfolio_id):
|
||||
portfolio = Portfolios.get_for_update_information(g.current_user, portfolio_id)
|
||||
form = PortfolioForm(data={"name": portfolio.name})
|
||||
@ -56,6 +58,7 @@ def portfolio_admin(portfolio_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/edit", methods=["POST"])
|
||||
@user_can(Permissions.EDIT_PORTFOLIO_NAME)
|
||||
def edit_portfolio(portfolio_id):
|
||||
portfolio = Portfolios.get_for_update_information(g.current_user, portfolio_id)
|
||||
form = PortfolioForm(http_request.form)
|
||||
@ -69,6 +72,7 @@ def edit_portfolio(portfolio_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>")
|
||||
@user_can(Permissions.VIEW_PORTFOLIO)
|
||||
def show_portfolio(portfolio_id):
|
||||
return redirect(
|
||||
url_for("portfolios.portfolio_applications", portfolio_id=portfolio_id)
|
||||
@ -76,6 +80,7 @@ def show_portfolio(portfolio_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/reports")
|
||||
@user_can(Permissions.VIEW_PORTFOLIO_REPORTS)
|
||||
def portfolio_reports(portfolio_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
today = date.today()
|
||||
|
@ -5,6 +5,8 @@ from atst.domain.portfolios import Portfolios
|
||||
from atst.domain.invitations import Invitations
|
||||
from atst.queue import queue
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
from atst.models.permissions import Permissions
|
||||
|
||||
|
||||
def send_invite_email(owner_name, token, new_member_email):
|
||||
@ -43,6 +45,7 @@ def accept_invitation(token):
|
||||
@portfolios_bp.route(
|
||||
"/portfolios/<portfolio_id>/invitations/<token>/revoke", methods=["POST"]
|
||||
)
|
||||
@user_can(Permissions.EDIT_PORTFOLIO_USERS)
|
||||
def revoke_invitation(portfolio_id, token):
|
||||
portfolio = Portfolios.get_for_update_member(g.current_user, portfolio_id)
|
||||
Invitations.revoke(token)
|
||||
@ -53,6 +56,7 @@ def revoke_invitation(portfolio_id, token):
|
||||
@portfolios_bp.route(
|
||||
"/portfolios/<portfolio_id>/invitations/<token>/resend", methods=["POST"]
|
||||
)
|
||||
@user_can(Permissions.EDIT_PORTFOLIO_USERS)
|
||||
def resend_invitation(portfolio_id, token):
|
||||
invite = Invitations.resend(g.current_user, portfolio_id, token)
|
||||
send_invite_email(g.current_user.full_name, invite.token, invite.email)
|
||||
|
@ -12,6 +12,8 @@ from atst.domain.environment_roles import EnvironmentRoles
|
||||
from atst.services.invitation import Invitation as InvitationService
|
||||
import atst.forms.portfolio_member as member_forms
|
||||
from atst.forms.data import ENVIRONMENT_ROLES, ENV_ROLE_MODAL_DESCRIPTION
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
from atst.models.permissions import Permissions
|
||||
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
|
||||
@ -32,6 +34,7 @@ def serialize_portfolio_role(portfolio_role):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/members")
|
||||
@user_can(Permissions.VIEW_PORTFOLIO_USERS)
|
||||
def portfolio_members(portfolio_id):
|
||||
portfolio = Portfolios.get_with_members(g.current_user, portfolio_id)
|
||||
members_list = [serialize_portfolio_role(k) for k in portfolio.members]
|
||||
@ -45,6 +48,7 @@ def portfolio_members(portfolio_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/applications/<application_id>/members")
|
||||
@user_can(Permissions.VIEW_APPLICATION_MEMBER)
|
||||
def application_members(portfolio_id, application_id):
|
||||
portfolio = Portfolios.get_with_members(g.current_user, portfolio_id)
|
||||
application = Applications.get(g.current_user, portfolio, application_id)
|
||||
@ -60,6 +64,7 @@ def application_members(portfolio_id, application_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/members/new")
|
||||
@user_can(Permissions.CREATE_PORTFOLIO_USERS)
|
||||
def new_member(portfolio_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
form = member_forms.NewForm()
|
||||
@ -69,6 +74,7 @@ def new_member(portfolio_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/members/new", methods=["POST"])
|
||||
@user_can(Permissions.CREATE_PORTFOLIO_USERS)
|
||||
def create_member(portfolio_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
form = member_forms.NewForm(http_request.form)
|
||||
@ -97,6 +103,7 @@ def create_member(portfolio_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/members/<member_id>/member_edit")
|
||||
@user_can(Permissions.VIEW_PORTFOLIO_USERS)
|
||||
def view_member(portfolio_id, member_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
member = PortfolioRoles.get(portfolio_id, member_id)
|
||||
@ -125,6 +132,7 @@ def view_member(portfolio_id, member_id):
|
||||
@portfolios_bp.route(
|
||||
"/portfolios/<portfolio_id>/members/<member_id>/member_edit", methods=["POST"]
|
||||
)
|
||||
@user_can(Permissions.EDIT_PORTFOLIO_USERS)
|
||||
def update_member(portfolio_id, member_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
member = PortfolioRoles.get(portfolio_id, member_id)
|
||||
@ -163,6 +171,7 @@ def update_member(portfolio_id, member_id):
|
||||
@portfolios_bp.route(
|
||||
"/portfolios/<portfolio_id>/members/<member_id>/revoke_access", methods=["POST"]
|
||||
)
|
||||
@user_can(Permissions.EDIT_PORTFOLIO_USERS)
|
||||
def revoke_access(portfolio_id, member_id):
|
||||
revoked_role = Portfolios.revoke_access(g.current_user, portfolio_id, member_id)
|
||||
flash("revoked_portfolio_access", member_name=revoked_role.user.full_name)
|
||||
|
@ -20,9 +20,12 @@ from atst.services.invitation import (
|
||||
OFFICER_INVITATIONS,
|
||||
Invitation as InvitationService,
|
||||
)
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
from atst.models.permissions import Permissions
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/task_orders")
|
||||
@user_can(Permissions.VIEW_PORTFOLIO_FUNDING)
|
||||
def portfolio_funding(portfolio_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
task_orders_by_status = defaultdict(list)
|
||||
@ -66,6 +69,7 @@ def portfolio_funding(portfolio_id):
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>")
|
||||
@user_can(Permissions.VIEW_TASK_ORDER_DETAILS)
|
||||
def view_task_order(portfolio_id, task_order_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
@ -85,13 +89,19 @@ def view_task_order(portfolio_id, task_order_id):
|
||||
)
|
||||
|
||||
|
||||
def wrap_check_is_ko_or_cor(user, _perm, task_order_id=None, **_kwargs):
|
||||
task_order = TaskOrders.get(user, task_order_id)
|
||||
Authorization.check_is_ko_or_cor(user, task_order)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/review")
|
||||
@user_can(None, exceptions=[wrap_check_is_ko_or_cor])
|
||||
def ko_review(portfolio_id, task_order_id):
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
|
||||
Authorization.check_is_ko_or_cor(g.current_user, task_order)
|
||||
|
||||
if TaskOrders.all_sections_complete(task_order):
|
||||
return render_template(
|
||||
"/portfolios/task_orders/review.html",
|
||||
@ -107,6 +117,7 @@ def ko_review(portfolio_id, task_order_id):
|
||||
"/portfolios/<portfolio_id>/task_order/<task_order_id>/resend_invite",
|
||||
methods=["POST"],
|
||||
)
|
||||
@user_can(Permissions.EDIT_TASK_ORDER_DETAILS)
|
||||
def resend_invite(portfolio_id, task_order_id, form=None):
|
||||
invite_type = http_request.args.get("invite_type")
|
||||
|
||||
@ -164,13 +175,12 @@ def resend_invite(portfolio_id, task_order_id, form=None):
|
||||
@portfolios_bp.route(
|
||||
"/portfolios/<portfolio_id>/task_order/<task_order_id>/review", methods=["POST"]
|
||||
)
|
||||
@user_can(None, exceptions=[wrap_check_is_ko_or_cor])
|
||||
def submit_ko_review(portfolio_id, task_order_id, form=None):
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
form_data = {**http_request.form, **http_request.files}
|
||||
form = KOReviewForm(form_data)
|
||||
|
||||
Authorization.check_is_ko_or_cor(g.current_user, task_order)
|
||||
|
||||
if form.validate():
|
||||
TaskOrders.update(user=g.current_user, task_order=task_order, **form.data)
|
||||
if Authorization.is_ko(g.current_user, task_order) and TaskOrders.can_ko_sign(
|
||||
@ -199,6 +209,7 @@ def submit_ko_review(portfolio_id, task_order_id, form=None):
|
||||
@portfolios_bp.route(
|
||||
"/portfolios/<portfolio_id>/task_order/<task_order_id>/invitations"
|
||||
)
|
||||
@user_can(Permissions.EDIT_TASK_ORDER_DETAILS)
|
||||
def task_order_invitations(portfolio_id, task_order_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
@ -219,6 +230,7 @@ def task_order_invitations(portfolio_id, task_order_id):
|
||||
"/portfolios/<portfolio_id>/task_order/<task_order_id>/invitations",
|
||||
methods=["POST"],
|
||||
)
|
||||
@user_can(Permissions.EDIT_TASK_ORDER_DETAILS)
|
||||
def edit_task_order_invitations(portfolio_id, task_order_id):
|
||||
portfolio = Portfolios.get(g.current_user, portfolio_id)
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
@ -266,11 +278,17 @@ def so_review_form(task_order):
|
||||
return DD254Form(data=form_data)
|
||||
|
||||
|
||||
def wrap_check_is_so(user, _perm, task_order_id=None, **_kwargs):
|
||||
task_order = TaskOrders.get(user, task_order_id)
|
||||
Authorization.check_is_so(user, task_order)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254")
|
||||
@user_can(None, exceptions=[wrap_check_is_so])
|
||||
def so_review(portfolio_id, task_order_id):
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
Authorization.check_is_so(g.current_user, task_order)
|
||||
|
||||
form = so_review_form(task_order)
|
||||
|
||||
return render_template(
|
||||
@ -284,10 +302,9 @@ def so_review(portfolio_id, task_order_id):
|
||||
@portfolios_bp.route(
|
||||
"/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254", methods=["POST"]
|
||||
)
|
||||
@user_can(None, exceptions=[wrap_check_is_so])
|
||||
def submit_so_review(portfolio_id, task_order_id):
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
Authorization.check_is_so(g.current_user, task_order)
|
||||
|
||||
form = DD254Form(http_request.form)
|
||||
|
||||
if form.validate():
|
||||
|
@ -5,9 +5,12 @@ from . import task_orders_bp
|
||||
from atst.domain.task_orders import TaskOrders
|
||||
from atst.domain.exceptions import NotFoundError
|
||||
from atst.utils.docx import Docx
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
from atst.models.permissions import Permissions
|
||||
|
||||
|
||||
@task_orders_bp.route("/task_orders/download_summary/<task_order_id>")
|
||||
@user_can(Permissions.VIEW_TASK_ORDER_DETAILS)
|
||||
def download_summary(task_order_id):
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
byte_str = BytesIO()
|
||||
@ -31,6 +34,7 @@ def send_file(attachment):
|
||||
|
||||
|
||||
@task_orders_bp.route("/task_orders/csp_estimate/<task_order_id>")
|
||||
@user_can(Permissions.VIEW_TASK_ORDER_DETAILS)
|
||||
def download_csp_estimate(task_order_id):
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
if task_order.csp_estimate:
|
||||
@ -40,6 +44,7 @@ def download_csp_estimate(task_order_id):
|
||||
|
||||
|
||||
@task_orders_bp.route("/task_orders/pdf/<task_order_id>")
|
||||
@user_can(Permissions.VIEW_TASK_ORDER_DETAILS)
|
||||
def download_task_order_pdf(task_order_id):
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
if task_order.pdf:
|
||||
|
@ -4,9 +4,12 @@ from . import task_orders_bp
|
||||
from atst.domain.task_orders import TaskOrders
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
from atst.services.invitation import update_officer_invitations
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
from atst.models.permissions import Permissions
|
||||
|
||||
|
||||
@task_orders_bp.route("/task_orders/invite/<task_order_id>", methods=["POST"])
|
||||
@user_can(Permissions.EDIT_TASK_ORDER_DETAILS)
|
||||
def invite(task_order_id):
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
if TaskOrders.all_sections_complete(task_order):
|
||||
|
@ -14,6 +14,8 @@ from atst.domain.task_orders import TaskOrders
|
||||
from atst.domain.portfolios import Portfolios
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
import atst.forms.task_order as task_order_form
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
from atst.models.permissions import Permissions
|
||||
|
||||
|
||||
TASK_ORDER_SECTIONS = [
|
||||
@ -249,9 +251,19 @@ def get_started():
|
||||
return render_template("task_orders/new/get_started.html") # pragma: no cover
|
||||
|
||||
|
||||
def is_new_task_order(*args, **kwargs):
|
||||
return (
|
||||
"screen" in kwargs
|
||||
and kwargs["screen"] == 1
|
||||
and "task_order_id" not in kwargs
|
||||
and "portfolio_id" not in kwargs
|
||||
)
|
||||
|
||||
|
||||
@task_orders_bp.route("/task_orders/new/<int:screen>")
|
||||
@task_orders_bp.route("/task_orders/new/<int:screen>/<task_order_id>")
|
||||
@task_orders_bp.route("/portfolios/<portfolio_id>/task_orders/new/<int:screen>")
|
||||
@user_can(Permissions.CREATE_TASK_ORDER, exceptions=[is_new_task_order])
|
||||
def new(screen, task_order_id=None, portfolio_id=None):
|
||||
workflow = ShowTaskOrderWorkflow(
|
||||
g.current_user, screen, task_order_id, portfolio_id
|
||||
@ -298,6 +310,7 @@ def new(screen, task_order_id=None, portfolio_id=None):
|
||||
@task_orders_bp.route(
|
||||
"/portfolios/<portfolio_id>/task_orders/new/<int:screen>", methods=["POST"]
|
||||
)
|
||||
@user_can(Permissions.CREATE_TASK_ORDER, exceptions=[is_new_task_order])
|
||||
def update(screen, task_order_id=None, portfolio_id=None):
|
||||
form_data = {**http_request.form, **http_request.files}
|
||||
workflow = UpdateTaskOrderWorkflow(
|
||||
|
@ -8,11 +8,11 @@ from atst.domain.exceptions import NoAccessError
|
||||
from atst.domain.task_orders import TaskOrders
|
||||
from atst.forms.task_order import SignatureForm
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
from atst.domain.authz.decorator import user_can_access_decorator as user_can
|
||||
|
||||
|
||||
def find_unsigned_ko_to(task_order_id):
|
||||
task_order = TaskOrders.get(g.current_user, task_order_id)
|
||||
Authorization.check_is_ko(g.current_user, task_order)
|
||||
|
||||
if not TaskOrders.can_ko_sign(task_order):
|
||||
raise NoAccessError("task_order")
|
||||
@ -20,7 +20,15 @@ def find_unsigned_ko_to(task_order_id):
|
||||
return task_order
|
||||
|
||||
|
||||
def wrap_check_is_ko(user, _perm, task_order_id=None, **_kwargs):
|
||||
task_order = TaskOrders.get(user, task_order_id)
|
||||
Authorization.check_is_ko(user, task_order)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@task_orders_bp.route("/task_orders/<task_order_id>/digital_signature", methods=["GET"])
|
||||
@user_can(None, exceptions=[wrap_check_is_ko])
|
||||
def signature_requested(task_order_id):
|
||||
task_order = find_unsigned_ko_to(task_order_id)
|
||||
|
||||
@ -35,6 +43,7 @@ def signature_requested(task_order_id):
|
||||
@task_orders_bp.route(
|
||||
"/task_orders/<task_order_id>/digital_signature", methods=["POST"]
|
||||
)
|
||||
@user_can(None, exceptions=[wrap_check_is_ko])
|
||||
def record_signature(task_order_id):
|
||||
task_order = find_unsigned_ko_to(task_order_id)
|
||||
|
||||
|
@ -22,7 +22,7 @@ def developer():
|
||||
return UserFactory.create()
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
@pytest.mark.skip(reason="redo as a route access test")
|
||||
def test_non_admin_cannot_view_audit_log(developer):
|
||||
with pytest.raises(UnauthorizedError):
|
||||
AuditLog.get_all_events(developer)
|
||||
@ -64,7 +64,7 @@ def test_ws_owner_can_view_ws_audit_log():
|
||||
assert len(events) > 0
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
@pytest.mark.skip(reason="redo as a route access test")
|
||||
def test_other_users_cannot_view_portfolio_audit_log():
|
||||
with pytest.raises(UnauthorizedError):
|
||||
portfolio = PortfolioFactory.create()
|
||||
|
@ -46,19 +46,18 @@ def test_portfolio_has_timestamps(portfolio):
|
||||
assert portfolio.time_created == portfolio.time_updated
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
@pytest.mark.skip(reason="redo as a route access test")
|
||||
def test_portfolios_get_ensures_user_is_in_portfolio(portfolio, portfolio_owner):
|
||||
outside_user = UserFactory.create()
|
||||
with pytest.raises(UnauthorizedError):
|
||||
Portfolios.get(outside_user, portfolio.id)
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
def test_get_for_update_applications_allows_owner(portfolio, portfolio_owner):
|
||||
Portfolios.get_for_update_applications(portfolio_owner, portfolio.id)
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
@pytest.mark.skip(reason="redo as a route access test")
|
||||
def test_get_for_update_applications_blocks_developer(portfolio):
|
||||
developer = UserFactory.create()
|
||||
PortfolioRoles.add(developer, portfolio.id)
|
||||
@ -97,7 +96,7 @@ def test_can_add_existing_user_to_portfolio(portfolio, portfolio_owner):
|
||||
assert not new_member.user.provisional
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
@pytest.mark.skip(reason="redo as a route access test")
|
||||
def test_need_permission_to_create_portfolio_role(portfolio, portfolio_owner):
|
||||
random_user = UserFactory.create()
|
||||
|
||||
@ -131,7 +130,7 @@ def test_update_portfolio_role_role(portfolio, portfolio_owner):
|
||||
assert updated_member.portfolio == portfolio
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
@pytest.mark.skip(reason="redo as a route access test")
|
||||
def test_need_permission_to_update_portfolio_role_role(portfolio, portfolio_owner):
|
||||
random_user = UserFactory.create()
|
||||
user_data = {
|
||||
@ -159,7 +158,7 @@ def test_ccpo_can_view_portfolio_members(portfolio, portfolio_owner):
|
||||
assert Portfolios.get_with_members(ccpo, portfolio.id)
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
@pytest.mark.skip(reason="redo as a route access test")
|
||||
def test_random_user_cannot_view_portfolio_members(portfolio):
|
||||
developer = UserFactory.create()
|
||||
|
||||
@ -288,7 +287,7 @@ def test_for_user_returns_all_portfolios_for_ccpo(portfolio, portfolio_owner):
|
||||
assert len(sams_portfolios) == 2
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
@pytest.mark.skip(reason="redo as a route access test")
|
||||
def test_get_for_update_information(portfolio, portfolio_owner):
|
||||
owner_ws = Portfolios.get_for_update_information(portfolio_owner, portfolio.id)
|
||||
assert portfolio == owner_ws
|
||||
|
@ -95,7 +95,7 @@ def test_add_officer_who_is_already_portfolio_member():
|
||||
assert member.user == owner
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
@pytest.mark.skip(reason="redo as route access test")
|
||||
def test_task_order_access():
|
||||
creator = UserFactory.create()
|
||||
member = UserFactory.create()
|
||||
|
@ -175,7 +175,6 @@ def test_user_with_permission_can_update_application(client, user_session):
|
||||
assert application.description == "A very cool application."
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
def test_user_without_permission_cannot_update_application(client, user_session):
|
||||
dev = UserFactory.create()
|
||||
owner = UserFactory.create()
|
||||
|
@ -95,7 +95,6 @@ def test_member_accepts_invalid_invite(client, user_session):
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
def test_user_who_has_not_accepted_portfolio_invite_cannot_view(client, user_session):
|
||||
user = UserFactory.create()
|
||||
portfolio = PortfolioFactory.create()
|
||||
|
@ -60,7 +60,6 @@ def test_user_with_permission_has_add_member_link(client, user_session):
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
def test_user_without_permission_has_no_add_member_link(client, user_session):
|
||||
user = UserFactory.create()
|
||||
portfolio = PortfolioFactory.create()
|
||||
@ -73,7 +72,6 @@ def test_user_without_permission_has_no_add_member_link(client, user_session):
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.auth
|
||||
def test_permissions_for_view_member(client, user_session):
|
||||
user = UserFactory.create()
|
||||
portfolio = PortfolioFactory.create()
|
||||
|
@ -371,27 +371,6 @@ def test_mo_redirected_to_build_page(client, user_session, portfolio):
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_cor_redirected_to_build_page(client, user_session, portfolio):
|
||||
cor = UserFactory.create()
|
||||
PortfolioRoleFactory.create(
|
||||
portfolio=portfolio,
|
||||
user=cor,
|
||||
status=PortfolioStatus.ACTIVE,
|
||||
permission_sets=[
|
||||
PermissionSets.get(PermissionSets.VIEW_PORTFOLIO),
|
||||
PermissionSets.get(PermissionSets.VIEW_PORTFOLIO_FUNDING),
|
||||
],
|
||||
)
|
||||
task_order = TaskOrderFactory.create(
|
||||
portfolio=portfolio, contracting_officer_representative=cor
|
||||
)
|
||||
user_session(cor)
|
||||
response = client.get(
|
||||
url_for("task_orders.new", screen=1, task_order_id=task_order.id)
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_submit_completed_ko_review_page_as_cor(
|
||||
client, user_session, pdf_upload, portfolio, user
|
||||
):
|
||||
@ -620,6 +599,7 @@ def test_resend_invite_when_officer_type_missing(
|
||||
assert len(queue.get_queue()) == queue_length
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="KO should not be able to resend invites")
|
||||
def test_resend_invite_when_ko(app, client, user_session, portfolio, user):
|
||||
queue_length = len(queue.get_queue())
|
||||
|
||||
|
@ -63,7 +63,6 @@ class TestDownloadCSPEstimate:
|
||||
)
|
||||
assert response.status_code == 404
|
||||
|
||||
@pytest.mark.auth
|
||||
def test_download_with_wrong_user(self, client, user_session):
|
||||
other_user = UserFactory.create()
|
||||
user_session(other_user)
|
||||
|
@ -2,11 +2,17 @@ import pytest
|
||||
from flask import url_for
|
||||
|
||||
from atst.domain.task_orders import TaskOrders
|
||||
from atst.domain.permission_sets import PermissionSets
|
||||
from atst.models.attachment import Attachment
|
||||
from atst.routes.task_orders.new import ShowTaskOrderWorkflow, UpdateTaskOrderWorkflow
|
||||
from atst.utils.localization import translate
|
||||
|
||||
from tests.factories import UserFactory, TaskOrderFactory, PortfolioFactory
|
||||
from tests.factories import (
|
||||
UserFactory,
|
||||
TaskOrderFactory,
|
||||
PortfolioFactory,
|
||||
PortfolioRoleFactory,
|
||||
)
|
||||
|
||||
|
||||
class TestShowTaskOrderWorkflow:
|
||||
@ -93,8 +99,6 @@ def test_to_on_pf_cannot_edit_pf_attributes():
|
||||
assert second_workflow.pf_attributes_read_only
|
||||
|
||||
|
||||
# TODO: this test will need to be more complicated when we add validation to
|
||||
# the forms
|
||||
def test_create_new_task_order(client, user_session, pdf_upload):
|
||||
creator = UserFactory.create()
|
||||
user_session(creator)
|
||||
@ -296,6 +300,11 @@ def test_update_task_order_with_existing_task_order(task_order):
|
||||
def test_update_to_redirects_to_ko_review(client, user_session, task_order):
|
||||
ko = UserFactory.create()
|
||||
task_order.contracting_officer = ko
|
||||
PortfolioRoleFactory.create(
|
||||
user=ko,
|
||||
portfolio=task_order.portfolio,
|
||||
permission_sets=[PermissionSets.get(PermissionSets.EDIT_PORTFOLIO_FUNDING)],
|
||||
)
|
||||
user_session(ko)
|
||||
url = url_for(
|
||||
"portfolios.ko_review",
|
||||
|
@ -4,7 +4,7 @@ from urllib.parse import quote
|
||||
from tests.factories import UserFactory
|
||||
|
||||
|
||||
PROTECTED_URL = "/portfolios"
|
||||
PROTECTED_URL = "/task_orders/new/get_started"
|
||||
|
||||
|
||||
def test_request_page_with_complete_profile(client, user_session):
|
||||
|
0
tests/routes/test_authz.py
Normal file
0
tests/routes/test_authz.py
Normal file
72
tests/test_authz.py
Normal file
72
tests/test_authz.py
Normal file
@ -0,0 +1,72 @@
|
||||
import pytest
|
||||
|
||||
import atst
|
||||
from atst.app import make_app, make_config
|
||||
from atst.domain.auth import UNPROTECTED_ROUTES as _NO_LOGIN_REQUIRED
|
||||
import atst.domain.authz as authz
|
||||
|
||||
from tests.factories import UserFactory
|
||||
|
||||
_NO_ACCESS_CHECK_REQUIRED = _NO_LOGIN_REQUIRED + [
|
||||
"task_orders.get_started",
|
||||
"atst.csp_environment_access",
|
||||
"atst.jedi_csp_calculator",
|
||||
"atst.styleguide",
|
||||
"dev.test_email",
|
||||
"dev.messages",
|
||||
"atst.home",
|
||||
"users.user",
|
||||
"users.update_user",
|
||||
"portfolios.accept_invitation",
|
||||
"atst.catch_all",
|
||||
"portfolios.portfolios",
|
||||
]
|
||||
|
||||
|
||||
def protected_routes(app):
|
||||
_protected_routes = []
|
||||
|
||||
for rule in app.url_map.iter_rules():
|
||||
args = [1] * len(rule.arguments)
|
||||
mock_args = dict(zip(rule.arguments, args))
|
||||
_n, route = rule.build(mock_args)
|
||||
if rule.endpoint in _NO_ACCESS_CHECK_REQUIRED or "/static" in route:
|
||||
continue
|
||||
|
||||
_protected_routes.append((rule, route))
|
||||
|
||||
return _protected_routes
|
||||
|
||||
|
||||
_PROTECTED_ROUTES = protected_routes(make_app(make_config()))
|
||||
|
||||
|
||||
class Null:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __getattr__(self, name):
|
||||
return self
|
||||
|
||||
|
||||
@pytest.mark.parametrize("rule,route", _PROTECTED_ROUTES)
|
||||
def test_all_protected_routes_have_access_control(
|
||||
rule, route, mocker, client, user_session, monkeypatch
|
||||
):
|
||||
monkeypatch.setattr("atst.domain.portfolios.Portfolios.for_user", lambda *a: [])
|
||||
monkeypatch.setattr("atst.domain.portfolios.Portfolios.get", lambda *a: None)
|
||||
monkeypatch.setattr("atst.domain.task_orders.TaskOrders.get", lambda *a: Null())
|
||||
|
||||
mocker.patch("atst.domain.authz.decorator.user_can_access")
|
||||
mocker.patch("atst.domain.authz.decorator.evaluate_exceptions")
|
||||
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
|
||||
method = "get" if "GET" in rule.methods else "post"
|
||||
getattr(client, method)(route)
|
||||
|
||||
assert (
|
||||
atst.domain.authz.decorator.user_can_access.call_count == 1
|
||||
or atst.domain.authz.decorator.evaluate_exceptions.call_count == 1
|
||||
), "no access control for {}".format(rule.endpoint)
|
Loading…
x
Reference in New Issue
Block a user