basic implementation of email parsing for CAC user login

This commit is contained in:
dandds 2018-08-08 13:52:16 -04:00
parent c0d72cd0d6
commit 05de0665d4
4 changed files with 75 additions and 24 deletions

View File

@ -4,8 +4,8 @@ import pendulum
from atst.domain.requests import Requests from atst.domain.requests import Requests
from atst.domain.users import Users from atst.domain.users import Users
from atst.domain.authnid.utils import parse_sdn from atst.domain.authnid.utils import parse_sdn, email_from_certificate
from atst.domain.exceptions import UnauthenticatedError from atst.domain.exceptions import UnauthenticatedError, NotFoundError
bp = Blueprint("atst", __name__) bp = Blueprint("atst", __name__)
@ -35,10 +35,19 @@ def catch_all(path):
# or raises the UnauthenticatedError # or raises the UnauthenticatedError
@bp.route('/login-redirect') @bp.route('/login-redirect')
def login_redirect(): def login_redirect():
# raise S_DN parse errors
if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and _is_valid_certificate(request): if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and _is_valid_certificate(request):
sdn = request.environ.get('HTTP_X_SSL_CLIENT_S_DN') sdn = request.environ.get('HTTP_X_SSL_CLIENT_S_DN')
sdn_parts = parse_sdn(sdn) sdn_parts = parse_sdn(sdn)
user = Users.get_or_create_by_dod_id(**sdn_parts) try:
user = Users.get_by_dod_id(sdn_parts["dod_id"])
except NotFoundError:
try:
email = email_from_certificate(request.environ.get('HTTP_X_SSL_CLIENT_CERT').encode())
sdn_parts["email"] = email
except ValueError:
pass
user = Users.create(**sdn_parts)
session["user_id"] = user.id session["user_id"] = user.id
return redirect(url_for("atst.home")) return redirect(url_for("atst.home"))

View File

@ -1,6 +1,6 @@
import pytest import pytest
import atst.domain.authnid.utils as utils import atst.domain.authnid.utils as utils
from tests.mocks import DOD_SDN from tests.mocks import DOD_SDN, FIXTURE_EMAIL_ADDRESS
def test_parse_sdn(): def test_parse_sdn():
@ -17,9 +17,6 @@ def test_parse_bad_sdn():
utils.parse_sdn(None) utils.parse_sdn(None)
FIXTURE_EMAIL_ADDRESS = "artgarfunkel@uso.mil"
def test_parse_email_cert(): def test_parse_email_cert():
cert_file = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read() cert_file = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read()
email = utils.email_from_certificate(cert_file) email = utils.email_from_certificate(cert_file)

View File

@ -3,18 +3,11 @@ from tests.factories import RequestFactory, UserFactory
MOCK_USER = UserFactory.build() MOCK_USER = UserFactory.build()
MOCK_REQUEST = RequestFactory.build( MOCK_REQUEST = RequestFactory.build(
creator=MOCK_USER.id, creator=MOCK_USER.id, body={"financial_verification": {"pe_id": "0203752A"}}
body={
"financial_verification": {
"pe_id": "0203752A",
},
}
) )
DOD_SDN_INFO = { DOD_SDN_INFO = {"first_name": "ART", "last_name": "GARFUNKEL", "dod_id": "5892460358"}
'first_name': 'ART',
'last_name': 'GARFUNKEL',
'dod_id': '5892460358'
}
DOD_SDN = f"CN={DOD_SDN_INFO['last_name']}.{DOD_SDN_INFO['first_name']}.G.{DOD_SDN_INFO['dod_id']},OU=OTHER,OU=PKI,OU=DoD,O=U.S. Government,C=US" DOD_SDN = f"CN={DOD_SDN_INFO['last_name']}.{DOD_SDN_INFO['first_name']}.G.{DOD_SDN_INFO['dod_id']},OU=OTHER,OU=PKI,OU=DoD,O=U.S. Government,C=US"
MOCK_VALID_PE_ID = "8675309U" MOCK_VALID_PE_ID = "8675309U"
FIXTURE_EMAIL_ADDRESS = "artgarfunkel@uso.mil"

View File

@ -1,5 +1,8 @@
import pytest
from flask import session, url_for from flask import session, url_for
from .mocks import DOD_SDN from .mocks import DOD_SDN_INFO, DOD_SDN, FIXTURE_EMAIL_ADDRESS
from atst.domain.users import Users
from atst.domain.exceptions import NotFoundError
MOCK_USER = {"id": "438567dd-25fa-4d83-a8cc-8aa8366cb24a"} MOCK_USER = {"id": "438567dd-25fa-4d83-a8cc-8aa8366cb24a"}
@ -11,11 +14,14 @@ def _fetch_user_info(c, t):
def test_successful_login_redirect(client, monkeypatch): def test_successful_login_redirect(client, monkeypatch):
monkeypatch.setattr("atst.routes._is_valid_certificate", lambda *args: True) monkeypatch.setattr("atst.routes._is_valid_certificate", lambda *args: True)
monkeypatch.setattr("atst.routes.email_from_certificate", lambda *args: None)
resp = client.get( resp = client.get(
"/login-redirect", "/login-redirect",
environ_base={ environ_base={
"HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS",
"HTTP_X_SSL_CLIENT_S_DN": DOD_SDN,
"HTTP_X_SSL_CLIENT_CERT": "",
}, },
) )
@ -58,8 +64,8 @@ UNPROTECTED_ROUTES = ["/", "/login-dev", "/login-redirect", "/unauthorized"]
def test_crl_validation_on_login(client): def test_crl_validation_on_login(client):
good_cert = open("ssl/client-certs/atat.mil.crt", "rb").read() good_cert = open("ssl/client-certs/atat.mil.crt").read()
bad_cert = open("ssl/client-certs/bad-atat.mil.crt", "rb").read() bad_cert = open("ssl/client-certs/bad-atat.mil.crt").read()
# bad cert is on the test CRL # bad cert is on the test CRL
resp = client.get( resp = client.get(
@ -67,7 +73,7 @@ def test_crl_validation_on_login(client):
environ_base={ environ_base={
"HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS",
"HTTP_X_SSL_CLIENT_S_DN": DOD_SDN, "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN,
"HTTP_X_SSL_CLIENT_CERT": bad_cert.decode(), "HTTP_X_SSL_CLIENT_CERT": bad_cert,
}, },
) )
assert resp.status_code == 401 assert resp.status_code == 401
@ -79,9 +85,55 @@ def test_crl_validation_on_login(client):
environ_base={ environ_base={
"HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS",
"HTTP_X_SSL_CLIENT_S_DN": DOD_SDN, "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN,
"HTTP_X_SSL_CLIENT_CERT": good_cert.decode(), "HTTP_X_SSL_CLIENT_CERT": good_cert,
}, },
) )
assert resp.status_code == 302 assert resp.status_code == 302
assert "home" in resp.headers["Location"] assert "home" in resp.headers["Location"]
assert session["user_id"] assert session["user_id"]
def test_creates_new_user_on_login(monkeypatch, client):
monkeypatch.setattr("atst.routes._is_valid_certificate", lambda *args: True)
cert_file = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read()
# ensure user does not exist
with pytest.raises(NotFoundError):
Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
resp = client.get(
"/login-redirect",
environ_base={
"HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS",
"HTTP_X_SSL_CLIENT_S_DN": DOD_SDN,
"HTTP_X_SSL_CLIENT_CERT": cert_file,
},
)
user = Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
assert user.first_name == DOD_SDN_INFO["first_name"]
assert user.last_name == DOD_SDN_INFO["last_name"]
assert user.email == FIXTURE_EMAIL_ADDRESS
def test_creates_new_user_without_email_on_login(monkeypatch, client):
monkeypatch.setattr("atst.routes._is_valid_certificate", lambda *args: True)
cert_file = open("ssl/client-certs/atat.mil.crt").read()
# ensure user does not exist
with pytest.raises(NotFoundError):
Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
resp = client.get(
"/login-redirect",
environ_base={
"HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS",
"HTTP_X_SSL_CLIENT_S_DN": DOD_SDN,
"HTTP_X_SSL_CLIENT_CERT": cert_file,
},
)
user = Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
assert user.first_name == DOD_SDN_INFO["first_name"]
assert user.last_name == DOD_SDN_INFO["last_name"]
assert user.email == None