Remove Requests domain classes

This commit is contained in:
Patrick Smith 2019-02-20 15:37:04 -05:00
parent c8a139a941
commit 6fb333acb9
9 changed files with 0 additions and 847 deletions

View File

@ -1 +0,0 @@
from .requests import Requests, create_revision_from_request_body

View File

@ -1,29 +0,0 @@
from atst.models.permissions import Permissions
from atst.domain.authz import Authorization
from atst.domain.exceptions import UnauthorizedError
class RequestsAuthorization(object):
def __init__(self, user, request):
self.user = user
self.request = request
@property
def can_view(self):
return (
Authorization.has_atat_permission(
self.user, Permissions.REVIEW_AND_APPROVE_JEDI_PORTFOLIO_REQUEST
)
or self.request.creator == self.user
)
def check_can_view(self, message):
if not self.can_view:
raise UnauthorizedError(self.user, message)
def check_can_approve(self):
return Authorization.check_atat_permission(
self.user,
Permissions.REVIEW_AND_APPROVE_JEDI_PORTFOLIO_REQUEST,
"cannot review and approve requests",
)

View File

@ -1,74 +0,0 @@
import re
from atst.domain.legacy_task_orders import LegacyTaskOrders
from atst.domain.pe_numbers import PENumbers
from atst.domain.exceptions import NotFoundError
class PENumberValidator(object):
PE_REGEX = re.compile(
r"""
(0?\d) # program identifier
(0?\d) # category
(\d) # activity
(\d+) # sponsor element
(.+) # service
""",
re.X,
)
def validate(self, request, field):
if field.errors:
return False
if self._same_as_previous(request, field.data):
return True
try:
PENumbers.get(field.data)
except NotFoundError:
self._apply_error(field)
return False
return True
def suggest_pe_id(self, pe_id):
suggestion = pe_id
match = self.PE_REGEX.match(pe_id)
if match:
(program, category, activity, sponsor, service) = match.groups()
if len(program) < 2:
program = "0" + program
if len(category) < 2:
category = "0" + category
suggestion = "".join((program, category, activity, sponsor, service))
if suggestion != pe_id:
return suggestion
return None
def _same_as_previous(self, request, pe_id):
return request.pe_number == pe_id
def _apply_error(self, field):
suggestion = self.suggest_pe_id(field.data)
error_str = (
"We couldn't find that PE number. {}"
"If you have double checked it you can submit anyway. "
"Your request will need to go through a manual review."
).format('Did you mean "{}"? '.format(suggestion) if suggestion else "")
field.errors += (error_str,)
class TaskOrderNumberValidator(object):
def validate(self, field):
try:
LegacyTaskOrders.get(field.data)
except NotFoundError:
self._apply_error(field)
return False
return True
def _apply_error(self, field):
field.errors += ("Task Order number not found",)

View File

@ -1,73 +0,0 @@
from sqlalchemy import exists, and_, exc, text
from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
from atst.domain.common import Query
from atst.models.request import Request
from atst.domain.exceptions import NotFoundError
class RequestsQuery(Query):
model = Request
@classmethod
def exists(cls, request_id, creator):
try:
return db.session.query(
exists().where(
and_(Request.id == request_id, Request.creator == creator)
)
).scalar()
except exc.DataError:
return False
@classmethod
def get_many(cls, creator=None):
filters = []
if creator:
filters.append(Request.creator == creator)
requests = (
db.session.query(Request)
.filter(*filters)
.order_by(Request.time_created.desc())
.all()
)
return requests
@classmethod
def get_with_lock(cls, request_id):
try:
# Query for request matching id, acquiring a row-level write lock.
# https://www.postgresql.org/docs/10/static/sql-select.html#SQL-FOR-UPDATE-SHARE
return (
db.session.query(Request)
.filter_by(id=request_id)
.with_for_update(of=Request)
.one()
)
except NoResultFound:
raise NotFoundError("requests")
@classmethod
def status_count(cls, status, creator=None):
bindings = {"status": status.name}
raw = """
SELECT count(requests_with_status.id)
FROM (
SELECT DISTINCT ON (rse.request_id) r.*, rse.new_status as status
FROM request_status_events rse JOIN requests r ON r.id = rse.request_id
ORDER BY rse.request_id, rse.sequence DESC
) as requests_with_status
WHERE requests_with_status.status = :status
"""
if creator:
raw += " AND requests_with_status.user_id = :user_id"
bindings["user_id"] = creator.id
results = db.session.execute(text(raw), bindings).fetchone()
(count,) = results
return count

View File

@ -1,239 +0,0 @@
import dateutil
from atst.domain.portfolios import Portfolios
from atst.models.request_revision import RequestRevision
from atst.models.request_status_event import RequestStatusEvent, RequestStatus
from atst.models.request_review import RequestReview
from atst.models.request_internal_comment import RequestInternalComment
from atst.utils import deep_merge
from atst.queue import queue
from atst.filters import dollars
from .query import RequestsQuery
from .authorization import RequestsAuthorization
from .status_event_handler import RequestStatusEventHandler
def create_revision_from_request_body(body):
body = {k: v for p in body.values() for k, v in p.items()}
DATES = ["start_date", "date_latest_training"]
coerced_timestamps = {
k: dateutil.parser.parse(v)
for k, v in body.items()
if k in DATES and isinstance(v, str)
}
body = {**body, **coerced_timestamps}
return RequestRevision(**body)
class Requests(object):
AUTO_ACCEPT_THRESHOLD = 1_000_000
ANNUAL_SPEND_THRESHOLD = 1_000_000
@classmethod
def create(cls, creator, body):
revision = create_revision_from_request_body(body)
request = RequestsQuery.create(creator=creator, revisions=[revision])
request = Requests.set_status(request, RequestStatus.STARTED)
request = RequestsQuery.add_and_commit(request)
return request
@classmethod
def exists(cls, request_id, creator):
return RequestsQuery.exists(request_id, creator)
@classmethod
def get(cls, user, request_id):
request = RequestsQuery.get(request_id)
RequestsAuthorization(user, request).check_can_view("get request")
return request
@classmethod
def get_for_approval(cls, user, request_id):
request = RequestsQuery.get(request_id)
RequestsAuthorization(user, request).check_can_approve()
return request
@classmethod
def get_many(cls, creator=None):
return RequestsQuery.get_many(creator)
@classmethod
def submit(cls, request):
request = Requests.set_status(request, RequestStatus.SUBMITTED)
if Requests.should_auto_accept(request):
request = Requests.set_status(
request, RequestStatus.PENDING_FINANCIAL_VERIFICATION
)
Requests._add_review(
user=None,
request=request,
review_data={
"comment": "Auto-acceptance for dollar value below {}".format(
dollars(Requests.AUTO_ACCEPT_THRESHOLD)
)
},
)
else:
request = Requests.set_status(
request, RequestStatus.PENDING_CCPO_ACCEPTANCE
)
request = RequestsQuery.add_and_commit(request)
return request
@classmethod
def update(cls, request_id, request_delta):
request = RequestsQuery.get_with_lock(request_id)
return Requests._update(request, request_delta)
@classmethod
def _update(cls, request, request_delta):
new_body = deep_merge(request_delta, request.body)
revision = create_revision_from_request_body(new_body)
request.revisions.append(revision)
return RequestsQuery.add_and_commit(request)
@classmethod
def approve_and_create_portfolio(cls, request):
approved_request = Requests.set_status(request, RequestStatus.APPROVED)
portfolio = Portfolios.create_from_request(approved_request)
RequestsQuery.add_and_commit(approved_request)
return portfolio
@classmethod
def auto_approve_and_create_portfolio(
cls,
request,
reason="Financial verification information found in Electronic Document Access API",
):
portfolio = Requests.approve_and_create_portfolio(request)
Requests._add_review(
user=None, request=request, review_data={"comment": reason}
)
return portfolio
@classmethod
def set_status(cls, request, status: RequestStatus):
old_status = request.status
status_event = RequestStatusEvent(
new_status=status, revision=request.latest_revision
)
request.status_events.append(status_event)
updated_request = RequestsQuery.add_and_commit(request)
RequestStatusEventHandler(queue).handle_status_change(
updated_request, old_status, status
)
return updated_request
@classmethod
def should_auto_accept(cls, request):
try:
dollar_value = request.body["details_of_use"]["dollar_value"]
except KeyError:
return False
return dollar_value < cls.AUTO_ACCEPT_THRESHOLD
_VALID_SUBMISSION_STATUSES = [
RequestStatus.STARTED,
RequestStatus.CHANGES_REQUESTED,
]
@classmethod
def should_allow_submission(cls, request):
all_request_sections = [
"details_of_use",
"information_about_you",
"primary_poc",
]
existing_request_sections = request.body.keys()
return request.status in Requests._VALID_SUBMISSION_STATUSES and all(
section in existing_request_sections for section in all_request_sections
)
@classmethod
def status_count(cls, status, creator=None):
return RequestsQuery.status_count(status, creator)
@classmethod
def in_progress_count(cls):
return sum(
[
Requests.status_count(RequestStatus.STARTED),
Requests.status_count(RequestStatus.PENDING_FINANCIAL_VERIFICATION),
Requests.status_count(RequestStatus.CHANGES_REQUESTED),
]
)
@classmethod
def pending_ccpo_count(cls):
return sum(
[
Requests.status_count(RequestStatus.PENDING_CCPO_ACCEPTANCE),
Requests.status_count(RequestStatus.PENDING_CCPO_APPROVAL),
]
)
@classmethod
def completed_count(cls):
return Requests.status_count(RequestStatus.APPROVED)
@classmethod
def update_financial_verification(
cls, request_id, financial_data, legacy_task_order=None
):
request = RequestsQuery.get_with_lock(request_id)
if legacy_task_order:
request.legacy_task_order = legacy_task_order
request = Requests._update(request, {"financial_verification": financial_data})
return request
@classmethod
def submit_financial_verification(cls, request):
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_APPROVAL)
request = RequestsQuery.add_and_commit(request)
return request
@classmethod
def _add_review(cls, user=None, request=None, review_data=None):
request.latest_status.review = RequestReview(reviewer=user, **review_data)
request = RequestsQuery.add_and_commit(request)
return request
@classmethod
def advance(cls, user, request, review_data):
if request.status == RequestStatus.PENDING_CCPO_ACCEPTANCE:
Requests.set_status(request, RequestStatus.PENDING_FINANCIAL_VERIFICATION)
elif request.status == RequestStatus.PENDING_CCPO_APPROVAL:
Requests.approve_and_create_portfolio(request)
return Requests._add_review(user=user, request=request, review_data=review_data)
@classmethod
def request_changes(cls, user, request, review_data):
if request.status == RequestStatus.PENDING_CCPO_ACCEPTANCE:
Requests.set_status(request, RequestStatus.CHANGES_REQUESTED)
elif request.status == RequestStatus.PENDING_CCPO_APPROVAL:
Requests.set_status(request, RequestStatus.CHANGES_REQUESTED_TO_FINVER)
return Requests._add_review(user=user, request=request, review_data=review_data)
@classmethod
def add_internal_comment(cls, user, request, comment_text):
RequestsAuthorization(user, request).check_can_approve()
comment = RequestInternalComment(request=request, text=comment_text, user=user)
RequestsQuery.add_and_commit(comment)
return request
@classmethod
def possible_statuses(cls):
return [s[1].value for s in RequestStatus.__members__.items()]

View File

@ -1,35 +0,0 @@
from flask import render_template
from atst.models.request_status_event import RequestStatus
class RequestStatusEventHandler(object):
STATUS_TRANSITIONS = set(
[
(
RequestStatus.PENDING_CCPO_ACCEPTANCE,
RequestStatus.PENDING_FINANCIAL_VERIFICATION,
),
(RequestStatus.PENDING_CCPO_ACCEPTANCE, RequestStatus.CHANGES_REQUESTED),
(
RequestStatus.PENDING_CCPO_APPROVAL,
RequestStatus.CHANGES_REQUESTED_TO_FINVER,
),
(RequestStatus.PENDING_CCPO_APPROVAL, RequestStatus.APPROVED),
]
)
def __init__(self, queue):
self.queue = queue
def handle_status_change(self, request, old_status, new_status):
if (old_status, new_status) in self.STATUS_TRANSITIONS:
self._send_email(request)
def _send_email(self, request):
email_body = render_template(
"emails/request_status_change.txt", request=request
)
self.queue.send_mail(
[request.creator.email], "Your JEDI request status has changed", email_body
)

View File

@ -16,7 +16,6 @@ import pendulum
import os
from werkzeug.exceptions import NotFound
from atst.domain.requests import Requests
from atst.domain.users import Users
from atst.domain.authnid import AuthenticationContext
from atst.domain.audit_log import AuditLog

View File

@ -1,273 +0,0 @@
import pytest
from uuid import uuid4
from atst.domain.exceptions import NotFoundError
from atst.domain.requests import Requests
from atst.domain.requests.authorization import RequestsAuthorization
from atst.models.request import Request
from atst.models.request_status_event import RequestStatus
from tests.factories import (
RequestFactory,
UserFactory,
RequestStatusEventFactory,
RequestRevisionFactory,
RequestReviewFactory,
)
@pytest.fixture(scope="function")
def new_request(session):
return RequestFactory.create()
def test_can_get_request():
factory_req = RequestFactory.create()
request = Requests.get(factory_req.creator, factory_req.id)
assert request.id == factory_req.id
def test_nonexistent_request_raises():
a_user = UserFactory.build()
with pytest.raises(NotFoundError):
Requests.get(a_user, uuid4())
def test_new_request_has_started_status():
request = Requests.create(UserFactory.build(), {})
assert request.status == RequestStatus.STARTED
def test_auto_approve_less_than_1m():
new_request = RequestFactory.create(initial_revision={"dollar_value": 999_999})
request = Requests.submit(new_request)
assert request.status == RequestStatus.PENDING_FINANCIAL_VERIFICATION
assert request.reviews
assert request.reviews[0].full_name_reviewer == "System"
def test_dont_auto_approve_if_dollar_value_is_1m_or_above():
new_request = RequestFactory.create(initial_revision={"dollar_value": 1_000_000})
request = Requests.submit(new_request)
assert request.status == RequestStatus.PENDING_CCPO_ACCEPTANCE
def test_dont_auto_approve_if_no_dollar_value_specified():
new_request = RequestFactory.create(initial_revision={})
request = Requests.submit(new_request)
assert request.status == RequestStatus.PENDING_CCPO_ACCEPTANCE
def test_should_allow_submission():
new_request = RequestFactory.create()
assert Requests.should_allow_submission(new_request)
RequestStatusEventFactory.create(
request=new_request,
new_status=RequestStatus.CHANGES_REQUESTED,
revision=new_request.latest_revision,
)
assert Requests.should_allow_submission(new_request)
# new, blank revision
RequestRevisionFactory.create(request=new_request)
assert not Requests.should_allow_submission(new_request)
def test_request_knows_its_last_submission_timestamp(new_request):
submitted_request = Requests.submit(new_request)
assert submitted_request.last_submission_timestamp
def test_request_knows_if_it_has_no_last_submission_timestamp(new_request):
assert new_request.last_submission_timestamp is None
def test_exists(session):
user_allowed = UserFactory.create()
user_denied = UserFactory.create()
request = RequestFactory.create(creator=user_allowed)
assert Requests.exists(request.id, user_allowed)
assert not Requests.exists(request.id, user_denied)
def test_status_count(session):
# make sure table is empty
session.query(Request).delete()
request1 = RequestFactory.create()
request2 = RequestFactory.create()
RequestStatusEventFactory.create(
sequence=2,
request_id=request2.id,
revision=request2.latest_revision,
new_status=RequestStatus.PENDING_FINANCIAL_VERIFICATION,
)
assert Requests.status_count(RequestStatus.PENDING_FINANCIAL_VERIFICATION) == 1
assert Requests.status_count(RequestStatus.STARTED) == 1
assert Requests.in_progress_count() == 2
def test_status_count_scoped_to_creator(session):
# make sure table is empty
session.query(Request).delete()
user = UserFactory.create()
request1 = RequestFactory.create()
request2 = RequestFactory.create(creator=user)
assert Requests.status_count(RequestStatus.STARTED) == 2
assert Requests.status_count(RequestStatus.STARTED, creator=user) == 1
request_financial_data = {
"pe_id": "123",
"task_order_number": "021345",
"fname_co": "Contracting",
"lname_co": "Officer",
"email_co": "jane@mail.mil",
"office_co": "WHS",
"fname_cor": "Officer",
"lname_cor": "Representative",
"email_cor": "jane@mail.mil",
"office_cor": "WHS",
"uii_ids": "1234",
"treasury_code": "00123456",
"ba_code": "024A",
}
def test_set_status_sets_revision():
request = RequestFactory.create()
Requests.set_status(request, RequestStatus.APPROVED)
assert request.latest_revision == request.status_events[-1].revision
def test_advance_to_financial_verification():
request = RequestFactory.create_with_status(
status=RequestStatus.PENDING_CCPO_ACCEPTANCE
)
review_data = RequestReviewFactory.dictionary()
Requests.advance(UserFactory.create(), request, review_data)
assert request.status == RequestStatus.PENDING_FINANCIAL_VERIFICATION
current_review = request.latest_status.review
assert current_review.fname_mao == review_data["fname_mao"]
def test_advance_to_approval():
request = RequestFactory.create_with_status(
status=RequestStatus.PENDING_CCPO_APPROVAL
)
review_data = RequestReviewFactory.dictionary()
Requests.advance(UserFactory.create(), request, review_data)
assert request.status == RequestStatus.APPROVED
def test_request_changes_to_request_application():
request = RequestFactory.create_with_status(
status=RequestStatus.PENDING_CCPO_ACCEPTANCE
)
review_data = RequestReviewFactory.dictionary()
Requests.request_changes(UserFactory.create(), request, review_data)
assert request.status == RequestStatus.CHANGES_REQUESTED
current_review = request.latest_status.review
assert current_review.fname_mao == review_data["fname_mao"]
def test_request_changes_to_financial_verification_info():
request = RequestFactory.create_with_status(
status=RequestStatus.PENDING_CCPO_APPROVAL
)
review_data = RequestReviewFactory.dictionary()
Requests.request_changes(UserFactory.create(), request, review_data)
assert request.status == RequestStatus.CHANGES_REQUESTED_TO_FINVER
current_review = request.latest_status.review
assert current_review.fname_mao == review_data["fname_mao"]
def test_add_internal_comment():
request = RequestFactory.create()
ccpo = UserFactory.from_atat_role("ccpo")
assert len(request.internal_comments) == 0
request = Requests.add_internal_comment(ccpo, request, "this is my comment")
assert len(request.internal_comments) == 1
assert request.internal_comments[0].text == "this is my comment"
def test_creator_can_view_own_request():
creator = UserFactory.create()
request = RequestFactory.create(creator=creator)
assert RequestsAuthorization(creator, request).can_view
def test_ccpo_can_view_request():
ccpo = UserFactory.from_atat_role("ccpo")
request = RequestFactory.create()
assert RequestsAuthorization(ccpo, request).can_view
def test_random_user_cannot_view_request():
user = UserFactory.create()
request = RequestFactory.create()
assert not RequestsAuthorization(user, request).can_view
def test_auto_approve_and_create_portfolio():
request = RequestFactory.create()
portfolio = Requests.auto_approve_and_create_portfolio(request)
assert portfolio
assert request.reviews[0]
assert request.reviews[0].full_name_reviewer == "System"
class TestStatusNotifications(object):
def _assert_job(self, queue, request):
assert len(queue.get_queue()) == 1
job = queue.get_queue().jobs[0]
assert job.func == queue._send_mail
assert job.args[0] == [request.creator.email]
def test_pending_finver_triggers_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_ACCEPTANCE)
request = Requests.set_status(
request, RequestStatus.PENDING_FINANCIAL_VERIFICATION
)
self._assert_job(queue, request)
def test_changes_requested_triggers_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_ACCEPTANCE)
request = Requests.set_status(request, RequestStatus.CHANGES_REQUESTED)
self._assert_job(queue, request)
def test_changes_requested_to_finver_triggers_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_APPROVAL)
request = Requests.set_status(
request, RequestStatus.CHANGES_REQUESTED_TO_FINVER
)
self._assert_job(queue, request)
def test_approval_triggers_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_APPROVAL)
request = Requests.set_status(request, RequestStatus.APPROVED)
self._assert_job(queue, request)
def test_submitted_does_not_trigger_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.SUBMITTED)
assert len(queue.get_queue()) == 0

View File

@ -1,122 +0,0 @@
from tests.factories import (
RequestFactory,
UserFactory,
RequestStatusEventFactory,
RequestReviewFactory,
RequestRevisionFactory,
)
from atst.domain.requests import Requests
from atst.models.request_status_event import RequestStatus
def test_pending_financial_requires_mo_action():
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_FINANCIAL_VERIFICATION)
assert request.action_required_by == "mission_owner"
def test_pending_ccpo_approval_requires_ccpo():
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_APPROVAL)
assert request.action_required_by == "ccpo"
def test_request_has_creator():
user = UserFactory.create()
request = RequestFactory.create(creator=user)
assert request.creator == user
def test_request_status_started_displayname():
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.STARTED)
assert request.status_displayname == "Started"
def test_request_status_pending_financial_displayname():
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_FINANCIAL_VERIFICATION)
assert request.status_displayname == "Pending Financial Verification"
def test_request_status_pending_ccpo_displayname():
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_APPROVAL)
assert request.status_displayname == "Pending CCPO Approval"
def test_request_status_pending_approved_displayname():
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.APPROVED)
assert request.status_displayname == "Approved"
def test_request_status_pending_expired_displayname():
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.EXPIRED)
assert request.status_displayname == "Expired"
def test_request_status_pending_deleted_displayname():
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.DELETED)
assert request.status_displayname == "Deleted"
def test_annual_spend():
request = RequestFactory.create()
monthly = request.body.get("details_of_use").get("estimated_monthly_spend")
assert request.annual_spend == monthly * 12
def test_reviews():
request = RequestFactory.create()
ccpo = UserFactory.from_atat_role("ccpo")
RequestStatusEventFactory.create(
request=request,
revision=request.latest_revision,
review=RequestReviewFactory.create(reviewer=ccpo),
),
RequestStatusEventFactory.create(
request=request,
revision=request.latest_revision,
review=RequestReviewFactory.create(reviewer=ccpo),
),
RequestStatusEventFactory.create(request=request, revision=request.latest_revision),
assert len(request.reviews) == 2
def test_review_comment():
request = RequestFactory.create()
ccpo = UserFactory.from_atat_role("ccpo")
RequestStatusEventFactory.create(
request=request,
revision=request.latest_revision,
new_status=RequestStatus.CHANGES_REQUESTED,
review=RequestReviewFactory.create(reviewer=ccpo, comment="do better"),
)
assert request.review_comment == "do better"
RequestStatusEventFactory.create(
request=request,
revision=request.latest_revision,
new_status=RequestStatus.APPROVED,
review=RequestReviewFactory.create(reviewer=ccpo, comment="much better"),
)
assert not request.review_comment
def test_finver_last_saved_at():
request = RequestFactory.create()
RequestRevisionFactory.create(fname_co="Amanda", request=request)
assert request.last_finver_draft_saved_at