Merge pull request #232 from dod-ccpo/review-by-user

Allow user to view submitted request
This commit is contained in:
patricksmithdds 2018-08-31 10:27:44 -04:00 committed by GitHub
commit 734c1ea006
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 137 additions and 77 deletions

View File

@ -1,4 +1,5 @@
from atst.domain.workspace_users import WorkspaceUsers
from atst.models.permissions import Permissions
class Authorization(object):
@ -10,3 +11,15 @@ class Authorization(object):
@classmethod
def is_in_workspace(cls, user, workspace):
return user in workspace.users
@classmethod
def can_view_request(cls, user, request):
if (
Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST
in user.atat_permissions
):
return True
elif request.creator == user:
return True
return False

View File

@ -5,13 +5,14 @@ from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.attributes import flag_modified
from werkzeug.datastructures import FileStorage
from atst.database import db
from atst.domain.authz import Authorization
from atst.domain.task_orders import TaskOrders
from atst.domain.workspaces import Workspaces
from atst.models.request import Request
from atst.models.request_status_event import RequestStatusEvent, RequestStatus
from atst.domain.workspaces import Workspaces
from atst.database import db
from atst.domain.task_orders import TaskOrders
from .exceptions import NotFoundError
from .exceptions import NotFoundError, UnauthorizedError
def deep_merge(source, destination: dict):
@ -59,12 +60,15 @@ class Requests(object):
return False
@classmethod
def get(cls, request_id):
def get(cls, user, request_id):
try:
request = db.session.query(Request).filter_by(id=request_id).one()
except NoResultFound:
except (NoResultFound, exc.DataError):
raise NotFoundError("request")
if not Authorization.can_view_request(user, request):
raise UnauthorizedError(user, "get request")
return request
@classmethod

View File

@ -1,4 +1,4 @@
from flask import render_template, redirect, url_for
from flask import g, render_template, redirect, url_for
from flask import request as http_request
from . import requests_bp
@ -15,7 +15,7 @@ def financial_form(data):
@requests_bp.route("/requests/verify/<string:request_id>", methods=["GET"])
def financial_verification(request_id=None):
request = Requests.get(request_id)
request = Requests.get(g.current_user, request_id)
form = financial_form(request.body.get("financial_verification"))
return render_template(
"requests/financial_verification.html",
@ -28,7 +28,7 @@ def financial_verification(request_id=None):
@requests_bp.route("/requests/verify/<string:request_id>", methods=["POST"])
def update_financial_verification(request_id):
post_data = http_request.form
existing_request = Requests.get(request_id)
existing_request = Requests.get(g.current_user, request_id)
form = financial_form(post_data)
rerender_args = dict(
request_id=request_id, f=form, extended=http_request.args.get("extended")

View File

@ -11,15 +11,15 @@ def map_request(request):
is_new = time_created.add(days=1) > pendulum.now()
app_count = request.body.get("details_of_use", {}).get("num_software_systems", 0)
annual_usage = request.annual_spend
update_url = url_for(
"requests.requests_form_update", screen=1, request_id=request.id
)
verify_url = url_for("requests.financial_verification", request_id=request.id)
edit_link = (
verify_url
if Requests.is_pending_financial_verification(request)
else update_url
)
if Requests.is_pending_financial_verification(request):
edit_link = url_for("requests.financial_verification", request_id=request.id)
elif Requests.is_pending_ccpo_approval(request):
edit_link = url_for("requests.view_pending_request", request_id=request.id)
else:
edit_link = url_for(
"requests.requests_form_update", screen=1, request_id=request.id
)
return {
"workspace_id": request.workspace.id if request.workspace else None,

View File

@ -3,9 +3,7 @@ from flask import g, redirect, render_template, url_for, request as http_request
from . import requests_bp
from atst.domain.requests import Requests
from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
from atst.models.permissions import Permissions
from atst.models.request_status_event import RequestStatus
from atst.domain.exceptions import UnauthorizedError
from atst.forms.data import (
SERVICE_BRANCHES,
ASSISTANCE_ORG_TYPES,
@ -14,6 +12,16 @@ from atst.forms.data import (
)
@requests_bp.context_processor
def option_data():
return {
"service_branches": SERVICE_BRANCHES,
"assistance_org_types": ASSISTANCE_ORG_TYPES,
"data_transfer_amounts": DATA_TRANSFER_AMOUNTS,
"completion_date_ranges": COMPLETION_DATE_RANGES,
}
@requests_bp.route("/requests/new/<int:screen>", methods=["GET"])
def requests_form_new(screen):
jedi_flow = JEDIRequestFlow(screen, request=None, current_user=g.current_user)
@ -26,10 +34,6 @@ def requests_form_new(screen):
current=screen,
next_screen=screen + 1,
can_submit=jedi_flow.can_submit,
service_branches=SERVICE_BRANCHES,
assistance_org_types=ASSISTANCE_ORG_TYPES,
data_transfer_amounts=DATA_TRANSFER_AMOUNTS,
completion_date_ranges=COMPLETION_DATE_RANGES,
)
@ -38,10 +42,9 @@ def requests_form_new(screen):
)
@requests_bp.route("/requests/new/<int:screen>/<string:request_id>", methods=["GET"])
def requests_form_update(screen=1, request_id=None):
if request_id:
_check_can_view_request(request_id)
request = Requests.get(request_id) if request_id is not None else None
request = (
Requests.get(g.current_user, request_id) if request_id is not None else None
)
jedi_flow = JEDIRequestFlow(
screen, request=request, request_id=request_id, current_user=g.current_user
)
@ -56,10 +59,6 @@ def requests_form_update(screen=1, request_id=None):
request_id=request_id,
jedi_request=jedi_flow.request,
can_submit=jedi_flow.can_submit,
service_branches=SERVICE_BRANCHES,
assistance_org_types=ASSISTANCE_ORG_TYPES,
data_transfer_amounts=DATA_TRANSFER_AMOUNTS,
completion_date_ranges=COMPLETION_DATE_RANGES,
)
@ -71,7 +70,9 @@ def requests_update(screen=1, request_id=None):
screen = int(screen)
post_data = http_request.form
current_user = g.current_user
existing_request = Requests.get(request_id) if request_id is not None else None
existing_request = (
Requests.get(g.current_user, request_id) if request_id is not None else None
)
jedi_flow = JEDIRequestFlow(
screen,
post_data=post_data,
@ -109,7 +110,7 @@ def requests_update(screen=1, request_id=None):
@requests_bp.route("/requests/submit/<string:request_id>", methods=["POST"])
def requests_submit(request_id=None):
request = Requests.get(request_id)
request = Requests.get(g.current_user, request_id)
Requests.submit(request)
if request.status == RequestStatus.PENDING_FINANCIAL_VERIFICATION:
@ -119,15 +120,7 @@ def requests_submit(request_id=None):
return redirect("/requests?modal=pendingCCPOApproval")
# TODO: generalize this, along with other authorizations, into a policy-pattern
# for authorization in the application
def _check_can_view_request(request_id):
if (
Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST
in g.current_user.atat_permissions
):
pass
elif Requests.exists(request_id, g.current_user):
pass
else:
raise UnauthorizedError(g.current_user, "view request {}".format(request_id))
@requests_bp.route("/requests/pending/<string:request_id>", methods=["GET"])
def view_pending_request(request_id=None):
request = Requests.get(g.current_user, request_id)
return render_template("requests/view_pending.html", data=request.body)

View File

@ -0,0 +1,23 @@
{% extends "base.html" %}
{% from "components/alert.html" import Alert %}
{% block content %}
<div class="col">
<div class="panel">
<div class="panel__heading">
<h1>View Pending Request</h1>
</div>
<div class="panel__content">
{{ Alert('Your request is being reviewed',
message="<p>You cannot edit your submitted request while it is under review. Your request will be reviewed within 3 business days.</p>",
level='warning'
) }}
{% include "requests/_review.html" %}
</div>
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,20 @@
from atst.domain.authz import Authorization
from atst.domain.roles import Roles
from tests.factories import RequestFactory, UserFactory
def test_creator_can_view_own_request():
user = UserFactory.create()
request = RequestFactory.create(creator=user)
assert Authorization.can_view_request(user, request)
other_user = UserFactory.create()
assert not Authorization.can_view_request(other_user, request)
def test_ccpo_user_can_view_request():
role = Roles.get("ccpo")
ccpo_user = UserFactory.create(atat_role=role)
request = RequestFactory.create()
assert Authorization.can_view_request(ccpo_user, request)

View File

@ -21,14 +21,15 @@ def new_request(session):
def test_can_get_request(new_request):
request = Requests.get(new_request.id)
request = Requests.get(new_request.creator, new_request.id)
assert request.id == new_request.id
def test_nonexistent_request_raises():
a_user = UserFactory.build()
with pytest.raises(NotFoundError):
Requests.get(uuid4())
Requests.get(a_user, uuid4())
def test_new_request_has_started_status():

View File

@ -5,7 +5,7 @@ from flask import url_for
from atst.eda_client import MockEDAClient
from tests.mocks import MOCK_REQUEST, MOCK_USER
from tests.factories import PENumberFactory, RequestFactory
from tests.factories import PENumberFactory, RequestFactory, UserFactory
class TestPENumberInForm:
@ -30,12 +30,12 @@ class TestPENumberInForm:
monkeypatch.setattr(
"atst.forms.financial.FinancialForm.validate", lambda s: True
)
monkeypatch.setattr(
"atst.domain.auth.get_current_user", lambda *args: MOCK_USER
)
user = UserFactory.create()
monkeypatch.setattr("atst.domain.auth.get_current_user", lambda *args: user)
return user
def submit_data(self, client, data, extended=False):
request = RequestFactory.create(body=MOCK_REQUEST.body)
def submit_data(self, client, user, data, extended=False):
request = RequestFactory.create(creator=user, body=MOCK_REQUEST.body)
url_kwargs = {"request_id": request.id}
if extended:
url_kwargs["extended"] = True
@ -47,43 +47,43 @@ class TestPENumberInForm:
return response
def test_submit_request_form_with_invalid_pe_id(self, monkeypatch, client):
self._set_monkeypatches(monkeypatch)
user = self._set_monkeypatches(monkeypatch)
response = self.submit_data(client, self.required_data)
response = self.submit_data(client, user, self.required_data)
assert "We couldn&#39;t find that PE number" in response.data.decode()
assert response.status_code == 200
def test_submit_request_form_with_unchanged_pe_id(self, monkeypatch, client):
self._set_monkeypatches(monkeypatch)
user = self._set_monkeypatches(monkeypatch)
data = dict(self.required_data)
data["pe_id"] = MOCK_REQUEST.body["financial_verification"]["pe_id"]
response = self.submit_data(client, data)
response = self.submit_data(client, user, data)
assert response.status_code == 302
assert "/workspaces" in response.headers.get("Location")
def test_submit_request_form_with_new_valid_pe_id(self, monkeypatch, client):
self._set_monkeypatches(monkeypatch)
user = self._set_monkeypatches(monkeypatch)
pe = PENumberFactory.create(number="8675309U", description="sample PE number")
data = dict(self.required_data)
data["pe_id"] = pe.number
response = self.submit_data(client, data)
response = self.submit_data(client, user, data)
assert response.status_code == 302
assert "/workspaces" in response.headers.get("Location")
def test_submit_request_form_with_missing_pe_id(self, monkeypatch, client):
self._set_monkeypatches(monkeypatch)
user = self._set_monkeypatches(monkeypatch)
data = dict(self.required_data)
data["pe_id"] = ""
response = self.submit_data(client, data)
response = self.submit_data(client, user, data)
assert "There were some errors" in response.data.decode()
assert response.status_code == 200
@ -91,41 +91,46 @@ class TestPENumberInForm:
def test_submit_financial_form_with_invalid_task_order(
self, monkeypatch, user_session, client
):
user_session()
user = UserFactory.create()
user_session(user)
data = dict(self.required_data)
data["pe_id"] = MOCK_REQUEST.body["financial_verification"]["pe_id"]
data["task_order_number"] = "1234"
response = self.submit_data(client, data)
response = self.submit_data(client, user, data)
assert "enter TO information manually" in response.data.decode()
def test_submit_financial_form_with_valid_task_order(
self, monkeypatch, user_session, client
):
monkeypatch.setattr("atst.domain.requests.Requests.get", lambda i: MOCK_REQUEST)
user_session()
user = UserFactory.create()
monkeypatch.setattr(
"atst.domain.requests.Requests.get", lambda *args: MOCK_REQUEST
)
user_session(user)
data = dict(self.required_data)
data["pe_id"] = MOCK_REQUEST.body["financial_verification"]["pe_id"]
data["task_order_number"] = MockEDAClient.MOCK_CONTRACT_NUMBER
response = self.submit_data(client, data)
response = self.submit_data(client, user, data)
assert "enter TO information manually" not in response.data.decode()
def test_submit_extended_financial_form(
self, monkeypatch, user_session, client, extended_financial_verification_data
):
request = RequestFactory.create()
monkeypatch.setattr("atst.domain.requests.Requests.get", lambda i: request)
user = UserFactory.create()
request = RequestFactory.create(creator=user)
monkeypatch.setattr("atst.domain.requests.Requests.get", lambda *args: request)
monkeypatch.setattr("atst.forms.financial.validate_pe_id", lambda *args: True)
user_session()
data = {**self.required_data, **extended_financial_verification_data}
data["task_order_number"] = "1234567"
response = self.submit_data(client, data, extended=True)
response = self.submit_data(client, user, data, extended=True)
assert response.status_code == 302
assert "/requests" in response.headers.get("Location")
@ -134,11 +139,12 @@ class TestPENumberInForm:
self, monkeypatch, user_session, client, extended_financial_verification_data
):
monkeypatch.setattr("atst.forms.financial.validate_pe_id", lambda *args: True)
user_session()
user = UserFactory.create()
user_session(user)
data = {**self.required_data, **extended_financial_verification_data}
data["task_order_number"] = "1234567"
del (data["clin_0001"])
response = self.submit_data(client, data, extended=True)
response = self.submit_data(client, user, data, extended=True)
assert response.status_code == 200

View File

@ -122,7 +122,7 @@ def test_am_poc_causes_poc_to_be_autopopulated(client, user_session):
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="am_poc=yes",
)
request = Requests.get(request.id)
request = Requests.get(creator, request.id)
assert request.body["primary_poc"]["dodid_poc"] == creator.dod_id
@ -167,7 +167,7 @@ def test_poc_details_can_be_autopopulated_on_new_request(client, user_session):
data="am_poc=yes",
)
request_id = response.headers["Location"].split("/")[-1]
request = Requests.get(request_id)
request = Requests.get(creator, request_id)
assert request.body["primary_poc"]["dodid_poc"] == creator.dod_id
@ -191,7 +191,7 @@ def test_poc_autofill_checks_information_about_you_form_first(client, user_sessi
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=urlencode(poc_input),
)
request = Requests.get(request.id)
request = Requests.get(creator, request.id)
assert dict_contains(
request.body["primary_poc"],
{

View File

@ -55,7 +55,7 @@ def test_stepthrough_request_form(user_session, screens, client):
# at this point, the real request we made and the mock_request bodies
# should be equivalent
assert Requests.get(req_id).body == mock_request.body
assert Requests.get(user, req_id).body == mock_request.body
# finish the review and submit step
client.post(
@ -63,5 +63,5 @@ def test_stepthrough_request_form(user_session, screens, client):
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
finished_request = Requests.get(req_id)
finished_request = Requests.get(user, req_id)
assert finished_request.status == RequestStatus.PENDING_CCPO_APPROVAL