Add audit event attributes to workspace role model

This commit is contained in:
Montana 2018-11-12 11:43:35 -05:00
parent ae3113f011
commit 3b6d745955
3 changed files with 74 additions and 12 deletions

View File

@ -1,6 +1,7 @@
from sqlalchemy import String, Column, ForeignKey, inspect
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.dialects.postgresql import UUID, JSON
from sqlalchemy.orm import relationship
from atst.database import db
from atst.models import Base, types
from atst.models.mixins.timestamps import TimestampsMixin
@ -20,11 +21,15 @@ class AuditEvent(Base, TimestampsMixin):
request_id = Column(UUID(as_uuid=True), ForeignKey("requests.id"), index=True)
request = relationship("Request", backref="audit_events")
changed_state = Column(JSON())
event_details = Column(JSON())
resource_type = Column(String(), nullable=False)
resource_id = Column(UUID(as_uuid=True), index=True, nullable=False)
display_name = Column(String())
action = Column(String(), nullable=False)
def save(self, connection):
attrs = inspect(self).dict

View File

@ -3,6 +3,8 @@ from flask import g
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm.attributes import get_history
from atst.database import db
from atst.models.workspace_role import WorkspaceRole
from atst.models.audit_event import AuditEvent
from atst.utils import camel_to_snake, getattr_path
@ -11,15 +13,48 @@ ACTION_UPDATE = "update"
ACTION_DELETE = "delete"
class AuditableWorkspaceRole(AuditableMixin):
target = target
changed_state = get_changed_state()
event_details = get_event_details()
@classmethod
def get_changed_state(self, resource):
previous_state = {}
inspr = inspect(target)
attrs = class_mapper(target.__class__).column_attrs
for attr in attrs:
history = getattr(inspr.attrs, attr.key).history
if history.has_changes():
previous_state[attr.key] = get_history(target, attr.key)[2].pop()
from_role = previous_role["role"]
to_role = target.role_displayname
return {"role": [from_role, to_role]}
@classmethod
def get_event_details(self):
# get user that is being updated
# does this need to query the db?
ws_role_id = target.auditable_request_id()
ws_role = (
db.session.query(WorkspaceRole)
.filter(WorkspaceRole.id == ws_role_id)
.first()
)
return {"user": ws_role.user_name}
class AuditableMixin(object):
@staticmethod
def create_audit_event(connection, resource, action, previous_state=None):
def create_audit_event(connection, resource, action):
user_id = getattr_path(g, "current_user.id")
workspace_id = resource.auditable_workspace_id()
request_id = resource.auditable_request_id()
resource_type = resource.auditable_resource_type()
display_name = resource.auditable_displayname()
previous_role_id = previous_state["role_id"]
changed_state = resource.auditable_changed_state()
event_details = resource.auditable_event_details()
audit_event = AuditEvent(
user_id=user_id,
@ -29,7 +64,8 @@ class AuditableMixin(object):
resource_id=resource.id,
display_name=display_name,
action=action,
previous_role_id=previous_role_id,
changed_state=changed_state,
event_details=event_details,
)
audit_event.save(connection)
@ -40,6 +76,17 @@ class AuditableMixin(object):
event.listen(cls, "after_delete", cls.audit_delete)
event.listen(cls, "after_update", cls.audit_update)
@staticmethod
def get_history(target):
previous_state = {}
inspr = inspect(target)
attrs = class_mapper(target.__class__).column_attrs
for attr in attrs:
history = getattr(inspr.attrs, attr.key).history
if history.has_changes():
previous_state[attr.key] = get_history(target, attr.key)[2].pop()
return previous_state
@staticmethod
def audit_insert(mapper, connection, target):
"""Listen for the `after_insert` event and create an AuditLog entry"""
@ -52,14 +99,13 @@ class AuditableMixin(object):
@staticmethod
def audit_update(mapper, connection, target):
previous_state = {}
inspr = inspect(target)
attrs = class_mapper(target.__class__).column_attrs
for attr in attrs:
history = getattr(inspr.attrs, attr.key).history
if history.has_changes():
previous_state[attr.key] = get_history(target, attr.key)[2].pop()
target.create_audit_event(connection, target, ACTION_UPDATE, previous_state)
target.create_audit_event(connection, target, ACTION_UPDATE)
def auditable_changed_state(self):
return getattr_path(self, "history")
def auditable_event_details(self):
return getattr_path(self, "auditable_event_details")
def auditable_resource_type(self):
return camel_to_snake(type(self).__name__)

View File

@ -41,6 +41,17 @@ class WorkspaceRole(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
self.role.name, self.workspace.name, self.user_id, self.id
)
@property
def history(self):
previous_state = AuditableMixin.get_history(self)
from_role = previous_role["role"]
to_role = self.role_displayname
return {"role": [from_role, to_role]}
@property
def auditable_event_details(self):
return {"updated_user": self.user_name, "updated_user_id" = self.user_id}
@property
def latest_invitation(self):
if self.invitations: