raise exceptions, map to http error codes
This commit is contained in:
parent
7b8934e0cb
commit
d5ed99089c
@ -14,3 +14,17 @@ class AlreadyExistsError(Exception):
|
||||
@property
|
||||
def message(self):
|
||||
return "{} already exists".format(self.resource_name)
|
||||
|
||||
|
||||
class UnauthorizedError(Exception):
|
||||
def __init__(self, user, action):
|
||||
self.user = user
|
||||
self.action = action
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
return "User {} not authorized to {}".format(self.user.id, self.action)
|
||||
|
||||
|
||||
class UnauthenticatedError(Exception):
|
||||
pass
|
||||
|
@ -1,10 +1,11 @@
|
||||
from flask import Blueprint, abort, render_template, g, redirect, session, url_for, request
|
||||
from flask import Blueprint, render_template, g, redirect, session, url_for, request
|
||||
from flask import current_app as app
|
||||
import pendulum
|
||||
|
||||
from atst.domain.requests import Requests
|
||||
from atst.domain.users import Users
|
||||
from atst.domain.authnid.utils import parse_sdn
|
||||
from atst.domain.exceptions import UnauthenticatedError
|
||||
|
||||
bp = Blueprint("atst", __name__)
|
||||
|
||||
@ -29,6 +30,9 @@ def catch_all(path):
|
||||
return render_template("{}.html".format(path))
|
||||
|
||||
|
||||
# TODO: this should be partly consolidated into a domain function that takes
|
||||
# all the necessary UWSGI environment values as args and either returns a user
|
||||
# or raises the UnauthenticatedError
|
||||
@bp.route('/login-redirect')
|
||||
def login_redirect():
|
||||
if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and _is_valid_certificate(request):
|
||||
@ -39,7 +43,7 @@ def login_redirect():
|
||||
|
||||
return redirect(url_for("atst.home"))
|
||||
else:
|
||||
return abort(401)
|
||||
raise UnauthenticatedError()
|
||||
|
||||
|
||||
def _is_valid_certificate(request):
|
||||
|
@ -1,13 +1,19 @@
|
||||
from flask import render_template
|
||||
|
||||
import atst.domain.exceptions as exceptions
|
||||
|
||||
|
||||
def make_error_pages(app):
|
||||
@app.errorhandler(404)
|
||||
@app.errorhandler(exceptions.NotFoundError)
|
||||
@app.errorhandler(exceptions.UnauthorizedError)
|
||||
# pylint: disable=unused-variable
|
||||
def not_found(e):
|
||||
return render_template("not_found.html"), 404
|
||||
|
||||
|
||||
@app.errorhandler(401)
|
||||
@app.errorhandler(exceptions.UnauthenticatedError)
|
||||
# pylint: disable=unused-variable
|
||||
def unauthorized(e):
|
||||
return render_template('unauthorized.html'), 401
|
||||
|
||||
return app
|
||||
|
@ -1,9 +1,10 @@
|
||||
from flask import abort, g, redirect, render_template, url_for, request as http_request
|
||||
from flask import g, redirect, render_template, url_for, request as http_request
|
||||
|
||||
from . import requests_bp
|
||||
from atst.domain.requests import Requests
|
||||
from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.domain.exceptions import UnauthorizedError
|
||||
|
||||
|
||||
@requests_bp.route("/requests/new/<int:screen>", methods=["GET"])
|
||||
@ -26,8 +27,8 @@ def requests_form_new(screen):
|
||||
)
|
||||
@requests_bp.route("/requests/new/<int:screen>/<string:request_id>", methods=["GET"])
|
||||
def requests_form_update(screen=1, request_id=None):
|
||||
if request_id and not _can_view_request(request_id):
|
||||
abort(404)
|
||||
if request_id:
|
||||
_check_can_view_request(request_id)
|
||||
|
||||
request = Requests.get(request_id) if request_id is not None else None
|
||||
jedi_flow = JEDIRequestFlow(screen, request, request_id=request_id)
|
||||
@ -107,8 +108,11 @@ def requests_submit(request_id=None):
|
||||
|
||||
# TODO: generalize this, along with other authorizations, into a policy-pattern
|
||||
# for authorization in the application
|
||||
def _can_view_request(request_id):
|
||||
return (
|
||||
Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST in g.current_user.atat_permissions
|
||||
or Requests.is_creator(request_id, g.current_user.id)
|
||||
)
|
||||
def _check_can_view_request(request_id):
|
||||
if Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST in g.current_user.atat_permissions:
|
||||
pass
|
||||
elif Requests.is_creator(request_id, g.current_user.id):
|
||||
pass
|
||||
else:
|
||||
raise UnauthorizedError(g.current_user, "view request {}".format(request_id))
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user