diff --git a/atst/domain/authz/decorator.py b/atst/domain/authz/decorator.py index cf79808b..a10e3d77 100644 --- a/atst/domain/authz/decorator.py +++ b/atst/domain/authz/decorator.py @@ -8,11 +8,7 @@ from atst.domain.task_orders import TaskOrders from atst.domain.exceptions import UnauthorizedError -def evaluate_exceptions(user, exceptions, **kwargs): - return True if True in [exc(user, **kwargs) for exc in exceptions] else False - - -def check_access(permission, message, exceptions, *args, **kwargs): +def check_access(permission, message, exception, *args, **kwargs): access_args = {"message": message} if "portfolio_id" in kwargs: @@ -23,9 +19,7 @@ def check_access(permission, message, exceptions, *args, **kwargs): task_order = TaskOrders.get(kwargs["task_order_id"]) access_args["portfolio"] = task_order.portfolio - if exceptions and evaluate_exceptions( - g.current_user, exceptions, **access_args, **kwargs - ): + if exception is not None and exception(g.current_user, **access_args, **kwargs): return True user_can_access(g.current_user, permission, **access_args) @@ -33,12 +27,12 @@ def check_access(permission, message, exceptions, *args, **kwargs): return True -def user_can_access_decorator(permission, message=None, exceptions=None): +def user_can_access_decorator(permission, message=None, exception=None): def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): try: - check_access(permission, message, exceptions, *args, **kwargs) + check_access(permission, message, exception, *args, **kwargs) app.logger.info( "[access] User {} accessed {}".format( g.current_user.id, g.current_user.dod_id, request.path diff --git a/atst/routes/portfolios/applications.py b/atst/routes/portfolios/applications.py index d8ab31da..b3825944 100644 --- a/atst/routes/portfolios/applications.py +++ b/atst/routes/portfolios/applications.py @@ -107,7 +107,7 @@ def wrap_environment_role_lookup( @portfolios_bp.route("/portfolios//environments//access") -@user_can(None, exceptions=[wrap_environment_role_lookup], message="access environment") +@user_can(None, exception=wrap_environment_role_lookup, message="access environment") def access_environment(portfolio_id, environment_id): env_role = EnvironmentRoles.get(g.current_user.id, environment_id) token = app.csp.cloud.get_access_token(env_role) diff --git a/atst/routes/portfolios/task_orders.py b/atst/routes/portfolios/task_orders.py index e2165aa8..95db9f09 100644 --- a/atst/routes/portfolios/task_orders.py +++ b/atst/routes/portfolios/task_orders.py @@ -99,7 +99,7 @@ def wrap_check_is_ko_or_cor(user, task_order_id=None, **_kwargs): @portfolios_bp.route("/portfolios//task_order//review") @user_can( None, - exceptions=[wrap_check_is_ko_or_cor], + exception=wrap_check_is_ko_or_cor, message="view contracting officer review form", ) def ko_review(portfolio_id, task_order_id): @@ -182,9 +182,7 @@ def resend_invite(portfolio_id, task_order_id): "/portfolios//task_order//review", methods=["POST"] ) @user_can( - None, - exceptions=[wrap_check_is_ko_or_cor], - message="submit contracting officer review", + None, exception=wrap_check_is_ko_or_cor, message="submit contracting officer review" ) def submit_ko_review(portfolio_id, task_order_id, form=None): task_order = TaskOrders.get(task_order_id) @@ -298,9 +296,7 @@ def wrap_check_is_so(user, task_order_id=None, **_kwargs): @portfolios_bp.route("/portfolios//task_order//dd254") -@user_can( - None, exceptions=[wrap_check_is_so], message="view security officer review form" -) +@user_can(None, exception=wrap_check_is_so, message="view security officer review form") def so_review(portfolio_id, task_order_id): task_order = TaskOrders.get(task_order_id) form = so_review_form(task_order) @@ -317,7 +313,7 @@ def so_review(portfolio_id, task_order_id): "/portfolios//task_order//dd254", methods=["POST"] ) @user_can( - None, exceptions=[wrap_check_is_so], message="submit security officer review form" + None, exception=wrap_check_is_so, message="submit security officer review form" ) def submit_so_review(portfolio_id, task_order_id): task_order = TaskOrders.get(task_order_id) diff --git a/atst/routes/task_orders/new.py b/atst/routes/task_orders/new.py index 22624827..91290ca3 100644 --- a/atst/routes/task_orders/new.py +++ b/atst/routes/task_orders/new.py @@ -265,7 +265,7 @@ def is_new_task_order(*_args, **kwargs): @task_orders_bp.route("/portfolios//task_orders/new/") @user_can( Permissions.CREATE_TASK_ORDER, - exceptions=[is_new_task_order], + exception=is_new_task_order, message="view new task order form", ) def new(screen, task_order_id=None, portfolio_id=None): @@ -316,7 +316,7 @@ def new(screen, task_order_id=None, portfolio_id=None): ) @user_can( Permissions.CREATE_TASK_ORDER, - exceptions=[is_new_task_order], + exception=is_new_task_order, message="update task order", ) def update(screen, task_order_id=None, portfolio_id=None): diff --git a/atst/routes/task_orders/signing.py b/atst/routes/task_orders/signing.py index e8eaf0a6..4f4132dd 100644 --- a/atst/routes/task_orders/signing.py +++ b/atst/routes/task_orders/signing.py @@ -29,9 +29,7 @@ def wrap_check_is_ko(user, task_order_id=None, **_kwargs): @task_orders_bp.route("/task_orders//digital_signature", methods=["GET"]) @user_can( - None, - exceptions=[wrap_check_is_ko], - message="view contracting officer signature page", + None, exception=wrap_check_is_ko, message="view contracting officer signature page" ) def signature_requested(task_order_id): task_order = find_unsigned_ko_to(task_order_id) @@ -48,7 +46,7 @@ def signature_requested(task_order_id): "/task_orders//digital_signature", methods=["POST"] ) @user_can( - None, exceptions=[wrap_check_is_ko], message="submit contracting officer signature" + None, exception=wrap_check_is_ko, message="submit contracting officer signature" ) def record_signature(task_order_id): task_order = find_unsigned_ko_to(task_order_id) diff --git a/tests/domain/test_authz.py b/tests/domain/test_authz.py index faf4c79d..4d6c88c0 100644 --- a/tests/domain/test_authz.py +++ b/tests/domain/test_authz.py @@ -137,11 +137,10 @@ def test_user_can_access_decorator_exceptions(set_current_user): rando_calrissian = UserFactory.create() portfolio = PortfolioFactory.create() - wont_work = lambda *args, **kwargs: False because_i_said_so = lambda *args, **kwargs: True @user_can_access_decorator( - Permissions.EDIT_PORTFOLIO_NAME, exceptions=[wont_work, because_i_said_so] + Permissions.EDIT_PORTFOLIO_NAME, exception=because_i_said_so ) def _edit_portfolio_name(*args, **kwargs): return True diff --git a/tests/test_access.py b/tests/test_access.py index de28021e..d86ed2ea 100644 --- a/tests/test_access.py +++ b/tests/test_access.py @@ -84,10 +84,9 @@ def test_all_protected_routes_have_access_control( monkeypatch.setattr("atst.domain.portfolios.Portfolios.get", lambda *a: None) monkeypatch.setattr("atst.domain.task_orders.TaskOrders.get", lambda *a: Null()) - # patch the two internal functions the access decorator uses so - # that we can check that one or the other was called - mocker.patch("atst.domain.authz.decorator.user_can_access") - mocker.patch("atst.domain.authz.decorator.evaluate_exceptions") + # patch the internal function the access decorator uses so that + # we can check that it was called + mocker.patch("atst.domain.authz.decorator.check_access") user = UserFactory.create() user_session(user) @@ -96,8 +95,7 @@ def test_all_protected_routes_have_access_control( getattr(client, method)(route) assert ( - atst.domain.authz.decorator.user_can_access.call_count == 1 - or atst.domain.authz.decorator.evaluate_exceptions.call_count == 1 + atst.domain.authz.decorator.check_access.call_count == 1 ), "no access control for {}".format(rule.endpoint)