wip for safety

This commit is contained in:
Montana 2018-11-16 14:18:18 -05:00
parent c071945b3b
commit 8b172ba3e2
4 changed files with 53 additions and 45 deletions

View File

@ -153,7 +153,10 @@ class WorkspaceRoles(object):
@classmethod
def enable(cls, workspace_role):
workspace_role.status = WorkspaceRoleStatus.ACTIVE
ws_role = WorkspaceRoles._get_workspace_role(
workspace_role.user, workspace_role.workspace_id
)
ws_role.status = WorkspaceRoleStatus.ACTIVE
db.session.add(workspace_role)
db.session.add(ws_role)
db.session.commit()

View File

@ -1,7 +1,5 @@
from sqlalchemy import event, inspect
from flask import g
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm.attributes import get_history
from atst.models.audit_event import AuditEvent
from atst.utils import camel_to_snake, getattr_path
@ -42,27 +40,6 @@ class AuditableMixin(object):
event.listen(cls, "after_delete", cls.audit_delete)
event.listen(cls, "after_update", cls.audit_update)
@staticmethod
def get_history(target):
"""
This function borrows largely from a gist:
https://gist.github.com/ngse/c20058116b8044c65d3fbceda3fdf423#file-audit_mixin-py-L106-L120
It returns a dictionary of the form {item: from_value},
where 'item' is the attribute on the target that has been updated
and 'from_value' is the value of the attribute before it was updated.
There may be more than one item in the dictionary, but that is not expected.
"""
previous_state = {}
inspr = inspect(target)
attrs = class_mapper(target.__class__).column_attrs
for attr in attrs:
history = getattr(inspr.attrs, attr.key).history
if history.has_changes():
previous_state[attr.key] = get_history(target, attr.key)[2].pop()
return previous_state
@staticmethod
def audit_insert(mapper, connection, target):
"""Listen for the `after_insert` event and create an AuditLog entry"""
@ -77,6 +54,26 @@ class AuditableMixin(object):
def audit_update(mapper, connection, target):
target.create_audit_event(connection, target, ACTION_UPDATE)
def get_changes(self):
"""
This function borrows largely from a gist:
https://gist.github.com/ngse/c20058116b8044c65d3fbceda3fdf423#file-audit_mixin-py-L106-L120
It returns a dictionary of the form {item: [from_value, to_value]},
where 'item' is the attribute on the target that has been updated,
'from_value' is the value of the attribute before it was updated,
and 'to_value' is the current value of the attribute.
There may be more than one item in the dictionary, but that is not expected.
"""
previous_state = {}
attrs = inspect(self).mapper.column_attrs
for attr in attrs:
history = getattr(inspect(self).attrs, attr.key).history
if history.has_changes():
previous_state[attr.key] = [history.deleted.pop(), history.added.pop()]
return previous_state
def auditable_changed_state(self):
return getattr_path(self, "history")

View File

@ -35,7 +35,9 @@ class WorkspaceRole(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
UUID(as_uuid=True), ForeignKey("users.id"), index=True, nullable=False
)
status = Column(SQLAEnum(Status, native_enum=False, default=Status.PENDING))
status = Column(
SQLAEnum(Status, native_enum=False, default=Status.PENDING), nullable=False
)
def __repr__(self):
return "<WorkspaceRole(role='{}', workspace='{}', user_id='{}', id='{}')>".format(
@ -44,15 +46,18 @@ class WorkspaceRole(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
@property
def history(self):
previous_state = mixins.AuditableMixin.get_history(self)
previous_state = self.get_changes()
change_set = {}
if "role_id" in previous_state:
from_role_id = previous_state["role_id"]
from_role_id = previous_state["role_id"][0]
from_role = db.session.query(Role).filter(Role.id == from_role_id).one()
to_role = self.role_name
change_set["role"] = [from_role.name, to_role]
if "status" in previous_state:
from_status = previous_state["status"].value
if previous_state["status"][0]:
from_status = previous_state["status"][0].value
else:
from_status = "pending"
to_status = self.status.value
change_set["status"] = [from_status, to_status]
return change_set

View File

@ -37,16 +37,19 @@ def test_has_role_history(session):
workspace = Workspaces.create(RequestFactory.create(creator=owner))
workspace_role = WorkspaceRoles.add(user, workspace.id, "developer")
# why not use workspace_role = WorkspaceRoleFactory.create()?
WorkspaceRoles.update_role(workspace_role, "admin")
changed_events = (
session.query(AuditEvent)
.filter(AuditEvent.resource_id == workspace_role.id, AuditEvent.changed_state != None)
.filter(
AuditEvent.resource_id == workspace_role.id,
AuditEvent.changed_state != None,
)
.all()
)
# changed_state["role"] returns a list [previous role, current role]
assert changed_events[0].changed_state["role"][0] == 'developer'
assert changed_events[0].changed_state["role"][1] == 'admin'
assert changed_events[0].changed_state["role"][0] == "developer"
assert changed_events[0].changed_state["role"][1] == "admin"
def test_has_status_history(session):
@ -55,22 +58,22 @@ def test_has_status_history(session):
workspace = Workspaces.create(RequestFactory.create(creator=owner))
workspace_role = WorkspaceRoles.add(user, workspace.id, "developer")
session.add(workspace_role)
session.commit()
workspace_role.status = Status.ACTIVE
session.add(workspace_role)
session.commit()
audit_events = (
import ipdb
ipdb.set_trace()
WorkspaceRoles.enable(workspace_role)
changed_events = (
session.query(AuditEvent)
.filter(AuditEvent.resource_id == workspace_role.id)
.filter(
AuditEvent.resource_id == workspace_role.id,
AuditEvent.changed_state != None,
)
.all()
)
changed_events = [event for event in audit_events if event.changed_state]
import ipdb; ipdb.set_trace()
assert changed_events[0].changed_state["status"][0]
assert changed_events[0].changed_state["status"][1]
# changed_state["status"] returns a list [previous status, current status]
assert changed_events[0].changed_state["status"][0] == "pending"
assert changed_events[0].changed_state["status"][1] == "active"
def test_event_details():