Merge pull request #473 from dod-ccpo/auditable-refactor

Better error handling for Auditable class
This commit is contained in:
montana-mil 2018-12-06 11:23:27 -05:00 committed by GitHub
commit d05fc24e6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 112 additions and 48 deletions

View File

@ -13,15 +13,13 @@ class AuditableMixin(object):
@staticmethod @staticmethod
def create_audit_event(connection, resource, action): def create_audit_event(connection, resource, action):
user_id = getattr_path(g, "current_user.id") user_id = getattr_path(g, "current_user.id")
workspace_id = resource.auditable_workspace_id() workspace_id = resource.workspace_id
request_id = resource.auditable_request_id() request_id = resource.request_id
resource_type = resource.auditable_resource_type() resource_type = resource.resource_type
display_name = resource.auditable_displayname() display_name = resource.displayname
event_details = resource.auditable_event_details() event_details = resource.event_details
changed_state = ( changed_state = resource.history if action == ACTION_UPDATE else None
resource.auditable_changed_state() if action == ACTION_UPDATE else None
)
audit_event = AuditEvent( audit_event = AuditEvent(
user_id=user_id, user_id=user_id,
@ -77,20 +75,26 @@ class AuditableMixin(object):
previous_state[attr.key] = [deleted, added] previous_state[attr.key] = [deleted, added]
return previous_state return previous_state
def auditable_changed_state(self): @property
return getattr_path(self, "history") def history(self):
return None
def auditable_event_details(self): @property
return getattr_path(self, "event_details") def event_details(self):
return None
def auditable_resource_type(self): @property
def resource_type(self):
return camel_to_snake(type(self).__name__) return camel_to_snake(type(self).__name__)
def auditable_workspace_id(self): @property
return getattr_path(self, "workspace_id") def workspace_id(self):
return None
def auditable_request_id(self): @property
return getattr_path(self, "request_id") def request_id(self):
return None
def auditable_displayname(self): @property
return getattr_path(self, "displayname") def displayname(self):
return None

View File

@ -43,7 +43,7 @@ class RequestStatusEvent(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
@property @property
def displayname(self): def displayname(self):
return self.new_status.value return self.new_status.value if self.new_status else None
@property @property
def log_name(self): def log_name(self):

View File

@ -44,10 +44,15 @@ def test_accept_invitation():
def test_accept_expired_invitation(): def test_accept_expired_invitation():
user = UserFactory.create() user = UserFactory.create()
workspace = WorkspaceFactory.create()
ws_role = WorkspaceRoleFactory.create(user=user, workspace=workspace)
increment = Invitations.EXPIRATION_LIMIT_MINUTES + 1 increment = Invitations.EXPIRATION_LIMIT_MINUTES + 1
expiration_time = datetime.datetime.now() - datetime.timedelta(minutes=increment) expiration_time = datetime.datetime.now() - datetime.timedelta(minutes=increment)
invite = InvitationFactory.create( invite = InvitationFactory.create(
user_id=user.id, expiration_time=expiration_time, status=Status.PENDING user=user,
expiration_time=expiration_time,
status=Status.PENDING,
workspace_role=ws_role,
) )
with pytest.raises(ExpiredError): with pytest.raises(ExpiredError):
Invitations.accept(user, invite.token) Invitations.accept(user, invite.token)
@ -57,30 +62,42 @@ def test_accept_expired_invitation():
def test_accept_rejected_invite(): def test_accept_rejected_invite():
user = UserFactory.create() user = UserFactory.create()
invite = InvitationFactory.create(user_id=user.id, status=Status.REJECTED_EXPIRED) workspace = WorkspaceFactory.create()
ws_role = WorkspaceRoleFactory.create(user=user, workspace=workspace)
invite = InvitationFactory.create(
user=user, status=Status.REJECTED_EXPIRED, workspace_role=ws_role
)
with pytest.raises(InvitationError): with pytest.raises(InvitationError):
Invitations.accept(user, invite.token) Invitations.accept(user, invite.token)
def test_accept_revoked_invite(): def test_accept_revoked_invite():
user = UserFactory.create() user = UserFactory.create()
invite = InvitationFactory.create(user_id=user.id, status=Status.REVOKED) workspace = WorkspaceFactory.create()
ws_role = WorkspaceRoleFactory.create(user=user, workspace=workspace)
invite = InvitationFactory.create(
user=user, status=Status.REVOKED, workspace_role=ws_role
)
with pytest.raises(InvitationError): with pytest.raises(InvitationError):
Invitations.accept(user, invite.token) Invitations.accept(user, invite.token)
def test_wrong_user_accepts_invitation(): def test_wrong_user_accepts_invitation():
user = UserFactory.create() user = UserFactory.create()
workspace = WorkspaceFactory.create()
ws_role = WorkspaceRoleFactory.create(user=user, workspace=workspace)
wrong_user = UserFactory.create() wrong_user = UserFactory.create()
invite = InvitationFactory.create(user_id=user.id) invite = InvitationFactory.create(user=user, workspace_role=ws_role)
with pytest.raises(WrongUserError): with pytest.raises(WrongUserError):
Invitations.accept(wrong_user, invite.token) Invitations.accept(wrong_user, invite.token)
def test_user_cannot_accept_invitation_accepted_by_wrong_user(): def test_user_cannot_accept_invitation_accepted_by_wrong_user():
user = UserFactory.create() user = UserFactory.create()
workspace = WorkspaceFactory.create()
ws_role = WorkspaceRoleFactory.create(user=user, workspace=workspace)
wrong_user = UserFactory.create() wrong_user = UserFactory.create()
invite = InvitationFactory.create(user_id=user.id) invite = InvitationFactory.create(user=user, workspace_role=ws_role)
with pytest.raises(WrongUserError): with pytest.raises(WrongUserError):
Invitations.accept(wrong_user, invite.token) Invitations.accept(wrong_user, invite.token)
with pytest.raises(InvitationError): with pytest.raises(InvitationError):

View File

@ -2,22 +2,44 @@ import pytest
import datetime import datetime
from atst.models.invitation import Invitation, Status from atst.models.invitation import Invitation, Status
from atst.models.workspace_role import Status as WorkspaceRoleStatus
from tests.factories import InvitationFactory from tests.factories import (
InvitationFactory,
WorkspaceFactory,
UserFactory,
WorkspaceRoleFactory,
)
def test_expired_invite_is_not_revokable(): def test_expired_invite_is_not_revokable():
workspace = WorkspaceFactory.create()
user = UserFactory.create()
ws_role = WorkspaceRoleFactory.create(
workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING
)
invite = InvitationFactory.create( invite = InvitationFactory.create(
expiration_time=datetime.datetime.now() - datetime.timedelta(minutes=60) expiration_time=datetime.datetime.now() - datetime.timedelta(minutes=60),
workspace_role=ws_role,
) )
assert not invite.is_revokable assert not invite.is_revokable
def test_unexpired_invite_is_revokable(): def test_unexpired_invite_is_revokable():
invite = InvitationFactory.create() workspace = WorkspaceFactory.create()
user = UserFactory.create()
ws_role = WorkspaceRoleFactory.create(
workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING
)
invite = InvitationFactory.create(workspace_role=ws_role)
assert invite.is_revokable assert invite.is_revokable
def test_invite_is_not_revokable_if_invite_is_not_pending(): def test_invite_is_not_revokable_if_invite_is_not_pending():
invite = InvitationFactory.create(status=Status.ACCEPTED) workspace = WorkspaceFactory.create()
user = UserFactory.create()
ws_role = WorkspaceRoleFactory.create(
workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING
)
invite = InvitationFactory.create(workspace_role=ws_role, status=Status.ACCEPTED)
assert not invite.is_revokable assert not invite.is_revokable

View File

@ -7,6 +7,7 @@ from atst.models.workspace_role import Status
from atst.models.role import Role from atst.models.role import Role
from atst.models.invitation import Status as InvitationStatus from atst.models.invitation import Status as InvitationStatus
from atst.models.audit_event import AuditEvent from atst.models.audit_event import AuditEvent
from atst.models.workspace_role import Status as WorkspaceRoleStatus
from tests.factories import ( from tests.factories import (
RequestFactory, RequestFactory,
UserFactory, UserFactory,
@ -196,43 +197,63 @@ def test_status_when_member_is_active():
def test_status_when_invitation_has_been_rejected_for_expirations(): def test_status_when_invitation_has_been_rejected_for_expirations():
workspace = WorkspaceFactory.create()
user = UserFactory.create()
workspace_role = WorkspaceRoleFactory.create( workspace_role = WorkspaceRoleFactory.create(
invitations=[InvitationFactory.create(status=InvitationStatus.REJECTED_EXPIRED)] workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING
)
invitation = InvitationFactory.create(
workspace_role=workspace_role, status=InvitationStatus.REJECTED_EXPIRED
) )
assert workspace_role.display_status == "Invite expired" assert workspace_role.display_status == "Invite expired"
def test_status_when_invitation_has_been_rejected_for_wrong_user(): def test_status_when_invitation_has_been_rejected_for_wrong_user():
workspace = WorkspaceFactory.create()
user = UserFactory.create()
workspace_role = WorkspaceRoleFactory.create( workspace_role = WorkspaceRoleFactory.create(
invitations=[ workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING
InvitationFactory.create(status=InvitationStatus.REJECTED_WRONG_USER) )
] invitation = InvitationFactory.create(
workspace_role=workspace_role, status=InvitationStatus.REJECTED_WRONG_USER
) )
assert workspace_role.display_status == "Error on invite" assert workspace_role.display_status == "Error on invite"
def test_status_when_invitation_is_expired(): def test_status_when_invitation_is_expired():
workspace = WorkspaceFactory.create()
user = UserFactory.create()
workspace_role = WorkspaceRoleFactory.create( workspace_role = WorkspaceRoleFactory.create(
invitations=[ workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING
InvitationFactory.create( )
status=InvitationStatus.PENDING, invitation = InvitationFactory.create(
expiration_time=datetime.datetime.now() - datetime.timedelta(seconds=1), workspace_role=workspace_role,
) status=InvitationStatus.PENDING,
] expiration_time=datetime.datetime.now() - datetime.timedelta(seconds=1),
) )
assert workspace_role.display_status == "Invite expired" assert workspace_role.display_status == "Invite expired"
def test_can_not_resend_invitation_if_active(): def test_can_not_resend_invitation_if_active():
workspace = WorkspaceFactory.create()
user = UserFactory.create()
workspace_role = WorkspaceRoleFactory.create( workspace_role = WorkspaceRoleFactory.create(
invitations=[InvitationFactory.create(status=InvitationStatus.ACCEPTED)] workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING
)
invitation = InvitationFactory.create(
workspace_role=workspace_role, status=InvitationStatus.ACCEPTED
) )
assert not workspace_role.can_resend_invitation assert not workspace_role.can_resend_invitation
def test_can_resend_invitation_if_expired(): def test_can_resend_invitation_if_expired():
workspace = WorkspaceFactory.create()
user = UserFactory.create()
workspace_role = WorkspaceRoleFactory.create( workspace_role = WorkspaceRoleFactory.create(
invitations=[InvitationFactory.create(status=InvitationStatus.REJECTED_EXPIRED)] workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING
)
invitation = InvitationFactory.create(
workspace_role=workspace_role, status=InvitationStatus.REJECTED_EXPIRED
) )
assert workspace_role.can_resend_invitation assert workspace_role.can_resend_invitation

View File

@ -19,7 +19,7 @@ def test_existing_member_accepts_valid_invite(client, user_session):
ws_role = WorkspaceRoleFactory.create( ws_role = WorkspaceRoleFactory.create(
workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING workspace=workspace, user=user, status=WorkspaceRoleStatus.PENDING
) )
invite = InvitationFactory.create(user_id=user.id, workspace_role_id=ws_role.id) invite = InvitationFactory.create(user_id=user.id, workspace_role=ws_role)
# the user does not have access to the workspace before accepting the invite # the user does not have access to the workspace before accepting the invite
assert len(Workspaces.for_user(user)) == 0 assert len(Workspaces.for_user(user)) == 0
@ -76,7 +76,7 @@ def test_member_accepts_invalid_invite(client, user_session):
) )
invite = InvitationFactory.create( invite = InvitationFactory.create(
user_id=user.id, user_id=user.id,
workspace_role_id=ws_role.id, workspace_role=ws_role,
status=InvitationStatus.REJECTED_WRONG_USER, status=InvitationStatus.REJECTED_WRONG_USER,
) )
user_session(user) user_session(user)
@ -109,7 +109,7 @@ def test_user_accepts_invite_with_wrong_dod_id(client, user_session):
ws_role = WorkspaceRoleFactory.create( ws_role = WorkspaceRoleFactory.create(
user=user, workspace=workspace, status=WorkspaceRoleStatus.PENDING user=user, workspace=workspace, status=WorkspaceRoleStatus.PENDING
) )
invite = InvitationFactory.create(user_id=user.id, workspace_role_id=ws_role.id) invite = InvitationFactory.create(user_id=user.id, workspace_role=ws_role)
user_session(different_user) user_session(different_user)
response = client.get(url_for("workspaces.accept_invitation", token=invite.token)) response = client.get(url_for("workspaces.accept_invitation", token=invite.token))
@ -124,7 +124,7 @@ def test_user_accepts_expired_invite(client, user_session):
) )
invite = InvitationFactory.create( invite = InvitationFactory.create(
user_id=user.id, user_id=user.id,
workspace_role_id=ws_role.id, workspace_role=ws_role,
status=InvitationStatus.REJECTED_EXPIRED, status=InvitationStatus.REJECTED_EXPIRED,
expiration_time=datetime.datetime.now() - datetime.timedelta(seconds=1), expiration_time=datetime.datetime.now() - datetime.timedelta(seconds=1),
) )
@ -142,7 +142,7 @@ def test_revoke_invitation(client, user_session):
) )
invite = InvitationFactory.create( invite = InvitationFactory.create(
user_id=user.id, user_id=user.id,
workspace_role_id=ws_role.id, workspace_role=ws_role,
status=InvitationStatus.REJECTED_EXPIRED, status=InvitationStatus.REJECTED_EXPIRED,
expiration_time=datetime.datetime.now() - datetime.timedelta(seconds=1), expiration_time=datetime.datetime.now() - datetime.timedelta(seconds=1),
) )
@ -166,7 +166,7 @@ def test_resend_invitation_sends_email(client, user_session, queue):
user=user, workspace=workspace, status=WorkspaceRoleStatus.PENDING user=user, workspace=workspace, status=WorkspaceRoleStatus.PENDING
) )
invite = InvitationFactory.create( invite = InvitationFactory.create(
user_id=user.id, workspace_role_id=ws_role.id, status=InvitationStatus.PENDING user_id=user.id, workspace_role=ws_role, status=InvitationStatus.PENDING
) )
user_session(workspace.owner) user_session(workspace.owner)
client.post( client.post(
@ -190,7 +190,7 @@ def test_existing_member_invite_resent_to_email_submitted_in_form(
) )
invite = InvitationFactory.create( invite = InvitationFactory.create(
user_id=user.id, user_id=user.id,
workspace_role_id=ws_role.id, workspace_role=ws_role,
status=InvitationStatus.PENDING, status=InvitationStatus.PENDING,
email="example@example.com", email="example@example.com",
) )