diff --git a/atst/app.py b/atst/app.py index 2f27b661..19e09ca2 100644 --- a/atst/app.py +++ b/atst/app.py @@ -13,7 +13,6 @@ from atst.assets import environment as assets_environment from atst.filters import register_filters from atst.routes import bp from atst.routes.portfolios import portfolios_bp as portfolio_routes -from atst.routes.requests import requests_bp from atst.routes.task_orders import task_orders_bp from atst.routes.dev import bp as dev_routes from atst.routes.users import bp as user_routes @@ -68,7 +67,6 @@ def make_app(config): app.register_blueprint(portfolio_routes) app.register_blueprint(task_orders_bp) app.register_blueprint(user_routes) - app.register_blueprint(requests_bp) if ENV != "prod": app.register_blueprint(dev_routes) diff --git a/atst/routes/requests/__init__.py b/atst/routes/requests/__init__.py deleted file mode 100644 index d4214f56..00000000 --- a/atst/routes/requests/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -from flask import Blueprint - -from atst.domain.requests import Requests - -requests_bp = Blueprint("requests", __name__) - -from . import index -from . import requests_form -from . import financial_verification -from . import approval - - -@requests_bp.context_processor -def annual_spend_threshold(): - return {"annual_spend_threshold": Requests.ANNUAL_SPEND_THRESHOLD} diff --git a/atst/routes/requests/approval.py b/atst/routes/requests/approval.py deleted file mode 100644 index 9ec31bc9..00000000 --- a/atst/routes/requests/approval.py +++ /dev/null @@ -1,97 +0,0 @@ -from flask import ( - render_template, - g, - Response, - request as http_request, - redirect, - url_for, -) -from flask import current_app as app - -from . import requests_bp -from atst.domain.requests import Requests -from atst.domain.exceptions import NotFoundError -from atst.forms.ccpo_review import CCPOReviewForm -from atst.forms.internal_comment import InternalCommentForm - - -def map_ccpo_authorizing(user): - return {"fname_ccpo": user.first_name, "lname_ccpo": user.last_name} - - -def render_approval(request, form=None, internal_comment_form=None): - data = request.body - if request.has_financial_data: - data["legacy_task_order"] = request.legacy_task_order.to_dictionary() - - if not form: - mo_data = map_ccpo_authorizing(g.current_user) - form = CCPOReviewForm(data=mo_data) - - if not internal_comment_form: - internal_comment_form = InternalCommentForm() - - return render_template( - "requests/approval.html", - data=data, - reviews=list(reversed(request.reviews)), - jedi_request=request, - current_status=request.status.value, - review_form=form or CCPOReviewForm(), - internal_comment_form=internal_comment_form, - comments=request.internal_comments, - ) - - -@requests_bp.route("/requests/approval/", methods=["GET"]) -def approval(request_id): - request = Requests.get_for_approval(g.current_user, request_id) - - return render_approval(request) - - -@requests_bp.route("/requests/submit_approval/", methods=["POST"]) -def submit_approval(request_id): - request = Requests.get_for_approval(g.current_user, request_id) - - form = CCPOReviewForm(http_request.form) - if form.validate(): - if http_request.form.get("review") == "approving": - Requests.advance(g.current_user, request, form.data) - else: - Requests.request_changes(g.current_user, request, form.data) - - return redirect(url_for("requests.requests_index")) - else: - return render_approval(request, form) - - -@requests_bp.route("/requests/task_order_download/", methods=["GET"]) -def task_order_pdf_download(request_id): - request = Requests.get(g.current_user, request_id) - if request.legacy_task_order and request.legacy_task_order.pdf: - pdf = request.legacy_task_order.pdf - generator = app.csp.files.download(pdf.object_name) - return Response( - generator, - headers={ - "Content-Disposition": "attachment; filename={}".format(pdf.filename) - }, - mimetype="application/pdf", - ) - - else: - raise NotFoundError("legacy_task_order pdf") - - -@requests_bp.route("/requests/internal_comments/", methods=["POST"]) -def create_internal_comment(request_id): - form = InternalCommentForm(http_request.form) - request = Requests.get(g.current_user, request_id) - if form.validate(): - Requests.add_internal_comment(g.current_user, request, form.data.get("text")) - return redirect( - url_for("requests.approval", request_id=request_id, _anchor="ccpo-notes") - ) - else: - return render_approval(request, internal_comment_form=form) diff --git a/atst/routes/requests/financial_verification.py b/atst/routes/requests/financial_verification.py deleted file mode 100644 index 970d85a4..00000000 --- a/atst/routes/requests/financial_verification.py +++ /dev/null @@ -1,291 +0,0 @@ -from flask import g, render_template, redirect, url_for -from flask import request as http_request -from werkzeug.datastructures import ImmutableMultiDict, FileStorage - -from . import requests_bp -from atst.domain.requests import Requests -from atst.forms.financial import FinancialVerificationForm -from atst.forms.exceptions import FormValidationError -from atst.domain.exceptions import NotFoundError -from atst.domain.requests.financial_verification import ( - PENumberValidator, - TaskOrderNumberValidator, -) -from atst.models.attachment import Attachment -from atst.domain.legacy_task_orders import LegacyTaskOrders -from atst.utils.flash import formatted_flash as flash - - -def fv_extended(_http_request): - return _http_request.args.get("extended", "false").lower() in ["true", "t"] - - -class FinancialVerification(object): - def __init__(self, request): - self.request = request.latest_revision - self.legacy_task_order = request.legacy_task_order - - -class FinancialVerificationBase(object): - def _get_form(self, request, is_extended, formdata=None): - _formdata = ImmutableMultiDict(formdata) if formdata is not None else None - fv = FinancialVerification(request) - form = FinancialVerificationForm(obj=fv, formdata=_formdata) - - if not form.has_pdf_upload: - if isinstance(form.legacy_task_order.pdf.data, Attachment): - form.legacy_task_order.pdf.data = ( - form.legacy_task_order.pdf.data.filename - ) - else: - try: - attachment = Attachment.get_for_resource( - "legacy_task_order", self.request.id - ) - form.legacy_task_order.pdf.data = attachment.filename - except NotFoundError: - pass - - return form - - def _process_attachment(self, is_extended, form): - attachment = None - if is_extended: - attachment = None - if isinstance(form.legacy_task_order.pdf.data, FileStorage): - Attachment.delete_for_resource("legacy_task_order", self.request.id) - attachment = Attachment.attach( - form.legacy_task_order.pdf.data, - "legacy_task_order", - self.request.id, - ) - elif isinstance(form.legacy_task_order.pdf.data, str): - try: - attachment = Attachment.get_for_resource( - "legacy_task_order", self.request.id - ) - except NotFoundError: - pass - - if attachment: - form.legacy_task_order.pdf.data = attachment.filename - - return attachment - - def _try_create_task_order(self, form, attachment, is_extended): - task_order_number = form.legacy_task_order.number.data - if not task_order_number: - return None - - task_order_data = form.legacy_task_order.data - - if attachment: - task_order_data["pdf"] = attachment - - try: - legacy_task_order = LegacyTaskOrders.get(task_order_number) - legacy_task_order = LegacyTaskOrders.update( - legacy_task_order, task_order_data - ) - return legacy_task_order - except NotFoundError: - pass - - try: - return LegacyTaskOrders.get_from_eda(task_order_number) - except NotFoundError: - pass - - return LegacyTaskOrders.create(**task_order_data) - - def _raise(self, form): - form.reset() - raise FormValidationError(form) - - -class GetFinancialVerificationForm(FinancialVerificationBase): - def __init__(self, user, request, is_extended=False): - self.user = user - self.request = request - self.is_extended = is_extended - - def execute(self): - form = self._get_form(self.request, self.is_extended) - form.reset() - return form - - -class UpdateFinancialVerification(FinancialVerificationBase): - def __init__( - self, - pe_validator, - task_order_validator, - user, - request, - fv_data, - is_extended=False, - ): - self.pe_validator = pe_validator - self.task_order_validator = task_order_validator - self.user = user - self.request = request - self.fv_data = fv_data - self.is_extended = is_extended - - def execute(self): - form = self._get_form(self.request, self.is_extended, self.fv_data) - - should_update = True - should_submit = True - updated_request = None - - attachment = self._process_attachment(self.is_extended, form) - - if not form.validate(is_extended=self.is_extended, has_attachment=attachment): - should_update = False - - if not self.pe_validator.validate(self.request, form.pe_id): - should_submit = False - - if not self.is_extended and not self.task_order_validator.validate( - form.legacy_task_order.number - ): - should_submit = False - - if should_update: - legacy_task_order = self._try_create_task_order( - form, attachment, self.is_extended - ) - updated_request = Requests.update_financial_verification( - self.request.id, form.request.data, legacy_task_order=legacy_task_order - ) - if should_submit: - return Requests.submit_financial_verification(updated_request) - - self._raise(form) - - -class SaveFinancialVerificationDraft(FinancialVerificationBase): - def __init__( - self, - pe_validator, - task_order_validator, - user, - request, - fv_data, - is_extended=False, - ): - self.pe_validator = pe_validator - self.task_order_validator = task_order_validator - self.user = user - self.request = request - self.fv_data = fv_data - self.is_extended = is_extended - - def execute(self): - form = self._get_form(self.request, self.is_extended, self.fv_data) - attachment = self._process_attachment(self.is_extended, form) - legacy_task_order = self._try_create_task_order( - form, attachment, self.is_extended - ) - updated_request = Requests.update_financial_verification( - self.request.id, form.request.data, legacy_task_order=legacy_task_order - ) - - return updated_request - - -@requests_bp.route("/requests/verify//draft", methods=["GET"]) -@requests_bp.route("/requests/verify/", methods=["GET"]) -def financial_verification(request_id): - request = Requests.get(g.current_user, request_id) - is_extended = fv_extended(http_request) - saved_draft = http_request.args.get("saved_draft", False) - - should_be_extended = not is_extended and request.has_manual_task_order - if should_be_extended: - return redirect( - url_for(".financial_verification", request_id=request_id, extended=True) - ) - - form = GetFinancialVerificationForm( - g.current_user, request, is_extended=is_extended - ).execute() - - if request.review_comment: - flash("request_review_comment", comment=request.review_comment) - - return render_template( - "requests/financial_verification.html", - f=form, - jedi_request=request, - extended=is_extended, - saved_draft=saved_draft, - ) - - -@requests_bp.route("/requests/verify/", methods=["POST"]) -def update_financial_verification(request_id): - request = Requests.get(g.current_user, request_id) - fv_data = {**http_request.form, **http_request.files} - is_extended = fv_extended(http_request) - - try: - updated_request = UpdateFinancialVerification( - PENumberValidator(), - TaskOrderNumberValidator(), - g.current_user, - request, - fv_data, - is_extended=is_extended, - ).execute() - except FormValidationError as e: - return render_template( - "requests/financial_verification.html", - jedi_request=request, - f=e.form, - extended=is_extended, - ) - - if updated_request.legacy_task_order.verified: - portfolio = Requests.auto_approve_and_create_portfolio(updated_request) - flash("new_portfolio") - return redirect( - url_for("portfolios.new_application", portfolio_id=portfolio.id) - ) - else: - return redirect(url_for("requests.requests_index", modal="pendingCCPOApproval")) - - -@requests_bp.route("/requests/verify//draft", methods=["POST"]) -def save_financial_verification_draft(request_id): - user = g.current_user - request = Requests.get(user, request_id) - fv_data = {**http_request.form, **http_request.files} - is_extended = fv_extended(http_request) - - try: - updated_request = SaveFinancialVerificationDraft( - PENumberValidator(), - TaskOrderNumberValidator(), - user, - request, - fv_data, - is_extended=is_extended, - ).execute() - except FormValidationError as e: - return render_template( - "requests/financial_verification.html", - jedi_request=request, - f=e.form, - extended=is_extended, - ) - - return redirect( - url_for( - "requests.financial_verification", - request_id=updated_request.id, - is_extended=is_extended, - saved_draft=True, - ) - ) diff --git a/atst/routes/requests/index.py b/atst/routes/requests/index.py deleted file mode 100644 index eb0c7204..00000000 --- a/atst/routes/requests/index.py +++ /dev/null @@ -1,107 +0,0 @@ -import pendulum -from flask import render_template, g, url_for - -from . import requests_bp -from atst.domain.requests import Requests -from atst.models.permissions import Permissions -from atst.forms.data import SERVICE_BRANCHES -from atst.utils.flash import formatted_flash as flash - - -class RequestsIndex(object): - def __init__(self, user): - self.user = user - - def execute(self): - if ( - Permissions.REVIEW_AND_APPROVE_JEDI_PORTFOLIO_REQUEST - in self.user.atat_permissions - ): - context = self._ccpo_view(self.user) - - else: - context = self._non_ccpo_view(self.user) - - return { - **context, - "possible_statuses": Requests.possible_statuses(), - "possible_dod_components": [b[0] for b in SERVICE_BRANCHES[1:]], - } - - def _ccpo_view(self, user): - requests = Requests.get_many() - mapped_requests = [self._map_request(r, "ccpo") for r in requests] - num_action_required = len( - [r for r in mapped_requests if r.get("action_required")] - ) - - return { - "requests": mapped_requests, - "pending_financial_verification": False, - "pending_ccpo_acceptance": False, - "extended_view": True, - "kpi_inprogress": Requests.in_progress_count(), - "kpi_pending": Requests.pending_ccpo_count(), - "kpi_completed": Requests.completed_count(), - "num_action_required": num_action_required, - } - - def _non_ccpo_view(self, user): - requests = Requests.get_many(creator=user) - mapped_requests = [self._map_request(r, "mission_owner") for r in requests] - num_action_required = len( - [r for r in mapped_requests if r.get("action_required")] - ) - pending_fv = any(r.is_pending_financial_verification for r in requests) - pending_ccpo = any(r.is_pending_ccpo_acceptance for r in requests) - - return { - "requests": mapped_requests, - "pending_financial_verification": pending_fv, - "pending_ccpo_acceptance": pending_ccpo, - "num_action_required": num_action_required, - "extended_view": False, - } - - def _portfolio_link_for_request(self, request): - if request.is_approved: - return url_for( - "portfolios.portfolio_applications", portfolio_id=request.portfolio.id - ) - else: - return None - - def _map_request(self, request, viewing_role): - time_created = pendulum.instance(request.time_created) - is_new = time_created.add(days=1) > pendulum.now() - app_count = request.body.get("details_of_use", {}).get( - "num_software_systems", 0 - ) - annual_usage = request.annual_spend - - return { - "portfolio_id": request.portfolio.id if request.portfolio else None, - "name": request.displayname, - "is_new": is_new, - "is_approved": request.is_approved, - "status": request.status_displayname, - "app_count": app_count, - "last_submission_timestamp": request.last_submission_timestamp, - "last_edited_timestamp": request.latest_revision.time_updated, - "full_name": request.creator.full_name, - "annual_usage": annual_usage, - "edit_link": url_for("requests.edit", request_id=request.id), - "action_required": request.action_required_by == viewing_role, - "dod_component": request.latest_revision.dod_component, - "portfolio_link": self._portfolio_link_for_request(request), - } - - -@requests_bp.route("/requests", methods=["GET"]) -def requests_index(): - context = RequestsIndex(g.current_user).execute() - - if context.get("num_action_required"): - flash("requests_action_required", count=context.get("num_action_required")) - - return render_template("requests/index.html", **context) diff --git a/atst/routes/requests/jedi_request_flow.py b/atst/routes/requests/jedi_request_flow.py deleted file mode 100644 index 269ff632..00000000 --- a/atst/routes/requests/jedi_request_flow.py +++ /dev/null @@ -1,162 +0,0 @@ -from collections import defaultdict - -from atst.domain.requests import Requests -import atst.forms.new_request as request_forms - - -class JEDIRequestFlow(object): - def __init__( - self, - current_step, - current_user=None, - request=None, - post_data=None, - request_id=None, - existing_request=None, - ): - self.current_step = current_step - - self.current_user = current_user - self.request = request - - self.post_data = post_data - self.is_post = self.post_data is not None - - self.request_id = request_id - self.form = self._form() - - self.existing_request = existing_request - - def _form(self): - if self.is_post: - return self.form_class()(self.post_data) - else: - return self.form_class()(data=self.current_step_data) - - def validate(self): - return self.form.validate() - - def validate_warnings(self): - existing_request_data = ( - self.existing_request and self.existing_request.body.get(self.form_section) - ) or None - - valid = self.form.perform_extra_validation(existing_request_data) - return valid - - @property - def current_screen(self): - return self.screens[self.current_step - 1] - - @property - def form_section(self): - return self.current_screen["section"] - - def form_class(self): - return self.current_screen["form"] - - # maps user data to fields in InformationAboutYouForm; this should be moved - # into the request initialization process when we have a request schema, or - # we just shouldn't record this data on the request - def map_user_data(self, user): - return { - "fname_request": user.first_name, - "lname_request": user.last_name, - "email_request": user.email, - "phone_number": user.phone_number, - "phone_ext": user.phone_ext, - "service_branch": user.service_branch, - "designation": user.designation, - "citizenship": user.citizenship, - "date_latest_training": user.date_latest_training, - } - - @property - def current_step_data(self): - data = {} - - if self.is_post: - data = self.post_data - - if self.request: - if self.form_section == "review_submit": - data = self.request.body - elif self.form_section == "information_about_you": - form_data = self.request.body.get(self.form_section, {}) - data = {**self.map_user_data(self.request.creator), **form_data} - else: - data = self.request.body.get(self.form_section, {}) - elif self.form_section == "information_about_you": - data = self.map_user_data(self.current_user) - - return defaultdict(lambda: defaultdict(lambda: None), data) - - @property - def can_submit(self): - return self.request and Requests.should_allow_submission(self.request) - - @property - def next_screen(self): - return self.current_step + 1 - - @property - def screens(self): - return [ - { - "title": "Details of Use", - "section": "details_of_use", - "form": request_forms.DetailsOfUseForm, - }, - { - "title": "Information About You", - "section": "information_about_you", - "form": request_forms.InformationAboutYouForm, - }, - { - "title": "Portfolio Owner", - "section": "primary_poc", - "form": request_forms.PortfolioOwnerForm, - }, - { - "title": "Review & Submit", - "section": "review_submit", - "form": request_forms.ReviewAndSubmitForm, - }, - ] - - @property - def is_review_screen(self): - return self.screens[-1] == self.current_screen - - def create_or_update_request(self): - request_data = self.map_request_data(self.form_section, self.form.data) - if self.request_id: - Requests.update(self.request_id, request_data) - else: - request = Requests.create(self.current_user, request_data) - self.request_id = request.id - - def map_request_data(self, section, data): - if section == "primary_poc": - if data.get("am_poc", False): - try: - request_user_info = self.existing_request.body.get( - "information_about_you", {} - ) - except AttributeError: - request_user_info = {} - - data = { - **data, - "dodid_poc": self.current_user.dod_id, - "fname_poc": request_user_info.get( - "fname_request", self.current_user.first_name - ), - "lname_poc": request_user_info.get( - "lname_request", self.current_user.last_name - ), - "email_poc": request_user_info.get( - "email_request", self.current_user.email - ), - } - return {section: data} diff --git a/atst/routes/requests/requests_form.py b/atst/routes/requests/requests_form.py deleted file mode 100644 index 8d8f17a4..00000000 --- a/atst/routes/requests/requests_form.py +++ /dev/null @@ -1,187 +0,0 @@ -from flask import g, redirect, render_template, url_for, request as http_request - -from . import requests_bp -from atst.domain.requests import Requests -from atst.domain.authz import Authorization -from atst.routes.requests.jedi_request_flow import JEDIRequestFlow -from atst.models.request_status_event import RequestStatus -from atst.forms.data import ( - SERVICE_BRANCHES, - ASSISTANCE_ORG_TYPES, - DATA_TRANSFER_AMOUNTS, - COMPLETION_DATE_RANGES, - FUNDING_TYPES, - TASK_ORDER_SOURCES, -) -from atst.utils.flash import formatted_flash as flash - - -@requests_bp.context_processor -def option_data(): - return { - "service_branches": SERVICE_BRANCHES, - "assistance_org_types": ASSISTANCE_ORG_TYPES, - "data_transfer_amounts": DATA_TRANSFER_AMOUNTS, - "completion_date_ranges": COMPLETION_DATE_RANGES, - "funding_types": FUNDING_TYPES, - "task_order_sources": TASK_ORDER_SOURCES, - } - - -@requests_bp.route("/requests/new/", methods=["GET"]) -def requests_form_new(screen): - jedi_flow = JEDIRequestFlow(screen, request=None, current_user=g.current_user) - - if jedi_flow.is_review_screen and not jedi_flow.can_submit: - flash("request_incomplete") - - return render_template( - "requests/screen-%d.html" % int(screen), - f=jedi_flow.form, - data=jedi_flow.current_step_data, - screens=jedi_flow.screens, - current=screen, - next_screen=screen + 1, - can_submit=jedi_flow.can_submit, - ) - - -@requests_bp.route( - "/requests/new/", methods=["GET"], defaults={"request_id": None} -) -@requests_bp.route("/requests/new//", methods=["GET"]) -def requests_form_update(screen=1, request_id=None): - request = ( - Requests.get(g.current_user, request_id) if request_id is not None else None - ) - jedi_flow = JEDIRequestFlow( - screen, request=request, request_id=request_id, current_user=g.current_user - ) - - if jedi_flow.is_review_screen and not jedi_flow.can_submit: - flash("request_incomplete") - - if request.review_comment: - flash("request_review_comment", comment=request.review_comment) - - return render_template( - "requests/screen-%d.html" % int(screen), - f=jedi_flow.form, - data=jedi_flow.current_step_data, - screens=jedi_flow.screens, - current=screen, - next_screen=screen + 1, - request_id=request_id, - jedi_request=jedi_flow.request, - can_submit=jedi_flow.can_submit, - ) - - -@requests_bp.route( - "/requests/new/", methods=["POST"], defaults={"request_id": None} -) -@requests_bp.route("/requests/new//", methods=["POST"]) -def requests_update(screen=1, request_id=None): - screen = int(screen) - post_data = http_request.form - current_user = g.current_user - existing_request = ( - Requests.get(g.current_user, request_id) if request_id is not None else None - ) - jedi_flow = JEDIRequestFlow( - screen, - post_data=post_data, - request_id=request_id, - current_user=current_user, - existing_request=existing_request, - ) - - has_next_screen = jedi_flow.next_screen <= len(jedi_flow.screens) - valid = jedi_flow.validate() and jedi_flow.validate_warnings() - - if valid: - jedi_flow.create_or_update_request() - - if has_next_screen: - where = url_for( - "requests.requests_form_update", - screen=jedi_flow.next_screen, - request_id=jedi_flow.request_id, - ) - else: - where = "/requests" - return redirect(where) - else: - rerender_args = dict( - f=jedi_flow.form, - data=post_data, - screens=jedi_flow.screens, - current=screen, - next_screen=jedi_flow.next_screen, - request_id=jedi_flow.request_id, - ) - return render_template("requests/screen-%d.html" % int(screen), **rerender_args) - - -@requests_bp.route("/requests/submit/", methods=["POST"]) -def requests_submit(request_id=None): - request = Requests.get(g.current_user, request_id) - Requests.submit(request) - - if request.status == RequestStatus.PENDING_FINANCIAL_VERIFICATION: - modal = "pendingFinancialVerification" - else: - modal = "pendingCCPOAcceptance" - - return redirect(url_for("requests.requests_index", modal=modal)) - - -@requests_bp.route("/requests/details/", methods=["GET"]) -def view_request_details(request_id=None): - request = Requests.get(g.current_user, request_id) - requires_fv_action = ( - request.is_pending_financial_verification - or request.is_pending_financial_verification_changes - ) - - data = request.body - if request.has_financial_data: - data["legacy_task_order"] = request.legacy_task_order.to_dictionary() - - return render_template( - "requests/details.html", - data=data, - jedi_request=request, - requires_fv_action=requires_fv_action, - ) - - -@requests_bp.route("/requests/edit/") -def edit(request_id): - user = g.current_user - request = Requests.get(user, request_id) - is_ccpo = Authorization.is_ccpo(user) - - redirect_url = "" - - if request.creator == user: - if request.is_pending_financial_verification: - redirect_url = url_for( - "requests.financial_verification", request_id=request.id - ) - elif request.is_pending_financial_verification_changes: - redirect_url = url_for( - "requests.financial_verification", request_id=request.id, extended=True - ) - elif request.is_approved: - redirect_url = url_for( - "requests.view_request_details", request_id=request.id - ) - else: - redirect_url = url_for( - "requests.requests_form_update", screen=1, request_id=request.id - ) - elif is_ccpo: - redirect_url = url_for("requests.approval", request_id=request.id) - - return redirect(redirect_url) diff --git a/tests/routes/requests/requests_form/test_details.py b/tests/routes/requests/requests_form/test_details.py deleted file mode 100644 index ddeade54..00000000 --- a/tests/routes/requests/requests_form/test_details.py +++ /dev/null @@ -1,39 +0,0 @@ -import re -from flask import url_for - -from atst.models.request_status_event import RequestStatus - -from tests.factories import RequestFactory, LegacyTaskOrderFactory, UserFactory - - -def test_can_show_financial_data(client, user_session): - user = UserFactory.create() - user_session(user) - - legacy_task_order = LegacyTaskOrderFactory.create() - request = RequestFactory.create_with_status( - status=RequestStatus.PENDING_CCPO_APPROVAL, - legacy_task_order=legacy_task_order, - creator=user, - ) - response = client.get( - url_for("requests.view_request_details", request_id=request.id) - ) - - body = response.data.decode() - assert re.search(r">\s+Financial Verification\s+<", body) - - -def test_can_not_show_financial_data(client, user_session): - user = UserFactory.create() - user_session(user) - - request = RequestFactory.create_with_status( - status=RequestStatus.PENDING_CCPO_ACCEPTANCE, creator=user - ) - response = client.get( - url_for("requests.view_request_details", request_id=request.id) - ) - - body = response.data.decode() - assert not re.search(r">\s+Financial Verification\s+<", body) diff --git a/tests/routes/requests/requests_form/test_edit.py b/tests/routes/requests/requests_form/test_edit.py deleted file mode 100644 index 37e7b243..00000000 --- a/tests/routes/requests/requests_form/test_edit.py +++ /dev/null @@ -1,52 +0,0 @@ -from tests.factories import UserFactory, RequestFactory -from atst.models.request_status_event import RequestStatus - - -def test_creator_pending_finver(client, user_session): - request = RequestFactory.create_with_status( - RequestStatus.PENDING_FINANCIAL_VERIFICATION - ) - user_session(request.creator) - response = client.get( - "/requests/edit/{}".format(request.id), follow_redirects=False - ) - assert "verify" in response.location - - -def test_creator_pending_finver_changes(client, user_session): - request = RequestFactory.create_with_status( - RequestStatus.CHANGES_REQUESTED_TO_FINVER - ) - user_session(request.creator) - response = client.get( - "/requests/edit/{}".format(request.id), follow_redirects=False - ) - assert "verify" in response.location - - -def test_creator_approved(client, user_session): - request = RequestFactory.create_with_status(RequestStatus.APPROVED) - user_session(request.creator) - response = client.get( - "/requests/edit/{}".format(request.id), follow_redirects=False - ) - assert "details" in response.location - - -def test_creator_approved(client, user_session): - request = RequestFactory.create_with_status(RequestStatus.STARTED) - user_session(request.creator) - response = client.get( - "/requests/edit/{}".format(request.id), follow_redirects=False - ) - assert "new" in response.location - - -def test_ccpo(client, user_session): - ccpo = UserFactory.from_atat_role("ccpo") - request = RequestFactory.create_with_status(RequestStatus.STARTED) - user_session(ccpo) - response = client.get( - "/requests/edit/{}".format(request.id), follow_redirects=False - ) - assert "approval" in response.location diff --git a/tests/routes/requests/requests_form/test_new.py b/tests/routes/requests/requests_form/test_new.py deleted file mode 100644 index bc3c1705..00000000 --- a/tests/routes/requests/requests_form/test_new.py +++ /dev/null @@ -1,249 +0,0 @@ -import datetime -import re -import pytest -from tests.factories import ( - RequestFactory, - UserFactory, - RequestRevisionFactory, - RequestStatusEventFactory, - RequestReviewFactory, -) -from atst.models.request_status_event import RequestStatus -from atst.domain.roles import Roles -from atst.domain.requests import Requests -from urllib.parse import urlencode - -from tests.assert_util import dict_contains - -ERROR_CLASS = "alert--error" - - -def test_submit_invalid_request_form(monkeypatch, client, user_session): - user_session() - response = client.post( - "/requests/new/1", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="total_ram=5", - ) - assert re.search(ERROR_CLASS, response.data.decode()) - - -def test_submit_valid_request_form(monkeypatch, client, user_session): - user_session() - monkeypatch.setattr( - "atst.forms.new_request.DetailsOfUseForm.validate", lambda s: True - ) - - response = client.post( - "/requests/new/1", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="meaning=42", - ) - assert "/requests/new/2" in response.headers.get("Location") - - -def test_owner_can_view_request(client, user_session): - user = UserFactory.create() - user_session(user) - request = RequestFactory.create(creator=user) - - response = client.get( - "/requests/new/1/{}".format(request.id), follow_redirects=True - ) - - assert response.status_code == 200 - - -def test_non_owner_cannot_view_request(client, user_session): - user = UserFactory.create() - user_session(user) - request = RequestFactory.create() - - response = client.get( - "/requests/new/1/{}".format(request.id), follow_redirects=True - ) - - assert response.status_code == 404 - - -def test_ccpo_can_view_request(client, user_session): - ccpo = Roles.get("ccpo") - user = UserFactory.create(atat_role=ccpo) - user_session(user) - request = RequestFactory.create() - - response = client.get( - "/requests/new/1/{}".format(request.id), follow_redirects=True - ) - - assert response.status_code == 200 - - -@pytest.mark.skip(reason="create request flow no longer active") -def test_nonexistent_request(client, user_session): - user_session() - response = client.get("/requests/new/1/foo", follow_redirects=True) - - assert response.status_code == 404 - - -def test_creator_info_is_autopopulated_for_existing_request( - monkeypatch, client, user_session -): - user = UserFactory.create() - user_session(user) - request = RequestFactory.create(creator=user, initial_revision={}) - - response = client.get("/requests/new/2/{}".format(request.id)) - body = response.data.decode() - prepopulated_values = [ - "first_name", - "last_name", - "email", - "phone_number", - "date_latest_training", - ] - for attr in prepopulated_values: - value = getattr(user, attr) - if isinstance(value, datetime.date): - value = value.strftime("%m/%d/%Y") - assert "initial-value='{}'".format(value) in body - - -def test_creator_info_is_autopopulated_for_new_request( - monkeypatch, client, user_session -): - user = UserFactory.create() - user_session(user) - - response = client.get("/requests/new/2") - body = response.data.decode() - assert "initial-value='{}'".format(user.first_name) in body - assert "initial-value='{}'".format(user.last_name) in body - assert "initial-value='{}'".format(user.email) in body - - -def test_non_creator_info_is_not_autopopulated(monkeypatch, client, user_session): - user = UserFactory.create() - creator = UserFactory.create() - user_session(user) - request = RequestFactory.create(creator=creator, initial_revision={}) - - response = client.get("/requests/new/2/{}".format(request.id)) - body = response.data.decode() - assert not user.first_name in body - assert not user.last_name in body - assert not user.email in body - - -def test_am_poc_causes_poc_to_be_autopopulated(client, user_session): - creator = UserFactory.create() - user_session(creator) - request = RequestFactory.create(creator=creator, initial_revision={}) - client.post( - "/requests/new/3/{}".format(request.id), - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="am_poc=yes", - ) - request = Requests.get(creator, request.id) - assert request.body["primary_poc"]["dodid_poc"] == creator.dod_id - - -def test_not_am_poc_requires_poc_info_to_be_completed(client, user_session): - creator = UserFactory.create() - user_session(creator) - request = RequestFactory.create(creator=creator, initial_revision={}) - response = client.post( - "/requests/new/3/{}".format(request.id), - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="am_poc=no", - follow_redirects=True, - ) - assert ERROR_CLASS in response.data.decode() - - -def test_not_am_poc_allows_user_to_fill_in_poc_info(client, user_session): - creator = UserFactory.create() - user_session(creator) - request = RequestFactory.create(creator=creator, initial_revision={}) - poc_input = { - "am_poc": "no", - "fname_poc": "test", - "lname_poc": "user", - "email_poc": "test.user@mail.com", - "dodid_poc": "1234567890", - } - response = client.post( - "/requests/new/3/{}".format(request.id), - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data=urlencode(poc_input), - ) - assert ERROR_CLASS not in response.data.decode() - - -def test_poc_details_can_be_autopopulated_on_new_request(client, user_session): - creator = UserFactory.create() - user_session(creator) - response = client.post( - "/requests/new/3", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="am_poc=yes", - ) - request_id = response.headers["Location"].split("/")[-1] - request = Requests.get(creator, request_id) - - assert request.body["primary_poc"]["dodid_poc"] == creator.dod_id - - -def test_poc_autofill_checks_information_about_you_form_first(client, user_session): - creator = UserFactory.create() - user_session(creator) - request = RequestFactory.create( - creator=creator, - initial_revision=dict( - fname_request="Alice", - lname_request="Adams", - email_request="alice.adams@mail.mil", - ), - ) - poc_input = {"am_poc": "yes"} - client.post( - "/requests/new/3/{}".format(request.id), - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data=urlencode(poc_input), - ) - request = Requests.get(creator, request.id) - assert dict_contains( - request.body["primary_poc"], - { - "fname_poc": "Alice", - "lname_poc": "Adams", - "email_poc": "alice.adams@mail.mil", - }, - ) - - -def test_can_review_data(user_session, client): - creator = UserFactory.create() - user_session(creator) - request = RequestFactory.create(creator=creator) - response = client.get("/requests/new/4/{}".format(request.id)) - body = response.data.decode() - # assert a sampling of the request data is on the review page - assert request.body["primary_poc"]["fname_poc"] in body - assert request.body["information_about_you"]["email_request"] in body - - -def test_displays_ccpo_review_comment(user_session, client): - creator = UserFactory.create() - ccpo = UserFactory.from_atat_role("ccpo") - user_session(creator) - request = RequestFactory.create(creator=creator) - request = Requests.set_status(request, RequestStatus.CHANGES_REQUESTED) - review_comment = "add all of the correct info, instead of the incorrect info" - RequestReviewFactory.create( - reviewer=ccpo, comment=review_comment, status=request.status_events[-1] - ) - response = client.get("/requests/new/1/{}".format(request.id)) - body = response.data.decode() - assert review_comment in body diff --git a/tests/routes/requests/requests_form/test_submit.py b/tests/routes/requests/requests_form/test_submit.py deleted file mode 100644 index c824015d..00000000 --- a/tests/routes/requests/requests_form/test_submit.py +++ /dev/null @@ -1,42 +0,0 @@ -import pytest -from tests.factories import RequestFactory -from atst.models.request_status_event import RequestStatus - - -def _mock_func(*args, **kwargs): - return RequestFactory.create() - - -def test_submit_reviewed_request(monkeypatch, client, user_session): - user_session() - monkeypatch.setattr("atst.domain.requests.Requests.get", _mock_func) - monkeypatch.setattr("atst.domain.requests.Requests.submit", _mock_func) - monkeypatch.setattr("atst.models.request.Request.status", "pending") - # this just needs to send a known invalid form value - response = client.post( - "/requests/submit/1", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="", - follow_redirects=False, - ) - assert "/requests" in response.headers["Location"] - assert "modal=pendingCCPOAcceptance" in response.headers["Location"] - - -def test_submit_autoapproved_reviewed_request(monkeypatch, client, user_session): - user_session() - monkeypatch.setattr("atst.domain.requests.Requests.get", _mock_func) - monkeypatch.setattr("atst.domain.requests.Requests.submit", _mock_func) - monkeypatch.setattr( - "atst.models.request.Request.status", - RequestStatus.PENDING_FINANCIAL_VERIFICATION, - ) - response = client.post( - "/requests/submit/1", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="", - follow_redirects=False, - ) - assert ( - "/requests?modal=pendingFinancialVerification" in response.headers["Location"] - ) diff --git a/tests/routes/requests/test_approval.py b/tests/routes/requests/test_approval.py deleted file mode 100644 index 9d507789..00000000 --- a/tests/routes/requests/test_approval.py +++ /dev/null @@ -1,201 +0,0 @@ -import os -from flask import url_for - -from atst.models.attachment import Attachment -from atst.models.request_status_event import RequestStatus -from atst.domain.roles import Roles - -from tests.factories import ( - RequestFactory, - LegacyTaskOrderFactory, - UserFactory, - RequestReviewFactory, - RequestStatusEventFactory, -) - - -def test_ccpo_can_view_approval(user_session, client): - ccpo = Roles.get("ccpo") - user = UserFactory.create(atat_role=ccpo) - user_session(user) - - request = RequestFactory.create() - response = client.get(url_for("requests.approval", request_id=request.id)) - assert response.status_code == 200 - - -def test_ccpo_prepopulated_as_mission_owner(user_session, client): - user = UserFactory.from_atat_role("ccpo") - user_session(user) - - request = RequestFactory.create_with_status(RequestStatus.PENDING_CCPO_ACCEPTANCE) - response = client.get(url_for("requests.approval", request_id=request.id)) - - body = response.data.decode() - assert user.first_name in body - assert user.last_name in body - - -def test_non_ccpo_cannot_view_approval(user_session, client): - user = UserFactory.create() - user_session(user) - - request = RequestFactory.create(creator=user) - response = client.get(url_for("requests.approval", request_id=request.id)) - assert response.status_code == 404 - - -def prepare_request_pending_approval(creator, pdf_attachment=None): - legacy_task_order = LegacyTaskOrderFactory.create( - number="abc123", pdf=pdf_attachment - ) - return RequestFactory.create_with_status( - status=RequestStatus.PENDING_CCPO_APPROVAL, - legacy_task_order=legacy_task_order, - creator=creator, - ) - - -def test_ccpo_sees_pdf_link(user_session, client, pdf_upload): - ccpo = UserFactory.from_atat_role("ccpo") - user_session(ccpo) - - attachment = Attachment.attach(pdf_upload) - request = prepare_request_pending_approval(ccpo, pdf_attachment=attachment) - - response = client.get(url_for("requests.approval", request_id=request.id)) - download_url = url_for("requests.task_order_pdf_download", request_id=request.id) - - body = response.data.decode() - assert download_url in body - - -def test_ccpo_does_not_see_pdf_link_if_no_pdf(user_session, client, pdf_upload): - ccpo = UserFactory.from_atat_role("ccpo") - user_session(ccpo) - - request = prepare_request_pending_approval(ccpo) - - response = client.get(url_for("requests.approval", request_id=request.id)) - download_url = url_for("requests.task_order_pdf_download", request_id=request.id) - - body = response.data.decode() - assert download_url not in body - - -def test_task_order_download(app, client, user_session, pdf_upload): - user = UserFactory.create() - user_session(user) - - attachment = Attachment.attach(pdf_upload) - legacy_task_order = LegacyTaskOrderFactory.create(number="abc123", pdf=attachment) - request = RequestFactory.create(legacy_task_order=legacy_task_order, creator=user) - - # ensure that real data for pdf upload has been flushed to disk - pdf_upload.seek(0) - pdf_content = pdf_upload.read() - pdf_upload.close() - full_path = os.path.join( - app.config.get("STORAGE_CONTAINER"), attachment.object_name - ) - with open(full_path, "wb") as output_file: - output_file.write(pdf_content) - output_file.flush() - - response = client.get( - url_for("requests.task_order_pdf_download", request_id=request.id) - ) - assert response.data == pdf_content - - -def test_task_order_download_does_not_exist(client, user_session): - user = UserFactory.create() - user_session(user) - request = RequestFactory.create(creator=user) - response = client.get( - url_for("requests.task_order_pdf_download", request_id=request.id) - ) - assert response.status_code == 404 - - -def test_can_submit_request_approval(client, user_session): - user = UserFactory.from_atat_role("ccpo") - user_session(user) - request = RequestFactory.create_with_status( - status=RequestStatus.PENDING_CCPO_ACCEPTANCE - ) - review_data = RequestReviewFactory.dictionary() - review_data["review"] = "approving" - response = client.post( - url_for("requests.submit_approval", request_id=request.id), data=review_data - ) - assert response.status_code == 302 - assert request.status == RequestStatus.PENDING_FINANCIAL_VERIFICATION - - -def test_can_submit_request_denial(client, user_session): - user = UserFactory.from_atat_role("ccpo") - user_session(user) - request = RequestFactory.create_with_status( - status=RequestStatus.PENDING_CCPO_ACCEPTANCE - ) - review_data = RequestReviewFactory.dictionary() - review_data["review"] = "denying" - response = client.post( - url_for("requests.submit_approval", request_id=request.id), data=review_data - ) - assert response.status_code == 302 - assert request.status == RequestStatus.CHANGES_REQUESTED - - -def test_ccpo_user_can_comment_on_request(client, user_session): - user = UserFactory.from_atat_role("ccpo") - user_session(user) - request = RequestFactory.create_with_status( - status=RequestStatus.PENDING_CCPO_ACCEPTANCE - ) - assert len(request.internal_comments) == 0 - - comment_text = "This is the greatest request in the history of requests" - comment_form_data = {"text": comment_text} - response = client.post( - url_for("requests.create_internal_comment", request_id=request.id), - data=comment_form_data, - ) - assert response.status_code == 302 - assert len(request.internal_comments) == 1 - assert request.internal_comments[0].text == comment_text - - -def test_comment_text_is_required(client, user_session): - user = UserFactory.from_atat_role("ccpo") - user_session(user) - request = RequestFactory.create_with_status( - status=RequestStatus.PENDING_CCPO_ACCEPTANCE - ) - assert len(request.internal_comments) == 0 - - comment_form_data = {"text": ""} - response = client.post( - url_for("requests.create_internal_comment", request_id=request.id), - data=comment_form_data, - ) - assert response.status_code == 200 - assert len(request.internal_comments) == 0 - - -def test_other_user_cannot_comment_on_request(client, user_session): - user = UserFactory.create() - user_session(user) - request = RequestFactory.create_with_status( - status=RequestStatus.PENDING_CCPO_ACCEPTANCE - ) - - comment_text = "What is this even" - comment_form_data = {"text": comment_text} - response = client.post( - url_for("requests.create_internal_comment", request_id=request.id), - data=comment_form_data, - ) - - assert response.status_code == 404 diff --git a/tests/routes/requests/test_financial_verification.py b/tests/routes/requests/test_financial_verification.py deleted file mode 100644 index 734d0864..00000000 --- a/tests/routes/requests/test_financial_verification.py +++ /dev/null @@ -1,543 +0,0 @@ -import pytest -from unittest.mock import MagicMock -from flask import url_for -import datetime - -from atst.eda_client import MockEDAClient -from atst.routes.requests.financial_verification import ( - GetFinancialVerificationForm, - UpdateFinancialVerification, - SaveFinancialVerificationDraft, -) - -from tests.mocks import MOCK_VALID_PE_ID -from tests.factories import RequestFactory, UserFactory, LegacyTaskOrderFactory -from atst.forms.exceptions import FormValidationError -from atst.domain.requests.financial_verification import ( - PENumberValidator, - TaskOrderNumberValidator, -) -from atst.models.request_status_event import RequestStatus -from atst.models.attachment import Attachment -from atst.domain.requests.query import RequestsQuery - - -@pytest.fixture -def fv_data(): - return { - "request-pe_id": "123", - "legacy_task_order-number": MockEDAClient.MOCK_CONTRACT_NUMBER, - "request-fname_co": "Contracting", - "request-lname_co": "Officer", - "request-email_co": "jane@mail.mil", - "request-office_co": "WHS", - "request-fname_cor": "Officer", - "request-lname_cor": "Representative", - "request-email_cor": "jane@mail.mil", - "request-office_cor": "WHS", - "request-uii_ids": "1234", - "request-treasury_code": "00123456", - "request-ba_code": "02A", - } - - -@pytest.fixture -def e_fv_data(pdf_upload): - return { - "legacy_task_order-funding_type": "RDTE", - "legacy_task_order-funding_type_other": "other", - "legacy_task_order-expiration_date": "1/1/{}".format( - datetime.date.today().year + 1 - ), - "legacy_task_order-clin_0001": "50000", - "legacy_task_order-clin_0003": "13000", - "legacy_task_order-clin_1001": "30000", - "legacy_task_order-clin_1003": "7000", - "legacy_task_order-clin_2001": "30000", - "legacy_task_order-clin_2003": "7000", - "legacy_task_order-pdf": pdf_upload, - } - - -MANUAL_TO_NUMBER = "DCA10096D0051" - - -TrueValidator = MagicMock() -TrueValidator.validate = MagicMock(return_value=True) - -FalseValidator = MagicMock() -FalseValidator.validate = MagicMock(return_value=False) - - -def test_update_fv(fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = {**fv_data, "pe_id": MOCK_VALID_PE_ID} - - updated_request = UpdateFinancialVerification( - TrueValidator, TrueValidator, user, request, data, is_extended=False - ).execute() - - assert updated_request.is_pending_ccpo_approval - - -def test_update_fv_re_enter_pe_number(fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = {**fv_data, "pe_id": "0101228M"} - update_fv = UpdateFinancialVerification( - PENumberValidator(), TrueValidator, user, request, data, is_extended=False - ) - - with pytest.raises(FormValidationError): - update_fv.execute() - updated_request = update_fv.execute() - - assert updated_request.is_pending_ccpo_approval - - -def test_update_fv_invalid_task_order_number(fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = {**fv_data, "legacy_task_order-number": MANUAL_TO_NUMBER} - update_fv = UpdateFinancialVerification( - TrueValidator, - TaskOrderNumberValidator(), - user, - request, - data, - is_extended=False, - ) - - with pytest.raises(FormValidationError): - update_fv.execute() - - -def test_draft_without_pe_id(fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = {"request-uii_ids": "1234"} - assert SaveFinancialVerificationDraft( - PENumberValidator(), - TaskOrderNumberValidator(), - user, - request, - data, - is_extended=False, - ).execute() - - -def test_update_fv_extended(fv_data, e_fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = {**fv_data, **e_fv_data} - update_fv = UpdateFinancialVerification( - TrueValidator, TaskOrderNumberValidator(), user, request, data, is_extended=True - ) - - assert update_fv.execute() - - -def test_update_fv_extended_does_not_validate_task_order(fv_data, e_fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = {**fv_data, **e_fv_data, "legacy_task_order-number": "abc123"} - update_fv = UpdateFinancialVerification( - TrueValidator, TaskOrderNumberValidator(), user, request, data, is_extended=True - ) - - assert update_fv.execute() - - -def test_update_fv_missing_extended_data(fv_data): - request = RequestFactory.create() - user = UserFactory.create() - update_fv = UpdateFinancialVerification( - TrueValidator, - TaskOrderNumberValidator(), - user, - request, - fv_data, - is_extended=True, - ) - - with pytest.raises(FormValidationError): - update_fv.execute() - - -def test_update_fv_submission(fv_data): - request = RequestFactory.create() - user = UserFactory.create() - updated_request = UpdateFinancialVerification( - TrueValidator, TrueValidator, user, request, fv_data - ).execute() - assert updated_request - - -def test_save_empty_draft(): - request = RequestFactory.create() - user = UserFactory.create() - save_draft = SaveFinancialVerificationDraft( - TrueValidator, TrueValidator, user, request, {}, is_extended=False - ) - - assert save_draft.execute() - - -def test_save_draft_with_ba_code(): - request = RequestFactory.create() - user = UserFactory.create() - data = {"ba_code": "02A"} - save_draft = SaveFinancialVerificationDraft( - TrueValidator, TrueValidator, user, request, data, is_extended=False - ) - - assert save_draft.execute() - - -def test_save_draft_allows_invalid_data(): - request = RequestFactory.create() - user = UserFactory.create() - data = { - "legacy_task_order-number": MANUAL_TO_NUMBER, - "request-pe_id": "123", - "request-ba_code": "a", - } - - assert SaveFinancialVerificationDraft( - PENumberValidator(), - TaskOrderNumberValidator(), - user, - request, - data, - is_extended=True, - ).execute() - - -def test_save_draft_and_then_submit(): - request = RequestFactory.create() - user = UserFactory.create() - data = {"ba_code": "02A"} - updated_request = SaveFinancialVerificationDraft( - TrueValidator, TrueValidator, user, request, data, is_extended=False - ).execute() - - with pytest.raises(FormValidationError): - UpdateFinancialVerification( - TrueValidator, TrueValidator, user, updated_request, data - ).execute() - - -def test_updated_request_has_pdf(fv_data, e_fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = {**fv_data, **e_fv_data, "legacy_task_order-number": MANUAL_TO_NUMBER} - updated_request = UpdateFinancialVerification( - TrueValidator, TrueValidator, user, request, data, is_extended=True - ).execute() - assert updated_request.legacy_task_order.pdf - - -def test_can_save_draft_with_just_pdf(e_fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = {"legacy_task_order-pdf": e_fv_data["legacy_task_order-pdf"]} - SaveFinancialVerificationDraft( - TrueValidator, TrueValidator, user, request, data, is_extended=True - ).execute() - - form = GetFinancialVerificationForm(user, request, is_extended=True).execute() - assert form.legacy_task_order.pdf - - -def test_task_order_info_present_in_extended_form(fv_data, e_fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = { - "legacy_task_order-clin_0001": "1", - "legacy_task_order-number": fv_data["legacy_task_order-number"], - } - SaveFinancialVerificationDraft( - TrueValidator, TrueValidator, user, request, data, is_extended=True - ).execute() - - form = GetFinancialVerificationForm(user, request, is_extended=True).execute() - assert form.legacy_task_order.clin_0001.data - - -def test_update_ignores_empty_values(fv_data, e_fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = {**fv_data, **e_fv_data, "legacy_task_order-funding_type": ""} - SaveFinancialVerificationDraft( - TrueValidator, TrueValidator, user, request, data, is_extended=True - ).execute() - - -def test_can_save_draft_with_funding_type(fv_data, e_fv_data): - request = RequestFactory.create() - user = UserFactory.create() - data = { - "legacy_task_order-number": fv_data["legacy_task_order-number"], - "legacy_task_order-funding_type": e_fv_data["legacy_task_order-funding_type"], - } - updated_request = SaveFinancialVerificationDraft( - TrueValidator, TrueValidator, user, request, data, is_extended=False - ).execute() - - assert updated_request.legacy_task_order.funding_type - - -def test_update_fv_route(client, user_session, fv_data): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - user_session(user) - response = client.post( - url_for("requests.financial_verification", request_id=request.id), - data=fv_data, - follow_redirects=False, - ) - - assert response.status_code == 200 - - -def test_save_fv_draft_route(client, user_session, fv_data): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - user_session(user) - response = client.post( - url_for("requests.save_financial_verification_draft", request_id=request.id), - data=fv_data, - follow_redirects=True, - ) - - assert response.status_code == 200 - - -def test_get_fv_form_route(client, user_session, fv_data): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - user_session(user) - response = client.get( - url_for("requests.financial_verification", request_id=request.id), - data=fv_data, - follow_redirects=False, - ) - - assert response.status_code == 200 - - -def test_manual_task_order_triggers_extended_form( - client, user_session, fv_data, e_fv_data -): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - - data = {**fv_data, **e_fv_data, "legacy_task_order-number": MANUAL_TO_NUMBER} - - UpdateFinancialVerification( - TrueValidator, TrueValidator, user, request, data, is_extended=True - ).execute() - - user_session(user) - response = client.get( - url_for("requests.financial_verification", request_id=request.id), - data=fv_data, - follow_redirects=False, - ) - assert "extended" in response.headers["Location"] - - -def test_manual_to_does_not_trigger_approval(client, user_session, fv_data, e_fv_data): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - data = { - **fv_data, - **e_fv_data, - "legacy_task_order-number": MANUAL_TO_NUMBER, - "request-pe_id": "0101228N", - } - user_session(user) - client.post( - url_for( - "requests.financial_verification", request_id=request.id, extended=True - ), - data=data, - follow_redirects=True, - ) - - updated_request = RequestsQuery.get(request.id) - assert updated_request.status != RequestStatus.APPROVED - - -def test_eda_task_order_does_trigger_approval(client, user_session, fv_data, e_fv_data): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - data = { - **fv_data, - **e_fv_data, - "legacy_task_order-number": MockEDAClient.MOCK_CONTRACT_NUMBER, - "request-pe_id": "0101228N", - } - user_session(user) - client.post( - url_for( - "requests.financial_verification", request_id=request.id, extended=True - ), - data=data, - follow_redirects=True, - ) - - updated_request = RequestsQuery.get(request.id) - assert updated_request.status == RequestStatus.APPROVED - - -def test_attachment_on_non_extended_form(client, user_session, fv_data, e_fv_data): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - data = { - **fv_data, - **e_fv_data, - "legacy_task_order-number": MockEDAClient.MOCK_CONTRACT_NUMBER, - "request-pe_id": "0101228N", - } - user_session(user) - client.post( - url_for( - "requests.financial_verification", request_id=request.id, extended=True - ), - data=data, - follow_redirects=True, - ) - - response = client.get( - url_for("requests.financial_verification", request_id=request.id) - ) - - assert response.status_code == 200 - - -def test_task_order_number_persists_in_form(fv_data, e_fv_data): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - data = { - **fv_data, - "legacy_task_order-number": MANUAL_TO_NUMBER, - "request-pe_id": "0101228N", - } - - try: - UpdateFinancialVerification( - TrueValidator, FalseValidator, user, request, data, is_extended=False - ).execute() - except FormValidationError: - pass - - form = GetFinancialVerificationForm(user, request, is_extended=True).execute() - assert form.legacy_task_order.number.data == MANUAL_TO_NUMBER - - -def test_can_submit_once_to_details_are_entered(fv_data, e_fv_data): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - data = { - **fv_data, - "legacy_task_order-number": MANUAL_TO_NUMBER, - "request-pe_id": "0101228N", - } - - try: - UpdateFinancialVerification( - TrueValidator, FalseValidator, user, request, data, is_extended=False - ).execute() - except FormValidationError: - pass - - data = { - **fv_data, - **e_fv_data, - "legacy_task_order-number": MANUAL_TO_NUMBER, - "request-pe_id": "0101228N", - } - assert UpdateFinancialVerification( - TrueValidator, TrueValidator, user, request, data, is_extended=True - ).execute() - - -def test_existing_task_order_with_pdf(fv_data, e_fv_data, client, user_session): - # Use finver route to create initial TO #1, complete with PDF - user = UserFactory.create() - request = RequestFactory.create(creator=user) - data = {**fv_data, **e_fv_data, "legacy_task_order-number": MANUAL_TO_NUMBER} - UpdateFinancialVerification( - TrueValidator, TaskOrderNumberValidator(), user, request, data, is_extended=True - ).execute() - - # Save draft on a new finver form, but with same number as TO #1 - user = UserFactory.create() - request = RequestFactory.create(creator=user) - data = {"legacy_task_order-number": MANUAL_TO_NUMBER} - SaveFinancialVerificationDraft( - TrueValidator, - TaskOrderNumberValidator(), - user, - request, - data, - is_extended=False, - ).execute() - - # Get finver form - user_session(user) - response = client.get( - url_for("requests.financial_verification", request_id=request.id), - follow_redirects=True, - ) - - assert response.status_code == 200 - - -def test_pdf_clearing(fv_data, e_fv_data, pdf_upload, pdf_upload2): - user = UserFactory.create() - request = RequestFactory.create(creator=user) - data = {**fv_data, **e_fv_data, "legacy_task_order-pdf": pdf_upload} - - SaveFinancialVerificationDraft( - TrueValidator, TrueValidator, user, request, data, is_extended=True - ).execute() - - data = {**data, "legacy_task_order-pdf": pdf_upload2} - UpdateFinancialVerification( - TrueValidator, TrueValidator, user, request, data, is_extended=True - ).execute() - - form = GetFinancialVerificationForm(user, request, is_extended=True).execute() - assert form.legacy_task_order.pdf.data == pdf_upload2.filename - - -# TODO: This test manages an edge case for our current non-unique handling of -# task orders. Because two requests can reference the same task order but we -# only record one request ID on the PDF attachment, multiple task -# orders/requests reference the same task order but only one of them is noted -# in the related attachment entity. I have changed the handling in -# FinancialVerificationBase#_get_form to be more generous in how it finds the -# PDF filename and prepopulates the form data with that name. -def test_always_derives_pdf_filename(fv_data, e_fv_data, pdf_upload): - user = UserFactory.create() - request_one = RequestFactory.create(creator=user) - attachment = Attachment.attach( - pdf_upload, resource="legacy_task_order", resource_id=request_one.id - ) - legacy_task_order = LegacyTaskOrderFactory.create(pdf=attachment) - request_two = RequestFactory.create( - creator=user, legacy_task_order=legacy_task_order - ) - - form_one = GetFinancialVerificationForm( - user, request_one, is_extended=True - ).execute() - form_two = GetFinancialVerificationForm( - user, request_two, is_extended=True - ).execute() - - assert form_one.legacy_task_order.pdf.data == attachment.filename - assert form_two.legacy_task_order.pdf.data == attachment.filename diff --git a/tests/routes/requests/test_requests_index.py b/tests/routes/requests/test_requests_index.py deleted file mode 100644 index 6243bd00..00000000 --- a/tests/routes/requests/test_requests_index.py +++ /dev/null @@ -1,28 +0,0 @@ -from flask import url_for - -from atst.routes.requests.index import RequestsIndex -from tests.factories import RequestFactory, UserFactory -from atst.domain.requests import Requests - - -def test_action_required_mission_owner(): - creator = UserFactory.create() - requests = RequestFactory.create_batch(5, creator=creator) - Requests.submit(requests[0]) - Requests.approve_and_create_portfolio(requests[1]) - - context = RequestsIndex(creator).execute() - - assert context["requests"][0]["action_required"] == False - - -def test_action_required_ccpo(): - creator = UserFactory.create() - requests = RequestFactory.create_batch(5, creator=creator) - Requests.submit(requests[0]) - Requests.approve_and_create_portfolio(requests[1]) - - ccpo = UserFactory.from_atat_role("ccpo") - context = RequestsIndex(ccpo).execute() - - assert context["num_action_required"] == 1 diff --git a/tests/test_integration.py b/tests/test_integration.py deleted file mode 100644 index 152e960b..00000000 --- a/tests/test_integration.py +++ /dev/null @@ -1,84 +0,0 @@ -import pytest -from urllib.parse import urlencode -from .factories import UserFactory, RequestFactory - -from atst.routes.requests.jedi_request_flow import JEDIRequestFlow -from atst.models.request_status_event import RequestStatus -from atst.domain.requests import Requests - - -@pytest.fixture -def screens(app): - return JEDIRequestFlow(3).screens - - -def serialize_dates(data): - if not data: - return data - - dates = { - k: v.strftime("%m/%d/%Y") for k, v in data.items() if hasattr(v, "strftime") - } - - new_data = data.copy() - new_data.update(dates) - - return new_data - - -def test_stepthrough_request_form(user_session, screens, client): - user = UserFactory.create() - user_session(user) - mock_request = RequestFactory.create() - mock_body = mock_request.body - - def post_form(url, redirects=False, data=""): - return client.post( - url, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data=data, - follow_redirects=redirects, - ) - - def take_a_step(inc, req=None, data=None): - req_url = "/requests/new/{}".format(inc) - if req: - req_url += "/" + req - # we do it twice, with and without redirect, in order to get the - # destination url - prelim_resp = post_form(req_url, data=data) - response = post_form(req_url, True, data=data) - assert prelim_resp.status_code == 302 - return (prelim_resp.headers.get("Location"), response) - - # GET the initial form - response = client.get("/requests/new/1") - assert screens[0]["title"] in response.data.decode() - - # POST to each of the form pages up until review and submit - req_id = None - for i in range(1, len(screens)): - # get appropriate form data to POST for this section - section = screens[i - 1]["section"] - massaged = serialize_dates(mock_body[section]) - post_data = urlencode(massaged) - - effective_url, resp = take_a_step(i, req=req_id, data=post_data) - req_id = effective_url.split("/")[-1] - screen_title = screens[i]["title"].replace("&", "&") - - assert "/requests/new/{}/{}".format(i + 1, req_id) in effective_url - assert screen_title in resp.data.decode() - - # at this point, the real request we made and the mock_request bodies - # should be equivalent - assert Requests.get(user, req_id).body == mock_body - - # finish the review and submit step - client.post( - "/requests/submit/{}".format(req_id), - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ) - - finished_request = Requests.get(user, req_id) - assert finished_request.status == RequestStatus.PENDING_CCPO_ACCEPTANCE