Implement log_in_user
This commit is contained in:
parent
df0b4e64c0
commit
13146e9362
@ -29,7 +29,7 @@ class Users(object):
|
|||||||
return user
|
return user
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(cls, atat_role_name, **kwargs):
|
def create(cls, atat_role_name="developer", **kwargs):
|
||||||
atat_role = Roles.get(atat_role_name)
|
atat_role = Roles.get(atat_role_name)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -42,11 +42,11 @@ class Users(object):
|
|||||||
return user
|
return user
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_or_create(cls, user_id, **kwargs):
|
def get_or_create_by_dod_id(cls, dod_id, **kwargs):
|
||||||
try:
|
try:
|
||||||
user = Users.get(user_id)
|
user = Users.get_by_dod_id(dod_id)
|
||||||
except NotFoundError:
|
except NotFoundError:
|
||||||
user = Users.create(id=user_id, **kwargs)
|
user = Users.create(dod_id=dod_id, **kwargs)
|
||||||
db.session.add(user)
|
db.session.add(user)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from sqlalchemy import String, ForeignKey, Column, UniqueConstraint
|
from sqlalchemy import String, ForeignKey, Column
|
||||||
from sqlalchemy.orm import relationship
|
from sqlalchemy.orm import relationship
|
||||||
from sqlalchemy.dialects.postgresql import UUID
|
from sqlalchemy.dialects.postgresql import UUID
|
||||||
|
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
from flask import Blueprint, render_template, g
|
from flask import Blueprint, render_template, g, redirect, session, url_for, request
|
||||||
import pendulum
|
import pendulum
|
||||||
|
|
||||||
from atst.domain.requests import Requests
|
from atst.domain.requests import Requests
|
||||||
|
from atst.domain.users import Users
|
||||||
|
from atst.domain.authnid.utils import parse_sdn
|
||||||
|
|
||||||
bp = Blueprint("atst", __name__)
|
bp = Blueprint("atst", __name__)
|
||||||
|
|
||||||
@ -24,3 +26,39 @@ def styleguide():
|
|||||||
@bp.route('/<path:path>')
|
@bp.route('/<path:path>')
|
||||||
def catch_all(path):
|
def catch_all(path):
|
||||||
return render_template("{}.html".format(path))
|
return render_template("{}.html".format(path))
|
||||||
|
|
||||||
|
|
||||||
|
@bp.route('/login-redirect')
|
||||||
|
def log_in_user():
|
||||||
|
# FIXME: Find or create user based on the X-Ssl-Client-S-Dn header
|
||||||
|
# TODO: Store/log the X-Ssl-Client-Cert in case it's needed?
|
||||||
|
if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and is_valid_certificate(request):
|
||||||
|
sdn = request.environ.get('HTTP_X_SSL_CLIENT_S_DN')
|
||||||
|
# TODO: error handling for bad SDN, database errors, etc
|
||||||
|
sdn_parts = parse_sdn(sdn)
|
||||||
|
user = Users.get_or_create_by_dod_id(**sdn_parts)
|
||||||
|
|
||||||
|
session["user_id"] = user.id
|
||||||
|
|
||||||
|
return redirect(url_for("atst.home"))
|
||||||
|
else:
|
||||||
|
template = render_template('not_authorized.html', atst_url=app.config['ATST_PASSTHROUGH'])
|
||||||
|
response = app.make_response(template)
|
||||||
|
response.status_code = 403
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def is_valid_certificate(request):
|
||||||
|
cert = request.environ.get('HTTP_X_SSL_CLIENT_CERT')
|
||||||
|
if cert:
|
||||||
|
result = app.crl_validator.validate(cert.encode())
|
||||||
|
if not result:
|
||||||
|
app.logger.info(app.crl_validator.errors[-1])
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def construct_redirect(uuid):
|
||||||
|
access_token = app.token_manager.token(uuid)
|
||||||
|
url = f'{app.config["ATST_REDIRECT"]}?bearer-token={access_token}'
|
||||||
|
return app.make_response(redirect(url))
|
||||||
|
@ -16,10 +16,9 @@ def app(request):
|
|||||||
ctx = _app.app_context()
|
ctx = _app.app_context()
|
||||||
ctx.push()
|
ctx.push()
|
||||||
|
|
||||||
def teardown():
|
yield _app
|
||||||
ctx.pop()
|
|
||||||
|
|
||||||
return _app
|
ctx.pop()
|
||||||
|
|
||||||
|
|
||||||
def apply_migrations():
|
def apply_migrations():
|
||||||
@ -34,9 +33,6 @@ def apply_migrations():
|
|||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(scope='session')
|
||||||
def db(app, request):
|
def db(app, request):
|
||||||
|
|
||||||
def teardown():
|
|
||||||
_db.drop_all()
|
|
||||||
|
|
||||||
_db.app = app
|
_db.app = app
|
||||||
|
|
||||||
apply_migrations()
|
apply_migrations()
|
||||||
|
@ -4,6 +4,7 @@ from uuid import uuid4
|
|||||||
from atst.domain.users import Users
|
from atst.domain.users import Users
|
||||||
from atst.domain.exceptions import NotFoundError
|
from atst.domain.exceptions import NotFoundError
|
||||||
|
|
||||||
|
DOD_ID = "my_dod_id"
|
||||||
|
|
||||||
|
|
||||||
def test_create_user():
|
def test_create_user():
|
||||||
@ -17,15 +18,13 @@ def test_create_user_with_nonexistent_role():
|
|||||||
|
|
||||||
|
|
||||||
def test_get_or_create_nonexistent_user():
|
def test_get_or_create_nonexistent_user():
|
||||||
user_id = uuid4()
|
user = Users.get_or_create_by_dod_id(DOD_ID, atat_role_name="developer")
|
||||||
user = Users.get_or_create(user_id, atat_role_name="developer")
|
assert user.dod_id == DOD_ID
|
||||||
assert user.id == user_id
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_or_create_existing_user():
|
def test_get_or_create_existing_user():
|
||||||
user_id = uuid4()
|
Users.get_or_create_by_dod_id(DOD_ID, atat_role_name="developer")
|
||||||
Users.get_or_create(user_id, atat_role_name="developer")
|
user = Users.get_or_create_by_dod_id(DOD_ID, atat_role_name="developer")
|
||||||
user = Users.get_or_create(user_id, atat_role_name="developer")
|
|
||||||
assert user
|
assert user
|
||||||
|
|
||||||
|
|
||||||
@ -42,8 +41,8 @@ def test_get_nonexistent_user():
|
|||||||
|
|
||||||
|
|
||||||
def test_get_user_by_dod_id():
|
def test_get_user_by_dod_id():
|
||||||
new_user = Users.create("developer", dod_id="my_dod_id")
|
new_user = Users.create("developer", dod_id=DOD_ID)
|
||||||
user = Users.get_by_dod_id("my_dod_id")
|
user = Users.get_by_dod_id(DOD_ID)
|
||||||
assert user == new_user
|
assert user == new_user
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
import pytest
|
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from atst.domain.workspace_users import WorkspaceUsers
|
from atst.domain.workspace_users import WorkspaceUsers
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import factory
|
import factory
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from atst.models import Request, RequestStatusEvent
|
from atst.models import Request
|
||||||
from atst.models.pe_number import PENumber
|
from atst.models.pe_number import PENumber
|
||||||
from atst.models.task_order import TaskOrder
|
from atst.models.task_order import TaskOrder
|
||||||
|
|
||||||
|
@ -1,78 +1,30 @@
|
|||||||
import re
|
from flask import session
|
||||||
import pytest
|
|
||||||
|
|
||||||
MOCK_USER = {"id": "438567dd-25fa-4d83-a8cc-8aa8366cb24a"}
|
MOCK_USER = {"id": "438567dd-25fa-4d83-a8cc-8aa8366cb24a"}
|
||||||
|
DOD_SDN_INFO = {
|
||||||
|
'first_name': 'ART',
|
||||||
|
'last_name': 'GARFUNKEL',
|
||||||
|
'dod_id': '5892460358'
|
||||||
|
}
|
||||||
|
DOD_SDN = f"CN={DOD_SDN_INFO['last_name']}.{DOD_SDN_INFO['first_name']}.G.{DOD_SDN_INFO['dod_id']},OU=OTHER,OU=PKI,OU=DoD,O=U.S. Government,C=US"
|
||||||
|
|
||||||
|
|
||||||
def _fetch_user_info(c, t):
|
def _fetch_user_info(c, t):
|
||||||
return MOCK_USER
|
return MOCK_USER
|
||||||
|
|
||||||
@pytest.mark.skip
|
|
||||||
def test_redirects_when_not_logged_in():
|
|
||||||
pass
|
|
||||||
# response = yield http_client.fetch(
|
|
||||||
# base_url + "/home", raise_error=False, follow_redirects=False
|
|
||||||
# )
|
|
||||||
# location = response.headers["Location"]
|
|
||||||
# assert response.code == 302
|
|
||||||
# assert response.error
|
|
||||||
# assert re.match("/\??", location)
|
|
||||||
|
|
||||||
|
def test_login(client, monkeypatch):
|
||||||
|
monkeypatch.setattr("atst.routes.is_valid_certificate", lambda *args: True)
|
||||||
|
|
||||||
# @pytest.mark.skip
|
resp = client.get(
|
||||||
# def test_redirects_when_session_does_not_exist():
|
"/login-redirect",
|
||||||
# monkeypatch.setattr("atst.handlers.main.Main.get_secure_cookie", lambda s,c: 'stale cookie!')
|
environ_base={
|
||||||
# response = yield http_client.fetch(
|
"HTTP_X_SSL_CLIENT_VERIFY": "SUCCESS",
|
||||||
# base_url + "/home", raise_error=False, follow_redirects=False
|
"HTTP_X_SSL_CLIENT_S_DN": DOD_SDN,
|
||||||
# )
|
},
|
||||||
# location = response.headers["Location"]
|
)
|
||||||
# cookie = response.headers._dict.get('Set-Cookie')
|
|
||||||
# # should clear session cookie
|
|
||||||
# assert 'atat=""' in cookie
|
|
||||||
# assert response.code == 302
|
|
||||||
# assert response.error
|
|
||||||
# assert re.match("/\??", location)
|
|
||||||
|
|
||||||
|
assert resp.status_code == 302
|
||||||
# @pytest.mark.skip
|
assert "home" in resp.headers["Location"]
|
||||||
# def test_login_with_valid_bearer_token():
|
assert session["user_id"]
|
||||||
# monkeypatch.setattr("atst.handlers.login_redirect.LoginRedirect._fetch_user_info", _fetch_user_info)
|
|
||||||
# response = client.fetch(
|
|
||||||
# base_url + "/login-redirect?bearer-token=abc-123",
|
|
||||||
# follow_redirects=False,
|
|
||||||
# raise_error=False,
|
|
||||||
# )
|
|
||||||
# assert response.headers["Set-Cookie"].startswith("atat")
|
|
||||||
# assert response.headers["Location"] == "/home"
|
|
||||||
# assert response.code == 302
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# @pytest.mark.skip
|
|
||||||
# def test_login_via_dev_endpoint():
|
|
||||||
# response = yield http_client.fetch(
|
|
||||||
# base_url + "/login-dev", raise_error=False, follow_redirects=False
|
|
||||||
# )
|
|
||||||
# assert response.headers["Set-Cookie"].startswith("atat")
|
|
||||||
# assert response.code == 302
|
|
||||||
# assert response.headers["Location"] == "/home"
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# @pytest.mark.skip
|
|
||||||
# def test_login_with_invalid_bearer_token():
|
|
||||||
# _response = yield http_client.fetch(
|
|
||||||
# base_url + "/home",
|
|
||||||
# raise_error=False,
|
|
||||||
# headers={"Cookie": "bearer-token=anything"},
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# @pytest.mark.skip
|
|
||||||
# def test_valid_login_creates_session():
|
|
||||||
# monkeypatch.setattr("atst.handlers.login_redirect.LoginRedirect._fetch_user_info", _fetch_user_info)
|
|
||||||
# assert len(app.sessions.sessions) == 0
|
|
||||||
# yield http_client.fetch(
|
|
||||||
# base_url + "/login-redirect?bearer-token=abc-123",
|
|
||||||
# follow_redirects=False,
|
|
||||||
# raise_error=False,
|
|
||||||
# )
|
|
||||||
# assert len(app.sessions.sessions) == 1
|
|
||||||
# session = list(app.sessions.sessions.values())[0]
|
|
||||||
# assert "atat_permissions" in session["user"]
|
|
||||||
# assert isinstance(session["user"]["atat_permissions"], list)
|
|
||||||
|
@ -1,6 +1,3 @@
|
|||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
def test_routes(client):
|
def test_routes(client):
|
||||||
for path in (
|
for path in (
|
||||||
"/",
|
"/",
|
||||||
|
Loading…
x
Reference in New Issue
Block a user