implement CRL checking from authnid
This commit is contained in:
parent
be079a62dc
commit
ac95bf371e
4
.gitignore
vendored
4
.gitignore
vendored
@ -30,3 +30,7 @@ static/assets/*
|
||||
log/*
|
||||
|
||||
config/dev.ini
|
||||
|
||||
# CRLs
|
||||
/crl
|
||||
/crl-tmp
|
||||
|
13
atst/app.py
13
atst/app.py
@ -1,5 +1,6 @@
|
||||
import os
|
||||
import re
|
||||
import pathlib
|
||||
from configparser import ConfigParser
|
||||
from flask import Flask, request, g
|
||||
from flask_session import Session
|
||||
@ -13,6 +14,7 @@ from atst.routes import bp
|
||||
from atst.routes.workspaces import bp as workspace_routes
|
||||
from atst.routes.requests import requests_bp
|
||||
from atst.routes.dev import bp as dev_routes
|
||||
from atst.domain.authnid.crl.validator import Validator
|
||||
|
||||
|
||||
ENV = os.getenv("FLASK_ENV", "dev")
|
||||
@ -33,6 +35,7 @@ def make_app(config):
|
||||
app.config.update({"SESSION_REDIS": redis})
|
||||
|
||||
make_flask_callbacks(app)
|
||||
make_crl_validator(app)
|
||||
|
||||
db.init_app(app)
|
||||
Session(app)
|
||||
@ -123,3 +126,13 @@ def make_config():
|
||||
|
||||
def make_redis(config):
|
||||
return redis.Redis.from_url(config['REDIS_URI'])
|
||||
|
||||
def make_crl_validator(app):
|
||||
crl_locations = []
|
||||
for filename in pathlib.Path(app.config["CRL_DIRECTORY"]).glob("*"):
|
||||
crl_locations.append(filename.absolute())
|
||||
app.crl_validator = Validator(
|
||||
roots=[app.config["CA_CHAIN"]], crl_locations=crl_locations
|
||||
)
|
||||
for e in app.crl_validator.errors:
|
||||
app.logger.error(e)
|
||||
|
@ -35,7 +35,7 @@ def catch_all(path):
|
||||
|
||||
@bp.route('/login-redirect')
|
||||
def login_redirect():
|
||||
if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and is_valid_certificate(request):
|
||||
if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and _is_valid_certificate(request):
|
||||
sdn = request.environ.get('HTTP_X_SSL_CLIENT_S_DN')
|
||||
sdn_parts = parse_sdn(sdn)
|
||||
user = Users.get_or_create_by_dod_id(**sdn_parts)
|
||||
@ -54,7 +54,7 @@ def unauthorized():
|
||||
return response
|
||||
|
||||
|
||||
def is_valid_certificate(request):
|
||||
def _is_valid_certificate(request):
|
||||
cert = request.environ.get('HTTP_X_SSL_CLIENT_CERT')
|
||||
if cert:
|
||||
result = app.crl_validator.validate(cert.encode())
|
||||
|
@ -19,3 +19,5 @@ PGDATABASE = atat
|
||||
SESSION_TYPE = redis
|
||||
SESSION_COOKIE_NAME=atat
|
||||
SESSION_USE_SIGNER = True
|
||||
CRL_DIRECTORY = crl
|
||||
CA_CHAIN = ssl/server-certs/ca-chain.pem
|
||||
|
@ -2,3 +2,4 @@
|
||||
PGHOST = postgreshost
|
||||
PGDATABASE = atat_test
|
||||
REDIS_URI = redis://redishost:6379
|
||||
CRL_DIRECTORY = tests/fixtures/crl
|
||||
|
@ -1,2 +1,3 @@
|
||||
[default]
|
||||
PGDATABASE = atat_test
|
||||
CRL_DIRECTORY = tests/fixtures/crl
|
||||
|
@ -5,7 +5,7 @@ set -e
|
||||
cd "$(dirname "$0")/.."
|
||||
|
||||
mkdir -p crl-tmp
|
||||
pipenv run python ./authnid/crl/util.py crl-tmp
|
||||
pipenv run python ./atst/domain/authnid/crl/util.py crl-tmp
|
||||
mkdir -p crl
|
||||
rsync -rq crl-tmp/. crl/.
|
||||
rm -rf crl-tmp
|
||||
|
BIN
tests/fixtures/crl/client-ca.der.crl
vendored
Normal file
BIN
tests/fixtures/crl/client-ca.der.crl
vendored
Normal file
Binary file not shown.
@ -10,7 +10,7 @@ def _fetch_user_info(c, t):
|
||||
|
||||
|
||||
def test_successful_login_redirect(client, monkeypatch):
|
||||
monkeypatch.setattr("atst.routes.is_valid_certificate", lambda *args: True)
|
||||
monkeypatch.setattr("atst.routes._is_valid_certificate", lambda *args: True)
|
||||
|
||||
resp = client.get(
|
||||
"/login-redirect",
|
||||
@ -52,3 +52,36 @@ def test_protected_route(client, app):
|
||||
resp = client.post(route)
|
||||
assert resp.status_code == 302
|
||||
assert resp.headers["Location"] == "http://localhost/"
|
||||
|
||||
|
||||
# this implicitly relies on the test config and test CRL in tests/fixtures/crl
|
||||
def test_crl_validation_on_login(client):
|
||||
good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read()
|
||||
bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read()
|
||||
|
||||
# bad cert is on the test CRL
|
||||
resp = client.get(
|
||||
"/login-redirect",
|
||||
environ_base={
|
||||
"HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS",
|
||||
"HTTP_X_SSL_CLIENT_S_DN": DOD_SDN,
|
||||
"HTTP_X_SSL_CLIENT_CERT": bad_cert.decode()
|
||||
},
|
||||
)
|
||||
assert resp.status_code == 302
|
||||
assert "unauthorized" in resp.headers["Location"]
|
||||
assert "user_id" not in session
|
||||
|
||||
# good cert is not on the test CRL, passes
|
||||
resp = client.get(
|
||||
"/login-redirect",
|
||||
environ_base={
|
||||
"HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS",
|
||||
"HTTP_X_SSL_CLIENT_S_DN": DOD_SDN,
|
||||
"HTTP_X_SSL_CLIENT_CERT": good_cert.decode()
|
||||
},
|
||||
)
|
||||
assert resp.status_code == 302
|
||||
assert "home" in resp.headers["Location"]
|
||||
assert session["user_id"]
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user