remove access checks from domain methods

This commit is contained in:
dandds 2019-03-19 10:47:53 -04:00
parent 1974c89e9c
commit 0ea21fbb9b
18 changed files with 149 additions and 151 deletions

View File

@ -1,5 +1,4 @@
from atst.database import db
from atst.domain.authz import Authorization
from atst.domain.environments import Environments
from atst.domain.exceptions import NotFoundError
from atst.models.permissions import Permissions
@ -23,14 +22,6 @@ class Applications(object):
@classmethod
def get(cls, user, portfolio, application_id):
# TODO: this should check permission for this particular application
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.VIEW_APPLICATION,
"view application in portfolio",
)
try:
application = (
db.session.query(Application).filter_by(id=application_id).one()
@ -53,13 +44,6 @@ class Applications(object):
@classmethod
def get_all(cls, user, portfolio_role, portfolio):
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.VIEW_APPLICATION,
"view application in portfolio",
)
try:
applications = (
db.session.query(Application).filter_by(portfolio_id=portfolio.id).all()

View File

@ -2,7 +2,6 @@ from sqlalchemy import or_
from atst.database import db
from atst.domain.common import Query
from atst.domain.authz import Authorization, Permissions
from atst.models.audit_event import AuditEvent
@ -36,20 +35,10 @@ class AuditLog(object):
@classmethod
def get_all_events(cls, user, pagination_opts=None):
# TODO: general audit log permissions
Authorization.check_atat_permission(
user, Permissions.VIEW_AUDIT_LOG, "view audit log"
)
return AuditEventQuery.get_all(pagination_opts)
@classmethod
def get_portfolio_events(cls, user, portfolio, pagination_opts=None):
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.VIEW_PORTFOLIO_ACTIVITY_LOG,
"view portfolio audit log",
)
return AuditEventQuery.get_ws_events(portfolio.id, pagination_opts)
@classmethod

View File

@ -18,10 +18,6 @@ class Authorization(object):
def has_atat_permission(cls, user, permission):
return permission in user.permissions
@classmethod
def is_in_portfolio(cls, user, portfolio):
return user in portfolio.users
@classmethod
def check_portfolio_permission(cls, user, portfolio, permission, message):
if not (
@ -30,14 +26,14 @@ class Authorization(object):
):
raise UnauthorizedError(user, message)
return True
@classmethod
def check_atat_permission(cls, user, permission, message):
if not Authorization.has_atat_permission(user, permission):
raise UnauthorizedError(user, message)
@classmethod
def can_view_audit_log(cls, user):
return Authorization.has_atat_permission(user, Permissions.VIEW_AUDIT_LOG)
return True
@classmethod
def is_ko(cls, user, task_order):
@ -72,23 +68,11 @@ class Authorization(object):
message = "review task order {}".format(task_order.id)
raise UnauthorizedError(user, message)
@classmethod
def check_task_order_permission(cls, user, task_order, permission, message):
if Authorization._check_is_task_order_officer(user, task_order):
return True
Authorization.check_portfolio_permission(
user, task_order.portfolio, permission, message
)
def user_can_access(user, permission, portfolio=None, message=None):
if portfolio:
Authorization.check_portfolio_permission(user, portfolio, permission, message)
else:
Authorization.check_atat_permission(user, permission, message)
@classmethod
def _check_is_task_order_officer(cls, user, task_order):
for officer in [
"contracting_officer",
"contracting_officer_representative",
"security_officer",
]:
if getattr(task_order, officer, None) == user:
return True
return False
return True

View File

@ -0,0 +1,33 @@
from functools import wraps
from flask import g
from . import user_can_access
from atst.domain.portfolios import Portfolios
def user_can_access_decorator(permission, message=None, exceptions=None):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
access_args = {"message": message}
if "portfolio_id" in kwargs:
access_args["portfolio"] = Portfolios.get(
g.current_user, kwargs["portfolio_id"]
)
if exceptions:
evaluated = [
exc(g.current_user, permission, **access_args) for exc in exceptions
]
if True in evaluated:
return True
user_can_access(g.current_user, permission, **access_args)
return f(*args, **kwargs)
return decorated_function
return decorator

View File

@ -5,8 +5,6 @@ from atst.database import db
from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole
from atst.models.application import Application
from atst.models.permissions import Permissions
from atst.domain.authz import Authorization
from atst.domain.environment_roles import EnvironmentRoles
from .exceptions import NotFoundError
@ -61,12 +59,6 @@ class Environments(object):
@classmethod
def update_environment_roles(cls, user, portfolio, portfolio_role, ids_and_roles):
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.EDIT_APPLICATION_MEMBER,
"assign environment roles",
)
updated = False
for id_and_role in ids_and_roles:
@ -101,10 +93,4 @@ class Environments(object):
@classmethod
def revoke_access(cls, user, environment, target_user):
Authorization.check_portfolio_permission(
user,
environment.portfolio,
Permissions.EDIT_APPLICATION_MEMBER,
"revoke environment access",
)
EnvironmentRoles.delete(environment.id, target_user.id)

View File

@ -4,7 +4,6 @@ from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
from atst.models.invitation import Invitation, Status as InvitationStatus
from atst.domain.portfolio_roles import PortfolioRoles
from atst.domain.authz import Authorization, Permissions
from atst.domain.portfolios import Portfolios
from .exceptions import NotFoundError
@ -120,13 +119,6 @@ class Invitations(object):
@classmethod
def resend(cls, user, portfolio_id, token):
portfolio = Portfolios.get(user, portfolio_id)
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.CREATE_PORTFOLIO_USERS,
"resend a portfolio invitation",
)
previous_invitation = Invitations._get(token)
Invitations._update_status(previous_invitation, InvitationStatus.REVOKED)

View File

@ -33,51 +33,29 @@ class Portfolios(object):
@classmethod
def get(cls, user, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user, portfolio, Permissions.VIEW_PORTFOLIO, "get portfolio"
)
return ScopedPortfolio(user, portfolio)
@classmethod
def get_for_update_applications(cls, user, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user, portfolio, Permissions.CREATE_APPLICATION, "add application"
)
return portfolio
@classmethod
def get_for_update_information(cls, user, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.EDIT_PORTFOLIO_NAME,
"update portfolio information",
)
return portfolio
@classmethod
def get_for_update_member(cls, user, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user,
portfolio,
Permissions.EDIT_PORTFOLIO_USERS,
"update a portfolio member",
)
return portfolio
@classmethod
def get_with_members(cls, user, portfolio_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user, portfolio, Permissions.VIEW_PORTFOLIO_USERS, "view portfolio members"
)
return portfolio
@ -91,10 +69,6 @@ class Portfolios(object):
@classmethod
def create_member(cls, user, portfolio, data):
Authorization.check_portfolio_permission(
user, portfolio, Permissions.EDIT_PORTFOLIO_USERS, "create portfolio member"
)
new_user = Users.get_or_create_by_dod_id(
data["dod_id"],
first_name=data["first_name"],
@ -114,11 +88,6 @@ class Portfolios(object):
@classmethod
def update_member(cls, user, portfolio, member, permission_sets):
Authorization.check_portfolio_permission(
user, portfolio, Permissions.EDIT_PORTFOLIO_USERS, "edit portfolio member"
)
# need to update perms sets here
return PortfolioRoles.update(member, permission_sets)
@classmethod
@ -151,9 +120,6 @@ class Portfolios(object):
@classmethod
def revoke_access(cls, user, portfolio_id, portfolio_role_id):
portfolio = PortfoliosQuery.get(portfolio_id)
Authorization.check_portfolio_permission(
user, portfolio, Permissions.EDIT_PORTFOLIO_USERS, "revoke portfolio access"
)
portfolio_role = PortfolioRoles.get_by_id(portfolio_role_id)
if not Portfolios.can_revoke_access_for(portfolio, portfolio_role):

View File

@ -3,10 +3,8 @@ from flask import current_app as app
from atst.database import db
from atst.models.task_order import TaskOrder
from atst.models.permissions import Permissions
from atst.models.dd_254 import DD254
from atst.domain.portfolios import Portfolios
from atst.domain.authz import Authorization
from atst.domain.permission_sets import PermissionSets
from .exceptions import NotFoundError
@ -57,9 +55,6 @@ class TaskOrders(object):
def get(cls, user, task_order_id):
try:
task_order = db.session.query(TaskOrder).filter_by(id=task_order_id).one()
Authorization.check_task_order_permission(
user, task_order, Permissions.VIEW_TASK_ORDER_DETAILS, "view task order"
)
return task_order
except NoResultFound:
@ -67,9 +62,6 @@ class TaskOrders(object):
@classmethod
def create(cls, creator, portfolio):
Authorization.check_portfolio_permission(
creator, portfolio, Permissions.CREATE_TASK_ORDER, "add task order"
)
task_order = TaskOrder(portfolio=portfolio, creator=creator)
db.session.add(task_order)
@ -79,10 +71,6 @@ class TaskOrders(object):
@classmethod
def update(cls, user, task_order, **kwargs):
Authorization.check_task_order_permission(
user, task_order, Permissions.EDIT_TASK_ORDER_DETAILS, "update task order"
)
for key, value in kwargs.items():
setattr(task_order, key, value)
@ -148,13 +136,6 @@ class TaskOrders(object):
@classmethod
def add_officer(cls, user, task_order, officer_type, officer_data):
Authorization.check_portfolio_permission(
user,
task_order.portfolio,
Permissions.EDIT_TASK_ORDER_DETAILS,
"add task order officer",
)
if officer_type in TaskOrders.OFFICERS:
portfolio = task_order.portfolio

View File

@ -6,7 +6,6 @@ from . import portfolios_bp
from atst.domain.reports import Reports
from atst.domain.portfolios import Portfolios
from atst.domain.audit_log import AuditLog
from atst.domain.authz import Authorization
from atst.domain.common import Paginator
from atst.forms.portfolio import PortfolioForm
from atst.models.permissions import Permissions
@ -79,13 +78,6 @@ def show_portfolio(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/reports")
def portfolio_reports(portfolio_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
Authorization.check_portfolio_permission(
g.current_user,
portfolio,
Permissions.VIEW_PORTFOLIO_REPORTS,
"view portfolio reports",
)
today = date.today()
month = http_request.args.get("month", today.month)
year = http_request.args.get("year", today.year)

View File

@ -12,8 +12,6 @@ from atst.domain.environment_roles import EnvironmentRoles
from atst.services.invitation import Invitation as InvitationService
import atst.forms.portfolio_member as member_forms
from atst.forms.data import ENVIRONMENT_ROLES, ENV_ROLE_MODAL_DESCRIPTION
from atst.domain.authz import Authorization
from atst.models.permissions import Permissions
from atst.utils.flash import formatted_flash as flash
@ -101,12 +99,6 @@ def create_member(portfolio_id):
@portfolios_bp.route("/portfolios/<portfolio_id>/members/<member_id>/member_edit")
def view_member(portfolio_id, member_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
Authorization.check_portfolio_permission(
g.current_user,
portfolio,
Permissions.EDIT_PORTFOLIO_USERS,
"edit this portfolio user",
)
member = PortfolioRoles.get(portfolio_id, member_id)
applications = Applications.get_all(g.current_user, member, portfolio)
form = member_forms.EditForm(portfolio_role="admin")
@ -135,12 +127,6 @@ def view_member(portfolio_id, member_id):
)
def update_member(portfolio_id, member_id):
portfolio = Portfolios.get(g.current_user, portfolio_id)
Authorization.check_portfolio_permission(
g.current_user,
portfolio,
Permissions.EDIT_PORTFOLIO_USERS,
"edit this portfolio user",
)
member = PortfolioRoles.get(portfolio_id, member_id)
ids_and_roles = []

View File

@ -22,6 +22,7 @@ def developer():
return UserFactory.create()
@pytest.mark.auth
def test_non_admin_cannot_view_audit_log(developer):
with pytest.raises(UnauthorizedError):
AuditLog.get_all_events(developer)
@ -63,6 +64,7 @@ def test_ws_owner_can_view_ws_audit_log():
assert len(events) > 0
@pytest.mark.auth
def test_other_users_cannot_view_portfolio_audit_log():
with pytest.raises(UnauthorizedError):
portfolio = PortfolioFactory.create()

View File

@ -1,7 +1,13 @@
import pytest
from tests.factories import TaskOrderFactory, UserFactory, PortfolioRoleFactory
from atst.domain.authz import Authorization
from tests.factories import (
TaskOrderFactory,
UserFactory,
PortfolioFactory,
PortfolioRoleFactory,
)
from atst.domain.authz import Authorization, user_can_access
from atst.domain.authz.decorator import user_can_access_decorator
from atst.domain.permission_sets import PermissionSets
from atst.domain.exceptions import UnauthorizedError
from atst.models.permissions import Permissions
@ -58,3 +64,85 @@ def test_has_portfolio_permission():
assert not Authorization.has_portfolio_permission(
different_user, port_role.portfolio, Permissions.VIEW_PORTFOLIO_REPORTS
)
def test_user_can_access():
ccpo = UserFactory.create_ccpo()
edit_admin = UserFactory.create()
view_admin = UserFactory.create()
portfolio = PortfolioFactory.create(owner=edit_admin)
# factory gives view perms by default
PortfolioRoleFactory.create(user=view_admin, portfolio=portfolio)
# check a site-wide permission
assert user_can_access(ccpo, Permissions.VIEW_AUDIT_LOG)
with pytest.raises(UnauthorizedError):
user_can_access(edit_admin, Permissions.VIEW_AUDIT_LOG)
user_can_access(view_admin, Permissions.VIEW_AUDIT_LOG)
# check a portfolio view permission
assert user_can_access(ccpo, Permissions.VIEW_PORTFOLIO, portfolio=portfolio)
assert user_can_access(edit_admin, Permissions.VIEW_PORTFOLIO, portfolio=portfolio)
assert user_can_access(view_admin, Permissions.VIEW_PORTFOLIO, portfolio=portfolio)
# check a portfolio edit permission
assert user_can_access(ccpo, Permissions.EDIT_PORTFOLIO_NAME, portfolio=portfolio)
assert user_can_access(
edit_admin, Permissions.EDIT_PORTFOLIO_NAME, portfolio=portfolio
)
with pytest.raises(UnauthorizedError):
user_can_access(
view_admin, Permissions.EDIT_PORTFOLIO_NAME, portfolio=portfolio
)
@pytest.fixture
def set_current_user(request_ctx):
def _set_current_user(user):
request_ctx.g.current_user = user
yield _set_current_user
request_ctx.g.current_user = None
def test_user_can_access_decorator(set_current_user):
ccpo = UserFactory.create_ccpo()
edit_admin = UserFactory.create()
view_admin = UserFactory.create()
portfolio = PortfolioFactory.create(owner=edit_admin)
# factory gives view perms by default
PortfolioRoleFactory.create(user=view_admin, portfolio=portfolio)
@user_can_access_decorator(Permissions.EDIT_PORTFOLIO_NAME)
def _edit_portfolio_name(*args, **kwargs):
return True
set_current_user(ccpo)
assert _edit_portfolio_name(portfolio_id=portfolio.id)
set_current_user(edit_admin)
assert _edit_portfolio_name(portfolio_id=portfolio.id)
set_current_user(view_admin)
with pytest.raises(UnauthorizedError):
_edit_portfolio_name(portfolio_id=portfolio.id)
def test_user_can_access_decorator_exceptions(set_current_user):
rando_calrissian = UserFactory.create()
portfolio = PortfolioFactory.create()
wont_work = lambda *args, **kwargs: False
because_i_said_so = lambda *args, **kwargs: True
@user_can_access_decorator(
Permissions.EDIT_PORTFOLIO_NAME, exceptions=[wont_work, because_i_said_so]
)
def _edit_portfolio_name(*args, **kwargs):
return True
set_current_user(rando_calrissian)
assert _edit_portfolio_name(portfolio_id=portfolio.id)

View File

@ -46,16 +46,19 @@ def test_portfolio_has_timestamps(portfolio):
assert portfolio.time_created == portfolio.time_updated
@pytest.mark.auth
def test_portfolios_get_ensures_user_is_in_portfolio(portfolio, portfolio_owner):
outside_user = UserFactory.create()
with pytest.raises(UnauthorizedError):
Portfolios.get(outside_user, portfolio.id)
@pytest.mark.auth
def test_get_for_update_applications_allows_owner(portfolio, portfolio_owner):
Portfolios.get_for_update_applications(portfolio_owner, portfolio.id)
@pytest.mark.auth
def test_get_for_update_applications_blocks_developer(portfolio):
developer = UserFactory.create()
PortfolioRoles.add(developer, portfolio.id)
@ -94,6 +97,7 @@ def test_can_add_existing_user_to_portfolio(portfolio, portfolio_owner):
assert not new_member.user.provisional
@pytest.mark.auth
def test_need_permission_to_create_portfolio_role(portfolio, portfolio_owner):
random_user = UserFactory.create()
@ -127,6 +131,7 @@ def test_update_portfolio_role_role(portfolio, portfolio_owner):
assert updated_member.portfolio == portfolio
@pytest.mark.auth
def test_need_permission_to_update_portfolio_role_role(portfolio, portfolio_owner):
random_user = UserFactory.create()
user_data = {
@ -154,6 +159,7 @@ def test_ccpo_can_view_portfolio_members(portfolio, portfolio_owner):
assert Portfolios.get_with_members(ccpo, portfolio.id)
@pytest.mark.auth
def test_random_user_cannot_view_portfolio_members(portfolio):
developer = UserFactory.create()
@ -282,6 +288,7 @@ def test_for_user_returns_all_portfolios_for_ccpo(portfolio, portfolio_owner):
assert len(sams_portfolios) == 2
@pytest.mark.auth
def test_get_for_update_information(portfolio, portfolio_owner):
owner_ws = Portfolios.get_for_update_information(portfolio_owner, portfolio.id)
assert portfolio == owner_ws

View File

@ -95,6 +95,7 @@ def test_add_officer_who_is_already_portfolio_member():
assert member.user == owner
@pytest.mark.auth
def test_task_order_access():
creator = UserFactory.create()
member = UserFactory.create()

View File

@ -175,6 +175,7 @@ def test_user_with_permission_can_update_application(client, user_session):
assert application.description == "A very cool application."
@pytest.mark.auth
def test_user_without_permission_cannot_update_application(client, user_session):
dev = UserFactory.create()
owner = UserFactory.create()

View File

@ -1,3 +1,4 @@
import pytest
import datetime
from flask import url_for
@ -94,6 +95,7 @@ def test_member_accepts_invalid_invite(client, user_session):
assert response.status_code == 404
@pytest.mark.auth
def test_user_who_has_not_accepted_portfolio_invite_cannot_view(client, user_session):
user = UserFactory.create()
portfolio = PortfolioFactory.create()

View File

@ -60,6 +60,7 @@ def test_user_with_permission_has_add_member_link(client, user_session):
)
@pytest.mark.auth
def test_user_without_permission_has_no_add_member_link(client, user_session):
user = UserFactory.create()
portfolio = PortfolioFactory.create()
@ -72,6 +73,7 @@ def test_user_without_permission_has_no_add_member_link(client, user_session):
)
@pytest.mark.auth
def test_permissions_for_view_member(client, user_session):
user = UserFactory.create()
portfolio = PortfolioFactory.create()

View File

@ -1,3 +1,4 @@
import pytest
from flask import url_for
from io import BytesIO
import re
@ -62,6 +63,7 @@ class TestDownloadCSPEstimate:
)
assert response.status_code == 404
@pytest.mark.auth
def test_download_with_wrong_user(self, client, user_session):
other_user = UserFactory.create()
user_session(other_user)