Merge pull request #1127 from dod-ccpo/audit_log_feature_flag
Audit log feature flag
This commit is contained in:
commit
7e1a353db5
@ -150,6 +150,7 @@ def set_default_headers(app): # pragma: no cover
|
|||||||
def map_config(config):
|
def map_config(config):
|
||||||
return {
|
return {
|
||||||
**config["default"],
|
**config["default"],
|
||||||
|
"USE_AUDIT_LOG": config["default"].getboolean("USE_AUDIT_LOG"),
|
||||||
"ENV": config["default"]["ENVIRONMENT"],
|
"ENV": config["default"]["ENVIRONMENT"],
|
||||||
"BROKER_URL": config["default"]["REDIS_URI"],
|
"BROKER_URL": config["default"]["REDIS_URI"],
|
||||||
"DEBUG": config["default"].getboolean("DEBUG"),
|
"DEBUG": config["default"].getboolean("DEBUG"),
|
||||||
|
@ -17,24 +17,29 @@ class AuditableMixin(object):
|
|||||||
if changed_state is None:
|
if changed_state is None:
|
||||||
changed_state = resource.history if action == ACTION_UPDATE else None
|
changed_state = resource.history if action == ACTION_UPDATE else None
|
||||||
|
|
||||||
audit_event = AuditEvent(
|
log_data = {
|
||||||
user_id=user_id,
|
"user_id": user_id,
|
||||||
portfolio_id=resource.portfolio_id,
|
"portfolio_id": resource.portfolio_id,
|
||||||
application_id=resource.application_id,
|
"application_id": resource.application_id,
|
||||||
resource_type=resource.resource_type,
|
"resource_type": resource.resource_type,
|
||||||
resource_id=resource.id,
|
"resource_id": resource.id,
|
||||||
display_name=resource.displayname,
|
"display_name": resource.displayname,
|
||||||
action=action,
|
"action": action,
|
||||||
changed_state=changed_state,
|
"changed_state": changed_state,
|
||||||
event_details=resource.event_details,
|
"event_details": resource.event_details,
|
||||||
)
|
}
|
||||||
|
|
||||||
app.logger.info(
|
app.logger.info(
|
||||||
"Audit Event {}".format(action),
|
"Audit Event {}".format(action),
|
||||||
extra={"audit_event": audit_event.log, "tags": ["audit_event", action]},
|
extra={
|
||||||
|
"audit_event": {key: str(value) for key, value in log_data.items()},
|
||||||
|
"tags": ["audit_event", action],
|
||||||
|
},
|
||||||
)
|
)
|
||||||
audit_event.save(connection)
|
|
||||||
return audit_event
|
if app.config.get("USE_AUDIT_LOG", False):
|
||||||
|
audit_event = AuditEvent(**log_data)
|
||||||
|
audit_event.save(connection)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def __declare_last__(cls):
|
def __declare_last__(cls):
|
||||||
|
@ -1,4 +1,11 @@
|
|||||||
from flask import Blueprint, render_template, redirect, url_for, request
|
from flask import (
|
||||||
|
Blueprint,
|
||||||
|
render_template,
|
||||||
|
redirect,
|
||||||
|
url_for,
|
||||||
|
request,
|
||||||
|
current_app as app,
|
||||||
|
)
|
||||||
from atst.domain.users import Users
|
from atst.domain.users import Users
|
||||||
from atst.domain.audit_log import AuditLog
|
from atst.domain.audit_log import AuditLog
|
||||||
from atst.domain.common import Paginator
|
from atst.domain.common import Paginator
|
||||||
@ -17,9 +24,12 @@ bp.context_processor(atat_context_processor)
|
|||||||
@bp.route("/activity-history")
|
@bp.route("/activity-history")
|
||||||
@user_can(Permissions.VIEW_AUDIT_LOG, message="view activity log")
|
@user_can(Permissions.VIEW_AUDIT_LOG, message="view activity log")
|
||||||
def activity_history():
|
def activity_history():
|
||||||
pagination_opts = Paginator.get_pagination_opts(request)
|
if app.config.get("USE_AUDIT_LOG", False):
|
||||||
audit_events = AuditLog.get_all_events(pagination_opts)
|
pagination_opts = Paginator.get_pagination_opts(request)
|
||||||
return render_template("audit_log/audit_log.html", audit_events=audit_events)
|
audit_events = AuditLog.get_all_events(pagination_opts)
|
||||||
|
return render_template("audit_log/audit_log.html", audit_events=audit_events)
|
||||||
|
else:
|
||||||
|
return redirect("/")
|
||||||
|
|
||||||
|
|
||||||
@bp.route("/ccpo-users")
|
@bp.route("/ccpo-users")
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
[default]
|
[default]
|
||||||
|
USE_AUDIT_LOG = false
|
||||||
CAC_URL = http://localhost:8000/login-redirect
|
CAC_URL = http://localhost:8000/login-redirect
|
||||||
CA_CHAIN = ssl/server-certs/ca-chain.pem
|
CA_CHAIN = ssl/server-certs/ca-chain.pem
|
||||||
CLASSIFIED = false
|
CLASSIFIED = false
|
||||||
|
@ -117,7 +117,7 @@
|
|||||||
|
|
||||||
<hr>
|
<hr>
|
||||||
|
|
||||||
{% if user_can(permissions.VIEW_APPLICATION_ACTIVITY_LOG) %}
|
{% if user_can(permissions.VIEW_APPLICATION_ACTIVITY_LOG) and config.get("USE_AUDIT_LOG", False) %}
|
||||||
{% include "fragments/audit_events_log.html" %}
|
{% include "fragments/audit_events_log.html" %}
|
||||||
{{ Pagination(audit_events, url=url_for('applications.settings', application_id=application.id)) }}
|
{{ Pagination(audit_events, url=url_for('applications.settings', application_id=application.id)) }}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
@ -63,8 +63,8 @@
|
|||||||
{% if user_can(permissions.VIEW_PORTFOLIO_USERS) %}
|
{% if user_can(permissions.VIEW_PORTFOLIO_USERS) %}
|
||||||
{% include "portfolios/fragments/portfolio_members.html" %}
|
{% include "portfolios/fragments/portfolio_members.html" %}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
|
||||||
{% if user_can(permissions.VIEW_PORTFOLIO_ACTIVITY_LOG) %}
|
{% if user_can(permissions.VIEW_PORTFOLIO_ACTIVITY_LOG) and config.get("USE_AUDIT_LOG", False) %}
|
||||||
{% include "fragments/audit_events_log.html" %}
|
{% include "fragments/audit_events_log.html" %}
|
||||||
{{ Pagination(audit_events, url_for('portfolios.admin', portfolio_id=portfolio.id)) }}
|
{{ Pagination(audit_events, url_for('portfolios.admin', portfolio_id=portfolio.id)) }}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
@ -38,6 +38,19 @@ def app(request):
|
|||||||
ctx.pop()
|
ctx.pop()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def skip_audit_log(request):
|
||||||
|
"""
|
||||||
|
Conditionally skip tests marked with 'audit_log' based on the
|
||||||
|
USE_AUDIT_LOG config value.
|
||||||
|
"""
|
||||||
|
config = make_config()
|
||||||
|
if request.node.get_closest_marker("audit_log"):
|
||||||
|
use_audit_log = config.get("USE_AUDIT_LOG", False)
|
||||||
|
if not use_audit_log:
|
||||||
|
pytest.skip("audit log feature flag disabled")
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
def no_debug_app(request):
|
def no_debug_app(request):
|
||||||
config = make_config(direct_config={"DEBUG": False})
|
config = make_config(direct_config={"DEBUG": False})
|
||||||
|
@ -29,6 +29,7 @@ def developer():
|
|||||||
return UserFactory.create()
|
return UserFactory.create()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_paginate_audit_log():
|
def test_paginate_audit_log():
|
||||||
user = UserFactory.create()
|
user = UserFactory.create()
|
||||||
for _ in range(100):
|
for _ in range(100):
|
||||||
@ -38,6 +39,7 @@ def test_paginate_audit_log():
|
|||||||
assert len(events) == 25
|
assert len(events) == 25
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_paginate_ws_audit_log():
|
def test_paginate_ws_audit_log():
|
||||||
portfolio = PortfolioFactory.create()
|
portfolio = PortfolioFactory.create()
|
||||||
application = ApplicationFactory.create(portfolio=portfolio)
|
application = ApplicationFactory.create(portfolio=portfolio)
|
||||||
@ -52,6 +54,7 @@ def test_paginate_ws_audit_log():
|
|||||||
assert len(events) == 25
|
assert len(events) == 25
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_portfolio_audit_log_only_includes_current_portfolio_events():
|
def test_portfolio_audit_log_only_includes_current_portfolio_events():
|
||||||
owner = UserFactory.create()
|
owner = UserFactory.create()
|
||||||
portfolio = PortfolioFactory.create(owner=owner)
|
portfolio = PortfolioFactory.create(owner=owner)
|
||||||
@ -69,6 +72,7 @@ def test_portfolio_audit_log_only_includes_current_portfolio_events():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_get_portfolio_events_includes_app_and_env_events():
|
def test_get_portfolio_events_includes_app_and_env_events():
|
||||||
owner = UserFactory.create()
|
owner = UserFactory.create()
|
||||||
# add portfolio level events
|
# add portfolio level events
|
||||||
@ -120,6 +124,7 @@ def test_get_application_events():
|
|||||||
assert "portfolio" not in resource_types
|
assert "portfolio" not in resource_types
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_get_all_includes_ccpo_user_changes():
|
def test_get_all_includes_ccpo_user_changes():
|
||||||
user = UserFactory.create()
|
user = UserFactory.create()
|
||||||
initial_audit_log = AuditLog.get_all_events()
|
initial_audit_log = AuditLog.get_all_events()
|
||||||
|
@ -148,6 +148,7 @@ def test_resend_invitation(session):
|
|||||||
assert second_invite.is_pending
|
assert second_invite.is_pending
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_audit_event_for_accepted_invite():
|
def test_audit_event_for_accepted_invite():
|
||||||
portfolio = PortfolioFactory.create()
|
portfolio = PortfolioFactory.create()
|
||||||
user = UserFactory.create()
|
user = UserFactory.create()
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
from atst.domain.application_roles import ApplicationRoles
|
from atst.domain.application_roles import ApplicationRoles
|
||||||
from atst.models import ApplicationRoleStatus
|
from atst.models import ApplicationRoleStatus
|
||||||
from atst.models import AuditEvent
|
from atst.models import AuditEvent
|
||||||
@ -34,6 +36,7 @@ def test_application_members_excludes_deleted(session):
|
|||||||
assert app.members[0].id == member_role.id
|
assert app.members[0].id == member_role.id
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_audit_event_for_application_deletion(session):
|
def test_audit_event_for_application_deletion(session):
|
||||||
app = ApplicationFactory.create()
|
app = ApplicationFactory.create()
|
||||||
app.deleted = True
|
app.deleted = True
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
from atst.domain.permission_sets import PermissionSets
|
from atst.domain.permission_sets import PermissionSets
|
||||||
from atst.domain.environment_roles import EnvironmentRoles
|
from atst.domain.environment_roles import EnvironmentRoles
|
||||||
from atst.models.audit_event import AuditEvent
|
from atst.models.audit_event import AuditEvent
|
||||||
@ -5,6 +7,7 @@ from atst.models.audit_event import AuditEvent
|
|||||||
from tests.factories import *
|
from tests.factories import *
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_has_application_role_history(session):
|
def test_has_application_role_history(session):
|
||||||
owner = UserFactory.create()
|
owner = UserFactory.create()
|
||||||
user = UserFactory.create()
|
user = UserFactory.create()
|
||||||
|
@ -32,6 +32,7 @@ def test_add_user_to_environment():
|
|||||||
assert developer in dev_environment.users
|
assert developer in dev_environment.users
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_audit_event_for_environment_deletion(session):
|
def test_audit_event_for_environment_deletion(session):
|
||||||
env = EnvironmentFactory.create(application=ApplicationFactory.create())
|
env = EnvironmentFactory.create(application=ApplicationFactory.create())
|
||||||
env.deleted = True
|
env.deleted = True
|
||||||
|
@ -12,6 +12,7 @@ from tests.factories import *
|
|||||||
from atst.domain.portfolio_roles import PortfolioRoles
|
from atst.domain.portfolio_roles import PortfolioRoles
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_has_no_portfolio_role_history(session):
|
def test_has_no_portfolio_role_history(session):
|
||||||
owner = UserFactory.create()
|
owner = UserFactory.create()
|
||||||
user = UserFactory.create()
|
user = UserFactory.create()
|
||||||
@ -29,6 +30,7 @@ def test_has_no_portfolio_role_history(session):
|
|||||||
assert not create_event.changed_state
|
assert not create_event.changed_state
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_has_portfolio_role_history(session):
|
def test_has_portfolio_role_history(session):
|
||||||
owner = UserFactory.create()
|
owner = UserFactory.create()
|
||||||
user = UserFactory.create()
|
user = UserFactory.create()
|
||||||
@ -56,6 +58,7 @@ def test_has_portfolio_role_history(session):
|
|||||||
assert set(new_state) == PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS
|
assert set(new_state) == PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_has_portfolio_status_history(session):
|
def test_has_portfolio_status_history(session):
|
||||||
owner = UserFactory.create()
|
owner = UserFactory.create()
|
||||||
user = UserFactory.create()
|
user = UserFactory.create()
|
||||||
@ -82,6 +85,7 @@ def test_has_portfolio_status_history(session):
|
|||||||
assert changed_events[0].changed_state["status"][1] == "active"
|
assert changed_events[0].changed_state["status"][1] == "active"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_has_no_env_role_history(session):
|
def test_has_no_env_role_history(session):
|
||||||
owner = UserFactory.create()
|
owner = UserFactory.create()
|
||||||
user = UserFactory.create()
|
user = UserFactory.create()
|
||||||
@ -104,6 +108,7 @@ def test_has_no_env_role_history(session):
|
|||||||
assert not create_event.changed_state
|
assert not create_event.changed_state
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_has_env_role_history(session):
|
def test_has_env_role_history(session):
|
||||||
user = UserFactory.create()
|
user = UserFactory.create()
|
||||||
application = ApplicationFactory.create()
|
application = ApplicationFactory.create()
|
||||||
|
@ -137,6 +137,7 @@ def post_url_assert_status(no_debug_client, user_session):
|
|||||||
|
|
||||||
|
|
||||||
# ccpo.activity_history
|
# ccpo.activity_history
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_atst_activity_history_access(get_url_assert_status):
|
def test_atst_activity_history_access(get_url_assert_status):
|
||||||
ccpo = user_with(PermissionSets.VIEW_AUDIT_LOG)
|
ccpo = user_with(PermissionSets.VIEW_AUDIT_LOG)
|
||||||
rando = user_with()
|
rando = user_with()
|
||||||
|
@ -18,12 +18,14 @@ def test_dollar_fomatter(input, expected):
|
|||||||
assert dollars(input) == expected
|
assert dollars(input) == expected
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_render_audit_event_with_known_resource_type():
|
def test_render_audit_event_with_known_resource_type():
|
||||||
event = AuditEvent(resource_type="user")
|
event = AuditEvent(resource_type="user")
|
||||||
result = renderAuditEvent(event)
|
result = renderAuditEvent(event)
|
||||||
assert "<article" in result
|
assert "<article" in result
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.audit_log
|
||||||
def test_render_audit_event_with_unknown_resource_type():
|
def test_render_audit_event_with_unknown_resource_type():
|
||||||
event = AuditEvent(resource_type="boat")
|
event = AuditEvent(resource_type="boat")
|
||||||
result = renderAuditEvent(event)
|
result = renderAuditEvent(event)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user