simpler CRL implementation; load as-need because we cannot marshal openssl objects in python
This commit is contained in:
parent
e931560dc6
commit
1f7848741b
@ -4,9 +4,11 @@ import re
|
|||||||
import hashlib
|
import hashlib
|
||||||
from OpenSSL import crypto, SSL
|
from OpenSSL import crypto, SSL
|
||||||
|
|
||||||
|
|
||||||
class CRLException(Exception):
|
class CRLException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def sha256_checksum(filename, block_size=65536):
|
def sha256_checksum(filename, block_size=65536):
|
||||||
sha256 = hashlib.sha256()
|
sha256 = hashlib.sha256()
|
||||||
with open(filename, "rb") as f:
|
with open(filename, "rb") as f:
|
||||||
@ -39,13 +41,15 @@ class CRLCache():
|
|||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, root_location, crl_locations=[], store_class=crypto.X509Store):
|
def __init__(self, root_location, crl_locations=[], store_class=crypto.X509Store):
|
||||||
self.crl_cache = {}
|
|
||||||
self.store_class = store_class
|
self.store_class = store_class
|
||||||
|
self.certificate_authorities = {}
|
||||||
self._load_roots(root_location)
|
self._load_roots(root_location)
|
||||||
self._build_x509_stores(crl_locations)
|
self._build_crl_cache(crl_locations)
|
||||||
|
|
||||||
|
def get_store(self, cert):
|
||||||
|
return self._build_store(cert.get_issuer().der())
|
||||||
|
|
||||||
def _load_roots(self, root_location):
|
def _load_roots(self, root_location):
|
||||||
self.certificate_authorities = {}
|
|
||||||
with open(root_location, "rb") as f:
|
with open(root_location, "rb") as f:
|
||||||
for raw_ca in self._parse_roots(f.read()):
|
for raw_ca in self._parse_roots(f.read()):
|
||||||
ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca)
|
ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca)
|
||||||
@ -54,21 +58,25 @@ class CRLCache():
|
|||||||
def _parse_roots(self, root_str):
|
def _parse_roots(self, root_str):
|
||||||
return [match.group(0) for match in self._PEM_RE.finditer(root_str)]
|
return [match.group(0) for match in self._PEM_RE.finditer(root_str)]
|
||||||
|
|
||||||
def _build_x509_stores(self, crl_locations):
|
def _build_crl_cache(self, crl_locations):
|
||||||
self.x509_stores = {}
|
self.crl_cache = {}
|
||||||
for crl_path in crl_locations:
|
for crl_location in crl_locations:
|
||||||
issuer, store = self._build_store(crl_path)
|
crl = self._load_crl(crl_location)
|
||||||
self.x509_stores[issuer] = store
|
self.crl_cache[crl.get_issuer().der()] = crl_location
|
||||||
|
|
||||||
def _build_store(self, crl_location):
|
def _load_crl(self, crl_location):
|
||||||
|
with open(crl_location, "rb") as crl_file:
|
||||||
|
return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
||||||
|
|
||||||
|
def _build_store(self, issuer):
|
||||||
store = self.store_class()
|
store = self.store_class()
|
||||||
store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
|
store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
|
||||||
|
crl_location = self.crl_cache[issuer]
|
||||||
with open(crl_location, "rb") as crl_file:
|
with open(crl_location, "rb") as crl_file:
|
||||||
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
||||||
self.crl_cache[crl.get_issuer().der()] = (crl_location, sha256_checksum(crl_location))
|
|
||||||
store.add_crl(crl)
|
store.add_crl(crl)
|
||||||
store = self._add_certificate_chain_to_store(store, crl.get_issuer())
|
store = self._add_certificate_chain_to_store(store, crl.get_issuer())
|
||||||
return (crl.get_issuer().der(), store)
|
return store
|
||||||
|
|
||||||
# this _should_ happen just twice for the DoD PKI (intermediary, root) but
|
# this _should_ happen just twice for the DoD PKI (intermediary, root) but
|
||||||
# theoretically it can build a longer certificate chain
|
# theoretically it can build a longer certificate chain
|
||||||
@ -82,16 +90,3 @@ class CRLCache():
|
|||||||
else:
|
else:
|
||||||
return self._add_certificate_chain_to_store(store, ca.get_issuer())
|
return self._add_certificate_chain_to_store(store, ca.get_issuer())
|
||||||
|
|
||||||
def get_store(self, cert):
|
|
||||||
return self._check_cache(cert.get_issuer().der())
|
|
||||||
|
|
||||||
def _check_cache(self, issuer):
|
|
||||||
if issuer in self.crl_cache:
|
|
||||||
filename, checksum = self.crl_cache[issuer]
|
|
||||||
if sha256_checksum(filename) != checksum:
|
|
||||||
issuer, store = self._build_store(filename)
|
|
||||||
self.x509_stores[issuer] = store
|
|
||||||
return store
|
|
||||||
else:
|
|
||||||
return self.x509_stores[issuer]
|
|
||||||
|
|
||||||
|
@ -22,11 +22,12 @@ class MockX509Store():
|
|||||||
def set_flags(self, flag):
|
def set_flags(self, flag):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_can_build_crl_list(monkeypatch):
|
def test_can_build_crl_list(monkeypatch):
|
||||||
location = 'ssl/client-certs/client-ca.der.crl'
|
location = 'ssl/client-certs/client-ca.der.crl'
|
||||||
cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=[location], store_class=MockX509Store)
|
cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=[location], store_class=MockX509Store)
|
||||||
for store in cache.x509_stores.values():
|
assert len(cache.crl_cache.keys()) == 1
|
||||||
assert len(store.crls) == 1
|
|
||||||
|
|
||||||
def test_can_build_trusted_root_list():
|
def test_can_build_trusted_root_list():
|
||||||
location = 'ssl/server-certs/ca-chain.pem'
|
location = 'ssl/server-certs/ca-chain.pem'
|
||||||
|
Loading…
x
Reference in New Issue
Block a user