Look up major database resources in a before_request hook.

A `before_request` hook queries the database for portfolios, requests,
and task orders based on the route arguments. The resources are added as
attributes on `g`. The portfolio context processor and the access
decorator now rely on those resources being available on `g`.

WIP: find major resources in before_request hook, apply to g

WIP: use g.portfolio for portfolio context processor

WIP: the access decorator should rely on the resources being available on g
This commit is contained in:
dandds 2019-05-03 18:10:12 -04:00
parent b0600a34db
commit 42b912d4cb
6 changed files with 79 additions and 41 deletions

View File

@ -33,6 +33,8 @@ from atst.queue import queue
from logging.config import dictConfig
from atst.utils.logging import JsonFormatter, RequestContextFilter
from atst.utils.context_processors import assign_resources
ENV = os.getenv("FLASK_ENV", "dev")
@ -83,6 +85,10 @@ def make_app(config):
apply_authentication(app)
set_default_headers(app)
@app.before_request
def _set_resources():
assign_resources(request.view_args)
return app
@ -107,6 +113,9 @@ def make_flask_callbacks(app):
@app.after_request
def _cleanup(response):
g.current_user = None
g.portfolio = None
g.application = None
g.task_order = None
return response

View File

@ -12,31 +12,20 @@ from atst.domain.exceptions import UnauthorizedError
def check_access(permission, message, override, *args, **kwargs):
access_args = {"message": message}
access_args = {
"message": message,
"portfolio": g.portfolio,
"application": g.application,
}
if "application_id" in kwargs:
application = Applications.get(kwargs["application_id"])
access_args["application"] = application
access_args["portfolio"] = application.portfolio
elif "task_order_id" in kwargs:
task_order = TaskOrders.get(kwargs["task_order_id"])
access_args["portfolio"] = task_order.portfolio
elif "token" in kwargs:
# TODO: We should change the `token` arg in routes to be either
# `portfolio_token` or `application_token` and have
# atst.utils.context_processors.assign_resources take care of
# this.
if "token" in kwargs:
invite = PortfolioInvitations._get(kwargs["token"])
access_args["portfolio"] = invite.role.portfolio
elif "portfolio_id" in kwargs:
access_args["portfolio"] = Portfolios.get(
g.current_user, kwargs["portfolio_id"]
)
elif "environment_id" in kwargs:
environment = Environments.get(kwargs["environment_id"])
access_args["application"] = environment.application
access_args["portfolio"] = environment.application.portfolio
if override is not None and override(g.current_user, **access_args, **kwargs):
return True

View File

@ -1,6 +1,6 @@
from operator import attrgetter
from flask import request as http_request, g
from flask import g
from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
@ -10,7 +10,7 @@ from atst.models.permissions import Permissions
from atst.domain.portfolios.scopes import ScopedPortfolio
def get_portfolio_from_context(view_args):
def get_resources_from_context(view_args):
query = None
if "portfolio_id" in view_args:
@ -20,14 +20,14 @@ def get_portfolio_from_context(view_args):
elif "application_id" in view_args:
query = (
db.session.query(Portfolio)
db.session.query(Portfolio, Application)
.join(Application, Application.portfolio_id == Portfolio.id)
.filter(Application.id == view_args["application_id"])
)
elif "environment_id" in view_args:
query = (
db.session.query(Portfolio)
db.session.query(Portfolio, Application)
.join(Application, Application.portfolio_id == Portfolio.id)
.join(Environment, Environment.application_id == Application.id)
.filter(Environment.id == view_args["environment_id"])
@ -35,33 +35,45 @@ def get_portfolio_from_context(view_args):
elif "task_order_id" in view_args:
query = (
db.session.query(Portfolio)
db.session.query(Portfolio, TaskOrder)
.join(TaskOrder, TaskOrder.portfolio_id == Portfolio.id)
.filter(TaskOrder.id == view_args["task_order_id"])
)
if query:
try:
portfolio = query.one()
return ScopedPortfolio(g.current_user, portfolio)
return query.only_return_tuples(True).one()
except NoResultFound:
raise NotFoundError("portfolio")
def portfolio():
portfolio = get_portfolio_from_context(http_request.view_args)
def assign_resources(view_args):
g.portfolio = None
g.application = None
g.task_order = None
resources = get_resources_from_context(view_args)
if resources:
for resource in resources:
if isinstance(resource, Portfolio):
g.portfolio = ScopedPortfolio(g.current_user, resource)
elif isinstance(resource, Application):
g.application = resource
elif isinstance(resource, TaskOrder):
g.task_order = resource
def portfolio():
def user_can(permission):
if portfolio:
if g.portfolio:
return Authorization.has_portfolio_permission(
g.current_user, portfolio, permission
g.current_user, g.portfolio, permission
)
return False
if not portfolio is None:
if not g.portfolio is None:
active_task_orders = [
task_order for task_order in portfolio.task_orders if task_order.is_active
task_order for task_order in g.portfolio.task_orders if task_order.is_active
]
funding_end_date = (
sorted(active_task_orders, key=attrgetter("end_date"))[-1].end_date
@ -74,7 +86,7 @@ def portfolio():
funded = None
return {
"portfolio": portfolio,
"portfolio": g.portfolio,
"permissions": Permissions,
"user_can": user_can,
"funding_end_date": funding_end_date,

View File

@ -153,7 +153,7 @@ def test_user_can_access_decorator_atat_level(set_current_user):
_access_activity_log()
def test_user_can_access_decorator_portfolio_level(set_current_user):
def test_user_can_access_decorator_portfolio_level(set_current_user, request_ctx):
ccpo = UserFactory.create_ccpo()
edit_admin = UserFactory.create()
view_admin = UserFactory.create()
@ -162,6 +162,9 @@ def test_user_can_access_decorator_portfolio_level(set_current_user):
# factory gives view perms by default
PortfolioRoleFactory.create(user=view_admin, portfolio=portfolio)
request_ctx.g.portfolio = portfolio
request_ctx.g.application = None
@user_can_access_decorator(Permissions.EDIT_PORTFOLIO_NAME)
def _edit_portfolio_name(*args, **kwargs):
return True
@ -177,7 +180,7 @@ def test_user_can_access_decorator_portfolio_level(set_current_user):
_edit_portfolio_name(portfolio_id=portfolio.id)
def test_user_can_access_decorator_application_level(set_current_user):
def test_user_can_access_decorator_application_level(set_current_user, request_ctx):
ccpo = UserFactory.create_ccpo()
port_admin = UserFactory.create()
app_user = UserFactory.create()
@ -189,6 +192,9 @@ def test_user_can_access_decorator_application_level(set_current_user):
app = portfolio.applications[0]
ApplicationRoleFactory.create(application=app, user=app_user)
request_ctx.g.portfolio = portfolio
request_ctx.g.application = app
@user_can_access_decorator(Permissions.VIEW_APPLICATION)
def _stroll_into_mos_eisley(*args, **kwargs):
return True

View File

@ -78,9 +78,7 @@ def test_all_protected_routes_have_access_control(
monkeypatch.setattr(
"atst.domain.invitations.PortfolioInvitations._get", lambda *a: Mock()
)
monkeypatch.setattr(
"atst.utils.context_processors.get_portfolio_from_context", lambda *a: None
)
monkeypatch.setattr("atst.app.assign_resources", lambda *a: None)
# patch the internal function the access decorator uses so that
# we can check that it was called

View File

@ -0,0 +1,24 @@
from atst.utils.context_processors import get_resources_from_context
from tests.factories import *
def test_get_resources_from_context():
portfolio = PortfolioFactory.create()
task_order = TaskOrderFactory.create(portfolio=portfolio)
application = ApplicationFactory.create(portfolio=portfolio)
environment = EnvironmentFactory.create(application=application)
assert get_resources_from_context({"portfolio_id": portfolio.id}) == (portfolio,)
assert get_resources_from_context({"application_id": application.id}) == (
portfolio,
application,
)
assert get_resources_from_context({"environment_id": environment.id}) == (
portfolio,
application,
)
assert get_resources_from_context({"task_order_id": task_order.id}) == (
portfolio,
task_order,
)