Merge pull request #138 from dod-ccpo/159414499-request-auth
159414499 request auth
This commit is contained in:
commit
00f1500940
@ -15,6 +15,7 @@ from atst.routes import bp
|
||||
from atst.routes.workspaces import bp as workspace_routes
|
||||
from atst.routes.requests import requests_bp
|
||||
from atst.routes.dev import bp as dev_routes
|
||||
from atst.routes.errors import make_error_pages
|
||||
from atst.domain.authnid.crl.validator import Validator
|
||||
from atst.domain.auth import apply_authentication
|
||||
|
||||
@ -45,6 +46,7 @@ def make_app(config):
|
||||
Session(app)
|
||||
assets_environment.init_app(app)
|
||||
|
||||
make_error_pages(app)
|
||||
app.register_blueprint(bp)
|
||||
app.register_blueprint(workspace_routes)
|
||||
app.register_blueprint(requests_bp)
|
||||
|
@ -14,3 +14,17 @@ class AlreadyExistsError(Exception):
|
||||
@property
|
||||
def message(self):
|
||||
return "{} already exists".format(self.resource_name)
|
||||
|
||||
|
||||
class UnauthorizedError(Exception):
|
||||
def __init__(self, user, action):
|
||||
self.user = user
|
||||
self.action = action
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
return "User {} not authorized to {}".format(self.user.id, self.action)
|
||||
|
||||
|
||||
class UnauthenticatedError(Exception):
|
||||
pass
|
||||
|
@ -1,4 +1,4 @@
|
||||
from sqlalchemy import exists, and_
|
||||
from sqlalchemy import exists, and_, exc
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
@ -42,11 +42,14 @@ class Requests(object):
|
||||
|
||||
@classmethod
|
||||
def exists(cls, request_id, creator_id):
|
||||
return db.session.query(
|
||||
exists().where(
|
||||
and_(Request.id == request_id, Request.creator == creator_id)
|
||||
)
|
||||
).scalar()
|
||||
try:
|
||||
return db.session.query(
|
||||
exists().where(
|
||||
and_(Request.id == request_id, Request.creator == creator_id)
|
||||
)
|
||||
).scalar()
|
||||
except exc.DataError:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def get(cls, request_id):
|
||||
|
@ -5,6 +5,7 @@ import pendulum
|
||||
from atst.domain.requests import Requests
|
||||
from atst.domain.users import Users
|
||||
from atst.domain.authnid.utils import parse_sdn
|
||||
from atst.domain.exceptions import UnauthenticatedError
|
||||
|
||||
bp = Blueprint("atst", __name__)
|
||||
|
||||
@ -29,6 +30,9 @@ def catch_all(path):
|
||||
return render_template("{}.html".format(path))
|
||||
|
||||
|
||||
# TODO: this should be partly consolidated into a domain function that takes
|
||||
# all the necessary UWSGI environment values as args and either returns a user
|
||||
# or raises the UnauthenticatedError
|
||||
@bp.route('/login-redirect')
|
||||
def login_redirect():
|
||||
if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and _is_valid_certificate(request):
|
||||
@ -39,15 +43,7 @@ def login_redirect():
|
||||
|
||||
return redirect(url_for("atst.home"))
|
||||
else:
|
||||
return redirect(url_for("atst.unauthorized"))
|
||||
|
||||
|
||||
@bp.route("/unauthorized")
|
||||
def unauthorized():
|
||||
template = render_template('unauthorized.html')
|
||||
response = app.make_response(template)
|
||||
response.status_code = 401
|
||||
return response
|
||||
raise UnauthenticatedError()
|
||||
|
||||
|
||||
def _is_valid_certificate(request):
|
||||
|
19
atst/routes/errors.py
Normal file
19
atst/routes/errors.py
Normal file
@ -0,0 +1,19 @@
|
||||
from flask import render_template
|
||||
|
||||
import atst.domain.exceptions as exceptions
|
||||
|
||||
|
||||
def make_error_pages(app):
|
||||
@app.errorhandler(exceptions.NotFoundError)
|
||||
@app.errorhandler(exceptions.UnauthorizedError)
|
||||
# pylint: disable=unused-variable
|
||||
def not_found(e):
|
||||
return render_template("not_found.html"), 404
|
||||
|
||||
|
||||
@app.errorhandler(exceptions.UnauthenticatedError)
|
||||
# pylint: disable=unused-variable
|
||||
def unauthorized(e):
|
||||
return render_template('unauthorized.html'), 401
|
||||
|
||||
return app
|
@ -3,6 +3,8 @@ from flask import g, redirect, render_template, url_for, request as http_request
|
||||
from . import requests_bp
|
||||
from atst.domain.requests import Requests
|
||||
from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.domain.exceptions import UnauthorizedError
|
||||
|
||||
|
||||
@requests_bp.route("/requests/new/<int:screen>", methods=["GET"])
|
||||
@ -25,6 +27,9 @@ def requests_form_new(screen):
|
||||
)
|
||||
@requests_bp.route("/requests/new/<int:screen>/<string:request_id>", methods=["GET"])
|
||||
def requests_form_update(screen=1, request_id=None):
|
||||
if request_id:
|
||||
_check_can_view_request(request_id)
|
||||
|
||||
request = Requests.get(request_id) if request_id is not None else None
|
||||
jedi_flow = JEDIRequestFlow(screen, request, request_id=request_id)
|
||||
|
||||
@ -79,10 +84,12 @@ def requests_update(screen=1, request_id=None):
|
||||
request_id=jedi_flow.request_id,
|
||||
)
|
||||
return redirect(where)
|
||||
|
||||
else:
|
||||
return render_template(
|
||||
"requests/screen-%d.html" % int(screen), **rerender_args
|
||||
)
|
||||
|
||||
else:
|
||||
return render_template("requests/screen-%d.html" % int(screen), **rerender_args)
|
||||
|
||||
@ -94,5 +101,18 @@ def requests_submit(request_id=None):
|
||||
|
||||
if request.status == "approved":
|
||||
return redirect("/requests?modal=True")
|
||||
|
||||
else:
|
||||
return redirect("/requests")
|
||||
|
||||
|
||||
# TODO: generalize this, along with other authorizations, into a policy-pattern
|
||||
# for authorization in the application
|
||||
def _check_can_view_request(request_id):
|
||||
if Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST in g.current_user.atat_permissions:
|
||||
pass
|
||||
elif Requests.exists(request_id, g.current_user.id):
|
||||
pass
|
||||
else:
|
||||
raise UnauthorizedError(g.current_user, "view request {}".format(request_id))
|
||||
|
||||
|
12
templates/not_found.html
Normal file
12
templates/not_found.html
Normal file
@ -0,0 +1,12 @@
|
||||
{% extends "error_base.html" %}
|
||||
|
||||
{% block content %}
|
||||
|
||||
<main class="usa-section usa-content">
|
||||
|
||||
<h1>Not Found</h1>
|
||||
|
||||
</main>
|
||||
|
||||
{% endblock %}
|
||||
|
@ -5,7 +5,7 @@ from atst.domain.exceptions import NotFoundError
|
||||
from atst.domain.requests import Requests
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
|
||||
from tests.factories import RequestFactory
|
||||
from tests.factories import RequestFactory, UserFactory
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@ -48,3 +48,11 @@ def test_dont_auto_approve_if_no_dollar_value_specified(new_request):
|
||||
request = Requests.submit(new_request)
|
||||
|
||||
assert request.status == RequestStatus.PENDING_CCPO_APPROVAL
|
||||
|
||||
|
||||
def test_exists(session):
|
||||
user_allowed = UserFactory.create()
|
||||
user_denied = UserFactory.create()
|
||||
request = RequestFactory.create(creator=user_allowed.id)
|
||||
assert Requests.exists(request.id, user_allowed.id)
|
||||
assert not Requests.exists(request.id, user_denied.id)
|
||||
|
@ -1,3 +1,5 @@
|
||||
import random
|
||||
import string
|
||||
import factory
|
||||
from uuid import uuid4
|
||||
|
||||
@ -7,6 +9,8 @@ from atst.models.pe_number import PENumber
|
||||
from atst.models.task_order import TaskOrder
|
||||
from atst.models.user import User
|
||||
from atst.models.role import Role
|
||||
from atst.models.request_status_event import RequestStatusEvent
|
||||
from atst.domain.roles import Roles
|
||||
|
||||
|
||||
class RequestStatusFactory(factory.alchemy.SQLAlchemyModelFactory):
|
||||
@ -22,6 +26,7 @@ class RequestFactory(factory.alchemy.SQLAlchemyModelFactory):
|
||||
status_events = factory.RelatedFactory(
|
||||
RequestStatusFactory, "request", new_status=RequestStatus.STARTED
|
||||
)
|
||||
body = {}
|
||||
|
||||
|
||||
class PENumberFactory(factory.alchemy.SQLAlchemyModelFactory):
|
||||
@ -46,7 +51,16 @@ class UserFactory(factory.alchemy.SQLAlchemyModelFactory):
|
||||
model = User
|
||||
|
||||
id = factory.Sequence(lambda x: uuid4())
|
||||
email = "fake.user@mail.com"
|
||||
first_name = "Fake"
|
||||
last_name = "User"
|
||||
email = factory.Faker("email")
|
||||
first_name = factory.Faker("first_name")
|
||||
last_name = factory.Faker("last_name")
|
||||
atat_role = factory.SubFactory(RoleFactory)
|
||||
dod_id = factory.LazyFunction(lambda: "".join(random.choices(string.digits, k=10)))
|
||||
|
||||
|
||||
class RequestStatusEventFactory(factory.alchemy.SQLAlchemyModelFactory):
|
||||
|
||||
class Meta:
|
||||
model = RequestStatusEvent
|
||||
|
||||
id = factory.Sequence(lambda x: uuid4())
|
||||
|
@ -2,7 +2,8 @@ import re
|
||||
import pytest
|
||||
import urllib
|
||||
from tests.mocks import MOCK_USER, MOCK_REQUEST
|
||||
from tests.factories import RequestFactory
|
||||
from tests.factories import RequestFactory, UserFactory
|
||||
from atst.domain.roles import Roles
|
||||
|
||||
|
||||
ERROR_CLASS = "alert--error"
|
||||
@ -27,3 +28,41 @@ def test_submit_valid_request_form(monkeypatch, client, user_session):
|
||||
data="meaning=42",
|
||||
)
|
||||
assert "/requests/new/2" in response.headers.get("Location")
|
||||
|
||||
|
||||
def test_owner_can_view_request(client, user_session):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
request = RequestFactory.create(creator=user.id)
|
||||
|
||||
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_non_owner_cannot_view_request(client, user_session):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
request = RequestFactory.create()
|
||||
|
||||
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
|
||||
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_ccpo_can_view_request(client, user_session):
|
||||
ccpo = Roles.get("ccpo")
|
||||
user = UserFactory.create(atat_role=ccpo)
|
||||
user_session(user)
|
||||
request = RequestFactory.create()
|
||||
|
||||
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_nonexistent_request(client, user_session):
|
||||
user_session()
|
||||
response = client.get("/requests/new/1/foo", follow_redirects=True)
|
||||
|
||||
assert response.status_code == 404
|
||||
|
@ -27,8 +27,7 @@ def test_successful_login_redirect(client, monkeypatch):
|
||||
def test_unsuccessful_login_redirect(client, monkeypatch):
|
||||
resp = client.get("/login-redirect")
|
||||
|
||||
assert resp.status_code == 302
|
||||
assert "unauthorized" in resp.headers["Location"]
|
||||
assert resp.status_code == 401
|
||||
assert "user_id" not in session
|
||||
|
||||
|
||||
@ -55,7 +54,6 @@ def test_routes_are_protected(client, app):
|
||||
|
||||
|
||||
UNPROTECTED_ROUTES = ["/", "/login-dev", "/login-redirect", "/unauthorized"]
|
||||
|
||||
# this implicitly relies on the test config and test CRL in tests/fixtures/crl
|
||||
|
||||
|
||||
@ -72,8 +70,7 @@ def test_crl_validation_on_login(client):
|
||||
"HTTP_X_SSL_CLIENT_CERT": bad_cert.decode(),
|
||||
},
|
||||
)
|
||||
assert resp.status_code == 302
|
||||
assert "unauthorized" in resp.headers["Location"]
|
||||
assert resp.status_code == 401
|
||||
assert "user_id" not in session
|
||||
|
||||
# good cert is not on the test CRL, passes
|
||||
|
Loading…
x
Reference in New Issue
Block a user