more straightforward crl check function

This commit is contained in:
dandds 2018-08-16 14:45:46 -04:00
parent 2db84fb19a
commit e931560dc6
4 changed files with 41 additions and 129 deletions

View File

@ -1,7 +1,7 @@
from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.exceptions import UnauthenticatedError, NotFoundError
from atst.domain.users import Users from atst.domain.users import Users
from .utils import parse_sdn, email_from_certificate from .utils import parse_sdn, email_from_certificate
from .crl import Validator from .crl import crl_check, CRLException
class AuthenticationContext(): class AuthenticationContext():
@ -22,8 +22,7 @@ class AuthenticationContext():
if not self.auth_status == "SUCCESS": if not self.auth_status == "SUCCESS":
raise UnauthenticatedError("SSL/TLS client authentication failed") raise UnauthenticatedError("SSL/TLS client authentication failed")
elif not self._crl_check(): self._crl_check()
raise UnauthenticatedError("Client certificate failed CRL check")
return True return True
@ -45,13 +44,10 @@ class AuthenticationContext():
return None return None
def _crl_check(self): def _crl_check(self):
validator = Validator(self.crl_cache, self.cert) try:
if self.cert: crl_check(self.crl_cache, self.cert)
result = validator.validate() except CRLException as exc:
return result raise UnauthenticatedError("CRL check failed. " + str(exc))
else:
return False
@property @property
def parsed_sdn(self): def parsed_sdn(self):

View File

@ -4,6 +4,8 @@ import re
import hashlib import hashlib
from OpenSSL import crypto, SSL from OpenSSL import crypto, SSL
class CRLException(Exception):
pass
def sha256_checksum(filename, block_size=65536): def sha256_checksum(filename, block_size=65536):
sha256 = hashlib.sha256() sha256 = hashlib.sha256()
@ -13,6 +15,22 @@ def sha256_checksum(filename, block_size=65536):
return sha256.hexdigest() return sha256.hexdigest()
def crl_check(cache, cert):
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
store = cache.get_store(parsed)
context = crypto.X509StoreContext(store, parsed)
try:
context.verify_certificate()
return True
except crypto.X509StoreContextError as err:
raise CRLException(
"Certificate revoked or errored. Error: {}. Args: {}".format(
type(err), err.args
)
)
class CRLCache(): class CRLCache():
_PEM_RE = re.compile( _PEM_RE = re.compile(
@ -77,109 +95,3 @@ class CRLCache():
else: else:
return self.x509_stores[issuer] return self.x509_stores[issuer]
class Validator:
def __init__(self, cache, cert, logger=None):
self.cache = cache
self.cert = cert
self.logger = logger
def validate(self):
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, self.cert)
store = self.cache.get_store(parsed)
context = crypto.X509StoreContext(store, parsed)
try:
context.verify_certificate()
return True
except crypto.X509StoreContextError as err:
self.log_error(
"Certificate revoked or errored. Error: {}. Args: {}".format(
type(err), err.args
)
)
return False
def _add_roots(self, roots):
with open(filename, "rb") as f:
for raw_ca in self._parse_roots(f.read()):
ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca)
self._add_carefully("add_cert", ca)
def _reset(self):
self.cache = {}
self.store = self.base_store()
self._add_crls(self.crl_locations)
self._add_roots(self.roots)
self.store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
def log_error(self, message):
if self.logger:
self.logger.error(message)
def _add_crls(self, locations):
for filename in locations:
try:
self._add_crl(filename)
except crypto.Error as err:
self.log_error(
"CRL could not be parsed. Filename: {}, Error: {}, args: {}".format(
filename, type(err), err.args
)
)
# This caches the CRL issuer with the CRL filepath and a checksum, in addition to adding the CRL to the store.
def _add_crl(self, filename):
with open(filename, "rb") as crl_file:
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
self.cache[crl.get_issuer().der()] = (filename, sha256_checksum(filename))
self._add_carefully("add_crl", crl)
def _parse_roots(self, root_str):
return [match.group(0) for match in self._PEM_RE.finditer(root_str)]
def _add_roots(self, roots):
for filename in roots:
with open(filename, "rb") as f:
for raw_ca in self._parse_roots(f.read()):
ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca)
self._add_carefully("add_cert", ca)
# in testing, it seems that openssl is maintaining a local cache of certs
# in a hash table and throws errors if you try to add redundant certs or
# CRLs. For now, we catch and ignore that error with great specificity.
def _add_carefully(self, method_name, obj):
try:
getattr(self.store, method_name)(obj)
except crypto.Error as error:
if self._is_preloaded_error(error):
pass
else:
raise error
PRELOADED_CRL = (
[
(
"x509 certificate routines",
"X509_STORE_add_crl",
"cert already in hash table",
)
],
)
PRELOADED_CERT = (
[
(
"x509 certificate routines",
"X509_STORE_add_cert",
"cert already in hash table",
)
],
)
def _is_preloaded_error(self, error):
return error.args == self.PRELOADED_CRL or error.args == self.PRELOADED_CERT
# Checks that the CRL currently in-memory is up-to-date via the checksum.

View File

@ -1,6 +1,7 @@
import pytest import pytest
from atst.domain.authnid import AuthenticationContext from atst.domain.authnid import AuthenticationContext
from atst.domain.authnid.crl import CRLCache
from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.exceptions import UnauthenticatedError, NotFoundError
from atst.domain.users import Users from atst.domain.users import Users
@ -16,7 +17,7 @@ class MockCRLCache():
def test_can_authenticate(monkeypatch): def test_can_authenticate(monkeypatch):
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "SUCCESS", DOD_SDN, CERT MockCRLCache(), "SUCCESS", DOD_SDN, CERT
) )
@ -24,7 +25,7 @@ def test_can_authenticate(monkeypatch):
def test_unsuccessful_status(monkeypatch): def test_unsuccessful_status(monkeypatch):
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "FAILURE", DOD_SDN, CERT MockCRLCache(), "FAILURE", DOD_SDN, CERT
) )
@ -36,9 +37,10 @@ def test_unsuccessful_status(monkeypatch):
def test_crl_check_fails(monkeypatch): def test_crl_check_fails(monkeypatch):
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: False) cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=['ssl/client-certs/client-ca.der.crl'])
cert = open("ssl/client-certs/bad-atat.mil.crt", "r").read()
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "SUCCESS", DOD_SDN, CERT cache, "SUCCESS", DOD_SDN, cert
) )
with pytest.raises(UnauthenticatedError) as excinfo: with pytest.raises(UnauthenticatedError) as excinfo:
assert auth_context.authenticate() assert auth_context.authenticate()
@ -48,7 +50,7 @@ def test_crl_check_fails(monkeypatch):
def test_bad_sdn(monkeypatch): def test_bad_sdn(monkeypatch):
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "SUCCESS", "abc123", CERT MockCRLCache(), "SUCCESS", "abc123", CERT
) )
@ -60,7 +62,7 @@ def test_bad_sdn(monkeypatch):
def test_user_exists(monkeypatch): def test_user_exists(monkeypatch):
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
user = UserFactory.create(**DOD_SDN_INFO) user = UserFactory.create(**DOD_SDN_INFO)
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "SUCCESS", DOD_SDN, CERT MockCRLCache(), "SUCCESS", DOD_SDN, CERT
@ -71,7 +73,7 @@ def test_user_exists(monkeypatch):
def test_creates_user(monkeypatch): def test_creates_user(monkeypatch):
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
# check user does not exist # check user does not exist
with pytest.raises(NotFoundError): with pytest.raises(NotFoundError):
Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
@ -85,7 +87,7 @@ def test_creates_user(monkeypatch):
def test_user_cert_has_no_email(monkeypatch): def test_user_cert_has_no_email(monkeypatch):
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True) monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
cert = open("ssl/client-certs/atat.mil.crt").read() cert = open("ssl/client-certs/atat.mil.crt").read()
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "SUCCESS", DOD_SDN, cert MockCRLCache(), "SUCCESS", DOD_SDN, cert

View File

@ -4,7 +4,7 @@ import re
import os import os
import shutil import shutil
from OpenSSL import crypto, SSL from OpenSSL import crypto, SSL
from atst.domain.authnid.crl import Validator, CRLCache from atst.domain.authnid.crl import crl_check, CRLCache, CRLException
import atst.domain.authnid.crl.util as util import atst.domain.authnid.crl.util as util
@ -39,18 +39,20 @@ def test_can_validate_certificate():
cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=['ssl/client-certs/client-ca.der.crl']) cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=['ssl/client-certs/client-ca.der.crl'])
good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read()
bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read() bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read()
assert Validator(cache, good_cert).validate() assert crl_check(cache, good_cert)
assert Validator(cache, bad_cert).validate() == False with pytest.raises(CRLException):
crl_check(cache, bad_cert)
def test_can_dynamically_update_crls(tmpdir): def test_can_dynamically_update_crls(tmpdir):
crl_file = tmpdir.join('test.crl') crl_file = tmpdir.join('test.crl')
shutil.copyfile('ssl/client-certs/client-ca.der.crl', crl_file) shutil.copyfile('ssl/client-certs/client-ca.der.crl', crl_file)
cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=[crl_file]) cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=[crl_file])
cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() cert = open('ssl/client-certs/atat.mil.crt', 'rb').read()
assert Validator(cache, cert).validate() assert crl_check(cache, cert)
# override the original CRL with one that revokes atat.mil.crt # override the original CRL with one that revokes atat.mil.crt
shutil.copyfile('tests/fixtures/test.der.crl', crl_file) shutil.copyfile('tests/fixtures/test.der.crl', crl_file)
assert Validator(cache, cert).validate() == False with pytest.raises(CRLException):
assert crl_check(cache, cert)
def test_parse_disa_pki_list(): def test_parse_disa_pki_list():
with open('tests/fixtures/disa-pki.html') as disa: with open('tests/fixtures/disa-pki.html') as disa: