only pass one func to exception kwarg in access decorator

This commit is contained in:
dandds 2019-03-22 06:05:59 -04:00
parent 905f03342d
commit 746a5834c1
7 changed files with 18 additions and 33 deletions

View File

@ -8,11 +8,7 @@ from atst.domain.task_orders import TaskOrders
from atst.domain.exceptions import UnauthorizedError
def evaluate_exceptions(user, exceptions, **kwargs):
return True if True in [exc(user, **kwargs) for exc in exceptions] else False
def check_access(permission, message, exceptions, *args, **kwargs):
def check_access(permission, message, exception, *args, **kwargs):
access_args = {"message": message}
if "portfolio_id" in kwargs:
@ -23,9 +19,7 @@ def check_access(permission, message, exceptions, *args, **kwargs):
task_order = TaskOrders.get(kwargs["task_order_id"])
access_args["portfolio"] = task_order.portfolio
if exceptions and evaluate_exceptions(
g.current_user, exceptions, **access_args, **kwargs
):
if exception is not None and exception(g.current_user, **access_args, **kwargs):
return True
user_can_access(g.current_user, permission, **access_args)
@ -33,12 +27,12 @@ def check_access(permission, message, exceptions, *args, **kwargs):
return True
def user_can_access_decorator(permission, message=None, exceptions=None):
def user_can_access_decorator(permission, message=None, exception=None):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
try:
check_access(permission, message, exceptions, *args, **kwargs)
check_access(permission, message, exception, *args, **kwargs)
app.logger.info(
"[access] User {} accessed {}".format(
g.current_user.id, g.current_user.dod_id, request.path

View File

@ -107,7 +107,7 @@ def wrap_environment_role_lookup(
@portfolios_bp.route("/portfolios/<portfolio_id>/environments/<environment_id>/access")
@user_can(None, exceptions=[wrap_environment_role_lookup], message="access environment")
@user_can(None, exception=wrap_environment_role_lookup, message="access environment")
def access_environment(portfolio_id, environment_id):
env_role = EnvironmentRoles.get(g.current_user.id, environment_id)
token = app.csp.cloud.get_access_token(env_role)

View File

@ -99,7 +99,7 @@ def wrap_check_is_ko_or_cor(user, task_order_id=None, **_kwargs):
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/review")
@user_can(
None,
exceptions=[wrap_check_is_ko_or_cor],
exception=wrap_check_is_ko_or_cor,
message="view contracting officer review form",
)
def ko_review(portfolio_id, task_order_id):
@ -182,9 +182,7 @@ def resend_invite(portfolio_id, task_order_id):
"/portfolios/<portfolio_id>/task_order/<task_order_id>/review", methods=["POST"]
)
@user_can(
None,
exceptions=[wrap_check_is_ko_or_cor],
message="submit contracting officer review",
None, exception=wrap_check_is_ko_or_cor, message="submit contracting officer review"
)
def submit_ko_review(portfolio_id, task_order_id, form=None):
task_order = TaskOrders.get(task_order_id)
@ -298,9 +296,7 @@ def wrap_check_is_so(user, task_order_id=None, **_kwargs):
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254")
@user_can(
None, exceptions=[wrap_check_is_so], message="view security officer review form"
)
@user_can(None, exception=wrap_check_is_so, message="view security officer review form")
def so_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)
form = so_review_form(task_order)
@ -317,7 +313,7 @@ def so_review(portfolio_id, task_order_id):
"/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254", methods=["POST"]
)
@user_can(
None, exceptions=[wrap_check_is_so], message="submit security officer review form"
None, exception=wrap_check_is_so, message="submit security officer review form"
)
def submit_so_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)

View File

@ -265,7 +265,7 @@ def is_new_task_order(*_args, **kwargs):
@task_orders_bp.route("/portfolios/<portfolio_id>/task_orders/new/<int:screen>")
@user_can(
Permissions.CREATE_TASK_ORDER,
exceptions=[is_new_task_order],
exception=is_new_task_order,
message="view new task order form",
)
def new(screen, task_order_id=None, portfolio_id=None):
@ -316,7 +316,7 @@ def new(screen, task_order_id=None, portfolio_id=None):
)
@user_can(
Permissions.CREATE_TASK_ORDER,
exceptions=[is_new_task_order],
exception=is_new_task_order,
message="update task order",
)
def update(screen, task_order_id=None, portfolio_id=None):

View File

@ -29,9 +29,7 @@ def wrap_check_is_ko(user, task_order_id=None, **_kwargs):
@task_orders_bp.route("/task_orders/<task_order_id>/digital_signature", methods=["GET"])
@user_can(
None,
exceptions=[wrap_check_is_ko],
message="view contracting officer signature page",
None, exception=wrap_check_is_ko, message="view contracting officer signature page"
)
def signature_requested(task_order_id):
task_order = find_unsigned_ko_to(task_order_id)
@ -48,7 +46,7 @@ def signature_requested(task_order_id):
"/task_orders/<task_order_id>/digital_signature", methods=["POST"]
)
@user_can(
None, exceptions=[wrap_check_is_ko], message="submit contracting officer signature"
None, exception=wrap_check_is_ko, message="submit contracting officer signature"
)
def record_signature(task_order_id):
task_order = find_unsigned_ko_to(task_order_id)

View File

@ -137,11 +137,10 @@ def test_user_can_access_decorator_exceptions(set_current_user):
rando_calrissian = UserFactory.create()
portfolio = PortfolioFactory.create()
wont_work = lambda *args, **kwargs: False
because_i_said_so = lambda *args, **kwargs: True
@user_can_access_decorator(
Permissions.EDIT_PORTFOLIO_NAME, exceptions=[wont_work, because_i_said_so]
Permissions.EDIT_PORTFOLIO_NAME, exception=because_i_said_so
)
def _edit_portfolio_name(*args, **kwargs):
return True

View File

@ -84,10 +84,9 @@ def test_all_protected_routes_have_access_control(
monkeypatch.setattr("atst.domain.portfolios.Portfolios.get", lambda *a: None)
monkeypatch.setattr("atst.domain.task_orders.TaskOrders.get", lambda *a: Null())
# patch the two internal functions the access decorator uses so
# that we can check that one or the other was called
mocker.patch("atst.domain.authz.decorator.user_can_access")
mocker.patch("atst.domain.authz.decorator.evaluate_exceptions")
# patch the internal function the access decorator uses so that
# we can check that it was called
mocker.patch("atst.domain.authz.decorator.check_access")
user = UserFactory.create()
user_session(user)
@ -96,8 +95,7 @@ def test_all_protected_routes_have_access_control(
getattr(client, method)(route)
assert (
atst.domain.authz.decorator.user_can_access.call_count == 1
or atst.domain.authz.decorator.evaluate_exceptions.call_count == 1
atst.domain.authz.decorator.check_access.call_count == 1
), "no access control for {}".format(rule.endpoint)