From c1df245800665887186f4e1474c2c6cc221ecb24 Mon Sep 17 00:00:00 2001 From: dandds Date: Tue, 16 Apr 2019 10:32:42 -0400 Subject: [PATCH] Scope access to applications, task orders, and environment roles. These resources should be scoped to the portfolio when accessed from route functions. --- atst/domain/environment_roles.py | 21 ++++++++++++++- atst/routes/portfolios/applications.py | 14 ++++++---- atst/routes/portfolios/members.py | 2 +- atst/routes/portfolios/task_orders.py | 24 ++++++++--------- atst/routes/task_orders/new.py | 9 ++++++- atst/routes/task_orders/signing.py | 2 ++ tests/domain/test_environment_roles.py | 27 ++++++++++++++++++++ tests/routes/portfolios/test_applications.py | 16 ++++++++++++ 8 files changed, 95 insertions(+), 20 deletions(-) create mode 100644 tests/domain/test_environment_roles.py diff --git a/atst/domain/environment_roles.py b/atst/domain/environment_roles.py index d5200ac4..ab5a082c 100644 --- a/atst/domain/environment_roles.py +++ b/atst/domain/environment_roles.py @@ -1,7 +1,9 @@ from flask import current_app as app +from sqlalchemy.orm.exc import NoResultFound -from atst.models.environment_role import EnvironmentRole from atst.database import db +from atst.domain.exceptions import NotFoundError +from atst.models import EnvironmentRole, Environment, Application class EnvironmentRoles(object): @@ -13,6 +15,23 @@ class EnvironmentRoles(object): app.csp.cloud.create_role(env_role) return env_role + @classmethod + def get_for_portfolio(cls, user_id, environment_id, portfolio_id): + try: + return ( + db.session.query(EnvironmentRole) + .join(Environment, EnvironmentRole.environment_id == Environment.id) + .join(Application, Environment.application_id == Application.id) + .filter( + EnvironmentRole.user_id == user_id, + EnvironmentRole.environment_id == environment_id, + Application.portfolio_id == portfolio_id, + ) + .one() + ) + except NoResultFound: + raise NotFoundError("environment_role") + @classmethod def get(cls, user_id, environment_id): existing_env_role = ( diff --git a/atst/routes/portfolios/applications.py b/atst/routes/portfolios/applications.py index c4776f30..a00a4f66 100644 --- a/atst/routes/portfolios/applications.py +++ b/atst/routes/portfolios/applications.py @@ -64,7 +64,7 @@ def get_environments_obj_for_app(application): @portfolios_bp.route("/portfolios//applications//edit") @user_can(Permissions.EDIT_APPLICATION, message="view application edit form") def edit_application(portfolio_id, application_id): - application = Applications.get(application_id) + application = Applications.get(application_id, portfolio_id=portfolio_id) form = ApplicationForm(name=application.name, description=application.description) return render_template( @@ -80,7 +80,7 @@ def edit_application(portfolio_id, application_id): ) @user_can(Permissions.EDIT_APPLICATION, message="update application") def update_application(portfolio_id, application_id): - application = Applications.get(application_id) + application = Applications.get(application_id, portfolio_id=portfolio_id) form = ApplicationForm(http_request.form) if form.validate(): application_data = form.data @@ -101,7 +101,9 @@ def update_application(portfolio_id, application_id): def wrap_environment_role_lookup( user, portfolio_id=None, environment_id=None, **kwargs ): - env_role = EnvironmentRoles.get(user.id, environment_id) + env_role = EnvironmentRoles.get_for_portfolio( + user.id, environment_id, portfolio_id=portfolio_id + ) if not env_role: raise UnauthorizedError(user, "access environment {}".format(environment_id)) @@ -111,7 +113,9 @@ def wrap_environment_role_lookup( @portfolios_bp.route("/portfolios//environments//access") @user_can(None, override=wrap_environment_role_lookup, message="access environment") def access_environment(portfolio_id, environment_id): - env_role = EnvironmentRoles.get(g.current_user.id, environment_id) + env_role = EnvironmentRoles.get_for_portfolio( + g.current_user.id, environment_id, portfolio_id=portfolio_id + ) token = app.csp.cloud.get_access_token(env_role) return redirect(url_for("atst.csp_environment_access", token=token)) @@ -122,7 +126,7 @@ def access_environment(portfolio_id, environment_id): ) @user_can(Permissions.DELETE_APPLICATION, message="delete application") def delete_application(portfolio_id, application_id): - application = Applications.get(application_id) + application = Applications.get(application_id, portfolio_id=portfolio_id) Applications.delete(application) flash("application_deleted", application_name=application.name) diff --git a/atst/routes/portfolios/members.py b/atst/routes/portfolios/members.py index d87b526c..76187dc7 100644 --- a/atst/routes/portfolios/members.py +++ b/atst/routes/portfolios/members.py @@ -50,7 +50,7 @@ def portfolio_members(portfolio_id): @user_can(Permissions.VIEW_APPLICATION_MEMBER, message="view application members") def application_members(portfolio_id, application_id): portfolio = Portfolios.get_for_update(portfolio_id) - application = Applications.get(application_id) + application = Applications.get(application_id, portfolio_id=portfolio_id) # TODO: this should show only members that have env roles in this application members_list = [serialize_portfolio_role(k) for k in portfolio.members] diff --git a/atst/routes/portfolios/task_orders.py b/atst/routes/portfolios/task_orders.py index 4b9f5b4c..35b3eb32 100644 --- a/atst/routes/portfolios/task_orders.py +++ b/atst/routes/portfolios/task_orders.py @@ -70,7 +70,7 @@ def portfolio_funding(portfolio_id): @portfolios_bp.route("/portfolios//task_order/") @user_can(Permissions.VIEW_TASK_ORDER_DETAILS, message="view task order details") def view_task_order(portfolio_id, task_order_id): - task_order = TaskOrders.get(task_order_id) + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) to_form_complete = TaskOrders.all_sections_complete(task_order) dd_254_complete = DD254s.is_complete(task_order.dd_254) return render_template( @@ -86,8 +86,8 @@ def view_task_order(portfolio_id, task_order_id): ) -def wrap_check_is_ko_or_cor(user, task_order_id=None, **_kwargs): - task_order = TaskOrders.get(task_order_id) +def wrap_check_is_ko_or_cor(user, task_order_id=None, portfolio_id=None, **_kwargs): + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) Authorization.check_is_ko_or_cor(user, task_order) return True @@ -100,7 +100,7 @@ def wrap_check_is_ko_or_cor(user, task_order_id=None, **_kwargs): message="view contracting officer review form", ) def ko_review(portfolio_id, task_order_id): - task_order = TaskOrders.get(task_order_id) + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) if TaskOrders.all_sections_complete(task_order): return render_template( @@ -127,7 +127,7 @@ def resend_invite(portfolio_id, task_order_id): invite_type_info = OFFICER_INVITATIONS[invite_type] - task_order = TaskOrders.get(task_order_id) + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) portfolio = Portfolios.get(g.current_user, portfolio_id) officer = getattr(task_order, invite_type_info["role"]) @@ -180,7 +180,7 @@ def resend_invite(portfolio_id, task_order_id): None, override=wrap_check_is_ko_or_cor, message="submit contracting officer review" ) def submit_ko_review(portfolio_id, task_order_id, form=None): - task_order = TaskOrders.get(task_order_id) + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) form_data = {**http_request.form, **http_request.files} form = KOReviewForm(form_data) @@ -213,7 +213,7 @@ def submit_ko_review(portfolio_id, task_order_id, form=None): Permissions.EDIT_TASK_ORDER_DETAILS, message="view task order invitations page" ) def task_order_invitations(portfolio_id, task_order_id): - task_order = TaskOrders.get(task_order_id) + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) form = EditTaskOrderOfficersForm(obj=task_order) if TaskOrders.all_sections_complete(task_order): @@ -233,7 +233,7 @@ def task_order_invitations(portfolio_id, task_order_id): ) @user_can(Permissions.EDIT_TASK_ORDER_DETAILS, message="edit task order invitations") def edit_task_order_invitations(portfolio_id, task_order_id): - task_order = TaskOrders.get(task_order_id) + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) form = EditTaskOrderOfficersForm(formdata=http_request.form, obj=task_order) if form.validate(): @@ -277,8 +277,8 @@ def so_review_form(task_order): return DD254Form(data=form_data) -def wrap_check_is_so(user, task_order_id=None, **_kwargs): - task_order = TaskOrders.get(task_order_id) +def wrap_check_is_so(user, task_order_id=None, portfolio_id=None, **_kwargs): + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) Authorization.check_is_so(user, task_order) return True @@ -287,7 +287,7 @@ def wrap_check_is_so(user, task_order_id=None, **_kwargs): @portfolios_bp.route("/portfolios//task_order//dd254") @user_can(None, override=wrap_check_is_so, message="view security officer review form") def so_review(portfolio_id, task_order_id): - task_order = TaskOrders.get(task_order_id) + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) form = so_review_form(task_order) return render_template( @@ -302,7 +302,7 @@ def so_review(portfolio_id, task_order_id): None, override=wrap_check_is_so, message="submit security officer review form" ) def submit_so_review(portfolio_id, task_order_id): - task_order = TaskOrders.get(task_order_id) + task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id) form = DD254Form(http_request.form) if form.validate(): diff --git a/atst/routes/task_orders/new.py b/atst/routes/task_orders/new.py index 7ef462a1..685b3563 100644 --- a/atst/routes/task_orders/new.py +++ b/atst/routes/task_orders/new.py @@ -61,7 +61,12 @@ class ShowTaskOrderWorkflow: @property def task_order(self): if not self._task_order and self.task_order_id: - self._task_order = TaskOrders.get(self.task_order_id) + if self.portfolio_id: + self._task_order = TaskOrders.get( + self.task_order_id, portfolio_id=self.portfolio_id + ) + else: + self._task_order = TaskOrders.get(self.task_order_id) return self._task_order @@ -260,6 +265,7 @@ def is_new_task_order(*_args, **kwargs): ) +# TODO: /task_orders/new// should not exist @task_orders_bp.route("/task_orders/new/") @task_orders_bp.route("/task_orders/new//") @task_orders_bp.route("/portfolios//task_orders/new/") @@ -309,6 +315,7 @@ def new(screen, task_order_id=None, portfolio_id=None): return render_template(workflow.template, **template_args) +# TODO: /task_orders/new// should not exist @task_orders_bp.route("/task_orders/new/", methods=["POST"]) @task_orders_bp.route("/task_orders/new//", methods=["POST"]) @task_orders_bp.route( diff --git a/atst/routes/task_orders/signing.py b/atst/routes/task_orders/signing.py index 74f4f561..5f40bea8 100644 --- a/atst/routes/task_orders/signing.py +++ b/atst/routes/task_orders/signing.py @@ -27,6 +27,7 @@ def wrap_check_is_ko(user, task_order_id=None, **_kwargs): return True +# TODO: this route needs to be moved under the portfolio blueprint @task_orders_bp.route("/task_orders//digital_signature", methods=["GET"]) @user_can( None, override=wrap_check_is_ko, message="view contracting officer signature page" @@ -42,6 +43,7 @@ def signature_requested(task_order_id): ) +# TODO: this route needs to be moved under the portfolio blueprint @task_orders_bp.route( "/task_orders//digital_signature", methods=["POST"] ) diff --git a/tests/domain/test_environment_roles.py b/tests/domain/test_environment_roles.py new file mode 100644 index 00000000..f2f1a5a2 --- /dev/null +++ b/tests/domain/test_environment_roles.py @@ -0,0 +1,27 @@ +import pytest + +from atst.domain.environment_roles import EnvironmentRoles +from atst.domain.exceptions import NotFoundError + +from tests.factories import * + + +def test_get_for_portfolio(): + user = UserFactory.create() + portfolio = PortfolioFactory.create() + application = ApplicationFactory.create(portfolio=portfolio) + environment = EnvironmentFactory.create(application=application) + env_role = EnvironmentRoleFactory.create( + environment=environment, user=user, role="basic access" + ) + + assert ( + EnvironmentRoles.get_for_portfolio( + user.id, environment.id, portfolio_id=portfolio.id + ) + == env_role + ) + with pytest.raises(NotFoundError): + EnvironmentRoles.get_for_portfolio( + user.id, environment.id, portfolio_id=application.id + ) diff --git a/tests/routes/portfolios/test_applications.py b/tests/routes/portfolios/test_applications.py index 8df582e3..e9168685 100644 --- a/tests/routes/portfolios/test_applications.py +++ b/tests/routes/portfolios/test_applications.py @@ -329,3 +329,19 @@ def test_delete_application(client, user_session): # app and envs are soft deleted assert len(port.applications) == 0 assert len(application.environments) == 0 + + +def test_edit_application_scope(client, user_session): + owner = UserFactory.create() + port1 = PortfolioFactory.create(owner=owner, applications=[{"name": "first app"}]) + port2 = PortfolioFactory.create(owner=owner, applications=[{"name": "second app"}]) + + user_session(owner) + response = client.get( + url_for( + "portfolios.edit_application", + portfolio_id=port2.id, + application_id=port1.applications[0].id, + ) + ) + assert response.status_code == 404