Merge pull request #453 from dod-ccpo/audit-log-env-role-updates

Add Audit Log events for environment role updates
This commit is contained in:
montana-mil 2018-11-27 15:55:49 -05:00 committed by GitHub
commit d2a033a58a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 93 additions and 17 deletions

View File

@ -10,7 +10,7 @@ class CSPRole(Enum):
NONSENSE_ROLE = "nonsense_role" NONSENSE_ROLE = "nonsense_role"
class EnvironmentRole(Base, mixins.TimestampsMixin): class EnvironmentRole(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
__tablename__ = "environment_roles" __tablename__ = "environment_roles"
id = types.Id() id = types.Id()
@ -29,6 +29,23 @@ class EnvironmentRole(Base, mixins.TimestampsMixin):
self.role, self.user.full_name, self.environment.name, self.id self.role, self.user.full_name, self.environment.name, self.id
) )
@property
def history(self):
return self.get_changes()
@property
def event_details(self):
return {
"updated_user_name": self.user.displayname,
"updated_user_id": str(self.user_id),
"environment": self.environment.displayname,
"environment_id": str(self.environment_id),
"project": self.environment.project.name,
"project_id": str(self.environment.project_id),
"workspace": self.environment.project.workspace.name,
"workspace_id": str(self.environment.project.workspace.id),
}
Index( Index(
"environments_role_user_environment", "environments_role_user_environment",

View File

@ -17,9 +17,12 @@ class AuditableMixin(object):
request_id = resource.auditable_request_id() request_id = resource.auditable_request_id()
resource_type = resource.auditable_resource_type() resource_type = resource.auditable_resource_type()
display_name = resource.auditable_displayname() display_name = resource.auditable_displayname()
changed_state = resource.auditable_changed_state()
event_details = resource.auditable_event_details() event_details = resource.auditable_event_details()
changed_state = (
resource.auditable_changed_state() if action == ACTION_UPDATE else None
)
audit_event = AuditEvent( audit_event = AuditEvent(
user_id=user_id, user_id=user_id,
workspace_id=workspace_id, workspace_id=workspace_id,
@ -52,14 +55,12 @@ class AuditableMixin(object):
@staticmethod @staticmethod
def audit_update(mapper, connection, target): def audit_update(mapper, connection, target):
target.create_audit_event(connection, target, ACTION_UPDATE) if AuditableMixin.get_changes(target):
target.create_audit_event(connection, target, ACTION_UPDATE)
def get_changes(self): def get_changes(self):
""" """
This function borrows largely from a gist: This function returns a dictionary of the form {item: [from_value, to_value]},
https://gist.github.com/ngse/c20058116b8044c65d3fbceda3fdf423#file-audit_mixin-py-L106-L120
It returns a dictionary of the form {item: [from_value, to_value]},
where 'item' is the attribute on the target that has been updated, where 'item' is the attribute on the target that has been updated,
'from_value' is the value of the attribute before it was updated, 'from_value' is the value of the attribute before it was updated,
and 'to_value' is the current value of the attribute. and 'to_value' is the current value of the attribute.
@ -71,7 +72,9 @@ class AuditableMixin(object):
for attr in attrs: for attr in attrs:
history = getattr(inspect(self).attrs, attr.key).history history = getattr(inspect(self).attrs, attr.key).history
if history.has_changes(): if history.has_changes():
previous_state[attr.key] = [history.deleted.pop(), history.added.pop()] deleted = history.deleted.pop() if history.deleted else None
added = history.added.pop() if history.added else None
previous_state[attr.key] = [deleted, added]
return previous_state return previous_state
def auditable_changed_state(self): def auditable_changed_state(self):

View File

@ -30,12 +30,16 @@
{% if event.event_details %} {% if event.event_details %}
for User <code>{{ event.event_details.updated_user_id }}</code> ({{ event.event_details.updated_user_name }}) for User <code>{{ event.event_details.updated_user_id }}</code> ({{ event.event_details.updated_user_name }})
<br>
{% endif %}
{% if event.changed_state %} {% if event.event_details["environment"] %}
from {{ event.changed_state.role[0] }} to {{ event.changed_state.role[1] }} <br>
<br> in Environment <code>{{ event.event_details["environment_id"] }}</code> ({{ event.event_details["environment"] }})
<br>
in Project <code>{{ event.event_details["project_id"] }}</code> ({{ event.event_details["project"] }})
<br>
in Workspace <code>{{ event.event_details["workspace_id"] }}</code> ({{ event.event_details["workspace"] }})
{% endif %}
<br>
{% endif %} {% endif %}
{% if event.workspace %} {% if event.workspace %}
@ -43,6 +47,11 @@
{% elif event.request %} {% elif event.request %}
on Request <code>{{ event.request_id }}</code> ({{ event.request.displayname }}) on Request <code>{{ event.request_id }}</code> ({{ event.request.displayname }})
{% endif %} {% endif %}
{% if event.changed_state.role %}
from {{ event.changed_state.role[0] }} to {{ event.changed_state.role[1] }}
<br>
{% endif %}
</div> </div>
</article> </article>
</li> </li>

View File

@ -134,7 +134,8 @@ def test_update_workspace_role_role(workspace, workspace_owner):
"workspace_role": "developer", "workspace_role": "developer",
"dod_id": "1234567890", "dod_id": "1234567890",
} }
member = Workspaces.create_member(workspace_owner, workspace, user_data) WorkspaceRoleFactory._meta.sqlalchemy_session_persistence = "flush"
member = WorkspaceRoleFactory.create(workspace=workspace)
role_name = "admin" role_name = "admin"
updated_member = Workspaces.update_member( updated_member = Workspaces.update_member(

View File

@ -13,10 +13,13 @@ from tests.factories import (
UserFactory, UserFactory,
InvitationFactory, InvitationFactory,
WorkspaceRoleFactory, WorkspaceRoleFactory,
EnvironmentFactory,
EnvironmentRoleFactory,
ProjectFactory,
) )
def test_has_no_history(session): def test_has_no_ws_role_history(session):
owner = UserFactory.create() owner = UserFactory.create()
user = UserFactory.create() user = UserFactory.create()
@ -33,7 +36,7 @@ def test_has_no_history(session):
assert not create_event.changed_state assert not create_event.changed_state
def test_has_role_history(session): def test_has_ws_role_history(session):
owner = UserFactory.create() owner = UserFactory.create()
user = UserFactory.create() user = UserFactory.create()
@ -58,7 +61,7 @@ def test_has_role_history(session):
assert changed_events[0].changed_state["role"][1] == "admin" assert changed_events[0].changed_state["role"][1] == "admin"
def test_has_status_history(session): def test_has_ws_status_history(session):
owner = UserFactory.create() owner = UserFactory.create()
user = UserFactory.create() user = UserFactory.create()
@ -81,6 +84,49 @@ def test_has_status_history(session):
assert changed_events[0].changed_state["status"][1] == "active" assert changed_events[0].changed_state["status"][1] == "active"
def test_has_no_env_role_history(session):
owner = UserFactory.create()
user = UserFactory.create()
workspace = Workspaces.create(RequestFactory.create(creator=owner))
project = ProjectFactory.create(workspace=workspace)
environment = EnvironmentFactory.create(project=project, name="new environment!")
env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer"
)
create_event = (
session.query(AuditEvent)
.filter(AuditEvent.resource_id == env_role.id, AuditEvent.action == "create")
.one()
)
assert not create_event.changed_state
def test_has_env_role_history(session):
owner = UserFactory.create()
user = UserFactory.create()
workspace = Workspaces.create(RequestFactory.create(creator=owner))
workspace_role = WorkspaceRoleFactory.create(workspace=workspace, user=user)
project = ProjectFactory.create(workspace=workspace)
environment = EnvironmentFactory.create(project=project, name="new environment!")
env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer"
)
Environments.update_environment_roles(
owner, workspace, workspace_role, [{"role": "admin", "id": environment.id}]
)
changed_events = (
session.query(AuditEvent)
.filter(AuditEvent.resource_id == env_role.id, AuditEvent.action == "update")
.all()
)
# changed_state["role"] returns a list [previous role, current role]
assert changed_events[0].changed_state["role"][0] == "developer"
assert changed_events[0].changed_state["role"][1] == "admin"
def test_event_details(): def test_event_details():
owner = UserFactory.create() owner = UserFactory.create()
user = UserFactory.create() user = UserFactory.create()