beginning of a CRL loader rewrite
This commit is contained in:
parent
06c44f86c9
commit
a72c8498a2
@ -16,7 +16,7 @@ from atst.routes.workspaces import bp as workspace_routes
|
|||||||
from atst.routes.requests import requests_bp
|
from atst.routes.requests import requests_bp
|
||||||
from atst.routes.dev import bp as dev_routes
|
from atst.routes.dev import bp as dev_routes
|
||||||
from atst.routes.errors import make_error_pages
|
from atst.routes.errors import make_error_pages
|
||||||
from atst.domain.authnid.crl import Validator
|
from atst.domain.authnid.crl import Validator, CRLCache
|
||||||
from atst.domain.auth import apply_authentication
|
from atst.domain.auth import apply_authentication
|
||||||
|
|
||||||
|
|
||||||
@ -141,7 +141,5 @@ def make_crl_validator(app):
|
|||||||
crl_locations = []
|
crl_locations = []
|
||||||
for filename in pathlib.Path(app.config["CRL_DIRECTORY"]).glob("*"):
|
for filename in pathlib.Path(app.config["CRL_DIRECTORY"]).glob("*"):
|
||||||
crl_locations.append(filename.absolute())
|
crl_locations.append(filename.absolute())
|
||||||
app.crl_validator = Validator(
|
app.crl_cache = CRLCache(app.config["CA_CHAIN"], crl_locations)
|
||||||
roots=[app.config["CA_CHAIN"]], crl_locations=crl_locations, logger=app.logger
|
|
||||||
)
|
|
||||||
|
|
||||||
|
@ -13,6 +13,58 @@ def sha256_checksum(filename, block_size=65536):
|
|||||||
return sha256.hexdigest()
|
return sha256.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
class CRLCache():
|
||||||
|
|
||||||
|
_PEM_RE = re.compile(
|
||||||
|
b"-----BEGIN CERTIFICATE-----\r?.+?\r?-----END CERTIFICATE-----\r?\n?",
|
||||||
|
re.DOTALL,
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self, root_location, crl_locations=[], store_class=crypto.X509Store):
|
||||||
|
self.crl_cache = {}
|
||||||
|
self.store_class = store_class
|
||||||
|
self._load_roots(root_location)
|
||||||
|
self._build_x509_stores(crl_locations)
|
||||||
|
|
||||||
|
def _load_roots(self, root_location):
|
||||||
|
self.certificate_authorities = {}
|
||||||
|
with open(root_location, "rb") as f:
|
||||||
|
for raw_ca in self._parse_roots(f.read()):
|
||||||
|
ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca)
|
||||||
|
self.certificate_authorities[ca.get_subject().der()] = ca
|
||||||
|
|
||||||
|
def _parse_roots(self, root_str):
|
||||||
|
return [match.group(0) for match in self._PEM_RE.finditer(root_str)]
|
||||||
|
|
||||||
|
def _build_x509_stores(self, crl_locations):
|
||||||
|
self.x509_stores = {}
|
||||||
|
for crl_path in crl_locations:
|
||||||
|
issuer, store = self._build_store(crl_path)
|
||||||
|
self.x509_stores[issuer] = store
|
||||||
|
|
||||||
|
def _build_store(self, crl_location):
|
||||||
|
store = self.store_class()
|
||||||
|
store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
|
||||||
|
with open(crl_location, "rb") as crl_file:
|
||||||
|
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
||||||
|
self.crl_cache[crl.get_issuer().der()] = (crl_location, sha256_checksum(crl_location))
|
||||||
|
store.add_crl(crl)
|
||||||
|
store = self._add_certificate_chain_to_store(store, crl.get_issuer())
|
||||||
|
return (crl.get_issuer().der(), store)
|
||||||
|
|
||||||
|
# this _should_ happen just twice for the DoD PKI (intermediary, root) but
|
||||||
|
# theoretically it can build a longer certificate chain
|
||||||
|
def _add_certificate_chain_to_store(self, store, issuer):
|
||||||
|
ca = self.certificate_authorities.get(issuer.der())
|
||||||
|
# i.e., it is the root CA
|
||||||
|
if issuer == ca.get_subject():
|
||||||
|
return store
|
||||||
|
|
||||||
|
store.add_cert(ca)
|
||||||
|
return self._add_certificate_chain_to_store(store, ca.get_issuer())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Validator:
|
class Validator:
|
||||||
|
|
||||||
_PEM_RE = re.compile(
|
_PEM_RE = re.compile(
|
||||||
@ -20,13 +72,19 @@ class Validator:
|
|||||||
re.DOTALL,
|
re.DOTALL,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, crl_locations=[], roots=[], base_store=crypto.X509Store, logger=None):
|
def __init__(self, root, crl_locations=[], base_store=crypto.X509Store, logger=None):
|
||||||
self.crl_locations = crl_locations
|
self.crl_locations = crl_locations
|
||||||
self.roots = roots
|
self.root = root
|
||||||
self.base_store = base_store
|
self.base_store = base_store
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
self._reset()
|
self._reset()
|
||||||
|
|
||||||
|
def _add_roots(self, roots):
|
||||||
|
with open(filename, "rb") as f:
|
||||||
|
for raw_ca in self._parse_roots(f.read()):
|
||||||
|
ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca)
|
||||||
|
self._add_carefully("add_cert", ca)
|
||||||
|
|
||||||
def _reset(self):
|
def _reset(self):
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
self.store = self.base_store()
|
self.store = self.base_store()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user