Merge pull request #720 from dod-ccpo/access-decorator

Access Decorator
This commit is contained in:
dandds 2019-03-25 12:44:45 -04:00 committed by GitHub
commit 2cb5cf6b9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 1313 additions and 696 deletions

View File

@ -1,8 +1,6 @@
from atst.database import db
from atst.domain.authz import Authorization
from atst.domain.environments import Environments
from atst.domain.exceptions import NotFoundError
from atst.models.permissions import Permissions
from atst.models.application import Application
from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole
@ -10,7 +8,7 @@ from atst.models.environment_role import EnvironmentRole
class Applications(object):
@classmethod
def create(cls, user, portfolio, name, description, environment_names):
def create(cls, portfolio, name, description, environment_names):
application = Application(
portfolio=portfolio, name=name, description=description
)
@ -22,15 +20,7 @@ class Applications(object):
return application
@classmethod
def get(cls, user, portfolio, application_id):
# TODO: this should check permission for this particular application
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.VIEW_APPLICATION,
"view application in portfolio",
)
def get(cls, application_id):
try:
application = (
db.session.query(Application).filter_by(id=application_id).one()
@ -52,14 +42,7 @@ class Applications(object):
)
@classmethod
def get_all(cls, user, portfolio_role, portfolio):
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.VIEW_APPLICATION,
"view application in portfolio",
)
def get_all(cls, portfolio):
try:
applications = (
db.session.query(Application).filter_by(portfolio_id=portfolio.id).all()
@ -70,7 +53,7 @@ class Applications(object):
return applications
@classmethod
def update(cls, user, portfolio, application, new_data):
def update(cls, application, new_data):
if "name" in new_data:
application.name = new_data["name"]
if "description" in new_data:

View File

@ -2,7 +2,6 @@ from sqlalchemy import or_
from atst.database import db
from atst.domain.common import Query
from atst.domain.authz import Authorization, Permissions
from atst.models.audit_event import AuditEvent
@ -35,21 +34,11 @@ class AuditLog(object):
return cls._log(resource=resource, action=action, portfolio=portfolio)
@classmethod
def get_all_events(cls, user, pagination_opts=None):
# TODO: general audit log permissions
Authorization.check_atat_permission(
user, Permissions.VIEW_AUDIT_LOG, "view audit log"
)
def get_all_events(cls, pagination_opts=None):
return AuditEventQuery.get_all(pagination_opts)
@classmethod
def get_portfolio_events(cls, user, portfolio, pagination_opts=None):
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.VIEW_PORTFOLIO_ACTIVITY_LOG,
"view portfolio audit log",
)
def get_portfolio_events(cls, portfolio, pagination_opts=None):
return AuditEventQuery.get_ws_events(portfolio.id, pagination_opts)
@classmethod

View File

@ -18,10 +18,6 @@ class Authorization(object):
def has_atat_permission(cls, user, permission):
return permission in user.permissions
@classmethod
def is_in_portfolio(cls, user, portfolio):
return user in portfolio.users
@classmethod
def check_portfolio_permission(cls, user, portfolio, permission, message):
if not (
@ -30,14 +26,14 @@ class Authorization(object):
):
raise UnauthorizedError(user, message)
return True
@classmethod
def check_atat_permission(cls, user, permission, message):
if not Authorization.has_atat_permission(user, permission):
raise UnauthorizedError(user, message)
@classmethod
def can_view_audit_log(cls, user):
return Authorization.has_atat_permission(user, Permissions.VIEW_AUDIT_LOG)
return True
@classmethod
def is_ko(cls, user, task_order):
@ -72,23 +68,11 @@ class Authorization(object):
message = "review task order {}".format(task_order.id)
raise UnauthorizedError(user, message)
@classmethod
def check_task_order_permission(cls, user, task_order, permission, message):
if Authorization._check_is_task_order_officer(user, task_order):
def user_can_access(user, permission, portfolio=None, message=None):
if portfolio:
Authorization.check_portfolio_permission(user, portfolio, permission, message)
else:
Authorization.check_atat_permission(user, permission, message)
return True
Authorization.check_portfolio_permission(
user, task_order.portfolio, permission, message
)
@classmethod
def _check_is_task_order_officer(cls, user, task_order):
for officer in [
"contracting_officer",
"contracting_officer_representative",
"security_officer",
]:
if getattr(task_order, officer, None) == user:
return True
return False

View File

@ -0,0 +1,55 @@
from functools import wraps
from flask import g, current_app as app, request
from . import user_can_access
from atst.domain.portfolios import Portfolios
from atst.domain.task_orders import TaskOrders
from atst.domain.exceptions import UnauthorizedError
def check_access(permission, message, exception, *args, **kwargs):
access_args = {"message": message}
if "portfolio_id" in kwargs:
access_args["portfolio"] = Portfolios.get(
g.current_user, kwargs["portfolio_id"]
)
if "task_order_id" in kwargs:
task_order = TaskOrders.get(kwargs["task_order_id"])
access_args["portfolio"] = task_order.portfolio
if exception is not None and exception(g.current_user, **access_args, **kwargs):
return True
user_can_access(g.current_user, permission, **access_args)
return True
def user_can_access_decorator(permission, message=None, exception=None):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
try:
check_access(permission, message, exception, *args, **kwargs)
app.logger.info(
"[access] User {} accessed {} {}".format(
g.current_user.id, request.method, request.path
)
)
return f(*args, **kwargs)
except UnauthorizedError as err:
app.logger.warning(
"[access] User {} denied access {} {}".format(
g.current_user.id, request.method, request.path
)
)
raise (err)
return decorated_function
return decorator

View File

@ -5,8 +5,6 @@ from atst.database import db
from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole
from atst.models.application import Application
from atst.models.permissions import Permissions
from atst.domain.authz import Authorization
from atst.domain.environment_roles import EnvironmentRoles
from .exceptions import NotFoundError
@ -60,13 +58,7 @@ class Environments(object):
return env
@classmethod
def update_environment_roles(cls, user, portfolio, portfolio_role, ids_and_roles):
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.EDIT_APPLICATION_MEMBER,
"assign environment roles",
)
def update_environment_roles(cls, portfolio_role, ids_and_roles):
updated = False
for id_and_role in ids_and_roles:
@ -100,11 +92,5 @@ class Environments(object):
return updated
@classmethod
def revoke_access(cls, user, environment, target_user):
Authorization.check_portfolio_permission(
user,
environment.portfolio,
Permissions.EDIT_APPLICATION_MEMBER,
"revoke environment access",
)
def revoke_access(cls, environment, target_user):
EnvironmentRoles.delete(environment.id, target_user.id)

View File

@ -4,8 +4,6 @@ from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
from atst.models.invitation import Invitation, Status as InvitationStatus
from atst.domain.portfolio_roles import PortfolioRoles
from atst.domain.authz import Authorization, Permissions
from atst.domain.portfolios import Portfolios
from .exceptions import NotFoundError
@ -118,15 +116,7 @@ class Invitations(object):
return portfolio_role.latest_invitation
@classmethod
def resend(cls, user, portfolio_id, token):
portfolio = Portfolios.get(user, portfolio_id)
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.CREATE_PORTFOLIO_USERS,
"resend a portfolio invitation",
)
def resend(cls, user, token):
previous_invitation = Invitations._get(token)
Invitations._update_status(previous_invitation, InvitationStatus.REVOKED)

View File

@ -33,51 +33,11 @@ class Portfolios(object):
@classmethod
def get(cls, user, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user, portfolio, Permissions.VIEW_PORTFOLIO, "get portfolio"
)
return ScopedPortfolio(user, portfolio)
@classmethod
def get_for_update_applications(cls, user, portfolio_id):
def get_for_update(cls, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user, portfolio, Permissions.CREATE_APPLICATION, "add application"
)
return portfolio
@classmethod
def get_for_update_information(cls, user, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.EDIT_PORTFOLIO_NAME,
"update portfolio information",
)
return portfolio
@classmethod
def get_for_update_member(cls, user, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.EDIT_PORTFOLIO_USERS,
"update a portfolio member",
)
return portfolio
@classmethod
def get_with_members(cls, user, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user, portfolio, Permissions.VIEW_PORTFOLIO_USERS, "view portfolio members"
)
return portfolio
@ -90,11 +50,7 @@ class Portfolios(object):
return portfolios
@classmethod
def create_member(cls, user, portfolio, data):
Authorization.check_portfolio_permission(
user, portfolio, Permissions.EDIT_PORTFOLIO_USERS, "create portfolio member"
)
def create_member(cls, portfolio, data):
new_user = Users.get_or_create_by_dod_id(
data["dod_id"],
first_name=data["first_name"],
@ -113,12 +69,7 @@ class Portfolios(object):
return portfolio_role
@classmethod
def update_member(cls, user, portfolio, member, permission_sets):
Authorization.check_portfolio_permission(
user, portfolio, Permissions.EDIT_PORTFOLIO_USERS, "edit portfolio member"
)
# need to update perms sets here
def update_member(cls, member, permission_sets):
return PortfolioRoles.update(member, permission_sets)
@classmethod
@ -149,11 +100,8 @@ class Portfolios(object):
)
@classmethod
def revoke_access(cls, user, portfolio_id, portfolio_role_id):
def revoke_access(cls, portfolio_id, portfolio_role_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user, portfolio, Permissions.EDIT_PORTFOLIO_USERS, "revoke portfolio access"
)
portfolio_role = PortfolioRoles.get_by_id(portfolio_role_id)
if not Portfolios.can_revoke_access_for(portfolio, portfolio_role):
@ -161,7 +109,7 @@ class Portfolios(object):
portfolio_role.status = PortfolioRoleStatus.DISABLED
for environment in portfolio.all_environments:
Environments.revoke_access(user, environment, portfolio_role.user)
Environments.revoke_access(environment, portfolio_role.user)
PortfoliosQuery.add_and_commit(portfolio_role)
return portfolio_role

View File

@ -3,10 +3,8 @@ from flask import current_app as app
from atst.database import db
from atst.models.task_order import TaskOrder
from atst.models.permissions import Permissions
from atst.models.dd_254 import DD254
from atst.domain.portfolios import Portfolios
from atst.domain.authz import Authorization
from atst.domain.permission_sets import PermissionSets
from .exceptions import NotFoundError
@ -54,12 +52,9 @@ class TaskOrders(object):
UNCLASSIFIED_FUNDING = ["performance_length", "csp_estimate", "clin_01", "clin_03"]
@classmethod
def get(cls, user, task_order_id):
def get(cls, task_order_id):
try:
task_order = db.session.query(TaskOrder).filter_by(id=task_order_id).one()
Authorization.check_task_order_permission(
user, task_order, Permissions.VIEW_TASK_ORDER_DETAILS, "view task order"
)
return task_order
except NoResultFound:
@ -67,9 +62,6 @@ class TaskOrders(object):
@classmethod
def create(cls, creator, portfolio):
Authorization.check_portfolio_permission(
creator, portfolio, Permissions.CREATE_TASK_ORDER, "add task order"
)
task_order = TaskOrder(portfolio=portfolio, creator=creator)
db.session.add(task_order)
@ -78,11 +70,7 @@ class TaskOrders(object):
return task_order
@classmethod
def update(cls, user, task_order, **kwargs):
Authorization.check_task_order_permission(
user, task_order, Permissions.EDIT_TASK_ORDER_DETAILS, "update task order"
)
def update(cls, task_order, **kwargs):
for key, value in kwargs.items():
setattr(task_order, key, value)
@ -147,14 +135,7 @@ class TaskOrders(object):
]
@classmethod
def add_officer(cls, user, task_order, officer_type, officer_data):
Authorization.check_portfolio_permission(
user,
task_order.portfolio,
Permissions.EDIT_TASK_ORDER_DETAILS,
"add task order officer",
)
def add_officer(cls, task_order, officer_type, officer_data):
if officer_type in TaskOrders.OFFICERS:
portfolio = task_order.portfolio
@ -171,7 +152,6 @@ class TaskOrders(object):
portfolio_user = existing_member.user
else:
member = Portfolios.create_member(
user,
portfolio,
{
**officer_data,

View File

@ -22,6 +22,8 @@ from atst.domain.audit_log import AuditLog
from atst.domain.auth import logout as _logout
from atst.domain.common import Paginator
from atst.domain.portfolios import Portfolios
from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions
from atst.utils.flash import formatted_flash as flash
@ -143,9 +145,10 @@ def logout():
@bp.route("/activity-history")
@user_can(Permissions.VIEW_AUDIT_LOG, message="view activity log")
def activity_history():
pagination_opts = Paginator.get_pagination_opts(request)
audit_events = AuditLog.get_all_events(g.current_user, pagination_opts)
audit_events = AuditLog.get_all_events(pagination_opts)
return render_template("audit_log/audit_log.html", audit_events=audit_events)

View File

@ -18,12 +18,9 @@ from atst.models.permissions import Permissions
def portfolio():
portfolio = None
if "portfolio_id" in http_request.view_args:
try:
portfolio = Portfolios.get(
g.current_user, http_request.view_args["portfolio_id"]
)
except UnauthorizedError:
pass
def user_can(permission):
if portfolio:

View File

@ -13,17 +13,21 @@ from atst.domain.exceptions import UnauthorizedError
from atst.domain.applications import Applications
from atst.domain.portfolios import Portfolios
from atst.forms.application import NewApplicationForm, ApplicationForm
from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions
@portfolios_bp.route("/portfolios/<portfolio_id>/applications")
@user_can(Permissions.VIEW_APPLICATION, message="view portfolio applications")
def portfolio_applications(portfolio_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
return render_template("portfolios/applications/index.html", portfolio=portfolio)
@portfolios_bp.route("/portfolios/<portfolio_id>/applications/new")
@user_can(Permissions.CREATE_APPLICATION, message="view create new application form")
def new_application(portfolio_id):
portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id)
portfolio = Portfolios.get_for_update(portfolio_id)
form = NewApplicationForm()
return render_template(
"portfolios/applications/new.html", portfolio=portfolio, form=form
@ -31,14 +35,14 @@ def new_application(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/applications/new", methods=["POST"])
@user_can(Permissions.CREATE_APPLICATION, message="create new application")
def create_application(portfolio_id):
portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id)
portfolio = Portfolios.get_for_update(portfolio_id)
form = NewApplicationForm(http_request.form)
if form.validate():
application_data = form.data
Applications.create(
g.current_user,
portfolio,
application_data["name"],
application_data["description"],
@ -54,9 +58,10 @@ def create_application(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/applications/<application_id>/edit")
@user_can(Permissions.EDIT_APPLICATION, message="view application edit form")
def edit_application(portfolio_id, application_id):
portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id)
application = Applications.get(g.current_user, portfolio, application_id)
portfolio = Portfolios.get_for_update(portfolio_id)
application = Applications.get(application_id)
form = ApplicationForm(name=application.name, description=application.description)
return render_template(
@ -70,13 +75,14 @@ def edit_application(portfolio_id, application_id):
@portfolios_bp.route(
"/portfolios/<portfolio_id>/applications/<application_id>/edit", methods=["POST"]
)
@user_can(Permissions.EDIT_APPLICATION, message="update application")
def update_application(portfolio_id, application_id):
portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id)
application = Applications.get(g.current_user, portfolio, application_id)
portfolio = Portfolios.get_for_update(portfolio_id)
application = Applications.get(application_id)
form = ApplicationForm(http_request.form)
if form.validate():
application_data = form.data
Applications.update(g.current_user, portfolio, application, application_data)
Applications.update(application, application_data)
return redirect(
url_for("portfolios.portfolio_applications", portfolio_id=portfolio.id)
@ -90,13 +96,20 @@ def update_application(portfolio_id, application_id):
)
def wrap_environment_role_lookup(
user, portfolio_id=None, environment_id=None, **kwargs
):
env_role = EnvironmentRoles.get(user.id, environment_id)
if not env_role:
raise UnauthorizedError(user, "access environment {}".format(environment_id))
return True
@portfolios_bp.route("/portfolios/<portfolio_id>/environments/<environment_id>/access")
@user_can(None, exception=wrap_environment_role_lookup, message="access environment")
def access_environment(portfolio_id, environment_id):
env_role = EnvironmentRoles.get(g.current_user.id, environment_id)
if not env_role:
raise UnauthorizedError(
g.current_user, "access environment {}".format(environment_id)
)
else:
token = app.csp.cloud.get_access_token(env_role)
return redirect(url_for("atst.csp_environment_access", token=token))

View File

@ -6,11 +6,11 @@ from . import portfolios_bp
from atst.domain.reports import Reports
from atst.domain.portfolios import Portfolios
from atst.domain.audit_log import AuditLog
from atst.domain.authz import Authorization
from atst.domain.common import Paginator
from atst.forms.portfolio import PortfolioForm
from atst.models.permissions import Permissions
from atst.domain.permission_sets import PermissionSets
from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions
@portfolios_bp.route("/portfolios")
@ -37,14 +37,9 @@ def serialize_member(member):
}
@portfolios_bp.route("/portfolios/<portfolio_id>/admin")
def portfolio_admin(portfolio_id):
portfolio = Portfolios.get_for_update_information(g.current_user, portfolio_id)
form = PortfolioForm(data={"name": portfolio.name})
def render_admin_page(portfolio, form):
pagination_opts = Paginator.get_pagination_opts(http_request)
audit_events = AuditLog.get_portfolio_events(
g.current_user, portfolio, pagination_opts
)
audit_events = AuditLog.get_portfolio_events(portfolio, pagination_opts)
members_data = [serialize_member(member) for member in portfolio.members]
return render_template(
"portfolios/admin.html",
@ -56,9 +51,18 @@ def portfolio_admin(portfolio_id):
)
@portfolios_bp.route("/portfolios/<portfolio_id>/admin")
@user_can(Permissions.VIEW_PORTFOLIO_ADMIN, message="view portfolio admin page")
def portfolio_admin(portfolio_id):
portfolio = Portfolios.get_for_update(portfolio_id)
form = PortfolioForm(data={"name": portfolio.name})
return render_admin_page(portfolio, form)
@portfolios_bp.route("/portfolios/<portfolio_id>/edit", methods=["POST"])
@user_can(Permissions.EDIT_PORTFOLIO_NAME, message="edit portfolio")
def edit_portfolio(portfolio_id):
portfolio = Portfolios.get_for_update_information(g.current_user, portfolio_id)
portfolio = Portfolios.get_for_update(portfolio_id)
form = PortfolioForm(http_request.form)
if form.validate():
Portfolios.update(portfolio, form.data)
@ -66,10 +70,12 @@ def edit_portfolio(portfolio_id):
url_for("portfolios.portfolio_applications", portfolio_id=portfolio.id)
)
else:
return render_template("portfolios/edit.html", form=form, portfolio=portfolio)
# rerender portfolio admin page
return render_admin_page(portfolio, form)
@portfolios_bp.route("/portfolios/<portfolio_id>")
@user_can(Permissions.VIEW_PORTFOLIO, message="view portfolio")
def show_portfolio(portfolio_id):
return redirect(
url_for("portfolios.portfolio_applications", portfolio_id=portfolio_id)
@ -77,15 +83,9 @@ def show_portfolio(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/reports")
@user_can(Permissions.VIEW_PORTFOLIO_REPORTS, message="view portfolio reports")
def portfolio_reports(portfolio_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
Authorization.check_portfolio_permission(
g.current_user,
portfolio,
Permissions.VIEW_PORTFOLIO_REPORTS,
"view portfolio reports",
)
today = date.today()
month = http_request.args.get("month", today.month)
year = http_request.args.get("year", today.year)

View File

@ -5,6 +5,8 @@ from atst.domain.portfolios import Portfolios
from atst.domain.invitations import Invitations
from atst.queue import queue
from atst.utils.flash import formatted_flash as flash
from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions
def send_invite_email(owner_name, token, new_member_email):
@ -43,8 +45,9 @@ def accept_invitation(token):
@portfolios_bp.route(
"/portfolios/<portfolio_id>/invitations/<token>/revoke", methods=["POST"]
)
@user_can(Permissions.EDIT_PORTFOLIO_USERS, message="revoke invitation")
def revoke_invitation(portfolio_id, token):
portfolio = Portfolios.get_for_update_member(g.current_user, portfolio_id)
portfolio = Portfolios.get_for_update(portfolio_id)
Invitations.revoke(token)
return redirect(url_for("portfolios.portfolio_members", portfolio_id=portfolio.id))
@ -53,8 +56,9 @@ def revoke_invitation(portfolio_id, token):
@portfolios_bp.route(
"/portfolios/<portfolio_id>/invitations/<token>/resend", methods=["POST"]
)
@user_can(Permissions.EDIT_PORTFOLIO_USERS, message="resend invitation")
def resend_invitation(portfolio_id, token):
invite = Invitations.resend(g.current_user, portfolio_id, token)
invite = Invitations.resend(g.current_user, token)
send_invite_email(g.current_user.full_name, invite.token, invite.email)
flash("resend_portfolio_invitation", user_name=invite.user_name)
return redirect(url_for("portfolios.portfolio_members", portfolio_id=portfolio_id))

View File

@ -12,7 +12,7 @@ from atst.domain.environment_roles import EnvironmentRoles
from atst.services.invitation import Invitation as InvitationService
import atst.forms.portfolio_member as member_forms
from atst.forms.data import ENVIRONMENT_ROLES, ENV_ROLE_MODAL_DESCRIPTION
from atst.domain.authz import Authorization
from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions
from atst.utils.flash import formatted_flash as flash
@ -34,8 +34,9 @@ def serialize_portfolio_role(portfolio_role):
@portfolios_bp.route("/portfolios/<portfolio_id>/members")
@user_can(Permissions.VIEW_PORTFOLIO_USERS, message="view portfolio members")
def portfolio_members(portfolio_id):
portfolio = Portfolios.get_with_members(g.current_user, portfolio_id)
portfolio = Portfolios.get_for_update(portfolio_id)
members_list = [serialize_portfolio_role(k) for k in portfolio.members]
return render_template(
@ -47,9 +48,10 @@ def portfolio_members(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/applications/<application_id>/members")
@user_can(Permissions.VIEW_APPLICATION_MEMBER, message="view application members")
def application_members(portfolio_id, application_id):
portfolio = Portfolios.get_with_members(g.current_user, portfolio_id)
application = Applications.get(g.current_user, portfolio, application_id)
portfolio = Portfolios.get_for_update(portfolio_id)
application = Applications.get(application_id)
# TODO: this should show only members that have env roles in this application
members_list = [serialize_portfolio_role(k) for k in portfolio.members]
@ -62,6 +64,9 @@ def application_members(portfolio_id, application_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/members/new")
@user_can(
Permissions.CREATE_PORTFOLIO_USERS, message="view create new portfolio member form"
)
def new_member(portfolio_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
form = member_forms.NewForm()
@ -71,13 +76,14 @@ def new_member(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/members/new", methods=["POST"])
@user_can(Permissions.CREATE_PORTFOLIO_USERS, message="create new portfolio member")
def create_member(portfolio_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
form = member_forms.NewForm(http_request.form)
if form.validate():
try:
member = Portfolios.create_member(g.current_user, portfolio, form.data)
member = Portfolios.create_member(portfolio, form.data)
invite_service = InvitationService(
g.current_user, member, form.data.get("email")
)
@ -99,16 +105,11 @@ def create_member(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/members/<member_id>/member_edit")
@user_can(Permissions.VIEW_PORTFOLIO_USERS, message="view portfolio member")
def view_member(portfolio_id, member_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
Authorization.check_portfolio_permission(
g.current_user,
portfolio,
Permissions.EDIT_PORTFOLIO_USERS,
"edit this portfolio user",
)
member = PortfolioRoles.get(portfolio_id, member_id)
applications = Applications.get_all(g.current_user, member, portfolio)
applications = Applications.get_all(portfolio)
form = member_forms.EditForm(portfolio_role="admin")
editable = g.current_user == member.user
can_revoke_access = Portfolios.can_revoke_access_for(portfolio, member)
@ -130,17 +131,14 @@ def view_member(portfolio_id, member_id):
)
# TODO: check if member_id is consistent with other routes here;
# user ID vs portfolio role ID
@portfolios_bp.route(
"/portfolios/<portfolio_id>/members/<member_id>/member_edit", methods=["POST"]
)
@user_can(Permissions.EDIT_PORTFOLIO_USERS, message="update portfolio member")
def update_member(portfolio_id, member_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
Authorization.check_portfolio_permission(
g.current_user,
portfolio,
Permissions.EDIT_PORTFOLIO_USERS,
"edit this portfolio user",
)
member = PortfolioRoles.get(portfolio_id, member_id)
ids_and_roles = []
@ -153,12 +151,8 @@ def update_member(portfolio_id, member_id):
form = member_forms.EditForm(http_request.form)
if form.validate():
member = Portfolios.update_member(
g.current_user, portfolio, member, form.data["permission_sets"]
)
updated_roles = Environments.update_environment_roles(
g.current_user, portfolio, member, ids_and_roles
)
member = Portfolios.update_member(member, form.data["permission_sets"])
updated_roles = Environments.update_environment_roles(member, ids_and_roles)
if updated_roles:
flash("environment_access_changed")
@ -177,7 +171,8 @@ def update_member(portfolio_id, member_id):
@portfolios_bp.route(
"/portfolios/<portfolio_id>/members/<member_id>/revoke_access", methods=["POST"]
)
@user_can(Permissions.EDIT_PORTFOLIO_USERS, message="revoke portfolio access")
def revoke_access(portfolio_id, member_id):
revoked_role = Portfolios.revoke_access(g.current_user, portfolio_id, member_id)
revoked_role = Portfolios.revoke_access(portfolio_id, member_id)
flash("revoked_portfolio_access", member_name=revoked_role.user.full_name)
return redirect(url_for("portfolios.portfolio_members", portfolio_id=portfolio_id))

View File

@ -20,9 +20,12 @@ from atst.services.invitation import (
OFFICER_INVITATIONS,
Invitation as InvitationService,
)
from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions
@portfolios_bp.route("/portfolios/<portfolio_id>/task_orders")
@user_can(Permissions.VIEW_PORTFOLIO_FUNDING, message="view portfolio funding")
def portfolio_funding(portfolio_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
task_orders_by_status = defaultdict(list)
@ -66,9 +69,10 @@ def portfolio_funding(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>")
@user_can(Permissions.VIEW_TASK_ORDER_DETAILS, message="view task order details")
def view_task_order(portfolio_id, task_order_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
task_order = TaskOrders.get(g.current_user, task_order_id)
task_order = TaskOrders.get(task_order_id)
to_form_complete = TaskOrders.all_sections_complete(task_order)
dd_254_complete = DD254s.is_complete(task_order.dd_254)
return render_template(
@ -85,12 +89,22 @@ def view_task_order(portfolio_id, task_order_id):
)
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/review")
def ko_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(g.current_user, task_order_id)
portfolio = Portfolios.get(g.current_user, portfolio_id)
def wrap_check_is_ko_or_cor(user, task_order_id=None, **_kwargs):
task_order = TaskOrders.get(task_order_id)
Authorization.check_is_ko_or_cor(user, task_order)
Authorization.check_is_ko_or_cor(g.current_user, task_order)
return True
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/review")
@user_can(
None,
exception=wrap_check_is_ko_or_cor,
message="view contracting officer review form",
)
def ko_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)
portfolio = Portfolios.get(g.current_user, portfolio_id)
if TaskOrders.all_sections_complete(task_order):
return render_template(
@ -107,7 +121,10 @@ def ko_review(portfolio_id, task_order_id):
"/portfolios/<portfolio_id>/task_order/<task_order_id>/resend_invite",
methods=["POST"],
)
def resend_invite(portfolio_id, task_order_id, form=None):
@user_can(
Permissions.EDIT_TASK_ORDER_DETAILS, message="resend task order officer invites"
)
def resend_invite(portfolio_id, task_order_id):
invite_type = http_request.args.get("invite_type")
if invite_type not in OFFICER_INVITATIONS:
@ -115,7 +132,7 @@ def resend_invite(portfolio_id, task_order_id, form=None):
invite_type_info = OFFICER_INVITATIONS[invite_type]
task_order = TaskOrders.get(g.current_user, task_order_id)
task_order = TaskOrders.get(task_order_id)
portfolio = Portfolios.get(g.current_user, portfolio_id)
officer = getattr(task_order, invite_type_info["role"])
@ -164,15 +181,16 @@ def resend_invite(portfolio_id, task_order_id, form=None):
@portfolios_bp.route(
"/portfolios/<portfolio_id>/task_order/<task_order_id>/review", methods=["POST"]
)
@user_can(
None, exception=wrap_check_is_ko_or_cor, message="submit contracting officer review"
)
def submit_ko_review(portfolio_id, task_order_id, form=None):
task_order = TaskOrders.get(g.current_user, task_order_id)
task_order = TaskOrders.get(task_order_id)
form_data = {**http_request.form, **http_request.files}
form = KOReviewForm(form_data)
Authorization.check_is_ko_or_cor(g.current_user, task_order)
if form.validate():
TaskOrders.update(user=g.current_user, task_order=task_order, **form.data)
TaskOrders.update(task_order=task_order, **form.data)
if Authorization.is_ko(g.current_user, task_order) and TaskOrders.can_ko_sign(
task_order
):
@ -199,9 +217,12 @@ def submit_ko_review(portfolio_id, task_order_id, form=None):
@portfolios_bp.route(
"/portfolios/<portfolio_id>/task_order/<task_order_id>/invitations"
)
@user_can(
Permissions.EDIT_TASK_ORDER_DETAILS, message="view task order invitations page"
)
def task_order_invitations(portfolio_id, task_order_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
task_order = TaskOrders.get(g.current_user, task_order_id)
task_order = TaskOrders.get(task_order_id)
form = EditTaskOrderOfficersForm(obj=task_order)
if TaskOrders.all_sections_complete(task_order):
@ -219,9 +240,10 @@ def task_order_invitations(portfolio_id, task_order_id):
"/portfolios/<portfolio_id>/task_order/<task_order_id>/invitations",
methods=["POST"],
)
@user_can(Permissions.EDIT_TASK_ORDER_DETAILS, message="edit task order invitations")
def edit_task_order_invitations(portfolio_id, task_order_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
task_order = TaskOrders.get(g.current_user, task_order_id)
task_order = TaskOrders.get(task_order_id)
form = EditTaskOrderOfficersForm(formdata=http_request.form, obj=task_order)
if form.validate():
@ -266,11 +288,17 @@ def so_review_form(task_order):
return DD254Form(data=form_data)
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254")
def so_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(g.current_user, task_order_id)
Authorization.check_is_so(g.current_user, task_order)
def wrap_check_is_so(user, task_order_id=None, **_kwargs):
task_order = TaskOrders.get(task_order_id)
Authorization.check_is_so(user, task_order)
return True
@portfolios_bp.route("/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254")
@user_can(None, exception=wrap_check_is_so, message="view security officer review form")
def so_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(task_order_id)
form = so_review_form(task_order)
return render_template(
@ -284,10 +312,11 @@ def so_review(portfolio_id, task_order_id):
@portfolios_bp.route(
"/portfolios/<portfolio_id>/task_order/<task_order_id>/dd254", methods=["POST"]
)
@user_can(
None, exception=wrap_check_is_so, message="submit security officer review form"
)
def submit_so_review(portfolio_id, task_order_id):
task_order = TaskOrders.get(g.current_user, task_order_id)
Authorization.check_is_so(g.current_user, task_order)
task_order = TaskOrders.get(task_order_id)
form = DD254Form(http_request.form)
if form.validate():

View File

@ -1,15 +1,18 @@
from io import BytesIO
from flask import g, Response, current_app as app
from flask import Response, current_app as app
from . import task_orders_bp
from atst.domain.task_orders import TaskOrders
from atst.domain.exceptions import NotFoundError
from atst.utils.docx import Docx
from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions
@task_orders_bp.route("/task_orders/download_summary/<task_order_id>")
@user_can(Permissions.VIEW_TASK_ORDER_DETAILS, message="download task order summary")
def download_summary(task_order_id):
task_order = TaskOrders.get(g.current_user, task_order_id)
task_order = TaskOrders.get(task_order_id)
byte_str = BytesIO()
Docx.render(byte_str, data=task_order.to_dictionary())
filename = "{}.docx".format(task_order.portfolio_name)
@ -31,8 +34,12 @@ def send_file(attachment):
@task_orders_bp.route("/task_orders/csp_estimate/<task_order_id>")
@user_can(
Permissions.VIEW_TASK_ORDER_DETAILS,
message="download task order cloud service provider estimate",
)
def download_csp_estimate(task_order_id):
task_order = TaskOrders.get(g.current_user, task_order_id)
task_order = TaskOrders.get(task_order_id)
if task_order.csp_estimate:
return send_file(task_order.csp_estimate)
else:
@ -40,8 +47,9 @@ def download_csp_estimate(task_order_id):
@task_orders_bp.route("/task_orders/pdf/<task_order_id>")
@user_can(Permissions.VIEW_TASK_ORDER_DETAILS, message="download task order PDF")
def download_task_order_pdf(task_order_id):
task_order = TaskOrders.get(g.current_user, task_order_id)
task_order = TaskOrders.get(task_order_id)
if task_order.pdf:
return send_file(task_order.pdf)
else:

View File

@ -4,11 +4,14 @@ from . import task_orders_bp
from atst.domain.task_orders import TaskOrders
from atst.utils.flash import formatted_flash as flash
from atst.services.invitation import update_officer_invitations
from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions
@task_orders_bp.route("/task_orders/invite/<task_order_id>", methods=["POST"])
@user_can(Permissions.EDIT_TASK_ORDER_DETAILS, message="invite task order officers")
def invite(task_order_id):
task_order = TaskOrders.get(g.current_user, task_order_id)
task_order = TaskOrders.get(task_order_id)
if TaskOrders.all_sections_complete(task_order):
update_officer_invitations(g.current_user, task_order)

View File

@ -14,6 +14,8 @@ from atst.domain.task_orders import TaskOrders
from atst.domain.portfolios import Portfolios
from atst.utils.flash import formatted_flash as flash
import atst.forms.task_order as task_order_form
from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions
TASK_ORDER_SECTIONS = [
@ -59,7 +61,7 @@ class ShowTaskOrderWorkflow:
@property
def task_order(self):
if not self._task_order and self.task_order_id:
self._task_order = TaskOrders.get(self.user, self.task_order_id)
self._task_order = TaskOrders.get(self.task_order_id)
return self._task_order
@ -228,7 +230,7 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow):
old_name = self.task_order.portfolio_name
if not new_name == old_name:
Portfolios.update(self.task_order.portfolio, {"name": new_name})
TaskOrders.update(self.user, self.task_order, **self.task_order_form_data)
TaskOrders.update(self.task_order, **self.task_order_form_data)
else:
if self.portfolio_id:
pf = Portfolios.get(self.user, self.portfolio_id)
@ -239,7 +241,7 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow):
self.form.defense_component.data,
)
self._task_order = TaskOrders.create(portfolio=pf, creator=self.user)
TaskOrders.update(self.user, self.task_order, **self.task_order_form_data)
TaskOrders.update(self.task_order, **self.task_order_form_data)
return self.task_order
@ -249,9 +251,23 @@ def get_started():
return render_template("task_orders/new/get_started.html") # pragma: no cover
def is_new_task_order(*_args, **kwargs):
return (
"screen" in kwargs
and kwargs["screen"] == 1
and "task_order_id" not in kwargs
and "portfolio_id" not in kwargs
)
@task_orders_bp.route("/task_orders/new/<int:screen>")
@task_orders_bp.route("/task_orders/new/<int:screen>/<task_order_id>")
@task_orders_bp.route("/portfolios/<portfolio_id>/task_orders/new/<int:screen>")
@user_can(
Permissions.CREATE_TASK_ORDER,
exception=is_new_task_order,
message="view new task order form",
)
def new(screen, task_order_id=None, portfolio_id=None):
workflow = ShowTaskOrderWorkflow(
g.current_user, screen, task_order_id, portfolio_id
@ -298,6 +314,11 @@ def new(screen, task_order_id=None, portfolio_id=None):
@task_orders_bp.route(
"/portfolios/<portfolio_id>/task_orders/new/<int:screen>", methods=["POST"]
)
@user_can(
Permissions.CREATE_TASK_ORDER,
exception=is_new_task_order,
message="update task order",
)
def update(screen, task_order_id=None, portfolio_id=None):
form_data = {**http_request.form, **http_request.files}
workflow = UpdateTaskOrderWorkflow(

View File

@ -8,11 +8,11 @@ from atst.domain.exceptions import NoAccessError
from atst.domain.task_orders import TaskOrders
from atst.forms.task_order import SignatureForm
from atst.utils.flash import formatted_flash as flash
from atst.domain.authz.decorator import user_can_access_decorator as user_can
def find_unsigned_ko_to(task_order_id):
task_order = TaskOrders.get(g.current_user, task_order_id)
Authorization.check_is_ko(g.current_user, task_order)
task_order = TaskOrders.get(task_order_id)
if not TaskOrders.can_ko_sign(task_order):
raise NoAccessError("task_order")
@ -20,7 +20,17 @@ def find_unsigned_ko_to(task_order_id):
return task_order
def wrap_check_is_ko(user, task_order_id=None, **_kwargs):
task_order = TaskOrders.get(task_order_id)
Authorization.check_is_ko(user, task_order)
return True
@task_orders_bp.route("/task_orders/<task_order_id>/digital_signature", methods=["GET"])
@user_can(
None, exception=wrap_check_is_ko, message="view contracting officer signature page"
)
def signature_requested(task_order_id):
task_order = find_unsigned_ko_to(task_order_id)
@ -35,6 +45,9 @@ def signature_requested(task_order_id):
@task_orders_bp.route(
"/task_orders/<task_order_id>/digital_signature", methods=["POST"]
)
@user_can(
None, exception=wrap_check_is_ko, message="submit contracting officer signature"
)
def record_signature(task_order_id):
task_order = find_unsigned_ko_to(task_order_id)
@ -49,7 +62,6 @@ def record_signature(task_order_id):
if form.validate():
TaskOrders.update(
user=g.current_user,
task_order=task_order,
signer_dod_id=g.current_user.dod_id,
signed_at=datetime.datetime.now(),

View File

@ -33,7 +33,7 @@ def update_officer_invitations(user, task_order):
):
officer_data = task_order.officer_dictionary(invite_opts["role"])
officer = TaskOrders.add_officer(
user, task_order, invite_opts["role"], officer_data
task_order, invite_opts["role"], officer_data
)
pf_officer_member = PortfolioRoles.get(task_order.portfolio.id, officer.id)
invite_service = Invitation(

View File

@ -68,7 +68,7 @@ def get_users():
def add_members_to_portfolio(portfolio):
for portfolio_role in PORTFOLIO_USERS:
ws_role = Portfolios.create_member(portfolio.owner, portfolio, portfolio_role)
ws_role = Portfolios.create_member(portfolio, portfolio_role)
db.session.refresh(ws_role)
PortfolioRoles.enable(ws_role)
@ -114,7 +114,6 @@ def create_task_order(portfolio, start, end, clin_01=None, clin_03=None):
def add_applications_to_portfolio(portfolio, applications):
for application in applications:
Applications.create(
portfolio.owner,
portfolio=portfolio,
name=application["name"],
description=application["description"],

View File

@ -14,6 +14,7 @@
<div class="panel">
<div class="panel__content">
{% if user_can(permissions.VIEW_PORTFOLIO_NAME) %}
<form method="POST" action="{{ url_for('portfolios.edit_portfolio', portfolio_id=portfolio.id) }}" autocomplete="false">
{{ form.csrf_token }}
<div class='form-row'>
@ -36,14 +37,21 @@
</div>
</div>
</form>
{% endif %}
</div>
</div>
{% if user_can(permissions.VIEW_PORTFOLIO_POC) %}
{% include "fragments/primary_point_of_contact.html" %}
{% endif %}
{% if user_can(permissions.VIEW_PORTFOLIO_USERS) %}
{% include "fragments/admin/portfolio_members.html" %}
{% include "fragments/audit_events_log.html" %}
{% endif %}
{% if user_can(permissions.VIEW_PORTFOLIO_ACTIVITY_LOG) %}
{% include "fragments/audit_events_log.html" %}
{{ Pagination(audit_events, 'portfolios.portfolio_admin', portfolio_id=portfolio.id) }}
{% endif %}
</div>
{% endblock %}

View File

@ -14,6 +14,7 @@ from atst.domain.authnid.crl import (
)
from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN
from tests.utils import FakeLogger
class MockX509Store:
@ -119,20 +120,6 @@ def test_multistep_certificate_chain():
assert cache.crl_check(cert)
class FakeLogger:
def __init__(self):
self.messages = []
def info(self, msg):
self.messages.append(msg)
def warning(self, msg):
self.messages.append(msg)
def error(self, msg):
self.messages.append(msg)
def test_no_op_crl_cache_logs_common_name():
logger = FakeLogger()
cert = open("ssl/client-certs/atat.mil.crt", "rb").read()

View File

@ -6,7 +6,7 @@ from atst.domain.portfolios import Portfolios
def test_create_application_with_multiple_environments():
portfolio = PortfolioFactory.create()
application = Applications.create(
portfolio.owner, portfolio, "My Test Application", "Test", ["dev", "prod"]
portfolio, "My Test Application", "Test", ["dev", "prod"]
)
assert application.portfolio == portfolio
@ -21,7 +21,7 @@ def test_portfolio_owner_can_view_environments():
owner=owner,
applications=[{"environments": [{"name": "dev"}, {"name": "prod"}]}],
)
application = Applications.get(owner, portfolio, portfolio.applications[0].id)
application = Applications.get(portfolio.applications[0].id)
assert len(application.environments) == 2
@ -38,11 +38,9 @@ def test_can_only_update_name_and_description():
}
],
)
application = Applications.get(owner, portfolio, portfolio.applications[0].id)
application = Applications.get(portfolio.applications[0].id)
env_name = application.environments[0].name
Applications.update(
owner,
portfolio,
application,
{
"name": "New Name",

View File

@ -22,54 +22,15 @@ def developer():
return UserFactory.create()
def test_non_admin_cannot_view_audit_log(developer):
with pytest.raises(UnauthorizedError):
AuditLog.get_all_events(developer)
def test_ccpo_can_view_audit_log(ccpo):
events = AuditLog.get_all_events(ccpo)
assert len(events) > 0
def test_paginate_audit_log(ccpo):
def test_paginate_audit_log():
user = UserFactory.create()
for _ in range(100):
AuditLog.log_system_event(user, action="create")
events = AuditLog.get_all_events(ccpo, pagination_opts={"per_page": 25, "page": 2})
events = AuditLog.get_all_events(pagination_opts={"per_page": 25, "page": 2})
assert len(events) == 25
def test_ccpo_can_view_ws_audit_log(ccpo):
portfolio = PortfolioFactory.create()
events = AuditLog.get_portfolio_events(ccpo, portfolio)
assert len(events) > 0
def test_ws_admin_can_view_ws_audit_log():
portfolio = PortfolioFactory.create()
admin = UserFactory.create()
PortfolioRoleFactory.create(
portfolio=portfolio, user=admin, status=PortfolioRoleStatus.ACTIVE
)
events = AuditLog.get_portfolio_events(admin, portfolio)
assert len(events) > 0
def test_ws_owner_can_view_ws_audit_log():
portfolio = PortfolioFactory.create()
events = AuditLog.get_portfolio_events(portfolio.owner, portfolio)
assert len(events) > 0
def test_other_users_cannot_view_portfolio_audit_log():
with pytest.raises(UnauthorizedError):
portfolio = PortfolioFactory.create()
dev = UserFactory.create()
AuditLog.get_portfolio_events(dev, portfolio)
def test_paginate_ws_audit_log():
portfolio = PortfolioFactory.create()
application = ApplicationFactory.create(portfolio=portfolio)
@ -79,7 +40,7 @@ def test_paginate_ws_audit_log():
)
events = AuditLog.get_portfolio_events(
portfolio.owner, portfolio, pagination_opts={"per_page": 25, "page": 2}
portfolio, pagination_opts={"per_page": 25, "page": 2}
)
assert len(events) == 25
@ -92,7 +53,7 @@ def test_ws_audit_log_only_includes_current_ws_events():
application_1 = ApplicationFactory.create(portfolio=portfolio)
application_2 = ApplicationFactory.create(portfolio=other_portfolio)
events = AuditLog.get_portfolio_events(portfolio.owner, portfolio)
events = AuditLog.get_portfolio_events(portfolio)
for event in events:
assert event.portfolio_id == portfolio.id or event.resource_id == portfolio.id
assert (

View File

@ -1,11 +1,19 @@
import pytest
from tests.factories import TaskOrderFactory, UserFactory, PortfolioRoleFactory
from atst.domain.authz import Authorization
from tests.factories import (
TaskOrderFactory,
UserFactory,
PortfolioFactory,
PortfolioRoleFactory,
)
from atst.domain.authz import Authorization, user_can_access
from atst.domain.authz.decorator import user_can_access_decorator
from atst.domain.permission_sets import PermissionSets
from atst.domain.exceptions import UnauthorizedError
from atst.models.permissions import Permissions
from tests.utils import FakeLogger
@pytest.fixture
def invalid_user():
@ -58,3 +66,137 @@ def test_has_portfolio_permission():
assert not Authorization.has_portfolio_permission(
different_user, port_role.portfolio, Permissions.VIEW_PORTFOLIO_REPORTS
)
def test_user_can_access():
ccpo = UserFactory.create_ccpo()
edit_admin = UserFactory.create()
view_admin = UserFactory.create()
portfolio = PortfolioFactory.create(owner=edit_admin)
# factory gives view perms by default
PortfolioRoleFactory.create(user=view_admin, portfolio=portfolio)
# check a site-wide permission
assert user_can_access(ccpo, Permissions.VIEW_AUDIT_LOG)
with pytest.raises(UnauthorizedError):
user_can_access(edit_admin, Permissions.VIEW_AUDIT_LOG)
with pytest.raises(UnauthorizedError):
user_can_access(view_admin, Permissions.VIEW_AUDIT_LOG)
# check a portfolio view permission
assert user_can_access(ccpo, Permissions.VIEW_PORTFOLIO, portfolio=portfolio)
assert user_can_access(edit_admin, Permissions.VIEW_PORTFOLIO, portfolio=portfolio)
assert user_can_access(view_admin, Permissions.VIEW_PORTFOLIO, portfolio=portfolio)
# check a portfolio edit permission
assert user_can_access(ccpo, Permissions.EDIT_PORTFOLIO_NAME, portfolio=portfolio)
assert user_can_access(
edit_admin, Permissions.EDIT_PORTFOLIO_NAME, portfolio=portfolio
)
with pytest.raises(UnauthorizedError):
user_can_access(
view_admin, Permissions.EDIT_PORTFOLIO_NAME, portfolio=portfolio
)
@pytest.fixture
def set_current_user(request_ctx):
def _set_current_user(user):
request_ctx.g.current_user = user
yield _set_current_user
request_ctx.g.current_user = None
def test_user_can_access_decorator(set_current_user):
ccpo = UserFactory.create_ccpo()
edit_admin = UserFactory.create()
view_admin = UserFactory.create()
portfolio = PortfolioFactory.create(owner=edit_admin)
# factory gives view perms by default
PortfolioRoleFactory.create(user=view_admin, portfolio=portfolio)
@user_can_access_decorator(Permissions.EDIT_PORTFOLIO_NAME)
def _edit_portfolio_name(*args, **kwargs):
return True
set_current_user(ccpo)
assert _edit_portfolio_name(portfolio_id=portfolio.id)
set_current_user(edit_admin)
assert _edit_portfolio_name(portfolio_id=portfolio.id)
set_current_user(view_admin)
with pytest.raises(UnauthorizedError):
_edit_portfolio_name(portfolio_id=portfolio.id)
def test_user_can_access_decorator_exceptions(set_current_user):
rando_calrissian = UserFactory.create()
darth_vader = UserFactory.create()
portfolio = PortfolioFactory.create()
def _can_fly_the_millenium_falcon(u, *args, **kwargs):
if u == rando_calrissian:
return True
else:
raise UnauthorizedError(u, "is not rando")
@user_can_access_decorator(
Permissions.EDIT_PORTFOLIO_NAME, exception=_can_fly_the_millenium_falcon
)
def _cloud_city(*args, **kwargs):
return True
set_current_user(rando_calrissian)
assert _cloud_city()
set_current_user(darth_vader)
with pytest.raises(UnauthorizedError):
assert _cloud_city()
@pytest.fixture
def mock_logger(app):
real_logger = app.logger
app.logger = FakeLogger()
yield app.logger
app.logger = real_logger
def test_user_can_access_decorator_logs_access(
set_current_user, monkeypatch, mock_logger
):
user = UserFactory.create()
@user_can_access_decorator(Permissions.EDIT_PORTFOLIO_NAME)
def _do_something(*args, **kwargs):
return True
set_current_user(user)
monkeypatch.setattr(
"atst.domain.authz.decorator.check_access", lambda *a, **k: True
)
_do_something()
assert len(mock_logger.messages) == 1
assert "accessed" in mock_logger.messages[0]
assert "GET" in mock_logger.messages[0]
def _unauthorized(*a, **k):
raise UnauthorizedError(user, "do something")
monkeypatch.setattr("atst.domain.authz.decorator.check_access", _unauthorized)
with pytest.raises(UnauthorizedError):
_do_something()
assert len(mock_logger.messages) == 2
assert "denied access" in mock_logger.messages[1]
assert "GET" in mock_logger.messages[1]

View File

@ -29,9 +29,7 @@ def test_create_environment_role_creates_cloud_id(session):
portfolio_role = portfolio.members[0]
assert not portfolio_role.user.cloud_id
assert Environments.update_environment_roles(
owner, portfolio, portfolio_role, new_role
)
assert Environments.update_environment_roles(portfolio_role, new_role)
assert portfolio_role.user.cloud_id is not None
@ -69,9 +67,7 @@ def test_update_environment_roles():
]
portfolio_role = portfolio.members[0]
assert Environments.update_environment_roles(
owner, portfolio, portfolio_role, new_ids_and_roles
)
assert Environments.update_environment_roles(portfolio_role, new_ids_and_roles)
new_dev_env_role = EnvironmentRoles.get(portfolio_role.user.id, dev_env.id)
staging_env_role = EnvironmentRoles.get(portfolio_role.user.id, staging_env.id)
@ -120,9 +116,7 @@ def test_remove_environment_role():
]
portfolio_role = PortfolioRoles.get(portfolio.id, developer.id)
assert Environments.update_environment_roles(
owner, portfolio, portfolio_role, new_environment_roles
)
assert Environments.update_environment_roles(portfolio_role, new_environment_roles)
assert portfolio_role.num_environment_roles == 2
assert EnvironmentRoles.get(developer.id, now_ba).role == "billing_auditor"
@ -154,9 +148,7 @@ def test_no_update_to_environment_roles():
new_ids_and_roles = [{"id": dev_env.id, "role": "devops"}]
portfolio_role = PortfolioRoles.get(portfolio.id, developer.id)
assert not Environments.update_environment_roles(
owner, portfolio, portfolio_role, new_ids_and_roles
)
assert not Environments.update_environment_roles(portfolio_role, new_ids_and_roles)
def test_get_scoped_environments(db):

View File

@ -130,7 +130,7 @@ def test_resend_invitation():
user = UserFactory.create()
ws_role = PortfolioRoleFactory.create(user=user, portfolio=portfolio)
invite = Invitations.create(portfolio.owner, ws_role, user.email)
Invitations.resend(portfolio.owner, portfolio.id, invite.token)
Invitations.resend(user, invite.token)
assert ws_role.invitations[0].is_revoked
assert ws_role.invitations[1].is_pending

View File

@ -46,24 +46,6 @@ def test_portfolio_has_timestamps(portfolio):
assert portfolio.time_created == portfolio.time_updated
def test_portfolios_get_ensures_user_is_in_portfolio(portfolio, portfolio_owner):
outside_user = UserFactory.create()
with pytest.raises(UnauthorizedError):
Portfolios.get(outside_user, portfolio.id)
def test_get_for_update_applications_allows_owner(portfolio, portfolio_owner):
Portfolios.get_for_update_applications(portfolio_owner, portfolio.id)
def test_get_for_update_applications_blocks_developer(portfolio):
developer = UserFactory.create()
PortfolioRoles.add(developer, portfolio.id)
with pytest.raises(UnauthorizedError):
Portfolios.get_for_update_applications(developer, portfolio.id)
def test_can_create_portfolio_role(portfolio, portfolio_owner):
user_data = {
"first_name": "New",
@ -73,7 +55,7 @@ def test_can_create_portfolio_role(portfolio, portfolio_owner):
"dod_id": "1234567890",
}
new_member = Portfolios.create_member(portfolio_owner, portfolio, user_data)
new_member = Portfolios.create_member(portfolio, user_data)
assert new_member.portfolio == portfolio
assert new_member.user.provisional
@ -88,27 +70,12 @@ def test_can_add_existing_user_to_portfolio(portfolio, portfolio_owner):
"dod_id": user.dod_id,
}
new_member = Portfolios.create_member(portfolio_owner, portfolio, user_data)
new_member = Portfolios.create_member(portfolio, user_data)
assert new_member.portfolio == portfolio
assert new_member.user.email == user.email
assert not new_member.user.provisional
def test_need_permission_to_create_portfolio_role(portfolio, portfolio_owner):
random_user = UserFactory.create()
user_data = {
"first_name": "New",
"last_name": "User",
"email": "new.user@mail.com",
"portfolio_role": "developer",
"dod_id": "1234567890",
}
with pytest.raises(UnauthorizedError):
Portfolios.create_member(random_user, portfolio, user_data)
def test_update_portfolio_role_role(portfolio, portfolio_owner):
user_data = {
"first_name": "New",
@ -121,53 +88,13 @@ def test_update_portfolio_role_role(portfolio, portfolio_owner):
member = PortfolioRoleFactory.create(portfolio=portfolio)
permission_sets = [PermissionSets.EDIT_PORTFOLIO_FUNDING]
updated_member = Portfolios.update_member(
portfolio_owner, portfolio, member, permission_sets=permission_sets
)
updated_member = Portfolios.update_member(member, permission_sets=permission_sets)
assert updated_member.portfolio == portfolio
def test_need_permission_to_update_portfolio_role_role(portfolio, portfolio_owner):
random_user = UserFactory.create()
user_data = {
"first_name": "New",
"last_name": "User",
"email": "new.user@mail.com",
"portfolio_role": "developer",
"dod_id": "1234567890",
}
member = Portfolios.create_member(portfolio_owner, portfolio, user_data)
role_name = "developer"
with pytest.raises(UnauthorizedError):
Portfolios.update_member(random_user, portfolio, member, role_name)
def test_owner_can_view_portfolio_members(portfolio, portfolio_owner):
portfolio = Portfolios.get_with_members(portfolio_owner, portfolio.id)
assert portfolio
def test_ccpo_can_view_portfolio_members(portfolio, portfolio_owner):
ccpo = UserFactory.create_ccpo()
assert Portfolios.get_with_members(ccpo, portfolio.id)
def test_random_user_cannot_view_portfolio_members(portfolio):
developer = UserFactory.create()
with pytest.raises(UnauthorizedError):
portfolio = Portfolios.get_with_members(developer, portfolio.id)
def test_scoped_portfolio_for_admin_missing_view_apps_perms(portfolio_owner, portfolio):
Applications.create(
portfolio_owner,
portfolio,
"My Application 2",
"My application 2",
["dev", "staging", "prod"],
portfolio, "My Application 2", "My application 2", ["dev", "staging", "prod"]
)
restricted_admin = UserFactory.create()
PortfolioRoleFactory.create(
@ -186,18 +113,10 @@ def test_scoped_portfolio_only_returns_a_users_applications_and_environments(
portfolio, portfolio_owner
):
new_application = Applications.create(
portfolio_owner,
portfolio,
"My Application",
"My application",
["dev", "staging", "prod"],
portfolio, "My Application", "My application", ["dev", "staging", "prod"]
)
Applications.create(
portfolio_owner,
portfolio,
"My Application 2",
"My application 2",
["dev", "staging", "prod"],
portfolio, "My Application 2", "My application 2", ["dev", "staging", "prod"]
)
developer = UserFactory.create()
dev_environment = Environments.add_member(
@ -217,11 +136,7 @@ def test_scoped_portfolio_returns_all_applications_for_portfolio_admin(
):
for _ in range(5):
Applications.create(
portfolio_owner,
portfolio,
"My Application",
"My application",
["dev", "staging", "prod"],
portfolio, "My Application", "My application", ["dev", "staging", "prod"]
)
admin = UserFactory.create()
@ -240,11 +155,7 @@ def test_scoped_portfolio_returns_all_applications_for_portfolio_owner(
):
for _ in range(5):
Applications.create(
portfolio_owner,
portfolio,
"My Application",
"My application",
["dev", "staging", "prod"],
portfolio, "My Application", "My application", ["dev", "staging", "prod"]
)
scoped_portfolio = Portfolios.get(portfolio_owner, portfolio.id)
@ -282,27 +193,6 @@ def test_for_user_returns_all_portfolios_for_ccpo(portfolio, portfolio_owner):
assert len(sams_portfolios) == 2
def test_get_for_update_information(portfolio, portfolio_owner):
owner_ws = Portfolios.get_for_update_information(portfolio_owner, portfolio.id)
assert portfolio == owner_ws
admin = UserFactory.create()
perm_sets = get_all_portfolio_permission_sets()
PortfolioRoleFactory.create(
user=admin, portfolio=portfolio, permission_sets=perm_sets
)
admin_ws = Portfolios.get_for_update_information(admin, portfolio.id)
assert portfolio == admin_ws
# TODO: implement ccpo roles
# ccpo = UserFactory.create_ccpo()
# assert Portfolios.get_for_update_information(ccpo, portfolio.id)
developer = UserFactory.create()
with pytest.raises(UnauthorizedError):
Portfolios.get_for_update_information(developer, portfolio.id)
def test_can_create_portfolios_with_matching_names():
portfolio_name = "Great Portfolio"
PortfolioFactory.create(name=portfolio_name)
@ -314,7 +204,7 @@ def test_able_to_revoke_portfolio_access_for_active_member():
portfolio_role = PortfolioRoleFactory.create(
portfolio=portfolio, status=PortfolioRoleStatus.ACTIVE
)
Portfolios.revoke_access(portfolio.owner, portfolio.id, portfolio_role.id)
Portfolios.revoke_access(portfolio.id, portfolio_role.id)
assert Portfolios.for_user(portfolio_role.user) == []
@ -334,7 +224,7 @@ def test_unable_to_revoke_owner_portfolio_access():
owner_portfolio_role = portfolio.roles[0]
with pytest.raises(PortfolioError):
Portfolios.revoke_access(portfolio.owner, portfolio.id, owner_portfolio_role.id)
Portfolios.revoke_access(portfolio.id, owner_portfolio_role.id)
def test_disabled_members_dont_show_up(session):

View File

@ -21,7 +21,7 @@ def test_is_signed_by_ko():
assert not TaskOrders.is_signed_by_ko(task_order)
TaskOrders.update(user, task_order, signer_dod_id=user.dod_id)
TaskOrders.update(task_order, signer_dod_id=user.dod_id)
assert TaskOrders.is_signed_by_ko(task_order)
@ -68,7 +68,7 @@ def test_add_officer():
task_order = TaskOrderFactory.create()
ko = UserFactory.create()
owner = task_order.portfolio.owner
TaskOrders.add_officer(owner, task_order, "contracting_officer", ko.to_dictionary())
TaskOrders.add_officer(task_order, "contracting_officer", ko.to_dictionary())
assert task_order.contracting_officer == ko
portfolio_users = [ws_role.user for ws_role in task_order.portfolio.members]
@ -80,62 +80,19 @@ def test_add_officer_with_nonexistent_role():
ko = UserFactory.create()
owner = task_order.portfolio.owner
with pytest.raises(TaskOrderError):
TaskOrders.add_officer(owner, task_order, "pilot", ko.to_dictionary())
TaskOrders.add_officer(task_order, "pilot", ko.to_dictionary())
def test_add_officer_who_is_already_portfolio_member():
task_order = TaskOrderFactory.create()
owner = task_order.portfolio.owner
TaskOrders.add_officer(
owner, task_order, "contracting_officer", owner.to_dictionary()
)
TaskOrders.add_officer(task_order, "contracting_officer", owner.to_dictionary())
assert task_order.contracting_officer == owner
member = task_order.portfolio.members[0]
assert member.user == owner
def test_task_order_access():
creator = UserFactory.create()
member = UserFactory.create()
rando = UserFactory.create()
officer = UserFactory.create()
def check_access(can, cannot, method_name, method_args):
method = getattr(TaskOrders, method_name)
for user in can:
assert method(user, *method_args)
for user in cannot:
with pytest.raises(UnauthorizedError):
method(user, *method_args)
portfolio = PortfolioFactory.create(owner=creator)
task_order = TaskOrderFactory.create(creator=creator, portfolio=portfolio)
PortfolioRoleFactory.create(
user=member,
portfolio=task_order.portfolio,
permission_sets=[
PermissionSets.get(prms)
for prms in PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS
],
)
TaskOrders.add_officer(
creator, task_order, "contracting_officer", officer.to_dictionary()
)
check_access([creator, officer, member], [rando], "get", [task_order.id])
check_access([creator, officer], [member, rando], "create", [portfolio])
check_access([creator, officer], [member, rando], "update", [task_order])
check_access(
[creator, officer],
[member, rando],
"add_officer",
[task_order, "contracting_officer", UserFactory.dictionary()],
)
def test_dd254_complete():
finished = DD254Factory.create()
unfinished = DD254Factory.create(certifying_official=None)

View File

@ -9,11 +9,7 @@ def test_add_user_to_environment():
portfolio = PortfolioFactory.create(owner=owner)
application = Applications.create(
owner,
portfolio,
"my test application",
"It's mine.",
["dev", "staging", "prod"],
portfolio, "my test application", "It's mine.", ["dev", "staging", "prod"]
)
dev_environment = application.environments[0]

View File

@ -120,7 +120,7 @@ def test_has_env_role_history(session):
user=user, environment=environment, role="developer"
)
Environments.update_environment_roles(
owner, portfolio, portfolio_role, [{"role": "admin", "id": environment.id}]
portfolio_role, [{"role": "admin", "id": environment.id}]
)
changed_events = (
session.query(AuditEvent)
@ -154,7 +154,7 @@ def test_has_no_environment_roles():
}
portfolio = PortfolioFactory.create(owner=owner)
portfolio_role = Portfolios.create_member(owner, portfolio, developer_data)
portfolio_role = Portfolios.create_member(portfolio, developer_data)
assert not portfolio_role.has_environment_roles
@ -170,13 +170,9 @@ def test_has_environment_roles():
}
portfolio = PortfolioFactory.create(owner=owner)
portfolio_role = Portfolios.create_member(owner, portfolio, developer_data)
portfolio_role = Portfolios.create_member(portfolio, developer_data)
application = Applications.create(
owner,
portfolio,
"my test application",
"It's mine.",
["dev", "staging", "prod"],
portfolio, "my test application", "It's mine.", ["dev", "staging", "prod"]
)
Environments.add_member(
application.environments[0], portfolio_role.user, "developer"

View File

@ -39,54 +39,6 @@ def test_user_without_permission_has_no_budget_report_link(client, user_session)
)
@pytest.mark.skip(reason="Temporarily no add activity log link")
def test_user_with_permission_has_activity_log_link(client, user_session):
portfolio = PortfolioFactory.create()
ccpo = UserFactory.create_ccpo()
admin = UserFactory.create()
PortfolioRoleFactory.create(
portfolio=portfolio, user=admin, status=PortfolioRoleStatus.ACTIVE
)
user_session(portfolio.owner)
response = client.get("/portfolios/{}/applications".format(portfolio.id))
assert (
'href="/portfolios/{}/activity"'.format(portfolio.id).encode() in response.data
)
# logs out previous user before creating a new session
user_session(admin)
response = client.get("/portfolios/{}/applications".format(portfolio.id))
assert (
'href="/portfolios/{}/activity"'.format(portfolio.id).encode() in response.data
)
user_session(ccpo)
response = client.get("/portfolios/{}/applications".format(portfolio.id))
assert (
'href="/portfolios/{}/activity"'.format(portfolio.id).encode() in response.data
)
@pytest.mark.skip(reason="Temporarily no add activity log link")
def test_user_without_permission_has_no_activity_log_link(client, user_session):
portfolio = PortfolioFactory.create()
developer = UserFactory.create()
PortfolioRoleFactory.create(
portfolio=portfolio,
user=developer,
role=Roles.get("developer"),
status=PortfolioRoleStatus.ACTIVE,
)
user_session(developer)
response = client.get("/portfolios/{}/applications".format(portfolio.id))
assert (
'href="/portfolios/{}/activity"'.format(portfolio.id).encode()
not in response.data
)
def test_user_with_permission_has_add_application_link(client, user_session):
portfolio = PortfolioFactory.create()
user_session(portfolio.owner)
@ -130,7 +82,6 @@ def test_creating_application(client, user_session):
def test_view_edit_application(client, user_session):
portfolio = PortfolioFactory.create()
application = Applications.create(
portfolio.owner,
portfolio,
"Snazzy Application",
"A new application for me and my friends",

View File

@ -1,3 +1,4 @@
import pytest
import datetime
from flask import url_for

View File

@ -168,7 +168,6 @@ def test_update_member_environment_role(client, user_session):
user = UserFactory.create()
member = PortfolioRoles.add(user, portfolio.id)
application = Applications.create(
portfolio.owner,
portfolio,
"Snazzy Application",
"A new application for me and my friends",
@ -202,7 +201,6 @@ def test_update_member_environment_role_with_no_data(client, user_session):
user = UserFactory.create()
member = PortfolioRoles.add(user, portfolio.id)
application = Applications.create(
portfolio.owner,
portfolio,
"Snazzy Application",
"A new application for me and my friends",
@ -231,7 +229,6 @@ def test_revoke_active_member_access(client, user_session):
portfolio=portfolio, user=user, status=PortfolioRoleStatus.ACTIVE
)
Applications.create(
portfolio.owner,
portfolio,
"Snazzy Application",
"A new application for me and my friends",

View File

@ -157,7 +157,7 @@ class TestTaskOrderInvitations:
"security_officer-last_name": "Fett",
},
)
updated_task_order = TaskOrders.get(self.portfolio.owner, self.task_order.id)
updated_task_order = TaskOrders.get(self.task_order.id)
assert updated_task_order.ko_first_name == "Luke"
assert updated_task_order.ko_last_name == "Skywalker"
assert updated_task_order.so_first_name == "Boba"
@ -189,7 +189,7 @@ class TestTaskOrderInvitations:
"contracting_officer-invite": "y",
},
)
updated_task_order = TaskOrders.get(self.portfolio.owner, self.task_order.id)
updated_task_order = TaskOrders.get(self.task_order.id)
assert updated_task_order.ko_invite == True
assert updated_task_order.ko_first_name == "Luke"
@ -222,7 +222,7 @@ class TestTaskOrderInvitations:
assert "There were some errors" in response.data.decode()
updated_task_order = TaskOrders.get(self.portfolio.owner, self.task_order.id)
updated_task_order = TaskOrders.get(self.task_order.id)
assert updated_task_order.so_first_name != "Boba"
assert len(queue.get_queue()) == queue_length
assert response.status_code == 400
@ -251,7 +251,7 @@ def test_ko_can_view_task_order(client, user_session, portfolio, user):
assert response.status_code == 200
assert translate("common.manage") in response.data.decode()
TaskOrders.update(user, task_order, clin_01=None)
TaskOrders.update(task_order, clin_01=None)
response = client.get(
url_for(
"portfolios.view_task_order",
@ -371,27 +371,6 @@ def test_mo_redirected_to_build_page(client, user_session, portfolio):
assert response.status_code == 200
def test_cor_redirected_to_build_page(client, user_session, portfolio):
cor = UserFactory.create()
PortfolioRoleFactory.create(
portfolio=portfolio,
user=cor,
status=PortfolioStatus.ACTIVE,
permission_sets=[
PermissionSets.get(PermissionSets.VIEW_PORTFOLIO),
PermissionSets.get(PermissionSets.VIEW_PORTFOLIO_FUNDING),
],
)
task_order = TaskOrderFactory.create(
portfolio=portfolio, contracting_officer_representative=cor
)
user_session(cor)
response = client.get(
url_for("task_orders.new", screen=1, task_order_id=task_order.id)
)
assert response.status_code == 200
def test_submit_completed_ko_review_page_as_cor(
client, user_session, pdf_upload, portfolio, user
):
@ -620,47 +599,6 @@ def test_resend_invite_when_officer_type_missing(
assert len(queue.get_queue()) == queue_length
def test_resend_invite_when_ko(app, client, user_session, portfolio, user):
queue_length = len(queue.get_queue())
task_order = TaskOrderFactory.create(
portfolio=portfolio, contracting_officer=user, ko_invite=True
)
portfolio_role = PortfolioRoleFactory.create(
portfolio=portfolio, user=user, status=PortfolioStatus.ACTIVE
)
original_invitation = Invitations.create(
inviter=user, portfolio_role=portfolio_role, email=user.email
)
user_session(user)
response = client.post(
url_for(
"portfolios.resend_invite",
portfolio_id=portfolio.id,
task_order_id=task_order.id,
invite_type="ko_invite",
_external=True,
)
)
assert original_invitation.status == InvitationStatus.REVOKED
assert response.status_code == 302
assert (
url_for(
"portfolios.task_order_invitations",
portfolio_id=portfolio.id,
task_order_id=task_order.id,
_external=True,
)
== response.headers["Location"]
)
assert len(queue.get_queue()) == queue_length + 1
def test_resend_invite_when_not_pending(app, client, user_session, portfolio, user):
queue_length = len(queue.get_queue())
@ -726,20 +664,21 @@ def test_resending_revoked_invite(app, client, user_session, portfolio, user):
assert response.status_code == 404
def test_resending_expired_invite(app, client, user_session, portfolio, user):
def test_resending_expired_invite(app, client, user_session, portfolio):
queue_length = len(queue.get_queue())
ko = UserFactory.create()
task_order = TaskOrderFactory.create(
portfolio=portfolio, contracting_officer=user, ko_invite=True
portfolio=portfolio, contracting_officer=ko, ko_invite=True
)
portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=user)
portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=ko)
invite = InvitationFactory.create(
inviter=user,
inviter=portfolio.owner,
portfolio_role=portfolio_role,
email=user.email,
email=ko.email,
expiration_time=datetime.now() - timedelta(days=1),
)
user_session(user)
user_session(portfolio.owner)
response = client.post(
url_for(

View File

@ -1,3 +1,4 @@
import pytest
from flask import url_for
from io import BytesIO
import re

View File

@ -2,11 +2,17 @@ import pytest
from flask import url_for
from atst.domain.task_orders import TaskOrders
from atst.domain.permission_sets import PermissionSets
from atst.models.attachment import Attachment
from atst.routes.task_orders.new import ShowTaskOrderWorkflow, UpdateTaskOrderWorkflow
from atst.utils.localization import translate
from tests.factories import UserFactory, TaskOrderFactory, PortfolioFactory
from tests.factories import (
UserFactory,
TaskOrderFactory,
PortfolioFactory,
PortfolioRoleFactory,
)
class TestShowTaskOrderWorkflow:
@ -93,8 +99,6 @@ def test_to_on_pf_cannot_edit_pf_attributes():
assert second_workflow.pf_attributes_read_only
# TODO: this test will need to be more complicated when we add validation to
# the forms
def test_create_new_task_order(client, user_session, pdf_upload):
creator = UserFactory.create()
user_session(creator)
@ -114,7 +118,7 @@ def test_create_new_task_order(client, user_session, pdf_upload):
assert url_for("task_orders.new", screen=2) in response.headers["Location"]
created_task_order_id = response.headers["Location"].split("/")[-1]
created_task_order = TaskOrders.get(creator, created_task_order_id)
created_task_order = TaskOrders.get(created_task_order_id)
assert created_task_order.portfolio is not None
assert created_task_order.portfolio.name == portfolio_name
assert created_task_order.portfolio.defense_component == defense_component
@ -152,7 +156,7 @@ def test_create_new_task_order_for_portfolio(client, user_session):
assert url_for("task_orders.new", screen=2) in response.headers["Location"]
created_task_order_id = response.headers["Location"].split("/")[-1]
created_task_order = TaskOrders.get(creator, created_task_order_id)
created_task_order = TaskOrders.get(created_task_order_id)
assert created_task_order.portfolio_name == portfolio.name
assert created_task_order.defense_component == portfolio.defense_component
assert created_task_order.portfolio == portfolio
@ -209,7 +213,7 @@ def test_review_screen_when_all_sections_complete(client, user_session, task_ord
def test_review_screen_when_not_all_sections_complete(client, user_session, task_order):
TaskOrders.update(task_order.creator, task_order, clin_01=None)
TaskOrders.update(task_order, clin_01=None)
user_session(task_order.creator)
response = client.get(
url_for("task_orders.new", screen=4, task_order_id=task_order.id)
@ -296,6 +300,11 @@ def test_update_task_order_with_existing_task_order(task_order):
def test_update_to_redirects_to_ko_review(client, user_session, task_order):
ko = UserFactory.create()
task_order.contracting_officer = ko
PortfolioRoleFactory.create(
user=ko,
portfolio=task_order.portfolio,
permission_sets=[PermissionSets.get(PermissionSets.EDIT_PORTFOLIO_FUNDING)],
)
user_session(ko)
url = url_for(
"portfolios.ko_review",

View File

@ -18,10 +18,7 @@ def create_ko_task_order(user_session, contracting_officer):
)
TaskOrders.add_officer(
contracting_officer,
task_order,
"contracting_officer",
contracting_officer.to_dictionary(),
task_order, "contracting_officer", contracting_officer.to_dictionary()
)
dd_254 = DD254Factory.create()
@ -33,7 +30,7 @@ def create_ko_task_order(user_session, contracting_officer):
def test_show_signature_requested_not_ko(client, user_session):
contracting_officer = UserFactory.create()
task_order = create_ko_task_order(user_session, contracting_officer)
TaskOrders.update(contracting_officer, task_order, contracting_officer=None)
TaskOrders.update(task_order, contracting_officer=None)
response = client.get(
url_for("task_orders.signature_requested", task_order_id=task_order.id)
@ -50,10 +47,7 @@ def test_show_signature_requested(client, user_session):
# create unfinished TO
task_order = TaskOrderFactory.create(portfolio=portfolio, clin_01=None)
TaskOrders.add_officer(
contracting_officer,
task_order,
"contracting_officer",
contracting_officer.to_dictionary(),
task_order, "contracting_officer", contracting_officer.to_dictionary()
)
response = client.get(
url_for("task_orders.signature_requested", task_order_id=task_order.id)
@ -61,7 +55,7 @@ def test_show_signature_requested(client, user_session):
assert response.status_code == 404
# Finish TO
TaskOrders.update(contracting_officer, task_order, clin_01=100)
TaskOrders.update(task_order, clin_01=100)
response = client.get(
url_for("task_orders.signature_requested", task_order_id=task_order.id)
)
@ -79,9 +73,7 @@ def test_show_signature_requested(client, user_session):
def test_show_signature_requested_already_signed(client, user_session):
contracting_officer = UserFactory.create()
task_order = create_ko_task_order(user_session, contracting_officer)
TaskOrders.update(
contracting_officer, task_order, signer_dod_id=contracting_officer.dod_id
)
TaskOrders.update(task_order, signer_dod_id=contracting_officer.dod_id)
response = client.get(
url_for("task_orders.signature_requested", task_order_id=task_order.id)
@ -93,7 +85,7 @@ def test_show_signature_requested_already_signed(client, user_session):
def test_signing_task_order_not_ko(client, user_session):
contracting_officer = UserFactory.create()
task_order = create_ko_task_order(user_session, contracting_officer)
TaskOrders.update(contracting_officer, task_order, contracting_officer=None)
TaskOrders.update(task_order, contracting_officer=None)
response = client.post(
url_for("task_orders.record_signature", task_order_id=task_order.id), data={}
@ -105,9 +97,7 @@ def test_signing_task_order_not_ko(client, user_session):
def test_singing_an_already_signed_task_order(client, user_session):
contracting_officer = UserFactory.create()
task_order = create_ko_task_order(user_session, contracting_officer)
TaskOrders.update(
contracting_officer, task_order, signer_dod_id=contracting_officer.dod_id
)
TaskOrders.update(task_order, signer_dod_id=contracting_officer.dod_id)
response = client.post(
url_for("task_orders.record_signature", task_order_id=task_order.id),

View File

@ -4,7 +4,7 @@ from urllib.parse import quote
from tests.factories import UserFactory
PROTECTED_URL = "/portfolios"
PROTECTED_URL = "/task_orders/new/get_started"
def test_request_page_with_complete_profile(client, user_session):

View File

789
tests/test_access.py Normal file
View File

@ -0,0 +1,789 @@
import pytest
from flask import url_for, Response
import atst
from atst.app import make_app, make_config
from atst.domain.auth import UNPROTECTED_ROUTES as _NO_LOGIN_REQUIRED
import atst.domain.authz as authz
from atst.domain.permission_sets import PermissionSets
from atst.models.portfolio_role import Status as PortfolioRoleStatus
from tests.factories import (
AttachmentFactory,
InvitationFactory,
PortfolioFactory,
PortfolioRoleFactory,
TaskOrderFactory,
UserFactory,
)
_NO_ACCESS_CHECK_REQUIRED = _NO_LOGIN_REQUIRED + [
"task_orders.get_started", # all users can start a new TO
"atst.csp_environment_access", # internal redirect
"atst.jedi_csp_calculator", # internal redirect
"atst.styleguide", # dev reference
"dev.test_email", # dev tool
"dev.messages", # dev tool
"atst.home", # available to all users
"users.user", # available to all users
"users.update_user", # available to all users
"portfolios.accept_invitation", # available to all users; access control is built into invitation logic
"atst.catch_all", # available to all users
"portfolios.portfolios", # the portfolios list is scoped to the user separately
]
def protected_routes(app):
_protected_routes = []
for rule in app.url_map.iter_rules():
args = [1] * len(rule.arguments)
mock_args = dict(zip(rule.arguments, args))
_n, route = rule.build(mock_args)
if rule.endpoint in _NO_ACCESS_CHECK_REQUIRED or "/static" in route:
continue
_protected_routes.append((rule, route))
return _protected_routes
sample_config = make_config({"CRL_STORAGE_PROVIDER": "LOCAL"})
sample_app = make_app(sample_config)
_PROTECTED_ROUTES = protected_routes(sample_app)
class Null:
"""
Very simple null object. Will return itself for all attribute
calls:
> foo = Null()
> foo.bar.baz == foo
"""
def __init__(self, *args, **kwargs):
pass
def __getattr__(self, name):
return self
@pytest.mark.access_check
@pytest.mark.parametrize("rule,route", _PROTECTED_ROUTES)
def test_all_protected_routes_have_access_control(
rule, route, mocker, client, user_session, monkeypatch
):
"""
This tests that all routes, except the ones in
_NO_ACCESS_CHECK_REQUIRED, are protected by the access
decorator.
"""
# monkeypatch any object lookups that might happen in the access decorator
monkeypatch.setattr("atst.domain.portfolios.Portfolios.for_user", lambda *a: [])
monkeypatch.setattr("atst.domain.portfolios.Portfolios.get", lambda *a: None)
monkeypatch.setattr("atst.domain.task_orders.TaskOrders.get", lambda *a: Null())
# patch the internal function the access decorator uses so that
# we can check that it was called
mocker.patch("atst.domain.authz.decorator.check_access")
user = UserFactory.create()
user_session(user)
method = "get" if "GET" in rule.methods else "post"
getattr(client, method)(route)
assert (
atst.domain.authz.decorator.check_access.call_count == 1
), "no access control for {}".format(rule.endpoint)
def user_with(*perm_sets_names):
return UserFactory.create(permission_sets=PermissionSets.get_many(perm_sets_names))
@pytest.fixture
def get_url_assert_status(client, user_session):
def _get_url_assert_status(user, url, status):
user_session(user)
resp = client.get(url)
assert resp.status_code == status
return _get_url_assert_status
@pytest.fixture
def post_url_assert_status(client, user_session):
def _get_url_assert_status(user, url, status):
user_session(user)
resp = client.post(url)
assert resp.status_code == status
return _get_url_assert_status
# atst.activity_history
def test_atst_activity_history_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_AUDIT_LOG)
rando = user_with()
url = url_for("atst.activity_history")
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.access_environment
def test_portfolios_access_environment_access(get_url_assert_status):
dev = UserFactory.create()
rando = UserFactory.create()
ccpo = UserFactory.create_ccpo()
portfolio = PortfolioFactory.create(
owner=dev,
applications=[
{
"name": "Mos Eisley",
"description": "Where Han shot first",
"environments": [
{
"name": "thebar",
"members": [{"user": dev, "role_name": "devops"}],
}
],
}
],
)
env = portfolio.applications[0].environments[0]
url = url_for(
"portfolios.access_environment",
portfolio_id=portfolio.id,
environment_id=env.id,
)
get_url_assert_status(dev, url, 302)
get_url_assert_status(rando, url, 404)
get_url_assert_status(ccpo, url, 404)
# portfolios.application_members
def test_portfolios_application_members_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_APPLICATION_MANAGEMENT)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(
owner=owner,
applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}],
)
app = portfolio.applications[0]
url = url_for(
"portfolios.application_members",
portfolio_id=portfolio.id,
application_id=app.id,
)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.create_application
def test_portfolios_create_application_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_APPLICATION_MANAGEMENT)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.create_application", portfolio_id=portfolio.id)
post_url_assert_status(ccpo, url, 200)
post_url_assert_status(owner, url, 200)
post_url_assert_status(rando, url, 404)
# portfolios.create_member
def test_portfolios_create_member_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.create_member", portfolio_id=portfolio.id)
post_url_assert_status(ccpo, url, 200)
post_url_assert_status(owner, url, 200)
post_url_assert_status(rando, url, 404)
# portfolios.edit_application
def test_portfolios_edit_application_access(get_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_APPLICATION_MANAGEMENT)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(
owner=owner,
applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}],
)
app = portfolio.applications[0]
url = url_for(
"portfolios.edit_application", portfolio_id=portfolio.id, application_id=app.id
)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.edit_portfolio
def test_portfolios_edit_portfolio_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.edit_portfolio", portfolio_id=portfolio.id)
post_url_assert_status(ccpo, url, 200)
post_url_assert_status(owner, url, 200)
post_url_assert_status(rando, url, 404)
# portfolios.edit_task_order_invitations
def test_portfolios_edit_task_order_invitations_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio)
url = url_for(
"portfolios.edit_task_order_invitations",
portfolio_id=portfolio.id,
task_order_id=task_order.id,
)
post_url_assert_status(ccpo, url, 302)
post_url_assert_status(owner, url, 302)
post_url_assert_status(rando, url, 404)
# portfolios.ko_review
def test_portfolios_ko_review_access(get_url_assert_status):
ccpo = UserFactory.create_ccpo()
owner = user_with()
cor = user_with()
ko = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(
portfolio=portfolio,
contracting_officer=ko,
contracting_officer_representative=cor,
)
url = url_for(
"portfolios.ko_review", portfolio_id=portfolio.id, task_order_id=task_order.id
)
get_url_assert_status(ccpo, url, 404)
get_url_assert_status(owner, url, 404)
get_url_assert_status(ko, url, 200)
get_url_assert_status(cor, url, 200)
# portfolios.new_application
def test_portfolios_new_application_access(get_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_APPLICATION_MANAGEMENT)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.new_application", portfolio_id=portfolio.id)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.new_member
def test_portfolios_new_member_access(get_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.new_member", portfolio_id=portfolio.id)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.portfolio_admin
def test_portfolios_portfolio_admin_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.portfolio_admin", portfolio_id=portfolio.id)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.portfolio_applications
def test_portfolios_portfolio_applications_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_APPLICATION_MANAGEMENT)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.portfolio_applications", portfolio_id=portfolio.id)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.portfolio_funding
def test_portfolios_portfolio_funding_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.portfolio_funding", portfolio_id=portfolio.id)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.portfolio_members
def test_portfolios_portfolio_members_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.portfolio_members", portfolio_id=portfolio.id)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.portfolio_reports
def test_portfolios_portfolio_reports_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_REPORTS)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.portfolio_reports", portfolio_id=portfolio.id)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.resend_invitation
def test_portfolios_resend_invitation_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
invitee = user_with()
portfolio = PortfolioFactory.create(owner=owner)
prr = PortfolioRoleFactory.create(user=invitee, portfolio=portfolio)
invite = InvitationFactory.create(user=UserFactory.create(), portfolio_role=prr)
url = url_for(
"portfolios.resend_invitation", portfolio_id=portfolio.id, token=invite.token
)
post_url_assert_status(ccpo, url, 302)
post_url_assert_status(owner, url, 302)
post_url_assert_status(invitee, url, 404)
post_url_assert_status(rando, url, 404)
# portfolios.resend_invite
def test_portfolios_resend_invite_access(post_url_assert_status):
ccpo = UserFactory.create_ccpo()
owner = user_with()
rando = user_with()
ko = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio, contracting_officer=ko)
prr = PortfolioRoleFactory.create(user=ko, portfolio=portfolio)
invite = InvitationFactory.create(user=UserFactory.create(), portfolio_role=prr)
url = url_for(
"portfolios.resend_invite",
portfolio_id=portfolio.id,
task_order_id=task_order.id,
invite_type="ko_invite",
)
post_url_assert_status(ccpo, url, 302)
post_url_assert_status(owner, url, 302)
post_url_assert_status(ko, url, 404)
post_url_assert_status(rando, url, 404)
# portfolios.revoke_access
def test_portfolios_revoke_access_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
for user, status in [(ccpo, 302), (owner, 302), (rando, 404)]:
prt_member = user_with()
prr = PortfolioRoleFactory.create(
user=prt_member, portfolio=portfolio, status=PortfolioRoleStatus.ACTIVE
)
url = url_for(
"portfolios.revoke_access", portfolio_id=portfolio.id, member_id=prr.id
)
post_url_assert_status(user, url, status)
# portfolios.revoke_invitation
def test_portfolios_revoke_invitation_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
for user, status in [(ccpo, 302), (owner, 302), (rando, 404)]:
prt_member = user_with()
prr = PortfolioRoleFactory.create(
user=prt_member, portfolio=portfolio, status=PortfolioRoleStatus.ACTIVE
)
invite = InvitationFactory.create(user=prt_member, portfolio_role=prr)
url = url_for(
"portfolios.revoke_invitation",
portfolio_id=portfolio.id,
token=invite.token,
)
post_url_assert_status(user, url, status)
# portfolios.show_portfolio
def test_portfolios_show_portfolio_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
url = url_for("portfolios.show_portfolio", portfolio_id=portfolio.id)
get_url_assert_status(ccpo, url, 302)
get_url_assert_status(owner, url, 302)
get_url_assert_status(rando, url, 404)
# portfolios.so_review
def test_portfolios_so_review_access(get_url_assert_status):
ccpo = UserFactory.create_ccpo()
owner = user_with()
rando = user_with()
so = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio, security_officer=so)
url = url_for(
"portfolios.so_review", portfolio_id=portfolio.id, task_order_id=task_order.id
)
get_url_assert_status(so, url, 200)
get_url_assert_status(ccpo, url, 404)
get_url_assert_status(owner, url, 404)
get_url_assert_status(rando, url, 404)
# portfolios.submit_ko_review
def test_portfolios_submit_ko_review_access(post_url_assert_status):
ccpo = UserFactory.create_ccpo()
owner = user_with()
cor = user_with()
ko = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(
portfolio=portfolio,
contracting_officer=ko,
contracting_officer_representative=cor,
)
url = url_for(
"portfolios.submit_ko_review",
portfolio_id=portfolio.id,
task_order_id=task_order.id,
)
post_url_assert_status(ccpo, url, 404)
post_url_assert_status(owner, url, 404)
post_url_assert_status(ko, url, 200)
post_url_assert_status(cor, url, 200)
# portfolios.submit_so_review
def test_portfolios_submit_so_review_access(post_url_assert_status):
ccpo = UserFactory.create_ccpo()
owner = user_with()
rando = user_with()
so = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio, security_officer=so)
url = url_for(
"portfolios.submit_so_review",
portfolio_id=portfolio.id,
task_order_id=task_order.id,
)
post_url_assert_status(so, url, 200)
post_url_assert_status(ccpo, url, 404)
post_url_assert_status(owner, url, 404)
post_url_assert_status(rando, url, 404)
# portfolios.task_order_invitations
def test_portfolios_task_order_invitations_access(get_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio)
url = url_for(
"portfolios.task_order_invitations",
portfolio_id=portfolio.id,
task_order_id=task_order.id,
)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(owner, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.update_application
def test_portfolios_update_application_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_APPLICATION_MANAGEMENT)
dev = UserFactory.create()
rando = UserFactory.create()
portfolio = PortfolioFactory.create(
owner=dev,
applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}],
)
app = portfolio.applications[0]
url = url_for(
"portfolios.update_application",
portfolio_id=portfolio.id,
application_id=app.id,
)
post_url_assert_status(dev, url, 200)
post_url_assert_status(ccpo, url, 200)
post_url_assert_status(rando, url, 404)
# portfolios.update_member
def test_portfolios_update_member_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
prt_member = user_with()
portfolio = PortfolioFactory.create(owner=owner)
prr = PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio)
url = url_for(
"portfolios.update_member", portfolio_id=portfolio.id, member_id=prt_member.id
)
post_url_assert_status(owner, url, 200)
post_url_assert_status(ccpo, url, 200)
post_url_assert_status(rando, url, 404)
# portfolios.view_member
def test_portfolios_view_member_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_ADMIN)
owner = user_with()
rando = user_with()
prt_member = user_with()
portfolio = PortfolioFactory.create(owner=owner)
prr = PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio)
url = url_for(
"portfolios.view_member", portfolio_id=portfolio.id, member_id=prt_member.id
)
get_url_assert_status(owner, url, 200)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(rando, url, 404)
# portfolios.view_task_order
def test_portfolios_view_task_order_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio)
url = url_for(
"portfolios.view_task_order",
portfolio_id=portfolio.id,
task_order_id=task_order.id,
)
get_url_assert_status(owner, url, 200)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(rando, url, 404)
# task_orders.download_csp_estimate
def test_task_orders_download_csp_estimate_access(get_url_assert_status, monkeypatch):
monkeypatch.setattr(
"atst.routes.task_orders.index.send_file", lambda a: Response("")
)
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio)
url = url_for("task_orders.download_csp_estimate", task_order_id=task_order.id)
get_url_assert_status(owner, url, 200)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(rando, url, 404)
# task_orders.download_summary
def test_task_orders_download_summary_access(get_url_assert_status):
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio)
url = url_for("task_orders.download_summary", task_order_id=task_order.id)
get_url_assert_status(owner, url, 200)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(rando, url, 404)
# task_orders.download_task_order_pdf
def test_task_orders_download_task_order_pdf_access(get_url_assert_status, monkeypatch):
monkeypatch.setattr(
"atst.routes.task_orders.index.send_file", lambda a: Response("")
)
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(
portfolio=portfolio, pdf=AttachmentFactory.create()
)
url = url_for("task_orders.download_task_order_pdf", task_order_id=task_order.id)
get_url_assert_status(owner, url, 200)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(rando, url, 404)
# task_orders.invite
def test_task_orders_invite_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio)
url = url_for("task_orders.invite", task_order_id=task_order.id)
post_url_assert_status(owner, url, 302)
post_url_assert_status(ccpo, url, 302)
post_url_assert_status(rando, url, 404)
# task_orders.new
def test_task_orders_new_access(get_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
url = url_for("task_orders.new", screen=1)
get_url_assert_status(owner, url, 200)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(rando, url, 200)
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio)
url = url_for("task_orders.new", screen=2, task_order_id=task_order.id)
get_url_assert_status(owner, url, 200)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(rando, url, 404)
url = url_for("task_orders.new", screen=1, portfolio_id=portfolio.id)
get_url_assert_status(owner, url, 200)
get_url_assert_status(ccpo, url, 200)
get_url_assert_status(rando, url, 404)
# task_orders.record_signature
def test_task_orders_record_signature_access(post_url_assert_status, monkeypatch):
ccpo = UserFactory.create_ccpo()
owner = user_with()
rando = user_with()
ko = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio, contracting_officer=ko)
monkeypatch.setattr(
"atst.routes.task_orders.signing.find_unsigned_ko_to", lambda *a: task_order
)
url = url_for("task_orders.record_signature", task_order_id=task_order.id)
post_url_assert_status(ko, url, 400)
post_url_assert_status(owner, url, 404)
post_url_assert_status(ccpo, url, 404)
post_url_assert_status(rando, url, 404)
# task_orders.signature_requested
def test_task_orders_signature_requested_access(get_url_assert_status, monkeypatch):
ccpo = UserFactory.create_ccpo()
owner = user_with()
rando = user_with()
ko = user_with()
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio, contracting_officer=ko)
monkeypatch.setattr(
"atst.routes.task_orders.signing.find_unsigned_ko_to", lambda *a: task_order
)
url = url_for("task_orders.record_signature", task_order_id=task_order.id)
get_url_assert_status(ko, url, 200)
get_url_assert_status(owner, url, 404)
get_url_assert_status(ccpo, url, 404)
get_url_assert_status(rando, url, 404)
# task_orders.update
def test_task_orders_update_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING)
owner = user_with()
rando = user_with()
url = url_for("task_orders.update", screen=1)
post_url_assert_status(owner, url, 200)
post_url_assert_status(ccpo, url, 200)
post_url_assert_status(rando, url, 200)
portfolio = PortfolioFactory.create(owner=owner)
task_order = TaskOrderFactory.create(portfolio=portfolio)
url = url_for("task_orders.update", screen=2, task_order_id=task_order.id)
post_url_assert_status(owner, url, 302)
post_url_assert_status(ccpo, url, 302)
post_url_assert_status(rando, url, 404)
url = url_for("task_orders.update", screen=1, portfolio_id=portfolio.id)
post_url_assert_status(owner, url, 302)
post_url_assert_status(ccpo, url, 302)
post_url_assert_status(rando, url, 404)

View File

@ -14,3 +14,17 @@ def captured_templates(app):
yield recorded
finally:
template_rendered.disconnect(record, app)
class FakeLogger:
def __init__(self):
self.messages = []
def info(self, msg):
self.messages.append(msg)
def warning(self, msg):
self.messages.append(msg)
def error(self, msg):
self.messages.append(msg)