Remove Requests routes
This commit is contained in:
parent
9af70044ed
commit
c3b79103a4
@ -13,7 +13,6 @@ from atst.assets import environment as assets_environment
|
||||
from atst.filters import register_filters
|
||||
from atst.routes import bp
|
||||
from atst.routes.portfolios import portfolios_bp as portfolio_routes
|
||||
from atst.routes.requests import requests_bp
|
||||
from atst.routes.task_orders import task_orders_bp
|
||||
from atst.routes.dev import bp as dev_routes
|
||||
from atst.routes.users import bp as user_routes
|
||||
@ -68,7 +67,6 @@ def make_app(config):
|
||||
app.register_blueprint(portfolio_routes)
|
||||
app.register_blueprint(task_orders_bp)
|
||||
app.register_blueprint(user_routes)
|
||||
app.register_blueprint(requests_bp)
|
||||
|
||||
if ENV != "prod":
|
||||
app.register_blueprint(dev_routes)
|
||||
|
@ -1,15 +0,0 @@
|
||||
from flask import Blueprint
|
||||
|
||||
from atst.domain.requests import Requests
|
||||
|
||||
requests_bp = Blueprint("requests", __name__)
|
||||
|
||||
from . import index
|
||||
from . import requests_form
|
||||
from . import financial_verification
|
||||
from . import approval
|
||||
|
||||
|
||||
@requests_bp.context_processor
|
||||
def annual_spend_threshold():
|
||||
return {"annual_spend_threshold": Requests.ANNUAL_SPEND_THRESHOLD}
|
@ -1,97 +0,0 @@
|
||||
from flask import (
|
||||
render_template,
|
||||
g,
|
||||
Response,
|
||||
request as http_request,
|
||||
redirect,
|
||||
url_for,
|
||||
)
|
||||
from flask import current_app as app
|
||||
|
||||
from . import requests_bp
|
||||
from atst.domain.requests import Requests
|
||||
from atst.domain.exceptions import NotFoundError
|
||||
from atst.forms.ccpo_review import CCPOReviewForm
|
||||
from atst.forms.internal_comment import InternalCommentForm
|
||||
|
||||
|
||||
def map_ccpo_authorizing(user):
|
||||
return {"fname_ccpo": user.first_name, "lname_ccpo": user.last_name}
|
||||
|
||||
|
||||
def render_approval(request, form=None, internal_comment_form=None):
|
||||
data = request.body
|
||||
if request.has_financial_data:
|
||||
data["legacy_task_order"] = request.legacy_task_order.to_dictionary()
|
||||
|
||||
if not form:
|
||||
mo_data = map_ccpo_authorizing(g.current_user)
|
||||
form = CCPOReviewForm(data=mo_data)
|
||||
|
||||
if not internal_comment_form:
|
||||
internal_comment_form = InternalCommentForm()
|
||||
|
||||
return render_template(
|
||||
"requests/approval.html",
|
||||
data=data,
|
||||
reviews=list(reversed(request.reviews)),
|
||||
jedi_request=request,
|
||||
current_status=request.status.value,
|
||||
review_form=form or CCPOReviewForm(),
|
||||
internal_comment_form=internal_comment_form,
|
||||
comments=request.internal_comments,
|
||||
)
|
||||
|
||||
|
||||
@requests_bp.route("/requests/approval/<string:request_id>", methods=["GET"])
|
||||
def approval(request_id):
|
||||
request = Requests.get_for_approval(g.current_user, request_id)
|
||||
|
||||
return render_approval(request)
|
||||
|
||||
|
||||
@requests_bp.route("/requests/submit_approval/<string:request_id>", methods=["POST"])
|
||||
def submit_approval(request_id):
|
||||
request = Requests.get_for_approval(g.current_user, request_id)
|
||||
|
||||
form = CCPOReviewForm(http_request.form)
|
||||
if form.validate():
|
||||
if http_request.form.get("review") == "approving":
|
||||
Requests.advance(g.current_user, request, form.data)
|
||||
else:
|
||||
Requests.request_changes(g.current_user, request, form.data)
|
||||
|
||||
return redirect(url_for("requests.requests_index"))
|
||||
else:
|
||||
return render_approval(request, form)
|
||||
|
||||
|
||||
@requests_bp.route("/requests/task_order_download/<string:request_id>", methods=["GET"])
|
||||
def task_order_pdf_download(request_id):
|
||||
request = Requests.get(g.current_user, request_id)
|
||||
if request.legacy_task_order and request.legacy_task_order.pdf:
|
||||
pdf = request.legacy_task_order.pdf
|
||||
generator = app.csp.files.download(pdf.object_name)
|
||||
return Response(
|
||||
generator,
|
||||
headers={
|
||||
"Content-Disposition": "attachment; filename={}".format(pdf.filename)
|
||||
},
|
||||
mimetype="application/pdf",
|
||||
)
|
||||
|
||||
else:
|
||||
raise NotFoundError("legacy_task_order pdf")
|
||||
|
||||
|
||||
@requests_bp.route("/requests/internal_comments/<string:request_id>", methods=["POST"])
|
||||
def create_internal_comment(request_id):
|
||||
form = InternalCommentForm(http_request.form)
|
||||
request = Requests.get(g.current_user, request_id)
|
||||
if form.validate():
|
||||
Requests.add_internal_comment(g.current_user, request, form.data.get("text"))
|
||||
return redirect(
|
||||
url_for("requests.approval", request_id=request_id, _anchor="ccpo-notes")
|
||||
)
|
||||
else:
|
||||
return render_approval(request, internal_comment_form=form)
|
@ -1,291 +0,0 @@
|
||||
from flask import g, render_template, redirect, url_for
|
||||
from flask import request as http_request
|
||||
from werkzeug.datastructures import ImmutableMultiDict, FileStorage
|
||||
|
||||
from . import requests_bp
|
||||
from atst.domain.requests import Requests
|
||||
from atst.forms.financial import FinancialVerificationForm
|
||||
from atst.forms.exceptions import FormValidationError
|
||||
from atst.domain.exceptions import NotFoundError
|
||||
from atst.domain.requests.financial_verification import (
|
||||
PENumberValidator,
|
||||
TaskOrderNumberValidator,
|
||||
)
|
||||
from atst.models.attachment import Attachment
|
||||
from atst.domain.legacy_task_orders import LegacyTaskOrders
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
|
||||
|
||||
def fv_extended(_http_request):
|
||||
return _http_request.args.get("extended", "false").lower() in ["true", "t"]
|
||||
|
||||
|
||||
class FinancialVerification(object):
|
||||
def __init__(self, request):
|
||||
self.request = request.latest_revision
|
||||
self.legacy_task_order = request.legacy_task_order
|
||||
|
||||
|
||||
class FinancialVerificationBase(object):
|
||||
def _get_form(self, request, is_extended, formdata=None):
|
||||
_formdata = ImmutableMultiDict(formdata) if formdata is not None else None
|
||||
fv = FinancialVerification(request)
|
||||
form = FinancialVerificationForm(obj=fv, formdata=_formdata)
|
||||
|
||||
if not form.has_pdf_upload:
|
||||
if isinstance(form.legacy_task_order.pdf.data, Attachment):
|
||||
form.legacy_task_order.pdf.data = (
|
||||
form.legacy_task_order.pdf.data.filename
|
||||
)
|
||||
else:
|
||||
try:
|
||||
attachment = Attachment.get_for_resource(
|
||||
"legacy_task_order", self.request.id
|
||||
)
|
||||
form.legacy_task_order.pdf.data = attachment.filename
|
||||
except NotFoundError:
|
||||
pass
|
||||
|
||||
return form
|
||||
|
||||
def _process_attachment(self, is_extended, form):
|
||||
attachment = None
|
||||
if is_extended:
|
||||
attachment = None
|
||||
if isinstance(form.legacy_task_order.pdf.data, FileStorage):
|
||||
Attachment.delete_for_resource("legacy_task_order", self.request.id)
|
||||
attachment = Attachment.attach(
|
||||
form.legacy_task_order.pdf.data,
|
||||
"legacy_task_order",
|
||||
self.request.id,
|
||||
)
|
||||
elif isinstance(form.legacy_task_order.pdf.data, str):
|
||||
try:
|
||||
attachment = Attachment.get_for_resource(
|
||||
"legacy_task_order", self.request.id
|
||||
)
|
||||
except NotFoundError:
|
||||
pass
|
||||
|
||||
if attachment:
|
||||
form.legacy_task_order.pdf.data = attachment.filename
|
||||
|
||||
return attachment
|
||||
|
||||
def _try_create_task_order(self, form, attachment, is_extended):
|
||||
task_order_number = form.legacy_task_order.number.data
|
||||
if not task_order_number:
|
||||
return None
|
||||
|
||||
task_order_data = form.legacy_task_order.data
|
||||
|
||||
if attachment:
|
||||
task_order_data["pdf"] = attachment
|
||||
|
||||
try:
|
||||
legacy_task_order = LegacyTaskOrders.get(task_order_number)
|
||||
legacy_task_order = LegacyTaskOrders.update(
|
||||
legacy_task_order, task_order_data
|
||||
)
|
||||
return legacy_task_order
|
||||
except NotFoundError:
|
||||
pass
|
||||
|
||||
try:
|
||||
return LegacyTaskOrders.get_from_eda(task_order_number)
|
||||
except NotFoundError:
|
||||
pass
|
||||
|
||||
return LegacyTaskOrders.create(**task_order_data)
|
||||
|
||||
def _raise(self, form):
|
||||
form.reset()
|
||||
raise FormValidationError(form)
|
||||
|
||||
|
||||
class GetFinancialVerificationForm(FinancialVerificationBase):
|
||||
def __init__(self, user, request, is_extended=False):
|
||||
self.user = user
|
||||
self.request = request
|
||||
self.is_extended = is_extended
|
||||
|
||||
def execute(self):
|
||||
form = self._get_form(self.request, self.is_extended)
|
||||
form.reset()
|
||||
return form
|
||||
|
||||
|
||||
class UpdateFinancialVerification(FinancialVerificationBase):
|
||||
def __init__(
|
||||
self,
|
||||
pe_validator,
|
||||
task_order_validator,
|
||||
user,
|
||||
request,
|
||||
fv_data,
|
||||
is_extended=False,
|
||||
):
|
||||
self.pe_validator = pe_validator
|
||||
self.task_order_validator = task_order_validator
|
||||
self.user = user
|
||||
self.request = request
|
||||
self.fv_data = fv_data
|
||||
self.is_extended = is_extended
|
||||
|
||||
def execute(self):
|
||||
form = self._get_form(self.request, self.is_extended, self.fv_data)
|
||||
|
||||
should_update = True
|
||||
should_submit = True
|
||||
updated_request = None
|
||||
|
||||
attachment = self._process_attachment(self.is_extended, form)
|
||||
|
||||
if not form.validate(is_extended=self.is_extended, has_attachment=attachment):
|
||||
should_update = False
|
||||
|
||||
if not self.pe_validator.validate(self.request, form.pe_id):
|
||||
should_submit = False
|
||||
|
||||
if not self.is_extended and not self.task_order_validator.validate(
|
||||
form.legacy_task_order.number
|
||||
):
|
||||
should_submit = False
|
||||
|
||||
if should_update:
|
||||
legacy_task_order = self._try_create_task_order(
|
||||
form, attachment, self.is_extended
|
||||
)
|
||||
updated_request = Requests.update_financial_verification(
|
||||
self.request.id, form.request.data, legacy_task_order=legacy_task_order
|
||||
)
|
||||
if should_submit:
|
||||
return Requests.submit_financial_verification(updated_request)
|
||||
|
||||
self._raise(form)
|
||||
|
||||
|
||||
class SaveFinancialVerificationDraft(FinancialVerificationBase):
|
||||
def __init__(
|
||||
self,
|
||||
pe_validator,
|
||||
task_order_validator,
|
||||
user,
|
||||
request,
|
||||
fv_data,
|
||||
is_extended=False,
|
||||
):
|
||||
self.pe_validator = pe_validator
|
||||
self.task_order_validator = task_order_validator
|
||||
self.user = user
|
||||
self.request = request
|
||||
self.fv_data = fv_data
|
||||
self.is_extended = is_extended
|
||||
|
||||
def execute(self):
|
||||
form = self._get_form(self.request, self.is_extended, self.fv_data)
|
||||
attachment = self._process_attachment(self.is_extended, form)
|
||||
legacy_task_order = self._try_create_task_order(
|
||||
form, attachment, self.is_extended
|
||||
)
|
||||
updated_request = Requests.update_financial_verification(
|
||||
self.request.id, form.request.data, legacy_task_order=legacy_task_order
|
||||
)
|
||||
|
||||
return updated_request
|
||||
|
||||
|
||||
@requests_bp.route("/requests/verify/<string:request_id>/draft", methods=["GET"])
|
||||
@requests_bp.route("/requests/verify/<string:request_id>", methods=["GET"])
|
||||
def financial_verification(request_id):
|
||||
request = Requests.get(g.current_user, request_id)
|
||||
is_extended = fv_extended(http_request)
|
||||
saved_draft = http_request.args.get("saved_draft", False)
|
||||
|
||||
should_be_extended = not is_extended and request.has_manual_task_order
|
||||
if should_be_extended:
|
||||
return redirect(
|
||||
url_for(".financial_verification", request_id=request_id, extended=True)
|
||||
)
|
||||
|
||||
form = GetFinancialVerificationForm(
|
||||
g.current_user, request, is_extended=is_extended
|
||||
).execute()
|
||||
|
||||
if request.review_comment:
|
||||
flash("request_review_comment", comment=request.review_comment)
|
||||
|
||||
return render_template(
|
||||
"requests/financial_verification.html",
|
||||
f=form,
|
||||
jedi_request=request,
|
||||
extended=is_extended,
|
||||
saved_draft=saved_draft,
|
||||
)
|
||||
|
||||
|
||||
@requests_bp.route("/requests/verify/<string:request_id>", methods=["POST"])
|
||||
def update_financial_verification(request_id):
|
||||
request = Requests.get(g.current_user, request_id)
|
||||
fv_data = {**http_request.form, **http_request.files}
|
||||
is_extended = fv_extended(http_request)
|
||||
|
||||
try:
|
||||
updated_request = UpdateFinancialVerification(
|
||||
PENumberValidator(),
|
||||
TaskOrderNumberValidator(),
|
||||
g.current_user,
|
||||
request,
|
||||
fv_data,
|
||||
is_extended=is_extended,
|
||||
).execute()
|
||||
except FormValidationError as e:
|
||||
return render_template(
|
||||
"requests/financial_verification.html",
|
||||
jedi_request=request,
|
||||
f=e.form,
|
||||
extended=is_extended,
|
||||
)
|
||||
|
||||
if updated_request.legacy_task_order.verified:
|
||||
portfolio = Requests.auto_approve_and_create_portfolio(updated_request)
|
||||
flash("new_portfolio")
|
||||
return redirect(
|
||||
url_for("portfolios.new_application", portfolio_id=portfolio.id)
|
||||
)
|
||||
else:
|
||||
return redirect(url_for("requests.requests_index", modal="pendingCCPOApproval"))
|
||||
|
||||
|
||||
@requests_bp.route("/requests/verify/<string:request_id>/draft", methods=["POST"])
|
||||
def save_financial_verification_draft(request_id):
|
||||
user = g.current_user
|
||||
request = Requests.get(user, request_id)
|
||||
fv_data = {**http_request.form, **http_request.files}
|
||||
is_extended = fv_extended(http_request)
|
||||
|
||||
try:
|
||||
updated_request = SaveFinancialVerificationDraft(
|
||||
PENumberValidator(),
|
||||
TaskOrderNumberValidator(),
|
||||
user,
|
||||
request,
|
||||
fv_data,
|
||||
is_extended=is_extended,
|
||||
).execute()
|
||||
except FormValidationError as e:
|
||||
return render_template(
|
||||
"requests/financial_verification.html",
|
||||
jedi_request=request,
|
||||
f=e.form,
|
||||
extended=is_extended,
|
||||
)
|
||||
|
||||
return redirect(
|
||||
url_for(
|
||||
"requests.financial_verification",
|
||||
request_id=updated_request.id,
|
||||
is_extended=is_extended,
|
||||
saved_draft=True,
|
||||
)
|
||||
)
|
@ -1,107 +0,0 @@
|
||||
import pendulum
|
||||
from flask import render_template, g, url_for
|
||||
|
||||
from . import requests_bp
|
||||
from atst.domain.requests import Requests
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.forms.data import SERVICE_BRANCHES
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
|
||||
|
||||
class RequestsIndex(object):
|
||||
def __init__(self, user):
|
||||
self.user = user
|
||||
|
||||
def execute(self):
|
||||
if (
|
||||
Permissions.REVIEW_AND_APPROVE_JEDI_PORTFOLIO_REQUEST
|
||||
in self.user.atat_permissions
|
||||
):
|
||||
context = self._ccpo_view(self.user)
|
||||
|
||||
else:
|
||||
context = self._non_ccpo_view(self.user)
|
||||
|
||||
return {
|
||||
**context,
|
||||
"possible_statuses": Requests.possible_statuses(),
|
||||
"possible_dod_components": [b[0] for b in SERVICE_BRANCHES[1:]],
|
||||
}
|
||||
|
||||
def _ccpo_view(self, user):
|
||||
requests = Requests.get_many()
|
||||
mapped_requests = [self._map_request(r, "ccpo") for r in requests]
|
||||
num_action_required = len(
|
||||
[r for r in mapped_requests if r.get("action_required")]
|
||||
)
|
||||
|
||||
return {
|
||||
"requests": mapped_requests,
|
||||
"pending_financial_verification": False,
|
||||
"pending_ccpo_acceptance": False,
|
||||
"extended_view": True,
|
||||
"kpi_inprogress": Requests.in_progress_count(),
|
||||
"kpi_pending": Requests.pending_ccpo_count(),
|
||||
"kpi_completed": Requests.completed_count(),
|
||||
"num_action_required": num_action_required,
|
||||
}
|
||||
|
||||
def _non_ccpo_view(self, user):
|
||||
requests = Requests.get_many(creator=user)
|
||||
mapped_requests = [self._map_request(r, "mission_owner") for r in requests]
|
||||
num_action_required = len(
|
||||
[r for r in mapped_requests if r.get("action_required")]
|
||||
)
|
||||
pending_fv = any(r.is_pending_financial_verification for r in requests)
|
||||
pending_ccpo = any(r.is_pending_ccpo_acceptance for r in requests)
|
||||
|
||||
return {
|
||||
"requests": mapped_requests,
|
||||
"pending_financial_verification": pending_fv,
|
||||
"pending_ccpo_acceptance": pending_ccpo,
|
||||
"num_action_required": num_action_required,
|
||||
"extended_view": False,
|
||||
}
|
||||
|
||||
def _portfolio_link_for_request(self, request):
|
||||
if request.is_approved:
|
||||
return url_for(
|
||||
"portfolios.portfolio_applications", portfolio_id=request.portfolio.id
|
||||
)
|
||||
else:
|
||||
return None
|
||||
|
||||
def _map_request(self, request, viewing_role):
|
||||
time_created = pendulum.instance(request.time_created)
|
||||
is_new = time_created.add(days=1) > pendulum.now()
|
||||
app_count = request.body.get("details_of_use", {}).get(
|
||||
"num_software_systems", 0
|
||||
)
|
||||
annual_usage = request.annual_spend
|
||||
|
||||
return {
|
||||
"portfolio_id": request.portfolio.id if request.portfolio else None,
|
||||
"name": request.displayname,
|
||||
"is_new": is_new,
|
||||
"is_approved": request.is_approved,
|
||||
"status": request.status_displayname,
|
||||
"app_count": app_count,
|
||||
"last_submission_timestamp": request.last_submission_timestamp,
|
||||
"last_edited_timestamp": request.latest_revision.time_updated,
|
||||
"full_name": request.creator.full_name,
|
||||
"annual_usage": annual_usage,
|
||||
"edit_link": url_for("requests.edit", request_id=request.id),
|
||||
"action_required": request.action_required_by == viewing_role,
|
||||
"dod_component": request.latest_revision.dod_component,
|
||||
"portfolio_link": self._portfolio_link_for_request(request),
|
||||
}
|
||||
|
||||
|
||||
@requests_bp.route("/requests", methods=["GET"])
|
||||
def requests_index():
|
||||
context = RequestsIndex(g.current_user).execute()
|
||||
|
||||
if context.get("num_action_required"):
|
||||
flash("requests_action_required", count=context.get("num_action_required"))
|
||||
|
||||
return render_template("requests/index.html", **context)
|
@ -1,162 +0,0 @@
|
||||
from collections import defaultdict
|
||||
|
||||
from atst.domain.requests import Requests
|
||||
import atst.forms.new_request as request_forms
|
||||
|
||||
|
||||
class JEDIRequestFlow(object):
|
||||
def __init__(
|
||||
self,
|
||||
current_step,
|
||||
current_user=None,
|
||||
request=None,
|
||||
post_data=None,
|
||||
request_id=None,
|
||||
existing_request=None,
|
||||
):
|
||||
self.current_step = current_step
|
||||
|
||||
self.current_user = current_user
|
||||
self.request = request
|
||||
|
||||
self.post_data = post_data
|
||||
self.is_post = self.post_data is not None
|
||||
|
||||
self.request_id = request_id
|
||||
self.form = self._form()
|
||||
|
||||
self.existing_request = existing_request
|
||||
|
||||
def _form(self):
|
||||
if self.is_post:
|
||||
return self.form_class()(self.post_data)
|
||||
else:
|
||||
return self.form_class()(data=self.current_step_data)
|
||||
|
||||
def validate(self):
|
||||
return self.form.validate()
|
||||
|
||||
def validate_warnings(self):
|
||||
existing_request_data = (
|
||||
self.existing_request and self.existing_request.body.get(self.form_section)
|
||||
) or None
|
||||
|
||||
valid = self.form.perform_extra_validation(existing_request_data)
|
||||
return valid
|
||||
|
||||
@property
|
||||
def current_screen(self):
|
||||
return self.screens[self.current_step - 1]
|
||||
|
||||
@property
|
||||
def form_section(self):
|
||||
return self.current_screen["section"]
|
||||
|
||||
def form_class(self):
|
||||
return self.current_screen["form"]
|
||||
|
||||
# maps user data to fields in InformationAboutYouForm; this should be moved
|
||||
# into the request initialization process when we have a request schema, or
|
||||
# we just shouldn't record this data on the request
|
||||
def map_user_data(self, user):
|
||||
return {
|
||||
"fname_request": user.first_name,
|
||||
"lname_request": user.last_name,
|
||||
"email_request": user.email,
|
||||
"phone_number": user.phone_number,
|
||||
"phone_ext": user.phone_ext,
|
||||
"service_branch": user.service_branch,
|
||||
"designation": user.designation,
|
||||
"citizenship": user.citizenship,
|
||||
"date_latest_training": user.date_latest_training,
|
||||
}
|
||||
|
||||
@property
|
||||
def current_step_data(self):
|
||||
data = {}
|
||||
|
||||
if self.is_post:
|
||||
data = self.post_data
|
||||
|
||||
if self.request:
|
||||
if self.form_section == "review_submit":
|
||||
data = self.request.body
|
||||
elif self.form_section == "information_about_you":
|
||||
form_data = self.request.body.get(self.form_section, {})
|
||||
data = {**self.map_user_data(self.request.creator), **form_data}
|
||||
else:
|
||||
data = self.request.body.get(self.form_section, {})
|
||||
elif self.form_section == "information_about_you":
|
||||
data = self.map_user_data(self.current_user)
|
||||
|
||||
return defaultdict(lambda: defaultdict(lambda: None), data)
|
||||
|
||||
@property
|
||||
def can_submit(self):
|
||||
return self.request and Requests.should_allow_submission(self.request)
|
||||
|
||||
@property
|
||||
def next_screen(self):
|
||||
return self.current_step + 1
|
||||
|
||||
@property
|
||||
def screens(self):
|
||||
return [
|
||||
{
|
||||
"title": "Details of Use",
|
||||
"section": "details_of_use",
|
||||
"form": request_forms.DetailsOfUseForm,
|
||||
},
|
||||
{
|
||||
"title": "Information About You",
|
||||
"section": "information_about_you",
|
||||
"form": request_forms.InformationAboutYouForm,
|
||||
},
|
||||
{
|
||||
"title": "Portfolio Owner",
|
||||
"section": "primary_poc",
|
||||
"form": request_forms.PortfolioOwnerForm,
|
||||
},
|
||||
{
|
||||
"title": "Review & Submit",
|
||||
"section": "review_submit",
|
||||
"form": request_forms.ReviewAndSubmitForm,
|
||||
},
|
||||
]
|
||||
|
||||
@property
|
||||
def is_review_screen(self):
|
||||
return self.screens[-1] == self.current_screen
|
||||
|
||||
def create_or_update_request(self):
|
||||
request_data = self.map_request_data(self.form_section, self.form.data)
|
||||
if self.request_id:
|
||||
Requests.update(self.request_id, request_data)
|
||||
else:
|
||||
request = Requests.create(self.current_user, request_data)
|
||||
self.request_id = request.id
|
||||
|
||||
def map_request_data(self, section, data):
|
||||
if section == "primary_poc":
|
||||
if data.get("am_poc", False):
|
||||
try:
|
||||
request_user_info = self.existing_request.body.get(
|
||||
"information_about_you", {}
|
||||
)
|
||||
except AttributeError:
|
||||
request_user_info = {}
|
||||
|
||||
data = {
|
||||
**data,
|
||||
"dodid_poc": self.current_user.dod_id,
|
||||
"fname_poc": request_user_info.get(
|
||||
"fname_request", self.current_user.first_name
|
||||
),
|
||||
"lname_poc": request_user_info.get(
|
||||
"lname_request", self.current_user.last_name
|
||||
),
|
||||
"email_poc": request_user_info.get(
|
||||
"email_request", self.current_user.email
|
||||
),
|
||||
}
|
||||
return {section: data}
|
@ -1,187 +0,0 @@
|
||||
from flask import g, redirect, render_template, url_for, request as http_request
|
||||
|
||||
from . import requests_bp
|
||||
from atst.domain.requests import Requests
|
||||
from atst.domain.authz import Authorization
|
||||
from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
from atst.forms.data import (
|
||||
SERVICE_BRANCHES,
|
||||
ASSISTANCE_ORG_TYPES,
|
||||
DATA_TRANSFER_AMOUNTS,
|
||||
COMPLETION_DATE_RANGES,
|
||||
FUNDING_TYPES,
|
||||
TASK_ORDER_SOURCES,
|
||||
)
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
|
||||
|
||||
@requests_bp.context_processor
|
||||
def option_data():
|
||||
return {
|
||||
"service_branches": SERVICE_BRANCHES,
|
||||
"assistance_org_types": ASSISTANCE_ORG_TYPES,
|
||||
"data_transfer_amounts": DATA_TRANSFER_AMOUNTS,
|
||||
"completion_date_ranges": COMPLETION_DATE_RANGES,
|
||||
"funding_types": FUNDING_TYPES,
|
||||
"task_order_sources": TASK_ORDER_SOURCES,
|
||||
}
|
||||
|
||||
|
||||
@requests_bp.route("/requests/new/<int:screen>", methods=["GET"])
|
||||
def requests_form_new(screen):
|
||||
jedi_flow = JEDIRequestFlow(screen, request=None, current_user=g.current_user)
|
||||
|
||||
if jedi_flow.is_review_screen and not jedi_flow.can_submit:
|
||||
flash("request_incomplete")
|
||||
|
||||
return render_template(
|
||||
"requests/screen-%d.html" % int(screen),
|
||||
f=jedi_flow.form,
|
||||
data=jedi_flow.current_step_data,
|
||||
screens=jedi_flow.screens,
|
||||
current=screen,
|
||||
next_screen=screen + 1,
|
||||
can_submit=jedi_flow.can_submit,
|
||||
)
|
||||
|
||||
|
||||
@requests_bp.route(
|
||||
"/requests/new/<int:screen>", methods=["GET"], defaults={"request_id": None}
|
||||
)
|
||||
@requests_bp.route("/requests/new/<int:screen>/<string:request_id>", methods=["GET"])
|
||||
def requests_form_update(screen=1, request_id=None):
|
||||
request = (
|
||||
Requests.get(g.current_user, request_id) if request_id is not None else None
|
||||
)
|
||||
jedi_flow = JEDIRequestFlow(
|
||||
screen, request=request, request_id=request_id, current_user=g.current_user
|
||||
)
|
||||
|
||||
if jedi_flow.is_review_screen and not jedi_flow.can_submit:
|
||||
flash("request_incomplete")
|
||||
|
||||
if request.review_comment:
|
||||
flash("request_review_comment", comment=request.review_comment)
|
||||
|
||||
return render_template(
|
||||
"requests/screen-%d.html" % int(screen),
|
||||
f=jedi_flow.form,
|
||||
data=jedi_flow.current_step_data,
|
||||
screens=jedi_flow.screens,
|
||||
current=screen,
|
||||
next_screen=screen + 1,
|
||||
request_id=request_id,
|
||||
jedi_request=jedi_flow.request,
|
||||
can_submit=jedi_flow.can_submit,
|
||||
)
|
||||
|
||||
|
||||
@requests_bp.route(
|
||||
"/requests/new/<int:screen>", methods=["POST"], defaults={"request_id": None}
|
||||
)
|
||||
@requests_bp.route("/requests/new/<int:screen>/<string:request_id>", methods=["POST"])
|
||||
def requests_update(screen=1, request_id=None):
|
||||
screen = int(screen)
|
||||
post_data = http_request.form
|
||||
current_user = g.current_user
|
||||
existing_request = (
|
||||
Requests.get(g.current_user, request_id) if request_id is not None else None
|
||||
)
|
||||
jedi_flow = JEDIRequestFlow(
|
||||
screen,
|
||||
post_data=post_data,
|
||||
request_id=request_id,
|
||||
current_user=current_user,
|
||||
existing_request=existing_request,
|
||||
)
|
||||
|
||||
has_next_screen = jedi_flow.next_screen <= len(jedi_flow.screens)
|
||||
valid = jedi_flow.validate() and jedi_flow.validate_warnings()
|
||||
|
||||
if valid:
|
||||
jedi_flow.create_or_update_request()
|
||||
|
||||
if has_next_screen:
|
||||
where = url_for(
|
||||
"requests.requests_form_update",
|
||||
screen=jedi_flow.next_screen,
|
||||
request_id=jedi_flow.request_id,
|
||||
)
|
||||
else:
|
||||
where = "/requests"
|
||||
return redirect(where)
|
||||
else:
|
||||
rerender_args = dict(
|
||||
f=jedi_flow.form,
|
||||
data=post_data,
|
||||
screens=jedi_flow.screens,
|
||||
current=screen,
|
||||
next_screen=jedi_flow.next_screen,
|
||||
request_id=jedi_flow.request_id,
|
||||
)
|
||||
return render_template("requests/screen-%d.html" % int(screen), **rerender_args)
|
||||
|
||||
|
||||
@requests_bp.route("/requests/submit/<string:request_id>", methods=["POST"])
|
||||
def requests_submit(request_id=None):
|
||||
request = Requests.get(g.current_user, request_id)
|
||||
Requests.submit(request)
|
||||
|
||||
if request.status == RequestStatus.PENDING_FINANCIAL_VERIFICATION:
|
||||
modal = "pendingFinancialVerification"
|
||||
else:
|
||||
modal = "pendingCCPOAcceptance"
|
||||
|
||||
return redirect(url_for("requests.requests_index", modal=modal))
|
||||
|
||||
|
||||
@requests_bp.route("/requests/details/<string:request_id>", methods=["GET"])
|
||||
def view_request_details(request_id=None):
|
||||
request = Requests.get(g.current_user, request_id)
|
||||
requires_fv_action = (
|
||||
request.is_pending_financial_verification
|
||||
or request.is_pending_financial_verification_changes
|
||||
)
|
||||
|
||||
data = request.body
|
||||
if request.has_financial_data:
|
||||
data["legacy_task_order"] = request.legacy_task_order.to_dictionary()
|
||||
|
||||
return render_template(
|
||||
"requests/details.html",
|
||||
data=data,
|
||||
jedi_request=request,
|
||||
requires_fv_action=requires_fv_action,
|
||||
)
|
||||
|
||||
|
||||
@requests_bp.route("/requests/edit/<string:request_id>")
|
||||
def edit(request_id):
|
||||
user = g.current_user
|
||||
request = Requests.get(user, request_id)
|
||||
is_ccpo = Authorization.is_ccpo(user)
|
||||
|
||||
redirect_url = ""
|
||||
|
||||
if request.creator == user:
|
||||
if request.is_pending_financial_verification:
|
||||
redirect_url = url_for(
|
||||
"requests.financial_verification", request_id=request.id
|
||||
)
|
||||
elif request.is_pending_financial_verification_changes:
|
||||
redirect_url = url_for(
|
||||
"requests.financial_verification", request_id=request.id, extended=True
|
||||
)
|
||||
elif request.is_approved:
|
||||
redirect_url = url_for(
|
||||
"requests.view_request_details", request_id=request.id
|
||||
)
|
||||
else:
|
||||
redirect_url = url_for(
|
||||
"requests.requests_form_update", screen=1, request_id=request.id
|
||||
)
|
||||
elif is_ccpo:
|
||||
redirect_url = url_for("requests.approval", request_id=request.id)
|
||||
|
||||
return redirect(redirect_url)
|
@ -1,39 +0,0 @@
|
||||
import re
|
||||
from flask import url_for
|
||||
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
|
||||
from tests.factories import RequestFactory, LegacyTaskOrderFactory, UserFactory
|
||||
|
||||
|
||||
def test_can_show_financial_data(client, user_session):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
|
||||
legacy_task_order = LegacyTaskOrderFactory.create()
|
||||
request = RequestFactory.create_with_status(
|
||||
status=RequestStatus.PENDING_CCPO_APPROVAL,
|
||||
legacy_task_order=legacy_task_order,
|
||||
creator=user,
|
||||
)
|
||||
response = client.get(
|
||||
url_for("requests.view_request_details", request_id=request.id)
|
||||
)
|
||||
|
||||
body = response.data.decode()
|
||||
assert re.search(r">\s+Financial Verification\s+<", body)
|
||||
|
||||
|
||||
def test_can_not_show_financial_data(client, user_session):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
|
||||
request = RequestFactory.create_with_status(
|
||||
status=RequestStatus.PENDING_CCPO_ACCEPTANCE, creator=user
|
||||
)
|
||||
response = client.get(
|
||||
url_for("requests.view_request_details", request_id=request.id)
|
||||
)
|
||||
|
||||
body = response.data.decode()
|
||||
assert not re.search(r">\s+Financial Verification\s+<", body)
|
@ -1,52 +0,0 @@
|
||||
from tests.factories import UserFactory, RequestFactory
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
|
||||
|
||||
def test_creator_pending_finver(client, user_session):
|
||||
request = RequestFactory.create_with_status(
|
||||
RequestStatus.PENDING_FINANCIAL_VERIFICATION
|
||||
)
|
||||
user_session(request.creator)
|
||||
response = client.get(
|
||||
"/requests/edit/{}".format(request.id), follow_redirects=False
|
||||
)
|
||||
assert "verify" in response.location
|
||||
|
||||
|
||||
def test_creator_pending_finver_changes(client, user_session):
|
||||
request = RequestFactory.create_with_status(
|
||||
RequestStatus.CHANGES_REQUESTED_TO_FINVER
|
||||
)
|
||||
user_session(request.creator)
|
||||
response = client.get(
|
||||
"/requests/edit/{}".format(request.id), follow_redirects=False
|
||||
)
|
||||
assert "verify" in response.location
|
||||
|
||||
|
||||
def test_creator_approved(client, user_session):
|
||||
request = RequestFactory.create_with_status(RequestStatus.APPROVED)
|
||||
user_session(request.creator)
|
||||
response = client.get(
|
||||
"/requests/edit/{}".format(request.id), follow_redirects=False
|
||||
)
|
||||
assert "details" in response.location
|
||||
|
||||
|
||||
def test_creator_approved(client, user_session):
|
||||
request = RequestFactory.create_with_status(RequestStatus.STARTED)
|
||||
user_session(request.creator)
|
||||
response = client.get(
|
||||
"/requests/edit/{}".format(request.id), follow_redirects=False
|
||||
)
|
||||
assert "new" in response.location
|
||||
|
||||
|
||||
def test_ccpo(client, user_session):
|
||||
ccpo = UserFactory.from_atat_role("ccpo")
|
||||
request = RequestFactory.create_with_status(RequestStatus.STARTED)
|
||||
user_session(ccpo)
|
||||
response = client.get(
|
||||
"/requests/edit/{}".format(request.id), follow_redirects=False
|
||||
)
|
||||
assert "approval" in response.location
|
@ -1,249 +0,0 @@
|
||||
import datetime
|
||||
import re
|
||||
import pytest
|
||||
from tests.factories import (
|
||||
RequestFactory,
|
||||
UserFactory,
|
||||
RequestRevisionFactory,
|
||||
RequestStatusEventFactory,
|
||||
RequestReviewFactory,
|
||||
)
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
from atst.domain.roles import Roles
|
||||
from atst.domain.requests import Requests
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from tests.assert_util import dict_contains
|
||||
|
||||
ERROR_CLASS = "alert--error"
|
||||
|
||||
|
||||
def test_submit_invalid_request_form(monkeypatch, client, user_session):
|
||||
user_session()
|
||||
response = client.post(
|
||||
"/requests/new/1",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="total_ram=5",
|
||||
)
|
||||
assert re.search(ERROR_CLASS, response.data.decode())
|
||||
|
||||
|
||||
def test_submit_valid_request_form(monkeypatch, client, user_session):
|
||||
user_session()
|
||||
monkeypatch.setattr(
|
||||
"atst.forms.new_request.DetailsOfUseForm.validate", lambda s: True
|
||||
)
|
||||
|
||||
response = client.post(
|
||||
"/requests/new/1",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="meaning=42",
|
||||
)
|
||||
assert "/requests/new/2" in response.headers.get("Location")
|
||||
|
||||
|
||||
def test_owner_can_view_request(client, user_session):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
request = RequestFactory.create(creator=user)
|
||||
|
||||
response = client.get(
|
||||
"/requests/new/1/{}".format(request.id), follow_redirects=True
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_non_owner_cannot_view_request(client, user_session):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
request = RequestFactory.create()
|
||||
|
||||
response = client.get(
|
||||
"/requests/new/1/{}".format(request.id), follow_redirects=True
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_ccpo_can_view_request(client, user_session):
|
||||
ccpo = Roles.get("ccpo")
|
||||
user = UserFactory.create(atat_role=ccpo)
|
||||
user_session(user)
|
||||
request = RequestFactory.create()
|
||||
|
||||
response = client.get(
|
||||
"/requests/new/1/{}".format(request.id), follow_redirects=True
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="create request flow no longer active")
|
||||
def test_nonexistent_request(client, user_session):
|
||||
user_session()
|
||||
response = client.get("/requests/new/1/foo", follow_redirects=True)
|
||||
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_creator_info_is_autopopulated_for_existing_request(
|
||||
monkeypatch, client, user_session
|
||||
):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
request = RequestFactory.create(creator=user, initial_revision={})
|
||||
|
||||
response = client.get("/requests/new/2/{}".format(request.id))
|
||||
body = response.data.decode()
|
||||
prepopulated_values = [
|
||||
"first_name",
|
||||
"last_name",
|
||||
"email",
|
||||
"phone_number",
|
||||
"date_latest_training",
|
||||
]
|
||||
for attr in prepopulated_values:
|
||||
value = getattr(user, attr)
|
||||
if isinstance(value, datetime.date):
|
||||
value = value.strftime("%m/%d/%Y")
|
||||
assert "initial-value='{}'".format(value) in body
|
||||
|
||||
|
||||
def test_creator_info_is_autopopulated_for_new_request(
|
||||
monkeypatch, client, user_session
|
||||
):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
|
||||
response = client.get("/requests/new/2")
|
||||
body = response.data.decode()
|
||||
assert "initial-value='{}'".format(user.first_name) in body
|
||||
assert "initial-value='{}'".format(user.last_name) in body
|
||||
assert "initial-value='{}'".format(user.email) in body
|
||||
|
||||
|
||||
def test_non_creator_info_is_not_autopopulated(monkeypatch, client, user_session):
|
||||
user = UserFactory.create()
|
||||
creator = UserFactory.create()
|
||||
user_session(user)
|
||||
request = RequestFactory.create(creator=creator, initial_revision={})
|
||||
|
||||
response = client.get("/requests/new/2/{}".format(request.id))
|
||||
body = response.data.decode()
|
||||
assert not user.first_name in body
|
||||
assert not user.last_name in body
|
||||
assert not user.email in body
|
||||
|
||||
|
||||
def test_am_poc_causes_poc_to_be_autopopulated(client, user_session):
|
||||
creator = UserFactory.create()
|
||||
user_session(creator)
|
||||
request = RequestFactory.create(creator=creator, initial_revision={})
|
||||
client.post(
|
||||
"/requests/new/3/{}".format(request.id),
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="am_poc=yes",
|
||||
)
|
||||
request = Requests.get(creator, request.id)
|
||||
assert request.body["primary_poc"]["dodid_poc"] == creator.dod_id
|
||||
|
||||
|
||||
def test_not_am_poc_requires_poc_info_to_be_completed(client, user_session):
|
||||
creator = UserFactory.create()
|
||||
user_session(creator)
|
||||
request = RequestFactory.create(creator=creator, initial_revision={})
|
||||
response = client.post(
|
||||
"/requests/new/3/{}".format(request.id),
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="am_poc=no",
|
||||
follow_redirects=True,
|
||||
)
|
||||
assert ERROR_CLASS in response.data.decode()
|
||||
|
||||
|
||||
def test_not_am_poc_allows_user_to_fill_in_poc_info(client, user_session):
|
||||
creator = UserFactory.create()
|
||||
user_session(creator)
|
||||
request = RequestFactory.create(creator=creator, initial_revision={})
|
||||
poc_input = {
|
||||
"am_poc": "no",
|
||||
"fname_poc": "test",
|
||||
"lname_poc": "user",
|
||||
"email_poc": "test.user@mail.com",
|
||||
"dodid_poc": "1234567890",
|
||||
}
|
||||
response = client.post(
|
||||
"/requests/new/3/{}".format(request.id),
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data=urlencode(poc_input),
|
||||
)
|
||||
assert ERROR_CLASS not in response.data.decode()
|
||||
|
||||
|
||||
def test_poc_details_can_be_autopopulated_on_new_request(client, user_session):
|
||||
creator = UserFactory.create()
|
||||
user_session(creator)
|
||||
response = client.post(
|
||||
"/requests/new/3",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="am_poc=yes",
|
||||
)
|
||||
request_id = response.headers["Location"].split("/")[-1]
|
||||
request = Requests.get(creator, request_id)
|
||||
|
||||
assert request.body["primary_poc"]["dodid_poc"] == creator.dod_id
|
||||
|
||||
|
||||
def test_poc_autofill_checks_information_about_you_form_first(client, user_session):
|
||||
creator = UserFactory.create()
|
||||
user_session(creator)
|
||||
request = RequestFactory.create(
|
||||
creator=creator,
|
||||
initial_revision=dict(
|
||||
fname_request="Alice",
|
||||
lname_request="Adams",
|
||||
email_request="alice.adams@mail.mil",
|
||||
),
|
||||
)
|
||||
poc_input = {"am_poc": "yes"}
|
||||
client.post(
|
||||
"/requests/new/3/{}".format(request.id),
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data=urlencode(poc_input),
|
||||
)
|
||||
request = Requests.get(creator, request.id)
|
||||
assert dict_contains(
|
||||
request.body["primary_poc"],
|
||||
{
|
||||
"fname_poc": "Alice",
|
||||
"lname_poc": "Adams",
|
||||
"email_poc": "alice.adams@mail.mil",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def test_can_review_data(user_session, client):
|
||||
creator = UserFactory.create()
|
||||
user_session(creator)
|
||||
request = RequestFactory.create(creator=creator)
|
||||
response = client.get("/requests/new/4/{}".format(request.id))
|
||||
body = response.data.decode()
|
||||
# assert a sampling of the request data is on the review page
|
||||
assert request.body["primary_poc"]["fname_poc"] in body
|
||||
assert request.body["information_about_you"]["email_request"] in body
|
||||
|
||||
|
||||
def test_displays_ccpo_review_comment(user_session, client):
|
||||
creator = UserFactory.create()
|
||||
ccpo = UserFactory.from_atat_role("ccpo")
|
||||
user_session(creator)
|
||||
request = RequestFactory.create(creator=creator)
|
||||
request = Requests.set_status(request, RequestStatus.CHANGES_REQUESTED)
|
||||
review_comment = "add all of the correct info, instead of the incorrect info"
|
||||
RequestReviewFactory.create(
|
||||
reviewer=ccpo, comment=review_comment, status=request.status_events[-1]
|
||||
)
|
||||
response = client.get("/requests/new/1/{}".format(request.id))
|
||||
body = response.data.decode()
|
||||
assert review_comment in body
|
@ -1,42 +0,0 @@
|
||||
import pytest
|
||||
from tests.factories import RequestFactory
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
|
||||
|
||||
def _mock_func(*args, **kwargs):
|
||||
return RequestFactory.create()
|
||||
|
||||
|
||||
def test_submit_reviewed_request(monkeypatch, client, user_session):
|
||||
user_session()
|
||||
monkeypatch.setattr("atst.domain.requests.Requests.get", _mock_func)
|
||||
monkeypatch.setattr("atst.domain.requests.Requests.submit", _mock_func)
|
||||
monkeypatch.setattr("atst.models.request.Request.status", "pending")
|
||||
# this just needs to send a known invalid form value
|
||||
response = client.post(
|
||||
"/requests/submit/1",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="",
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert "/requests" in response.headers["Location"]
|
||||
assert "modal=pendingCCPOAcceptance" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_submit_autoapproved_reviewed_request(monkeypatch, client, user_session):
|
||||
user_session()
|
||||
monkeypatch.setattr("atst.domain.requests.Requests.get", _mock_func)
|
||||
monkeypatch.setattr("atst.domain.requests.Requests.submit", _mock_func)
|
||||
monkeypatch.setattr(
|
||||
"atst.models.request.Request.status",
|
||||
RequestStatus.PENDING_FINANCIAL_VERIFICATION,
|
||||
)
|
||||
response = client.post(
|
||||
"/requests/submit/1",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="",
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert (
|
||||
"/requests?modal=pendingFinancialVerification" in response.headers["Location"]
|
||||
)
|
@ -1,201 +0,0 @@
|
||||
import os
|
||||
from flask import url_for
|
||||
|
||||
from atst.models.attachment import Attachment
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
from atst.domain.roles import Roles
|
||||
|
||||
from tests.factories import (
|
||||
RequestFactory,
|
||||
LegacyTaskOrderFactory,
|
||||
UserFactory,
|
||||
RequestReviewFactory,
|
||||
RequestStatusEventFactory,
|
||||
)
|
||||
|
||||
|
||||
def test_ccpo_can_view_approval(user_session, client):
|
||||
ccpo = Roles.get("ccpo")
|
||||
user = UserFactory.create(atat_role=ccpo)
|
||||
user_session(user)
|
||||
|
||||
request = RequestFactory.create()
|
||||
response = client.get(url_for("requests.approval", request_id=request.id))
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_ccpo_prepopulated_as_mission_owner(user_session, client):
|
||||
user = UserFactory.from_atat_role("ccpo")
|
||||
user_session(user)
|
||||
|
||||
request = RequestFactory.create_with_status(RequestStatus.PENDING_CCPO_ACCEPTANCE)
|
||||
response = client.get(url_for("requests.approval", request_id=request.id))
|
||||
|
||||
body = response.data.decode()
|
||||
assert user.first_name in body
|
||||
assert user.last_name in body
|
||||
|
||||
|
||||
def test_non_ccpo_cannot_view_approval(user_session, client):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
|
||||
request = RequestFactory.create(creator=user)
|
||||
response = client.get(url_for("requests.approval", request_id=request.id))
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def prepare_request_pending_approval(creator, pdf_attachment=None):
|
||||
legacy_task_order = LegacyTaskOrderFactory.create(
|
||||
number="abc123", pdf=pdf_attachment
|
||||
)
|
||||
return RequestFactory.create_with_status(
|
||||
status=RequestStatus.PENDING_CCPO_APPROVAL,
|
||||
legacy_task_order=legacy_task_order,
|
||||
creator=creator,
|
||||
)
|
||||
|
||||
|
||||
def test_ccpo_sees_pdf_link(user_session, client, pdf_upload):
|
||||
ccpo = UserFactory.from_atat_role("ccpo")
|
||||
user_session(ccpo)
|
||||
|
||||
attachment = Attachment.attach(pdf_upload)
|
||||
request = prepare_request_pending_approval(ccpo, pdf_attachment=attachment)
|
||||
|
||||
response = client.get(url_for("requests.approval", request_id=request.id))
|
||||
download_url = url_for("requests.task_order_pdf_download", request_id=request.id)
|
||||
|
||||
body = response.data.decode()
|
||||
assert download_url in body
|
||||
|
||||
|
||||
def test_ccpo_does_not_see_pdf_link_if_no_pdf(user_session, client, pdf_upload):
|
||||
ccpo = UserFactory.from_atat_role("ccpo")
|
||||
user_session(ccpo)
|
||||
|
||||
request = prepare_request_pending_approval(ccpo)
|
||||
|
||||
response = client.get(url_for("requests.approval", request_id=request.id))
|
||||
download_url = url_for("requests.task_order_pdf_download", request_id=request.id)
|
||||
|
||||
body = response.data.decode()
|
||||
assert download_url not in body
|
||||
|
||||
|
||||
def test_task_order_download(app, client, user_session, pdf_upload):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
|
||||
attachment = Attachment.attach(pdf_upload)
|
||||
legacy_task_order = LegacyTaskOrderFactory.create(number="abc123", pdf=attachment)
|
||||
request = RequestFactory.create(legacy_task_order=legacy_task_order, creator=user)
|
||||
|
||||
# ensure that real data for pdf upload has been flushed to disk
|
||||
pdf_upload.seek(0)
|
||||
pdf_content = pdf_upload.read()
|
||||
pdf_upload.close()
|
||||
full_path = os.path.join(
|
||||
app.config.get("STORAGE_CONTAINER"), attachment.object_name
|
||||
)
|
||||
with open(full_path, "wb") as output_file:
|
||||
output_file.write(pdf_content)
|
||||
output_file.flush()
|
||||
|
||||
response = client.get(
|
||||
url_for("requests.task_order_pdf_download", request_id=request.id)
|
||||
)
|
||||
assert response.data == pdf_content
|
||||
|
||||
|
||||
def test_task_order_download_does_not_exist(client, user_session):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
request = RequestFactory.create(creator=user)
|
||||
response = client.get(
|
||||
url_for("requests.task_order_pdf_download", request_id=request.id)
|
||||
)
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_can_submit_request_approval(client, user_session):
|
||||
user = UserFactory.from_atat_role("ccpo")
|
||||
user_session(user)
|
||||
request = RequestFactory.create_with_status(
|
||||
status=RequestStatus.PENDING_CCPO_ACCEPTANCE
|
||||
)
|
||||
review_data = RequestReviewFactory.dictionary()
|
||||
review_data["review"] = "approving"
|
||||
response = client.post(
|
||||
url_for("requests.submit_approval", request_id=request.id), data=review_data
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert request.status == RequestStatus.PENDING_FINANCIAL_VERIFICATION
|
||||
|
||||
|
||||
def test_can_submit_request_denial(client, user_session):
|
||||
user = UserFactory.from_atat_role("ccpo")
|
||||
user_session(user)
|
||||
request = RequestFactory.create_with_status(
|
||||
status=RequestStatus.PENDING_CCPO_ACCEPTANCE
|
||||
)
|
||||
review_data = RequestReviewFactory.dictionary()
|
||||
review_data["review"] = "denying"
|
||||
response = client.post(
|
||||
url_for("requests.submit_approval", request_id=request.id), data=review_data
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert request.status == RequestStatus.CHANGES_REQUESTED
|
||||
|
||||
|
||||
def test_ccpo_user_can_comment_on_request(client, user_session):
|
||||
user = UserFactory.from_atat_role("ccpo")
|
||||
user_session(user)
|
||||
request = RequestFactory.create_with_status(
|
||||
status=RequestStatus.PENDING_CCPO_ACCEPTANCE
|
||||
)
|
||||
assert len(request.internal_comments) == 0
|
||||
|
||||
comment_text = "This is the greatest request in the history of requests"
|
||||
comment_form_data = {"text": comment_text}
|
||||
response = client.post(
|
||||
url_for("requests.create_internal_comment", request_id=request.id),
|
||||
data=comment_form_data,
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert len(request.internal_comments) == 1
|
||||
assert request.internal_comments[0].text == comment_text
|
||||
|
||||
|
||||
def test_comment_text_is_required(client, user_session):
|
||||
user = UserFactory.from_atat_role("ccpo")
|
||||
user_session(user)
|
||||
request = RequestFactory.create_with_status(
|
||||
status=RequestStatus.PENDING_CCPO_ACCEPTANCE
|
||||
)
|
||||
assert len(request.internal_comments) == 0
|
||||
|
||||
comment_form_data = {"text": ""}
|
||||
response = client.post(
|
||||
url_for("requests.create_internal_comment", request_id=request.id),
|
||||
data=comment_form_data,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert len(request.internal_comments) == 0
|
||||
|
||||
|
||||
def test_other_user_cannot_comment_on_request(client, user_session):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
request = RequestFactory.create_with_status(
|
||||
status=RequestStatus.PENDING_CCPO_ACCEPTANCE
|
||||
)
|
||||
|
||||
comment_text = "What is this even"
|
||||
comment_form_data = {"text": comment_text}
|
||||
response = client.post(
|
||||
url_for("requests.create_internal_comment", request_id=request.id),
|
||||
data=comment_form_data,
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
@ -1,543 +0,0 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
from flask import url_for
|
||||
import datetime
|
||||
|
||||
from atst.eda_client import MockEDAClient
|
||||
from atst.routes.requests.financial_verification import (
|
||||
GetFinancialVerificationForm,
|
||||
UpdateFinancialVerification,
|
||||
SaveFinancialVerificationDraft,
|
||||
)
|
||||
|
||||
from tests.mocks import MOCK_VALID_PE_ID
|
||||
from tests.factories import RequestFactory, UserFactory, LegacyTaskOrderFactory
|
||||
from atst.forms.exceptions import FormValidationError
|
||||
from atst.domain.requests.financial_verification import (
|
||||
PENumberValidator,
|
||||
TaskOrderNumberValidator,
|
||||
)
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
from atst.models.attachment import Attachment
|
||||
from atst.domain.requests.query import RequestsQuery
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fv_data():
|
||||
return {
|
||||
"request-pe_id": "123",
|
||||
"legacy_task_order-number": MockEDAClient.MOCK_CONTRACT_NUMBER,
|
||||
"request-fname_co": "Contracting",
|
||||
"request-lname_co": "Officer",
|
||||
"request-email_co": "jane@mail.mil",
|
||||
"request-office_co": "WHS",
|
||||
"request-fname_cor": "Officer",
|
||||
"request-lname_cor": "Representative",
|
||||
"request-email_cor": "jane@mail.mil",
|
||||
"request-office_cor": "WHS",
|
||||
"request-uii_ids": "1234",
|
||||
"request-treasury_code": "00123456",
|
||||
"request-ba_code": "02A",
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def e_fv_data(pdf_upload):
|
||||
return {
|
||||
"legacy_task_order-funding_type": "RDTE",
|
||||
"legacy_task_order-funding_type_other": "other",
|
||||
"legacy_task_order-expiration_date": "1/1/{}".format(
|
||||
datetime.date.today().year + 1
|
||||
),
|
||||
"legacy_task_order-clin_0001": "50000",
|
||||
"legacy_task_order-clin_0003": "13000",
|
||||
"legacy_task_order-clin_1001": "30000",
|
||||
"legacy_task_order-clin_1003": "7000",
|
||||
"legacy_task_order-clin_2001": "30000",
|
||||
"legacy_task_order-clin_2003": "7000",
|
||||
"legacy_task_order-pdf": pdf_upload,
|
||||
}
|
||||
|
||||
|
||||
MANUAL_TO_NUMBER = "DCA10096D0051"
|
||||
|
||||
|
||||
TrueValidator = MagicMock()
|
||||
TrueValidator.validate = MagicMock(return_value=True)
|
||||
|
||||
FalseValidator = MagicMock()
|
||||
FalseValidator.validate = MagicMock(return_value=False)
|
||||
|
||||
|
||||
def test_update_fv(fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {**fv_data, "pe_id": MOCK_VALID_PE_ID}
|
||||
|
||||
updated_request = UpdateFinancialVerification(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=False
|
||||
).execute()
|
||||
|
||||
assert updated_request.is_pending_ccpo_approval
|
||||
|
||||
|
||||
def test_update_fv_re_enter_pe_number(fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {**fv_data, "pe_id": "0101228M"}
|
||||
update_fv = UpdateFinancialVerification(
|
||||
PENumberValidator(), TrueValidator, user, request, data, is_extended=False
|
||||
)
|
||||
|
||||
with pytest.raises(FormValidationError):
|
||||
update_fv.execute()
|
||||
updated_request = update_fv.execute()
|
||||
|
||||
assert updated_request.is_pending_ccpo_approval
|
||||
|
||||
|
||||
def test_update_fv_invalid_task_order_number(fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {**fv_data, "legacy_task_order-number": MANUAL_TO_NUMBER}
|
||||
update_fv = UpdateFinancialVerification(
|
||||
TrueValidator,
|
||||
TaskOrderNumberValidator(),
|
||||
user,
|
||||
request,
|
||||
data,
|
||||
is_extended=False,
|
||||
)
|
||||
|
||||
with pytest.raises(FormValidationError):
|
||||
update_fv.execute()
|
||||
|
||||
|
||||
def test_draft_without_pe_id(fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {"request-uii_ids": "1234"}
|
||||
assert SaveFinancialVerificationDraft(
|
||||
PENumberValidator(),
|
||||
TaskOrderNumberValidator(),
|
||||
user,
|
||||
request,
|
||||
data,
|
||||
is_extended=False,
|
||||
).execute()
|
||||
|
||||
|
||||
def test_update_fv_extended(fv_data, e_fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {**fv_data, **e_fv_data}
|
||||
update_fv = UpdateFinancialVerification(
|
||||
TrueValidator, TaskOrderNumberValidator(), user, request, data, is_extended=True
|
||||
)
|
||||
|
||||
assert update_fv.execute()
|
||||
|
||||
|
||||
def test_update_fv_extended_does_not_validate_task_order(fv_data, e_fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {**fv_data, **e_fv_data, "legacy_task_order-number": "abc123"}
|
||||
update_fv = UpdateFinancialVerification(
|
||||
TrueValidator, TaskOrderNumberValidator(), user, request, data, is_extended=True
|
||||
)
|
||||
|
||||
assert update_fv.execute()
|
||||
|
||||
|
||||
def test_update_fv_missing_extended_data(fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
update_fv = UpdateFinancialVerification(
|
||||
TrueValidator,
|
||||
TaskOrderNumberValidator(),
|
||||
user,
|
||||
request,
|
||||
fv_data,
|
||||
is_extended=True,
|
||||
)
|
||||
|
||||
with pytest.raises(FormValidationError):
|
||||
update_fv.execute()
|
||||
|
||||
|
||||
def test_update_fv_submission(fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
updated_request = UpdateFinancialVerification(
|
||||
TrueValidator, TrueValidator, user, request, fv_data
|
||||
).execute()
|
||||
assert updated_request
|
||||
|
||||
|
||||
def test_save_empty_draft():
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
save_draft = SaveFinancialVerificationDraft(
|
||||
TrueValidator, TrueValidator, user, request, {}, is_extended=False
|
||||
)
|
||||
|
||||
assert save_draft.execute()
|
||||
|
||||
|
||||
def test_save_draft_with_ba_code():
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {"ba_code": "02A"}
|
||||
save_draft = SaveFinancialVerificationDraft(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=False
|
||||
)
|
||||
|
||||
assert save_draft.execute()
|
||||
|
||||
|
||||
def test_save_draft_allows_invalid_data():
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {
|
||||
"legacy_task_order-number": MANUAL_TO_NUMBER,
|
||||
"request-pe_id": "123",
|
||||
"request-ba_code": "a",
|
||||
}
|
||||
|
||||
assert SaveFinancialVerificationDraft(
|
||||
PENumberValidator(),
|
||||
TaskOrderNumberValidator(),
|
||||
user,
|
||||
request,
|
||||
data,
|
||||
is_extended=True,
|
||||
).execute()
|
||||
|
||||
|
||||
def test_save_draft_and_then_submit():
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {"ba_code": "02A"}
|
||||
updated_request = SaveFinancialVerificationDraft(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=False
|
||||
).execute()
|
||||
|
||||
with pytest.raises(FormValidationError):
|
||||
UpdateFinancialVerification(
|
||||
TrueValidator, TrueValidator, user, updated_request, data
|
||||
).execute()
|
||||
|
||||
|
||||
def test_updated_request_has_pdf(fv_data, e_fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {**fv_data, **e_fv_data, "legacy_task_order-number": MANUAL_TO_NUMBER}
|
||||
updated_request = UpdateFinancialVerification(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=True
|
||||
).execute()
|
||||
assert updated_request.legacy_task_order.pdf
|
||||
|
||||
|
||||
def test_can_save_draft_with_just_pdf(e_fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {"legacy_task_order-pdf": e_fv_data["legacy_task_order-pdf"]}
|
||||
SaveFinancialVerificationDraft(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=True
|
||||
).execute()
|
||||
|
||||
form = GetFinancialVerificationForm(user, request, is_extended=True).execute()
|
||||
assert form.legacy_task_order.pdf
|
||||
|
||||
|
||||
def test_task_order_info_present_in_extended_form(fv_data, e_fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {
|
||||
"legacy_task_order-clin_0001": "1",
|
||||
"legacy_task_order-number": fv_data["legacy_task_order-number"],
|
||||
}
|
||||
SaveFinancialVerificationDraft(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=True
|
||||
).execute()
|
||||
|
||||
form = GetFinancialVerificationForm(user, request, is_extended=True).execute()
|
||||
assert form.legacy_task_order.clin_0001.data
|
||||
|
||||
|
||||
def test_update_ignores_empty_values(fv_data, e_fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {**fv_data, **e_fv_data, "legacy_task_order-funding_type": ""}
|
||||
SaveFinancialVerificationDraft(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=True
|
||||
).execute()
|
||||
|
||||
|
||||
def test_can_save_draft_with_funding_type(fv_data, e_fv_data):
|
||||
request = RequestFactory.create()
|
||||
user = UserFactory.create()
|
||||
data = {
|
||||
"legacy_task_order-number": fv_data["legacy_task_order-number"],
|
||||
"legacy_task_order-funding_type": e_fv_data["legacy_task_order-funding_type"],
|
||||
}
|
||||
updated_request = SaveFinancialVerificationDraft(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=False
|
||||
).execute()
|
||||
|
||||
assert updated_request.legacy_task_order.funding_type
|
||||
|
||||
|
||||
def test_update_fv_route(client, user_session, fv_data):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
user_session(user)
|
||||
response = client.post(
|
||||
url_for("requests.financial_verification", request_id=request.id),
|
||||
data=fv_data,
|
||||
follow_redirects=False,
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_save_fv_draft_route(client, user_session, fv_data):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
user_session(user)
|
||||
response = client.post(
|
||||
url_for("requests.save_financial_verification_draft", request_id=request.id),
|
||||
data=fv_data,
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_get_fv_form_route(client, user_session, fv_data):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
user_session(user)
|
||||
response = client.get(
|
||||
url_for("requests.financial_verification", request_id=request.id),
|
||||
data=fv_data,
|
||||
follow_redirects=False,
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_manual_task_order_triggers_extended_form(
|
||||
client, user_session, fv_data, e_fv_data
|
||||
):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
|
||||
data = {**fv_data, **e_fv_data, "legacy_task_order-number": MANUAL_TO_NUMBER}
|
||||
|
||||
UpdateFinancialVerification(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=True
|
||||
).execute()
|
||||
|
||||
user_session(user)
|
||||
response = client.get(
|
||||
url_for("requests.financial_verification", request_id=request.id),
|
||||
data=fv_data,
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert "extended" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_manual_to_does_not_trigger_approval(client, user_session, fv_data, e_fv_data):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
data = {
|
||||
**fv_data,
|
||||
**e_fv_data,
|
||||
"legacy_task_order-number": MANUAL_TO_NUMBER,
|
||||
"request-pe_id": "0101228N",
|
||||
}
|
||||
user_session(user)
|
||||
client.post(
|
||||
url_for(
|
||||
"requests.financial_verification", request_id=request.id, extended=True
|
||||
),
|
||||
data=data,
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
updated_request = RequestsQuery.get(request.id)
|
||||
assert updated_request.status != RequestStatus.APPROVED
|
||||
|
||||
|
||||
def test_eda_task_order_does_trigger_approval(client, user_session, fv_data, e_fv_data):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
data = {
|
||||
**fv_data,
|
||||
**e_fv_data,
|
||||
"legacy_task_order-number": MockEDAClient.MOCK_CONTRACT_NUMBER,
|
||||
"request-pe_id": "0101228N",
|
||||
}
|
||||
user_session(user)
|
||||
client.post(
|
||||
url_for(
|
||||
"requests.financial_verification", request_id=request.id, extended=True
|
||||
),
|
||||
data=data,
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
updated_request = RequestsQuery.get(request.id)
|
||||
assert updated_request.status == RequestStatus.APPROVED
|
||||
|
||||
|
||||
def test_attachment_on_non_extended_form(client, user_session, fv_data, e_fv_data):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
data = {
|
||||
**fv_data,
|
||||
**e_fv_data,
|
||||
"legacy_task_order-number": MockEDAClient.MOCK_CONTRACT_NUMBER,
|
||||
"request-pe_id": "0101228N",
|
||||
}
|
||||
user_session(user)
|
||||
client.post(
|
||||
url_for(
|
||||
"requests.financial_verification", request_id=request.id, extended=True
|
||||
),
|
||||
data=data,
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
url_for("requests.financial_verification", request_id=request.id)
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_task_order_number_persists_in_form(fv_data, e_fv_data):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
data = {
|
||||
**fv_data,
|
||||
"legacy_task_order-number": MANUAL_TO_NUMBER,
|
||||
"request-pe_id": "0101228N",
|
||||
}
|
||||
|
||||
try:
|
||||
UpdateFinancialVerification(
|
||||
TrueValidator, FalseValidator, user, request, data, is_extended=False
|
||||
).execute()
|
||||
except FormValidationError:
|
||||
pass
|
||||
|
||||
form = GetFinancialVerificationForm(user, request, is_extended=True).execute()
|
||||
assert form.legacy_task_order.number.data == MANUAL_TO_NUMBER
|
||||
|
||||
|
||||
def test_can_submit_once_to_details_are_entered(fv_data, e_fv_data):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
data = {
|
||||
**fv_data,
|
||||
"legacy_task_order-number": MANUAL_TO_NUMBER,
|
||||
"request-pe_id": "0101228N",
|
||||
}
|
||||
|
||||
try:
|
||||
UpdateFinancialVerification(
|
||||
TrueValidator, FalseValidator, user, request, data, is_extended=False
|
||||
).execute()
|
||||
except FormValidationError:
|
||||
pass
|
||||
|
||||
data = {
|
||||
**fv_data,
|
||||
**e_fv_data,
|
||||
"legacy_task_order-number": MANUAL_TO_NUMBER,
|
||||
"request-pe_id": "0101228N",
|
||||
}
|
||||
assert UpdateFinancialVerification(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=True
|
||||
).execute()
|
||||
|
||||
|
||||
def test_existing_task_order_with_pdf(fv_data, e_fv_data, client, user_session):
|
||||
# Use finver route to create initial TO #1, complete with PDF
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
data = {**fv_data, **e_fv_data, "legacy_task_order-number": MANUAL_TO_NUMBER}
|
||||
UpdateFinancialVerification(
|
||||
TrueValidator, TaskOrderNumberValidator(), user, request, data, is_extended=True
|
||||
).execute()
|
||||
|
||||
# Save draft on a new finver form, but with same number as TO #1
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
data = {"legacy_task_order-number": MANUAL_TO_NUMBER}
|
||||
SaveFinancialVerificationDraft(
|
||||
TrueValidator,
|
||||
TaskOrderNumberValidator(),
|
||||
user,
|
||||
request,
|
||||
data,
|
||||
is_extended=False,
|
||||
).execute()
|
||||
|
||||
# Get finver form
|
||||
user_session(user)
|
||||
response = client.get(
|
||||
url_for("requests.financial_verification", request_id=request.id),
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_pdf_clearing(fv_data, e_fv_data, pdf_upload, pdf_upload2):
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user)
|
||||
data = {**fv_data, **e_fv_data, "legacy_task_order-pdf": pdf_upload}
|
||||
|
||||
SaveFinancialVerificationDraft(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=True
|
||||
).execute()
|
||||
|
||||
data = {**data, "legacy_task_order-pdf": pdf_upload2}
|
||||
UpdateFinancialVerification(
|
||||
TrueValidator, TrueValidator, user, request, data, is_extended=True
|
||||
).execute()
|
||||
|
||||
form = GetFinancialVerificationForm(user, request, is_extended=True).execute()
|
||||
assert form.legacy_task_order.pdf.data == pdf_upload2.filename
|
||||
|
||||
|
||||
# TODO: This test manages an edge case for our current non-unique handling of
|
||||
# task orders. Because two requests can reference the same task order but we
|
||||
# only record one request ID on the PDF attachment, multiple task
|
||||
# orders/requests reference the same task order but only one of them is noted
|
||||
# in the related attachment entity. I have changed the handling in
|
||||
# FinancialVerificationBase#_get_form to be more generous in how it finds the
|
||||
# PDF filename and prepopulates the form data with that name.
|
||||
def test_always_derives_pdf_filename(fv_data, e_fv_data, pdf_upload):
|
||||
user = UserFactory.create()
|
||||
request_one = RequestFactory.create(creator=user)
|
||||
attachment = Attachment.attach(
|
||||
pdf_upload, resource="legacy_task_order", resource_id=request_one.id
|
||||
)
|
||||
legacy_task_order = LegacyTaskOrderFactory.create(pdf=attachment)
|
||||
request_two = RequestFactory.create(
|
||||
creator=user, legacy_task_order=legacy_task_order
|
||||
)
|
||||
|
||||
form_one = GetFinancialVerificationForm(
|
||||
user, request_one, is_extended=True
|
||||
).execute()
|
||||
form_two = GetFinancialVerificationForm(
|
||||
user, request_two, is_extended=True
|
||||
).execute()
|
||||
|
||||
assert form_one.legacy_task_order.pdf.data == attachment.filename
|
||||
assert form_two.legacy_task_order.pdf.data == attachment.filename
|
@ -1,28 +0,0 @@
|
||||
from flask import url_for
|
||||
|
||||
from atst.routes.requests.index import RequestsIndex
|
||||
from tests.factories import RequestFactory, UserFactory
|
||||
from atst.domain.requests import Requests
|
||||
|
||||
|
||||
def test_action_required_mission_owner():
|
||||
creator = UserFactory.create()
|
||||
requests = RequestFactory.create_batch(5, creator=creator)
|
||||
Requests.submit(requests[0])
|
||||
Requests.approve_and_create_portfolio(requests[1])
|
||||
|
||||
context = RequestsIndex(creator).execute()
|
||||
|
||||
assert context["requests"][0]["action_required"] == False
|
||||
|
||||
|
||||
def test_action_required_ccpo():
|
||||
creator = UserFactory.create()
|
||||
requests = RequestFactory.create_batch(5, creator=creator)
|
||||
Requests.submit(requests[0])
|
||||
Requests.approve_and_create_portfolio(requests[1])
|
||||
|
||||
ccpo = UserFactory.from_atat_role("ccpo")
|
||||
context = RequestsIndex(ccpo).execute()
|
||||
|
||||
assert context["num_action_required"] == 1
|
@ -1,84 +0,0 @@
|
||||
import pytest
|
||||
from urllib.parse import urlencode
|
||||
from .factories import UserFactory, RequestFactory
|
||||
|
||||
from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
from atst.domain.requests import Requests
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def screens(app):
|
||||
return JEDIRequestFlow(3).screens
|
||||
|
||||
|
||||
def serialize_dates(data):
|
||||
if not data:
|
||||
return data
|
||||
|
||||
dates = {
|
||||
k: v.strftime("%m/%d/%Y") for k, v in data.items() if hasattr(v, "strftime")
|
||||
}
|
||||
|
||||
new_data = data.copy()
|
||||
new_data.update(dates)
|
||||
|
||||
return new_data
|
||||
|
||||
|
||||
def test_stepthrough_request_form(user_session, screens, client):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
mock_request = RequestFactory.create()
|
||||
mock_body = mock_request.body
|
||||
|
||||
def post_form(url, redirects=False, data=""):
|
||||
return client.post(
|
||||
url,
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data=data,
|
||||
follow_redirects=redirects,
|
||||
)
|
||||
|
||||
def take_a_step(inc, req=None, data=None):
|
||||
req_url = "/requests/new/{}".format(inc)
|
||||
if req:
|
||||
req_url += "/" + req
|
||||
# we do it twice, with and without redirect, in order to get the
|
||||
# destination url
|
||||
prelim_resp = post_form(req_url, data=data)
|
||||
response = post_form(req_url, True, data=data)
|
||||
assert prelim_resp.status_code == 302
|
||||
return (prelim_resp.headers.get("Location"), response)
|
||||
|
||||
# GET the initial form
|
||||
response = client.get("/requests/new/1")
|
||||
assert screens[0]["title"] in response.data.decode()
|
||||
|
||||
# POST to each of the form pages up until review and submit
|
||||
req_id = None
|
||||
for i in range(1, len(screens)):
|
||||
# get appropriate form data to POST for this section
|
||||
section = screens[i - 1]["section"]
|
||||
massaged = serialize_dates(mock_body[section])
|
||||
post_data = urlencode(massaged)
|
||||
|
||||
effective_url, resp = take_a_step(i, req=req_id, data=post_data)
|
||||
req_id = effective_url.split("/")[-1]
|
||||
screen_title = screens[i]["title"].replace("&", "&")
|
||||
|
||||
assert "/requests/new/{}/{}".format(i + 1, req_id) in effective_url
|
||||
assert screen_title in resp.data.decode()
|
||||
|
||||
# at this point, the real request we made and the mock_request bodies
|
||||
# should be equivalent
|
||||
assert Requests.get(user, req_id).body == mock_body
|
||||
|
||||
# finish the review and submit step
|
||||
client.post(
|
||||
"/requests/submit/{}".format(req_id),
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
)
|
||||
|
||||
finished_request = Requests.get(user, req_id)
|
||||
assert finished_request.status == RequestStatus.PENDING_CCPO_ACCEPTANCE
|
Loading…
x
Reference in New Issue
Block a user