Merge pull request #725 from dod-ccpo/access-decorator-touchup
Access decorator touchup
This commit is contained in:
commit
c661bb98bd
@ -10,7 +10,7 @@ from atst.domain.invitations import Invitations
|
|||||||
from atst.domain.exceptions import UnauthorizedError
|
from atst.domain.exceptions import UnauthorizedError
|
||||||
|
|
||||||
|
|
||||||
def check_access(permission, message, exception, *args, **kwargs):
|
def check_access(permission, message, override, *args, **kwargs):
|
||||||
access_args = {"message": message}
|
access_args = {"message": message}
|
||||||
|
|
||||||
if "application_id" in kwargs:
|
if "application_id" in kwargs:
|
||||||
@ -30,7 +30,7 @@ def check_access(permission, message, exception, *args, **kwargs):
|
|||||||
g.current_user, kwargs["portfolio_id"]
|
g.current_user, kwargs["portfolio_id"]
|
||||||
)
|
)
|
||||||
|
|
||||||
if exception is not None and exception(g.current_user, **access_args, **kwargs):
|
if override is not None and override(g.current_user, **access_args, **kwargs):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
user_can_access(g.current_user, permission, **access_args)
|
user_can_access(g.current_user, permission, **access_args)
|
||||||
@ -38,12 +38,12 @@ def check_access(permission, message, exception, *args, **kwargs):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def user_can_access_decorator(permission, message=None, exception=None):
|
def user_can_access_decorator(permission, message=None, override=None):
|
||||||
def decorator(f):
|
def decorator(f):
|
||||||
@wraps(f)
|
@wraps(f)
|
||||||
def decorated_function(*args, **kwargs):
|
def decorated_function(*args, **kwargs):
|
||||||
try:
|
try:
|
||||||
check_access(permission, message, exception, *args, **kwargs)
|
check_access(permission, message, override, *args, **kwargs)
|
||||||
app.logger.info(
|
app.logger.info(
|
||||||
"[access] User {} accessed {} {}".format(
|
"[access] User {} accessed {} {}".format(
|
||||||
g.current_user.id, request.method, request.path
|
g.current_user.id, request.method, request.path
|
||||||
|
@ -107,7 +107,7 @@ def wrap_environment_role_lookup(
|
|||||||
|
|
||||||
|
|
||||||
@portfolios_bp.route("/portfolios/<portfolio_id>/environments/<environment_id>/access")
|
@portfolios_bp.route("/portfolios/<portfolio_id>/environments/<environment_id>/access")
|
||||||
@user_can(None, exception=wrap_environment_role_lookup, message="access environment")
|
@user_can(None, override=wrap_environment_role_lookup, message="access environment")
|
||||||
def access_environment(portfolio_id, environment_id):
|
def access_environment(portfolio_id, environment_id):
|
||||||
env_role = EnvironmentRoles.get(g.current_user.id, environment_id)
|
env_role = EnvironmentRoles.get(g.current_user.id, environment_id)
|
||||||
token = app.csp.cloud.get_access_token(env_role)
|
token = app.csp.cloud.get_access_token(env_role)
|
||||||
|
@ -99,7 +99,7 @@ def wrap_check_is_ko_or_cor(user, task_order_id=None, **_kwargs):
|
|||||||
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/review")
|
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/review")
|
||||||
@user_can(
|
@user_can(
|
||||||
None,
|
None,
|
||||||
exception=wrap_check_is_ko_or_cor,
|
override=wrap_check_is_ko_or_cor,
|
||||||
message="view contracting officer review form",
|
message="view contracting officer review form",
|
||||||
)
|
)
|
||||||
def ko_review(portfolio_id, task_order_id):
|
def ko_review(portfolio_id, task_order_id):
|
||||||
@ -182,7 +182,7 @@ def resend_invite(portfolio_id, task_order_id):
|
|||||||
"/portfolios/<portfolio_id>/task_order/<task_order_id>/review", methods=["POST"]
|
"/portfolios/<portfolio_id>/task_order/<task_order_id>/review", methods=["POST"]
|
||||||
)
|
)
|
||||||
@user_can(
|
@user_can(
|
||||||
None, exception=wrap_check_is_ko_or_cor, message="submit contracting officer review"
|
None, override=wrap_check_is_ko_or_cor, message="submit contracting officer review"
|
||||||
)
|
)
|
||||||
def submit_ko_review(portfolio_id, task_order_id, form=None):
|
def submit_ko_review(portfolio_id, task_order_id, form=None):
|
||||||
task_order = TaskOrders.get(task_order_id)
|
task_order = TaskOrders.get(task_order_id)
|
||||||
@ -296,7 +296,7 @@ def wrap_check_is_so(user, task_order_id=None, **_kwargs):
|
|||||||
|
|
||||||
|
|
||||||
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254")
|
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254")
|
||||||
@user_can(None, exception=wrap_check_is_so, message="view security officer review form")
|
@user_can(None, override=wrap_check_is_so, message="view security officer review form")
|
||||||
def so_review(portfolio_id, task_order_id):
|
def so_review(portfolio_id, task_order_id):
|
||||||
task_order = TaskOrders.get(task_order_id)
|
task_order = TaskOrders.get(task_order_id)
|
||||||
form = so_review_form(task_order)
|
form = so_review_form(task_order)
|
||||||
@ -313,7 +313,7 @@ def so_review(portfolio_id, task_order_id):
|
|||||||
"/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254", methods=["POST"]
|
"/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254", methods=["POST"]
|
||||||
)
|
)
|
||||||
@user_can(
|
@user_can(
|
||||||
None, exception=wrap_check_is_so, message="submit security officer review form"
|
None, override=wrap_check_is_so, message="submit security officer review form"
|
||||||
)
|
)
|
||||||
def submit_so_review(portfolio_id, task_order_id):
|
def submit_so_review(portfolio_id, task_order_id):
|
||||||
task_order = TaskOrders.get(task_order_id)
|
task_order = TaskOrders.get(task_order_id)
|
||||||
|
@ -265,7 +265,7 @@ def is_new_task_order(*_args, **kwargs):
|
|||||||
@task_orders_bp.route("/portfolios/<portfolio_id>/task_orders/new/<int:screen>")
|
@task_orders_bp.route("/portfolios/<portfolio_id>/task_orders/new/<int:screen>")
|
||||||
@user_can(
|
@user_can(
|
||||||
Permissions.CREATE_TASK_ORDER,
|
Permissions.CREATE_TASK_ORDER,
|
||||||
exception=is_new_task_order,
|
override=is_new_task_order,
|
||||||
message="view new task order form",
|
message="view new task order form",
|
||||||
)
|
)
|
||||||
def new(screen, task_order_id=None, portfolio_id=None):
|
def new(screen, task_order_id=None, portfolio_id=None):
|
||||||
@ -316,7 +316,7 @@ def new(screen, task_order_id=None, portfolio_id=None):
|
|||||||
)
|
)
|
||||||
@user_can(
|
@user_can(
|
||||||
Permissions.CREATE_TASK_ORDER,
|
Permissions.CREATE_TASK_ORDER,
|
||||||
exception=is_new_task_order,
|
override=is_new_task_order,
|
||||||
message="update task order",
|
message="update task order",
|
||||||
)
|
)
|
||||||
def update(screen, task_order_id=None, portfolio_id=None):
|
def update(screen, task_order_id=None, portfolio_id=None):
|
||||||
|
@ -29,7 +29,7 @@ def wrap_check_is_ko(user, task_order_id=None, **_kwargs):
|
|||||||
|
|
||||||
@task_orders_bp.route("/task_orders/<task_order_id>/digital_signature", methods=["GET"])
|
@task_orders_bp.route("/task_orders/<task_order_id>/digital_signature", methods=["GET"])
|
||||||
@user_can(
|
@user_can(
|
||||||
None, exception=wrap_check_is_ko, message="view contracting officer signature page"
|
None, override=wrap_check_is_ko, message="view contracting officer signature page"
|
||||||
)
|
)
|
||||||
def signature_requested(task_order_id):
|
def signature_requested(task_order_id):
|
||||||
task_order = find_unsigned_ko_to(task_order_id)
|
task_order = find_unsigned_ko_to(task_order_id)
|
||||||
@ -46,7 +46,7 @@ def signature_requested(task_order_id):
|
|||||||
"/task_orders/<task_order_id>/digital_signature", methods=["POST"]
|
"/task_orders/<task_order_id>/digital_signature", methods=["POST"]
|
||||||
)
|
)
|
||||||
@user_can(
|
@user_can(
|
||||||
None, exception=wrap_check_is_ko, message="submit contracting officer signature"
|
None, override=wrap_check_is_ko, message="submit contracting officer signature"
|
||||||
)
|
)
|
||||||
def record_signature(task_order_id):
|
def record_signature(task_order_id):
|
||||||
task_order = find_unsigned_ko_to(task_order_id)
|
task_order = find_unsigned_ko_to(task_order_id)
|
||||||
|
@ -136,10 +136,9 @@ def test_user_can_access_decorator(set_current_user):
|
|||||||
_edit_portfolio_name(portfolio_id=portfolio.id)
|
_edit_portfolio_name(portfolio_id=portfolio.id)
|
||||||
|
|
||||||
|
|
||||||
def test_user_can_access_decorator_exceptions(set_current_user):
|
def test_user_can_access_decorator_override(set_current_user):
|
||||||
rando_calrissian = UserFactory.create()
|
rando_calrissian = UserFactory.create()
|
||||||
darth_vader = UserFactory.create()
|
darth_vader = UserFactory.create()
|
||||||
portfolio = PortfolioFactory.create()
|
|
||||||
|
|
||||||
def _can_fly_the_millenium_falcon(u, *args, **kwargs):
|
def _can_fly_the_millenium_falcon(u, *args, **kwargs):
|
||||||
if u == rando_calrissian:
|
if u == rando_calrissian:
|
||||||
@ -148,7 +147,7 @@ def test_user_can_access_decorator_exceptions(set_current_user):
|
|||||||
raise UnauthorizedError(u, "is not rando")
|
raise UnauthorizedError(u, "is not rando")
|
||||||
|
|
||||||
@user_can_access_decorator(
|
@user_can_access_decorator(
|
||||||
Permissions.EDIT_PORTFOLIO_NAME, exception=_can_fly_the_millenium_falcon
|
Permissions.EDIT_PORTFOLIO_NAME, override=_can_fly_the_millenium_falcon
|
||||||
)
|
)
|
||||||
def _cloud_city(*args, **kwargs):
|
def _cloud_city(*args, **kwargs):
|
||||||
return True
|
return True
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
from unittest.mock import Mock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from flask import url_for, Response
|
from flask import url_for, Response
|
||||||
@ -5,7 +7,6 @@ from flask import url_for, Response
|
|||||||
import atst
|
import atst
|
||||||
from atst.app import make_app, make_config
|
from atst.app import make_app, make_config
|
||||||
from atst.domain.auth import UNPROTECTED_ROUTES as _NO_LOGIN_REQUIRED
|
from atst.domain.auth import UNPROTECTED_ROUTES as _NO_LOGIN_REQUIRED
|
||||||
import atst.domain.authz as authz
|
|
||||||
from atst.domain.permission_sets import PermissionSets
|
from atst.domain.permission_sets import PermissionSets
|
||||||
from atst.models.portfolio_role import Status as PortfolioRoleStatus
|
from atst.models.portfolio_role import Status as PortfolioRoleStatus
|
||||||
|
|
||||||
@ -54,21 +55,6 @@ sample_app = make_app(sample_config)
|
|||||||
_PROTECTED_ROUTES = protected_routes(sample_app)
|
_PROTECTED_ROUTES = protected_routes(sample_app)
|
||||||
|
|
||||||
|
|
||||||
class Null:
|
|
||||||
"""
|
|
||||||
Very simple null object. Will return itself for all attribute
|
|
||||||
calls:
|
|
||||||
> foo = Null()
|
|
||||||
> foo.bar.baz == foo
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __getattr__(self, name):
|
|
||||||
return self
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.access_check
|
@pytest.mark.access_check
|
||||||
@pytest.mark.parametrize("rule,route", _PROTECTED_ROUTES)
|
@pytest.mark.parametrize("rule,route", _PROTECTED_ROUTES)
|
||||||
def test_all_protected_routes_have_access_control(
|
def test_all_protected_routes_have_access_control(
|
||||||
@ -82,7 +68,7 @@ def test_all_protected_routes_have_access_control(
|
|||||||
# monkeypatch any object lookups that might happen in the access decorator
|
# monkeypatch any object lookups that might happen in the access decorator
|
||||||
monkeypatch.setattr("atst.domain.portfolios.Portfolios.for_user", lambda *a: [])
|
monkeypatch.setattr("atst.domain.portfolios.Portfolios.for_user", lambda *a: [])
|
||||||
monkeypatch.setattr("atst.domain.portfolios.Portfolios.get", lambda *a: None)
|
monkeypatch.setattr("atst.domain.portfolios.Portfolios.get", lambda *a: None)
|
||||||
monkeypatch.setattr("atst.domain.task_orders.TaskOrders.get", lambda *a: Null())
|
monkeypatch.setattr("atst.domain.task_orders.TaskOrders.get", lambda *a: Mock())
|
||||||
|
|
||||||
# patch the internal function the access decorator uses so that
|
# patch the internal function the access decorator uses so that
|
||||||
# we can check that it was called
|
# we can check that it was called
|
||||||
|
Loading…
x
Reference in New Issue
Block a user