From 1acb55fde65dbdc60464e28f34c4ec7506898078 Mon Sep 17 00:00:00 2001 From: dandds Date: Thu, 16 Aug 2018 13:02:49 -0400 Subject: [PATCH 1/8] beginning of a CRL loader rewrite --- atst/app.py | 6 +-- atst/domain/authnid/crl/__init__.py | 62 ++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/atst/app.py b/atst/app.py index ae207b01..cbca4c22 100644 --- a/atst/app.py +++ b/atst/app.py @@ -16,7 +16,7 @@ from atst.routes.workspaces import bp as workspace_routes from atst.routes.requests import requests_bp from atst.routes.dev import bp as dev_routes from atst.routes.errors import make_error_pages -from atst.domain.authnid.crl import Validator +from atst.domain.authnid.crl import Validator, CRLCache from atst.domain.auth import apply_authentication @@ -141,7 +141,5 @@ def make_crl_validator(app): crl_locations = [] for filename in pathlib.Path(app.config["CRL_DIRECTORY"]).glob("*"): crl_locations.append(filename.absolute()) - app.crl_validator = Validator( - roots=[app.config["CA_CHAIN"]], crl_locations=crl_locations, logger=app.logger - ) + app.crl_cache = CRLCache(app.config["CA_CHAIN"], crl_locations) diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index 68358e37..dc988546 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -13,6 +13,58 @@ def sha256_checksum(filename, block_size=65536): return sha256.hexdigest() +class CRLCache(): + + _PEM_RE = re.compile( + b"-----BEGIN CERTIFICATE-----\r?.+?\r?-----END CERTIFICATE-----\r?\n?", + re.DOTALL, + ) + + def __init__(self, root_location, crl_locations=[], store_class=crypto.X509Store): + self.crl_cache = {} + self.store_class = store_class + self._load_roots(root_location) + self._build_x509_stores(crl_locations) + + def _load_roots(self, root_location): + self.certificate_authorities = {} + with open(root_location, "rb") as f: + for raw_ca in self._parse_roots(f.read()): + ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) + self.certificate_authorities[ca.get_subject().der()] = ca + + def _parse_roots(self, root_str): + return [match.group(0) for match in self._PEM_RE.finditer(root_str)] + + def _build_x509_stores(self, crl_locations): + self.x509_stores = {} + for crl_path in crl_locations: + issuer, store = self._build_store(crl_path) + self.x509_stores[issuer] = store + + def _build_store(self, crl_location): + store = self.store_class() + store.set_flags(crypto.X509StoreFlags.CRL_CHECK) + with open(crl_location, "rb") as crl_file: + crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) + self.crl_cache[crl.get_issuer().der()] = (crl_location, sha256_checksum(crl_location)) + store.add_crl(crl) + store = self._add_certificate_chain_to_store(store, crl.get_issuer()) + return (crl.get_issuer().der(), store) + + # this _should_ happen just twice for the DoD PKI (intermediary, root) but + # theoretically it can build a longer certificate chain + def _add_certificate_chain_to_store(self, store, issuer): + ca = self.certificate_authorities.get(issuer.der()) + # i.e., it is the root CA + if issuer == ca.get_subject(): + return store + + store.add_cert(ca) + return self._add_certificate_chain_to_store(store, ca.get_issuer()) + + + class Validator: _PEM_RE = re.compile( @@ -20,13 +72,19 @@ class Validator: re.DOTALL, ) - def __init__(self, crl_locations=[], roots=[], base_store=crypto.X509Store, logger=None): + def __init__(self, root, crl_locations=[], base_store=crypto.X509Store, logger=None): self.crl_locations = crl_locations - self.roots = roots + self.root = root self.base_store = base_store self.logger = logger self._reset() + def _add_roots(self, roots): + with open(filename, "rb") as f: + for raw_ca in self._parse_roots(f.read()): + ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) + self._add_carefully("add_cert", ca) + def _reset(self): self.cache = {} self.store = self.base_store() From 2db84fb19a446fac0b93a3da9bd4c03b839847bb Mon Sep 17 00:00:00 2001 From: dandds Date: Thu, 16 Aug 2018 14:09:18 -0400 Subject: [PATCH 2/8] build individual x509 stores for each CRL --- atst/app.py | 2 +- atst/domain/authnid/__init__.py | 8 +- atst/domain/authnid/crl/__init__.py | 75 +++++++++--------- atst/routes/__init__.py | 2 +- .../authnid/test_authentication_context.py | 45 ++++++----- tests/domain/authnid/test_crl.py | 29 +++---- tests/fixtures/test.der.crl | Bin 768 -> 530 bytes 7 files changed, 80 insertions(+), 81 deletions(-) diff --git a/atst/app.py b/atst/app.py index cbca4c22..ddf66a40 100644 --- a/atst/app.py +++ b/atst/app.py @@ -16,7 +16,7 @@ from atst.routes.workspaces import bp as workspace_routes from atst.routes.requests import requests_bp from atst.routes.dev import bp as dev_routes from atst.routes.errors import make_error_pages -from atst.domain.authnid.crl import Validator, CRLCache +from atst.domain.authnid.crl import CRLCache from atst.domain.auth import apply_authentication diff --git a/atst/domain/authnid/__init__.py b/atst/domain/authnid/__init__.py index 80d645b8..35a4477c 100644 --- a/atst/domain/authnid/__init__.py +++ b/atst/domain/authnid/__init__.py @@ -1,17 +1,18 @@ from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users from .utils import parse_sdn, email_from_certificate +from .crl import Validator class AuthenticationContext(): - def __init__(self, crl_validator, auth_status, sdn, cert): + def __init__(self, crl_cache, auth_status, sdn, cert): if None in locals().values(): raise UnauthenticatedError( "Missing required authentication context components" ) - self.crl_validator = crl_validator + self.crl_cache = crl_cache self.auth_status = auth_status self.sdn = sdn self.cert = cert.encode() @@ -44,8 +45,9 @@ class AuthenticationContext(): return None def _crl_check(self): + validator = Validator(self.crl_cache, self.cert) if self.cert: - result = self.crl_validator.validate(self.cert) + result = validator.validate() return result else: diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index dc988546..5bbc7c72 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -56,28 +56,50 @@ class CRLCache(): # theoretically it can build a longer certificate chain def _add_certificate_chain_to_store(self, store, issuer): ca = self.certificate_authorities.get(issuer.der()) - # i.e., it is the root CA - if issuer == ca.get_subject(): - return store - store.add_cert(ca) - return self._add_certificate_chain_to_store(store, ca.get_issuer()) + if issuer == ca.get_subject(): + # i.e., it is the root CA and we are at the end of the chain + return store + else: + return self._add_certificate_chain_to_store(store, ca.get_issuer()) + + def get_store(self, cert): + return self._check_cache(cert.get_issuer().der()) + + def _check_cache(self, issuer): + if issuer in self.crl_cache: + filename, checksum = self.crl_cache[issuer] + if sha256_checksum(filename) != checksum: + issuer, store = self._build_store(filename) + self.x509_stores[issuer] = store + return store + else: + return self.x509_stores[issuer] class Validator: - _PEM_RE = re.compile( - b"-----BEGIN CERTIFICATE-----\r?.+?\r?-----END CERTIFICATE-----\r?\n?", - re.DOTALL, - ) - - def __init__(self, root, crl_locations=[], base_store=crypto.X509Store, logger=None): - self.crl_locations = crl_locations - self.root = root - self.base_store = base_store + def __init__(self, cache, cert, logger=None): + self.cache = cache + self.cert = cert self.logger = logger - self._reset() + + def validate(self): + parsed = crypto.load_certificate(crypto.FILETYPE_PEM, self.cert) + store = self.cache.get_store(parsed) + context = crypto.X509StoreContext(store, parsed) + try: + context.verify_certificate() + return True + + except crypto.X509StoreContextError as err: + self.log_error( + "Certificate revoked or errored. Error: {}. Args: {}".format( + type(err), err.args + ) + ) + return False def _add_roots(self, roots): with open(filename, "rb") as f: @@ -161,26 +183,3 @@ class Validator: return error.args == self.PRELOADED_CRL or error.args == self.PRELOADED_CERT # Checks that the CRL currently in-memory is up-to-date via the checksum. - - def refresh_cache(self, cert): - der = cert.get_issuer().der() - if der in self.cache: - filename, checksum = self.cache[der] - if sha256_checksum(filename) != checksum: - self._reset() - - def validate(self, cert): - parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - self.refresh_cache(parsed) - context = crypto.X509StoreContext(self.store, parsed) - try: - context.verify_certificate() - return True - - except crypto.X509StoreContextError as err: - self.log_error( - "Certificate revoked or errored. Error: {}. Args: {}".format( - type(err), err.args - ) - ) - return False diff --git a/atst/routes/__init__.py b/atst/routes/__init__.py index f8c4199c..68c83437 100644 --- a/atst/routes/__init__.py +++ b/atst/routes/__init__.py @@ -32,7 +32,7 @@ def catch_all(path): def _make_authentication_context(): return AuthenticationContext( - crl_validator=app.crl_validator, + crl_cache=app.crl_cache, auth_status=request.environ.get("HTTP_X_SSL_CLIENT_VERIFY"), sdn=request.environ.get("HTTP_X_SSL_CLIENT_S_DN"), cert=request.environ.get("HTTP_X_SSL_CLIENT_CERT") diff --git a/tests/domain/authnid/test_authentication_context.py b/tests/domain/authnid/test_authentication_context.py index f2a359af..f07d6e94 100644 --- a/tests/domain/authnid/test_authentication_context.py +++ b/tests/domain/authnid/test_authentication_context.py @@ -10,25 +10,23 @@ from tests.factories import UserFactory CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read() -class MockCRLValidator(): - - def __init__(self, value): - self.value = value - - def validate(self, cert): - return self.value +class MockCRLCache(): + def get_store(self, cert): + pass -def test_can_authenticate(): +def test_can_authenticate(monkeypatch): + monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT + MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) assert auth_context.authenticate() -def test_unsuccessful_status(): +def test_unsuccessful_status(monkeypatch): + monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) auth_context = AuthenticationContext( - MockCRLValidator(True), "FAILURE", DOD_SDN, CERT + MockCRLCache(), "FAILURE", DOD_SDN, CERT ) with pytest.raises(UnauthenticatedError) as excinfo: assert auth_context.authenticate() @@ -37,9 +35,10 @@ def test_unsuccessful_status(): assert "client authentication" in message -def test_crl_check_fails(): +def test_crl_check_fails(monkeypatch): + monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: False) auth_context = AuthenticationContext( - MockCRLValidator(False), "SUCCESS", DOD_SDN, CERT + MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) with pytest.raises(UnauthenticatedError) as excinfo: assert auth_context.authenticate() @@ -48,9 +47,10 @@ def test_crl_check_fails(): assert "CRL check" in message -def test_bad_sdn(): +def test_bad_sdn(monkeypatch): + monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", "abc123", CERT + MockCRLCache(), "SUCCESS", "abc123", CERT ) with pytest.raises(UnauthenticatedError) as excinfo: auth_context.get_user() @@ -59,33 +59,36 @@ def test_bad_sdn(): assert "SDN" in message -def test_user_exists(): +def test_user_exists(monkeypatch): + monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) user = UserFactory.create(**DOD_SDN_INFO) auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT + MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) auth_user = auth_context.get_user() assert auth_user == user -def test_creates_user(): +def test_creates_user(monkeypatch): + monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) # check user does not exist with pytest.raises(NotFoundError): Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT + MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) user = auth_context.get_user() assert user.dod_id == DOD_SDN_INFO["dod_id"] assert user.email == FIXTURE_EMAIL_ADDRESS -def test_user_cert_has_no_email(): +def test_user_cert_has_no_email(monkeypatch): + monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) cert = open("ssl/client-certs/atat.mil.crt").read() auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", DOD_SDN, cert + MockCRLCache(), "SUCCESS", DOD_SDN, cert ) user = auth_context.get_user() diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 1b9fa2ec..fcd371b3 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -4,7 +4,7 @@ import re import os import shutil from OpenSSL import crypto, SSL -from atst.domain.authnid.crl import Validator +from atst.domain.authnid.crl import Validator, CRLCache import atst.domain.authnid.crl.util as util @@ -24,38 +24,33 @@ class MockX509Store(): def test_can_build_crl_list(monkeypatch): location = 'ssl/client-certs/client-ca.der.crl' - validator = Validator(crl_locations=[location], base_store=MockX509Store) - assert len(validator.store.crls) == 1 + cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=[location], store_class=MockX509Store) + for store in cache.x509_stores.values(): + assert len(store.crls) == 1 def test_can_build_trusted_root_list(): location = 'ssl/server-certs/ca-chain.pem' - validator = Validator(roots=[location], base_store=MockX509Store) + cache = CRLCache(root_location=location, crl_locations=[], store_class=MockX509Store) with open(location) as f: content = f.read() - assert len(validator.store.certs) == content.count('BEGIN CERT') + assert len(cache.certificate_authorities.keys()) == content.count('BEGIN CERT') def test_can_validate_certificate(): - validator = Validator( - roots=['ssl/server-certs/ca-chain.pem'], - crl_locations=['ssl/client-certs/client-ca.der.crl'] - ) + cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=['ssl/client-certs/client-ca.der.crl']) good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read() - assert validator.validate(good_cert) - assert validator.validate(bad_cert) == False + assert Validator(cache, good_cert).validate() + assert Validator(cache, bad_cert).validate() == False def test_can_dynamically_update_crls(tmpdir): crl_file = tmpdir.join('test.crl') shutil.copyfile('ssl/client-certs/client-ca.der.crl', crl_file) - validator = Validator( - roots=['ssl/server-certs/ca-chain.pem'], - crl_locations=[crl_file] - ) + cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=[crl_file]) cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() - assert validator.validate(cert) + assert Validator(cache, cert).validate() # override the original CRL with one that revokes atat.mil.crt shutil.copyfile('tests/fixtures/test.der.crl', crl_file) - assert validator.validate(cert) == False + assert Validator(cache, cert).validate() == False def test_parse_disa_pki_list(): with open('tests/fixtures/disa-pki.html') as disa: diff --git a/tests/fixtures/test.der.crl b/tests/fixtures/test.der.crl index dc8310f225966e2eb591a9e60ec4eba01991b48a..24708b0bcd0e9d01ed2ef247ea5d4f308848d03f 100644 GIT binary patch literal 530 zcmXqLV&XGs{BFR@#;Mij(e|B}k&%U!!Jx6%klTQhjX9KsO_(V(*ih6!7{uWa<_Sp6 z%PX$TDND@DOoR!u1BEj(a}rZha|$wm!U6_-Ak|#L>~4vr6)yQMFvGZonF7KM1r7K? z(#*mfAZZ1Vyqtjy+yR`7V*F`9iIn^l2O!o<&d(L+HMB6WFf=m+VUs8@*V51w$~CYs zkYeIwSmAAOC+wAYGLm9b1LG*L+`YH4T!qioZ-V8_j0}tnOw0@oqtLv~+{DPppcwUX z?^D^Ejt`G4l``(g|CKdi4F zgqHCh+`7jh_dU{Mqy|^Y+z^SFFWnr3G}#C`L3z#jSdm)HBJuEpW74PTNRID*XZ!XL4vRe#-A`4h LU%7wlq+C1z7&E~0 literal 768 zcmZ9KNpqtx6ol{mirif(W3eP0<{*K0`_b@qZYK ztcP3JGN?Qjjd@z0JcH$um0*=v#TT($9$JMU>Bucwc>Y+r-LnoM#TZd&wJk=_EtbHk z%FX^#UEntKGLT7qjA6ArF(9;Py}2IdMPQtkA0>PqN*Le!$CsA*!$a*lSkGg=y>d56 zl<*!YsA+=4mSVG6Je!-Jb?WVdUT{g*`8KcA`2$DbdJEE_%EnT-XVB(Xny;b8m@ni| zITs4UNh>`cWGg-~aGKEzLT@LFL7sI?HH~Gx+Rw5m0QzY$^VIK;{Js8x_T}*)@9gri zTSU7QN-GX>bn@N#7grr<%vN??s@~+CV$z8D%BcEk>XnuW;|{=vO{1=(lpW8tHCeVp4+j_lo?R#4&?ReEZWz2@$DJD$}YLo@Nwm(Zx P!2d|ERpZe5Z{~jicbxO@ From e931560dc6a646ba8a7c67a9ce2b5b25f45be61b Mon Sep 17 00:00:00 2001 From: dandds Date: Thu, 16 Aug 2018 14:45:46 -0400 Subject: [PATCH 3/8] more straightforward crl check function --- atst/domain/authnid/__init__.py | 16 +-- atst/domain/authnid/crl/__init__.py | 124 +++--------------- .../authnid/test_authentication_context.py | 18 +-- tests/domain/authnid/test_crl.py | 12 +- 4 files changed, 41 insertions(+), 129 deletions(-) diff --git a/atst/domain/authnid/__init__.py b/atst/domain/authnid/__init__.py index 35a4477c..b31e00fb 100644 --- a/atst/domain/authnid/__init__.py +++ b/atst/domain/authnid/__init__.py @@ -1,7 +1,7 @@ from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users from .utils import parse_sdn, email_from_certificate -from .crl import Validator +from .crl import crl_check, CRLException class AuthenticationContext(): @@ -22,8 +22,7 @@ class AuthenticationContext(): if not self.auth_status == "SUCCESS": raise UnauthenticatedError("SSL/TLS client authentication failed") - elif not self._crl_check(): - raise UnauthenticatedError("Client certificate failed CRL check") + self._crl_check() return True @@ -45,13 +44,10 @@ class AuthenticationContext(): return None def _crl_check(self): - validator = Validator(self.crl_cache, self.cert) - if self.cert: - result = validator.validate() - return result - - else: - return False + try: + crl_check(self.crl_cache, self.cert) + except CRLException as exc: + raise UnauthenticatedError("CRL check failed. " + str(exc)) @property def parsed_sdn(self): diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index 5bbc7c72..5c53ffe9 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -4,6 +4,8 @@ import re import hashlib from OpenSSL import crypto, SSL +class CRLException(Exception): + pass def sha256_checksum(filename, block_size=65536): sha256 = hashlib.sha256() @@ -13,6 +15,22 @@ def sha256_checksum(filename, block_size=65536): return sha256.hexdigest() +def crl_check(cache, cert): + parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) + store = cache.get_store(parsed) + context = crypto.X509StoreContext(store, parsed) + try: + context.verify_certificate() + return True + + except crypto.X509StoreContextError as err: + raise CRLException( + "Certificate revoked or errored. Error: {}. Args: {}".format( + type(err), err.args + ) + ) + + class CRLCache(): _PEM_RE = re.compile( @@ -77,109 +95,3 @@ class CRLCache(): else: return self.x509_stores[issuer] - -class Validator: - - def __init__(self, cache, cert, logger=None): - self.cache = cache - self.cert = cert - self.logger = logger - - def validate(self): - parsed = crypto.load_certificate(crypto.FILETYPE_PEM, self.cert) - store = self.cache.get_store(parsed) - context = crypto.X509StoreContext(store, parsed) - try: - context.verify_certificate() - return True - - except crypto.X509StoreContextError as err: - self.log_error( - "Certificate revoked or errored. Error: {}. Args: {}".format( - type(err), err.args - ) - ) - return False - - def _add_roots(self, roots): - with open(filename, "rb") as f: - for raw_ca in self._parse_roots(f.read()): - ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) - self._add_carefully("add_cert", ca) - - def _reset(self): - self.cache = {} - self.store = self.base_store() - self._add_crls(self.crl_locations) - self._add_roots(self.roots) - self.store.set_flags(crypto.X509StoreFlags.CRL_CHECK) - - def log_error(self, message): - if self.logger: - self.logger.error(message) - - def _add_crls(self, locations): - for filename in locations: - try: - self._add_crl(filename) - except crypto.Error as err: - self.log_error( - "CRL could not be parsed. Filename: {}, Error: {}, args: {}".format( - filename, type(err), err.args - ) - ) - - # This caches the CRL issuer with the CRL filepath and a checksum, in addition to adding the CRL to the store. - - def _add_crl(self, filename): - with open(filename, "rb") as crl_file: - crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) - self.cache[crl.get_issuer().der()] = (filename, sha256_checksum(filename)) - self._add_carefully("add_crl", crl) - - def _parse_roots(self, root_str): - return [match.group(0) for match in self._PEM_RE.finditer(root_str)] - - def _add_roots(self, roots): - for filename in roots: - with open(filename, "rb") as f: - for raw_ca in self._parse_roots(f.read()): - ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) - self._add_carefully("add_cert", ca) - - # in testing, it seems that openssl is maintaining a local cache of certs - # in a hash table and throws errors if you try to add redundant certs or - # CRLs. For now, we catch and ignore that error with great specificity. - - def _add_carefully(self, method_name, obj): - try: - getattr(self.store, method_name)(obj) - except crypto.Error as error: - if self._is_preloaded_error(error): - pass - else: - raise error - - PRELOADED_CRL = ( - [ - ( - "x509 certificate routines", - "X509_STORE_add_crl", - "cert already in hash table", - ) - ], - ) - PRELOADED_CERT = ( - [ - ( - "x509 certificate routines", - "X509_STORE_add_cert", - "cert already in hash table", - ) - ], - ) - - def _is_preloaded_error(self, error): - return error.args == self.PRELOADED_CRL or error.args == self.PRELOADED_CERT - - # Checks that the CRL currently in-memory is up-to-date via the checksum. diff --git a/tests/domain/authnid/test_authentication_context.py b/tests/domain/authnid/test_authentication_context.py index f07d6e94..d7c8e157 100644 --- a/tests/domain/authnid/test_authentication_context.py +++ b/tests/domain/authnid/test_authentication_context.py @@ -1,6 +1,7 @@ import pytest from atst.domain.authnid import AuthenticationContext +from atst.domain.authnid.crl import CRLCache from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users @@ -16,7 +17,7 @@ class MockCRLCache(): def test_can_authenticate(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) @@ -24,7 +25,7 @@ def test_can_authenticate(monkeypatch): def test_unsuccessful_status(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) auth_context = AuthenticationContext( MockCRLCache(), "FAILURE", DOD_SDN, CERT ) @@ -36,9 +37,10 @@ def test_unsuccessful_status(monkeypatch): def test_crl_check_fails(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: False) + cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=['ssl/client-certs/client-ca.der.crl']) + cert = open("ssl/client-certs/bad-atat.mil.crt", "r").read() auth_context = AuthenticationContext( - MockCRLCache(), "SUCCESS", DOD_SDN, CERT + cache, "SUCCESS", DOD_SDN, cert ) with pytest.raises(UnauthenticatedError) as excinfo: assert auth_context.authenticate() @@ -48,7 +50,7 @@ def test_crl_check_fails(monkeypatch): def test_bad_sdn(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", "abc123", CERT ) @@ -60,7 +62,7 @@ def test_bad_sdn(monkeypatch): def test_user_exists(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) user = UserFactory.create(**DOD_SDN_INFO) auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, CERT @@ -71,7 +73,7 @@ def test_user_exists(monkeypatch): def test_creates_user(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) # check user does not exist with pytest.raises(NotFoundError): Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) @@ -85,7 +87,7 @@ def test_creates_user(monkeypatch): def test_user_cert_has_no_email(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) + monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) cert = open("ssl/client-certs/atat.mil.crt").read() auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, cert diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index fcd371b3..34828567 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -4,7 +4,7 @@ import re import os import shutil from OpenSSL import crypto, SSL -from atst.domain.authnid.crl import Validator, CRLCache +from atst.domain.authnid.crl import crl_check, CRLCache, CRLException import atst.domain.authnid.crl.util as util @@ -39,18 +39,20 @@ def test_can_validate_certificate(): cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=['ssl/client-certs/client-ca.der.crl']) good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read() - assert Validator(cache, good_cert).validate() - assert Validator(cache, bad_cert).validate() == False + assert crl_check(cache, good_cert) + with pytest.raises(CRLException): + crl_check(cache, bad_cert) def test_can_dynamically_update_crls(tmpdir): crl_file = tmpdir.join('test.crl') shutil.copyfile('ssl/client-certs/client-ca.der.crl', crl_file) cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=[crl_file]) cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() - assert Validator(cache, cert).validate() + assert crl_check(cache, cert) # override the original CRL with one that revokes atat.mil.crt shutil.copyfile('tests/fixtures/test.der.crl', crl_file) - assert Validator(cache, cert).validate() == False + with pytest.raises(CRLException): + assert crl_check(cache, cert) def test_parse_disa_pki_list(): with open('tests/fixtures/disa-pki.html') as disa: From 1f7848741b7a0ed9d22a88080101dbce07dd03fc Mon Sep 17 00:00:00 2001 From: dandds Date: Thu, 16 Aug 2018 15:31:36 -0400 Subject: [PATCH 4/8] simpler CRL implementation; load as-need because we cannot marshal openssl objects in python --- atst/domain/authnid/crl/__init__.py | 43 +++++++++++++---------------- tests/domain/authnid/test_crl.py | 5 ++-- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index 5c53ffe9..da1d0b4d 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -4,9 +4,11 @@ import re import hashlib from OpenSSL import crypto, SSL + class CRLException(Exception): pass + def sha256_checksum(filename, block_size=65536): sha256 = hashlib.sha256() with open(filename, "rb") as f: @@ -39,13 +41,15 @@ class CRLCache(): ) def __init__(self, root_location, crl_locations=[], store_class=crypto.X509Store): - self.crl_cache = {} self.store_class = store_class + self.certificate_authorities = {} self._load_roots(root_location) - self._build_x509_stores(crl_locations) + self._build_crl_cache(crl_locations) + + def get_store(self, cert): + return self._build_store(cert.get_issuer().der()) def _load_roots(self, root_location): - self.certificate_authorities = {} with open(root_location, "rb") as f: for raw_ca in self._parse_roots(f.read()): ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) @@ -54,21 +58,25 @@ class CRLCache(): def _parse_roots(self, root_str): return [match.group(0) for match in self._PEM_RE.finditer(root_str)] - def _build_x509_stores(self, crl_locations): - self.x509_stores = {} - for crl_path in crl_locations: - issuer, store = self._build_store(crl_path) - self.x509_stores[issuer] = store + def _build_crl_cache(self, crl_locations): + self.crl_cache = {} + for crl_location in crl_locations: + crl = self._load_crl(crl_location) + self.crl_cache[crl.get_issuer().der()] = crl_location - def _build_store(self, crl_location): + def _load_crl(self, crl_location): + with open(crl_location, "rb") as crl_file: + return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) + + def _build_store(self, issuer): store = self.store_class() store.set_flags(crypto.X509StoreFlags.CRL_CHECK) + crl_location = self.crl_cache[issuer] with open(crl_location, "rb") as crl_file: crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) - self.crl_cache[crl.get_issuer().der()] = (crl_location, sha256_checksum(crl_location)) store.add_crl(crl) store = self._add_certificate_chain_to_store(store, crl.get_issuer()) - return (crl.get_issuer().der(), store) + return store # this _should_ happen just twice for the DoD PKI (intermediary, root) but # theoretically it can build a longer certificate chain @@ -82,16 +90,3 @@ class CRLCache(): else: return self._add_certificate_chain_to_store(store, ca.get_issuer()) - def get_store(self, cert): - return self._check_cache(cert.get_issuer().der()) - - def _check_cache(self, issuer): - if issuer in self.crl_cache: - filename, checksum = self.crl_cache[issuer] - if sha256_checksum(filename) != checksum: - issuer, store = self._build_store(filename) - self.x509_stores[issuer] = store - return store - else: - return self.x509_stores[issuer] - diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 34828567..8fdc74a4 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -22,11 +22,12 @@ class MockX509Store(): def set_flags(self, flag): pass + def test_can_build_crl_list(monkeypatch): location = 'ssl/client-certs/client-ca.der.crl' cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=[location], store_class=MockX509Store) - for store in cache.x509_stores.values(): - assert len(store.crls) == 1 + assert len(cache.crl_cache.keys()) == 1 + def test_can_build_trusted_root_list(): location = 'ssl/server-certs/ca-chain.pem' From 714c82364f8a8fc52f4952b42aa03b00e5e489cb Mon Sep 17 00:00:00 2001 From: dandds Date: Fri, 17 Aug 2018 10:48:49 -0400 Subject: [PATCH 5/8] more specific name for CRL revocation exception --- atst/domain/authnid/__init__.py | 4 ++-- atst/domain/authnid/crl/__init__.py | 4 ++-- tests/domain/authnid/test_crl.py | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/atst/domain/authnid/__init__.py b/atst/domain/authnid/__init__.py index b31e00fb..46dc9445 100644 --- a/atst/domain/authnid/__init__.py +++ b/atst/domain/authnid/__init__.py @@ -1,7 +1,7 @@ from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users from .utils import parse_sdn, email_from_certificate -from .crl import crl_check, CRLException +from .crl import crl_check, CRLRevocationException class AuthenticationContext(): @@ -46,7 +46,7 @@ class AuthenticationContext(): def _crl_check(self): try: crl_check(self.crl_cache, self.cert) - except CRLException as exc: + except CRLRevocationException as exc: raise UnauthenticatedError("CRL check failed. " + str(exc)) @property diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index da1d0b4d..0dcf1fc0 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -5,7 +5,7 @@ import hashlib from OpenSSL import crypto, SSL -class CRLException(Exception): +class CRLRevocationException(Exception): pass @@ -26,7 +26,7 @@ def crl_check(cache, cert): return True except crypto.X509StoreContextError as err: - raise CRLException( + raise CRLRevocationException( "Certificate revoked or errored. Error: {}. Args: {}".format( type(err), err.args ) diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 8fdc74a4..5bd009be 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -4,7 +4,7 @@ import re import os import shutil from OpenSSL import crypto, SSL -from atst.domain.authnid.crl import crl_check, CRLCache, CRLException +from atst.domain.authnid.crl import crl_check, CRLCache, CRLRevocationException import atst.domain.authnid.crl.util as util @@ -41,7 +41,7 @@ def test_can_validate_certificate(): good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read() assert crl_check(cache, good_cert) - with pytest.raises(CRLException): + with pytest.raises(CRLRevocationException): crl_check(cache, bad_cert) def test_can_dynamically_update_crls(tmpdir): @@ -52,7 +52,7 @@ def test_can_dynamically_update_crls(tmpdir): assert crl_check(cache, cert) # override the original CRL with one that revokes atat.mil.crt shutil.copyfile('tests/fixtures/test.der.crl', crl_file) - with pytest.raises(CRLException): + with pytest.raises(CRLRevocationException): assert crl_check(cache, cert) def test_parse_disa_pki_list(): From ca2763fd030db1c3027731150c4561a1c7077b7c Mon Sep 17 00:00:00 2001 From: dandds Date: Fri, 17 Aug 2018 11:02:20 -0400 Subject: [PATCH 6/8] handle case where certificate issuer is not in existing cache --- atst/domain/authnid/crl/__init__.py | 13 ++++- tests/domain/authnid/test_crl.py | 73 +++++++++++++++++++++-------- 2 files changed, 64 insertions(+), 22 deletions(-) diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index 0dcf1fc0..f1c93fb7 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -71,15 +71,24 @@ class CRLCache(): def _build_store(self, issuer): store = self.store_class() store.set_flags(crypto.X509StoreFlags.CRL_CHECK) - crl_location = self.crl_cache[issuer] + crl_location = self._get_crl_location(issuer) with open(crl_location, "rb") as crl_file: crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) store.add_crl(crl) store = self._add_certificate_chain_to_store(store, crl.get_issuer()) return store + def _get_crl_location(self, issuer): + crl_location = self.crl_cache.get(issuer) + + if not crl_location: + raise CRLRevocationException("Could not find matching CRL for issuer") + + return crl_location + # this _should_ happen just twice for the DoD PKI (intermediary, root) but # theoretically it can build a longer certificate chain + def _add_certificate_chain_to_store(self, store, issuer): ca = self.certificate_authorities.get(issuer.der()) store.add_cert(ca) @@ -87,6 +96,6 @@ class CRLCache(): if issuer == ca.get_subject(): # i.e., it is the root CA and we are at the end of the chain return store + else: return self._add_certificate_chain_to_store(store, ca.get_issuer()) - diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 5bd009be..19757353 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -4,11 +4,15 @@ import re import os import shutil from OpenSSL import crypto, SSL + from atst.domain.authnid.crl import crl_check, CRLCache, CRLRevocationException import atst.domain.authnid.crl.util as util +from tests.mocks import FIXTURE_EMAIL_ADDRESS + class MockX509Store(): + def __init__(self): self.crls = [] self.certs = [] @@ -24,46 +28,69 @@ class MockX509Store(): def test_can_build_crl_list(monkeypatch): - location = 'ssl/client-certs/client-ca.der.crl' - cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=[location], store_class=MockX509Store) + location = "ssl/client-certs/client-ca.der.crl" + cache = CRLCache( + "ssl/client-certs/client-ca.crt", + crl_locations=[location], + store_class=MockX509Store, + ) assert len(cache.crl_cache.keys()) == 1 def test_can_build_trusted_root_list(): - location = 'ssl/server-certs/ca-chain.pem' - cache = CRLCache(root_location=location, crl_locations=[], store_class=MockX509Store) + location = "ssl/server-certs/ca-chain.pem" + cache = CRLCache( + root_location=location, crl_locations=[], store_class=MockX509Store + ) with open(location) as f: content = f.read() - assert len(cache.certificate_authorities.keys()) == content.count('BEGIN CERT') + assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT") + def test_can_validate_certificate(): - cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=['ssl/client-certs/client-ca.der.crl']) - good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() - bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read() + cache = CRLCache( + "ssl/server-certs/ca-chain.pem", + crl_locations=["ssl/client-certs/client-ca.der.crl"], + ) + good_cert = open("ssl/client-certs/atat.mil.crt", "rb").read() + bad_cert = open("ssl/client-certs/bad-atat.mil.crt", "rb").read() assert crl_check(cache, good_cert) with pytest.raises(CRLRevocationException): crl_check(cache, bad_cert) + def test_can_dynamically_update_crls(tmpdir): - crl_file = tmpdir.join('test.crl') - shutil.copyfile('ssl/client-certs/client-ca.der.crl', crl_file) - cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=[crl_file]) - cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() + crl_file = tmpdir.join("test.crl") + shutil.copyfile("ssl/client-certs/client-ca.der.crl", crl_file) + cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[crl_file]) + cert = open("ssl/client-certs/atat.mil.crt", "rb").read() assert crl_check(cache, cert) # override the original CRL with one that revokes atat.mil.crt - shutil.copyfile('tests/fixtures/test.der.crl', crl_file) + shutil.copyfile("tests/fixtures/test.der.crl", crl_file) with pytest.raises(CRLRevocationException): assert crl_check(cache, cert) + +def test_throws_error_for_missing_issuer(): + cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[]) + cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read() + with pytest.raises(CRLRevocationException) as exc: + assert crl_check(cache, cert) + (message,) = exc.value.args + assert "issuer" in message + + def test_parse_disa_pki_list(): - with open('tests/fixtures/disa-pki.html') as disa: + with open("tests/fixtures/disa-pki.html") as disa: disa_html = disa.read() crl_list = util.crl_list_from_disa_html(disa_html) - href_matches = re.findall('DOD(ROOT|EMAIL|ID)?CA', disa_html) + href_matches = re.findall("DOD(ROOT|EMAIL|ID)?CA", disa_html) assert len(crl_list) > 0 assert len(crl_list) == len(href_matches) + class MockStreamingResponse(): + def __init__(self, content_chunks, code=200): self.content_chunks = content_chunks self.status_code = code @@ -77,13 +104,19 @@ class MockStreamingResponse(): def __exit__(self, *args): pass + def test_write_crl(tmpdir, monkeypatch): - monkeypatch.setattr('requests.get', lambda u, **kwargs: MockStreamingResponse([b'it worked'])) - crl = 'crl_1' + monkeypatch.setattr( + "requests.get", lambda u, **kwargs: MockStreamingResponse([b"it worked"]) + ) + crl = "crl_1" assert util.write_crl(tmpdir, "random_target_dir", crl) assert [p.basename for p in tmpdir.listdir()] == [crl] - assert [p.read() for p in tmpdir.listdir()] == ['it worked'] + assert [p.read() for p in tmpdir.listdir()] == ["it worked"] + def test_skips_crl_if_it_has_not_been_modified(tmpdir, monkeypatch): - monkeypatch.setattr('requests.get', lambda u, **kwargs: MockStreamingResponse([b'it worked'], 304)) - assert not util.write_crl(tmpdir, "random_target_dir", 'crl_file_name') + monkeypatch.setattr( + "requests.get", lambda u, **kwargs: MockStreamingResponse([b"it worked"], 304) + ) + assert not util.write_crl(tmpdir, "random_target_dir", "crl_file_name") From d453a9592c69ad3f94a9ce80b61b1b28f9b62323 Mon Sep 17 00:00:00 2001 From: dandds Date: Fri, 17 Aug 2018 11:03:27 -0400 Subject: [PATCH 7/8] remove unused checksum function --- atst/domain/authnid/crl/__init__.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index f1c93fb7..b43b5a8e 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -9,14 +9,6 @@ class CRLRevocationException(Exception): pass -def sha256_checksum(filename, block_size=65536): - sha256 = hashlib.sha256() - with open(filename, "rb") as f: - for block in iter(lambda: f.read(block_size), b""): - sha256.update(block) - return sha256.hexdigest() - - def crl_check(cache, cert): parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) store = cache.get_store(parsed) From be52c8b9b2fbf0970e26eb688e8dd9ff0d3283ae Mon Sep 17 00:00:00 2001 From: dandds Date: Fri, 17 Aug 2018 11:13:01 -0400 Subject: [PATCH 8/8] make crl_check a CRLCache method --- atst/domain/authnid/__init__.py | 4 +-- atst/domain/authnid/crl/__init__.py | 33 +++++++++-------- .../authnid/test_authentication_context.py | 36 +++++++++---------- tests/domain/authnid/test_crl.py | 12 +++---- 4 files changed, 41 insertions(+), 44 deletions(-) diff --git a/atst/domain/authnid/__init__.py b/atst/domain/authnid/__init__.py index 46dc9445..b7f31717 100644 --- a/atst/domain/authnid/__init__.py +++ b/atst/domain/authnid/__init__.py @@ -1,7 +1,7 @@ from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users from .utils import parse_sdn, email_from_certificate -from .crl import crl_check, CRLRevocationException +from .crl import CRLRevocationException class AuthenticationContext(): @@ -45,7 +45,7 @@ class AuthenticationContext(): def _crl_check(self): try: - crl_check(self.crl_cache, self.cert) + self.crl_cache.crl_check(self.cert) except CRLRevocationException as exc: raise UnauthenticatedError("CRL check failed. " + str(exc)) diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index b43b5a8e..cad27b79 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -9,22 +9,6 @@ class CRLRevocationException(Exception): pass -def crl_check(cache, cert): - parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - store = cache.get_store(parsed) - context = crypto.X509StoreContext(store, parsed) - try: - context.verify_certificate() - return True - - except crypto.X509StoreContextError as err: - raise CRLRevocationException( - "Certificate revoked or errored. Error: {}. Args: {}".format( - type(err), err.args - ) - ) - - class CRLCache(): _PEM_RE = re.compile( @@ -38,7 +22,7 @@ class CRLCache(): self._load_roots(root_location) self._build_crl_cache(crl_locations) - def get_store(self, cert): + def _get_store(self, cert): return self._build_store(cert.get_issuer().der()) def _load_roots(self, root_location): @@ -91,3 +75,18 @@ class CRLCache(): else: return self._add_certificate_chain_to_store(store, ca.get_issuer()) + + def crl_check(self, cert): + parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) + store = self._get_store(parsed) + context = crypto.X509StoreContext(store, parsed) + try: + context.verify_certificate() + return True + + except crypto.X509StoreContextError as err: + raise CRLRevocationException( + "Certificate revoked or errored. Error: {}. Args: {}".format( + type(err), err.args + ) + ) diff --git a/tests/domain/authnid/test_authentication_context.py b/tests/domain/authnid/test_authentication_context.py index d7c8e157..81a9b5d3 100644 --- a/tests/domain/authnid/test_authentication_context.py +++ b/tests/domain/authnid/test_authentication_context.py @@ -1,7 +1,7 @@ import pytest from atst.domain.authnid import AuthenticationContext -from atst.domain.authnid.crl import CRLCache +from atst.domain.authnid.crl import CRLCache, CRLRevocationException from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users @@ -12,20 +12,24 @@ CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read() class MockCRLCache(): - def get_store(self, cert): - pass + def __init__(self, valid=True): + self.valid = valid + + def crl_check(self, cert): + if self.valid: + return True + + raise CRLRevocationException() -def test_can_authenticate(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_can_authenticate(): auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) assert auth_context.authenticate() -def test_unsuccessful_status(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_unsuccessful_status(): auth_context = AuthenticationContext( MockCRLCache(), "FAILURE", DOD_SDN, CERT ) @@ -36,11 +40,9 @@ def test_unsuccessful_status(monkeypatch): assert "client authentication" in message -def test_crl_check_fails(monkeypatch): - cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=['ssl/client-certs/client-ca.der.crl']) - cert = open("ssl/client-certs/bad-atat.mil.crt", "r").read() +def test_crl_check_fails(): auth_context = AuthenticationContext( - cache, "SUCCESS", DOD_SDN, cert + MockCRLCache(False), "SUCCESS", DOD_SDN, CERT ) with pytest.raises(UnauthenticatedError) as excinfo: assert auth_context.authenticate() @@ -49,8 +51,7 @@ def test_crl_check_fails(monkeypatch): assert "CRL check" in message -def test_bad_sdn(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_bad_sdn(): auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", "abc123", CERT ) @@ -61,8 +62,7 @@ def test_bad_sdn(monkeypatch): assert "SDN" in message -def test_user_exists(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_user_exists(): user = UserFactory.create(**DOD_SDN_INFO) auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, CERT @@ -72,8 +72,7 @@ def test_user_exists(monkeypatch): assert auth_user == user -def test_creates_user(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_creates_user(): # check user does not exist with pytest.raises(NotFoundError): Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) @@ -86,8 +85,7 @@ def test_creates_user(monkeypatch): assert user.email == FIXTURE_EMAIL_ADDRESS -def test_user_cert_has_no_email(monkeypatch): - monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True) +def test_user_cert_has_no_email(): cert = open("ssl/client-certs/atat.mil.crt").read() auth_context = AuthenticationContext( MockCRLCache(), "SUCCESS", DOD_SDN, cert diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 19757353..7ba3a213 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -5,7 +5,7 @@ import os import shutil from OpenSSL import crypto, SSL -from atst.domain.authnid.crl import crl_check, CRLCache, CRLRevocationException +from atst.domain.authnid.crl import CRLCache, CRLRevocationException import atst.domain.authnid.crl.util as util from tests.mocks import FIXTURE_EMAIL_ADDRESS @@ -54,9 +54,9 @@ def test_can_validate_certificate(): ) good_cert = open("ssl/client-certs/atat.mil.crt", "rb").read() bad_cert = open("ssl/client-certs/bad-atat.mil.crt", "rb").read() - assert crl_check(cache, good_cert) + assert cache.crl_check(good_cert) with pytest.raises(CRLRevocationException): - crl_check(cache, bad_cert) + cache.crl_check(bad_cert) def test_can_dynamically_update_crls(tmpdir): @@ -64,18 +64,18 @@ def test_can_dynamically_update_crls(tmpdir): shutil.copyfile("ssl/client-certs/client-ca.der.crl", crl_file) cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[crl_file]) cert = open("ssl/client-certs/atat.mil.crt", "rb").read() - assert crl_check(cache, cert) + assert cache.crl_check(cert) # override the original CRL with one that revokes atat.mil.crt shutil.copyfile("tests/fixtures/test.der.crl", crl_file) with pytest.raises(CRLRevocationException): - assert crl_check(cache, cert) + assert cache.crl_check(cert) def test_throws_error_for_missing_issuer(): cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[]) cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read() with pytest.raises(CRLRevocationException) as exc: - assert crl_check(cache, cert) + assert cache.crl_check(cert) (message,) = exc.value.args assert "issuer" in message