Merge pull request #412 from dod-ccpo/status-change-emails

Notify PSO or MO of request status change
This commit is contained in:
richard-dds
2018-11-01 10:13:39 -04:00
committed by GitHub
14 changed files with 198 additions and 66 deletions

View File

@@ -9,8 +9,7 @@ from tempfile import TemporaryDirectory
from atst.app import make_app, make_config
from atst.database import db as _db
from atst.domain.auth import logout
from atst.queue import queue
from atst.queue import queue as atst_queue
import tests.factories as factories
from tests.mocks import PDF_FILENAME
@@ -136,3 +135,9 @@ def extended_financial_verification_data(pdf_upload):
"clin_2003": "7000",
"task_order": pdf_upload,
}
@pytest.fixture(scope="function", autouse=True)
def queue():
yield atst_queue
atst_queue.get_queue().empty()

View File

@@ -6,13 +6,11 @@ from atst.domain.requests import Requests
from atst.domain.requests.authorization import RequestsAuthorization
from atst.models.request import Request
from atst.models.request_status_event import RequestStatus
from atst.models.task_order import Source as TaskOrderSource
from tests.factories import (
RequestFactory,
UserFactory,
RequestStatusEventFactory,
TaskOrderFactory,
RequestRevisionFactory,
RequestReviewFactory,
)
@@ -222,3 +220,44 @@ def test_random_user_cannot_view_request():
request = RequestFactory.create()
assert not RequestsAuthorization(user, request).can_view
class TestStatusNotifications(object):
def _assert_job(self, queue, request):
assert len(queue.get_queue()) == 1
job = queue.get_queue().jobs[0]
assert job.func == queue._send_mail
assert job.args[0] == [request.creator.email]
def test_pending_finver_triggers_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_ACCEPTANCE)
request = Requests.set_status(
request, RequestStatus.PENDING_FINANCIAL_VERIFICATION
)
self._assert_job(queue, request)
def test_changes_requested_triggers_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_ACCEPTANCE)
request = Requests.set_status(request, RequestStatus.CHANGES_REQUESTED)
self._assert_job(queue, request)
def test_changes_requested_to_finver_triggers_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_APPROVAL)
request = Requests.set_status(
request, RequestStatus.CHANGES_REQUESTED_TO_FINVER
)
self._assert_job(queue, request)
def test_approval_triggers_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.PENDING_CCPO_APPROVAL)
request = Requests.set_status(request, RequestStatus.APPROVED)
self._assert_job(queue, request)
def test_submitted_does_not_trigger_notification(self, queue):
request = RequestFactory.create()
request = Requests.set_status(request, RequestStatus.SUBMITTED)
assert len(queue.get_queue()) == 0

View File

@@ -0,0 +1,52 @@
from tests.factories import UserFactory, RequestFactory
from atst.models.request_status_event import RequestStatus
def test_creator_pending_finver(client, user_session):
request = RequestFactory.create_with_status(
RequestStatus.PENDING_FINANCIAL_VERIFICATION
)
user_session(request.creator)
response = client.get(
"/requests/edit/{}".format(request.id), follow_redirects=False
)
assert "verify" in response.location
def test_creator_pending_finver_changes(client, user_session):
request = RequestFactory.create_with_status(
RequestStatus.CHANGES_REQUESTED_TO_FINVER
)
user_session(request.creator)
response = client.get(
"/requests/edit/{}".format(request.id), follow_redirects=False
)
assert "verify" in response.location
def test_creator_approved(client, user_session):
request = RequestFactory.create_with_status(RequestStatus.APPROVED)
user_session(request.creator)
response = client.get(
"/requests/edit/{}".format(request.id), follow_redirects=False
)
assert "details" in response.location
def test_creator_approved(client, user_session):
request = RequestFactory.create_with_status(RequestStatus.STARTED)
user_session(request.creator)
response = client.get(
"/requests/edit/{}".format(request.id), follow_redirects=False
)
assert "new" in response.location
def test_ccpo(client, user_session):
ccpo = UserFactory.from_atat_role("ccpo")
request = RequestFactory.create_with_status(RequestStatus.STARTED)
user_session(ccpo)
response = client.get(
"/requests/edit/{}".format(request.id), follow_redirects=False
)
assert "approval" in response.location

View File

@@ -237,13 +237,11 @@ def test_displays_ccpo_review_comment(user_session, client):
ccpo = UserFactory.from_atat_role("ccpo")
user_session(creator)
request = RequestFactory.create(creator=creator)
status = RequestStatusEventFactory.create(
request=request,
revision=request.latest_revision,
new_status=RequestStatus.CHANGES_REQUESTED,
)
request = Requests.set_status(request, RequestStatus.CHANGES_REQUESTED)
review_comment = "add all of the correct info, instead of the incorrect info"
RequestReviewFactory.create(reviewer=ccpo, comment=review_comment, status=status)
RequestReviewFactory.create(
reviewer=ccpo, comment=review_comment, status=request.status_events[-1]
)
response = client.get("/requests/new/1/{}".format(request.id))
body = response.data.decode()
assert review_comment in body

View File

@@ -23,18 +23,3 @@ def test_action_required_ccpo():
context = RequestsIndex(ccpo).execute()
assert context["num_action_required"] == 1
def test_ccpo_sees_approval_screen():
ccpo = UserFactory.from_atat_role("ccpo")
request = RequestFactory.create()
Requests.submit(request)
ccpo_context = RequestsIndex(ccpo).execute()
assert ccpo_context["requests"][0]["edit_link"] == url_for(
"requests.approval", request_id=request.id
)
mo_context = RequestsIndex(request.creator).execute()
assert mo_context["requests"][0]["edit_link"] != url_for(
"requests.approval", request_id=request.id
)

View File

@@ -1,17 +1,5 @@
import pytest
from atst.queue import queue
# ensure queue is always empty for unit testing
@pytest.fixture(scope="function", autouse=True)
def reset_queue():
queue.get_queue().empty()
yield
queue.get_queue().empty()
def test_send_mail():
initial = len(queue.get_queue())
def test_send_mail(queue):
queue.send_mail(
["lordvader@geocities.net"], "death start", "how is it coming along?"
)
assert len(queue.get_queue()) == initial + 1
assert len(queue.get_queue()) == 1