diff --git a/atst/domain/authnid/__init__.py b/atst/domain/authnid/__init__.py index 35a4477c..b31e00fb 100644 --- a/atst/domain/authnid/__init__.py +++ b/atst/domain/authnid/__init__.py @@ -1,7 +1,7 @@ from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users from .utils import parse_sdn, email_from_certificate -from .crl import Validator +from .crl import crl_check, CRLException class AuthenticationContext(): @@ -22,8 +22,7 @@ class AuthenticationContext(): if not self.auth_status == "SUCCESS": raise UnauthenticatedError("SSL/TLS client authentication failed") - elif not self._crl_check(): - raise UnauthenticatedError("Client certificate failed CRL check") + self._crl_check() return True @@ -45,13 +44,10 @@ class AuthenticationContext(): return None def _crl_check(self): - validator = Validator(self.crl_cache, self.cert) - if self.cert: - result = validator.validate() - return result - - else: - return False + try: + crl_check(self.crl_cache, self.cert) + except CRLException as exc: + raise UnauthenticatedError("CRL check failed. " + str(exc)) @property def parsed_sdn(self): diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index 5bbc7c72..5c53ffe9 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -4,6 +4,8 @@ import re import hashlib from OpenSSL import crypto, SSL +class CRLException(Exception): + pass def sha256_checksum(filename, block_size=65536): sha256 = hashlib.sha256() @@ -13,6 +15,22 @@ def sha256_checksum(filename, block_size=65536): return sha256.hexdigest() +def crl_check(cache, cert): + parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) + store = cache.get_store(parsed) + context = crypto.X509StoreContext(store, parsed) + try: + context.verify_certificate() + return True + + except crypto.X509StoreContextError as err: + raise CRLException( + "Certificate revoked or errored. Error: {}. Args: {}".format( + type(err), err.args + ) + ) + + class CRLCache(): _PEM_RE = re.compile( @@ -77,109 +95,3 @@ class CRLCache(): else: return self.x509_stores[issuer] - -class Validator: - - def __init__(self, cache, cert, logger=None): - self.cache = cache - self.cert = cert - self.logger = logger - - def validate(self): - parsed = crypto.load_certificate(crypto.FILETYPE_PEM, self.cert) - store = self.cache.get_store(parsed) - context = crypto.X509StoreContext(store, parsed) - try: - context.verify_certificate() - return True - - except crypto.X509StoreContextError as err: - self.log_error( - "Certificate revoked or errored. Error: {}. Args: {}".format( - type(err), err.args - ) - ) - return False - - def _add_roots(self, roots): - with open(filename, "rb") as f: - for raw_ca in self._parse_roots(f.read()): - ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) - self._add_carefully("add_cert", ca) - - def _reset(self): - self.cache = {} - self.store = self.base_store() - self._add_crls(self.crl_locations) - self._add_roots(self.roots) - self.store.set_flags(crypto.X509StoreFlags.CRL_CHECK) - - def log_error(self, message): - if self.logger: - self.logger.error(message) - - def _add_crls(self, locations): - for filename in locations: - try: - self._add_crl(filename) - except crypto.Error as err: - self.log_error( - "CRL could not be parsed. Filename: {}, Error: {}, args: {}".format( - filename, type(err), err.args - ) - ) - - # This caches the CRL issuer with the CRL filepath and a checksum, in addition to adding the CRL to the store. - - def _add_crl(self, filename): - with open(filename, "rb") as crl_file: - crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) - self.cache[crl.get_issuer().der()] = (filename, sha256_checksum(filename)) - self._add_carefully("add_crl", crl) - - def _parse_roots(self, root_str): - return [match.group(0) for match in self._PEM_RE.finditer(root_str)] - - def _add_roots(self, roots): - for filename in roots: - with open(filename, "rb") as f: - for raw_ca in self._parse_roots(f.read()): - ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) - self._add_carefully("add_cert", ca) - - # in testing, it seems that openssl is maintaining a local cache of certs - # in a hash table and throws errors if you try to add redundant certs or - # CRLs. For now, we catch and ignore that error with great specificity. - - def _add_carefully(self, method_name, obj): - try: - getattr(self.store, method_name)(obj) - except crypto.Error as error: - if self._is_preloaded_error(error): - pass - else: - raise error - - PRELOADED_CRL = ( - [ - ( - "x509 certificate routines", - "X509_STORE_add_crl", - "cert already in hash table", - ) - ], - ) - PRELOADED_CERT = ( - [ - ( - "x509 certificate routines", - "X509_STORE_add_cert", - "cert already in hash table", - ) - ], - ) - - def _is_preloaded_error(self, error): - return error.args == self.PRELOADED_CRL or error.args == self.PRELOADED_CERT - - # Checks that the CRL currently in-memory is up-to-date via the checksum. diff --git a/tests/domain/authnid/test_authentication_context.py b/tests/domain/authnid/test_authentication_context.py index f07d6e94..d7c8e157 100644 --- a/tests/domain/authnid/test_authentication_context.py +++ b/tests/domain/authnid/test_authentication_context.py @@ -1,6 +1,7 @@ import pytest from atst.domain.authnid import AuthenticationContext +from atst.domain.authnid.crl import CRLCache from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users @@ -16,7 +17,7 @@ class MockCRLCache(): def test_can_authenticate(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) @@ -24,7 +25,7 @@ def test_can_authenticate(monkeypatch): def test_unsuccessful_status(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) auth_context = AuthenticationContext( MockCRLCache(), "FAILURE", DOD_SDN, CERT ) @@ -36,9 +37,10 @@ def test_unsuccessful_status(monkeypatch): def test_crl_check_fails(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: False) + cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=['ssl/client-certs/client-ca.der.crl']) + cert = open("ssl/client-certs/bad-atat.mil.crt", "r").read() auth_context = AuthenticationContext( - MockCRLCache(), "SUCCESS", DOD_SDN, CERT + cache, "SUCCESS", DOD_SDN, cert ) with pytest.raises(UnauthenticatedError) as excinfo: assert auth_context.authenticate() @@ -48,7 +50,7 @@ def test_crl_check_fails(monkeypatch): def test_bad_sdn(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", "abc123", CERT ) @@ -60,7 +62,7 @@ def test_bad_sdn(monkeypatch): def test_user_exists(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) user = UserFactory.create(**DOD_SDN_INFO) auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, CERT @@ -71,7 +73,7 @@ def test_user_exists(monkeypatch): def test_creates_user(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) # check user does not exist with pytest.raises(NotFoundError): Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) @@ -85,7 +87,7 @@ def test_creates_user(monkeypatch): def test_user_cert_has_no_email(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) cert = open("ssl/client-certs/atat.mil.crt").read() auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, cert diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index fcd371b3..34828567 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -4,7 +4,7 @@ import re import os import shutil from OpenSSL import crypto, SSL -from atst.domain.authnid.crl import Validator, CRLCache +from atst.domain.authnid.crl import crl_check, CRLCache, CRLException import atst.domain.authnid.crl.util as util @@ -39,18 +39,20 @@ def test_can_validate_certificate(): cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=['ssl/client-certs/client-ca.der.crl']) good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read() - assert Validator(cache, good_cert).validate() - assert Validator(cache, bad_cert).validate() == False + assert crl_check(cache, good_cert) + with pytest.raises(CRLException): + crl_check(cache, bad_cert) def test_can_dynamically_update_crls(tmpdir): crl_file = tmpdir.join('test.crl') shutil.copyfile('ssl/client-certs/client-ca.der.crl', crl_file) cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=[crl_file]) cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() - assert Validator(cache, cert).validate() + assert crl_check(cache, cert) # override the original CRL with one that revokes atat.mil.crt shutil.copyfile('tests/fixtures/test.der.crl', crl_file) - assert Validator(cache, cert).validate() == False + with pytest.raises(CRLException): + assert crl_check(cache, cert) def test_parse_disa_pki_list(): with open('tests/fixtures/disa-pki.html') as disa: