Merge pull request #557 from dod-ccpo/crl-errors
handle exceptions when loading CRLs
This commit is contained in:
commit
315c70802d
@ -5,6 +5,12 @@ import hashlib
|
|||||||
from OpenSSL import crypto, SSL
|
from OpenSSL import crypto, SSL
|
||||||
|
|
||||||
|
|
||||||
|
def get_common_name(x509_name_object):
|
||||||
|
for comp in x509_name_object.get_components():
|
||||||
|
if comp[0] == b"CN":
|
||||||
|
return comp[1].decode()
|
||||||
|
|
||||||
|
|
||||||
class CRLRevocationException(Exception):
|
class CRLRevocationException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -25,9 +31,7 @@ class NoOpCRLCache(CRLInterface):
|
|||||||
def _get_cn(self, cert):
|
def _get_cn(self, cert):
|
||||||
try:
|
try:
|
||||||
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
|
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
|
||||||
for comp in parsed.get_subject().get_components():
|
return get_common_name(parsed.get_subject())
|
||||||
if comp[0] == b"CN":
|
|
||||||
return comp[1].decode()
|
|
||||||
except crypto.Error:
|
except crypto.Error:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -54,14 +58,14 @@ class CRLCache(CRLInterface):
|
|||||||
def __init__(
|
def __init__(
|
||||||
self, root_location, crl_locations=[], store_class=crypto.X509Store, logger=None
|
self, root_location, crl_locations=[], store_class=crypto.X509Store, logger=None
|
||||||
):
|
):
|
||||||
|
self.logger = logger
|
||||||
self.store_class = store_class
|
self.store_class = store_class
|
||||||
self.certificate_authorities = {}
|
self.certificate_authorities = {}
|
||||||
self._load_roots(root_location)
|
self._load_roots(root_location)
|
||||||
self._build_crl_cache(crl_locations)
|
self._build_crl_cache(crl_locations)
|
||||||
self.logger = logger
|
|
||||||
|
|
||||||
def _get_store(self, cert):
|
def _get_store(self, cert):
|
||||||
return self._build_store(cert.get_issuer().der())
|
return self._build_store(cert.get_issuer())
|
||||||
|
|
||||||
def _load_roots(self, root_location):
|
def _load_roots(self, root_location):
|
||||||
with open(root_location, "rb") as f:
|
with open(root_location, "rb") as f:
|
||||||
@ -76,35 +80,39 @@ class CRLCache(CRLInterface):
|
|||||||
self.crl_cache = {}
|
self.crl_cache = {}
|
||||||
for crl_location in crl_locations:
|
for crl_location in crl_locations:
|
||||||
crl = self._load_crl(crl_location)
|
crl = self._load_crl(crl_location)
|
||||||
self.crl_cache[crl.get_issuer().der()] = crl_location
|
if crl:
|
||||||
|
self.crl_cache[crl.get_issuer().der()] = crl_location
|
||||||
|
|
||||||
def _load_crl(self, crl_location):
|
def _load_crl(self, crl_location):
|
||||||
with open(crl_location, "rb") as crl_file:
|
with open(crl_location, "rb") as crl_file:
|
||||||
return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
try:
|
||||||
|
return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
||||||
|
except crypto.Error:
|
||||||
|
self._log_info("Could not load CRL at location {}".format(crl_location))
|
||||||
|
|
||||||
def _build_store(self, issuer):
|
def _build_store(self, issuer):
|
||||||
store = self.store_class()
|
store = self.store_class()
|
||||||
self._log_info("STORE ID: {}. Building store.".format(id(store)))
|
self._log_info("STORE ID: {}. Building store.".format(id(store)))
|
||||||
store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
|
store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
|
||||||
crl_location = self._get_crl_location(issuer)
|
crl_location = self.crl_cache.get(issuer.der())
|
||||||
with open(crl_location, "rb") as crl_file:
|
issuer_name = get_common_name(issuer)
|
||||||
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
|
||||||
store.add_crl(crl)
|
|
||||||
self._log_info(
|
|
||||||
"STORE ID: {}. Adding CRL with issuer {}".format(
|
|
||||||
id(store), crl.get_issuer()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
store = self._add_certificate_chain_to_store(store, crl.get_issuer())
|
|
||||||
return store
|
|
||||||
|
|
||||||
def _get_crl_location(self, issuer):
|
|
||||||
crl_location = self.crl_cache.get(issuer)
|
|
||||||
|
|
||||||
if not crl_location:
|
if not crl_location:
|
||||||
raise CRLRevocationException("Could not find matching CRL for issuer")
|
raise CRLRevocationException(
|
||||||
|
"Could not find matching CRL for issuer with Common Name {}".format(
|
||||||
|
issuer_name
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
return crl_location
|
crl = self._load_crl(crl_location)
|
||||||
|
store.add_crl(crl)
|
||||||
|
self._log_info(
|
||||||
|
"STORE ID: {}. Adding CRL with issuer Common Name {}".format(
|
||||||
|
id(store), issuer_name
|
||||||
|
)
|
||||||
|
)
|
||||||
|
store = self._add_certificate_chain_to_store(store, crl.get_issuer())
|
||||||
|
return store
|
||||||
|
|
||||||
# this _should_ happen just twice for the DoD PKI (intermediary, root) but
|
# this _should_ happen just twice for the DoD PKI (intermediary, root) but
|
||||||
# theoretically it can build a longer certificate chain
|
# theoretically it can build a longer certificate chain
|
||||||
|
@ -8,7 +8,7 @@ from OpenSSL import crypto, SSL
|
|||||||
from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache
|
from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache
|
||||||
import atst.domain.authnid.crl.util as util
|
import atst.domain.authnid.crl.util as util
|
||||||
|
|
||||||
from tests.mocks import FIXTURE_EMAIL_ADDRESS
|
from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN
|
||||||
|
|
||||||
|
|
||||||
class MockX509Store:
|
class MockX509Store:
|
||||||
@ -46,6 +46,16 @@ def test_can_build_trusted_root_list():
|
|||||||
assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT")
|
assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT")
|
||||||
|
|
||||||
|
|
||||||
|
def test_can_build_crl_list_with_missing_crls():
|
||||||
|
location = "ssl/client-certs/client-ca.der.crl"
|
||||||
|
cache = CRLCache(
|
||||||
|
"ssl/client-certs/client-ca.crt",
|
||||||
|
crl_locations=["tests/fixtures/sample.pdf"],
|
||||||
|
store_class=MockX509Store,
|
||||||
|
)
|
||||||
|
assert len(cache.crl_cache.keys()) == 0
|
||||||
|
|
||||||
|
|
||||||
def test_can_validate_certificate():
|
def test_can_validate_certificate():
|
||||||
cache = CRLCache(
|
cache = CRLCache(
|
||||||
"ssl/server-certs/ca-chain.pem",
|
"ssl/server-certs/ca-chain.pem",
|
||||||
@ -72,11 +82,17 @@ def test_can_dynamically_update_crls(tmpdir):
|
|||||||
|
|
||||||
def test_throws_error_for_missing_issuer():
|
def test_throws_error_for_missing_issuer():
|
||||||
cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[])
|
cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[])
|
||||||
|
# this cert is self-signed, and so the application does not have a
|
||||||
|
# corresponding CRL for it
|
||||||
cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read()
|
cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read()
|
||||||
with pytest.raises(CRLRevocationException) as exc:
|
with pytest.raises(CRLRevocationException) as exc:
|
||||||
assert cache.crl_check(cert)
|
assert cache.crl_check(cert)
|
||||||
(message,) = exc.value.args
|
(message,) = exc.value.args
|
||||||
|
# objects that the issuer is missing
|
||||||
assert "issuer" in message
|
assert "issuer" in message
|
||||||
|
# names the issuer we were expecting to find a CRL for; same as the
|
||||||
|
# certificate subject in this case because the cert is self-signed
|
||||||
|
assert DOD_CN in message
|
||||||
|
|
||||||
|
|
||||||
def test_multistep_certificate_chain():
|
def test_multistep_certificate_chain():
|
||||||
|
@ -2,7 +2,8 @@ from tests.factories import RequestFactory, UserFactory
|
|||||||
|
|
||||||
|
|
||||||
DOD_SDN_INFO = {"first_name": "ART", "last_name": "GARFUNKEL", "dod_id": "5892460358"}
|
DOD_SDN_INFO = {"first_name": "ART", "last_name": "GARFUNKEL", "dod_id": "5892460358"}
|
||||||
DOD_SDN = f"CN={DOD_SDN_INFO['last_name']}.{DOD_SDN_INFO['first_name']}.G.{DOD_SDN_INFO['dod_id']},OU=OTHER,OU=PKI,OU=DoD,O=U.S. Government,C=US"
|
DOD_CN = f"{DOD_SDN_INFO['last_name']}.{DOD_SDN_INFO['first_name']}.G.{DOD_SDN_INFO['dod_id']}"
|
||||||
|
DOD_SDN = f"CN={DOD_CN},OU=OTHER,OU=PKI,OU=DoD,O=U.S. Government,C=US"
|
||||||
|
|
||||||
MOCK_VALID_PE_ID = "080675309U"
|
MOCK_VALID_PE_ID = "080675309U"
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user