diff --git a/atst/domain/users.py b/atst/domain/users.py index 87f95756..54cf4ae0 100644 --- a/atst/domain/users.py +++ b/atst/domain/users.py @@ -29,7 +29,7 @@ class Users(object): return user @classmethod - def create(cls, atat_role_name, **kwargs): + def create(cls, atat_role_name="developer", **kwargs): atat_role = Roles.get(atat_role_name) try: @@ -42,11 +42,11 @@ class Users(object): return user @classmethod - def get_or_create(cls, user_id, **kwargs): + def get_or_create_by_dod_id(cls, dod_id, **kwargs): try: - user = Users.get(user_id) + user = Users.get_by_dod_id(dod_id) except NotFoundError: - user = Users.create(id=user_id, **kwargs) + user = Users.create(dod_id=dod_id, **kwargs) db.session.add(user) db.session.commit() diff --git a/atst/models/user.py b/atst/models/user.py index 174d3775..6a1b530d 100644 --- a/atst/models/user.py +++ b/atst/models/user.py @@ -1,4 +1,4 @@ -from sqlalchemy import String, ForeignKey, Column, UniqueConstraint +from sqlalchemy import String, ForeignKey, Column from sqlalchemy.orm import relationship from sqlalchemy.dialects.postgresql import UUID diff --git a/atst/routes/__init__.py b/atst/routes/__init__.py index 3c1f99ac..29cadc2e 100644 --- a/atst/routes/__init__.py +++ b/atst/routes/__init__.py @@ -1,7 +1,9 @@ -from flask import Blueprint, render_template, g +from flask import Blueprint, render_template, g, redirect, session, url_for, request import pendulum from atst.domain.requests import Requests +from atst.domain.users import Users +from atst.domain.authnid.utils import parse_sdn bp = Blueprint("atst", __name__) @@ -24,3 +26,39 @@ def styleguide(): @bp.route('/') def catch_all(path): return render_template("{}.html".format(path)) + + +@bp.route('/login-redirect') +def log_in_user(): + # FIXME: Find or create user based on the X-Ssl-Client-S-Dn header + # TODO: Store/log the X-Ssl-Client-Cert in case it's needed? + if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and is_valid_certificate(request): + sdn = request.environ.get('HTTP_X_SSL_CLIENT_S_DN') + # TODO: error handling for bad SDN, database errors, etc + sdn_parts = parse_sdn(sdn) + user = Users.get_or_create_by_dod_id(**sdn_parts) + + session["user_id"] = user.id + + return redirect(url_for("atst.home")) + else: + template = render_template('not_authorized.html', atst_url=app.config['ATST_PASSTHROUGH']) + response = app.make_response(template) + response.status_code = 403 + + return response + +def is_valid_certificate(request): + cert = request.environ.get('HTTP_X_SSL_CLIENT_CERT') + if cert: + result = app.crl_validator.validate(cert.encode()) + if not result: + app.logger.info(app.crl_validator.errors[-1]) + return result + else: + return False + +def construct_redirect(uuid): + access_token = app.token_manager.token(uuid) + url = f'{app.config["ATST_REDIRECT"]}?bearer-token={access_token}' + return app.make_response(redirect(url)) diff --git a/tests/conftest.py b/tests/conftest.py index b166162f..34dc3d41 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,10 +16,9 @@ def app(request): ctx = _app.app_context() ctx.push() - def teardown(): - ctx.pop() + yield _app - return _app + ctx.pop() def apply_migrations(): @@ -34,9 +33,6 @@ def apply_migrations(): @pytest.fixture(scope='session') def db(app, request): - def teardown(): - _db.drop_all() - _db.app = app apply_migrations() diff --git a/tests/domain/test_users.py b/tests/domain/test_users.py index ca6daa2a..16d78366 100644 --- a/tests/domain/test_users.py +++ b/tests/domain/test_users.py @@ -4,6 +4,7 @@ from uuid import uuid4 from atst.domain.users import Users from atst.domain.exceptions import NotFoundError +DOD_ID = "my_dod_id" def test_create_user(): @@ -17,15 +18,13 @@ def test_create_user_with_nonexistent_role(): def test_get_or_create_nonexistent_user(): - user_id = uuid4() - user = Users.get_or_create(user_id, atat_role_name="developer") - assert user.id == user_id + user = Users.get_or_create_by_dod_id(DOD_ID, atat_role_name="developer") + assert user.dod_id == DOD_ID def test_get_or_create_existing_user(): - user_id = uuid4() - Users.get_or_create(user_id, atat_role_name="developer") - user = Users.get_or_create(user_id, atat_role_name="developer") + Users.get_or_create_by_dod_id(DOD_ID, atat_role_name="developer") + user = Users.get_or_create_by_dod_id(DOD_ID, atat_role_name="developer") assert user @@ -42,8 +41,8 @@ def test_get_nonexistent_user(): def test_get_user_by_dod_id(): - new_user = Users.create("developer", dod_id="my_dod_id") - user = Users.get_by_dod_id("my_dod_id") + new_user = Users.create("developer", dod_id=DOD_ID) + user = Users.get_by_dod_id(DOD_ID) assert user == new_user diff --git a/tests/domain/test_workspace_users.py b/tests/domain/test_workspace_users.py index 6f2a9266..2c651235 100644 --- a/tests/domain/test_workspace_users.py +++ b/tests/domain/test_workspace_users.py @@ -1,4 +1,3 @@ -import pytest from uuid import uuid4 from atst.domain.workspace_users import WorkspaceUsers diff --git a/tests/factories.py b/tests/factories.py index 582ce461..d1a129e5 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -1,7 +1,7 @@ import factory from uuid import uuid4 -from atst.models import Request, RequestStatusEvent +from atst.models import Request from atst.models.pe_number import PENumber from atst.models.task_order import TaskOrder diff --git a/tests/test_auth.py b/tests/test_auth.py index 91fdd741..5e7a37c0 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,78 +1,30 @@ -import re -import pytest +from flask import session + MOCK_USER = {"id": "438567dd-25fa-4d83-a8cc-8aa8366cb24a"} +DOD_SDN_INFO = { + 'first_name': 'ART', + 'last_name': 'GARFUNKEL', + 'dod_id': '5892460358' + } +DOD_SDN = f"CN={DOD_SDN_INFO['last_name']}.{DOD_SDN_INFO['first_name']}.G.{DOD_SDN_INFO['dod_id']},OU=OTHER,OU=PKI,OU=DoD,O=U.S. Government,C=US" + + def _fetch_user_info(c, t): return MOCK_USER -@pytest.mark.skip -def test_redirects_when_not_logged_in(): - pass - # response = yield http_client.fetch( - # base_url + "/home", raise_error=False, follow_redirects=False - # ) - # location = response.headers["Location"] - # assert response.code == 302 - # assert response.error - # assert re.match("/\??", location) +def test_login(client, monkeypatch): + monkeypatch.setattr("atst.routes.is_valid_certificate", lambda *args: True) -# @pytest.mark.skip -# def test_redirects_when_session_does_not_exist(): - # monkeypatch.setattr("atst.handlers.main.Main.get_secure_cookie", lambda s,c: 'stale cookie!') - # response = yield http_client.fetch( - # base_url + "/home", raise_error=False, follow_redirects=False - # ) - # location = response.headers["Location"] - # cookie = response.headers._dict.get('Set-Cookie') - # # should clear session cookie - # assert 'atat=""' in cookie - # assert response.code == 302 - # assert response.error - # assert re.match("/\??", location) + resp = client.get( + "/login-redirect", + environ_base={ + "HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS", + "HTTP_X_SSL_CLIENT_S_DN": DOD_SDN, + }, + ) - -# @pytest.mark.skip -# def test_login_with_valid_bearer_token(): -# monkeypatch.setattr("atst.handlers.login_redirect.LoginRedirect._fetch_user_info", _fetch_user_info) -# response = client.fetch( -# base_url + "/login-redirect?bearer-token=abc-123", -# follow_redirects=False, -# raise_error=False, -# ) -# assert response.headers["Set-Cookie"].startswith("atat") -# assert response.headers["Location"] == "/home" -# assert response.code == 302 -# -# -# @pytest.mark.skip -# def test_login_via_dev_endpoint(): -# response = yield http_client.fetch( -# base_url + "/login-dev", raise_error=False, follow_redirects=False -# ) -# assert response.headers["Set-Cookie"].startswith("atat") -# assert response.code == 302 -# assert response.headers["Location"] == "/home" -# -# -# @pytest.mark.skip -# def test_login_with_invalid_bearer_token(): -# _response = yield http_client.fetch( -# base_url + "/home", -# raise_error=False, -# headers={"Cookie": "bearer-token=anything"}, -# ) -# -# @pytest.mark.skip -# def test_valid_login_creates_session(): -# monkeypatch.setattr("atst.handlers.login_redirect.LoginRedirect._fetch_user_info", _fetch_user_info) -# assert len(app.sessions.sessions) == 0 -# yield http_client.fetch( -# base_url + "/login-redirect?bearer-token=abc-123", -# follow_redirects=False, -# raise_error=False, -# ) -# assert len(app.sessions.sessions) == 1 -# session = list(app.sessions.sessions.values())[0] -# assert "atat_permissions" in session["user"] -# assert isinstance(session["user"]["atat_permissions"], list) + assert resp.status_code == 302 + assert "home" in resp.headers["Location"] + assert session["user_id"] diff --git a/tests/test_routes.py b/tests/test_routes.py index fedffb21..71a4f9bc 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -1,6 +1,3 @@ -import pytest - - def test_routes(client): for path in ( "/",