diff --git a/atst/routes/__init__.py b/atst/routes/__init__.py index 965b4b37..4d767d05 100644 --- a/atst/routes/__init__.py +++ b/atst/routes/__init__.py @@ -4,8 +4,8 @@ import pendulum from atst.domain.requests import Requests from atst.domain.users import Users -from atst.domain.authnid.utils import parse_sdn -from atst.domain.exceptions import UnauthenticatedError +from atst.domain.authnid.utils import parse_sdn, email_from_certificate +from atst.domain.exceptions import UnauthenticatedError, NotFoundError bp = Blueprint("atst", __name__) @@ -35,10 +35,19 @@ def catch_all(path): # or raises the UnauthenticatedError @bp.route('/login-redirect') def login_redirect(): + # raise S_DN parse errors if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and _is_valid_certificate(request): sdn = request.environ.get('HTTP_X_SSL_CLIENT_S_DN') sdn_parts = parse_sdn(sdn) - user = Users.get_or_create_by_dod_id(**sdn_parts) + try: + user = Users.get_by_dod_id(sdn_parts["dod_id"]) + except NotFoundError: + try: + email = email_from_certificate(request.environ.get('HTTP_X_SSL_CLIENT_CERT').encode()) + sdn_parts["email"] = email + except ValueError: + pass + user = Users.create(**sdn_parts) session["user_id"] = user.id return redirect(url_for("atst.home")) diff --git a/tests/domain/authnid/test_utils.py b/tests/domain/authnid/test_utils.py index e72f095b..54e8f13d 100644 --- a/tests/domain/authnid/test_utils.py +++ b/tests/domain/authnid/test_utils.py @@ -1,6 +1,6 @@ import pytest import atst.domain.authnid.utils as utils -from tests.mocks import DOD_SDN +from tests.mocks import DOD_SDN, FIXTURE_EMAIL_ADDRESS def test_parse_sdn(): @@ -17,9 +17,6 @@ def test_parse_bad_sdn(): utils.parse_sdn(None) -FIXTURE_EMAIL_ADDRESS = "artgarfunkel@uso.mil" - - def test_parse_email_cert(): cert_file = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read() email = utils.email_from_certificate(cert_file) diff --git a/tests/mocks.py b/tests/mocks.py index 0307997e..2a2f0fab 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -3,18 +3,11 @@ from tests.factories import RequestFactory, UserFactory MOCK_USER = UserFactory.build() MOCK_REQUEST = RequestFactory.build( - creator=MOCK_USER.id, - body={ - "financial_verification": { - "pe_id": "0203752A", - }, - } + creator=MOCK_USER.id, body={"financial_verification": {"pe_id": "0203752A"}} ) -DOD_SDN_INFO = { - 'first_name': 'ART', - 'last_name': 'GARFUNKEL', - 'dod_id': '5892460358' - } +DOD_SDN_INFO = {"first_name": "ART", "last_name": "GARFUNKEL", "dod_id": "5892460358"} DOD_SDN = f"CN={DOD_SDN_INFO['last_name']}.{DOD_SDN_INFO['first_name']}.G.{DOD_SDN_INFO['dod_id']},OU=OTHER,OU=PKI,OU=DoD,O=U.S. Government,C=US" MOCK_VALID_PE_ID = "8675309U" + +FIXTURE_EMAIL_ADDRESS = "artgarfunkel@uso.mil" diff --git a/tests/test_auth.py b/tests/test_auth.py index 7e2b483d..60183451 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,5 +1,8 @@ +import pytest from flask import session, url_for -from .mocks import DOD_SDN +from .mocks import DOD_SDN_INFO, DOD_SDN, FIXTURE_EMAIL_ADDRESS +from atst.domain.users import Users +from atst.domain.exceptions import NotFoundError MOCK_USER = {"id": "438567dd-25fa-4d83-a8cc-8aa8366cb24a"} @@ -11,11 +14,14 @@ def _fetch_user_info(c, t): def test_successful_login_redirect(client, monkeypatch): monkeypatch.setattr("atst.routes._is_valid_certificate", lambda *args: True) + monkeypatch.setattr("atst.routes.email_from_certificate", lambda *args: None) resp = client.get( "/login-redirect", environ_base={ - "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN + "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", + "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN, + "HTTP_X_SSL_CLIENT_CERT": "", }, ) @@ -58,8 +64,8 @@ UNPROTECTED_ROUTES = ["/", "/login-dev", "/login-redirect", "/unauthorized"] def test_crl_validation_on_login(client): - good_cert = open("ssl/client-certs/atat.mil.crt", "rb").read() - bad_cert = open("ssl/client-certs/bad-atat.mil.crt", "rb").read() + good_cert = open("ssl/client-certs/atat.mil.crt").read() + bad_cert = open("ssl/client-certs/bad-atat.mil.crt").read() # bad cert is on the test CRL resp = client.get( @@ -67,7 +73,7 @@ def test_crl_validation_on_login(client): environ_base={ "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN, - "HTTP_X_SSL_CLIENT_CERT": bad_cert.decode(), + "HTTP_X_SSL_CLIENT_CERT": bad_cert, }, ) assert resp.status_code == 401 @@ -79,9 +85,55 @@ def test_crl_validation_on_login(client): environ_base={ "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN, - "HTTP_X_SSL_CLIENT_CERT": good_cert.decode(), + "HTTP_X_SSL_CLIENT_CERT": good_cert, }, ) assert resp.status_code == 302 assert "home" in resp.headers["Location"] assert session["user_id"] + + +def test_creates_new_user_on_login(monkeypatch, client): + monkeypatch.setattr("atst.routes._is_valid_certificate", lambda *args: True) + cert_file = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read() + + # ensure user does not exist + with pytest.raises(NotFoundError): + Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) + + resp = client.get( + "/login-redirect", + environ_base={ + "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", + "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN, + "HTTP_X_SSL_CLIENT_CERT": cert_file, + }, + ) + + user = Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) + assert user.first_name == DOD_SDN_INFO["first_name"] + assert user.last_name == DOD_SDN_INFO["last_name"] + assert user.email == FIXTURE_EMAIL_ADDRESS + + +def test_creates_new_user_without_email_on_login(monkeypatch, client): + monkeypatch.setattr("atst.routes._is_valid_certificate", lambda *args: True) + cert_file = open("ssl/client-certs/atat.mil.crt").read() + + # ensure user does not exist + with pytest.raises(NotFoundError): + Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) + + resp = client.get( + "/login-redirect", + environ_base={ + "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", + "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN, + "HTTP_X_SSL_CLIENT_CERT": cert_file, + }, + ) + + user = Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) + assert user.first_name == DOD_SDN_INFO["first_name"] + assert user.last_name == DOD_SDN_INFO["last_name"] + assert user.email == None