From 8b172ba3e2665c337a708a5be6dd59ea2f9582a7 Mon Sep 17 00:00:00 2001 From: Montana Date: Fri, 16 Nov 2018 14:18:18 -0500 Subject: [PATCH] wip for safety --- atst/domain/workspace_roles.py | 7 +++-- atst/models/mixins/auditable.py | 43 ++++++++++++++--------------- atst/models/workspace_role.py | 13 ++++++--- tests/models/test_workspace_role.py | 35 ++++++++++++----------- 4 files changed, 53 insertions(+), 45 deletions(-) diff --git a/atst/domain/workspace_roles.py b/atst/domain/workspace_roles.py index 9da3224c..60b3f4c4 100644 --- a/atst/domain/workspace_roles.py +++ b/atst/domain/workspace_roles.py @@ -153,7 +153,10 @@ class WorkspaceRoles(object): @classmethod def enable(cls, workspace_role): - workspace_role.status = WorkspaceRoleStatus.ACTIVE + ws_role = WorkspaceRoles._get_workspace_role( + workspace_role.user, workspace_role.workspace_id + ) + ws_role.status = WorkspaceRoleStatus.ACTIVE - db.session.add(workspace_role) + db.session.add(ws_role) db.session.commit() diff --git a/atst/models/mixins/auditable.py b/atst/models/mixins/auditable.py index 3019fe7f..8aee817a 100644 --- a/atst/models/mixins/auditable.py +++ b/atst/models/mixins/auditable.py @@ -1,7 +1,5 @@ from sqlalchemy import event, inspect from flask import g -from sqlalchemy.orm import class_mapper -from sqlalchemy.orm.attributes import get_history from atst.models.audit_event import AuditEvent from atst.utils import camel_to_snake, getattr_path @@ -42,27 +40,6 @@ class AuditableMixin(object): event.listen(cls, "after_delete", cls.audit_delete) event.listen(cls, "after_update", cls.audit_update) - @staticmethod - def get_history(target): - """ - This function borrows largely from a gist: - https://gist.github.com/ngse/c20058116b8044c65d3fbceda3fdf423#file-audit_mixin-py-L106-L120 - - It returns a dictionary of the form {item: from_value}, - where 'item' is the attribute on the target that has been updated - and 'from_value' is the value of the attribute before it was updated. - - There may be more than one item in the dictionary, but that is not expected. - """ - previous_state = {} - inspr = inspect(target) - attrs = class_mapper(target.__class__).column_attrs - for attr in attrs: - history = getattr(inspr.attrs, attr.key).history - if history.has_changes(): - previous_state[attr.key] = get_history(target, attr.key)[2].pop() - return previous_state - @staticmethod def audit_insert(mapper, connection, target): """Listen for the `after_insert` event and create an AuditLog entry""" @@ -77,6 +54,26 @@ class AuditableMixin(object): def audit_update(mapper, connection, target): target.create_audit_event(connection, target, ACTION_UPDATE) + def get_changes(self): + """ + This function borrows largely from a gist: + https://gist.github.com/ngse/c20058116b8044c65d3fbceda3fdf423#file-audit_mixin-py-L106-L120 + + It returns a dictionary of the form {item: [from_value, to_value]}, + where 'item' is the attribute on the target that has been updated, + 'from_value' is the value of the attribute before it was updated, + and 'to_value' is the current value of the attribute. + + There may be more than one item in the dictionary, but that is not expected. + """ + previous_state = {} + attrs = inspect(self).mapper.column_attrs + for attr in attrs: + history = getattr(inspect(self).attrs, attr.key).history + if history.has_changes(): + previous_state[attr.key] = [history.deleted.pop(), history.added.pop()] + return previous_state + def auditable_changed_state(self): return getattr_path(self, "history") diff --git a/atst/models/workspace_role.py b/atst/models/workspace_role.py index 671271c2..5fd85807 100644 --- a/atst/models/workspace_role.py +++ b/atst/models/workspace_role.py @@ -35,7 +35,9 @@ class WorkspaceRole(Base, mixins.TimestampsMixin, mixins.AuditableMixin): UUID(as_uuid=True), ForeignKey("users.id"), index=True, nullable=False ) - status = Column(SQLAEnum(Status, native_enum=False, default=Status.PENDING)) + status = Column( + SQLAEnum(Status, native_enum=False, default=Status.PENDING), nullable=False + ) def __repr__(self): return "".format( @@ -44,15 +46,18 @@ class WorkspaceRole(Base, mixins.TimestampsMixin, mixins.AuditableMixin): @property def history(self): - previous_state = mixins.AuditableMixin.get_history(self) + previous_state = self.get_changes() change_set = {} if "role_id" in previous_state: - from_role_id = previous_state["role_id"] + from_role_id = previous_state["role_id"][0] from_role = db.session.query(Role).filter(Role.id == from_role_id).one() to_role = self.role_name change_set["role"] = [from_role.name, to_role] if "status" in previous_state: - from_status = previous_state["status"].value + if previous_state["status"][0]: + from_status = previous_state["status"][0].value + else: + from_status = "pending" to_status = self.status.value change_set["status"] = [from_status, to_status] return change_set diff --git a/tests/models/test_workspace_role.py b/tests/models/test_workspace_role.py index d0b19c0c..b36d4b33 100644 --- a/tests/models/test_workspace_role.py +++ b/tests/models/test_workspace_role.py @@ -37,16 +37,19 @@ def test_has_role_history(session): workspace = Workspaces.create(RequestFactory.create(creator=owner)) workspace_role = WorkspaceRoles.add(user, workspace.id, "developer") + # why not use workspace_role = WorkspaceRoleFactory.create()? WorkspaceRoles.update_role(workspace_role, "admin") changed_events = ( session.query(AuditEvent) - .filter(AuditEvent.resource_id == workspace_role.id, AuditEvent.changed_state != None) + .filter( + AuditEvent.resource_id == workspace_role.id, + AuditEvent.changed_state != None, + ) .all() ) - # changed_state["role"] returns a list [previous role, current role] - assert changed_events[0].changed_state["role"][0] == 'developer' - assert changed_events[0].changed_state["role"][1] == 'admin' + assert changed_events[0].changed_state["role"][0] == "developer" + assert changed_events[0].changed_state["role"][1] == "admin" def test_has_status_history(session): @@ -55,22 +58,22 @@ def test_has_status_history(session): workspace = Workspaces.create(RequestFactory.create(creator=owner)) workspace_role = WorkspaceRoles.add(user, workspace.id, "developer") - session.add(workspace_role) - session.commit() - workspace_role.status = Status.ACTIVE - session.add(workspace_role) - session.commit() - audit_events = ( + import ipdb + + ipdb.set_trace() + WorkspaceRoles.enable(workspace_role) + changed_events = ( session.query(AuditEvent) - .filter(AuditEvent.resource_id == workspace_role.id) + .filter( + AuditEvent.resource_id == workspace_role.id, + AuditEvent.changed_state != None, + ) .all() ) - changed_events = [event for event in audit_events if event.changed_state] - import ipdb; ipdb.set_trace() - - assert changed_events[0].changed_state["status"][0] - assert changed_events[0].changed_state["status"][1] + # changed_state["status"] returns a list [previous status, current status] + assert changed_events[0].changed_state["status"][0] == "pending" + assert changed_events[0].changed_state["status"][1] == "active" def test_event_details():