Scope access to applications, task orders, and environment roles.

These resources should be scoped to the portfolio when accessed from
route functions.
This commit is contained in:
dandds 2019-04-16 10:32:42 -04:00
parent eaeeed0b05
commit c1df245800
8 changed files with 95 additions and 20 deletions

View File

@ -1,7 +1,9 @@
from flask import current_app as app
from sqlalchemy.orm.exc import NoResultFound
from atst.models.environment_role import EnvironmentRole
from atst.database import db
from atst.domain.exceptions import NotFoundError
from atst.models import EnvironmentRole, Environment, Application
class EnvironmentRoles(object):
@ -13,6 +15,23 @@ class EnvironmentRoles(object):
app.csp.cloud.create_role(env_role)
return env_role
@classmethod
def get_for_portfolio(cls, user_id, environment_id, portfolio_id):
try:
return (
db.session.query(EnvironmentRole)
.join(Environment, EnvironmentRole.environment_id == Environment.id)
.join(Application, Environment.application_id == Application.id)
.filter(
EnvironmentRole.user_id == user_id,
EnvironmentRole.environment_id == environment_id,
Application.portfolio_id == portfolio_id,
)
.one()
)
except NoResultFound:
raise NotFoundError("environment_role")
@classmethod
def get(cls, user_id, environment_id):
existing_env_role = (

View File

@ -64,7 +64,7 @@ def get_environments_obj_for_app(application):
@portfolios_bp.route("/portfolios/<portfolio_id>/applications/<application_id>/edit")
@user_can(Permissions.EDIT_APPLICATION, message="view application edit form")
def edit_application(portfolio_id, application_id):
application = Applications.get(application_id)
application = Applications.get(application_id, portfolio_id=portfolio_id)
form = ApplicationForm(name=application.name, description=application.description)
return render_template(
@ -80,7 +80,7 @@ def edit_application(portfolio_id, application_id):
)
@user_can(Permissions.EDIT_APPLICATION, message="update application")
def update_application(portfolio_id, application_id):
application = Applications.get(application_id)
application = Applications.get(application_id, portfolio_id=portfolio_id)
form = ApplicationForm(http_request.form)
if form.validate():
application_data = form.data
@ -101,7 +101,9 @@ def update_application(portfolio_id, application_id):
def wrap_environment_role_lookup(
user, portfolio_id=None, environment_id=None, **kwargs
):
env_role = EnvironmentRoles.get(user.id, environment_id)
env_role = EnvironmentRoles.get_for_portfolio(
user.id, environment_id, portfolio_id=portfolio_id
)
if not env_role:
raise UnauthorizedError(user, "access environment {}".format(environment_id))
@ -111,7 +113,9 @@ def wrap_environment_role_lookup(
@portfolios_bp.route("/portfolios/<portfolio_id>/environments/<environment_id>/access")
@user_can(None, override=wrap_environment_role_lookup, message="access environment")
def access_environment(portfolio_id, environment_id):
env_role = EnvironmentRoles.get(g.current_user.id, environment_id)
env_role = EnvironmentRoles.get_for_portfolio(
g.current_user.id, environment_id, portfolio_id=portfolio_id
)
token = app.csp.cloud.get_access_token(env_role)
return redirect(url_for("atst.csp_environment_access", token=token))
@ -122,7 +126,7 @@ def access_environment(portfolio_id, environment_id):
)
@user_can(Permissions.DELETE_APPLICATION, message="delete application")
def delete_application(portfolio_id, application_id):
application = Applications.get(application_id)
application = Applications.get(application_id, portfolio_id=portfolio_id)
Applications.delete(application)
flash("application_deleted", application_name=application.name)

View File

@ -50,7 +50,7 @@ def portfolio_members(portfolio_id):
@user_can(Permissions.VIEW_APPLICATION_MEMBER, message="view application members")
def application_members(portfolio_id, application_id):
portfolio = Portfolios.get_for_update(portfolio_id)
application = Applications.get(application_id)
application = Applications.get(application_id, portfolio_id=portfolio_id)
# TODO: this should show only members that have env roles in this application
members_list = [serialize_portfolio_role(k) for k in portfolio.members]

View File

@ -70,7 +70,7 @@ def portfolio_funding(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>")
@user_can(Permissions.VIEW_TASK_ORDER_DETAILS, message="view task order details")
def view_task_order(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
to_form_complete = TaskOrders.all_sections_complete(task_order)
dd_254_complete = DD254s.is_complete(task_order.dd_254)
return render_template(
@ -86,8 +86,8 @@ def view_task_order(portfolio_id, task_order_id):
)
def wrap_check_is_ko_or_cor(user, task_order_id=None, **_kwargs):
task_order = TaskOrders.get(task_order_id)
def wrap_check_is_ko_or_cor(user, task_order_id=None, portfolio_id=None, **_kwargs):
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
Authorization.check_is_ko_or_cor(user, task_order)
return True
@ -100,7 +100,7 @@ def wrap_check_is_ko_or_cor(user, task_order_id=None, **_kwargs):
message="view contracting officer review form",
)
def ko_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
if TaskOrders.all_sections_complete(task_order):
return render_template(
@ -127,7 +127,7 @@ def resend_invite(portfolio_id, task_order_id):
invite_type_info = OFFICER_INVITATIONS[invite_type]
task_order = TaskOrders.get(task_order_id)
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
portfolio = Portfolios.get(g.current_user, portfolio_id)
officer = getattr(task_order, invite_type_info["role"])
@ -180,7 +180,7 @@ def resend_invite(portfolio_id, task_order_id):
None, override=wrap_check_is_ko_or_cor, message="submit contracting officer review"
)
def submit_ko_review(portfolio_id, task_order_id, form=None):
task_order = TaskOrders.get(task_order_id)
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
form_data = {**http_request.form, **http_request.files}
form = KOReviewForm(form_data)
@ -213,7 +213,7 @@ def submit_ko_review(portfolio_id, task_order_id, form=None):
Permissions.EDIT_TASK_ORDER_DETAILS, message="view task order invitations page"
)
def task_order_invitations(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
form = EditTaskOrderOfficersForm(obj=task_order)
if TaskOrders.all_sections_complete(task_order):
@ -233,7 +233,7 @@ def task_order_invitations(portfolio_id, task_order_id):
)
@user_can(Permissions.EDIT_TASK_ORDER_DETAILS, message="edit task order invitations")
def edit_task_order_invitations(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
form = EditTaskOrderOfficersForm(formdata=http_request.form, obj=task_order)
if form.validate():
@ -277,8 +277,8 @@ def so_review_form(task_order):
return DD254Form(data=form_data)
def wrap_check_is_so(user, task_order_id=None, **_kwargs):
task_order = TaskOrders.get(task_order_id)
def wrap_check_is_so(user, task_order_id=None, portfolio_id=None, **_kwargs):
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
Authorization.check_is_so(user, task_order)
return True
@ -287,7 +287,7 @@ def wrap_check_is_so(user, task_order_id=None, **_kwargs):
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254")
@user_can(None, override=wrap_check_is_so, message="view security officer review form")
def so_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
form = so_review_form(task_order)
return render_template(
@ -302,7 +302,7 @@ def so_review(portfolio_id, task_order_id):
None, override=wrap_check_is_so, message="submit security officer review form"
)
def submit_so_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)
task_order = TaskOrders.get(task_order_id, portfolio_id=portfolio_id)
form = DD254Form(http_request.form)
if form.validate():

View File

@ -61,7 +61,12 @@ class ShowTaskOrderWorkflow:
@property
def task_order(self):
if not self._task_order and self.task_order_id:
self._task_order = TaskOrders.get(self.task_order_id)
if self.portfolio_id:
self._task_order = TaskOrders.get(
self.task_order_id, portfolio_id=self.portfolio_id
)
else:
self._task_order = TaskOrders.get(self.task_order_id)
return self._task_order
@ -260,6 +265,7 @@ def is_new_task_order(*_args, **kwargs):
)
# TODO: /task_orders/new/<int:screen>/<task_order_id> should not exist
@task_orders_bp.route("/task_orders/new/<int:screen>")
@task_orders_bp.route("/task_orders/new/<int:screen>/<task_order_id>")
@task_orders_bp.route("/portfolios/<portfolio_id>/task_orders/new/<int:screen>")
@ -309,6 +315,7 @@ def new(screen, task_order_id=None, portfolio_id=None):
return render_template(workflow.template, **template_args)
# TODO: /task_orders/new/<int:screen>/<task_order_id> should not exist
@task_orders_bp.route("/task_orders/new/<int:screen>", methods=["POST"])
@task_orders_bp.route("/task_orders/new/<int:screen>/<task_order_id>", methods=["POST"])
@task_orders_bp.route(

View File

@ -27,6 +27,7 @@ def wrap_check_is_ko(user, task_order_id=None, **_kwargs):
return True
# TODO: this route needs to be moved under the portfolio blueprint
@task_orders_bp.route("/task_orders/<task_order_id>/digital_signature", methods=["GET"])
@user_can(
None, override=wrap_check_is_ko, message="view contracting officer signature page"
@ -42,6 +43,7 @@ def signature_requested(task_order_id):
)
# TODO: this route needs to be moved under the portfolio blueprint
@task_orders_bp.route(
"/task_orders/<task_order_id>/digital_signature", methods=["POST"]
)

View File

@ -0,0 +1,27 @@
import pytest
from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.exceptions import NotFoundError
from tests.factories import *
def test_get_for_portfolio():
user = UserFactory.create()
portfolio = PortfolioFactory.create()
application = ApplicationFactory.create(portfolio=portfolio)
environment = EnvironmentFactory.create(application=application)
env_role = EnvironmentRoleFactory.create(
environment=environment, user=user, role="basic access"
)
assert (
EnvironmentRoles.get_for_portfolio(
user.id, environment.id, portfolio_id=portfolio.id
)
== env_role
)
with pytest.raises(NotFoundError):
EnvironmentRoles.get_for_portfolio(
user.id, environment.id, portfolio_id=application.id
)

View File

@ -329,3 +329,19 @@ def test_delete_application(client, user_session):
# app and envs are soft deleted
assert len(port.applications) == 0
assert len(application.environments) == 0
def test_edit_application_scope(client, user_session):
owner = UserFactory.create()
port1 = PortfolioFactory.create(owner=owner, applications=[{"name": "first app"}])
port2 = PortfolioFactory.create(owner=owner, applications=[{"name": "second app"}])
user_session(owner)
response = client.get(
url_for(
"portfolios.edit_application",
portfolio_id=port2.id,
application_id=port1.applications[0].id,
)
)
assert response.status_code == 404