| @@ -33,6 +33,8 @@ from atst.queue import queue | |||||||
| from logging.config import dictConfig | from logging.config import dictConfig | ||||||
| from atst.utils.logging import JsonFormatter, RequestContextFilter | from atst.utils.logging import JsonFormatter, RequestContextFilter | ||||||
|  |  | ||||||
|  | from atst.utils.context_processors import assign_resources | ||||||
|  |  | ||||||
|  |  | ||||||
| ENV = os.getenv("FLASK_ENV", "dev") | ENV = os.getenv("FLASK_ENV", "dev") | ||||||
|  |  | ||||||
| @@ -83,6 +85,10 @@ def make_app(config): | |||||||
|     apply_authentication(app) |     apply_authentication(app) | ||||||
|     set_default_headers(app) |     set_default_headers(app) | ||||||
|  |  | ||||||
|  |     @app.before_request | ||||||
|  |     def _set_resources(): | ||||||
|  |         assign_resources(request.view_args) | ||||||
|  |  | ||||||
|     return app |     return app | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -107,6 +113,9 @@ def make_flask_callbacks(app): | |||||||
|     @app.after_request |     @app.after_request | ||||||
|     def _cleanup(response): |     def _cleanup(response): | ||||||
|         g.current_user = None |         g.current_user = None | ||||||
|  |         g.portfolio = None | ||||||
|  |         g.application = None | ||||||
|  |         g.task_order = None | ||||||
|         return response |         return response | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
| @@ -3,39 +3,15 @@ from functools import wraps | |||||||
| from flask import g, current_app as app, request | from flask import g, current_app as app, request | ||||||
|  |  | ||||||
| from . import user_can_access | from . import user_can_access | ||||||
| from atst.domain.portfolios import Portfolios |  | ||||||
| from atst.domain.task_orders import TaskOrders |  | ||||||
| from atst.domain.applications import Applications |  | ||||||
| from atst.domain.environments import Environments |  | ||||||
| from atst.domain.invitations import PortfolioInvitations |  | ||||||
| from atst.domain.exceptions import UnauthorizedError | from atst.domain.exceptions import UnauthorizedError | ||||||
|  |  | ||||||
|  |  | ||||||
| def check_access(permission, message, override, *args, **kwargs): | def check_access(permission, message, override, *args, **kwargs): | ||||||
|     access_args = {"message": message} |     access_args = { | ||||||
|  |         "message": message, | ||||||
|     if "application_id" in kwargs: |         "portfolio": g.portfolio, | ||||||
|         application = Applications.get(kwargs["application_id"]) |         "application": g.application, | ||||||
|         access_args["application"] = application |     } | ||||||
|         access_args["portfolio"] = application.portfolio |  | ||||||
|  |  | ||||||
|     elif "task_order_id" in kwargs: |  | ||||||
|         task_order = TaskOrders.get(kwargs["task_order_id"]) |  | ||||||
|         access_args["portfolio"] = task_order.portfolio |  | ||||||
|  |  | ||||||
|     elif "token" in kwargs: |  | ||||||
|         invite = PortfolioInvitations._get(kwargs["token"]) |  | ||||||
|         access_args["portfolio"] = invite.role.portfolio |  | ||||||
|  |  | ||||||
|     elif "portfolio_id" in kwargs: |  | ||||||
|         access_args["portfolio"] = Portfolios.get( |  | ||||||
|             g.current_user, kwargs["portfolio_id"] |  | ||||||
|         ) |  | ||||||
|  |  | ||||||
|     elif "environment_id" in kwargs: |  | ||||||
|         environment = Environments.get(kwargs["environment_id"]) |  | ||||||
|         access_args["application"] = environment.application |  | ||||||
|         access_args["portfolio"] = environment.application.portfolio |  | ||||||
|  |  | ||||||
|     if override is not None and override(g.current_user, **access_args, **kwargs): |     if override is not None and override(g.current_user, **access_args, **kwargs): | ||||||
|         return True |         return True | ||||||
|   | |||||||
| @@ -19,9 +19,9 @@ def send_invite_email(owner_name, token, new_member_email): | |||||||
|     ) |     ) | ||||||
|  |  | ||||||
|  |  | ||||||
| @portfolios_bp.route("/portfolios/invitations/<token>", methods=["GET"]) | @portfolios_bp.route("/portfolios/invitations/<portfolio_token>", methods=["GET"]) | ||||||
| def accept_invitation(token): | def accept_invitation(portfolio_token): | ||||||
|     invite = PortfolioInvitations.accept(g.current_user, token) |     invite = PortfolioInvitations.accept(g.current_user, portfolio_token) | ||||||
|  |  | ||||||
|     for task_order in invite.portfolio.task_orders: |     for task_order in invite.portfolio.task_orders: | ||||||
|         if g.current_user in task_order.officers: |         if g.current_user in task_order.officers: | ||||||
| @@ -35,11 +35,11 @@ def accept_invitation(token): | |||||||
|  |  | ||||||
|  |  | ||||||
| @portfolios_bp.route( | @portfolios_bp.route( | ||||||
|     "/portfolios/<portfolio_id>/invitations/<token>/revoke", methods=["POST"] |     "/portfolios/<portfolio_id>/invitations/<portfolio_token>/revoke", methods=["POST"] | ||||||
| ) | ) | ||||||
| @user_can(Permissions.EDIT_PORTFOLIO_USERS, message="revoke invitation") | @user_can(Permissions.EDIT_PORTFOLIO_USERS, message="revoke invitation") | ||||||
| def revoke_invitation(portfolio_id, token): | def revoke_invitation(portfolio_id, portfolio_token): | ||||||
|     PortfolioInvitations.revoke(token) |     PortfolioInvitations.revoke(portfolio_token) | ||||||
|  |  | ||||||
|     return redirect( |     return redirect( | ||||||
|         url_for( |         url_for( | ||||||
| @@ -52,11 +52,11 @@ def revoke_invitation(portfolio_id, token): | |||||||
|  |  | ||||||
|  |  | ||||||
| @portfolios_bp.route( | @portfolios_bp.route( | ||||||
|     "/portfolios/<portfolio_id>/invitations/<token>/resend", methods=["POST"] |     "/portfolios/<portfolio_id>/invitations/<portfolio_token>/resend", methods=["POST"] | ||||||
| ) | ) | ||||||
| @user_can(Permissions.EDIT_PORTFOLIO_USERS, message="resend invitation") | @user_can(Permissions.EDIT_PORTFOLIO_USERS, message="resend invitation") | ||||||
| def resend_invitation(portfolio_id, token): | def resend_invitation(portfolio_id, portfolio_token): | ||||||
|     invite = PortfolioInvitations.resend(g.current_user, token) |     invite = PortfolioInvitations.resend(g.current_user, portfolio_token) | ||||||
|     send_invite_email(g.current_user.full_name, invite.token, invite.email) |     send_invite_email(g.current_user.full_name, invite.token, invite.email) | ||||||
|     flash("resend_portfolio_invitation", user_name=invite.user_name) |     flash("resend_portfolio_invitation", user_name=invite.user_name) | ||||||
|     return redirect( |     return redirect( | ||||||
|   | |||||||
| @@ -1,33 +1,51 @@ | |||||||
| from operator import attrgetter | from operator import attrgetter | ||||||
|  |  | ||||||
| from flask import request as http_request, g | from flask import g | ||||||
| from sqlalchemy.orm.exc import NoResultFound | from sqlalchemy.orm.exc import NoResultFound | ||||||
|  |  | ||||||
| from atst.database import db | from atst.database import db | ||||||
| from atst.domain.authz import Authorization | from atst.domain.authz import Authorization | ||||||
| from atst.models import Application, Environment, Portfolio, TaskOrder |  | ||||||
| from atst.models.permissions import Permissions |  | ||||||
| from atst.domain.portfolios.scopes import ScopedPortfolio | from atst.domain.portfolios.scopes import ScopedPortfolio | ||||||
|  | from atst.models import ( | ||||||
|  |     Application, | ||||||
|  |     Environment, | ||||||
|  |     Permissions, | ||||||
|  |     Portfolio, | ||||||
|  |     PortfolioInvitation, | ||||||
|  |     PortfolioRole, | ||||||
|  |     TaskOrder, | ||||||
|  | ) | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_portfolio_from_context(view_args): | def get_resources_from_context(view_args): | ||||||
|     query = None |     query = None | ||||||
|  |  | ||||||
|     if "portfolio_id" in view_args: |     if "portfolio_token" in view_args: | ||||||
|  |         query = ( | ||||||
|  |             db.session.query(Portfolio) | ||||||
|  |             .join(PortfolioRole, PortfolioRole.portfolio_id == Portfolio.id) | ||||||
|  |             .join( | ||||||
|  |                 PortfolioInvitation, | ||||||
|  |                 PortfolioInvitation.portfolio_role_id == PortfolioRole.id, | ||||||
|  |             ) | ||||||
|  |             .filter(PortfolioInvitation.token == view_args["portfolio_token"]) | ||||||
|  |         ) | ||||||
|  |  | ||||||
|  |     elif "portfolio_id" in view_args: | ||||||
|         query = db.session.query(Portfolio).filter( |         query = db.session.query(Portfolio).filter( | ||||||
|             Portfolio.id == view_args["portfolio_id"] |             Portfolio.id == view_args["portfolio_id"] | ||||||
|         ) |         ) | ||||||
|  |  | ||||||
|     elif "application_id" in view_args: |     elif "application_id" in view_args: | ||||||
|         query = ( |         query = ( | ||||||
|             db.session.query(Portfolio) |             db.session.query(Portfolio, Application) | ||||||
|             .join(Application, Application.portfolio_id == Portfolio.id) |             .join(Application, Application.portfolio_id == Portfolio.id) | ||||||
|             .filter(Application.id == view_args["application_id"]) |             .filter(Application.id == view_args["application_id"]) | ||||||
|         ) |         ) | ||||||
|  |  | ||||||
|     elif "environment_id" in view_args: |     elif "environment_id" in view_args: | ||||||
|         query = ( |         query = ( | ||||||
|             db.session.query(Portfolio) |             db.session.query(Portfolio, Application) | ||||||
|             .join(Application, Application.portfolio_id == Portfolio.id) |             .join(Application, Application.portfolio_id == Portfolio.id) | ||||||
|             .join(Environment, Environment.application_id == Application.id) |             .join(Environment, Environment.application_id == Application.id) | ||||||
|             .filter(Environment.id == view_args["environment_id"]) |             .filter(Environment.id == view_args["environment_id"]) | ||||||
| @@ -35,33 +53,51 @@ def get_portfolio_from_context(view_args): | |||||||
|  |  | ||||||
|     elif "task_order_id" in view_args: |     elif "task_order_id" in view_args: | ||||||
|         query = ( |         query = ( | ||||||
|             db.session.query(Portfolio) |             db.session.query(Portfolio, TaskOrder) | ||||||
|             .join(TaskOrder, TaskOrder.portfolio_id == Portfolio.id) |             .join(TaskOrder, TaskOrder.portfolio_id == Portfolio.id) | ||||||
|             .filter(TaskOrder.id == view_args["task_order_id"]) |             .filter(TaskOrder.id == view_args["task_order_id"]) | ||||||
|         ) |         ) | ||||||
|  |  | ||||||
|     if query: |     if query: | ||||||
|         try: |         try: | ||||||
|             portfolio = query.one() |             return query.only_return_tuples(True).one() | ||||||
|  |  | ||||||
|             return ScopedPortfolio(g.current_user, portfolio) |  | ||||||
|         except NoResultFound: |         except NoResultFound: | ||||||
|             raise NotFoundError("portfolio") |             raise NotFoundError("portfolio") | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def assign_resources(view_args): | ||||||
|  |     g.portfolio = None | ||||||
|  |     g.application = None | ||||||
|  |     g.task_order = None | ||||||
|  |  | ||||||
|  |     resources = get_resources_from_context(view_args) | ||||||
|  |     if resources: | ||||||
|  |         for resource in resources: | ||||||
|  |             if isinstance(resource, Portfolio): | ||||||
|  |                 g.portfolio = ScopedPortfolio(g.current_user, resource) | ||||||
|  |             elif isinstance(resource, Application): | ||||||
|  |                 g.application = resource | ||||||
|  |             elif isinstance(resource, TaskOrder): | ||||||
|  |                 g.task_order = resource | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def user_can_view(permission): | ||||||
|  |     if g.application: | ||||||
|  |         return Authorization.has_application_permission( | ||||||
|  |             g.current_user, g.application, permission | ||||||
|  |         ) | ||||||
|  |     elif g.portfolio: | ||||||
|  |         return Authorization.has_portfolio_permission( | ||||||
|  |             g.current_user, g.portfolio, permission | ||||||
|  |         ) | ||||||
|  |     else: | ||||||
|  |         return Authorization.has_atat_permission(g.current_user, permission) | ||||||
|  |  | ||||||
|  |  | ||||||
| def portfolio(): | def portfolio(): | ||||||
|     portfolio = get_portfolio_from_context(http_request.view_args) |     if g.portfolio is not None: | ||||||
|  |  | ||||||
|     def user_can(permission): |  | ||||||
|         if portfolio: |  | ||||||
|             return Authorization.has_portfolio_permission( |  | ||||||
|                 g.current_user, portfolio, permission |  | ||||||
|             ) |  | ||||||
|         return False |  | ||||||
|  |  | ||||||
|     if not portfolio is None: |  | ||||||
|         active_task_orders = [ |         active_task_orders = [ | ||||||
|             task_order for task_order in portfolio.task_orders if task_order.is_active |             task_order for task_order in g.portfolio.task_orders if task_order.is_active | ||||||
|         ] |         ] | ||||||
|         funding_end_date = ( |         funding_end_date = ( | ||||||
|             sorted(active_task_orders, key=attrgetter("end_date"))[-1].end_date |             sorted(active_task_orders, key=attrgetter("end_date"))[-1].end_date | ||||||
| @@ -74,9 +110,9 @@ def portfolio(): | |||||||
|         funded = None |         funded = None | ||||||
|  |  | ||||||
|     return { |     return { | ||||||
|         "portfolio": portfolio, |         "portfolio": g.portfolio, | ||||||
|         "permissions": Permissions, |         "permissions": Permissions, | ||||||
|         "user_can": user_can, |         "user_can": user_can_view, | ||||||
|         "funding_end_date": funding_end_date, |         "funding_end_date": funding_end_date, | ||||||
|         "funded": funded, |         "funded": funded, | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -5,6 +5,6 @@ | |||||||
| Join this JEDI Cloud Portfolio | Join this JEDI Cloud Portfolio | ||||||
| {{ owner }} has invited you to join a JEDI Cloud Portfolio.  Login now to view or use your JEDI Cloud resources. | {{ owner }} has invited you to join a JEDI Cloud Portfolio.  Login now to view or use your JEDI Cloud resources. | ||||||
|  |  | ||||||
| {{ url_for("portfolios.accept_invitation", token=token, _external=True) }} | {{ url_for("portfolios.accept_invitation", portfolio_token=token, _external=True) }} | ||||||
|  |  | ||||||
| {% endblock %} | {% endblock %} | ||||||
|   | |||||||
| @@ -153,7 +153,7 @@ def test_user_can_access_decorator_atat_level(set_current_user): | |||||||
|         _access_activity_log() |         _access_activity_log() | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_user_can_access_decorator_portfolio_level(set_current_user): | def test_user_can_access_decorator_portfolio_level(set_current_user, request_ctx): | ||||||
|     ccpo = UserFactory.create_ccpo() |     ccpo = UserFactory.create_ccpo() | ||||||
|     edit_admin = UserFactory.create() |     edit_admin = UserFactory.create() | ||||||
|     view_admin = UserFactory.create() |     view_admin = UserFactory.create() | ||||||
| @@ -162,6 +162,9 @@ def test_user_can_access_decorator_portfolio_level(set_current_user): | |||||||
|     # factory gives view perms by default |     # factory gives view perms by default | ||||||
|     PortfolioRoleFactory.create(user=view_admin, portfolio=portfolio) |     PortfolioRoleFactory.create(user=view_admin, portfolio=portfolio) | ||||||
|  |  | ||||||
|  |     request_ctx.g.portfolio = portfolio | ||||||
|  |     request_ctx.g.application = None | ||||||
|  |  | ||||||
|     @user_can_access_decorator(Permissions.EDIT_PORTFOLIO_NAME) |     @user_can_access_decorator(Permissions.EDIT_PORTFOLIO_NAME) | ||||||
|     def _edit_portfolio_name(*args, **kwargs): |     def _edit_portfolio_name(*args, **kwargs): | ||||||
|         return True |         return True | ||||||
| @@ -177,7 +180,7 @@ def test_user_can_access_decorator_portfolio_level(set_current_user): | |||||||
|         _edit_portfolio_name(portfolio_id=portfolio.id) |         _edit_portfolio_name(portfolio_id=portfolio.id) | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_user_can_access_decorator_application_level(set_current_user): | def test_user_can_access_decorator_application_level(set_current_user, request_ctx): | ||||||
|     ccpo = UserFactory.create_ccpo() |     ccpo = UserFactory.create_ccpo() | ||||||
|     port_admin = UserFactory.create() |     port_admin = UserFactory.create() | ||||||
|     app_user = UserFactory.create() |     app_user = UserFactory.create() | ||||||
| @@ -189,6 +192,9 @@ def test_user_can_access_decorator_application_level(set_current_user): | |||||||
|     app = portfolio.applications[0] |     app = portfolio.applications[0] | ||||||
|     ApplicationRoleFactory.create(application=app, user=app_user) |     ApplicationRoleFactory.create(application=app, user=app_user) | ||||||
|  |  | ||||||
|  |     request_ctx.g.portfolio = portfolio | ||||||
|  |     request_ctx.g.application = app | ||||||
|  |  | ||||||
|     @user_can_access_decorator(Permissions.VIEW_APPLICATION) |     @user_can_access_decorator(Permissions.VIEW_APPLICATION) | ||||||
|     def _stroll_into_mos_eisley(*args, **kwargs): |     def _stroll_into_mos_eisley(*args, **kwargs): | ||||||
|         return True |         return True | ||||||
|   | |||||||
| @@ -27,7 +27,9 @@ def test_existing_member_accepts_valid_invite(client, user_session): | |||||||
|     assert len(Portfolios.for_user(user)) == 0 |     assert len(Portfolios.for_user(user)) == 0 | ||||||
|  |  | ||||||
|     user_session(user) |     user_session(user) | ||||||
|     response = client.get(url_for("portfolios.accept_invitation", token=invite.token)) |     response = client.get( | ||||||
|  |         url_for("portfolios.accept_invitation", portfolio_token=invite.token) | ||||||
|  |     ) | ||||||
|  |  | ||||||
|     # user is redirected to the portfolio view |     # user is redirected to the portfolio view | ||||||
|     assert response.status_code == 302 |     assert response.status_code == 302 | ||||||
| @@ -68,7 +70,9 @@ def test_new_member_accepts_valid_invite(monkeypatch, client, user_session): | |||||||
|         "atst.domain.auth.should_redirect_to_user_profile", lambda *args: False |         "atst.domain.auth.should_redirect_to_user_profile", lambda *args: False | ||||||
|     ) |     ) | ||||||
|     user_session(user) |     user_session(user) | ||||||
|     response = client.get(url_for("portfolios.accept_invitation", token=token)) |     response = client.get( | ||||||
|  |         url_for("portfolios.accept_invitation", portfolio_token=token) | ||||||
|  |     ) | ||||||
|  |  | ||||||
|     # user is redirected to the portfolio view |     # user is redirected to the portfolio view | ||||||
|     assert response.status_code == 302 |     assert response.status_code == 302 | ||||||
| @@ -90,7 +94,9 @@ def test_member_accepts_invalid_invite(client, user_session): | |||||||
|         user_id=user.id, role=ws_role, status=InvitationStatus.REJECTED_WRONG_USER |         user_id=user.id, role=ws_role, status=InvitationStatus.REJECTED_WRONG_USER | ||||||
|     ) |     ) | ||||||
|     user_session(user) |     user_session(user) | ||||||
|     response = client.get(url_for("portfolios.accept_invitation", token=invite.token)) |     response = client.get( | ||||||
|  |         url_for("portfolios.accept_invitation", portfolio_token=invite.token) | ||||||
|  |     ) | ||||||
|  |  | ||||||
|     assert response.status_code == 404 |     assert response.status_code == 404 | ||||||
|  |  | ||||||
| @@ -121,7 +127,9 @@ def test_user_accepts_invite_with_wrong_dod_id(client, user_session): | |||||||
|     ) |     ) | ||||||
|     invite = PortfolioInvitationFactory.create(user_id=user.id, role=ws_role) |     invite = PortfolioInvitationFactory.create(user_id=user.id, role=ws_role) | ||||||
|     user_session(different_user) |     user_session(different_user) | ||||||
|     response = client.get(url_for("portfolios.accept_invitation", token=invite.token)) |     response = client.get( | ||||||
|  |         url_for("portfolios.accept_invitation", portfolio_token=invite.token) | ||||||
|  |     ) | ||||||
|  |  | ||||||
|     assert response.status_code == 404 |     assert response.status_code == 404 | ||||||
|  |  | ||||||
| @@ -139,7 +147,9 @@ def test_user_accepts_expired_invite(client, user_session): | |||||||
|         expiration_time=datetime.datetime.now() - datetime.timedelta(seconds=1), |         expiration_time=datetime.datetime.now() - datetime.timedelta(seconds=1), | ||||||
|     ) |     ) | ||||||
|     user_session(user) |     user_session(user) | ||||||
|     response = client.get(url_for("portfolios.accept_invitation", token=invite.token)) |     response = client.get( | ||||||
|  |         url_for("portfolios.accept_invitation", portfolio_token=invite.token) | ||||||
|  |     ) | ||||||
|  |  | ||||||
|     assert response.status_code == 404 |     assert response.status_code == 404 | ||||||
|  |  | ||||||
| @@ -161,7 +171,7 @@ def test_revoke_invitation(client, user_session): | |||||||
|         url_for( |         url_for( | ||||||
|             "portfolios.revoke_invitation", |             "portfolios.revoke_invitation", | ||||||
|             portfolio_id=portfolio.id, |             portfolio_id=portfolio.id, | ||||||
|             token=invite.token, |             portfolio_token=invite.token, | ||||||
|         ) |         ) | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
| @@ -187,7 +197,7 @@ def test_user_can_only_revoke_invites_in_their_portfolio(client, user_session): | |||||||
|         url_for( |         url_for( | ||||||
|             "portfolios.revoke_invitation", |             "portfolios.revoke_invitation", | ||||||
|             portfolio_id=portfolio.id, |             portfolio_id=portfolio.id, | ||||||
|             token=invite.token, |             portfolio_token=invite.token, | ||||||
|         ) |         ) | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
| @@ -213,7 +223,7 @@ def test_user_can_only_resend_invites_in_their_portfolio(client, user_session, q | |||||||
|         url_for( |         url_for( | ||||||
|             "portfolios.resend_invitation", |             "portfolios.resend_invitation", | ||||||
|             portfolio_id=portfolio.id, |             portfolio_id=portfolio.id, | ||||||
|             token=invite.token, |             portfolio_token=invite.token, | ||||||
|         ) |         ) | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
| @@ -235,7 +245,7 @@ def test_resend_invitation_sends_email(client, user_session, queue): | |||||||
|         url_for( |         url_for( | ||||||
|             "portfolios.resend_invitation", |             "portfolios.resend_invitation", | ||||||
|             portfolio_id=portfolio.id, |             portfolio_id=portfolio.id, | ||||||
|             token=invite.token, |             portfolio_token=invite.token, | ||||||
|         ) |         ) | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
| @@ -261,7 +271,7 @@ def test_existing_member_invite_resent_to_email_submitted_in_form( | |||||||
|         url_for( |         url_for( | ||||||
|             "portfolios.resend_invitation", |             "portfolios.resend_invitation", | ||||||
|             portfolio_id=portfolio.id, |             portfolio_id=portfolio.id, | ||||||
|             token=invite.token, |             portfolio_token=invite.token, | ||||||
|         ) |         ) | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
| @@ -295,7 +305,9 @@ def test_contracting_officer_accepts_invite(monkeypatch, client, user_session): | |||||||
|         "atst.domain.auth.should_redirect_to_user_profile", lambda *args: False |         "atst.domain.auth.should_redirect_to_user_profile", lambda *args: False | ||||||
|     ) |     ) | ||||||
|     user_session(user) |     user_session(user) | ||||||
|     response = client.get(url_for("portfolios.accept_invitation", token=token)) |     response = client.get( | ||||||
|  |         url_for("portfolios.accept_invitation", portfolio_token=token) | ||||||
|  |     ) | ||||||
|  |  | ||||||
|     # user is redirected to the task order review page |     # user is redirected to the task order review page | ||||||
|     assert response.status_code == 302 |     assert response.status_code == 302 | ||||||
| @@ -329,7 +341,9 @@ def test_cor_accepts_invite(monkeypatch, client, user_session): | |||||||
|         "atst.domain.auth.should_redirect_to_user_profile", lambda *args: False |         "atst.domain.auth.should_redirect_to_user_profile", lambda *args: False | ||||||
|     ) |     ) | ||||||
|     user_session(user) |     user_session(user) | ||||||
|     response = client.get(url_for("portfolios.accept_invitation", token=token)) |     response = client.get( | ||||||
|  |         url_for("portfolios.accept_invitation", portfolio_token=token) | ||||||
|  |     ) | ||||||
|  |  | ||||||
|     # user is redirected to the task order review page |     # user is redirected to the task order review page | ||||||
|     assert response.status_code == 302 |     assert response.status_code == 302 | ||||||
| @@ -363,7 +377,9 @@ def test_so_accepts_invite(monkeypatch, client, user_session): | |||||||
|         "atst.domain.auth.should_redirect_to_user_profile", lambda *args: False |         "atst.domain.auth.should_redirect_to_user_profile", lambda *args: False | ||||||
|     ) |     ) | ||||||
|     user_session(user) |     user_session(user) | ||||||
|     response = client.get(url_for("portfolios.accept_invitation", token=token)) |     response = client.get( | ||||||
|  |         url_for("portfolios.accept_invitation", portfolio_token=token) | ||||||
|  |     ) | ||||||
|  |  | ||||||
|     # user is redirected to the task order review page |     # user is redirected to the task order review page | ||||||
|     assert response.status_code == 302 |     assert response.status_code == 302 | ||||||
|   | |||||||
| @@ -78,9 +78,7 @@ def test_all_protected_routes_have_access_control( | |||||||
|     monkeypatch.setattr( |     monkeypatch.setattr( | ||||||
|         "atst.domain.invitations.PortfolioInvitations._get", lambda *a: Mock() |         "atst.domain.invitations.PortfolioInvitations._get", lambda *a: Mock() | ||||||
|     ) |     ) | ||||||
|     monkeypatch.setattr( |     monkeypatch.setattr("atst.app.assign_resources", lambda *a: None) | ||||||
|         "atst.utils.context_processors.get_portfolio_from_context", lambda *a: None |  | ||||||
|     ) |  | ||||||
|  |  | ||||||
|     # patch the internal function the access decorator uses so that |     # patch the internal function the access decorator uses so that | ||||||
|     # we can check that it was called |     # we can check that it was called | ||||||
| @@ -413,7 +411,9 @@ def test_portfolios_resend_invitation_access(post_url_assert_status): | |||||||
|     invite = PortfolioInvitationFactory.create(user=UserFactory.create(), role=prr) |     invite = PortfolioInvitationFactory.create(user=UserFactory.create(), role=prr) | ||||||
|  |  | ||||||
|     url = url_for( |     url = url_for( | ||||||
|         "portfolios.resend_invitation", portfolio_id=portfolio.id, token=invite.token |         "portfolios.resend_invitation", | ||||||
|  |         portfolio_id=portfolio.id, | ||||||
|  |         portfolio_token=invite.token, | ||||||
|     ) |     ) | ||||||
|     post_url_assert_status(ccpo, url, 302) |     post_url_assert_status(ccpo, url, 302) | ||||||
|     post_url_assert_status(owner, url, 302) |     post_url_assert_status(owner, url, 302) | ||||||
| @@ -461,7 +461,7 @@ def test_portfolios_revoke_invitation_access(post_url_assert_status): | |||||||
|         url = url_for( |         url = url_for( | ||||||
|             "portfolios.revoke_invitation", |             "portfolios.revoke_invitation", | ||||||
|             portfolio_id=portfolio.id, |             portfolio_id=portfolio.id, | ||||||
|             token=invite.token, |             portfolio_token=invite.token, | ||||||
|         ) |         ) | ||||||
|         post_url_assert_status(user, url, status) |         post_url_assert_status(user, url, status) | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										65
									
								
								tests/utils/test_context_processors.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								tests/utils/test_context_processors.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,65 @@ | |||||||
|  | import pytest | ||||||
|  |  | ||||||
|  | from atst.domain.permission_sets import PermissionSets | ||||||
|  | from atst.models import Permissions | ||||||
|  | from atst.utils.context_processors import get_resources_from_context, user_can_view | ||||||
|  |  | ||||||
|  | from tests.factories import * | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def test_get_resources_from_context(): | ||||||
|  |     portfolio = PortfolioFactory.create() | ||||||
|  |     task_order = TaskOrderFactory.create(portfolio=portfolio) | ||||||
|  |     application = ApplicationFactory.create(portfolio=portfolio) | ||||||
|  |     environment = EnvironmentFactory.create(application=application) | ||||||
|  |  | ||||||
|  |     assert get_resources_from_context({"portfolio_id": portfolio.id}) == (portfolio,) | ||||||
|  |     assert get_resources_from_context({"application_id": application.id}) == ( | ||||||
|  |         portfolio, | ||||||
|  |         application, | ||||||
|  |     ) | ||||||
|  |     assert get_resources_from_context({"environment_id": environment.id}) == ( | ||||||
|  |         portfolio, | ||||||
|  |         application, | ||||||
|  |     ) | ||||||
|  |     assert get_resources_from_context({"task_order_id": task_order.id}) == ( | ||||||
|  |         portfolio, | ||||||
|  |         task_order, | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @pytest.fixture | ||||||
|  | def set_g(request_ctx): | ||||||
|  |     def _set_g(attr, val): | ||||||
|  |         setattr(request_ctx.g, attr, val) | ||||||
|  |  | ||||||
|  |     yield _set_g | ||||||
|  |  | ||||||
|  |     setattr(request_ctx.g, "application", None) | ||||||
|  |     setattr(request_ctx.g, "portfolio", None) | ||||||
|  |     setattr(request_ctx.g, "current_user", None) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def test_user_can_view(set_g): | ||||||
|  |     owner = UserFactory.create() | ||||||
|  |     app_user = UserFactory.create() | ||||||
|  |     rando = UserFactory.create() | ||||||
|  |  | ||||||
|  |     portfolio = PortfolioFactory.create(owner=owner) | ||||||
|  |     application = ApplicationFactory.create(portfolio=portfolio) | ||||||
|  |     ApplicationRoleFactory.create( | ||||||
|  |         user=app_user, | ||||||
|  |         application=application, | ||||||
|  |         permission_sets=PermissionSets.get_many([PermissionSets.VIEW_APPLICATION]), | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  |     set_g("portfolio", portfolio) | ||||||
|  |     set_g("application", application) | ||||||
|  |     set_g("current_user", owner) | ||||||
|  |     assert user_can_view(Permissions.VIEW_APPLICATION) | ||||||
|  |  | ||||||
|  |     set_g("current_user", app_user) | ||||||
|  |     assert user_can_view(Permissions.VIEW_APPLICATION) | ||||||
|  |  | ||||||
|  |     set_g("current_user", rando) | ||||||
|  |     assert not user_can_view(Permissions.VIEW_APPLICATION) | ||||||
		Reference in New Issue
	
	Block a user