move portfolio context processor to generic location
This commit is contained in:
parent
0497ec26b2
commit
ed25078c39
@ -12,41 +12,7 @@ from atst.domain.exceptions import UnauthorizedError
|
||||
from atst.domain.portfolios import Portfolios
|
||||
from atst.domain.authz import Authorization
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.utils.context_processors import portfolio as portfolio_context_processor
|
||||
|
||||
|
||||
@portfolios_bp.context_processor
|
||||
def portfolio():
|
||||
portfolio = None
|
||||
if "portfolio_id" in http_request.view_args:
|
||||
portfolio = Portfolios.get(
|
||||
g.current_user, http_request.view_args["portfolio_id"]
|
||||
)
|
||||
|
||||
def user_can(permission):
|
||||
if portfolio:
|
||||
return Authorization.has_portfolio_permission(
|
||||
g.current_user, portfolio, permission
|
||||
)
|
||||
return False
|
||||
|
||||
if not portfolio is None:
|
||||
active_task_orders = [
|
||||
task_order for task_order in portfolio.task_orders if task_order.is_active
|
||||
]
|
||||
funding_end_date = (
|
||||
sorted(active_task_orders, key=attrgetter("end_date"))[-1].end_date
|
||||
if active_task_orders
|
||||
else None
|
||||
)
|
||||
funded = len(active_task_orders) > 1
|
||||
else:
|
||||
funding_end_date = None
|
||||
funded = None
|
||||
|
||||
return {
|
||||
"portfolio": portfolio,
|
||||
"permissions": Permissions,
|
||||
"user_can": user_can,
|
||||
"funding_end_date": funding_end_date,
|
||||
"funded": funded,
|
||||
}
|
||||
portfolios_bp.context_processor(portfolio_context_processor)
|
||||
|
74
atst/utils/context_processors.py
Normal file
74
atst/utils/context_processors.py
Normal file
@ -0,0 +1,74 @@
|
||||
from operator import attrgetter
|
||||
|
||||
from flask import request as http_request, g
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from atst.database import db
|
||||
from atst.domain.authz import Authorization
|
||||
from atst.models import Application, Portfolio, TaskOrder
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.domain.portfolios.scopes import ScopedPortfolio
|
||||
|
||||
|
||||
def get_portfolio_from_context(view_args):
|
||||
query = None
|
||||
|
||||
if "portfolio_id" in view_args:
|
||||
query = db.session.query(Portfolio).filter(
|
||||
Portfolio.id == view_args["portfolio_id"]
|
||||
)
|
||||
|
||||
elif "application_id" in view_args:
|
||||
query = (
|
||||
db.session.query(Portfolio)
|
||||
.join(Application, Application.portfolio_id == Portfolio.id)
|
||||
.filter(Application.id == view_args["application_id"])
|
||||
)
|
||||
|
||||
elif "task_order_id" in view_args:
|
||||
query = (
|
||||
db.session.query(Portfolio)
|
||||
.join(TaskOrder, TaskOrder.portfolio_id == Portfolio.id)
|
||||
.filter(TaskOrder.id == view_args["task_order_id"])
|
||||
)
|
||||
|
||||
if query:
|
||||
try:
|
||||
portfolio = query.one()
|
||||
|
||||
return ScopedPortfolio(g.current_user, portfolio)
|
||||
except NoResultFound:
|
||||
raise NotFoundError("portfolio")
|
||||
|
||||
|
||||
def portfolio():
|
||||
portfolio = get_portfolio_from_context(http_request.view_args)
|
||||
|
||||
def user_can(permission):
|
||||
if portfolio:
|
||||
return Authorization.has_portfolio_permission(
|
||||
g.current_user, portfolio, permission
|
||||
)
|
||||
return False
|
||||
|
||||
if not portfolio is None:
|
||||
active_task_orders = [
|
||||
task_order for task_order in portfolio.task_orders if task_order.is_active
|
||||
]
|
||||
funding_end_date = (
|
||||
sorted(active_task_orders, key=attrgetter("end_date"))[-1].end_date
|
||||
if active_task_orders
|
||||
else None
|
||||
)
|
||||
funded = len(active_task_orders) > 1
|
||||
else:
|
||||
funding_end_date = None
|
||||
funded = None
|
||||
|
||||
return {
|
||||
"portfolio": portfolio,
|
||||
"permissions": Permissions,
|
||||
"user_can": user_can,
|
||||
"funding_end_date": funding_end_date,
|
||||
"funded": funded,
|
||||
}
|
@ -14,7 +14,7 @@
|
||||
("portfolios.applications.team_settings.blank_slate.title" | translate),
|
||||
action_label=("portfolios.applications.team_settings.blank_slate.action_label" | translate),
|
||||
action_href='#' if user_can_invite else None,
|
||||
sub_message=None if user_can_invite else ("portfolios.team_settings.blank_slate.sub_message" | translate),
|
||||
sub_message=None if user_can_invite else ("portfolios.applications.team_settings.blank_slate.sub_message" | translate),
|
||||
icon='avatar'
|
||||
) }}
|
||||
|
||||
|
@ -71,6 +71,11 @@ def test_all_protected_routes_have_access_control(
|
||||
monkeypatch.setattr("atst.domain.portfolios.Portfolios.for_user", lambda *a: [])
|
||||
monkeypatch.setattr("atst.domain.portfolios.Portfolios.get", lambda *a: None)
|
||||
monkeypatch.setattr("atst.domain.task_orders.TaskOrders.get", lambda *a: Mock())
|
||||
monkeypatch.setattr("atst.domain.applications.Applications.get", lambda *a: Mock())
|
||||
monkeypatch.setattr("atst.domain.invitations.Invitations._get", lambda *a: Mock())
|
||||
monkeypatch.setattr(
|
||||
"atst.utils.context_processors.get_portfolio_from_context", lambda *a: None
|
||||
)
|
||||
|
||||
# patch the internal function the access decorator uses so that
|
||||
# we can check that it was called
|
||||
|
Loading…
x
Reference in New Issue
Block a user