simple implementation of request view authorization
This commit is contained in:
parent
1a5800cbc5
commit
2cfc142417
@ -3,6 +3,7 @@ from flask import g, redirect, render_template, url_for, request as http_request
|
|||||||
from . import requests_bp
|
from . import requests_bp
|
||||||
from atst.domain.requests import Requests
|
from atst.domain.requests import Requests
|
||||||
from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
|
from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
|
||||||
|
from atst.models.permissions import Permissions
|
||||||
|
|
||||||
|
|
||||||
@requests_bp.route("/requests/new/<int:screen>", methods=["GET"])
|
@requests_bp.route("/requests/new/<int:screen>", methods=["GET"])
|
||||||
@ -25,6 +26,9 @@ def requests_form_new(screen):
|
|||||||
)
|
)
|
||||||
@requests_bp.route("/requests/new/<int:screen>/<string:request_id>", methods=["GET"])
|
@requests_bp.route("/requests/new/<int:screen>/<string:request_id>", methods=["GET"])
|
||||||
def requests_form_update(screen=1, request_id=None):
|
def requests_form_update(screen=1, request_id=None):
|
||||||
|
if request_id and not _can_view_request(request_id):
|
||||||
|
return redirect(url_for("atst.unauthorized"))
|
||||||
|
|
||||||
request = Requests.get(request_id) if request_id is not None else None
|
request = Requests.get(request_id) if request_id is not None else None
|
||||||
jedi_flow = JEDIRequestFlow(screen, request, request_id=request_id)
|
jedi_flow = JEDIRequestFlow(screen, request, request_id=request_id)
|
||||||
|
|
||||||
@ -79,10 +83,12 @@ def requests_update(screen=1, request_id=None):
|
|||||||
request_id=jedi_flow.request_id,
|
request_id=jedi_flow.request_id,
|
||||||
)
|
)
|
||||||
return redirect(where)
|
return redirect(where)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return render_template(
|
return render_template(
|
||||||
"requests/screen-%d.html" % int(screen), **rerender_args
|
"requests/screen-%d.html" % int(screen), **rerender_args
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return render_template("requests/screen-%d.html" % int(screen), **rerender_args)
|
return render_template("requests/screen-%d.html" % int(screen), **rerender_args)
|
||||||
|
|
||||||
@ -94,5 +100,15 @@ def requests_submit(request_id=None):
|
|||||||
|
|
||||||
if request.status == "approved":
|
if request.status == "approved":
|
||||||
return redirect("/requests?modal=True")
|
return redirect("/requests?modal=True")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return redirect("/requests")
|
return redirect("/requests")
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: generalize this, along with other authorizations, into a policy-pattern
|
||||||
|
# for authorization in the application
|
||||||
|
def _can_view_request(request_id):
|
||||||
|
return (
|
||||||
|
Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST in g.current_user.atat_permissions
|
||||||
|
or Requests.is_creator(request_id, g.current_user.id)
|
||||||
|
)
|
||||||
|
@ -9,6 +9,8 @@ from atst.models.pe_number import PENumber
|
|||||||
from atst.models.task_order import TaskOrder
|
from atst.models.task_order import TaskOrder
|
||||||
from atst.models.user import User
|
from atst.models.user import User
|
||||||
from atst.models.role import Role
|
from atst.models.role import Role
|
||||||
|
from atst.models.request_status_event import RequestStatusEvent
|
||||||
|
from atst.domain.roles import Roles
|
||||||
|
|
||||||
|
|
||||||
class RequestStatusFactory(factory.alchemy.SQLAlchemyModelFactory):
|
class RequestStatusFactory(factory.alchemy.SQLAlchemyModelFactory):
|
||||||
@ -24,6 +26,7 @@ class RequestFactory(factory.alchemy.SQLAlchemyModelFactory):
|
|||||||
status_events = factory.RelatedFactory(
|
status_events = factory.RelatedFactory(
|
||||||
RequestStatusFactory, "request", new_status=RequestStatus.STARTED
|
RequestStatusFactory, "request", new_status=RequestStatus.STARTED
|
||||||
)
|
)
|
||||||
|
body = {}
|
||||||
|
|
||||||
|
|
||||||
class PENumberFactory(factory.alchemy.SQLAlchemyModelFactory):
|
class PENumberFactory(factory.alchemy.SQLAlchemyModelFactory):
|
||||||
@ -53,3 +56,11 @@ class UserFactory(factory.alchemy.SQLAlchemyModelFactory):
|
|||||||
last_name = factory.Faker("last_name")
|
last_name = factory.Faker("last_name")
|
||||||
atat_role = factory.SubFactory(RoleFactory)
|
atat_role = factory.SubFactory(RoleFactory)
|
||||||
dod_id = factory.LazyFunction(lambda: "".join(random.choices(string.digits, k=10)))
|
dod_id = factory.LazyFunction(lambda: "".join(random.choices(string.digits, k=10)))
|
||||||
|
|
||||||
|
|
||||||
|
class RequestStatusEventFactory(factory.alchemy.SQLAlchemyModelFactory):
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
model = RequestStatusEvent
|
||||||
|
|
||||||
|
id = factory.Sequence(lambda x: uuid4())
|
||||||
|
@ -2,7 +2,8 @@ import re
|
|||||||
import pytest
|
import pytest
|
||||||
import urllib
|
import urllib
|
||||||
from tests.mocks import MOCK_USER, MOCK_REQUEST
|
from tests.mocks import MOCK_USER, MOCK_REQUEST
|
||||||
from tests.factories import RequestFactory
|
from tests.factories import RequestFactory, UserFactory, RequestStatusEventFactory
|
||||||
|
from atst.domain.roles import Roles
|
||||||
|
|
||||||
|
|
||||||
ERROR_CLASS = "alert--error"
|
ERROR_CLASS = "alert--error"
|
||||||
@ -27,3 +28,37 @@ def test_submit_valid_request_form(monkeypatch, client, user_session):
|
|||||||
data="meaning=42",
|
data="meaning=42",
|
||||||
)
|
)
|
||||||
assert "/requests/new/2" in response.headers.get("Location")
|
assert "/requests/new/2" in response.headers.get("Location")
|
||||||
|
|
||||||
|
|
||||||
|
def test_owner_can_view_request(client, user_session):
|
||||||
|
user = UserFactory.create()
|
||||||
|
user_session(user)
|
||||||
|
request = RequestFactory.create(creator=user.id)
|
||||||
|
status = RequestStatusEventFactory.create(request_id=request.id)
|
||||||
|
|
||||||
|
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_non_owner_cannot_view_request(client, user_session):
|
||||||
|
user = UserFactory.create()
|
||||||
|
user_session(user)
|
||||||
|
request = RequestFactory.create()
|
||||||
|
status = RequestStatusEventFactory.create(request_id=request.id)
|
||||||
|
|
||||||
|
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
def test_ccpo_can_view_request(client, user_session):
|
||||||
|
ccpo = Roles.get("ccpo")
|
||||||
|
user = UserFactory.create(atat_role=ccpo)
|
||||||
|
user_session(user)
|
||||||
|
request = RequestFactory.create()
|
||||||
|
status = RequestStatusEventFactory.create(request_id=request.id)
|
||||||
|
|
||||||
|
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
Loading…
x
Reference in New Issue
Block a user