diff --git a/atst/domain/authnid/__init__.py b/atst/domain/authnid/__init__.py index 46dc9445..b7f31717 100644 --- a/atst/domain/authnid/__init__.py +++ b/atst/domain/authnid/__init__.py @@ -1,7 +1,7 @@ from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users from .utils import parse_sdn, email_from_certificate -from .crl import crl_check, CRLRevocationException +from .crl import CRLRevocationException class AuthenticationContext(): @@ -45,7 +45,7 @@ class AuthenticationContext(): def _crl_check(self): try: - crl_check(self.crl_cache, self.cert) + self.crl_cache.crl_check(self.cert) except CRLRevocationException as exc: raise UnauthenticatedError("CRL check failed. " + str(exc)) diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index b43b5a8e..cad27b79 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -9,22 +9,6 @@ class CRLRevocationException(Exception): pass -def crl_check(cache, cert): - parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - store = cache.get_store(parsed) - context = crypto.X509StoreContext(store, parsed) - try: - context.verify_certificate() - return True - - except crypto.X509StoreContextError as err: - raise CRLRevocationException( - "Certificate revoked or errored. Error: {}. Args: {}".format( - type(err), err.args - ) - ) - - class CRLCache(): _PEM_RE = re.compile( @@ -38,7 +22,7 @@ class CRLCache(): self._load_roots(root_location) self._build_crl_cache(crl_locations) - def get_store(self, cert): + def _get_store(self, cert): return self._build_store(cert.get_issuer().der()) def _load_roots(self, root_location): @@ -91,3 +75,18 @@ class CRLCache(): else: return self._add_certificate_chain_to_store(store, ca.get_issuer()) + + def crl_check(self, cert): + parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) + store = self._get_store(parsed) + context = crypto.X509StoreContext(store, parsed) + try: + context.verify_certificate() + return True + + except crypto.X509StoreContextError as err: + raise CRLRevocationException( + "Certificate revoked or errored. Error: {}. Args: {}".format( + type(err), err.args + ) + ) diff --git a/tests/domain/authnid/test_authentication_context.py b/tests/domain/authnid/test_authentication_context.py index d7c8e157..81a9b5d3 100644 --- a/tests/domain/authnid/test_authentication_context.py +++ b/tests/domain/authnid/test_authentication_context.py @@ -1,7 +1,7 @@ import pytest from atst.domain.authnid import AuthenticationContext -from atst.domain.authnid.crl import CRLCache +from atst.domain.authnid.crl import CRLCache, CRLRevocationException from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users @@ -12,20 +12,24 @@ CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read() class MockCRLCache(): - def get_store(self, cert): - pass + def __init__(self, valid=True): + self.valid = valid + + def crl_check(self, cert): + if self.valid: + return True + + raise CRLRevocationException() -def test_can_authenticate(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_can_authenticate(): auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) assert auth_context.authenticate() -def test_unsuccessful_status(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_unsuccessful_status(): auth_context = AuthenticationContext( MockCRLCache(), "FAILURE", DOD_SDN, CERT ) @@ -36,11 +40,9 @@ def test_unsuccessful_status(monkeypatch): assert "client authentication" in message -def test_crl_check_fails(monkeypatch): - cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=['ssl/client-certs/client-ca.der.crl']) - cert = open("ssl/client-certs/bad-atat.mil.crt", "r").read() +def test_crl_check_fails(): auth_context = AuthenticationContext( - cache, "SUCCESS", DOD_SDN, cert + MockCRLCache(False), "SUCCESS", DOD_SDN, CERT ) with pytest.raises(UnauthenticatedError) as excinfo: assert auth_context.authenticate() @@ -49,8 +51,7 @@ def test_crl_check_fails(monkeypatch): assert "CRL check" in message -def test_bad_sdn(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_bad_sdn(): auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", "abc123", CERT ) @@ -61,8 +62,7 @@ def test_bad_sdn(monkeypatch): assert "SDN" in message -def test_user_exists(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_user_exists(): user = UserFactory.create(**DOD_SDN_INFO) auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, CERT @@ -72,8 +72,7 @@ def test_user_exists(monkeypatch): assert auth_user == user -def test_creates_user(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_creates_user(): # check user does not exist with pytest.raises(NotFoundError): Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) @@ -86,8 +85,7 @@ def test_creates_user(monkeypatch): assert user.email == FIXTURE_EMAIL_ADDRESS -def test_user_cert_has_no_email(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_user_cert_has_no_email(): cert = open("ssl/client-certs/atat.mil.crt").read() auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, cert diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 19757353..7ba3a213 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -5,7 +5,7 @@ import os import shutil from OpenSSL import crypto, SSL -from atst.domain.authnid.crl import crl_check, CRLCache, CRLRevocationException +from atst.domain.authnid.crl import CRLCache, CRLRevocationException import atst.domain.authnid.crl.util as util from tests.mocks import FIXTURE_EMAIL_ADDRESS @@ -54,9 +54,9 @@ def test_can_validate_certificate(): ) good_cert = open("ssl/client-certs/atat.mil.crt", "rb").read() bad_cert = open("ssl/client-certs/bad-atat.mil.crt", "rb").read() - assert crl_check(cache, good_cert) + assert cache.crl_check(good_cert) with pytest.raises(CRLRevocationException): - crl_check(cache, bad_cert) + cache.crl_check(bad_cert) def test_can_dynamically_update_crls(tmpdir): @@ -64,18 +64,18 @@ def test_can_dynamically_update_crls(tmpdir): shutil.copyfile("ssl/client-certs/client-ca.der.crl", crl_file) cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[crl_file]) cert = open("ssl/client-certs/atat.mil.crt", "rb").read() - assert crl_check(cache, cert) + assert cache.crl_check(cert) # override the original CRL with one that revokes atat.mil.crt shutil.copyfile("tests/fixtures/test.der.crl", crl_file) with pytest.raises(CRLRevocationException): - assert crl_check(cache, cert) + assert cache.crl_check(cert) def test_throws_error_for_missing_issuer(): cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[]) cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read() with pytest.raises(CRLRevocationException) as exc: - assert crl_check(cache, cert) + assert cache.crl_check(cert) (message,) = exc.value.args assert "issuer" in message