make crl_check a CRLCache method

This commit is contained in:
dandds 2018-08-17 11:13:01 -04:00 committed by luis cielak
parent f5cc9daee9
commit 7fb407a3b1
4 changed files with 41 additions and 44 deletions

View File

@ -1,7 +1,7 @@
from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.exceptions import UnauthenticatedError, NotFoundError
from atst.domain.users import Users from atst.domain.users import Users
from .utils import parse_sdn, email_from_certificate from .utils import parse_sdn, email_from_certificate
from .crl import crl_check, CRLRevocationException from .crl import CRLRevocationException
class AuthenticationContext(): class AuthenticationContext():
@ -45,7 +45,7 @@ class AuthenticationContext():
def _crl_check(self): def _crl_check(self):
try: try:
crl_check(self.crl_cache, self.cert) self.crl_cache.crl_check(self.cert)
except CRLRevocationException as exc: except CRLRevocationException as exc:
raise UnauthenticatedError("CRL check failed. " + str(exc)) raise UnauthenticatedError("CRL check failed. " + str(exc))

View File

@ -9,22 +9,6 @@ class CRLRevocationException(Exception):
pass pass
def crl_check(cache, cert):
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
store = cache.get_store(parsed)
context = crypto.X509StoreContext(store, parsed)
try:
context.verify_certificate()
return True
except crypto.X509StoreContextError as err:
raise CRLRevocationException(
"Certificate revoked or errored. Error: {}. Args: {}".format(
type(err), err.args
)
)
class CRLCache(): class CRLCache():
_PEM_RE = re.compile( _PEM_RE = re.compile(
@ -38,7 +22,7 @@ class CRLCache():
self._load_roots(root_location) self._load_roots(root_location)
self._build_crl_cache(crl_locations) self._build_crl_cache(crl_locations)
def get_store(self, cert): def _get_store(self, cert):
return self._build_store(cert.get_issuer().der()) return self._build_store(cert.get_issuer().der())
def _load_roots(self, root_location): def _load_roots(self, root_location):
@ -91,3 +75,18 @@ class CRLCache():
else: else:
return self._add_certificate_chain_to_store(store, ca.get_issuer()) return self._add_certificate_chain_to_store(store, ca.get_issuer())
def crl_check(self, cert):
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
store = self._get_store(parsed)
context = crypto.X509StoreContext(store, parsed)
try:
context.verify_certificate()
return True
except crypto.X509StoreContextError as err:
raise CRLRevocationException(
"Certificate revoked or errored. Error: {}. Args: {}".format(
type(err), err.args
)
)

View File

@ -1,7 +1,7 @@
import pytest import pytest
from atst.domain.authnid import AuthenticationContext from atst.domain.authnid import AuthenticationContext
from atst.domain.authnid.crl import CRLCache from atst.domain.authnid.crl import CRLCache, CRLRevocationException
from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.exceptions import UnauthenticatedError, NotFoundError
from atst.domain.users import Users from atst.domain.users import Users
@ -12,20 +12,24 @@ CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read()
class MockCRLCache(): class MockCRLCache():
def get_store(self, cert): def __init__(self, valid=True):
pass self.valid = valid
def crl_check(self, cert):
if self.valid:
return True
raise CRLRevocationException()
def test_can_authenticate(monkeypatch): def test_can_authenticate():
monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "SUCCESS", DOD_SDN, CERT MockCRLCache(), "SUCCESS", DOD_SDN, CERT
) )
assert auth_context.authenticate() assert auth_context.authenticate()
def test_unsuccessful_status(monkeypatch): def test_unsuccessful_status():
monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "FAILURE", DOD_SDN, CERT MockCRLCache(), "FAILURE", DOD_SDN, CERT
) )
@ -36,11 +40,9 @@ def test_unsuccessful_status(monkeypatch):
assert "client authentication" in message assert "client authentication" in message
def test_crl_check_fails(monkeypatch): def test_crl_check_fails():
cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=['ssl/client-certs/client-ca.der.crl'])
cert = open("ssl/client-certs/bad-atat.mil.crt", "r").read()
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
cache, "SUCCESS", DOD_SDN, cert MockCRLCache(False), "SUCCESS", DOD_SDN, CERT
) )
with pytest.raises(UnauthenticatedError) as excinfo: with pytest.raises(UnauthenticatedError) as excinfo:
assert auth_context.authenticate() assert auth_context.authenticate()
@ -49,8 +51,7 @@ def test_crl_check_fails(monkeypatch):
assert "CRL check" in message assert "CRL check" in message
def test_bad_sdn(monkeypatch): def test_bad_sdn():
monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "SUCCESS", "abc123", CERT MockCRLCache(), "SUCCESS", "abc123", CERT
) )
@ -61,8 +62,7 @@ def test_bad_sdn(monkeypatch):
assert "SDN" in message assert "SDN" in message
def test_user_exists(monkeypatch): def test_user_exists():
monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
user = UserFactory.create(**DOD_SDN_INFO) user = UserFactory.create(**DOD_SDN_INFO)
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "SUCCESS", DOD_SDN, CERT MockCRLCache(), "SUCCESS", DOD_SDN, CERT
@ -72,8 +72,7 @@ def test_user_exists(monkeypatch):
assert auth_user == user assert auth_user == user
def test_creates_user(monkeypatch): def test_creates_user():
monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
# check user does not exist # check user does not exist
with pytest.raises(NotFoundError): with pytest.raises(NotFoundError):
Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
@ -86,8 +85,7 @@ def test_creates_user(monkeypatch):
assert user.email == FIXTURE_EMAIL_ADDRESS assert user.email == FIXTURE_EMAIL_ADDRESS
def test_user_cert_has_no_email(monkeypatch): def test_user_cert_has_no_email():
monkeypatch.setattr("atst.domain.authnid.crl_check", lambda *args: True)
cert = open("ssl/client-certs/atat.mil.crt").read() cert = open("ssl/client-certs/atat.mil.crt").read()
auth_context = AuthenticationContext( auth_context = AuthenticationContext(
MockCRLCache(), "SUCCESS", DOD_SDN, cert MockCRLCache(), "SUCCESS", DOD_SDN, cert

View File

@ -5,7 +5,7 @@ import os
import shutil import shutil
from OpenSSL import crypto, SSL from OpenSSL import crypto, SSL
from atst.domain.authnid.crl import crl_check, CRLCache, CRLRevocationException from atst.domain.authnid.crl import CRLCache, CRLRevocationException
import atst.domain.authnid.crl.util as util import atst.domain.authnid.crl.util as util
from tests.mocks import FIXTURE_EMAIL_ADDRESS from tests.mocks import FIXTURE_EMAIL_ADDRESS
@ -54,9 +54,9 @@ def test_can_validate_certificate():
) )
good_cert = open("ssl/client-certs/atat.mil.crt", "rb").read() good_cert = open("ssl/client-certs/atat.mil.crt", "rb").read()
bad_cert = open("ssl/client-certs/bad-atat.mil.crt", "rb").read() bad_cert = open("ssl/client-certs/bad-atat.mil.crt", "rb").read()
assert crl_check(cache, good_cert) assert cache.crl_check(good_cert)
with pytest.raises(CRLRevocationException): with pytest.raises(CRLRevocationException):
crl_check(cache, bad_cert) cache.crl_check(bad_cert)
def test_can_dynamically_update_crls(tmpdir): def test_can_dynamically_update_crls(tmpdir):
@ -64,18 +64,18 @@ def test_can_dynamically_update_crls(tmpdir):
shutil.copyfile("ssl/client-certs/client-ca.der.crl", crl_file) shutil.copyfile("ssl/client-certs/client-ca.der.crl", crl_file)
cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[crl_file]) cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[crl_file])
cert = open("ssl/client-certs/atat.mil.crt", "rb").read() cert = open("ssl/client-certs/atat.mil.crt", "rb").read()
assert crl_check(cache, cert) assert cache.crl_check(cert)
# override the original CRL with one that revokes atat.mil.crt # override the original CRL with one that revokes atat.mil.crt
shutil.copyfile("tests/fixtures/test.der.crl", crl_file) shutil.copyfile("tests/fixtures/test.der.crl", crl_file)
with pytest.raises(CRLRevocationException): with pytest.raises(CRLRevocationException):
assert crl_check(cache, cert) assert cache.crl_check(cert)
def test_throws_error_for_missing_issuer(): def test_throws_error_for_missing_issuer():
cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[]) cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[])
cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read() cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read()
with pytest.raises(CRLRevocationException) as exc: with pytest.raises(CRLRevocationException) as exc:
assert crl_check(cache, cert) assert cache.crl_check(cert)
(message,) = exc.value.args (message,) = exc.value.args
assert "issuer" in message assert "issuer" in message