diff --git a/atst/domain/applications.py b/atst/domain/applications.py index 243ed733..30884c59 100644 --- a/atst/domain/applications.py +++ b/atst/domain/applications.py @@ -1,8 +1,6 @@ from atst.database import db -from atst.domain.authz import Authorization from atst.domain.environments import Environments from atst.domain.exceptions import NotFoundError -from atst.models.permissions import Permissions from atst.models.application import Application from atst.models.environment import Environment from atst.models.environment_role import EnvironmentRole @@ -10,7 +8,7 @@ from atst.models.environment_role import EnvironmentRole class Applications(object): @classmethod - def create(cls, user, portfolio, name, description, environment_names): + def create(cls, portfolio, name, description, environment_names): application = Application( portfolio=portfolio, name=name, description=description ) @@ -22,15 +20,7 @@ class Applications(object): return application @classmethod - def get(cls, user, portfolio, application_id): - # TODO: this should check permission for this particular application - Authorization.check_portfolio_permission( - user, - portfolio, - Permissions.VIEW_APPLICATION, - "view application in portfolio", - ) - + def get(cls, application_id): try: application = ( db.session.query(Application).filter_by(id=application_id).one() @@ -52,14 +42,7 @@ class Applications(object): ) @classmethod - def get_all(cls, user, portfolio_role, portfolio): - Authorization.check_portfolio_permission( - user, - portfolio, - Permissions.VIEW_APPLICATION, - "view application in portfolio", - ) - + def get_all(cls, portfolio): try: applications = ( db.session.query(Application).filter_by(portfolio_id=portfolio.id).all() @@ -70,7 +53,7 @@ class Applications(object): return applications @classmethod - def update(cls, user, portfolio, application, new_data): + def update(cls, application, new_data): if "name" in new_data: application.name = new_data["name"] if "description" in new_data: diff --git a/atst/domain/audit_log.py b/atst/domain/audit_log.py index c33be753..8fa6e517 100644 --- a/atst/domain/audit_log.py +++ b/atst/domain/audit_log.py @@ -2,7 +2,6 @@ from sqlalchemy import or_ from atst.database import db from atst.domain.common import Query -from atst.domain.authz import Authorization, Permissions from atst.models.audit_event import AuditEvent @@ -35,21 +34,11 @@ class AuditLog(object): return cls._log(resource=resource, action=action, portfolio=portfolio) @classmethod - def get_all_events(cls, user, pagination_opts=None): - # TODO: general audit log permissions - Authorization.check_atat_permission( - user, Permissions.VIEW_AUDIT_LOG, "view audit log" - ) + def get_all_events(cls, pagination_opts=None): return AuditEventQuery.get_all(pagination_opts) @classmethod - def get_portfolio_events(cls, user, portfolio, pagination_opts=None): - Authorization.check_portfolio_permission( - user, - portfolio, - Permissions.VIEW_PORTFOLIO_ACTIVITY_LOG, - "view portfolio audit log", - ) + def get_portfolio_events(cls, portfolio, pagination_opts=None): return AuditEventQuery.get_ws_events(portfolio.id, pagination_opts) @classmethod diff --git a/atst/domain/authz.py b/atst/domain/authz/__init__.py similarity index 71% rename from atst/domain/authz.py rename to atst/domain/authz/__init__.py index 6ee5fa9d..6e8cdfea 100644 --- a/atst/domain/authz.py +++ b/atst/domain/authz/__init__.py @@ -18,10 +18,6 @@ class Authorization(object): def has_atat_permission(cls, user, permission): return permission in user.permissions - @classmethod - def is_in_portfolio(cls, user, portfolio): - return user in portfolio.users - @classmethod def check_portfolio_permission(cls, user, portfolio, permission, message): if not ( @@ -30,14 +26,14 @@ class Authorization(object): ): raise UnauthorizedError(user, message) + return True + @classmethod def check_atat_permission(cls, user, permission, message): if not Authorization.has_atat_permission(user, permission): raise UnauthorizedError(user, message) - @classmethod - def can_view_audit_log(cls, user): - return Authorization.has_atat_permission(user, Permissions.VIEW_AUDIT_LOG) + return True @classmethod def is_ko(cls, user, task_order): @@ -72,23 +68,11 @@ class Authorization(object): message = "review task order {}".format(task_order.id) raise UnauthorizedError(user, message) - @classmethod - def check_task_order_permission(cls, user, task_order, permission, message): - if Authorization._check_is_task_order_officer(user, task_order): - return True - Authorization.check_portfolio_permission( - user, task_order.portfolio, permission, message - ) +def user_can_access(user, permission, portfolio=None, message=None): + if portfolio: + Authorization.check_portfolio_permission(user, portfolio, permission, message) + else: + Authorization.check_atat_permission(user, permission, message) - @classmethod - def _check_is_task_order_officer(cls, user, task_order): - for officer in [ - "contracting_officer", - "contracting_officer_representative", - "security_officer", - ]: - if getattr(task_order, officer, None) == user: - return True - - return False + return True diff --git a/atst/domain/authz/decorator.py b/atst/domain/authz/decorator.py new file mode 100644 index 00000000..17755749 --- /dev/null +++ b/atst/domain/authz/decorator.py @@ -0,0 +1,55 @@ +from functools import wraps + +from flask import g, current_app as app, request + +from . import user_can_access +from atst.domain.portfolios import Portfolios +from atst.domain.task_orders import TaskOrders +from atst.domain.exceptions import UnauthorizedError + + +def check_access(permission, message, exception, *args, **kwargs): + access_args = {"message": message} + + if "portfolio_id" in kwargs: + access_args["portfolio"] = Portfolios.get( + g.current_user, kwargs["portfolio_id"] + ) + + if "task_order_id" in kwargs: + task_order = TaskOrders.get(kwargs["task_order_id"]) + access_args["portfolio"] = task_order.portfolio + + if exception is not None and exception(g.current_user, **access_args, **kwargs): + return True + + user_can_access(g.current_user, permission, **access_args) + + return True + + +def user_can_access_decorator(permission, message=None, exception=None): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + try: + check_access(permission, message, exception, *args, **kwargs) + app.logger.info( + "[access] User {} accessed {} {}".format( + g.current_user.id, request.method, request.path + ) + ) + + return f(*args, **kwargs) + except UnauthorizedError as err: + app.logger.warning( + "[access] User {} denied access {} {}".format( + g.current_user.id, request.method, request.path + ) + ) + + raise (err) + + return decorated_function + + return decorator diff --git a/atst/domain/environments.py b/atst/domain/environments.py index d667ebb0..9b0fa9b5 100644 --- a/atst/domain/environments.py +++ b/atst/domain/environments.py @@ -5,8 +5,6 @@ from atst.database import db from atst.models.environment import Environment from atst.models.environment_role import EnvironmentRole from atst.models.application import Application -from atst.models.permissions import Permissions -from atst.domain.authz import Authorization from atst.domain.environment_roles import EnvironmentRoles from .exceptions import NotFoundError @@ -60,13 +58,7 @@ class Environments(object): return env @classmethod - def update_environment_roles(cls, user, portfolio, portfolio_role, ids_and_roles): - Authorization.check_portfolio_permission( - user, - portfolio, - Permissions.EDIT_APPLICATION_MEMBER, - "assign environment roles", - ) + def update_environment_roles(cls, portfolio_role, ids_and_roles): updated = False for id_and_role in ids_and_roles: @@ -100,11 +92,5 @@ class Environments(object): return updated @classmethod - def revoke_access(cls, user, environment, target_user): - Authorization.check_portfolio_permission( - user, - environment.portfolio, - Permissions.EDIT_APPLICATION_MEMBER, - "revoke environment access", - ) + def revoke_access(cls, environment, target_user): EnvironmentRoles.delete(environment.id, target_user.id) diff --git a/atst/domain/invitations.py b/atst/domain/invitations.py index bb4b409e..c4d51248 100644 --- a/atst/domain/invitations.py +++ b/atst/domain/invitations.py @@ -4,8 +4,6 @@ from sqlalchemy.orm.exc import NoResultFound from atst.database import db from atst.models.invitation import Invitation, Status as InvitationStatus from atst.domain.portfolio_roles import PortfolioRoles -from atst.domain.authz import Authorization, Permissions -from atst.domain.portfolios import Portfolios from .exceptions import NotFoundError @@ -118,15 +116,7 @@ class Invitations(object): return portfolio_role.latest_invitation @classmethod - def resend(cls, user, portfolio_id, token): - portfolio = Portfolios.get(user, portfolio_id) - Authorization.check_portfolio_permission( - user, - portfolio, - Permissions.CREATE_PORTFOLIO_USERS, - "resend a portfolio invitation", - ) - + def resend(cls, user, token): previous_invitation = Invitations._get(token) Invitations._update_status(previous_invitation, InvitationStatus.REVOKED) diff --git a/atst/domain/portfolios/portfolios.py b/atst/domain/portfolios/portfolios.py index 90797eeb..49e5b005 100644 --- a/atst/domain/portfolios/portfolios.py +++ b/atst/domain/portfolios/portfolios.py @@ -33,51 +33,11 @@ class Portfolios(object): @classmethod def get(cls, user, portfolio_id): portfolio = PortfoliosQuery.get(portfolio_id) - Authorization.check_portfolio_permission( - user, portfolio, Permissions.VIEW_PORTFOLIO, "get portfolio" - ) - return ScopedPortfolio(user, portfolio) @classmethod - def get_for_update_applications(cls, user, portfolio_id): + def get_for_update(cls, portfolio_id): portfolio = PortfoliosQuery.get(portfolio_id) - Authorization.check_portfolio_permission( - user, portfolio, Permissions.CREATE_APPLICATION, "add application" - ) - - return portfolio - - @classmethod - def get_for_update_information(cls, user, portfolio_id): - portfolio = PortfoliosQuery.get(portfolio_id) - Authorization.check_portfolio_permission( - user, - portfolio, - Permissions.EDIT_PORTFOLIO_NAME, - "update portfolio information", - ) - - return portfolio - - @classmethod - def get_for_update_member(cls, user, portfolio_id): - portfolio = PortfoliosQuery.get(portfolio_id) - Authorization.check_portfolio_permission( - user, - portfolio, - Permissions.EDIT_PORTFOLIO_USERS, - "update a portfolio member", - ) - - return portfolio - - @classmethod - def get_with_members(cls, user, portfolio_id): - portfolio = PortfoliosQuery.get(portfolio_id) - Authorization.check_portfolio_permission( - user, portfolio, Permissions.VIEW_PORTFOLIO_USERS, "view portfolio members" - ) return portfolio @@ -90,11 +50,7 @@ class Portfolios(object): return portfolios @classmethod - def create_member(cls, user, portfolio, data): - Authorization.check_portfolio_permission( - user, portfolio, Permissions.EDIT_PORTFOLIO_USERS, "create portfolio member" - ) - + def create_member(cls, portfolio, data): new_user = Users.get_or_create_by_dod_id( data["dod_id"], first_name=data["first_name"], @@ -113,12 +69,7 @@ class Portfolios(object): return portfolio_role @classmethod - def update_member(cls, user, portfolio, member, permission_sets): - Authorization.check_portfolio_permission( - user, portfolio, Permissions.EDIT_PORTFOLIO_USERS, "edit portfolio member" - ) - - # need to update perms sets here + def update_member(cls, member, permission_sets): return PortfolioRoles.update(member, permission_sets) @classmethod @@ -149,11 +100,8 @@ class Portfolios(object): ) @classmethod - def revoke_access(cls, user, portfolio_id, portfolio_role_id): + def revoke_access(cls, portfolio_id, portfolio_role_id): portfolio = PortfoliosQuery.get(portfolio_id) - Authorization.check_portfolio_permission( - user, portfolio, Permissions.EDIT_PORTFOLIO_USERS, "revoke portfolio access" - ) portfolio_role = PortfolioRoles.get_by_id(portfolio_role_id) if not Portfolios.can_revoke_access_for(portfolio, portfolio_role): @@ -161,7 +109,7 @@ class Portfolios(object): portfolio_role.status = PortfolioRoleStatus.DISABLED for environment in portfolio.all_environments: - Environments.revoke_access(user, environment, portfolio_role.user) + Environments.revoke_access(environment, portfolio_role.user) PortfoliosQuery.add_and_commit(portfolio_role) return portfolio_role diff --git a/atst/domain/task_orders.py b/atst/domain/task_orders.py index 3055f31d..98b86a77 100644 --- a/atst/domain/task_orders.py +++ b/atst/domain/task_orders.py @@ -3,10 +3,8 @@ from flask import current_app as app from atst.database import db from atst.models.task_order import TaskOrder -from atst.models.permissions import Permissions from atst.models.dd_254 import DD254 from atst.domain.portfolios import Portfolios -from atst.domain.authz import Authorization from atst.domain.permission_sets import PermissionSets from .exceptions import NotFoundError @@ -54,12 +52,9 @@ class TaskOrders(object): UNCLASSIFIED_FUNDING = ["performance_length", "csp_estimate", "clin_01", "clin_03"] @classmethod - def get(cls, user, task_order_id): + def get(cls, task_order_id): try: task_order = db.session.query(TaskOrder).filter_by(id=task_order_id).one() - Authorization.check_task_order_permission( - user, task_order, Permissions.VIEW_TASK_ORDER_DETAILS, "view task order" - ) return task_order except NoResultFound: @@ -67,9 +62,6 @@ class TaskOrders(object): @classmethod def create(cls, creator, portfolio): - Authorization.check_portfolio_permission( - creator, portfolio, Permissions.CREATE_TASK_ORDER, "add task order" - ) task_order = TaskOrder(portfolio=portfolio, creator=creator) db.session.add(task_order) @@ -78,11 +70,7 @@ class TaskOrders(object): return task_order @classmethod - def update(cls, user, task_order, **kwargs): - Authorization.check_task_order_permission( - user, task_order, Permissions.EDIT_TASK_ORDER_DETAILS, "update task order" - ) - + def update(cls, task_order, **kwargs): for key, value in kwargs.items(): setattr(task_order, key, value) @@ -147,14 +135,7 @@ class TaskOrders(object): ] @classmethod - def add_officer(cls, user, task_order, officer_type, officer_data): - Authorization.check_portfolio_permission( - user, - task_order.portfolio, - Permissions.EDIT_TASK_ORDER_DETAILS, - "add task order officer", - ) - + def add_officer(cls, task_order, officer_type, officer_data): if officer_type in TaskOrders.OFFICERS: portfolio = task_order.portfolio @@ -171,7 +152,6 @@ class TaskOrders(object): portfolio_user = existing_member.user else: member = Portfolios.create_member( - user, portfolio, { **officer_data, diff --git a/atst/routes/__init__.py b/atst/routes/__init__.py index f7b64c79..2f1ad3c2 100644 --- a/atst/routes/__init__.py +++ b/atst/routes/__init__.py @@ -22,6 +22,8 @@ from atst.domain.audit_log import AuditLog from atst.domain.auth import logout as _logout from atst.domain.common import Paginator from atst.domain.portfolios import Portfolios +from atst.domain.authz.decorator import user_can_access_decorator as user_can +from atst.models.permissions import Permissions from atst.utils.flash import formatted_flash as flash @@ -143,9 +145,10 @@ def logout(): @bp.route("/activity-history") +@user_can(Permissions.VIEW_AUDIT_LOG, message="view activity log") def activity_history(): pagination_opts = Paginator.get_pagination_opts(request) - audit_events = AuditLog.get_all_events(g.current_user, pagination_opts) + audit_events = AuditLog.get_all_events(pagination_opts) return render_template("audit_log/audit_log.html", audit_events=audit_events) diff --git a/atst/routes/portfolios/__init__.py b/atst/routes/portfolios/__init__.py index e735064f..4bac8b58 100644 --- a/atst/routes/portfolios/__init__.py +++ b/atst/routes/portfolios/__init__.py @@ -18,12 +18,9 @@ from atst.models.permissions import Permissions def portfolio(): portfolio = None if "portfolio_id" in http_request.view_args: - try: - portfolio = Portfolios.get( - g.current_user, http_request.view_args["portfolio_id"] - ) - except UnauthorizedError: - pass + portfolio = Portfolios.get( + g.current_user, http_request.view_args["portfolio_id"] + ) def user_can(permission): if portfolio: diff --git a/atst/routes/portfolios/applications.py b/atst/routes/portfolios/applications.py index f9cad70d..b3825944 100644 --- a/atst/routes/portfolios/applications.py +++ b/atst/routes/portfolios/applications.py @@ -13,17 +13,21 @@ from atst.domain.exceptions import UnauthorizedError from atst.domain.applications import Applications from atst.domain.portfolios import Portfolios from atst.forms.application import NewApplicationForm, ApplicationForm +from atst.domain.authz.decorator import user_can_access_decorator as user_can +from atst.models.permissions import Permissions @portfolios_bp.route("/portfolios//applications") +@user_can(Permissions.VIEW_APPLICATION, message="view portfolio applications") def portfolio_applications(portfolio_id): portfolio = Portfolios.get(g.current_user, portfolio_id) return render_template("portfolios/applications/index.html", portfolio=portfolio) @portfolios_bp.route("/portfolios//applications/new") +@user_can(Permissions.CREATE_APPLICATION, message="view create new application form") def new_application(portfolio_id): - portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id) + portfolio = Portfolios.get_for_update(portfolio_id) form = NewApplicationForm() return render_template( "portfolios/applications/new.html", portfolio=portfolio, form=form @@ -31,14 +35,14 @@ def new_application(portfolio_id): @portfolios_bp.route("/portfolios//applications/new", methods=["POST"]) +@user_can(Permissions.CREATE_APPLICATION, message="create new application") def create_application(portfolio_id): - portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id) + portfolio = Portfolios.get_for_update(portfolio_id) form = NewApplicationForm(http_request.form) if form.validate(): application_data = form.data Applications.create( - g.current_user, portfolio, application_data["name"], application_data["description"], @@ -54,9 +58,10 @@ def create_application(portfolio_id): @portfolios_bp.route("/portfolios//applications//edit") +@user_can(Permissions.EDIT_APPLICATION, message="view application edit form") def edit_application(portfolio_id, application_id): - portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id) - application = Applications.get(g.current_user, portfolio, application_id) + portfolio = Portfolios.get_for_update(portfolio_id) + application = Applications.get(application_id) form = ApplicationForm(name=application.name, description=application.description) return render_template( @@ -70,13 +75,14 @@ def edit_application(portfolio_id, application_id): @portfolios_bp.route( "/portfolios//applications//edit", methods=["POST"] ) +@user_can(Permissions.EDIT_APPLICATION, message="update application") def update_application(portfolio_id, application_id): - portfolio = Portfolios.get_for_update_applications(g.current_user, portfolio_id) - application = Applications.get(g.current_user, portfolio, application_id) + portfolio = Portfolios.get_for_update(portfolio_id) + application = Applications.get(application_id) form = ApplicationForm(http_request.form) if form.validate(): application_data = form.data - Applications.update(g.current_user, portfolio, application, application_data) + Applications.update(application, application_data) return redirect( url_for("portfolios.portfolio_applications", portfolio_id=portfolio.id) @@ -90,13 +96,20 @@ def update_application(portfolio_id, application_id): ) +def wrap_environment_role_lookup( + user, portfolio_id=None, environment_id=None, **kwargs +): + env_role = EnvironmentRoles.get(user.id, environment_id) + if not env_role: + raise UnauthorizedError(user, "access environment {}".format(environment_id)) + + return True + + @portfolios_bp.route("/portfolios//environments//access") +@user_can(None, exception=wrap_environment_role_lookup, message="access environment") def access_environment(portfolio_id, environment_id): env_role = EnvironmentRoles.get(g.current_user.id, environment_id) - if not env_role: - raise UnauthorizedError( - g.current_user, "access environment {}".format(environment_id) - ) - else: - token = app.csp.cloud.get_access_token(env_role) - return redirect(url_for("atst.csp_environment_access", token=token)) + token = app.csp.cloud.get_access_token(env_role) + + return redirect(url_for("atst.csp_environment_access", token=token)) diff --git a/atst/routes/portfolios/index.py b/atst/routes/portfolios/index.py index fbbffd27..6aadb012 100644 --- a/atst/routes/portfolios/index.py +++ b/atst/routes/portfolios/index.py @@ -6,11 +6,11 @@ from . import portfolios_bp from atst.domain.reports import Reports from atst.domain.portfolios import Portfolios from atst.domain.audit_log import AuditLog -from atst.domain.authz import Authorization from atst.domain.common import Paginator from atst.forms.portfolio import PortfolioForm -from atst.models.permissions import Permissions from atst.domain.permission_sets import PermissionSets +from atst.domain.authz.decorator import user_can_access_decorator as user_can +from atst.models.permissions import Permissions @portfolios_bp.route("/portfolios") @@ -37,14 +37,9 @@ def serialize_member(member): } -@portfolios_bp.route("/portfolios//admin") -def portfolio_admin(portfolio_id): - portfolio = Portfolios.get_for_update_information(g.current_user, portfolio_id) - form = PortfolioForm(data={"name": portfolio.name}) +def render_admin_page(portfolio, form): pagination_opts = Paginator.get_pagination_opts(http_request) - audit_events = AuditLog.get_portfolio_events( - g.current_user, portfolio, pagination_opts - ) + audit_events = AuditLog.get_portfolio_events(portfolio, pagination_opts) members_data = [serialize_member(member) for member in portfolio.members] return render_template( "portfolios/admin.html", @@ -56,9 +51,18 @@ def portfolio_admin(portfolio_id): ) +@portfolios_bp.route("/portfolios//admin") +@user_can(Permissions.VIEW_PORTFOLIO_ADMIN, message="view portfolio admin page") +def portfolio_admin(portfolio_id): + portfolio = Portfolios.get_for_update(portfolio_id) + form = PortfolioForm(data={"name": portfolio.name}) + return render_admin_page(portfolio, form) + + @portfolios_bp.route("/portfolios//edit", methods=["POST"]) +@user_can(Permissions.EDIT_PORTFOLIO_NAME, message="edit portfolio") def edit_portfolio(portfolio_id): - portfolio = Portfolios.get_for_update_information(g.current_user, portfolio_id) + portfolio = Portfolios.get_for_update(portfolio_id) form = PortfolioForm(http_request.form) if form.validate(): Portfolios.update(portfolio, form.data) @@ -66,10 +70,12 @@ def edit_portfolio(portfolio_id): url_for("portfolios.portfolio_applications", portfolio_id=portfolio.id) ) else: - return render_template("portfolios/edit.html", form=form, portfolio=portfolio) + # rerender portfolio admin page + return render_admin_page(portfolio, form) @portfolios_bp.route("/portfolios/") +@user_can(Permissions.VIEW_PORTFOLIO, message="view portfolio") def show_portfolio(portfolio_id): return redirect( url_for("portfolios.portfolio_applications", portfolio_id=portfolio_id) @@ -77,15 +83,9 @@ def show_portfolio(portfolio_id): @portfolios_bp.route("/portfolios//reports") +@user_can(Permissions.VIEW_PORTFOLIO_REPORTS, message="view portfolio reports") def portfolio_reports(portfolio_id): portfolio = Portfolios.get(g.current_user, portfolio_id) - Authorization.check_portfolio_permission( - g.current_user, - portfolio, - Permissions.VIEW_PORTFOLIO_REPORTS, - "view portfolio reports", - ) - today = date.today() month = http_request.args.get("month", today.month) year = http_request.args.get("year", today.year) diff --git a/atst/routes/portfolios/invitations.py b/atst/routes/portfolios/invitations.py index 269ca21d..4774bd24 100644 --- a/atst/routes/portfolios/invitations.py +++ b/atst/routes/portfolios/invitations.py @@ -5,6 +5,8 @@ from atst.domain.portfolios import Portfolios from atst.domain.invitations import Invitations from atst.queue import queue from atst.utils.flash import formatted_flash as flash +from atst.domain.authz.decorator import user_can_access_decorator as user_can +from atst.models.permissions import Permissions def send_invite_email(owner_name, token, new_member_email): @@ -43,8 +45,9 @@ def accept_invitation(token): @portfolios_bp.route( "/portfolios//invitations//revoke", methods=["POST"] ) +@user_can(Permissions.EDIT_PORTFOLIO_USERS, message="revoke invitation") def revoke_invitation(portfolio_id, token): - portfolio = Portfolios.get_for_update_member(g.current_user, portfolio_id) + portfolio = Portfolios.get_for_update(portfolio_id) Invitations.revoke(token) return redirect(url_for("portfolios.portfolio_members", portfolio_id=portfolio.id)) @@ -53,8 +56,9 @@ def revoke_invitation(portfolio_id, token): @portfolios_bp.route( "/portfolios//invitations//resend", methods=["POST"] ) +@user_can(Permissions.EDIT_PORTFOLIO_USERS, message="resend invitation") def resend_invitation(portfolio_id, token): - invite = Invitations.resend(g.current_user, portfolio_id, token) + invite = Invitations.resend(g.current_user, token) send_invite_email(g.current_user.full_name, invite.token, invite.email) flash("resend_portfolio_invitation", user_name=invite.user_name) return redirect(url_for("portfolios.portfolio_members", portfolio_id=portfolio_id)) diff --git a/atst/routes/portfolios/members.py b/atst/routes/portfolios/members.py index 55383529..46aaf8fb 100644 --- a/atst/routes/portfolios/members.py +++ b/atst/routes/portfolios/members.py @@ -12,7 +12,7 @@ from atst.domain.environment_roles import EnvironmentRoles from atst.services.invitation import Invitation as InvitationService import atst.forms.portfolio_member as member_forms from atst.forms.data import ENVIRONMENT_ROLES, ENV_ROLE_MODAL_DESCRIPTION -from atst.domain.authz import Authorization +from atst.domain.authz.decorator import user_can_access_decorator as user_can from atst.models.permissions import Permissions from atst.utils.flash import formatted_flash as flash @@ -34,8 +34,9 @@ def serialize_portfolio_role(portfolio_role): @portfolios_bp.route("/portfolios//members") +@user_can(Permissions.VIEW_PORTFOLIO_USERS, message="view portfolio members") def portfolio_members(portfolio_id): - portfolio = Portfolios.get_with_members(g.current_user, portfolio_id) + portfolio = Portfolios.get_for_update(portfolio_id) members_list = [serialize_portfolio_role(k) for k in portfolio.members] return render_template( @@ -47,9 +48,10 @@ def portfolio_members(portfolio_id): @portfolios_bp.route("/portfolios//applications//members") +@user_can(Permissions.VIEW_APPLICATION_MEMBER, message="view application members") def application_members(portfolio_id, application_id): - portfolio = Portfolios.get_with_members(g.current_user, portfolio_id) - application = Applications.get(g.current_user, portfolio, application_id) + portfolio = Portfolios.get_for_update(portfolio_id) + application = Applications.get(application_id) # TODO: this should show only members that have env roles in this application members_list = [serialize_portfolio_role(k) for k in portfolio.members] @@ -62,6 +64,9 @@ def application_members(portfolio_id, application_id): @portfolios_bp.route("/portfolios//members/new") +@user_can( + Permissions.CREATE_PORTFOLIO_USERS, message="view create new portfolio member form" +) def new_member(portfolio_id): portfolio = Portfolios.get(g.current_user, portfolio_id) form = member_forms.NewForm() @@ -71,13 +76,14 @@ def new_member(portfolio_id): @portfolios_bp.route("/portfolios//members/new", methods=["POST"]) +@user_can(Permissions.CREATE_PORTFOLIO_USERS, message="create new portfolio member") def create_member(portfolio_id): portfolio = Portfolios.get(g.current_user, portfolio_id) form = member_forms.NewForm(http_request.form) if form.validate(): try: - member = Portfolios.create_member(g.current_user, portfolio, form.data) + member = Portfolios.create_member(portfolio, form.data) invite_service = InvitationService( g.current_user, member, form.data.get("email") ) @@ -99,16 +105,11 @@ def create_member(portfolio_id): @portfolios_bp.route("/portfolios//members//member_edit") +@user_can(Permissions.VIEW_PORTFOLIO_USERS, message="view portfolio member") def view_member(portfolio_id, member_id): portfolio = Portfolios.get(g.current_user, portfolio_id) - Authorization.check_portfolio_permission( - g.current_user, - portfolio, - Permissions.EDIT_PORTFOLIO_USERS, - "edit this portfolio user", - ) member = PortfolioRoles.get(portfolio_id, member_id) - applications = Applications.get_all(g.current_user, member, portfolio) + applications = Applications.get_all(portfolio) form = member_forms.EditForm(portfolio_role="admin") editable = g.current_user == member.user can_revoke_access = Portfolios.can_revoke_access_for(portfolio, member) @@ -130,17 +131,14 @@ def view_member(portfolio_id, member_id): ) +# TODO: check if member_id is consistent with other routes here; +# user ID vs portfolio role ID @portfolios_bp.route( "/portfolios//members//member_edit", methods=["POST"] ) +@user_can(Permissions.EDIT_PORTFOLIO_USERS, message="update portfolio member") def update_member(portfolio_id, member_id): portfolio = Portfolios.get(g.current_user, portfolio_id) - Authorization.check_portfolio_permission( - g.current_user, - portfolio, - Permissions.EDIT_PORTFOLIO_USERS, - "edit this portfolio user", - ) member = PortfolioRoles.get(portfolio_id, member_id) ids_and_roles = [] @@ -153,12 +151,8 @@ def update_member(portfolio_id, member_id): form = member_forms.EditForm(http_request.form) if form.validate(): - member = Portfolios.update_member( - g.current_user, portfolio, member, form.data["permission_sets"] - ) - updated_roles = Environments.update_environment_roles( - g.current_user, portfolio, member, ids_and_roles - ) + member = Portfolios.update_member(member, form.data["permission_sets"]) + updated_roles = Environments.update_environment_roles(member, ids_and_roles) if updated_roles: flash("environment_access_changed") @@ -177,7 +171,8 @@ def update_member(portfolio_id, member_id): @portfolios_bp.route( "/portfolios//members//revoke_access", methods=["POST"] ) +@user_can(Permissions.EDIT_PORTFOLIO_USERS, message="revoke portfolio access") def revoke_access(portfolio_id, member_id): - revoked_role = Portfolios.revoke_access(g.current_user, portfolio_id, member_id) + revoked_role = Portfolios.revoke_access(portfolio_id, member_id) flash("revoked_portfolio_access", member_name=revoked_role.user.full_name) return redirect(url_for("portfolios.portfolio_members", portfolio_id=portfolio_id)) diff --git a/atst/routes/portfolios/task_orders.py b/atst/routes/portfolios/task_orders.py index d8f4fba8..95db9f09 100644 --- a/atst/routes/portfolios/task_orders.py +++ b/atst/routes/portfolios/task_orders.py @@ -20,9 +20,12 @@ from atst.services.invitation import ( OFFICER_INVITATIONS, Invitation as InvitationService, ) +from atst.domain.authz.decorator import user_can_access_decorator as user_can +from atst.models.permissions import Permissions @portfolios_bp.route("/portfolios//task_orders") +@user_can(Permissions.VIEW_PORTFOLIO_FUNDING, message="view portfolio funding") def portfolio_funding(portfolio_id): portfolio = Portfolios.get(g.current_user, portfolio_id) task_orders_by_status = defaultdict(list) @@ -66,9 +69,10 @@ def portfolio_funding(portfolio_id): @portfolios_bp.route("/portfolios//task_order/") +@user_can(Permissions.VIEW_TASK_ORDER_DETAILS, message="view task order details") def view_task_order(portfolio_id, task_order_id): portfolio = Portfolios.get(g.current_user, portfolio_id) - task_order = TaskOrders.get(g.current_user, task_order_id) + task_order = TaskOrders.get(task_order_id) to_form_complete = TaskOrders.all_sections_complete(task_order) dd_254_complete = DD254s.is_complete(task_order.dd_254) return render_template( @@ -85,12 +89,22 @@ def view_task_order(portfolio_id, task_order_id): ) -@portfolios_bp.route("/portfolios//task_order//review") -def ko_review(portfolio_id, task_order_id): - task_order = TaskOrders.get(g.current_user, task_order_id) - portfolio = Portfolios.get(g.current_user, portfolio_id) +def wrap_check_is_ko_or_cor(user, task_order_id=None, **_kwargs): + task_order = TaskOrders.get(task_order_id) + Authorization.check_is_ko_or_cor(user, task_order) - Authorization.check_is_ko_or_cor(g.current_user, task_order) + return True + + +@portfolios_bp.route("/portfolios//task_order//review") +@user_can( + None, + exception=wrap_check_is_ko_or_cor, + message="view contracting officer review form", +) +def ko_review(portfolio_id, task_order_id): + task_order = TaskOrders.get(task_order_id) + portfolio = Portfolios.get(g.current_user, portfolio_id) if TaskOrders.all_sections_complete(task_order): return render_template( @@ -107,7 +121,10 @@ def ko_review(portfolio_id, task_order_id): "/portfolios//task_order//resend_invite", methods=["POST"], ) -def resend_invite(portfolio_id, task_order_id, form=None): +@user_can( + Permissions.EDIT_TASK_ORDER_DETAILS, message="resend task order officer invites" +) +def resend_invite(portfolio_id, task_order_id): invite_type = http_request.args.get("invite_type") if invite_type not in OFFICER_INVITATIONS: @@ -115,7 +132,7 @@ def resend_invite(portfolio_id, task_order_id, form=None): invite_type_info = OFFICER_INVITATIONS[invite_type] - task_order = TaskOrders.get(g.current_user, task_order_id) + task_order = TaskOrders.get(task_order_id) portfolio = Portfolios.get(g.current_user, portfolio_id) officer = getattr(task_order, invite_type_info["role"]) @@ -164,15 +181,16 @@ def resend_invite(portfolio_id, task_order_id, form=None): @portfolios_bp.route( "/portfolios//task_order//review", methods=["POST"] ) +@user_can( + None, exception=wrap_check_is_ko_or_cor, message="submit contracting officer review" +) def submit_ko_review(portfolio_id, task_order_id, form=None): - task_order = TaskOrders.get(g.current_user, task_order_id) + task_order = TaskOrders.get(task_order_id) form_data = {**http_request.form, **http_request.files} form = KOReviewForm(form_data) - Authorization.check_is_ko_or_cor(g.current_user, task_order) - if form.validate(): - TaskOrders.update(user=g.current_user, task_order=task_order, **form.data) + TaskOrders.update(task_order=task_order, **form.data) if Authorization.is_ko(g.current_user, task_order) and TaskOrders.can_ko_sign( task_order ): @@ -199,9 +217,12 @@ def submit_ko_review(portfolio_id, task_order_id, form=None): @portfolios_bp.route( "/portfolios//task_order//invitations" ) +@user_can( + Permissions.EDIT_TASK_ORDER_DETAILS, message="view task order invitations page" +) def task_order_invitations(portfolio_id, task_order_id): portfolio = Portfolios.get(g.current_user, portfolio_id) - task_order = TaskOrders.get(g.current_user, task_order_id) + task_order = TaskOrders.get(task_order_id) form = EditTaskOrderOfficersForm(obj=task_order) if TaskOrders.all_sections_complete(task_order): @@ -219,9 +240,10 @@ def task_order_invitations(portfolio_id, task_order_id): "/portfolios//task_order//invitations", methods=["POST"], ) +@user_can(Permissions.EDIT_TASK_ORDER_DETAILS, message="edit task order invitations") def edit_task_order_invitations(portfolio_id, task_order_id): portfolio = Portfolios.get(g.current_user, portfolio_id) - task_order = TaskOrders.get(g.current_user, task_order_id) + task_order = TaskOrders.get(task_order_id) form = EditTaskOrderOfficersForm(formdata=http_request.form, obj=task_order) if form.validate(): @@ -266,11 +288,17 @@ def so_review_form(task_order): return DD254Form(data=form_data) -@portfolios_bp.route("/portfolios//task_order//dd254") -def so_review(portfolio_id, task_order_id): - task_order = TaskOrders.get(g.current_user, task_order_id) - Authorization.check_is_so(g.current_user, task_order) +def wrap_check_is_so(user, task_order_id=None, **_kwargs): + task_order = TaskOrders.get(task_order_id) + Authorization.check_is_so(user, task_order) + return True + + +@portfolios_bp.route("/portfolios//task_order//dd254") +@user_can(None, exception=wrap_check_is_so, message="view security officer review form") +def so_review(portfolio_id, task_order_id): + task_order = TaskOrders.get(task_order_id) form = so_review_form(task_order) return render_template( @@ -284,10 +312,11 @@ def so_review(portfolio_id, task_order_id): @portfolios_bp.route( "/portfolios//task_order//dd254", methods=["POST"] ) +@user_can( + None, exception=wrap_check_is_so, message="submit security officer review form" +) def submit_so_review(portfolio_id, task_order_id): - task_order = TaskOrders.get(g.current_user, task_order_id) - Authorization.check_is_so(g.current_user, task_order) - + task_order = TaskOrders.get(task_order_id) form = DD254Form(http_request.form) if form.validate(): diff --git a/atst/routes/task_orders/index.py b/atst/routes/task_orders/index.py index 86f25f91..1f775cc8 100644 --- a/atst/routes/task_orders/index.py +++ b/atst/routes/task_orders/index.py @@ -1,15 +1,18 @@ from io import BytesIO -from flask import g, Response, current_app as app +from flask import Response, current_app as app from . import task_orders_bp from atst.domain.task_orders import TaskOrders from atst.domain.exceptions import NotFoundError from atst.utils.docx import Docx +from atst.domain.authz.decorator import user_can_access_decorator as user_can +from atst.models.permissions import Permissions @task_orders_bp.route("/task_orders/download_summary/") +@user_can(Permissions.VIEW_TASK_ORDER_DETAILS, message="download task order summary") def download_summary(task_order_id): - task_order = TaskOrders.get(g.current_user, task_order_id) + task_order = TaskOrders.get(task_order_id) byte_str = BytesIO() Docx.render(byte_str, data=task_order.to_dictionary()) filename = "{}.docx".format(task_order.portfolio_name) @@ -31,8 +34,12 @@ def send_file(attachment): @task_orders_bp.route("/task_orders/csp_estimate/") +@user_can( + Permissions.VIEW_TASK_ORDER_DETAILS, + message="download task order cloud service provider estimate", +) def download_csp_estimate(task_order_id): - task_order = TaskOrders.get(g.current_user, task_order_id) + task_order = TaskOrders.get(task_order_id) if task_order.csp_estimate: return send_file(task_order.csp_estimate) else: @@ -40,8 +47,9 @@ def download_csp_estimate(task_order_id): @task_orders_bp.route("/task_orders/pdf/") +@user_can(Permissions.VIEW_TASK_ORDER_DETAILS, message="download task order PDF") def download_task_order_pdf(task_order_id): - task_order = TaskOrders.get(g.current_user, task_order_id) + task_order = TaskOrders.get(task_order_id) if task_order.pdf: return send_file(task_order.pdf) else: diff --git a/atst/routes/task_orders/invite.py b/atst/routes/task_orders/invite.py index f3a29d01..4021adda 100644 --- a/atst/routes/task_orders/invite.py +++ b/atst/routes/task_orders/invite.py @@ -4,11 +4,14 @@ from . import task_orders_bp from atst.domain.task_orders import TaskOrders from atst.utils.flash import formatted_flash as flash from atst.services.invitation import update_officer_invitations +from atst.domain.authz.decorator import user_can_access_decorator as user_can +from atst.models.permissions import Permissions @task_orders_bp.route("/task_orders/invite/", methods=["POST"]) +@user_can(Permissions.EDIT_TASK_ORDER_DETAILS, message="invite task order officers") def invite(task_order_id): - task_order = TaskOrders.get(g.current_user, task_order_id) + task_order = TaskOrders.get(task_order_id) if TaskOrders.all_sections_complete(task_order): update_officer_invitations(g.current_user, task_order) diff --git a/atst/routes/task_orders/new.py b/atst/routes/task_orders/new.py index 913f7545..91290ca3 100644 --- a/atst/routes/task_orders/new.py +++ b/atst/routes/task_orders/new.py @@ -14,6 +14,8 @@ from atst.domain.task_orders import TaskOrders from atst.domain.portfolios import Portfolios from atst.utils.flash import formatted_flash as flash import atst.forms.task_order as task_order_form +from atst.domain.authz.decorator import user_can_access_decorator as user_can +from atst.models.permissions import Permissions TASK_ORDER_SECTIONS = [ @@ -59,7 +61,7 @@ class ShowTaskOrderWorkflow: @property def task_order(self): if not self._task_order and self.task_order_id: - self._task_order = TaskOrders.get(self.user, self.task_order_id) + self._task_order = TaskOrders.get(self.task_order_id) return self._task_order @@ -228,7 +230,7 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow): old_name = self.task_order.portfolio_name if not new_name == old_name: Portfolios.update(self.task_order.portfolio, {"name": new_name}) - TaskOrders.update(self.user, self.task_order, **self.task_order_form_data) + TaskOrders.update(self.task_order, **self.task_order_form_data) else: if self.portfolio_id: pf = Portfolios.get(self.user, self.portfolio_id) @@ -239,7 +241,7 @@ class UpdateTaskOrderWorkflow(ShowTaskOrderWorkflow): self.form.defense_component.data, ) self._task_order = TaskOrders.create(portfolio=pf, creator=self.user) - TaskOrders.update(self.user, self.task_order, **self.task_order_form_data) + TaskOrders.update(self.task_order, **self.task_order_form_data) return self.task_order @@ -249,9 +251,23 @@ def get_started(): return render_template("task_orders/new/get_started.html") # pragma: no cover +def is_new_task_order(*_args, **kwargs): + return ( + "screen" in kwargs + and kwargs["screen"] == 1 + and "task_order_id" not in kwargs + and "portfolio_id" not in kwargs + ) + + @task_orders_bp.route("/task_orders/new/") @task_orders_bp.route("/task_orders/new//") @task_orders_bp.route("/portfolios//task_orders/new/") +@user_can( + Permissions.CREATE_TASK_ORDER, + exception=is_new_task_order, + message="view new task order form", +) def new(screen, task_order_id=None, portfolio_id=None): workflow = ShowTaskOrderWorkflow( g.current_user, screen, task_order_id, portfolio_id @@ -298,6 +314,11 @@ def new(screen, task_order_id=None, portfolio_id=None): @task_orders_bp.route( "/portfolios//task_orders/new/", methods=["POST"] ) +@user_can( + Permissions.CREATE_TASK_ORDER, + exception=is_new_task_order, + message="update task order", +) def update(screen, task_order_id=None, portfolio_id=None): form_data = {**http_request.form, **http_request.files} workflow = UpdateTaskOrderWorkflow( diff --git a/atst/routes/task_orders/signing.py b/atst/routes/task_orders/signing.py index d3cf0c55..4f4132dd 100644 --- a/atst/routes/task_orders/signing.py +++ b/atst/routes/task_orders/signing.py @@ -8,11 +8,11 @@ from atst.domain.exceptions import NoAccessError from atst.domain.task_orders import TaskOrders from atst.forms.task_order import SignatureForm from atst.utils.flash import formatted_flash as flash +from atst.domain.authz.decorator import user_can_access_decorator as user_can def find_unsigned_ko_to(task_order_id): - task_order = TaskOrders.get(g.current_user, task_order_id) - Authorization.check_is_ko(g.current_user, task_order) + task_order = TaskOrders.get(task_order_id) if not TaskOrders.can_ko_sign(task_order): raise NoAccessError("task_order") @@ -20,7 +20,17 @@ def find_unsigned_ko_to(task_order_id): return task_order +def wrap_check_is_ko(user, task_order_id=None, **_kwargs): + task_order = TaskOrders.get(task_order_id) + Authorization.check_is_ko(user, task_order) + + return True + + @task_orders_bp.route("/task_orders//digital_signature", methods=["GET"]) +@user_can( + None, exception=wrap_check_is_ko, message="view contracting officer signature page" +) def signature_requested(task_order_id): task_order = find_unsigned_ko_to(task_order_id) @@ -35,6 +45,9 @@ def signature_requested(task_order_id): @task_orders_bp.route( "/task_orders//digital_signature", methods=["POST"] ) +@user_can( + None, exception=wrap_check_is_ko, message="submit contracting officer signature" +) def record_signature(task_order_id): task_order = find_unsigned_ko_to(task_order_id) @@ -49,7 +62,6 @@ def record_signature(task_order_id): if form.validate(): TaskOrders.update( - user=g.current_user, task_order=task_order, signer_dod_id=g.current_user.dod_id, signed_at=datetime.datetime.now(), diff --git a/atst/services/invitation.py b/atst/services/invitation.py index b3bd94fb..0953d42a 100644 --- a/atst/services/invitation.py +++ b/atst/services/invitation.py @@ -33,7 +33,7 @@ def update_officer_invitations(user, task_order): ): officer_data = task_order.officer_dictionary(invite_opts["role"]) officer = TaskOrders.add_officer( - user, task_order, invite_opts["role"], officer_data + task_order, invite_opts["role"], officer_data ) pf_officer_member = PortfolioRoles.get(task_order.portfolio.id, officer.id) invite_service = Invitation( diff --git a/script/seed_sample.py b/script/seed_sample.py index f29fb5f6..f9895e59 100644 --- a/script/seed_sample.py +++ b/script/seed_sample.py @@ -68,7 +68,7 @@ def get_users(): def add_members_to_portfolio(portfolio): for portfolio_role in PORTFOLIO_USERS: - ws_role = Portfolios.create_member(portfolio.owner, portfolio, portfolio_role) + ws_role = Portfolios.create_member(portfolio, portfolio_role) db.session.refresh(ws_role) PortfolioRoles.enable(ws_role) @@ -114,7 +114,6 @@ def create_task_order(portfolio, start, end, clin_01=None, clin_03=None): def add_applications_to_portfolio(portfolio, applications): for application in applications: Applications.create( - portfolio.owner, portfolio=portfolio, name=application["name"], description=application["description"], diff --git a/templates/portfolios/admin.html b/templates/portfolios/admin.html index 3408c3a2..25c3f445 100644 --- a/templates/portfolios/admin.html +++ b/templates/portfolios/admin.html @@ -14,36 +14,44 @@
-
- {{ form.csrf_token }} -
-
- {{ TextInput(form.name, validation="portfolioName") }} -
+ {% if user_can(permissions.VIEW_PORTFOLIO_NAME) %} + + {{ form.csrf_token }} +
+
+ {{ TextInput(form.name, validation="portfolioName") }} +
-
- -
-
-
-
-
{{ "forms.task_order.defense_component_label" | translate }}
- {% if portfolio.defense_component %} -
{{ portfolio.defense_component }}
- {% else %} -
{{ "fragments.portfolio_admin.none" | translate }}
- {% endif %} +
+
- +
+
+
{{ "forms.task_order.defense_component_label" | translate }}
+ {% if portfolio.defense_component %} +
{{ portfolio.defense_component }}
+ {% else %} +
{{ "fragments.portfolio_admin.none" | translate }}
+ {% endif %} +
+
+ + {% endif %}
- {% include "fragments/primary_point_of_contact.html" %} + {% if user_can(permissions.VIEW_PORTFOLIO_POC) %} + {% include "fragments/primary_point_of_contact.html" %} + {% endif %} - {% include "fragments/admin/portfolio_members.html" %} - {% include "fragments/audit_events_log.html" %} + {% if user_can(permissions.VIEW_PORTFOLIO_USERS) %} + {% include "fragments/admin/portfolio_members.html" %} + {% endif %} - {{ Pagination(audit_events, 'portfolios.portfolio_admin', portfolio_id=portfolio.id) }} + {% if user_can(permissions.VIEW_PORTFOLIO_ACTIVITY_LOG) %} + {% include "fragments/audit_events_log.html" %} + {{ Pagination(audit_events, 'portfolios.portfolio_admin', portfolio_id=portfolio.id) }} + {% endif %}
{% endblock %} diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 779e3c66..fd088647 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -14,6 +14,7 @@ from atst.domain.authnid.crl import ( ) from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN +from tests.utils import FakeLogger class MockX509Store: @@ -119,20 +120,6 @@ def test_multistep_certificate_chain(): assert cache.crl_check(cert) -class FakeLogger: - def __init__(self): - self.messages = [] - - def info(self, msg): - self.messages.append(msg) - - def warning(self, msg): - self.messages.append(msg) - - def error(self, msg): - self.messages.append(msg) - - def test_no_op_crl_cache_logs_common_name(): logger = FakeLogger() cert = open("ssl/client-certs/atat.mil.crt", "rb").read() diff --git a/tests/domain/test_applications.py b/tests/domain/test_applications.py index 5ac13cde..98dde0e2 100644 --- a/tests/domain/test_applications.py +++ b/tests/domain/test_applications.py @@ -6,7 +6,7 @@ from atst.domain.portfolios import Portfolios def test_create_application_with_multiple_environments(): portfolio = PortfolioFactory.create() application = Applications.create( - portfolio.owner, portfolio, "My Test Application", "Test", ["dev", "prod"] + portfolio, "My Test Application", "Test", ["dev", "prod"] ) assert application.portfolio == portfolio @@ -21,7 +21,7 @@ def test_portfolio_owner_can_view_environments(): owner=owner, applications=[{"environments": [{"name": "dev"}, {"name": "prod"}]}], ) - application = Applications.get(owner, portfolio, portfolio.applications[0].id) + application = Applications.get(portfolio.applications[0].id) assert len(application.environments) == 2 @@ -38,11 +38,9 @@ def test_can_only_update_name_and_description(): } ], ) - application = Applications.get(owner, portfolio, portfolio.applications[0].id) + application = Applications.get(portfolio.applications[0].id) env_name = application.environments[0].name Applications.update( - owner, - portfolio, application, { "name": "New Name", diff --git a/tests/domain/test_audit_log.py b/tests/domain/test_audit_log.py index 925e15f2..af4f76e0 100644 --- a/tests/domain/test_audit_log.py +++ b/tests/domain/test_audit_log.py @@ -22,54 +22,15 @@ def developer(): return UserFactory.create() -def test_non_admin_cannot_view_audit_log(developer): - with pytest.raises(UnauthorizedError): - AuditLog.get_all_events(developer) - - -def test_ccpo_can_view_audit_log(ccpo): - events = AuditLog.get_all_events(ccpo) - assert len(events) > 0 - - -def test_paginate_audit_log(ccpo): +def test_paginate_audit_log(): user = UserFactory.create() for _ in range(100): AuditLog.log_system_event(user, action="create") - events = AuditLog.get_all_events(ccpo, pagination_opts={"per_page": 25, "page": 2}) + events = AuditLog.get_all_events(pagination_opts={"per_page": 25, "page": 2}) assert len(events) == 25 -def test_ccpo_can_view_ws_audit_log(ccpo): - portfolio = PortfolioFactory.create() - events = AuditLog.get_portfolio_events(ccpo, portfolio) - assert len(events) > 0 - - -def test_ws_admin_can_view_ws_audit_log(): - portfolio = PortfolioFactory.create() - admin = UserFactory.create() - PortfolioRoleFactory.create( - portfolio=portfolio, user=admin, status=PortfolioRoleStatus.ACTIVE - ) - events = AuditLog.get_portfolio_events(admin, portfolio) - assert len(events) > 0 - - -def test_ws_owner_can_view_ws_audit_log(): - portfolio = PortfolioFactory.create() - events = AuditLog.get_portfolio_events(portfolio.owner, portfolio) - assert len(events) > 0 - - -def test_other_users_cannot_view_portfolio_audit_log(): - with pytest.raises(UnauthorizedError): - portfolio = PortfolioFactory.create() - dev = UserFactory.create() - AuditLog.get_portfolio_events(dev, portfolio) - - def test_paginate_ws_audit_log(): portfolio = PortfolioFactory.create() application = ApplicationFactory.create(portfolio=portfolio) @@ -79,7 +40,7 @@ def test_paginate_ws_audit_log(): ) events = AuditLog.get_portfolio_events( - portfolio.owner, portfolio, pagination_opts={"per_page": 25, "page": 2} + portfolio, pagination_opts={"per_page": 25, "page": 2} ) assert len(events) == 25 @@ -92,7 +53,7 @@ def test_ws_audit_log_only_includes_current_ws_events(): application_1 = ApplicationFactory.create(portfolio=portfolio) application_2 = ApplicationFactory.create(portfolio=other_portfolio) - events = AuditLog.get_portfolio_events(portfolio.owner, portfolio) + events = AuditLog.get_portfolio_events(portfolio) for event in events: assert event.portfolio_id == portfolio.id or event.resource_id == portfolio.id assert ( diff --git a/tests/domain/test_authz.py b/tests/domain/test_authz.py index fdf72fdd..81102ae2 100644 --- a/tests/domain/test_authz.py +++ b/tests/domain/test_authz.py @@ -1,11 +1,19 @@ import pytest -from tests.factories import TaskOrderFactory, UserFactory, PortfolioRoleFactory -from atst.domain.authz import Authorization +from tests.factories import ( + TaskOrderFactory, + UserFactory, + PortfolioFactory, + PortfolioRoleFactory, +) +from atst.domain.authz import Authorization, user_can_access +from atst.domain.authz.decorator import user_can_access_decorator from atst.domain.permission_sets import PermissionSets from atst.domain.exceptions import UnauthorizedError from atst.models.permissions import Permissions +from tests.utils import FakeLogger + @pytest.fixture def invalid_user(): @@ -58,3 +66,137 @@ def test_has_portfolio_permission(): assert not Authorization.has_portfolio_permission( different_user, port_role.portfolio, Permissions.VIEW_PORTFOLIO_REPORTS ) + + +def test_user_can_access(): + ccpo = UserFactory.create_ccpo() + edit_admin = UserFactory.create() + view_admin = UserFactory.create() + + portfolio = PortfolioFactory.create(owner=edit_admin) + # factory gives view perms by default + PortfolioRoleFactory.create(user=view_admin, portfolio=portfolio) + + # check a site-wide permission + assert user_can_access(ccpo, Permissions.VIEW_AUDIT_LOG) + + with pytest.raises(UnauthorizedError): + user_can_access(edit_admin, Permissions.VIEW_AUDIT_LOG) + + with pytest.raises(UnauthorizedError): + user_can_access(view_admin, Permissions.VIEW_AUDIT_LOG) + + # check a portfolio view permission + assert user_can_access(ccpo, Permissions.VIEW_PORTFOLIO, portfolio=portfolio) + assert user_can_access(edit_admin, Permissions.VIEW_PORTFOLIO, portfolio=portfolio) + assert user_can_access(view_admin, Permissions.VIEW_PORTFOLIO, portfolio=portfolio) + + # check a portfolio edit permission + assert user_can_access(ccpo, Permissions.EDIT_PORTFOLIO_NAME, portfolio=portfolio) + assert user_can_access( + edit_admin, Permissions.EDIT_PORTFOLIO_NAME, portfolio=portfolio + ) + with pytest.raises(UnauthorizedError): + user_can_access( + view_admin, Permissions.EDIT_PORTFOLIO_NAME, portfolio=portfolio + ) + + +@pytest.fixture +def set_current_user(request_ctx): + def _set_current_user(user): + request_ctx.g.current_user = user + + yield _set_current_user + + request_ctx.g.current_user = None + + +def test_user_can_access_decorator(set_current_user): + ccpo = UserFactory.create_ccpo() + edit_admin = UserFactory.create() + view_admin = UserFactory.create() + + portfolio = PortfolioFactory.create(owner=edit_admin) + # factory gives view perms by default + PortfolioRoleFactory.create(user=view_admin, portfolio=portfolio) + + @user_can_access_decorator(Permissions.EDIT_PORTFOLIO_NAME) + def _edit_portfolio_name(*args, **kwargs): + return True + + set_current_user(ccpo) + assert _edit_portfolio_name(portfolio_id=portfolio.id) + + set_current_user(edit_admin) + assert _edit_portfolio_name(portfolio_id=portfolio.id) + + set_current_user(view_admin) + with pytest.raises(UnauthorizedError): + _edit_portfolio_name(portfolio_id=portfolio.id) + + +def test_user_can_access_decorator_exceptions(set_current_user): + rando_calrissian = UserFactory.create() + darth_vader = UserFactory.create() + portfolio = PortfolioFactory.create() + + def _can_fly_the_millenium_falcon(u, *args, **kwargs): + if u == rando_calrissian: + return True + else: + raise UnauthorizedError(u, "is not rando") + + @user_can_access_decorator( + Permissions.EDIT_PORTFOLIO_NAME, exception=_can_fly_the_millenium_falcon + ) + def _cloud_city(*args, **kwargs): + return True + + set_current_user(rando_calrissian) + assert _cloud_city() + + set_current_user(darth_vader) + with pytest.raises(UnauthorizedError): + assert _cloud_city() + + +@pytest.fixture +def mock_logger(app): + real_logger = app.logger + app.logger = FakeLogger() + + yield app.logger + + app.logger = real_logger + + +def test_user_can_access_decorator_logs_access( + set_current_user, monkeypatch, mock_logger +): + user = UserFactory.create() + + @user_can_access_decorator(Permissions.EDIT_PORTFOLIO_NAME) + def _do_something(*args, **kwargs): + return True + + set_current_user(user) + + monkeypatch.setattr( + "atst.domain.authz.decorator.check_access", lambda *a, **k: True + ) + _do_something() + assert len(mock_logger.messages) == 1 + assert "accessed" in mock_logger.messages[0] + assert "GET" in mock_logger.messages[0] + + def _unauthorized(*a, **k): + raise UnauthorizedError(user, "do something") + + monkeypatch.setattr("atst.domain.authz.decorator.check_access", _unauthorized) + with pytest.raises(UnauthorizedError): + _do_something() + + assert len(mock_logger.messages) == 2 + assert "denied access" in mock_logger.messages[1] + assert "GET" in mock_logger.messages[1] diff --git a/tests/domain/test_environments.py b/tests/domain/test_environments.py index 78cc8b0d..dee03fb6 100644 --- a/tests/domain/test_environments.py +++ b/tests/domain/test_environments.py @@ -29,9 +29,7 @@ def test_create_environment_role_creates_cloud_id(session): portfolio_role = portfolio.members[0] assert not portfolio_role.user.cloud_id - assert Environments.update_environment_roles( - owner, portfolio, portfolio_role, new_role - ) + assert Environments.update_environment_roles(portfolio_role, new_role) assert portfolio_role.user.cloud_id is not None @@ -69,9 +67,7 @@ def test_update_environment_roles(): ] portfolio_role = portfolio.members[0] - assert Environments.update_environment_roles( - owner, portfolio, portfolio_role, new_ids_and_roles - ) + assert Environments.update_environment_roles(portfolio_role, new_ids_and_roles) new_dev_env_role = EnvironmentRoles.get(portfolio_role.user.id, dev_env.id) staging_env_role = EnvironmentRoles.get(portfolio_role.user.id, staging_env.id) @@ -120,9 +116,7 @@ def test_remove_environment_role(): ] portfolio_role = PortfolioRoles.get(portfolio.id, developer.id) - assert Environments.update_environment_roles( - owner, portfolio, portfolio_role, new_environment_roles - ) + assert Environments.update_environment_roles(portfolio_role, new_environment_roles) assert portfolio_role.num_environment_roles == 2 assert EnvironmentRoles.get(developer.id, now_ba).role == "billing_auditor" @@ -154,9 +148,7 @@ def test_no_update_to_environment_roles(): new_ids_and_roles = [{"id": dev_env.id, "role": "devops"}] portfolio_role = PortfolioRoles.get(portfolio.id, developer.id) - assert not Environments.update_environment_roles( - owner, portfolio, portfolio_role, new_ids_and_roles - ) + assert not Environments.update_environment_roles(portfolio_role, new_ids_and_roles) def test_get_scoped_environments(db): diff --git a/tests/domain/test_invitations.py b/tests/domain/test_invitations.py index 7c0f0ff8..ef08c879 100644 --- a/tests/domain/test_invitations.py +++ b/tests/domain/test_invitations.py @@ -130,7 +130,7 @@ def test_resend_invitation(): user = UserFactory.create() ws_role = PortfolioRoleFactory.create(user=user, portfolio=portfolio) invite = Invitations.create(portfolio.owner, ws_role, user.email) - Invitations.resend(portfolio.owner, portfolio.id, invite.token) + Invitations.resend(user, invite.token) assert ws_role.invitations[0].is_revoked assert ws_role.invitations[1].is_pending diff --git a/tests/domain/test_portfolios.py b/tests/domain/test_portfolios.py index e429f9b7..6819b36b 100644 --- a/tests/domain/test_portfolios.py +++ b/tests/domain/test_portfolios.py @@ -46,24 +46,6 @@ def test_portfolio_has_timestamps(portfolio): assert portfolio.time_created == portfolio.time_updated -def test_portfolios_get_ensures_user_is_in_portfolio(portfolio, portfolio_owner): - outside_user = UserFactory.create() - with pytest.raises(UnauthorizedError): - Portfolios.get(outside_user, portfolio.id) - - -def test_get_for_update_applications_allows_owner(portfolio, portfolio_owner): - Portfolios.get_for_update_applications(portfolio_owner, portfolio.id) - - -def test_get_for_update_applications_blocks_developer(portfolio): - developer = UserFactory.create() - PortfolioRoles.add(developer, portfolio.id) - - with pytest.raises(UnauthorizedError): - Portfolios.get_for_update_applications(developer, portfolio.id) - - def test_can_create_portfolio_role(portfolio, portfolio_owner): user_data = { "first_name": "New", @@ -73,7 +55,7 @@ def test_can_create_portfolio_role(portfolio, portfolio_owner): "dod_id": "1234567890", } - new_member = Portfolios.create_member(portfolio_owner, portfolio, user_data) + new_member = Portfolios.create_member(portfolio, user_data) assert new_member.portfolio == portfolio assert new_member.user.provisional @@ -88,27 +70,12 @@ def test_can_add_existing_user_to_portfolio(portfolio, portfolio_owner): "dod_id": user.dod_id, } - new_member = Portfolios.create_member(portfolio_owner, portfolio, user_data) + new_member = Portfolios.create_member(portfolio, user_data) assert new_member.portfolio == portfolio assert new_member.user.email == user.email assert not new_member.user.provisional -def test_need_permission_to_create_portfolio_role(portfolio, portfolio_owner): - random_user = UserFactory.create() - - user_data = { - "first_name": "New", - "last_name": "User", - "email": "new.user@mail.com", - "portfolio_role": "developer", - "dod_id": "1234567890", - } - - with pytest.raises(UnauthorizedError): - Portfolios.create_member(random_user, portfolio, user_data) - - def test_update_portfolio_role_role(portfolio, portfolio_owner): user_data = { "first_name": "New", @@ -121,53 +88,13 @@ def test_update_portfolio_role_role(portfolio, portfolio_owner): member = PortfolioRoleFactory.create(portfolio=portfolio) permission_sets = [PermissionSets.EDIT_PORTFOLIO_FUNDING] - updated_member = Portfolios.update_member( - portfolio_owner, portfolio, member, permission_sets=permission_sets - ) + updated_member = Portfolios.update_member(member, permission_sets=permission_sets) assert updated_member.portfolio == portfolio -def test_need_permission_to_update_portfolio_role_role(portfolio, portfolio_owner): - random_user = UserFactory.create() - user_data = { - "first_name": "New", - "last_name": "User", - "email": "new.user@mail.com", - "portfolio_role": "developer", - "dod_id": "1234567890", - } - member = Portfolios.create_member(portfolio_owner, portfolio, user_data) - role_name = "developer" - - with pytest.raises(UnauthorizedError): - Portfolios.update_member(random_user, portfolio, member, role_name) - - -def test_owner_can_view_portfolio_members(portfolio, portfolio_owner): - portfolio = Portfolios.get_with_members(portfolio_owner, portfolio.id) - - assert portfolio - - -def test_ccpo_can_view_portfolio_members(portfolio, portfolio_owner): - ccpo = UserFactory.create_ccpo() - assert Portfolios.get_with_members(ccpo, portfolio.id) - - -def test_random_user_cannot_view_portfolio_members(portfolio): - developer = UserFactory.create() - - with pytest.raises(UnauthorizedError): - portfolio = Portfolios.get_with_members(developer, portfolio.id) - - def test_scoped_portfolio_for_admin_missing_view_apps_perms(portfolio_owner, portfolio): Applications.create( - portfolio_owner, - portfolio, - "My Application 2", - "My application 2", - ["dev", "staging", "prod"], + portfolio, "My Application 2", "My application 2", ["dev", "staging", "prod"] ) restricted_admin = UserFactory.create() PortfolioRoleFactory.create( @@ -186,18 +113,10 @@ def test_scoped_portfolio_only_returns_a_users_applications_and_environments( portfolio, portfolio_owner ): new_application = Applications.create( - portfolio_owner, - portfolio, - "My Application", - "My application", - ["dev", "staging", "prod"], + portfolio, "My Application", "My application", ["dev", "staging", "prod"] ) Applications.create( - portfolio_owner, - portfolio, - "My Application 2", - "My application 2", - ["dev", "staging", "prod"], + portfolio, "My Application 2", "My application 2", ["dev", "staging", "prod"] ) developer = UserFactory.create() dev_environment = Environments.add_member( @@ -217,11 +136,7 @@ def test_scoped_portfolio_returns_all_applications_for_portfolio_admin( ): for _ in range(5): Applications.create( - portfolio_owner, - portfolio, - "My Application", - "My application", - ["dev", "staging", "prod"], + portfolio, "My Application", "My application", ["dev", "staging", "prod"] ) admin = UserFactory.create() @@ -240,11 +155,7 @@ def test_scoped_portfolio_returns_all_applications_for_portfolio_owner( ): for _ in range(5): Applications.create( - portfolio_owner, - portfolio, - "My Application", - "My application", - ["dev", "staging", "prod"], + portfolio, "My Application", "My application", ["dev", "staging", "prod"] ) scoped_portfolio = Portfolios.get(portfolio_owner, portfolio.id) @@ -282,27 +193,6 @@ def test_for_user_returns_all_portfolios_for_ccpo(portfolio, portfolio_owner): assert len(sams_portfolios) == 2 -def test_get_for_update_information(portfolio, portfolio_owner): - owner_ws = Portfolios.get_for_update_information(portfolio_owner, portfolio.id) - assert portfolio == owner_ws - - admin = UserFactory.create() - perm_sets = get_all_portfolio_permission_sets() - PortfolioRoleFactory.create( - user=admin, portfolio=portfolio, permission_sets=perm_sets - ) - admin_ws = Portfolios.get_for_update_information(admin, portfolio.id) - assert portfolio == admin_ws - - # TODO: implement ccpo roles - # ccpo = UserFactory.create_ccpo() - # assert Portfolios.get_for_update_information(ccpo, portfolio.id) - - developer = UserFactory.create() - with pytest.raises(UnauthorizedError): - Portfolios.get_for_update_information(developer, portfolio.id) - - def test_can_create_portfolios_with_matching_names(): portfolio_name = "Great Portfolio" PortfolioFactory.create(name=portfolio_name) @@ -314,7 +204,7 @@ def test_able_to_revoke_portfolio_access_for_active_member(): portfolio_role = PortfolioRoleFactory.create( portfolio=portfolio, status=PortfolioRoleStatus.ACTIVE ) - Portfolios.revoke_access(portfolio.owner, portfolio.id, portfolio_role.id) + Portfolios.revoke_access(portfolio.id, portfolio_role.id) assert Portfolios.for_user(portfolio_role.user) == [] @@ -334,7 +224,7 @@ def test_unable_to_revoke_owner_portfolio_access(): owner_portfolio_role = portfolio.roles[0] with pytest.raises(PortfolioError): - Portfolios.revoke_access(portfolio.owner, portfolio.id, owner_portfolio_role.id) + Portfolios.revoke_access(portfolio.id, owner_portfolio_role.id) def test_disabled_members_dont_show_up(session): diff --git a/tests/domain/test_task_orders.py b/tests/domain/test_task_orders.py index e1d82f10..fad18ade 100644 --- a/tests/domain/test_task_orders.py +++ b/tests/domain/test_task_orders.py @@ -21,7 +21,7 @@ def test_is_signed_by_ko(): assert not TaskOrders.is_signed_by_ko(task_order) - TaskOrders.update(user, task_order, signer_dod_id=user.dod_id) + TaskOrders.update(task_order, signer_dod_id=user.dod_id) assert TaskOrders.is_signed_by_ko(task_order) @@ -68,7 +68,7 @@ def test_add_officer(): task_order = TaskOrderFactory.create() ko = UserFactory.create() owner = task_order.portfolio.owner - TaskOrders.add_officer(owner, task_order, "contracting_officer", ko.to_dictionary()) + TaskOrders.add_officer(task_order, "contracting_officer", ko.to_dictionary()) assert task_order.contracting_officer == ko portfolio_users = [ws_role.user for ws_role in task_order.portfolio.members] @@ -80,62 +80,19 @@ def test_add_officer_with_nonexistent_role(): ko = UserFactory.create() owner = task_order.portfolio.owner with pytest.raises(TaskOrderError): - TaskOrders.add_officer(owner, task_order, "pilot", ko.to_dictionary()) + TaskOrders.add_officer(task_order, "pilot", ko.to_dictionary()) def test_add_officer_who_is_already_portfolio_member(): task_order = TaskOrderFactory.create() owner = task_order.portfolio.owner - TaskOrders.add_officer( - owner, task_order, "contracting_officer", owner.to_dictionary() - ) + TaskOrders.add_officer(task_order, "contracting_officer", owner.to_dictionary()) assert task_order.contracting_officer == owner member = task_order.portfolio.members[0] assert member.user == owner -def test_task_order_access(): - creator = UserFactory.create() - member = UserFactory.create() - rando = UserFactory.create() - officer = UserFactory.create() - - def check_access(can, cannot, method_name, method_args): - method = getattr(TaskOrders, method_name) - - for user in can: - assert method(user, *method_args) - - for user in cannot: - with pytest.raises(UnauthorizedError): - method(user, *method_args) - - portfolio = PortfolioFactory.create(owner=creator) - task_order = TaskOrderFactory.create(creator=creator, portfolio=portfolio) - PortfolioRoleFactory.create( - user=member, - portfolio=task_order.portfolio, - permission_sets=[ - PermissionSets.get(prms) - for prms in PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS - ], - ) - TaskOrders.add_officer( - creator, task_order, "contracting_officer", officer.to_dictionary() - ) - - check_access([creator, officer, member], [rando], "get", [task_order.id]) - check_access([creator, officer], [member, rando], "create", [portfolio]) - check_access([creator, officer], [member, rando], "update", [task_order]) - check_access( - [creator, officer], - [member, rando], - "add_officer", - [task_order, "contracting_officer", UserFactory.dictionary()], - ) - - def test_dd254_complete(): finished = DD254Factory.create() unfinished = DD254Factory.create(certifying_official=None) diff --git a/tests/models/test_environments.py b/tests/models/test_environments.py index 0dad7874..17a32ec8 100644 --- a/tests/models/test_environments.py +++ b/tests/models/test_environments.py @@ -9,11 +9,7 @@ def test_add_user_to_environment(): portfolio = PortfolioFactory.create(owner=owner) application = Applications.create( - owner, - portfolio, - "my test application", - "It's mine.", - ["dev", "staging", "prod"], + portfolio, "my test application", "It's mine.", ["dev", "staging", "prod"] ) dev_environment = application.environments[0] diff --git a/tests/models/test_portfolio_role.py b/tests/models/test_portfolio_role.py index 333375ff..7747a5c6 100644 --- a/tests/models/test_portfolio_role.py +++ b/tests/models/test_portfolio_role.py @@ -120,7 +120,7 @@ def test_has_env_role_history(session): user=user, environment=environment, role="developer" ) Environments.update_environment_roles( - owner, portfolio, portfolio_role, [{"role": "admin", "id": environment.id}] + portfolio_role, [{"role": "admin", "id": environment.id}] ) changed_events = ( session.query(AuditEvent) @@ -154,7 +154,7 @@ def test_has_no_environment_roles(): } portfolio = PortfolioFactory.create(owner=owner) - portfolio_role = Portfolios.create_member(owner, portfolio, developer_data) + portfolio_role = Portfolios.create_member(portfolio, developer_data) assert not portfolio_role.has_environment_roles @@ -170,13 +170,9 @@ def test_has_environment_roles(): } portfolio = PortfolioFactory.create(owner=owner) - portfolio_role = Portfolios.create_member(owner, portfolio, developer_data) + portfolio_role = Portfolios.create_member(portfolio, developer_data) application = Applications.create( - owner, - portfolio, - "my test application", - "It's mine.", - ["dev", "staging", "prod"], + portfolio, "my test application", "It's mine.", ["dev", "staging", "prod"] ) Environments.add_member( application.environments[0], portfolio_role.user, "developer" diff --git a/tests/routes/portfolios/test_applications.py b/tests/routes/portfolios/test_applications.py index ff38fd1c..b6245aaa 100644 --- a/tests/routes/portfolios/test_applications.py +++ b/tests/routes/portfolios/test_applications.py @@ -39,54 +39,6 @@ def test_user_without_permission_has_no_budget_report_link(client, user_session) ) -@pytest.mark.skip(reason="Temporarily no add activity log link") -def test_user_with_permission_has_activity_log_link(client, user_session): - portfolio = PortfolioFactory.create() - ccpo = UserFactory.create_ccpo() - admin = UserFactory.create() - PortfolioRoleFactory.create( - portfolio=portfolio, user=admin, status=PortfolioRoleStatus.ACTIVE - ) - - user_session(portfolio.owner) - response = client.get("/portfolios/{}/applications".format(portfolio.id)) - assert ( - 'href="/portfolios/{}/activity"'.format(portfolio.id).encode() in response.data - ) - - # logs out previous user before creating a new session - user_session(admin) - response = client.get("/portfolios/{}/applications".format(portfolio.id)) - assert ( - 'href="/portfolios/{}/activity"'.format(portfolio.id).encode() in response.data - ) - - user_session(ccpo) - response = client.get("/portfolios/{}/applications".format(portfolio.id)) - assert ( - 'href="/portfolios/{}/activity"'.format(portfolio.id).encode() in response.data - ) - - -@pytest.mark.skip(reason="Temporarily no add activity log link") -def test_user_without_permission_has_no_activity_log_link(client, user_session): - portfolio = PortfolioFactory.create() - developer = UserFactory.create() - PortfolioRoleFactory.create( - portfolio=portfolio, - user=developer, - role=Roles.get("developer"), - status=PortfolioRoleStatus.ACTIVE, - ) - - user_session(developer) - response = client.get("/portfolios/{}/applications".format(portfolio.id)) - assert ( - 'href="/portfolios/{}/activity"'.format(portfolio.id).encode() - not in response.data - ) - - def test_user_with_permission_has_add_application_link(client, user_session): portfolio = PortfolioFactory.create() user_session(portfolio.owner) @@ -130,7 +82,6 @@ def test_creating_application(client, user_session): def test_view_edit_application(client, user_session): portfolio = PortfolioFactory.create() application = Applications.create( - portfolio.owner, portfolio, "Snazzy Application", "A new application for me and my friends", diff --git a/tests/routes/portfolios/test_invitations.py b/tests/routes/portfolios/test_invitations.py index ce696b0c..a4352808 100644 --- a/tests/routes/portfolios/test_invitations.py +++ b/tests/routes/portfolios/test_invitations.py @@ -1,3 +1,4 @@ +import pytest import datetime from flask import url_for diff --git a/tests/routes/portfolios/test_members.py b/tests/routes/portfolios/test_members.py index 5d149f6f..b4edbb23 100644 --- a/tests/routes/portfolios/test_members.py +++ b/tests/routes/portfolios/test_members.py @@ -168,7 +168,6 @@ def test_update_member_environment_role(client, user_session): user = UserFactory.create() member = PortfolioRoles.add(user, portfolio.id) application = Applications.create( - portfolio.owner, portfolio, "Snazzy Application", "A new application for me and my friends", @@ -202,7 +201,6 @@ def test_update_member_environment_role_with_no_data(client, user_session): user = UserFactory.create() member = PortfolioRoles.add(user, portfolio.id) application = Applications.create( - portfolio.owner, portfolio, "Snazzy Application", "A new application for me and my friends", @@ -231,7 +229,6 @@ def test_revoke_active_member_access(client, user_session): portfolio=portfolio, user=user, status=PortfolioRoleStatus.ACTIVE ) Applications.create( - portfolio.owner, portfolio, "Snazzy Application", "A new application for me and my friends", diff --git a/tests/routes/portfolios/test_task_orders.py b/tests/routes/portfolios/test_task_orders.py index 158adf10..3948eaec 100644 --- a/tests/routes/portfolios/test_task_orders.py +++ b/tests/routes/portfolios/test_task_orders.py @@ -157,7 +157,7 @@ class TestTaskOrderInvitations: "security_officer-last_name": "Fett", }, ) - updated_task_order = TaskOrders.get(self.portfolio.owner, self.task_order.id) + updated_task_order = TaskOrders.get(self.task_order.id) assert updated_task_order.ko_first_name == "Luke" assert updated_task_order.ko_last_name == "Skywalker" assert updated_task_order.so_first_name == "Boba" @@ -189,7 +189,7 @@ class TestTaskOrderInvitations: "contracting_officer-invite": "y", }, ) - updated_task_order = TaskOrders.get(self.portfolio.owner, self.task_order.id) + updated_task_order = TaskOrders.get(self.task_order.id) assert updated_task_order.ko_invite == True assert updated_task_order.ko_first_name == "Luke" @@ -222,7 +222,7 @@ class TestTaskOrderInvitations: assert "There were some errors" in response.data.decode() - updated_task_order = TaskOrders.get(self.portfolio.owner, self.task_order.id) + updated_task_order = TaskOrders.get(self.task_order.id) assert updated_task_order.so_first_name != "Boba" assert len(queue.get_queue()) == queue_length assert response.status_code == 400 @@ -251,7 +251,7 @@ def test_ko_can_view_task_order(client, user_session, portfolio, user): assert response.status_code == 200 assert translate("common.manage") in response.data.decode() - TaskOrders.update(user, task_order, clin_01=None) + TaskOrders.update(task_order, clin_01=None) response = client.get( url_for( "portfolios.view_task_order", @@ -371,27 +371,6 @@ def test_mo_redirected_to_build_page(client, user_session, portfolio): assert response.status_code == 200 -def test_cor_redirected_to_build_page(client, user_session, portfolio): - cor = UserFactory.create() - PortfolioRoleFactory.create( - portfolio=portfolio, - user=cor, - status=PortfolioStatus.ACTIVE, - permission_sets=[ - PermissionSets.get(PermissionSets.VIEW_PORTFOLIO), - PermissionSets.get(PermissionSets.VIEW_PORTFOLIO_FUNDING), - ], - ) - task_order = TaskOrderFactory.create( - portfolio=portfolio, contracting_officer_representative=cor - ) - user_session(cor) - response = client.get( - url_for("task_orders.new", screen=1, task_order_id=task_order.id) - ) - assert response.status_code == 200 - - def test_submit_completed_ko_review_page_as_cor( client, user_session, pdf_upload, portfolio, user ): @@ -620,47 +599,6 @@ def test_resend_invite_when_officer_type_missing( assert len(queue.get_queue()) == queue_length -def test_resend_invite_when_ko(app, client, user_session, portfolio, user): - queue_length = len(queue.get_queue()) - - task_order = TaskOrderFactory.create( - portfolio=portfolio, contracting_officer=user, ko_invite=True - ) - - portfolio_role = PortfolioRoleFactory.create( - portfolio=portfolio, user=user, status=PortfolioStatus.ACTIVE - ) - - original_invitation = Invitations.create( - inviter=user, portfolio_role=portfolio_role, email=user.email - ) - - user_session(user) - - response = client.post( - url_for( - "portfolios.resend_invite", - portfolio_id=portfolio.id, - task_order_id=task_order.id, - invite_type="ko_invite", - _external=True, - ) - ) - - assert original_invitation.status == InvitationStatus.REVOKED - assert response.status_code == 302 - assert ( - url_for( - "portfolios.task_order_invitations", - portfolio_id=portfolio.id, - task_order_id=task_order.id, - _external=True, - ) - == response.headers["Location"] - ) - assert len(queue.get_queue()) == queue_length + 1 - - def test_resend_invite_when_not_pending(app, client, user_session, portfolio, user): queue_length = len(queue.get_queue()) @@ -726,20 +664,21 @@ def test_resending_revoked_invite(app, client, user_session, portfolio, user): assert response.status_code == 404 -def test_resending_expired_invite(app, client, user_session, portfolio, user): +def test_resending_expired_invite(app, client, user_session, portfolio): queue_length = len(queue.get_queue()) + ko = UserFactory.create() task_order = TaskOrderFactory.create( - portfolio=portfolio, contracting_officer=user, ko_invite=True + portfolio=portfolio, contracting_officer=ko, ko_invite=True ) - portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=user) + portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=ko) invite = InvitationFactory.create( - inviter=user, + inviter=portfolio.owner, portfolio_role=portfolio_role, - email=user.email, + email=ko.email, expiration_time=datetime.now() - timedelta(days=1), ) - user_session(user) + user_session(portfolio.owner) response = client.post( url_for( diff --git a/tests/routes/task_orders/test_index.py b/tests/routes/task_orders/test_index.py index 784e4488..0b9800e3 100644 --- a/tests/routes/task_orders/test_index.py +++ b/tests/routes/task_orders/test_index.py @@ -1,3 +1,4 @@ +import pytest from flask import url_for from io import BytesIO import re diff --git a/tests/routes/task_orders/test_new_task_order.py b/tests/routes/task_orders/test_new_task_order.py index f27e7ee1..28da7c8c 100644 --- a/tests/routes/task_orders/test_new_task_order.py +++ b/tests/routes/task_orders/test_new_task_order.py @@ -2,11 +2,17 @@ import pytest from flask import url_for from atst.domain.task_orders import TaskOrders +from atst.domain.permission_sets import PermissionSets from atst.models.attachment import Attachment from atst.routes.task_orders.new import ShowTaskOrderWorkflow, UpdateTaskOrderWorkflow from atst.utils.localization import translate -from tests.factories import UserFactory, TaskOrderFactory, PortfolioFactory +from tests.factories import ( + UserFactory, + TaskOrderFactory, + PortfolioFactory, + PortfolioRoleFactory, +) class TestShowTaskOrderWorkflow: @@ -93,8 +99,6 @@ def test_to_on_pf_cannot_edit_pf_attributes(): assert second_workflow.pf_attributes_read_only -# TODO: this test will need to be more complicated when we add validation to -# the forms def test_create_new_task_order(client, user_session, pdf_upload): creator = UserFactory.create() user_session(creator) @@ -114,7 +118,7 @@ def test_create_new_task_order(client, user_session, pdf_upload): assert url_for("task_orders.new", screen=2) in response.headers["Location"] created_task_order_id = response.headers["Location"].split("/")[-1] - created_task_order = TaskOrders.get(creator, created_task_order_id) + created_task_order = TaskOrders.get(created_task_order_id) assert created_task_order.portfolio is not None assert created_task_order.portfolio.name == portfolio_name assert created_task_order.portfolio.defense_component == defense_component @@ -152,7 +156,7 @@ def test_create_new_task_order_for_portfolio(client, user_session): assert url_for("task_orders.new", screen=2) in response.headers["Location"] created_task_order_id = response.headers["Location"].split("/")[-1] - created_task_order = TaskOrders.get(creator, created_task_order_id) + created_task_order = TaskOrders.get(created_task_order_id) assert created_task_order.portfolio_name == portfolio.name assert created_task_order.defense_component == portfolio.defense_component assert created_task_order.portfolio == portfolio @@ -209,7 +213,7 @@ def test_review_screen_when_all_sections_complete(client, user_session, task_ord def test_review_screen_when_not_all_sections_complete(client, user_session, task_order): - TaskOrders.update(task_order.creator, task_order, clin_01=None) + TaskOrders.update(task_order, clin_01=None) user_session(task_order.creator) response = client.get( url_for("task_orders.new", screen=4, task_order_id=task_order.id) @@ -296,6 +300,11 @@ def test_update_task_order_with_existing_task_order(task_order): def test_update_to_redirects_to_ko_review(client, user_session, task_order): ko = UserFactory.create() task_order.contracting_officer = ko + PortfolioRoleFactory.create( + user=ko, + portfolio=task_order.portfolio, + permission_sets=[PermissionSets.get(PermissionSets.EDIT_PORTFOLIO_FUNDING)], + ) user_session(ko) url = url_for( "portfolios.ko_review", diff --git a/tests/routes/task_orders/test_sign.py b/tests/routes/task_orders/test_sign.py index a7717ab0..d66cd3a7 100644 --- a/tests/routes/task_orders/test_sign.py +++ b/tests/routes/task_orders/test_sign.py @@ -18,10 +18,7 @@ def create_ko_task_order(user_session, contracting_officer): ) TaskOrders.add_officer( - contracting_officer, - task_order, - "contracting_officer", - contracting_officer.to_dictionary(), + task_order, "contracting_officer", contracting_officer.to_dictionary() ) dd_254 = DD254Factory.create() @@ -33,7 +30,7 @@ def create_ko_task_order(user_session, contracting_officer): def test_show_signature_requested_not_ko(client, user_session): contracting_officer = UserFactory.create() task_order = create_ko_task_order(user_session, contracting_officer) - TaskOrders.update(contracting_officer, task_order, contracting_officer=None) + TaskOrders.update(task_order, contracting_officer=None) response = client.get( url_for("task_orders.signature_requested", task_order_id=task_order.id) @@ -50,10 +47,7 @@ def test_show_signature_requested(client, user_session): # create unfinished TO task_order = TaskOrderFactory.create(portfolio=portfolio, clin_01=None) TaskOrders.add_officer( - contracting_officer, - task_order, - "contracting_officer", - contracting_officer.to_dictionary(), + task_order, "contracting_officer", contracting_officer.to_dictionary() ) response = client.get( url_for("task_orders.signature_requested", task_order_id=task_order.id) @@ -61,7 +55,7 @@ def test_show_signature_requested(client, user_session): assert response.status_code == 404 # Finish TO - TaskOrders.update(contracting_officer, task_order, clin_01=100) + TaskOrders.update(task_order, clin_01=100) response = client.get( url_for("task_orders.signature_requested", task_order_id=task_order.id) ) @@ -79,9 +73,7 @@ def test_show_signature_requested(client, user_session): def test_show_signature_requested_already_signed(client, user_session): contracting_officer = UserFactory.create() task_order = create_ko_task_order(user_session, contracting_officer) - TaskOrders.update( - contracting_officer, task_order, signer_dod_id=contracting_officer.dod_id - ) + TaskOrders.update(task_order, signer_dod_id=contracting_officer.dod_id) response = client.get( url_for("task_orders.signature_requested", task_order_id=task_order.id) @@ -93,7 +85,7 @@ def test_show_signature_requested_already_signed(client, user_session): def test_signing_task_order_not_ko(client, user_session): contracting_officer = UserFactory.create() task_order = create_ko_task_order(user_session, contracting_officer) - TaskOrders.update(contracting_officer, task_order, contracting_officer=None) + TaskOrders.update(task_order, contracting_officer=None) response = client.post( url_for("task_orders.record_signature", task_order_id=task_order.id), data={} @@ -105,9 +97,7 @@ def test_signing_task_order_not_ko(client, user_session): def test_singing_an_already_signed_task_order(client, user_session): contracting_officer = UserFactory.create() task_order = create_ko_task_order(user_session, contracting_officer) - TaskOrders.update( - contracting_officer, task_order, signer_dod_id=contracting_officer.dod_id - ) + TaskOrders.update(task_order, signer_dod_id=contracting_officer.dod_id) response = client.post( url_for("task_orders.record_signature", task_order_id=task_order.id), diff --git a/tests/routes/test_auth.py b/tests/routes/test_auth.py index fd3d5c45..f4852284 100644 --- a/tests/routes/test_auth.py +++ b/tests/routes/test_auth.py @@ -4,7 +4,7 @@ from urllib.parse import quote from tests.factories import UserFactory -PROTECTED_URL = "/portfolios" +PROTECTED_URL = "/task_orders/new/get_started" def test_request_page_with_complete_profile(client, user_session): diff --git a/tests/routes/test_authz.py b/tests/routes/test_authz.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_access.py b/tests/test_access.py new file mode 100644 index 00000000..d86ed2ea --- /dev/null +++ b/tests/test_access.py @@ -0,0 +1,789 @@ +import pytest + +from flask import url_for, Response + +import atst +from atst.app import make_app, make_config +from atst.domain.auth import UNPROTECTED_ROUTES as _NO_LOGIN_REQUIRED +import atst.domain.authz as authz +from atst.domain.permission_sets import PermissionSets +from atst.models.portfolio_role import Status as PortfolioRoleStatus + +from tests.factories import ( + AttachmentFactory, + InvitationFactory, + PortfolioFactory, + PortfolioRoleFactory, + TaskOrderFactory, + UserFactory, +) + +_NO_ACCESS_CHECK_REQUIRED = _NO_LOGIN_REQUIRED + [ + "task_orders.get_started", # all users can start a new TO + "atst.csp_environment_access", # internal redirect + "atst.jedi_csp_calculator", # internal redirect + "atst.styleguide", # dev reference + "dev.test_email", # dev tool + "dev.messages", # dev tool + "atst.home", # available to all users + "users.user", # available to all users + "users.update_user", # available to all users + "portfolios.accept_invitation", # available to all users; access control is built into invitation logic + "atst.catch_all", # available to all users + "portfolios.portfolios", # the portfolios list is scoped to the user separately +] + + +def protected_routes(app): + _protected_routes = [] + + for rule in app.url_map.iter_rules(): + args = [1] * len(rule.arguments) + mock_args = dict(zip(rule.arguments, args)) + _n, route = rule.build(mock_args) + if rule.endpoint in _NO_ACCESS_CHECK_REQUIRED or "/static" in route: + continue + + _protected_routes.append((rule, route)) + + return _protected_routes + + +sample_config = make_config({"CRL_STORAGE_PROVIDER": "LOCAL"}) +sample_app = make_app(sample_config) +_PROTECTED_ROUTES = protected_routes(sample_app) + + +class Null: + """ + Very simple null object. Will return itself for all attribute + calls: + > foo = Null() + > foo.bar.baz == foo + """ + + def __init__(self, *args, **kwargs): + pass + + def __getattr__(self, name): + return self + + +@pytest.mark.access_check +@pytest.mark.parametrize("rule,route", _PROTECTED_ROUTES) +def test_all_protected_routes_have_access_control( + rule, route, mocker, client, user_session, monkeypatch +): + """ + This tests that all routes, except the ones in + _NO_ACCESS_CHECK_REQUIRED, are protected by the access + decorator. + """ + # monkeypatch any object lookups that might happen in the access decorator + monkeypatch.setattr("atst.domain.portfolios.Portfolios.for_user", lambda *a: []) + monkeypatch.setattr("atst.domain.portfolios.Portfolios.get", lambda *a: None) + monkeypatch.setattr("atst.domain.task_orders.TaskOrders.get", lambda *a: Null()) + + # patch the internal function the access decorator uses so that + # we can check that it was called + mocker.patch("atst.domain.authz.decorator.check_access") + + user = UserFactory.create() + user_session(user) + + method = "get" if "GET" in rule.methods else "post" + getattr(client, method)(route) + + assert ( + atst.domain.authz.decorator.check_access.call_count == 1 + ), "no access control for {}".format(rule.endpoint) + + +def user_with(*perm_sets_names): + return UserFactory.create(permission_sets=PermissionSets.get_many(perm_sets_names)) + + +@pytest.fixture +def get_url_assert_status(client, user_session): + def _get_url_assert_status(user, url, status): + user_session(user) + resp = client.get(url) + assert resp.status_code == status + + return _get_url_assert_status + + +@pytest.fixture +def post_url_assert_status(client, user_session): + def _get_url_assert_status(user, url, status): + user_session(user) + resp = client.post(url) + assert resp.status_code == status + + return _get_url_assert_status + + +# atst.activity_history +def test_atst_activity_history_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_AUDIT_LOG) + rando = user_with() + + url = url_for("atst.activity_history") + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.access_environment +def test_portfolios_access_environment_access(get_url_assert_status): + dev = UserFactory.create() + rando = UserFactory.create() + ccpo = UserFactory.create_ccpo() + + portfolio = PortfolioFactory.create( + owner=dev, + applications=[ + { + "name": "Mos Eisley", + "description": "Where Han shot first", + "environments": [ + { + "name": "thebar", + "members": [{"user": dev, "role_name": "devops"}], + } + ], + } + ], + ) + env = portfolio.applications[0].environments[0] + + url = url_for( + "portfolios.access_environment", + portfolio_id=portfolio.id, + environment_id=env.id, + ) + get_url_assert_status(dev, url, 302) + get_url_assert_status(rando, url, 404) + get_url_assert_status(ccpo, url, 404) + + +# portfolios.application_members +def test_portfolios_application_members_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_APPLICATION_MANAGEMENT) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create( + owner=owner, + applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}], + ) + app = portfolio.applications[0] + + url = url_for( + "portfolios.application_members", + portfolio_id=portfolio.id, + application_id=app.id, + ) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.create_application +def test_portfolios_create_application_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_APPLICATION_MANAGEMENT) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.create_application", portfolio_id=portfolio.id) + post_url_assert_status(ccpo, url, 200) + post_url_assert_status(owner, url, 200) + post_url_assert_status(rando, url, 404) + + +# portfolios.create_member +def test_portfolios_create_member_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.create_member", portfolio_id=portfolio.id) + post_url_assert_status(ccpo, url, 200) + post_url_assert_status(owner, url, 200) + post_url_assert_status(rando, url, 404) + + +# portfolios.edit_application +def test_portfolios_edit_application_access(get_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_APPLICATION_MANAGEMENT) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create( + owner=owner, + applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}], + ) + app = portfolio.applications[0] + + url = url_for( + "portfolios.edit_application", portfolio_id=portfolio.id, application_id=app.id + ) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.edit_portfolio +def test_portfolios_edit_portfolio_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.edit_portfolio", portfolio_id=portfolio.id) + post_url_assert_status(ccpo, url, 200) + post_url_assert_status(owner, url, 200) + post_url_assert_status(rando, url, 404) + + +# portfolios.edit_task_order_invitations +def test_portfolios_edit_task_order_invitations_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio) + + url = url_for( + "portfolios.edit_task_order_invitations", + portfolio_id=portfolio.id, + task_order_id=task_order.id, + ) + post_url_assert_status(ccpo, url, 302) + post_url_assert_status(owner, url, 302) + post_url_assert_status(rando, url, 404) + + +# portfolios.ko_review +def test_portfolios_ko_review_access(get_url_assert_status): + ccpo = UserFactory.create_ccpo() + owner = user_with() + cor = user_with() + ko = user_with() + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create( + portfolio=portfolio, + contracting_officer=ko, + contracting_officer_representative=cor, + ) + + url = url_for( + "portfolios.ko_review", portfolio_id=portfolio.id, task_order_id=task_order.id + ) + get_url_assert_status(ccpo, url, 404) + get_url_assert_status(owner, url, 404) + get_url_assert_status(ko, url, 200) + get_url_assert_status(cor, url, 200) + + +# portfolios.new_application +def test_portfolios_new_application_access(get_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_APPLICATION_MANAGEMENT) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.new_application", portfolio_id=portfolio.id) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.new_member +def test_portfolios_new_member_access(get_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.new_member", portfolio_id=portfolio.id) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.portfolio_admin +def test_portfolios_portfolio_admin_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.portfolio_admin", portfolio_id=portfolio.id) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.portfolio_applications +def test_portfolios_portfolio_applications_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_APPLICATION_MANAGEMENT) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.portfolio_applications", portfolio_id=portfolio.id) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.portfolio_funding +def test_portfolios_portfolio_funding_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.portfolio_funding", portfolio_id=portfolio.id) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.portfolio_members +def test_portfolios_portfolio_members_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.portfolio_members", portfolio_id=portfolio.id) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.portfolio_reports +def test_portfolios_portfolio_reports_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_REPORTS) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.portfolio_reports", portfolio_id=portfolio.id) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.resend_invitation +def test_portfolios_resend_invitation_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + invitee = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + prr = PortfolioRoleFactory.create(user=invitee, portfolio=portfolio) + invite = InvitationFactory.create(user=UserFactory.create(), portfolio_role=prr) + + url = url_for( + "portfolios.resend_invitation", portfolio_id=portfolio.id, token=invite.token + ) + post_url_assert_status(ccpo, url, 302) + post_url_assert_status(owner, url, 302) + post_url_assert_status(invitee, url, 404) + post_url_assert_status(rando, url, 404) + + +# portfolios.resend_invite +def test_portfolios_resend_invite_access(post_url_assert_status): + ccpo = UserFactory.create_ccpo() + owner = user_with() + rando = user_with() + ko = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio, contracting_officer=ko) + prr = PortfolioRoleFactory.create(user=ko, portfolio=portfolio) + invite = InvitationFactory.create(user=UserFactory.create(), portfolio_role=prr) + + url = url_for( + "portfolios.resend_invite", + portfolio_id=portfolio.id, + task_order_id=task_order.id, + invite_type="ko_invite", + ) + post_url_assert_status(ccpo, url, 302) + post_url_assert_status(owner, url, 302) + post_url_assert_status(ko, url, 404) + post_url_assert_status(rando, url, 404) + + +# portfolios.revoke_access +def test_portfolios_revoke_access_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + + for user, status in [(ccpo, 302), (owner, 302), (rando, 404)]: + prt_member = user_with() + prr = PortfolioRoleFactory.create( + user=prt_member, portfolio=portfolio, status=PortfolioRoleStatus.ACTIVE + ) + url = url_for( + "portfolios.revoke_access", portfolio_id=portfolio.id, member_id=prr.id + ) + post_url_assert_status(user, url, status) + + +# portfolios.revoke_invitation +def test_portfolios_revoke_invitation_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + + for user, status in [(ccpo, 302), (owner, 302), (rando, 404)]: + prt_member = user_with() + prr = PortfolioRoleFactory.create( + user=prt_member, portfolio=portfolio, status=PortfolioRoleStatus.ACTIVE + ) + invite = InvitationFactory.create(user=prt_member, portfolio_role=prr) + url = url_for( + "portfolios.revoke_invitation", + portfolio_id=portfolio.id, + token=invite.token, + ) + post_url_assert_status(user, url, status) + + +# portfolios.show_portfolio +def test_portfolios_show_portfolio_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + + url = url_for("portfolios.show_portfolio", portfolio_id=portfolio.id) + get_url_assert_status(ccpo, url, 302) + get_url_assert_status(owner, url, 302) + get_url_assert_status(rando, url, 404) + + +# portfolios.so_review +def test_portfolios_so_review_access(get_url_assert_status): + ccpo = UserFactory.create_ccpo() + owner = user_with() + rando = user_with() + so = user_with() + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio, security_officer=so) + + url = url_for( + "portfolios.so_review", portfolio_id=portfolio.id, task_order_id=task_order.id + ) + get_url_assert_status(so, url, 200) + get_url_assert_status(ccpo, url, 404) + get_url_assert_status(owner, url, 404) + get_url_assert_status(rando, url, 404) + + +# portfolios.submit_ko_review +def test_portfolios_submit_ko_review_access(post_url_assert_status): + ccpo = UserFactory.create_ccpo() + owner = user_with() + cor = user_with() + ko = user_with() + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create( + portfolio=portfolio, + contracting_officer=ko, + contracting_officer_representative=cor, + ) + + url = url_for( + "portfolios.submit_ko_review", + portfolio_id=portfolio.id, + task_order_id=task_order.id, + ) + post_url_assert_status(ccpo, url, 404) + post_url_assert_status(owner, url, 404) + post_url_assert_status(ko, url, 200) + post_url_assert_status(cor, url, 200) + + +# portfolios.submit_so_review +def test_portfolios_submit_so_review_access(post_url_assert_status): + ccpo = UserFactory.create_ccpo() + owner = user_with() + rando = user_with() + so = user_with() + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio, security_officer=so) + + url = url_for( + "portfolios.submit_so_review", + portfolio_id=portfolio.id, + task_order_id=task_order.id, + ) + post_url_assert_status(so, url, 200) + post_url_assert_status(ccpo, url, 404) + post_url_assert_status(owner, url, 404) + post_url_assert_status(rando, url, 404) + + +# portfolios.task_order_invitations +def test_portfolios_task_order_invitations_access(get_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio) + + url = url_for( + "portfolios.task_order_invitations", + portfolio_id=portfolio.id, + task_order_id=task_order.id, + ) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(owner, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.update_application +def test_portfolios_update_application_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_APPLICATION_MANAGEMENT) + dev = UserFactory.create() + rando = UserFactory.create() + + portfolio = PortfolioFactory.create( + owner=dev, + applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}], + ) + app = portfolio.applications[0] + + url = url_for( + "portfolios.update_application", + portfolio_id=portfolio.id, + application_id=app.id, + ) + post_url_assert_status(dev, url, 200) + post_url_assert_status(ccpo, url, 200) + post_url_assert_status(rando, url, 404) + + +# portfolios.update_member +def test_portfolios_update_member_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + prt_member = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + prr = PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio) + + url = url_for( + "portfolios.update_member", portfolio_id=portfolio.id, member_id=prt_member.id + ) + post_url_assert_status(owner, url, 200) + post_url_assert_status(ccpo, url, 200) + post_url_assert_status(rando, url, 404) + + +# portfolios.view_member +def test_portfolios_view_member_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_ADMIN) + owner = user_with() + rando = user_with() + prt_member = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + prr = PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio) + + url = url_for( + "portfolios.view_member", portfolio_id=portfolio.id, member_id=prt_member.id + ) + get_url_assert_status(owner, url, 200) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(rando, url, 404) + + +# portfolios.view_task_order +def test_portfolios_view_task_order_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio) + + url = url_for( + "portfolios.view_task_order", + portfolio_id=portfolio.id, + task_order_id=task_order.id, + ) + get_url_assert_status(owner, url, 200) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(rando, url, 404) + + +# task_orders.download_csp_estimate +def test_task_orders_download_csp_estimate_access(get_url_assert_status, monkeypatch): + monkeypatch.setattr( + "atst.routes.task_orders.index.send_file", lambda a: Response("") + ) + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio) + + url = url_for("task_orders.download_csp_estimate", task_order_id=task_order.id) + get_url_assert_status(owner, url, 200) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(rando, url, 404) + + +# task_orders.download_summary +def test_task_orders_download_summary_access(get_url_assert_status): + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio) + + url = url_for("task_orders.download_summary", task_order_id=task_order.id) + get_url_assert_status(owner, url, 200) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(rando, url, 404) + + +# task_orders.download_task_order_pdf +def test_task_orders_download_task_order_pdf_access(get_url_assert_status, monkeypatch): + monkeypatch.setattr( + "atst.routes.task_orders.index.send_file", lambda a: Response("") + ) + ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create( + portfolio=portfolio, pdf=AttachmentFactory.create() + ) + + url = url_for("task_orders.download_task_order_pdf", task_order_id=task_order.id) + get_url_assert_status(owner, url, 200) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(rando, url, 404) + + +# task_orders.invite +def test_task_orders_invite_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio) + + url = url_for("task_orders.invite", task_order_id=task_order.id) + post_url_assert_status(owner, url, 302) + post_url_assert_status(ccpo, url, 302) + post_url_assert_status(rando, url, 404) + + +# task_orders.new +def test_task_orders_new_access(get_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + + url = url_for("task_orders.new", screen=1) + get_url_assert_status(owner, url, 200) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(rando, url, 200) + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio) + + url = url_for("task_orders.new", screen=2, task_order_id=task_order.id) + get_url_assert_status(owner, url, 200) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(rando, url, 404) + + url = url_for("task_orders.new", screen=1, portfolio_id=portfolio.id) + get_url_assert_status(owner, url, 200) + get_url_assert_status(ccpo, url, 200) + get_url_assert_status(rando, url, 404) + + +# task_orders.record_signature +def test_task_orders_record_signature_access(post_url_assert_status, monkeypatch): + ccpo = UserFactory.create_ccpo() + owner = user_with() + rando = user_with() + ko = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio, contracting_officer=ko) + monkeypatch.setattr( + "atst.routes.task_orders.signing.find_unsigned_ko_to", lambda *a: task_order + ) + + url = url_for("task_orders.record_signature", task_order_id=task_order.id) + post_url_assert_status(ko, url, 400) + post_url_assert_status(owner, url, 404) + post_url_assert_status(ccpo, url, 404) + post_url_assert_status(rando, url, 404) + + +# task_orders.signature_requested +def test_task_orders_signature_requested_access(get_url_assert_status, monkeypatch): + ccpo = UserFactory.create_ccpo() + owner = user_with() + rando = user_with() + ko = user_with() + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio, contracting_officer=ko) + monkeypatch.setattr( + "atst.routes.task_orders.signing.find_unsigned_ko_to", lambda *a: task_order + ) + + url = url_for("task_orders.record_signature", task_order_id=task_order.id) + get_url_assert_status(ko, url, 200) + get_url_assert_status(owner, url, 404) + get_url_assert_status(ccpo, url, 404) + get_url_assert_status(rando, url, 404) + + +# task_orders.update +def test_task_orders_update_access(post_url_assert_status): + ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_FUNDING) + owner = user_with() + rando = user_with() + + url = url_for("task_orders.update", screen=1) + post_url_assert_status(owner, url, 200) + post_url_assert_status(ccpo, url, 200) + post_url_assert_status(rando, url, 200) + + portfolio = PortfolioFactory.create(owner=owner) + task_order = TaskOrderFactory.create(portfolio=portfolio) + + url = url_for("task_orders.update", screen=2, task_order_id=task_order.id) + post_url_assert_status(owner, url, 302) + post_url_assert_status(ccpo, url, 302) + post_url_assert_status(rando, url, 404) + + url = url_for("task_orders.update", screen=1, portfolio_id=portfolio.id) + post_url_assert_status(owner, url, 302) + post_url_assert_status(ccpo, url, 302) + post_url_assert_status(rando, url, 404) diff --git a/tests/utils.py b/tests/utils.py index c590136e..0615aec7 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -14,3 +14,17 @@ def captured_templates(app): yield recorded finally: template_rendered.disconnect(record, app) + + +class FakeLogger: + def __init__(self): + self.messages = [] + + def info(self, msg): + self.messages.append(msg) + + def warning(self, msg): + self.messages.append(msg) + + def error(self, msg): + self.messages.append(msg)