Merge pull request #1156 from dod-ccpo/crl-issuer-cache
CRL Issuer Cache
This commit is contained in:
commit
d5bc49a7b9
@ -138,6 +138,28 @@ jobs:
|
|||||||
--network atat \
|
--network atat \
|
||||||
ghostinspector/test-runner-standalone:latest
|
ghostinspector/test-runner-standalone:latest
|
||||||
|
|
||||||
|
test-crl-parser:
|
||||||
|
docker:
|
||||||
|
- image: docker:18.06.0-ce-git
|
||||||
|
- image: circleci/postgres:10-alpine-ram
|
||||||
|
- image: circleci/redis:4-alpine3.8
|
||||||
|
steps:
|
||||||
|
- setup_remote_docker:
|
||||||
|
docker_layer_caching: true
|
||||||
|
version: 18.06.0-ce
|
||||||
|
- restore_docker_image
|
||||||
|
- setup_datastores:
|
||||||
|
pgdatabase: atat_test
|
||||||
|
- run:
|
||||||
|
name: Sync CRLs and run CRL test
|
||||||
|
command: |
|
||||||
|
docker run \
|
||||||
|
-e PGHOST=postgres \
|
||||||
|
-e REDIS_URI=redis://redis:6379 \
|
||||||
|
--network atat \
|
||||||
|
atat:builder \
|
||||||
|
/bin/sh -c "pipenv install --dev && /bin/sh script/sync-crls && pipenv run pytest --no-cov tests/check_crl_parse.py"
|
||||||
|
|
||||||
deploy:
|
deploy:
|
||||||
docker:
|
docker:
|
||||||
- image: docker:18.06.0-ce-git
|
- image: docker:18.06.0-ce-git
|
||||||
@ -206,6 +228,7 @@ jobs:
|
|||||||
kubectl set image deployment.apps/atst atst=${AZURE_SERVER_NAME}/atat:atat-${CIRCLE_SHA1} --namespace=atat
|
kubectl set image deployment.apps/atst atst=${AZURE_SERVER_NAME}/atat:atat-${CIRCLE_SHA1} --namespace=atat
|
||||||
kubectl set image deployment.apps/atst-worker atst-worker=${AZURE_SERVER_NAME}/atat:atat-${CIRCLE_SHA1} --namespace=atat
|
kubectl set image deployment.apps/atst-worker atst-worker=${AZURE_SERVER_NAME}/atat:atat-${CIRCLE_SHA1} --namespace=atat
|
||||||
kubectl set image deployment.apps/atst-beat atst-beat=${AZURE_SERVER_NAME}/atat:atat-${CIRCLE_SHA1} --namespace=atat
|
kubectl set image deployment.apps/atst-beat atst-beat=${AZURE_SERVER_NAME}/atat:atat-${CIRCLE_SHA1} --namespace=atat
|
||||||
|
kubectl set image cronjobs.batch/crls crls=${AZURE_SERVER_NAME}/atat:atat-${CIRCLE_SHA1} --namespace=atat
|
||||||
|
|
||||||
workflows:
|
workflows:
|
||||||
version: 2
|
version: 2
|
||||||
@ -226,3 +249,17 @@ workflows:
|
|||||||
branches:
|
branches:
|
||||||
only:
|
only:
|
||||||
- master
|
- master
|
||||||
|
|
||||||
|
test-crl-parser:
|
||||||
|
triggers:
|
||||||
|
- schedule:
|
||||||
|
cron: "0 4 * * *"
|
||||||
|
filters:
|
||||||
|
branches:
|
||||||
|
only:
|
||||||
|
- master
|
||||||
|
jobs:
|
||||||
|
- docker-build
|
||||||
|
- test-crl-parser:
|
||||||
|
requires:
|
||||||
|
- docker-build
|
||||||
|
1
.gitignore
vendored
1
.gitignore
vendored
@ -39,6 +39,7 @@ config/dev.ini
|
|||||||
/crls
|
/crls
|
||||||
/crl-tmp
|
/crl-tmp
|
||||||
*.bk
|
*.bk
|
||||||
|
crl_locations.json
|
||||||
|
|
||||||
# test CA config
|
# test CA config
|
||||||
ssl/client-certs/*.srl
|
ssl/client-certs/*.srl
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import pathlib
|
|
||||||
from configparser import ConfigParser
|
from configparser import ConfigParser
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from flask import Flask, request, g, session
|
from flask import Flask, request, g, session
|
||||||
@ -247,11 +246,10 @@ def make_crl_validator(app):
|
|||||||
if app.config.get("DISABLE_CRL_CHECK"):
|
if app.config.get("DISABLE_CRL_CHECK"):
|
||||||
app.crl_cache = NoOpCRLCache(logger=app.logger)
|
app.crl_cache = NoOpCRLCache(logger=app.logger)
|
||||||
else:
|
else:
|
||||||
crl_locations = []
|
|
||||||
for filename in pathlib.Path(app.config["CRL_STORAGE_CONTAINER"]).glob("*.crl"):
|
|
||||||
crl_locations.append(filename.absolute())
|
|
||||||
app.crl_cache = CRLCache(
|
app.crl_cache = CRLCache(
|
||||||
app.config["CA_CHAIN"], crl_locations, logger=app.logger
|
app.config["CA_CHAIN"],
|
||||||
|
app.config["CRL_STORAGE_CONTAINER"],
|
||||||
|
logger=app.logger,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,11 +1,13 @@
|
|||||||
import sys
|
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import hashlib
|
import hashlib
|
||||||
import logging
|
import logging
|
||||||
from flask import current_app as app
|
|
||||||
from datetime import datetime
|
|
||||||
from OpenSSL import crypto, SSL
|
from OpenSSL import crypto, SSL
|
||||||
|
from datetime import datetime
|
||||||
|
from flask import current_app as app
|
||||||
|
|
||||||
|
from .util import load_crl_locations_cache, serialize_crl_locations_cache
|
||||||
|
|
||||||
# error codes from OpenSSL: https://github.com/openssl/openssl/blob/2c75f03b39de2fa7d006bc0f0d7c58235a54d9bb/include/openssl/x509_vfy.h#L111
|
# error codes from OpenSSL: https://github.com/openssl/openssl/blob/2c75f03b39de2fa7d006bc0f0d7c58235a54d9bb/include/openssl/x509_vfy.h#L111
|
||||||
CRL_EXPIRED_ERROR_CODE = 12
|
CRL_EXPIRED_ERROR_CODE = 12
|
||||||
@ -70,12 +72,12 @@ class CRLCache(CRLInterface):
|
|||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
root_location,
|
root_location,
|
||||||
crl_locations=[],
|
crl_dir,
|
||||||
store_class=crypto.X509Store,
|
store_class=crypto.X509Store,
|
||||||
logger=None,
|
logger=None,
|
||||||
crl_update_func=None,
|
crl_update_func=None,
|
||||||
):
|
):
|
||||||
self._crl_locations = crl_locations
|
self._crl_dir = crl_dir
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
self.store_class = store_class
|
self.store_class = store_class
|
||||||
self.certificate_authorities = {}
|
self.certificate_authorities = {}
|
||||||
@ -96,16 +98,10 @@ class CRLCache(CRLInterface):
|
|||||||
return [match.group(0) for match in self._PEM_RE.finditer(root_str)]
|
return [match.group(0) for match in self._PEM_RE.finditer(root_str)]
|
||||||
|
|
||||||
def _build_crl_cache(self):
|
def _build_crl_cache(self):
|
||||||
self.crl_cache = {}
|
try:
|
||||||
for crl_location in self._crl_locations:
|
self.crl_cache = load_crl_locations_cache(self._crl_dir)
|
||||||
crl = self._load_crl(crl_location)
|
except FileNotFoundError:
|
||||||
if crl:
|
self.crl_cache = serialize_crl_locations_cache(self._crl_dir)
|
||||||
issuer_der = crl.get_issuer().der()
|
|
||||||
expires = crl.to_cryptography().next_update
|
|
||||||
self.crl_cache[issuer_der] = {
|
|
||||||
"location": crl_location,
|
|
||||||
"expires": expires,
|
|
||||||
}
|
|
||||||
|
|
||||||
def _load_crl(self, crl_location):
|
def _load_crl(self, crl_location):
|
||||||
with open(crl_location, "rb") as crl_file:
|
with open(crl_location, "rb") as crl_file:
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
import re
|
from datetime import datetime
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
|
|
||||||
import pendulum
|
import pendulum
|
||||||
import requests
|
import requests
|
||||||
@ -9,6 +11,10 @@ class CRLNotFoundError(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CRLParseError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
MODIFIED_TIME_BUFFER = 15 * 60
|
MODIFIED_TIME_BUFFER = 15 * 60
|
||||||
|
|
||||||
|
|
||||||
@ -70,6 +76,113 @@ CRL_LIST = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def scan_for_issuer_and_next_update(crl):
|
||||||
|
"""
|
||||||
|
Scans a CRL file byte-by-byte to find the issuer and nextUpdate fields.
|
||||||
|
|
||||||
|
Per RFC 5280, the issuer is the fourth ASN1 sequence element to occur in a
|
||||||
|
DER-encoded CRL file (https://tools.ietf.org/html/rfc5280#section-5.1).
|
||||||
|
This function takes a brute-force approach and scans the file until if find
|
||||||
|
the fourth sequence, then begins collecting that and the following bytes to
|
||||||
|
construct the issuer. It stop doing this when it finds \x17, which begins
|
||||||
|
the thisUpdate field. It then scans for the next UTCTime element (the next
|
||||||
|
occurrence of \x17) and grabs the 13 following bytes. It parses the ASN1
|
||||||
|
UTCTime byte string to derive a datetime object.
|
||||||
|
|
||||||
|
:param crl:
|
||||||
|
The path to a CRL file on-disk.
|
||||||
|
|
||||||
|
:return:
|
||||||
|
A two-element tuple. The first element is the raw DER bytes of the
|
||||||
|
issuer, the second is the parsed Python datetime object for nextUpdate.
|
||||||
|
"""
|
||||||
|
with open(crl, "rb") as f:
|
||||||
|
byte = f.read(1)
|
||||||
|
sequences = 0
|
||||||
|
issuer_finished = False
|
||||||
|
issuer = b""
|
||||||
|
while byte:
|
||||||
|
if not issuer_finished:
|
||||||
|
if byte == b"0" and sequences < 4:
|
||||||
|
sequences += 1
|
||||||
|
|
||||||
|
if byte == b"\x17" and sequences == 4:
|
||||||
|
issuer_finished = True
|
||||||
|
|
||||||
|
if sequences == 4 and not issuer_finished:
|
||||||
|
issuer += byte
|
||||||
|
else:
|
||||||
|
if byte == b"\x17":
|
||||||
|
byte_str = f.read(13)
|
||||||
|
next_update = datetime.strptime(
|
||||||
|
byte_str[1:].decode(), "%y%m%d%H%M%S"
|
||||||
|
)
|
||||||
|
return (issuer, next_update)
|
||||||
|
|
||||||
|
byte = f.read(1)
|
||||||
|
|
||||||
|
raise CRLParseError("CRL could not be scanned.")
|
||||||
|
|
||||||
|
|
||||||
|
def build_crl_locations_cache(crl_locations, logger=None):
|
||||||
|
crl_cache = {}
|
||||||
|
for crl_location in crl_locations:
|
||||||
|
try:
|
||||||
|
issuer_der, next_update = scan_for_issuer_and_next_update(crl_location)
|
||||||
|
crl_cache[issuer_der] = {"location": crl_location, "expires": next_update}
|
||||||
|
except CRLParseError:
|
||||||
|
if logger:
|
||||||
|
logger.warning(
|
||||||
|
"CRL could not be scanned for caching: {}".format(crl_location)
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
return crl_cache
|
||||||
|
|
||||||
|
|
||||||
|
JSON_CACHE = "crl_locations.json"
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_cache_items(cache):
|
||||||
|
return {
|
||||||
|
der.hex(): {
|
||||||
|
k: v.timestamp() if hasattr(v, "timestamp") else v
|
||||||
|
for (k, v) in data.items()
|
||||||
|
}
|
||||||
|
for (der, data) in cache.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _deserialize_cache_items(cache):
|
||||||
|
return {
|
||||||
|
bytes.fromhex(der): {
|
||||||
|
k: datetime.fromtimestamp(v) if isinstance(v, float) else v
|
||||||
|
for (k, v) in data.items()
|
||||||
|
}
|
||||||
|
for (der, data) in cache.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def serialize_crl_locations_cache(crl_dir, logger=None):
|
||||||
|
crl_locations = [
|
||||||
|
"{}/{}".format(crl_dir, crl_path) for crl_path in os.listdir(crl_dir)
|
||||||
|
]
|
||||||
|
location_cache = build_crl_locations_cache(crl_locations, logger=logger)
|
||||||
|
json_location = "{}/{}".format(crl_dir, JSON_CACHE)
|
||||||
|
with open(json_location, "w") as json_file:
|
||||||
|
json_ready = _serialize_cache_items(location_cache)
|
||||||
|
json.dump(json_ready, json_file)
|
||||||
|
|
||||||
|
return location_cache
|
||||||
|
|
||||||
|
|
||||||
|
def load_crl_locations_cache(crl_dir):
|
||||||
|
json_location = "{}/{}".format(crl_dir, JSON_CACHE)
|
||||||
|
with open(json_location, "r") as json_file:
|
||||||
|
cache = json.load(json_file)
|
||||||
|
return _deserialize_cache_items(cache)
|
||||||
|
|
||||||
|
|
||||||
def crl_local_path(out_dir, crl_location):
|
def crl_local_path(out_dir, crl_location):
|
||||||
name = re.split("/", crl_location)[-1]
|
name = re.split("/", crl_location)[-1]
|
||||||
crl = os.path.join(out_dir, name)
|
crl = os.path.join(out_dir, name)
|
||||||
@ -150,7 +263,10 @@ if __name__ == "__main__":
|
|||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
logger.info("Updating CRLs")
|
logger.info("Updating CRLs")
|
||||||
try:
|
try:
|
||||||
refresh_crls(sys.argv[1], sys.argv[2], logger)
|
tmp_location = sys.argv[1]
|
||||||
|
final_location = sys.argv[2]
|
||||||
|
refresh_crls(tmp_location, final_location, logger)
|
||||||
|
serialize_crl_locations_cache(tmp_location, logger=logger)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.exception("Fatal error encountered, stopping")
|
logger.exception("Fatal error encountered, stopping")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
17
tests/check_crl_parse.py
Normal file
17
tests/check_crl_parse.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
import os
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from atst.domain.authnid.crl.util import scan_for_issuer_and_next_update
|
||||||
|
|
||||||
|
from tests.utils import parse_for_issuer_and_next_update
|
||||||
|
|
||||||
|
|
||||||
|
CRL_DIR = "crls"
|
||||||
|
_CRLS = ["{}/{}".format(CRL_DIR, file_) for file_ in os.listdir(CRL_DIR)]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("crl_path", _CRLS)
|
||||||
|
def test_crl_scan_against_parse(crl_path):
|
||||||
|
parsed_der = parse_for_issuer_and_next_update(crl_path)
|
||||||
|
scanned_der = scan_for_issuer_and_next_update(crl_path)
|
||||||
|
assert parsed_der == scanned_der
|
@ -5,6 +5,7 @@ import os
|
|||||||
import shutil
|
import shutil
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
from cryptography.hazmat.primitives.serialization import Encoding
|
from cryptography.hazmat.primitives.serialization import Encoding
|
||||||
|
from OpenSSL import crypto
|
||||||
|
|
||||||
from atst.domain.authnid.crl import (
|
from atst.domain.authnid.crl import (
|
||||||
CRLCache,
|
CRLCache,
|
||||||
@ -12,9 +13,17 @@ from atst.domain.authnid.crl import (
|
|||||||
CRLInvalidException,
|
CRLInvalidException,
|
||||||
NoOpCRLCache,
|
NoOpCRLCache,
|
||||||
)
|
)
|
||||||
|
from atst.domain.authnid.crl.util import (
|
||||||
|
scan_for_issuer_and_next_update,
|
||||||
|
build_crl_locations_cache,
|
||||||
|
serialize_crl_locations_cache,
|
||||||
|
load_crl_locations_cache,
|
||||||
|
CRLParseError,
|
||||||
|
JSON_CACHE,
|
||||||
|
)
|
||||||
|
|
||||||
from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN
|
from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN
|
||||||
from tests.utils import FakeLogger
|
from tests.utils import FakeLogger, parse_for_issuer_and_next_update
|
||||||
|
|
||||||
|
|
||||||
class MockX509Store:
|
class MockX509Store:
|
||||||
@ -34,7 +43,9 @@ class MockX509Store:
|
|||||||
|
|
||||||
def test_can_build_crl_list(crl_file, ca_key, ca_file, make_crl, tmpdir):
|
def test_can_build_crl_list(crl_file, ca_key, ca_file, make_crl, tmpdir):
|
||||||
crl = make_crl(ca_key)
|
crl = make_crl(ca_key)
|
||||||
cache = CRLCache(ca_file, crl_locations=[crl_file], store_class=MockX509Store)
|
dir_ = os.path.dirname(crl_file)
|
||||||
|
serialize_crl_locations_cache(dir_)
|
||||||
|
cache = CRLCache(ca_file, dir_, store_class=MockX509Store)
|
||||||
issuer_der = crl.issuer.public_bytes(default_backend())
|
issuer_der = crl.issuer.public_bytes(default_backend())
|
||||||
assert len(cache.crl_cache.keys()) == 1
|
assert len(cache.crl_cache.keys()) == 1
|
||||||
assert issuer_der in cache.crl_cache
|
assert issuer_der in cache.crl_cache
|
||||||
@ -42,26 +53,16 @@ def test_can_build_crl_list(crl_file, ca_key, ca_file, make_crl, tmpdir):
|
|||||||
assert cache.crl_cache[issuer_der]["expires"] == crl.next_update
|
assert cache.crl_cache[issuer_der]["expires"] == crl.next_update
|
||||||
|
|
||||||
|
|
||||||
def test_can_build_trusted_root_list():
|
def test_can_build_trusted_root_list(app):
|
||||||
location = "ssl/server-certs/ca-chain.pem"
|
location = "ssl/server-certs/ca-chain.pem"
|
||||||
cache = CRLCache(
|
cache = CRLCache(
|
||||||
root_location=location, crl_locations=[], store_class=MockX509Store
|
location, app.config["CRL_STORAGE_CONTAINER"], store_class=MockX509Store
|
||||||
)
|
)
|
||||||
with open(location) as f:
|
with open(location) as f:
|
||||||
content = f.read()
|
content = f.read()
|
||||||
assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT")
|
assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT")
|
||||||
|
|
||||||
|
|
||||||
def test_can_build_crl_list_with_missing_crls():
|
|
||||||
location = "ssl/client-certs/client-ca.der.crl"
|
|
||||||
cache = CRLCache(
|
|
||||||
"ssl/client-certs/client-ca.crt",
|
|
||||||
crl_locations=["tests/fixtures/sample.pdf"],
|
|
||||||
store_class=MockX509Store,
|
|
||||||
)
|
|
||||||
assert len(cache.crl_cache.keys()) == 0
|
|
||||||
|
|
||||||
|
|
||||||
def test_crl_validation_on_login(
|
def test_crl_validation_on_login(
|
||||||
app,
|
app,
|
||||||
client,
|
client,
|
||||||
@ -78,8 +79,9 @@ def test_crl_validation_on_login(
|
|||||||
|
|
||||||
crl = make_crl(ca_key, expired_serials=[bad_cert.serial_number])
|
crl = make_crl(ca_key, expired_serials=[bad_cert.serial_number])
|
||||||
serialize_pki_object_to_disk(crl, crl_file, encoding=Encoding.DER)
|
serialize_pki_object_to_disk(crl, crl_file, encoding=Encoding.DER)
|
||||||
|
crl_dir = os.path.dirname(crl_file)
|
||||||
|
|
||||||
cache = CRLCache(ca_file, crl_locations=[crl_file])
|
cache = CRLCache(ca_file, crl_dir)
|
||||||
assert cache.crl_check(good_cert.public_bytes(Encoding.PEM).decode())
|
assert cache.crl_check(good_cert.public_bytes(Encoding.PEM).decode())
|
||||||
with pytest.raises(CRLRevocationException):
|
with pytest.raises(CRLRevocationException):
|
||||||
cache.crl_check(bad_cert.public_bytes(Encoding.PEM).decode())
|
cache.crl_check(bad_cert.public_bytes(Encoding.PEM).decode())
|
||||||
@ -94,7 +96,8 @@ def test_can_dynamically_update_crls(
|
|||||||
make_crl,
|
make_crl,
|
||||||
serialize_pki_object_to_disk,
|
serialize_pki_object_to_disk,
|
||||||
):
|
):
|
||||||
cache = CRLCache(ca_file, crl_locations=[crl_file])
|
crl_dir = os.path.dirname(crl_file)
|
||||||
|
cache = CRLCache(ca_file, crl_dir)
|
||||||
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
|
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
|
||||||
client_pem = client_cert.public_bytes(Encoding.PEM)
|
client_pem = client_cert.public_bytes(Encoding.PEM)
|
||||||
assert cache.crl_check(client_pem)
|
assert cache.crl_check(client_pem)
|
||||||
@ -107,8 +110,10 @@ def test_can_dynamically_update_crls(
|
|||||||
assert cache.crl_check(client_pem)
|
assert cache.crl_check(client_pem)
|
||||||
|
|
||||||
|
|
||||||
def test_throws_error_for_missing_issuer():
|
def test_throws_error_for_missing_issuer(app):
|
||||||
cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[])
|
cache = CRLCache(
|
||||||
|
"ssl/server-certs/ca-chain.pem", app.config["CRL_STORAGE_CONTAINER"]
|
||||||
|
)
|
||||||
# this cert is self-signed, and so the application does not have a
|
# this cert is self-signed, and so the application does not have a
|
||||||
# corresponding CRL for it
|
# corresponding CRL for it
|
||||||
cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read()
|
cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read()
|
||||||
@ -123,10 +128,7 @@ def test_throws_error_for_missing_issuer():
|
|||||||
|
|
||||||
|
|
||||||
def test_multistep_certificate_chain():
|
def test_multistep_certificate_chain():
|
||||||
cache = CRLCache(
|
cache = CRLCache("tests/fixtures/chain/ca-chain.pem", "tests/fixtures/chain/")
|
||||||
"tests/fixtures/chain/ca-chain.pem",
|
|
||||||
crl_locations=["tests/fixtures/chain/intermediate.crl"],
|
|
||||||
)
|
|
||||||
cert = open("tests/fixtures/chain/client.crt", "rb").read()
|
cert = open("tests/fixtures/chain/client.crt", "rb").read()
|
||||||
assert cache.crl_check(cert)
|
assert cache.crl_check(cert)
|
||||||
|
|
||||||
@ -144,7 +146,8 @@ def test_expired_crl_raises_CRLInvalidException_with_failover_config_false(
|
|||||||
):
|
):
|
||||||
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
|
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
|
||||||
client_pem = client_cert.public_bytes(Encoding.PEM)
|
client_pem = client_cert.public_bytes(Encoding.PEM)
|
||||||
crl_cache = CRLCache(ca_file, crl_locations=[expired_crl_file])
|
crl_dir = os.path.dirname(expired_crl_file)
|
||||||
|
crl_cache = CRLCache(ca_file, crl_dir)
|
||||||
with pytest.raises(CRLInvalidException):
|
with pytest.raises(CRLInvalidException):
|
||||||
crl_cache.crl_check(client_pem)
|
crl_cache.crl_check(client_pem)
|
||||||
|
|
||||||
@ -154,7 +157,8 @@ def test_expired_crl_passes_with_failover_config_true(
|
|||||||
):
|
):
|
||||||
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
|
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
|
||||||
client_pem = client_cert.public_bytes(Encoding.PEM)
|
client_pem = client_cert.public_bytes(Encoding.PEM)
|
||||||
crl_cache = CRLCache(ca_file, crl_locations=[expired_crl_file])
|
crl_dir = os.path.dirname(expired_crl_file)
|
||||||
|
crl_cache = CRLCache(ca_file, crl_dir)
|
||||||
|
|
||||||
assert crl_cache.crl_check(client_pem)
|
assert crl_cache.crl_check(client_pem)
|
||||||
|
|
||||||
@ -173,7 +177,54 @@ def test_updates_expired_certs(
|
|||||||
def _crl_update_func():
|
def _crl_update_func():
|
||||||
shutil.copyfile(crl_file, expired_crl_file)
|
shutil.copyfile(crl_file, expired_crl_file)
|
||||||
|
|
||||||
crl_cache = CRLCache(
|
crl_dir = os.path.dirname(expired_crl_file)
|
||||||
ca_file, crl_locations=[expired_crl_file], crl_update_func=_crl_update_func
|
crl_cache = CRLCache(ca_file, crl_dir, crl_update_func=_crl_update_func)
|
||||||
)
|
|
||||||
crl_cache.crl_check(client_pem)
|
crl_cache.crl_check(client_pem)
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_for_issuer_and_next_update(crl_file):
|
||||||
|
parsed = parse_for_issuer_and_next_update(crl_file)
|
||||||
|
scanned = scan_for_issuer_and_next_update(crl_file)
|
||||||
|
assert parsed == scanned
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def bad_crl(tmpdir):
|
||||||
|
bad_file = tmpdir.join("bad.crl")
|
||||||
|
with open(bad_file, "wb") as bad:
|
||||||
|
bad.write(b"definitely not a crl")
|
||||||
|
|
||||||
|
return bad_file
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_for_issuer_and_next_update_with_bad_data(bad_crl):
|
||||||
|
with pytest.raises(CRLParseError):
|
||||||
|
scan_for_issuer_and_next_update(bad_crl)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_crl_locations_cache(crl_file):
|
||||||
|
issuer_der, next_update = parse_for_issuer_and_next_update(crl_file)
|
||||||
|
cache = build_crl_locations_cache([crl_file])
|
||||||
|
assert cache == {issuer_der: {"location": crl_file, "expires": next_update}}
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_crl_locations_cache_with_bad_data(crl_file, bad_crl):
|
||||||
|
logger = FakeLogger()
|
||||||
|
issuer_der, next_update = parse_for_issuer_and_next_update(crl_file)
|
||||||
|
cache = build_crl_locations_cache([crl_file, bad_crl], logger=logger)
|
||||||
|
assert cache == {issuer_der: {"location": crl_file, "expires": next_update}}
|
||||||
|
assert logger.messages
|
||||||
|
assert str(bad_crl) in logger.messages[0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_serialize_crl_locations_cache(crl_file, bad_crl):
|
||||||
|
dir_ = os.path.dirname(crl_file)
|
||||||
|
serialize_crl_locations_cache(dir_)
|
||||||
|
assert os.path.isfile("{}/{}".format(dir_, JSON_CACHE))
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_crl_locations_cache(crl_file, bad_crl):
|
||||||
|
dir_ = os.path.dirname(crl_file)
|
||||||
|
serialize_crl_locations_cache(dir_)
|
||||||
|
cache = load_crl_locations_cache(dir_)
|
||||||
|
assert isinstance(cache, dict)
|
||||||
|
0
tests/fixtures/crl/.keep
vendored
Normal file
0
tests/fixtures/crl/.keep
vendored
Normal file
@ -1,3 +1,4 @@
|
|||||||
|
import os
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@ -147,7 +148,8 @@ def swap_crl_cache(
|
|||||||
else:
|
else:
|
||||||
crl = make_crl(ca_key)
|
crl = make_crl(ca_key)
|
||||||
serialize_pki_object_to_disk(crl, crl_file, encoding=Encoding.DER)
|
serialize_pki_object_to_disk(crl, crl_file, encoding=Encoding.DER)
|
||||||
app.crl_cache = CRLCache(ca_file, crl_locations=[crl_file])
|
crl_dir = os.path.dirname(crl_file)
|
||||||
|
app.crl_cache = CRLCache(ca_file, crl_dir)
|
||||||
|
|
||||||
yield _swap_crl_cache
|
yield _swap_crl_cache
|
||||||
|
|
||||||
@ -172,7 +174,8 @@ def test_crl_validation_on_login(
|
|||||||
crl = make_crl(ca_key, expired_serials=[bad_cert.serial_number])
|
crl = make_crl(ca_key, expired_serials=[bad_cert.serial_number])
|
||||||
serialize_pki_object_to_disk(crl, crl_file, encoding=Encoding.DER)
|
serialize_pki_object_to_disk(crl, crl_file, encoding=Encoding.DER)
|
||||||
|
|
||||||
cache = CRLCache(ca_file, crl_locations=[crl_file])
|
crl_dir = os.path.dirname(crl_file)
|
||||||
|
cache = CRLCache(ca_file, crl_dir)
|
||||||
swap_crl_cache(cache)
|
swap_crl_cache(cache)
|
||||||
|
|
||||||
# bad cert is on the test CRL
|
# bad cert is on the test CRL
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
from flask import template_rendered
|
from flask import template_rendered
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from unittest.mock import Mock
|
from unittest.mock import Mock
|
||||||
|
from OpenSSL import crypto
|
||||||
|
|
||||||
from atst.utils.notification_sender import NotificationSender
|
from atst.utils.notification_sender import NotificationSender
|
||||||
|
|
||||||
@ -43,3 +44,10 @@ class FakeLogger:
|
|||||||
|
|
||||||
|
|
||||||
FakeNotificationSender = lambda: Mock(spec=NotificationSender)
|
FakeNotificationSender = lambda: Mock(spec=NotificationSender)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_for_issuer_and_next_update(crl):
|
||||||
|
with open(crl, "rb") as crl_file:
|
||||||
|
parsed = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
||||||
|
next_update = parsed.to_cryptography().next_update
|
||||||
|
return (parsed.get_issuer().der(), next_update)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user