Merge pull request #387 from dod-ccpo/save-finver-draft

Save Financial Verification Draft
This commit is contained in:
richard-dds
2018-10-29 10:26:30 -04:00
committed by GitHub
19 changed files with 1104 additions and 629 deletions

View File

@@ -0,0 +1,74 @@
import re
from atst.domain.task_orders import TaskOrders
from atst.domain.pe_numbers import PENumbers
from atst.domain.exceptions import NotFoundError
class PENumberValidator(object):
PE_REGEX = re.compile(
r"""
(0?\d) # program identifier
(0?\d) # category
(\d) # activity
(\d+) # sponsor element
(.+) # service
""",
re.X,
)
def validate(self, request, field):
if field.errors:
return False
if self._same_as_previous(request, field.data):
return True
try:
PENumbers.get(field.data)
except NotFoundError:
self._apply_error(field)
return False
return True
def suggest_pe_id(self, pe_id):
suggestion = pe_id
match = self.PE_REGEX.match(pe_id)
if match:
(program, category, activity, sponsor, service) = match.groups()
if len(program) < 2:
program = "0" + program
if len(category) < 2:
category = "0" + category
suggestion = "".join((program, category, activity, sponsor, service))
if suggestion != pe_id:
return suggestion
return None
def _same_as_previous(self, request, pe_id):
return request.pe_number == pe_id
def _apply_error(self, field):
suggestion = self.suggest_pe_id(field.data)
error_str = (
"We couldn't find that PE number. {}"
"If you have double checked it you can submit anyway. "
"Your request will need to go through a manual review."
).format('Did you mean "{}"? '.format(suggestion) if suggestion else "")
field.errors += (error_str,)
class TaskOrderNumberValidator(object):
def validate(self, field):
try:
TaskOrders.get(field.data)
except NotFoundError:
self._apply_error(field)
return False
return True
def _apply_error(self, field):
field.errors += ("Task Order number not found",)

View File

@@ -1,7 +1,5 @@
from werkzeug.datastructures import FileStorage
import dateutil
from atst.domain.task_orders import TaskOrders
from atst.domain.workspaces import Workspaces
from atst.models.request_revision import RequestRevision
from atst.models.request_status_event import RequestStatusEvent, RequestStatus
@@ -76,14 +74,15 @@ class Requests(object):
@classmethod
def update(cls, request_id, request_delta):
request = RequestsQuery.get_with_lock(request_id)
return Requests._update(request, request_delta)
@classmethod
def _update(cls, request, request_delta):
new_body = deep_merge(request_delta, request.body)
revision = create_revision_from_request_body(new_body)
request.revisions.append(revision)
request = RequestsQuery.add_and_commit(request)
return request
return RequestsQuery.add_and_commit(request)
@classmethod
def approve_and_create_workspace(cls, request):
@@ -156,35 +155,12 @@ class Requests(object):
return Requests.status_count(RequestStatus.APPROVED)
@classmethod
def update_financial_verification(cls, request_id, financial_data):
def update_financial_verification(cls, request_id, financial_data, task_order=None):
request = RequestsQuery.get_with_lock(request_id)
request_data = financial_data.copy()
task_order_data = {
k: request_data.pop(k)
for (k, v) in financial_data.items()
if k in TaskOrders.TASK_ORDER_DATA
}
if task_order_data:
task_order_number = request_data.pop("task_order_number")
else:
task_order_number = request_data.get("task_order_number")
if "task_order" in request_data and isinstance(
request_data["task_order"], FileStorage
):
task_order_data["pdf"] = request_data.pop("task_order")
task_order = TaskOrders.get_or_create_task_order(
task_order_number, task_order_data
)
if task_order:
request.task_order = task_order
request = Requests.update(request.id, {"financial_verification": request_data})
request = Requests._update(request, {"financial_verification": financial_data})
return request
@classmethod

View File

@@ -2,9 +2,9 @@ from sqlalchemy.orm.exc import NoResultFound
from flask import current_app as app
from atst.database import db
from atst.models.task_order import TaskOrder, Source
from atst.models.attachment import Attachment
from atst.models.task_order import TaskOrder, Source, FundingType
from .exceptions import NotFoundError
from atst.utils import update_obj
class TaskOrders(object):
@@ -18,25 +18,28 @@ class TaskOrders(object):
)
except NoResultFound:
if TaskOrders._client():
task_order = TaskOrders._get_from_eda(order_number)
task_order = TaskOrders.get_from_eda(order_number)
else:
raise NotFoundError("task_order")
return task_order
@classmethod
def _get_from_eda(cls, order_number):
def get_from_eda(cls, order_number):
to_data = TaskOrders._client().get_contract(order_number, status="y")
if to_data:
# TODO: we need to determine exactly what we're getting and storing from the EDA client
return TaskOrders.create(source=Source.EDA, **to_data)
return TaskOrders.create(
source=Source.EDA, funding_type=FundingType.PROC, **to_data
)
else:
raise NotFoundError("task_order")
@classmethod
def create(cls, **kwargs):
task_order = TaskOrder(**kwargs)
def create(cls, source=Source.MANUAL, **kwargs):
to_data = {k: v for k, v in kwargs.items() if v not in ["", None]}
task_order = TaskOrder(source=source, **to_data)
db.session.add(task_order)
db.session.commit()
@@ -48,18 +51,8 @@ class TaskOrders(object):
return app.eda_client
@classmethod
def get_or_create_task_order(cls, number, task_order_data=None):
try:
return TaskOrders.get(number)
except NotFoundError:
if task_order_data:
pdf_file = task_order_data.pop("pdf")
# should catch the error here
attachment = Attachment.attach(pdf_file)
return TaskOrders.create(
**task_order_data,
number=number,
source=Source.MANUAL,
pdf=attachment,
)
def update(cls, task_order, dct):
updated = update_obj(task_order, dct, ignore_vals=lambda v: v in ["", None])
db.session.add(updated)
db.session.commit()
return updated

6
atst/forms/exceptions.py Normal file
View File

@@ -0,0 +1,6 @@
class FormValidationError(Exception):
message = "Form validation failed."
def __init__(self, form):
self.form = form

View File

@@ -1,171 +1,77 @@
import re
import pendulum
from wtforms.fields.html5 import DateField, EmailField
from wtforms.fields import StringField, FileField
from wtforms.validators import InputRequired, Email, Regexp
from wtforms.fields import StringField, FileField, FormField
from wtforms.validators import InputRequired, Email, Regexp, Optional
from flask_wtf.file import FileAllowed
from atst.domain.exceptions import NotFoundError
from atst.domain.pe_numbers import PENumbers
from atst.domain.task_orders import TaskOrders
from .fields import NewlineListField, SelectField, NumberStringField
from .forms import ValidatedForm
from atst.forms.forms import ValidatedForm
from .data import FUNDING_TYPES
from .validators import DateRange
PE_REGEX = re.compile(
r"""
(0?\d) # program identifier
(0?\d) # category
(\d) # activity
(\d+) # sponsor element
(.+) # service
""",
re.X,
)
TREASURY_CODE_REGEX = re.compile(r"^0*([1-9]{4}|[1-9]{6})$")
BA_CODE_REGEX = re.compile(r"[0-9]{2}\w?$")
def suggest_pe_id(pe_id):
suggestion = pe_id
match = PE_REGEX.match(pe_id)
if match:
(program, category, activity, sponsor, service) = match.groups()
if len(program) < 2:
program = "0" + program
if len(category) < 2:
category = "0" + category
suggestion = "".join((program, category, activity, sponsor, service))
if suggestion != pe_id:
return suggestion
return None
def validate_pe_id(field, existing_request):
try:
PENumbers.get(field.data)
except NotFoundError:
suggestion = suggest_pe_id(field.data)
error_str = (
"We couldn't find that PE number. {}"
"If you have double checked it you can submit anyway. "
"Your request will need to go through a manual review."
).format('Did you mean "{}"? '.format(suggestion) if suggestion else "")
field.errors += (error_str,)
return False
return True
def validate_task_order_number(field):
try:
TaskOrders.get(field.data)
except NotFoundError:
field.errors += ("Task Order number not found",)
return False
return True
def number_to_int(num):
if num:
return int(num)
class BaseFinancialForm(ValidatedForm):
def reset(self):
"""
Reset UII info so that it can be de-parsed rendered properly.
This is a stupid workaround, and there's probably a better way.
"""
self.uii_ids.process_data(self.uii_ids.data)
def coerce_choice(val):
if val is None:
return None
elif isinstance(val, str):
return val
else:
return val.value
class DraftValidateMixin(object):
def validate_draft(self):
"""
Make all fields optional before validation, and then return them to
their previous state.
"""
for field in self:
field.validators.insert(0, Optional())
valid = self.validate()
for field in self:
field.validators.pop(0)
def perform_extra_validation(self, existing_request):
valid = True
if not existing_request or existing_request.get("pe_id") != self.pe_id.data:
valid = validate_pe_id(self.pe_id, existing_request)
return valid
@property
def is_missing_task_order_number(self):
return False
task_order_number = StringField(
class TaskOrderForm(ValidatedForm, DraftValidateMixin):
def do_validate_number(self):
for field in self:
if field.name != "task_order-number":
field.validators.insert(0, Optional())
valid = super().validate()
for field in self:
if field.name != "task_order-number":
field.validators.pop(0)
return valid
number = StringField(
"Task Order Number associated with this request",
description="Include the original Task Order number (including the 000X at the end). Do not include any modification numbers. Note that there may be a lag between approving a task order and when it becomes available in our system.",
validators=[InputRequired()],
)
uii_ids = NewlineListField(
"Unique Item Identifier (UII)s related to your application(s) if you already have them.",
description="If you have more than one UII, place each one on a new line.",
)
pe_id = StringField(
"Program Element Number",
description="PE numbers help the Department of Defense identify which offices' budgets are contributing towards this resource use. <br/><em>It should be 7 digits followed by 1-3 letters, and should have a zero as the first and third digits.</em>",
validators=[InputRequired()],
)
treasury_code = StringField(
"Program Treasury Code",
description="Program Treasury Code (or Appropriations Code) identifies resource types. <br/> <em>It should be a four digit or six digit number, optionally prefixed by one or more zeros.</em>",
validators=[InputRequired(), Regexp(TREASURY_CODE_REGEX)],
)
ba_code = StringField(
"Program Budget Activity (BA) Code",
description="BA Code is used to identify the purposes, projects, or types of activities financed by the appropriation fund. <br/><em>It should be two digits, followed by an optional letter.</em>",
validators=[InputRequired(), Regexp(BA_CODE_REGEX)],
)
fname_co = StringField("KO First Name", validators=[InputRequired()])
lname_co = StringField("KO Last Name", validators=[InputRequired()])
email_co = EmailField("KO Email", validators=[InputRequired(), Email()])
office_co = StringField("KO Office", validators=[InputRequired()])
fname_cor = StringField("COR First Name", validators=[InputRequired()])
lname_cor = StringField("COR Last Name", validators=[InputRequired()])
email_cor = EmailField("COR Email", validators=[InputRequired(), Email()])
office_cor = StringField("COR Office", validators=[InputRequired()])
class FinancialForm(BaseFinancialForm):
def perform_extra_validation(self, existing_request):
previous_valid = super().perform_extra_validation(existing_request)
task_order_valid = validate_task_order_number(self.task_order_number)
return previous_valid and task_order_valid
@property
def is_missing_task_order_number(self):
return "task_order_number" in self.errors
@property
def is_only_missing_task_order_number(self):
return "task_order_number" in self.errors and len(self.errors) == 1
class ExtendedFinancialForm(BaseFinancialForm):
def validate(self, *args, **kwargs):
if self.funding_type.data == "OTHER":
self.funding_type_other.validators.append(InputRequired())
return super().validate(*args, **kwargs)
funding_type = SelectField(
description="What is the source of funding?",
choices=FUNDING_TYPES,
validators=[InputRequired()],
coerce=coerce_choice,
render_kw={"required": False},
)
@@ -227,10 +133,114 @@ class ExtendedFinancialForm(BaseFinancialForm):
filters=[number_to_int],
)
task_order = FileField(
pdf = FileField(
"Upload a copy of your Task Order",
validators=[
FileAllowed(["pdf"], "Only PDF documents can be uploaded."),
InputRequired(),
],
render_kw={"required": False},
)
class RequestFinancialVerificationForm(ValidatedForm, DraftValidateMixin):
uii_ids = NewlineListField(
"Unique Item Identifier (UII)s related to your application(s) if you already have them.",
description="If you have more than one UII, place each one on a new line.",
)
pe_id = StringField(
"Program Element Number",
description="PE numbers help the Department of Defense identify which offices' budgets are contributing towards this resource use. <br/><em>It should be 7 digits followed by 1-3 letters, and should have a zero as the first and third digits.</em>",
validators=[InputRequired()],
)
treasury_code = StringField(
"Program Treasury Code",
description="Program Treasury Code (or Appropriations Code) identifies resource types. <br/> <em>It should be a four digit or six digit number, optionally prefixed by one or more zeros.</em>",
validators=[InputRequired(), Regexp(TREASURY_CODE_REGEX)],
)
ba_code = StringField(
"Program Budget Activity (BA) Code",
description="BA Code is used to identify the purposes, projects, or types of activities financed by the appropriation fund. <br/><em>It should be two digits, followed by an optional letter.</em>",
validators=[InputRequired(), Regexp(BA_CODE_REGEX)],
)
fname_co = StringField("KO First Name", validators=[InputRequired()])
lname_co = StringField("KO Last Name", validators=[InputRequired()])
email_co = EmailField("KO Email", validators=[InputRequired(), Email()])
office_co = StringField("KO Office", validators=[InputRequired()])
fname_cor = StringField("COR First Name", validators=[InputRequired()])
lname_cor = StringField("COR Last Name", validators=[InputRequired()])
email_cor = EmailField("COR Email", validators=[InputRequired(), Email()])
office_cor = StringField("COR Office", validators=[InputRequired()])
def reset(self):
"""
Reset UII info so that it can be de-parsed rendered properly.
This is a stupid workaround, and there's probably a better way.
"""
self.uii_ids.process_data(self.uii_ids.data)
class FinancialVerificationForm(ValidatedForm):
task_order = FormField(TaskOrderForm)
request = FormField(RequestFinancialVerificationForm)
def validate(self, *args, **kwargs):
if not kwargs.get("is_extended", True):
return self.do_validate_request()
if self.task_order.funding_type.data == "OTHER":
self.task_order.funding_type_other.validators.append(InputRequired())
to_pdf_validators = None
if kwargs.get("has_attachment"):
to_pdf_validators = list(self.task_order.pdf.validators)
self.task_order.pdf.validators = []
valid = super().validate()
if to_pdf_validators:
self.task_order.pdf.validators = to_pdf_validators
return valid
def do_validate_request(self):
"""
Called do_validate_request to avoid being considered an inline
validator by wtforms.
"""
request_valid = self.request.validate(self)
task_order_valid = self.task_order.do_validate_number()
return request_valid and task_order_valid
def validate_draft(self):
return self.task_order.validate_draft() and self.request.validate_draft()
def reset(self):
self.request.reset()
@property
def pe_id(self):
return self.request.pe_id
@property
def task_order_number(self):
return self.task_order.number
@property
def is_missing_task_order_number(self):
return "number" in self.errors.get("task_order", {})
@property
def is_only_missing_task_order_number(self):
return "task_order_number" in self.errors and len(self.errors) == 1

View File

@@ -1,9 +1,12 @@
from sqlalchemy import Column, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm.exc import NoResultFound
from flask import current_app as app
from atst.models import Base, types, mixins
from atst.database import db
from atst.uploader import UploadError
from atst.domain.exceptions import NotFoundError
class AttachmentError(Exception):
@@ -16,20 +19,56 @@ class Attachment(Base, mixins.TimestampsMixin):
id = types.Id()
filename = Column(String, nullable=False)
object_name = Column(String, unique=True, nullable=False)
resource = Column(String)
resource_id = Column(UUID(as_uuid=True), index=True)
@classmethod
def attach(cls, fyle):
def attach(cls, fyle, resource=None, resource_id=None):
try:
filename, object_name = app.uploader.upload(fyle)
except UploadError as e:
raise AttachmentError("Could not add attachment. " + str(e))
attachment = Attachment(filename=filename, object_name=object_name)
attachment = Attachment(
filename=filename,
object_name=object_name,
resource=resource,
resource_id=resource_id,
)
db.session.add(attachment)
db.session.commit()
return attachment
@classmethod
def get(cls, id_):
try:
return db.session.query(Attachment).filter_by(id=id_).one()
except NoResultFound:
raise NotFoundError("attachment")
@classmethod
def get_for_resource(cls, resource, resource_id):
try:
return (
db.session.query(Attachment)
.filter_by(resource=resource, resource_id=resource_id)
.one()
)
except NoResultFound:
raise NotFoundError("attachment")
@classmethod
def delete_for_resource(cls, resource, resource_id):
try:
return (
db.session.query(Attachment)
.filter_by(resource=resource, resource_id=resource_id)
.delete()
)
except NoResultFound:
raise NotFoundError("attachment")
def __repr__(self):
return "<Attachment(name='{}', id='{}')>".format(self.filename, self.id)

View File

@@ -1,26 +1,14 @@
from sqlalchemy import event
from flask import g
import re
from atst.models.audit_event import AuditEvent
from atst.utils import camel_to_snake, getattr_path
ACTION_CREATE = "create"
ACTION_UPDATE = "update"
ACTION_DELETE = "delete"
def getattr_path(obj, path, default=None):
_obj = obj
for item in path.split("."):
_obj = getattr(_obj, item, default)
return _obj
def camel_to_snake(camel_cased):
s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", camel_cased)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()
class AuditableMixin(object):
@staticmethod
def create_audit_event(connection, resource, action):

View File

@@ -6,6 +6,7 @@ from atst.models import Base, types, mixins
from atst.models.request_status_event import RequestStatus
from atst.utils import first_or_none
from atst.models.request_revision import RequestRevision
from atst.models.task_order import Source as TaskOrderSource
def map_properties_to_dict(properties, instance):
@@ -135,7 +136,7 @@ class Request(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
@property
def financial_verification(self):
return self.body.get("financial_verification")
return self.body.get("financial_verification", {})
@property
def is_financially_verified(self):
@@ -224,6 +225,18 @@ class Request(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
def contracting_officer_email(self):
return self.latest_revision.email_co
@property
def pe_number(self):
return self.body.get("financial_verification", {}).get("pe_id")
@property
def has_manual_task_order(self):
return (
self.task_order.source == TaskOrderSource.MANUAL
if self.task_order is not None
else None
)
def __repr__(self):
return "<Request(status='{}', name='{}', creator='{}', is_approved='{}', time_created='{}', id='{}')>".format(
self.status_displayname,

View File

@@ -1,134 +1,275 @@
from flask import g, render_template, redirect, url_for
from flask import request as http_request
from werkzeug.datastructures import ImmutableMultiDict, FileStorage
from . import requests_bp
from atst.domain.requests import Requests
from atst.forms.financial import FinancialForm, ExtendedFinancialForm
from atst.forms.financial import FinancialVerificationForm
from atst.forms.exceptions import FormValidationError
from atst.domain.exceptions import NotFoundError
from atst.domain.requests.financial_verification import (
PENumberValidator,
TaskOrderNumberValidator,
)
from atst.models.attachment import Attachment
from atst.domain.task_orders import TaskOrders
class FinancialVerification:
def __init__(self, request, extended=False, post_data=None):
def fv_extended(_http_request):
return _http_request.args.get("extended", "false").lower() in ["true", "t"]
class FinancialVerification(object):
def __init__(self, request):
self.request = request.latest_revision
self.task_order = request.task_order
class FinancialVerificationBase(object):
def _get_form(self, request, is_extended, formdata=None):
_formdata = ImmutableMultiDict(formdata) if formdata is not None else None
fv = FinancialVerification(request)
form = FinancialVerificationForm(obj=fv, formdata=_formdata)
if is_extended:
try:
attachment = Attachment.get_for_resource("task_order", self.request.id)
form.task_order.pdf.data = attachment.filename
except NotFoundError:
pass
return form
def _process_attachment(self, is_extended, form):
attachment = None
if is_extended:
attachment = None
if isinstance(form.task_order.pdf.data, FileStorage):
Attachment.delete_for_resource("task_order", self.request.id)
attachment = Attachment.attach(
form.task_order.pdf.data, "task_order", self.request.id
)
elif isinstance(form.task_order.pdf.data, str):
try:
attachment = Attachment.get_for_resource(
"task_order", self.request.id
)
except NotFoundError:
pass
if attachment:
form.task_order.pdf.data = attachment.filename
return attachment
def _try_create_task_order(self, form, attachment, is_extended):
task_order_number = form.task_order.number.data
if not task_order_number:
return None
task_order_data = form.task_order.data
if attachment:
task_order_data["pdf"] = attachment
try:
task_order = TaskOrders.get(task_order_number)
task_order = TaskOrders.update(task_order, task_order_data)
return task_order
except NotFoundError:
pass
try:
return TaskOrders.get_from_eda(task_order_number)
except NotFoundError:
pass
return TaskOrders.create(**task_order_data)
def _raise(self, form):
form.reset()
raise FormValidationError(form)
class GetFinancialVerificationForm(FinancialVerificationBase):
def __init__(self, user, request, is_extended=False):
self.user = user
self.request = request
self._extended = extended
self._post_data = post_data
self._form = None
self.reset()
self.is_extended = is_extended
def reset(self):
self._updateable = False
self._valid = False
self.workspace = None
if self._form:
self._form.reset()
def execute(self):
form = self._get_form(self.request, self.is_extended)
return form
@property
def is_extended(self):
return self._extended or self.is_pending_changes
@property
def is_pending_changes(self):
return self.request.is_pending_financial_verification_changes
class UpdateFinancialVerification(FinancialVerificationBase):
def __init__(
self,
pe_validator,
task_order_validator,
user,
request,
fv_data,
is_extended=False,
):
self.pe_validator = pe_validator
self.task_order_validator = task_order_validator
self.user = user
self.request = request
self.fv_data = fv_data
self.is_extended = is_extended
@property
def _task_order_data(self):
if self.request.task_order:
task_order = self.request.task_order
data = task_order.to_dictionary()
data["task_order_number"] = task_order.number
data["funding_type"] = task_order.funding_type.value
return data
else:
return {}
def execute(self):
form = self._get_form(self.request, self.is_extended, self.fv_data)
@property
def _form_data(self):
if self._post_data:
return self._post_data
else:
form_data = self.request.body.get("financial_verification", {})
form_data.update(self._task_order_data)
should_update = True
should_submit = True
updated_request = None
return form_data
attachment = self._process_attachment(self.is_extended, form)
@property
def form(self):
if not self._form:
if self.is_extended:
self._form = ExtendedFinancialForm(data=self._form_data)
else:
self._form = FinancialForm(data=self._form_data)
if not form.validate(is_extended=self.is_extended, has_attachment=attachment):
should_update = False
return self._form
if not self.pe_validator.validate(self.request, form.pe_id):
should_submit = False
def validate(self):
if self.form.validate():
self._updateable = True
self._valid = self.form.perform_extra_validation(
self.request.body.get("financial_verification")
if not self.task_order_validator.validate(form.task_order.number):
should_submit = False
if should_update:
task_order = self._try_create_task_order(form, attachment, self.is_extended)
updated_request = Requests.update_financial_verification(
self.request.id, form.request.data, task_order=task_order
)
if should_submit:
return Requests.submit_financial_verification(updated_request)
self._raise(form)
class SaveFinancialVerificationDraft(FinancialVerificationBase):
def __init__(
self,
pe_validator,
task_order_validator,
user,
request,
fv_data,
is_extended=False,
):
self.pe_validator = pe_validator
self.task_order_validator = task_order_validator
self.user = user
self.request = request
self.fv_data = fv_data
self.is_extended = is_extended
def execute(self):
form = self._get_form(self.request, self.is_extended, self.fv_data)
valid = True
if not form.validate_draft():
self._raise(form)
if not self.pe_validator.validate(self.request, form.pe_id):
valid = False
if form.task_order.number.data and not self.task_order_validator.validate(
form.task_order.number
):
valid = False
attachment = self._process_attachment(self.is_extended, form)
task_order = self._try_create_task_order(form, attachment, self.is_extended)
updated_request = Requests.update_financial_verification(
self.request.id, form.request.data, task_order=task_order
)
if valid:
return updated_request
else:
self._updateable = False
self._valid = False
return self._valid
@property
def pending(self):
return self.request.is_pending_ccpo_approval
def finalize(self):
if self._updateable:
self.request = Requests.update_financial_verification(
self.request.id, self.form.data
)
if self._valid:
self.request = Requests.submit_financial_verification(self.request)
if self.request.is_financially_verified:
self.workspace = Requests.approve_and_create_workspace(self.request)
self._raise(form)
@requests_bp.route("/requests/verify/<string:request_id>", methods=["GET"])
def financial_verification(request_id):
request = Requests.get(g.current_user, request_id)
finver = FinancialVerification(request, extended=http_request.args.get("extended"))
is_extended = fv_extended(http_request)
should_be_extended = not is_extended and request.has_manual_task_order
if should_be_extended:
return redirect(
url_for(".financial_verification", request_id=request_id, extended=True)
)
form = GetFinancialVerificationForm(
g.current_user, request, is_extended=is_extended
).execute()
return render_template(
"requests/financial_verification.html",
f=finver.form,
jedi_request=finver.request,
review_comment=finver.request.review_comment,
extended=finver.is_extended,
f=form,
jedi_request=request,
review_comment=request.review_comment,
extended=is_extended,
)
@requests_bp.route("/requests/verify/<string:request_id>", methods=["POST"])
def update_financial_verification(request_id):
request = Requests.get(g.current_user, request_id)
finver = FinancialVerification(
request, extended=http_request.args.get("extended"), post_data=http_request.form
)
fv_data = {**http_request.form, **http_request.files}
is_extended = fv_extended(http_request)
finver.validate()
finver.finalize()
if finver.workspace:
return redirect(
url_for(
"workspaces.new_project",
workspace_id=finver.workspace.id,
newWorkspace=True,
)
)
elif finver.pending:
return redirect(url_for("requests.requests_index", modal="pendingCCPOApproval"))
else:
finver.reset()
try:
updated_request = UpdateFinancialVerification(
PENumberValidator(),
TaskOrderNumberValidator(),
g.current_user,
request,
fv_data,
is_extended=is_extended,
).execute()
except FormValidationError as e:
return render_template(
"requests/financial_verification.html",
jedi_request=finver.request,
f=finver.form,
extended=finver.is_extended,
jedi_request=request,
f=e.form,
extended=is_extended,
)
if updated_request.task_order.verified:
workspace = Requests.approve_and_create_workspace(updated_request)
return redirect(
url_for(
"workspaces.new_project", workspace_id=workspace.id, newWorkspace=True
)
)
else:
return redirect(url_for("requests.requests_index", modal="pendingCCPOApproval"))
@requests_bp.route("/requests/verify/<string:request_id>/draft", methods=["POST"])
def save_financial_verification_draft(request_id):
request = Requests.get(g.current_user, request_id)
fv_data = {**http_request.form, **http_request.files}
is_extended = fv_extended(http_request)
try:
SaveFinancialVerificationDraft(
PENumberValidator(),
TaskOrderNumberValidator(),
g.current_user,
request,
fv_data,
is_extended=is_extended,
).execute()
except FormValidationError as e:
return render_template(
"requests/financial_verification.html",
jedi_request=request,
f=e.form,
extended=is_extended,
)
return redirect(url_for("requests.requests_index"))

View File

@@ -1,3 +1,6 @@
import re
def first_or_none(predicate, lst):
return next((x for x in lst if predicate(x)), None)
@@ -18,3 +21,35 @@ def deep_merge(source, destination: dict):
return b
return _deep_merge(source, dict(destination))
def getattr_path(obj, path, default=None):
_obj = obj
for item in path.split("."):
if isinstance(_obj, dict):
_obj = _obj.get(item)
else:
_obj = getattr(_obj, item, default)
return _obj
def update_obj(obj, dct, ignore_vals=lambda v: v is None):
for k, v in dct.items():
if hasattr(obj, k) and not ignore_vals(v):
setattr(obj, k, v)
return obj
def camel_to_snake(camel_cased):
s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", camel_cased)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()
def drop(keys, dct):
_keys = set(keys)
return {k: v for k, v in dct.items() if k not in _keys}
def pick(keys, dct):
_keys = set(keys)
return {k: v for (k, v) in dct.items() if k in _keys}