commit
dcea156dc5
@ -16,7 +16,7 @@ from atst.routes.workspaces import bp as workspace_routes
|
|||||||
from atst.routes.requests import requests_bp
|
from atst.routes.requests import requests_bp
|
||||||
from atst.routes.dev import bp as dev_routes
|
from atst.routes.dev import bp as dev_routes
|
||||||
from atst.routes.errors import make_error_pages
|
from atst.routes.errors import make_error_pages
|
||||||
from atst.domain.authnid.crl import Validator
|
from atst.domain.authnid.crl import CRLCache
|
||||||
from atst.domain.auth import apply_authentication
|
from atst.domain.auth import apply_authentication
|
||||||
|
|
||||||
|
|
||||||
@ -141,7 +141,5 @@ def make_crl_validator(app):
|
|||||||
crl_locations = []
|
crl_locations = []
|
||||||
for filename in pathlib.Path(app.config["CRL_DIRECTORY"]).glob("*"):
|
for filename in pathlib.Path(app.config["CRL_DIRECTORY"]).glob("*"):
|
||||||
crl_locations.append(filename.absolute())
|
crl_locations.append(filename.absolute())
|
||||||
app.crl_validator = Validator(
|
app.crl_cache = CRLCache(app.config["CA_CHAIN"], crl_locations)
|
||||||
roots=[app.config["CA_CHAIN"]], crl_locations=crl_locations, logger=app.logger
|
|
||||||
)
|
|
||||||
|
|
||||||
|
@ -1,17 +1,18 @@
|
|||||||
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
||||||
from atst.domain.users import Users
|
from atst.domain.users import Users
|
||||||
from .utils import parse_sdn, email_from_certificate
|
from .utils import parse_sdn, email_from_certificate
|
||||||
|
from .crl import CRLRevocationException
|
||||||
|
|
||||||
|
|
||||||
class AuthenticationContext():
|
class AuthenticationContext():
|
||||||
|
|
||||||
def __init__(self, crl_validator, auth_status, sdn, cert):
|
def __init__(self, crl_cache, auth_status, sdn, cert):
|
||||||
if None in locals().values():
|
if None in locals().values():
|
||||||
raise UnauthenticatedError(
|
raise UnauthenticatedError(
|
||||||
"Missing required authentication context components"
|
"Missing required authentication context components"
|
||||||
)
|
)
|
||||||
|
|
||||||
self.crl_validator = crl_validator
|
self.crl_cache = crl_cache
|
||||||
self.auth_status = auth_status
|
self.auth_status = auth_status
|
||||||
self.sdn = sdn
|
self.sdn = sdn
|
||||||
self.cert = cert.encode()
|
self.cert = cert.encode()
|
||||||
@ -21,8 +22,7 @@ class AuthenticationContext():
|
|||||||
if not self.auth_status == "SUCCESS":
|
if not self.auth_status == "SUCCESS":
|
||||||
raise UnauthenticatedError("SSL/TLS client authentication failed")
|
raise UnauthenticatedError("SSL/TLS client authentication failed")
|
||||||
|
|
||||||
elif not self._crl_check():
|
self._crl_check()
|
||||||
raise UnauthenticatedError("Client certificate failed CRL check")
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -44,12 +44,10 @@ class AuthenticationContext():
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def _crl_check(self):
|
def _crl_check(self):
|
||||||
if self.cert:
|
try:
|
||||||
result = self.crl_validator.validate(self.cert)
|
self.crl_cache.crl_check(self.cert)
|
||||||
return result
|
except CRLRevocationException as exc:
|
||||||
|
raise UnauthenticatedError("CRL check failed. " + str(exc))
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def parsed_sdn(self):
|
def parsed_sdn(self):
|
||||||
|
@ -5,124 +5,88 @@ import hashlib
|
|||||||
from OpenSSL import crypto, SSL
|
from OpenSSL import crypto, SSL
|
||||||
|
|
||||||
|
|
||||||
def sha256_checksum(filename, block_size=65536):
|
class CRLRevocationException(Exception):
|
||||||
sha256 = hashlib.sha256()
|
pass
|
||||||
with open(filename, "rb") as f:
|
|
||||||
for block in iter(lambda: f.read(block_size), b""):
|
|
||||||
sha256.update(block)
|
|
||||||
return sha256.hexdigest()
|
|
||||||
|
|
||||||
|
|
||||||
class Validator:
|
class CRLCache():
|
||||||
|
|
||||||
_PEM_RE = re.compile(
|
_PEM_RE = re.compile(
|
||||||
b"-----BEGIN CERTIFICATE-----\r?.+?\r?-----END CERTIFICATE-----\r?\n?",
|
b"-----BEGIN CERTIFICATE-----\r?.+?\r?-----END CERTIFICATE-----\r?\n?",
|
||||||
re.DOTALL,
|
re.DOTALL,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, crl_locations=[], roots=[], base_store=crypto.X509Store, logger=None):
|
def __init__(self, root_location, crl_locations=[], store_class=crypto.X509Store):
|
||||||
self.crl_locations = crl_locations
|
self.store_class = store_class
|
||||||
self.roots = roots
|
self.certificate_authorities = {}
|
||||||
self.base_store = base_store
|
self._load_roots(root_location)
|
||||||
self.logger = logger
|
self._build_crl_cache(crl_locations)
|
||||||
self._reset()
|
|
||||||
|
|
||||||
def _reset(self):
|
def _get_store(self, cert):
|
||||||
self.cache = {}
|
return self._build_store(cert.get_issuer().der())
|
||||||
self.store = self.base_store()
|
|
||||||
self._add_crls(self.crl_locations)
|
|
||||||
self._add_roots(self.roots)
|
|
||||||
self.store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
|
|
||||||
|
|
||||||
def log_error(self, message):
|
def _load_roots(self, root_location):
|
||||||
if self.logger:
|
with open(root_location, "rb") as f:
|
||||||
self.logger.error(message)
|
for raw_ca in self._parse_roots(f.read()):
|
||||||
|
ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca)
|
||||||
def _add_crls(self, locations):
|
self.certificate_authorities[ca.get_subject().der()] = ca
|
||||||
for filename in locations:
|
|
||||||
try:
|
|
||||||
self._add_crl(filename)
|
|
||||||
except crypto.Error as err:
|
|
||||||
self.log_error(
|
|
||||||
"CRL could not be parsed. Filename: {}, Error: {}, args: {}".format(
|
|
||||||
filename, type(err), err.args
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# This caches the CRL issuer with the CRL filepath and a checksum, in addition to adding the CRL to the store.
|
|
||||||
|
|
||||||
def _add_crl(self, filename):
|
|
||||||
with open(filename, "rb") as crl_file:
|
|
||||||
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
|
||||||
self.cache[crl.get_issuer().der()] = (filename, sha256_checksum(filename))
|
|
||||||
self._add_carefully("add_crl", crl)
|
|
||||||
|
|
||||||
def _parse_roots(self, root_str):
|
def _parse_roots(self, root_str):
|
||||||
return [match.group(0) for match in self._PEM_RE.finditer(root_str)]
|
return [match.group(0) for match in self._PEM_RE.finditer(root_str)]
|
||||||
|
|
||||||
def _add_roots(self, roots):
|
def _build_crl_cache(self, crl_locations):
|
||||||
for filename in roots:
|
self.crl_cache = {}
|
||||||
with open(filename, "rb") as f:
|
for crl_location in crl_locations:
|
||||||
for raw_ca in self._parse_roots(f.read()):
|
crl = self._load_crl(crl_location)
|
||||||
ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca)
|
self.crl_cache[crl.get_issuer().der()] = crl_location
|
||||||
self._add_carefully("add_cert", ca)
|
|
||||||
|
|
||||||
# in testing, it seems that openssl is maintaining a local cache of certs
|
def _load_crl(self, crl_location):
|
||||||
# in a hash table and throws errors if you try to add redundant certs or
|
with open(crl_location, "rb") as crl_file:
|
||||||
# CRLs. For now, we catch and ignore that error with great specificity.
|
return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
||||||
|
|
||||||
def _add_carefully(self, method_name, obj):
|
def _build_store(self, issuer):
|
||||||
try:
|
store = self.store_class()
|
||||||
getattr(self.store, method_name)(obj)
|
store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
|
||||||
except crypto.Error as error:
|
crl_location = self._get_crl_location(issuer)
|
||||||
if self._is_preloaded_error(error):
|
with open(crl_location, "rb") as crl_file:
|
||||||
pass
|
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
||||||
else:
|
store.add_crl(crl)
|
||||||
raise error
|
store = self._add_certificate_chain_to_store(store, crl.get_issuer())
|
||||||
|
return store
|
||||||
|
|
||||||
PRELOADED_CRL = (
|
def _get_crl_location(self, issuer):
|
||||||
[
|
crl_location = self.crl_cache.get(issuer)
|
||||||
(
|
|
||||||
"x509 certificate routines",
|
|
||||||
"X509_STORE_add_crl",
|
|
||||||
"cert already in hash table",
|
|
||||||
)
|
|
||||||
],
|
|
||||||
)
|
|
||||||
PRELOADED_CERT = (
|
|
||||||
[
|
|
||||||
(
|
|
||||||
"x509 certificate routines",
|
|
||||||
"X509_STORE_add_cert",
|
|
||||||
"cert already in hash table",
|
|
||||||
)
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
def _is_preloaded_error(self, error):
|
if not crl_location:
|
||||||
return error.args == self.PRELOADED_CRL or error.args == self.PRELOADED_CERT
|
raise CRLRevocationException("Could not find matching CRL for issuer")
|
||||||
|
|
||||||
# Checks that the CRL currently in-memory is up-to-date via the checksum.
|
return crl_location
|
||||||
|
|
||||||
def refresh_cache(self, cert):
|
# this _should_ happen just twice for the DoD PKI (intermediary, root) but
|
||||||
der = cert.get_issuer().der()
|
# theoretically it can build a longer certificate chain
|
||||||
if der in self.cache:
|
|
||||||
filename, checksum = self.cache[der]
|
|
||||||
if sha256_checksum(filename) != checksum:
|
|
||||||
self._reset()
|
|
||||||
|
|
||||||
def validate(self, cert):
|
def _add_certificate_chain_to_store(self, store, issuer):
|
||||||
|
ca = self.certificate_authorities.get(issuer.der())
|
||||||
|
store.add_cert(ca)
|
||||||
|
|
||||||
|
if issuer == ca.get_subject():
|
||||||
|
# i.e., it is the root CA and we are at the end of the chain
|
||||||
|
return store
|
||||||
|
|
||||||
|
else:
|
||||||
|
return self._add_certificate_chain_to_store(store, ca.get_issuer())
|
||||||
|
|
||||||
|
def crl_check(self, cert):
|
||||||
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
|
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
|
||||||
self.refresh_cache(parsed)
|
store = self._get_store(parsed)
|
||||||
context = crypto.X509StoreContext(self.store, parsed)
|
context = crypto.X509StoreContext(store, parsed)
|
||||||
try:
|
try:
|
||||||
context.verify_certificate()
|
context.verify_certificate()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except crypto.X509StoreContextError as err:
|
except crypto.X509StoreContextError as err:
|
||||||
self.log_error(
|
raise CRLRevocationException(
|
||||||
"Certificate revoked or errored. Error: {}. Args: {}".format(
|
"Certificate revoked or errored. Error: {}. Args: {}".format(
|
||||||
type(err), err.args
|
type(err), err.args
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return False
|
|
||||||
|
@ -32,7 +32,7 @@ def catch_all(path):
|
|||||||
|
|
||||||
def _make_authentication_context():
|
def _make_authentication_context():
|
||||||
return AuthenticationContext(
|
return AuthenticationContext(
|
||||||
crl_validator=app.crl_validator,
|
crl_cache=app.crl_cache,
|
||||||
auth_status=request.environ.get("HTTP_X_SSL_CLIENT_VERIFY"),
|
auth_status=request.environ.get("HTTP_X_SSL_CLIENT_VERIFY"),
|
||||||
sdn=request.environ.get("HTTP_X_SSL_CLIENT_S_DN"),
|
sdn=request.environ.get("HTTP_X_SSL_CLIENT_S_DN"),
|
||||||
cert=request.environ.get("HTTP_X_SSL_CLIENT_CERT")
|
cert=request.environ.get("HTTP_X_SSL_CLIENT_CERT")
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from atst.domain.authnid import AuthenticationContext
|
from atst.domain.authnid import AuthenticationContext
|
||||||
|
from atst.domain.authnid.crl import CRLCache, CRLRevocationException
|
||||||
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
||||||
from atst.domain.users import Users
|
from atst.domain.users import Users
|
||||||
|
|
||||||
@ -10,25 +11,27 @@ from tests.factories import UserFactory
|
|||||||
CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read()
|
CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read()
|
||||||
|
|
||||||
|
|
||||||
class MockCRLValidator():
|
class MockCRLCache():
|
||||||
|
def __init__(self, valid=True):
|
||||||
|
self.valid = valid
|
||||||
|
|
||||||
def __init__(self, value):
|
def crl_check(self, cert):
|
||||||
self.value = value
|
if self.valid:
|
||||||
|
return True
|
||||||
|
|
||||||
def validate(self, cert):
|
raise CRLRevocationException()
|
||||||
return self.value
|
|
||||||
|
|
||||||
|
|
||||||
def test_can_authenticate():
|
def test_can_authenticate():
|
||||||
auth_context = AuthenticationContext(
|
auth_context = AuthenticationContext(
|
||||||
MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT
|
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||||
)
|
)
|
||||||
assert auth_context.authenticate()
|
assert auth_context.authenticate()
|
||||||
|
|
||||||
|
|
||||||
def test_unsuccessful_status():
|
def test_unsuccessful_status():
|
||||||
auth_context = AuthenticationContext(
|
auth_context = AuthenticationContext(
|
||||||
MockCRLValidator(True), "FAILURE", DOD_SDN, CERT
|
MockCRLCache(), "FAILURE", DOD_SDN, CERT
|
||||||
)
|
)
|
||||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||||
assert auth_context.authenticate()
|
assert auth_context.authenticate()
|
||||||
@ -39,7 +42,7 @@ def test_unsuccessful_status():
|
|||||||
|
|
||||||
def test_crl_check_fails():
|
def test_crl_check_fails():
|
||||||
auth_context = AuthenticationContext(
|
auth_context = AuthenticationContext(
|
||||||
MockCRLValidator(False), "SUCCESS", DOD_SDN, CERT
|
MockCRLCache(False), "SUCCESS", DOD_SDN, CERT
|
||||||
)
|
)
|
||||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||||
assert auth_context.authenticate()
|
assert auth_context.authenticate()
|
||||||
@ -50,7 +53,7 @@ def test_crl_check_fails():
|
|||||||
|
|
||||||
def test_bad_sdn():
|
def test_bad_sdn():
|
||||||
auth_context = AuthenticationContext(
|
auth_context = AuthenticationContext(
|
||||||
MockCRLValidator(True), "SUCCESS", "abc123", CERT
|
MockCRLCache(), "SUCCESS", "abc123", CERT
|
||||||
)
|
)
|
||||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||||
auth_context.get_user()
|
auth_context.get_user()
|
||||||
@ -62,7 +65,7 @@ def test_bad_sdn():
|
|||||||
def test_user_exists():
|
def test_user_exists():
|
||||||
user = UserFactory.create(**DOD_SDN_INFO)
|
user = UserFactory.create(**DOD_SDN_INFO)
|
||||||
auth_context = AuthenticationContext(
|
auth_context = AuthenticationContext(
|
||||||
MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT
|
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||||
)
|
)
|
||||||
auth_user = auth_context.get_user()
|
auth_user = auth_context.get_user()
|
||||||
|
|
||||||
@ -75,7 +78,7 @@ def test_creates_user():
|
|||||||
Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
|
Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
|
||||||
|
|
||||||
auth_context = AuthenticationContext(
|
auth_context = AuthenticationContext(
|
||||||
MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT
|
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||||
)
|
)
|
||||||
user = auth_context.get_user()
|
user = auth_context.get_user()
|
||||||
assert user.dod_id == DOD_SDN_INFO["dod_id"]
|
assert user.dod_id == DOD_SDN_INFO["dod_id"]
|
||||||
@ -85,7 +88,7 @@ def test_creates_user():
|
|||||||
def test_user_cert_has_no_email():
|
def test_user_cert_has_no_email():
|
||||||
cert = open("ssl/client-certs/atat.mil.crt").read()
|
cert = open("ssl/client-certs/atat.mil.crt").read()
|
||||||
auth_context = AuthenticationContext(
|
auth_context = AuthenticationContext(
|
||||||
MockCRLValidator(True), "SUCCESS", DOD_SDN, cert
|
MockCRLCache(), "SUCCESS", DOD_SDN, cert
|
||||||
)
|
)
|
||||||
user = auth_context.get_user()
|
user = auth_context.get_user()
|
||||||
|
|
||||||
|
@ -4,11 +4,15 @@ import re
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
from OpenSSL import crypto, SSL
|
from OpenSSL import crypto, SSL
|
||||||
from atst.domain.authnid.crl import Validator
|
|
||||||
|
from atst.domain.authnid.crl import CRLCache, CRLRevocationException
|
||||||
import atst.domain.authnid.crl.util as util
|
import atst.domain.authnid.crl.util as util
|
||||||
|
|
||||||
|
from tests.mocks import FIXTURE_EMAIL_ADDRESS
|
||||||
|
|
||||||
|
|
||||||
class MockX509Store():
|
class MockX509Store():
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.crls = []
|
self.crls = []
|
||||||
self.certs = []
|
self.certs = []
|
||||||
@ -22,50 +26,71 @@ class MockX509Store():
|
|||||||
def set_flags(self, flag):
|
def set_flags(self, flag):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_can_build_crl_list(monkeypatch):
|
def test_can_build_crl_list(monkeypatch):
|
||||||
location = 'ssl/client-certs/client-ca.der.crl'
|
location = "ssl/client-certs/client-ca.der.crl"
|
||||||
validator = Validator(crl_locations=[location], base_store=MockX509Store)
|
cache = CRLCache(
|
||||||
assert len(validator.store.crls) == 1
|
"ssl/client-certs/client-ca.crt",
|
||||||
|
crl_locations=[location],
|
||||||
|
store_class=MockX509Store,
|
||||||
|
)
|
||||||
|
assert len(cache.crl_cache.keys()) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_can_build_trusted_root_list():
|
def test_can_build_trusted_root_list():
|
||||||
location = 'ssl/server-certs/ca-chain.pem'
|
location = "ssl/server-certs/ca-chain.pem"
|
||||||
validator = Validator(roots=[location], base_store=MockX509Store)
|
cache = CRLCache(
|
||||||
|
root_location=location, crl_locations=[], store_class=MockX509Store
|
||||||
|
)
|
||||||
with open(location) as f:
|
with open(location) as f:
|
||||||
content = f.read()
|
content = f.read()
|
||||||
assert len(validator.store.certs) == content.count('BEGIN CERT')
|
assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT")
|
||||||
|
|
||||||
|
|
||||||
def test_can_validate_certificate():
|
def test_can_validate_certificate():
|
||||||
validator = Validator(
|
cache = CRLCache(
|
||||||
roots=['ssl/server-certs/ca-chain.pem'],
|
"ssl/server-certs/ca-chain.pem",
|
||||||
crl_locations=['ssl/client-certs/client-ca.der.crl']
|
crl_locations=["ssl/client-certs/client-ca.der.crl"],
|
||||||
)
|
)
|
||||||
good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read()
|
good_cert = open("ssl/client-certs/atat.mil.crt", "rb").read()
|
||||||
bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read()
|
bad_cert = open("ssl/client-certs/bad-atat.mil.crt", "rb").read()
|
||||||
assert validator.validate(good_cert)
|
assert cache.crl_check(good_cert)
|
||||||
assert validator.validate(bad_cert) == False
|
with pytest.raises(CRLRevocationException):
|
||||||
|
cache.crl_check(bad_cert)
|
||||||
|
|
||||||
|
|
||||||
def test_can_dynamically_update_crls(tmpdir):
|
def test_can_dynamically_update_crls(tmpdir):
|
||||||
crl_file = tmpdir.join('test.crl')
|
crl_file = tmpdir.join("test.crl")
|
||||||
shutil.copyfile('ssl/client-certs/client-ca.der.crl', crl_file)
|
shutil.copyfile("ssl/client-certs/client-ca.der.crl", crl_file)
|
||||||
validator = Validator(
|
cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[crl_file])
|
||||||
roots=['ssl/server-certs/ca-chain.pem'],
|
cert = open("ssl/client-certs/atat.mil.crt", "rb").read()
|
||||||
crl_locations=[crl_file]
|
assert cache.crl_check(cert)
|
||||||
)
|
|
||||||
cert = open('ssl/client-certs/atat.mil.crt', 'rb').read()
|
|
||||||
assert validator.validate(cert)
|
|
||||||
# override the original CRL with one that revokes atat.mil.crt
|
# override the original CRL with one that revokes atat.mil.crt
|
||||||
shutil.copyfile('tests/fixtures/test.der.crl', crl_file)
|
shutil.copyfile("tests/fixtures/test.der.crl", crl_file)
|
||||||
assert validator.validate(cert) == False
|
with pytest.raises(CRLRevocationException):
|
||||||
|
assert cache.crl_check(cert)
|
||||||
|
|
||||||
|
|
||||||
|
def test_throws_error_for_missing_issuer():
|
||||||
|
cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[])
|
||||||
|
cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read()
|
||||||
|
with pytest.raises(CRLRevocationException) as exc:
|
||||||
|
assert cache.crl_check(cert)
|
||||||
|
(message,) = exc.value.args
|
||||||
|
assert "issuer" in message
|
||||||
|
|
||||||
|
|
||||||
def test_parse_disa_pki_list():
|
def test_parse_disa_pki_list():
|
||||||
with open('tests/fixtures/disa-pki.html') as disa:
|
with open("tests/fixtures/disa-pki.html") as disa:
|
||||||
disa_html = disa.read()
|
disa_html = disa.read()
|
||||||
crl_list = util.crl_list_from_disa_html(disa_html)
|
crl_list = util.crl_list_from_disa_html(disa_html)
|
||||||
href_matches = re.findall('DOD(ROOT|EMAIL|ID)?CA', disa_html)
|
href_matches = re.findall("DOD(ROOT|EMAIL|ID)?CA", disa_html)
|
||||||
assert len(crl_list) > 0
|
assert len(crl_list) > 0
|
||||||
assert len(crl_list) == len(href_matches)
|
assert len(crl_list) == len(href_matches)
|
||||||
|
|
||||||
|
|
||||||
class MockStreamingResponse():
|
class MockStreamingResponse():
|
||||||
|
|
||||||
def __init__(self, content_chunks, code=200):
|
def __init__(self, content_chunks, code=200):
|
||||||
self.content_chunks = content_chunks
|
self.content_chunks = content_chunks
|
||||||
self.status_code = code
|
self.status_code = code
|
||||||
@ -79,13 +104,19 @@ class MockStreamingResponse():
|
|||||||
def __exit__(self, *args):
|
def __exit__(self, *args):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_write_crl(tmpdir, monkeypatch):
|
def test_write_crl(tmpdir, monkeypatch):
|
||||||
monkeypatch.setattr('requests.get', lambda u, **kwargs: MockStreamingResponse([b'it worked']))
|
monkeypatch.setattr(
|
||||||
crl = 'crl_1'
|
"requests.get", lambda u, **kwargs: MockStreamingResponse([b"it worked"])
|
||||||
|
)
|
||||||
|
crl = "crl_1"
|
||||||
assert util.write_crl(tmpdir, "random_target_dir", crl)
|
assert util.write_crl(tmpdir, "random_target_dir", crl)
|
||||||
assert [p.basename for p in tmpdir.listdir()] == [crl]
|
assert [p.basename for p in tmpdir.listdir()] == [crl]
|
||||||
assert [p.read() for p in tmpdir.listdir()] == ['it worked']
|
assert [p.read() for p in tmpdir.listdir()] == ["it worked"]
|
||||||
|
|
||||||
|
|
||||||
def test_skips_crl_if_it_has_not_been_modified(tmpdir, monkeypatch):
|
def test_skips_crl_if_it_has_not_been_modified(tmpdir, monkeypatch):
|
||||||
monkeypatch.setattr('requests.get', lambda u, **kwargs: MockStreamingResponse([b'it worked'], 304))
|
monkeypatch.setattr(
|
||||||
assert not util.write_crl(tmpdir, "random_target_dir", 'crl_file_name')
|
"requests.get", lambda u, **kwargs: MockStreamingResponse([b"it worked"], 304)
|
||||||
|
)
|
||||||
|
assert not util.write_crl(tmpdir, "random_target_dir", "crl_file_name")
|
||||||
|
BIN
tests/fixtures/test.der.crl
vendored
BIN
tests/fixtures/test.der.crl
vendored
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user