Merge pull request #376 from dod-ccpo/auth-patterns
Standardize auth patterns
This commit is contained in:
commit
76854acf81
@ -19,6 +19,7 @@ from atst.routes.errors import make_error_pages
|
|||||||
from atst.domain.authnid.crl import CRLCache
|
from atst.domain.authnid.crl import CRLCache
|
||||||
from atst.domain.auth import apply_authentication
|
from atst.domain.auth import apply_authentication
|
||||||
from atst.domain.authz import Authorization
|
from atst.domain.authz import Authorization
|
||||||
|
from atst.models.permissions import Permissions
|
||||||
from atst.eda_client import MockEDAClient
|
from atst.eda_client import MockEDAClient
|
||||||
from atst.uploader import Uploader
|
from atst.uploader import Uploader
|
||||||
|
|
||||||
@ -72,6 +73,7 @@ def make_flask_callbacks(app):
|
|||||||
g.matchesPath = lambda href: re.match("^" + href, request.path)
|
g.matchesPath = lambda href: re.match("^" + href, request.path)
|
||||||
g.modal = request.args.get("modal", None)
|
g.modal = request.args.get("modal", None)
|
||||||
g.Authorization = Authorization
|
g.Authorization = Authorization
|
||||||
|
g.Permissions = Permissions
|
||||||
|
|
||||||
@app.after_request
|
@app.after_request
|
||||||
def _cleanup(response):
|
def _cleanup(response):
|
||||||
|
@ -17,28 +17,6 @@ class Authorization(object):
|
|||||||
def is_in_workspace(cls, user, workspace):
|
def is_in_workspace(cls, user, workspace):
|
||||||
return user in workspace.users
|
return user in workspace.users
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def can_view_request(cls, user, request):
|
|
||||||
if (
|
|
||||||
Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST
|
|
||||||
in user.atat_permissions
|
|
||||||
):
|
|
||||||
return True
|
|
||||||
elif request.creator == user:
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def check_can_approve_request(cls, user):
|
|
||||||
if (
|
|
||||||
Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST
|
|
||||||
in user.atat_permissions
|
|
||||||
):
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
raise UnauthorizedError(user, "cannot review and approve requests")
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def check_workspace_permission(cls, user, workspace, permission, message):
|
def check_workspace_permission(cls, user, workspace, permission, message):
|
||||||
if not Authorization.has_workspace_permission(user, workspace, permission):
|
if not Authorization.has_workspace_permission(user, workspace, permission):
|
||||||
|
29
atst/domain/requests/authorization.py
Normal file
29
atst/domain/requests/authorization.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
from atst.models.permissions import Permissions
|
||||||
|
from atst.domain.authz import Authorization
|
||||||
|
from atst.domain.exceptions import UnauthorizedError
|
||||||
|
|
||||||
|
|
||||||
|
class RequestsAuthorization(object):
|
||||||
|
def __init__(self, user, request):
|
||||||
|
self.user = user
|
||||||
|
self.request = request
|
||||||
|
|
||||||
|
@property
|
||||||
|
def can_view(self):
|
||||||
|
return (
|
||||||
|
Authorization.has_atat_permission(
|
||||||
|
self.user, Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST
|
||||||
|
)
|
||||||
|
or self.request.creator == self.user
|
||||||
|
)
|
||||||
|
|
||||||
|
def check_can_view(self, message):
|
||||||
|
if not self.can_view:
|
||||||
|
raise UnauthorizedError(self.user, message)
|
||||||
|
|
||||||
|
def check_can_approve(self):
|
||||||
|
return Authorization.check_atat_permission(
|
||||||
|
self.user,
|
||||||
|
Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST,
|
||||||
|
"cannot review and approve requests",
|
||||||
|
)
|
@ -1,7 +1,6 @@
|
|||||||
from werkzeug.datastructures import FileStorage
|
from werkzeug.datastructures import FileStorage
|
||||||
import dateutil
|
import dateutil
|
||||||
|
|
||||||
from atst.domain.authz import Authorization
|
|
||||||
from atst.domain.task_orders import TaskOrders
|
from atst.domain.task_orders import TaskOrders
|
||||||
from atst.domain.workspaces import Workspaces
|
from atst.domain.workspaces import Workspaces
|
||||||
from atst.models.request_revision import RequestRevision
|
from atst.models.request_revision import RequestRevision
|
||||||
@ -10,9 +9,8 @@ from atst.models.request_review import RequestReview
|
|||||||
from atst.models.request_internal_comment import RequestInternalComment
|
from atst.models.request_internal_comment import RequestInternalComment
|
||||||
from atst.utils import deep_merge
|
from atst.utils import deep_merge
|
||||||
|
|
||||||
from atst.domain.exceptions import UnauthorizedError
|
|
||||||
|
|
||||||
from .query import RequestsQuery
|
from .query import RequestsQuery
|
||||||
|
from .authorization import RequestsAuthorization
|
||||||
|
|
||||||
|
|
||||||
def create_revision_from_request_body(body):
|
def create_revision_from_request_body(body):
|
||||||
@ -47,18 +45,13 @@ class Requests(object):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def get(cls, user, request_id):
|
def get(cls, user, request_id):
|
||||||
request = RequestsQuery.get(request_id)
|
request = RequestsQuery.get(request_id)
|
||||||
|
RequestsAuthorization(user, request).check_can_view("get request")
|
||||||
if not Authorization.can_view_request(user, request):
|
|
||||||
raise UnauthorizedError(user, "get request")
|
|
||||||
|
|
||||||
return request
|
return request
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_for_approval(cls, user, request_id):
|
def get_for_approval(cls, user, request_id):
|
||||||
request = RequestsQuery.get(request_id)
|
request = RequestsQuery.get(request_id)
|
||||||
|
RequestsAuthorization(user, request).check_can_approve()
|
||||||
Authorization.check_can_approve_request(user)
|
|
||||||
|
|
||||||
return request
|
return request
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -226,7 +219,7 @@ class Requests(object):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def add_internal_comment(cls, user, request, comment_text):
|
def add_internal_comment(cls, user, request, comment_text):
|
||||||
Authorization.check_can_approve_request(user)
|
RequestsAuthorization(user, request).check_can_approve()
|
||||||
comment = RequestInternalComment(request=request, text=comment_text, user=user)
|
comment = RequestInternalComment(request=request, text=comment_text, user=user)
|
||||||
RequestsQuery.add_and_commit(comment)
|
RequestsQuery.add_and_commit(comment)
|
||||||
return request
|
return request
|
||||||
|
@ -26,7 +26,7 @@
|
|||||||
{{ SidenavItem("Workspaces", href="/workspaces", icon="cloud", active=g.matchesPath('/workspaces')) }}
|
{{ SidenavItem("Workspaces", href="/workspaces", icon="cloud", active=g.matchesPath('/workspaces')) }}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
|
||||||
{% if g.Authorization.can_view_audit_log(g.current_user) %}
|
{% if g.Authorization.has_atat_permission(g.current_user, g.Permissions.VIEW_AUDIT_LOG) %}
|
||||||
{{ SidenavItem("Activity History", url_for('atst.activity_history'), icon="time", active=g.matchesPath('/activity-history')) }}
|
{{ SidenavItem("Activity History", url_for('atst.activity_history'), icon="time", active=g.matchesPath('/activity-history')) }}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</ul>
|
</ul>
|
||||||
|
@ -1,20 +0,0 @@
|
|||||||
from atst.domain.authz import Authorization
|
|
||||||
from atst.domain.roles import Roles
|
|
||||||
|
|
||||||
from tests.factories import RequestFactory, UserFactory
|
|
||||||
|
|
||||||
|
|
||||||
def test_creator_can_view_own_request():
|
|
||||||
user = UserFactory.create()
|
|
||||||
request = RequestFactory.create(creator=user)
|
|
||||||
assert Authorization.can_view_request(user, request)
|
|
||||||
|
|
||||||
other_user = UserFactory.create()
|
|
||||||
assert not Authorization.can_view_request(other_user, request)
|
|
||||||
|
|
||||||
|
|
||||||
def test_ccpo_user_can_view_request():
|
|
||||||
role = Roles.get("ccpo")
|
|
||||||
ccpo_user = UserFactory.create(atat_role=role)
|
|
||||||
request = RequestFactory.create()
|
|
||||||
assert Authorization.can_view_request(ccpo_user, request)
|
|
@ -3,6 +3,7 @@ from uuid import uuid4
|
|||||||
|
|
||||||
from atst.domain.exceptions import NotFoundError
|
from atst.domain.exceptions import NotFoundError
|
||||||
from atst.domain.requests import Requests
|
from atst.domain.requests import Requests
|
||||||
|
from atst.domain.requests.authorization import RequestsAuthorization
|
||||||
from atst.models.request import Request
|
from atst.models.request import Request
|
||||||
from atst.models.request_status_event import RequestStatus
|
from atst.models.request_status_event import RequestStatus
|
||||||
from atst.models.task_order import Source as TaskOrderSource
|
from atst.models.task_order import Source as TaskOrderSource
|
||||||
@ -233,3 +234,24 @@ def test_add_internal_comment():
|
|||||||
|
|
||||||
assert len(request.internal_comments) == 1
|
assert len(request.internal_comments) == 1
|
||||||
assert request.internal_comments[0].text == "this is my comment"
|
assert request.internal_comments[0].text == "this is my comment"
|
||||||
|
|
||||||
|
|
||||||
|
def test_creator_can_view_own_request():
|
||||||
|
creator = UserFactory.create()
|
||||||
|
request = RequestFactory.create(creator=creator)
|
||||||
|
|
||||||
|
assert RequestsAuthorization(creator, request).can_view
|
||||||
|
|
||||||
|
|
||||||
|
def test_ccpo_can_view_request():
|
||||||
|
ccpo = UserFactory.from_atat_role("ccpo")
|
||||||
|
request = RequestFactory.create()
|
||||||
|
|
||||||
|
assert RequestsAuthorization(ccpo, request).can_view
|
||||||
|
|
||||||
|
|
||||||
|
def test_random_user_cannot_view_request():
|
||||||
|
user = UserFactory.create()
|
||||||
|
request = RequestFactory.create()
|
||||||
|
|
||||||
|
assert not RequestsAuthorization(user, request).can_view
|
||||||
|
Loading…
x
Reference in New Issue
Block a user