update authorization decorator to check for application permissions
This commit is contained in:
		| @@ -2,11 +2,19 @@ from atst.utils import first_or_none | ||||
| from atst.models.permissions import Permissions | ||||
| from atst.domain.exceptions import UnauthorizedError | ||||
| from atst.models.portfolio_role import Status as PortfolioRoleStatus | ||||
| from atst.models.application_role import Status as ApplicationRoleStatus | ||||
|  | ||||
|  | ||||
| class Authorization(object): | ||||
|     @classmethod | ||||
|     def has_atat_permission(cls, user, permission): | ||||
|         return permission in user.permissions | ||||
|  | ||||
|     @classmethod | ||||
|     def has_portfolio_permission(cls, user, portfolio, permission): | ||||
|         if Authorization.has_atat_permission(user, permission): | ||||
|             return True | ||||
|  | ||||
|         port_role = first_or_none( | ||||
|             lambda pr: pr.portfolio == portfolio, user.portfolio_roles | ||||
|         ) | ||||
| @@ -16,22 +24,37 @@ class Authorization(object): | ||||
|             return False | ||||
|  | ||||
|     @classmethod | ||||
|     def has_atat_permission(cls, user, permission): | ||||
|         return permission in user.permissions | ||||
|     def has_application_permission(cls, user, application, permission): | ||||
|         if Authorization.has_portfolio_permission( | ||||
|             user, application.portfolio, permission | ||||
|         ): | ||||
|             return True | ||||
|  | ||||
|         app_role = first_or_none( | ||||
|             lambda app_role: app_role.application == application, user.application_roles | ||||
|         ) | ||||
|         if app_role and app_role.status is not ApplicationRoleStatus.DISABLED: | ||||
|             return permission in app_role.permissions | ||||
|         else: | ||||
|             return False | ||||
|  | ||||
|     @classmethod | ||||
|     def check_portfolio_permission(cls, user, portfolio, permission, message): | ||||
|         if not ( | ||||
|             Authorization.has_atat_permission(user, permission) | ||||
|             or Authorization.has_portfolio_permission(user, portfolio, permission) | ||||
|         ): | ||||
|     def check_atat_permission(cls, user, permission, message): | ||||
|         if not Authorization.has_atat_permission(user, permission): | ||||
|             raise UnauthorizedError(user, message) | ||||
|  | ||||
|         return True | ||||
|  | ||||
|     @classmethod | ||||
|     def check_atat_permission(cls, user, permission, message): | ||||
|         if not Authorization.has_atat_permission(user, permission): | ||||
|     def check_portfolio_permission(cls, user, portfolio, permission, message): | ||||
|         if not Authorization.has_portfolio_permission(user, portfolio, permission): | ||||
|             raise UnauthorizedError(user, message) | ||||
|  | ||||
|         return True | ||||
|  | ||||
|     @classmethod | ||||
|     def check_application_permission(cls, user, portfolio, permission, message): | ||||
|         if not Authorization.has_application_permission(user, portfolio, permission): | ||||
|             raise UnauthorizedError(user, message) | ||||
|  | ||||
|         return True | ||||
| @@ -70,8 +93,12 @@ class Authorization(object): | ||||
|             raise UnauthorizedError(user, message) | ||||
|  | ||||
|  | ||||
| def user_can_access(user, permission, portfolio=None, message=None): | ||||
|     if portfolio: | ||||
| def user_can_access(user, permission, portfolio=None, application=None, message=None): | ||||
|     if application: | ||||
|         Authorization.check_application_permission( | ||||
|             user, application, permission, message | ||||
|         ) | ||||
|     elif portfolio: | ||||
|         Authorization.check_portfolio_permission(user, portfolio, permission, message) | ||||
|     else: | ||||
|         Authorization.check_atat_permission(user, permission, message) | ||||
|   | ||||
| @@ -15,6 +15,7 @@ def check_access(permission, message, override, *args, **kwargs): | ||||
|  | ||||
|     if "application_id" in kwargs: | ||||
|         application = Applications.get(kwargs["application_id"]) | ||||
|         access_args["application"] = application | ||||
|         access_args["portfolio"] = application.portfolio | ||||
|  | ||||
|     elif "task_order_id" in kwargs: | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| import pytest | ||||
|  | ||||
| from tests.factories import ( | ||||
|     ApplicationRoleFactory, | ||||
|     TaskOrderFactory, | ||||
|     UserFactory, | ||||
|     PortfolioFactory, | ||||
| @@ -69,6 +70,22 @@ def test_has_portfolio_permission(): | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def test_has_application_permission(): | ||||
|     role_one = PermissionSets.get(PermissionSets.EDIT_APPLICATION_TEAM) | ||||
|     role_two = PermissionSets.get(PermissionSets.EDIT_APPLICATION_ENVIRONMENTS) | ||||
|     app_role = ApplicationRoleFactory.create(permission_sets=[role_one, role_two]) | ||||
|     different_user = UserFactory.create() | ||||
|     assert Authorization.has_application_permission( | ||||
|         app_role.user, app_role.application, Permissions.EDIT_ENVIRONMENT | ||||
|     ) | ||||
|     assert not Authorization.has_portfolio_permission( | ||||
|         app_role.user, app_role.application, Permissions.DELETE_ENVIRONMENT | ||||
|     ) | ||||
|     assert not Authorization.has_portfolio_permission( | ||||
|         different_user, app_role.application, Permissions.DELETE_ENVIRONMENT | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def test_user_can_access(): | ||||
|     ccpo = UserFactory.create_ccpo() | ||||
|     edit_admin = UserFactory.create() | ||||
| @@ -120,7 +137,23 @@ def set_current_user(request_ctx): | ||||
|     request_ctx.g.current_user = None | ||||
|  | ||||
|  | ||||
| def test_user_can_access_decorator(set_current_user): | ||||
| def test_user_can_access_decorator_atat_level(set_current_user): | ||||
|     ccpo = UserFactory.create_ccpo() | ||||
|     rando = UserFactory.create() | ||||
|  | ||||
|     @user_can_access_decorator(Permissions.VIEW_AUDIT_LOG) | ||||
|     def _access_activity_log(*args, **kwargs): | ||||
|         return True | ||||
|  | ||||
|     set_current_user(ccpo) | ||||
|     assert _access_activity_log() | ||||
|  | ||||
|     set_current_user(rando) | ||||
|     with pytest.raises(UnauthorizedError): | ||||
|         _access_activity_log() | ||||
|  | ||||
|  | ||||
| def test_user_can_access_decorator_portfolio_level(set_current_user): | ||||
|     ccpo = UserFactory.create_ccpo() | ||||
|     edit_admin = UserFactory.create() | ||||
|     view_admin = UserFactory.create() | ||||
| @@ -144,6 +177,36 @@ def test_user_can_access_decorator(set_current_user): | ||||
|         _edit_portfolio_name(portfolio_id=portfolio.id) | ||||
|  | ||||
|  | ||||
| def test_user_can_access_decorator_application_level(set_current_user): | ||||
|     ccpo = UserFactory.create_ccpo() | ||||
|     port_admin = UserFactory.create() | ||||
|     app_user = UserFactory.create() | ||||
|     rando = UserFactory.create() | ||||
|  | ||||
|     portfolio = PortfolioFactory.create( | ||||
|         owner=port_admin, applications=[{"name": "Mos Eisley"}] | ||||
|     ) | ||||
|     app = portfolio.applications[0] | ||||
|     ApplicationRoleFactory.create(application=app, user=app_user) | ||||
|  | ||||
|     @user_can_access_decorator(Permissions.VIEW_APPLICATION) | ||||
|     def _stroll_into_mos_eisley(*args, **kwargs): | ||||
|         return True | ||||
|  | ||||
|     set_current_user(ccpo) | ||||
|     assert _stroll_into_mos_eisley(application_id=app.id) | ||||
|  | ||||
|     set_current_user(port_admin) | ||||
|     assert _stroll_into_mos_eisley(application_id=app.id) | ||||
|  | ||||
|     set_current_user(app_user) | ||||
|     assert _stroll_into_mos_eisley(application_id=app.id) | ||||
|  | ||||
|     set_current_user(rando) | ||||
|     with pytest.raises(UnauthorizedError): | ||||
|         _stroll_into_mos_eisley(application_id=app.id) | ||||
|  | ||||
|  | ||||
| def test_user_can_access_decorator_override(set_current_user): | ||||
|     rando_calrissian = UserFactory.create() | ||||
|     darth_vader = UserFactory.create() | ||||
|   | ||||
| @@ -63,6 +63,10 @@ def base_portfolio_permission_sets(): | ||||
|     ] | ||||
|  | ||||
|  | ||||
| def base_application_permission_sets(): | ||||
|     return [PermissionSets.get(PermissionSets.VIEW_APPLICATION)] | ||||
|  | ||||
|  | ||||
| def get_all_portfolio_permission_sets(): | ||||
|     return PermissionSets.get_many(PortfolioRoles.PORTFOLIO_PERMISSION_SETS) | ||||
|  | ||||
| @@ -220,7 +224,7 @@ class ApplicationRoleFactory(Base): | ||||
|     application = factory.SubFactory(ApplicationFactory) | ||||
|     user = factory.SubFactory(UserFactory) | ||||
|     status = ApplicationRoleStatus.PENDING | ||||
|     permission_sets = [] | ||||
|     permission_sets = factory.LazyFunction(base_application_permission_sets) | ||||
|  | ||||
|  | ||||
| class EnvironmentRoleFactory(Base): | ||||
|   | ||||
| @@ -12,6 +12,7 @@ from atst.models.portfolio_role import Status as PortfolioRoleStatus | ||||
|  | ||||
| from tests.factories import ( | ||||
|     AttachmentFactory, | ||||
|     ApplicationRoleFactory, | ||||
|     InvitationFactory, | ||||
|     PortfolioFactory, | ||||
|     PortfolioRoleFactory, | ||||
| @@ -156,12 +157,14 @@ def test_portfolios_access_environment_access(get_url_assert_status): | ||||
| def test_portfolios_application_members_access(get_url_assert_status): | ||||
|     ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_APPLICATION_MANAGEMENT) | ||||
|     owner = user_with() | ||||
|     app_dev = user_with() | ||||
|     rando = user_with() | ||||
|     portfolio = PortfolioFactory.create( | ||||
|         owner=owner, | ||||
|         applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}], | ||||
|     ) | ||||
|     app = portfolio.applications[0] | ||||
|     ApplicationRoleFactory.create(application=app, user=app_dev) | ||||
|  | ||||
|     url = url_for( | ||||
|         "portfolios.application_members", | ||||
| @@ -170,6 +173,7 @@ def test_portfolios_application_members_access(get_url_assert_status): | ||||
|     ) | ||||
|     get_url_assert_status(ccpo, url, 200) | ||||
|     get_url_assert_status(owner, url, 200) | ||||
|     get_url_assert_status(app_dev, url, 200) | ||||
|     get_url_assert_status(rando, url, 404) | ||||
|  | ||||
|  | ||||
| @@ -570,7 +574,7 @@ def test_portfolios_update_member_access(post_url_assert_status): | ||||
|     prt_member = user_with() | ||||
|  | ||||
|     portfolio = PortfolioFactory.create(owner=owner) | ||||
|     prr = PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio) | ||||
|     PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio) | ||||
|  | ||||
|     url = url_for( | ||||
|         "portfolios.update_member", portfolio_id=portfolio.id, member_id=prt_member.id | ||||
| @@ -588,7 +592,7 @@ def test_portfolios_view_member_access(get_url_assert_status): | ||||
|     prt_member = user_with() | ||||
|  | ||||
|     portfolio = PortfolioFactory.create(owner=owner) | ||||
|     prr = PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio) | ||||
|     PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio) | ||||
|  | ||||
|     url = url_for( | ||||
|         "portfolios.view_member", portfolio_id=portfolio.id, member_id=prt_member.id | ||||
|   | ||||
		Reference in New Issue
	
	Block a user