127 lines
3.7 KiB
Python
127 lines
3.7 KiB
Python
import datetime
|
|
from sqlalchemy.orm.exc import NoResultFound
|
|
|
|
from atst.database import db
|
|
from atst.models.invitation import Invitation, Status as InvitationStatus
|
|
from atst.domain.workspace_roles import WorkspaceRoles
|
|
from atst.domain.authz import Authorization, Permissions
|
|
from atst.domain.workspaces import Workspaces
|
|
|
|
from .exceptions import NotFoundError
|
|
|
|
|
|
class WrongUserError(Exception):
    """Raised when the accepting user's DOD ID differs from the invited user's.

    Attributes:
        user: the user who attempted to accept the invitation.
        invite: the invitation that was presented.
    """

    def __init__(self, user, invite):
        self.user = user
        self.invite = invite

    @property
    def message(self):
        """Human-readable description naming both users' DOD IDs and the invitation."""
        return "User {} with DOD ID {} does not match expected DOD ID {} for invitation {}".format(
            self.user.id, self.user.dod_id, self.invite.user.dod_id, self.invite.id
        )

    def __str__(self):
        # Without this, str(exc) is the repr of the constructor arguments,
        # so tracebacks and log lines would not show the message text.
        return self.message
|
|
|
|
|
|
class ExpiredError(Exception):
    """Raised when an invitation is presented after it has expired.

    Attributes:
        invite: the expired invitation.
    """

    def __init__(self, invite):
        self.invite = invite

    @property
    def message(self):
        """Human-readable description identifying the expired invitation by id."""
        return "Invitation {} has expired.".format(self.invite.id)

    def __str__(self):
        # Without this, str(exc) is the repr of the invitation argument,
        # so tracebacks and log lines would not show the message text.
        return self.message
|
|
|
|
|
|
class InvitationError(Exception):
    """Raised when an invitation's current status prevents the requested action.

    Attributes:
        invite: the invitation whose status blocked the action.
    """

    def __init__(self, invite):
        self.invite = invite

    @property
    def message(self):
        """Human-readable description giving the invitation id and its status value."""
        return "{} has a status of {}".format(self.invite.id, self.invite.status.value)

    def __str__(self):
        # Without this, str(exc) is the repr of the invitation argument,
        # so tracebacks and log lines would not show the message text.
        return self.message
|
|
|
|
|
|
class Invitations(object):
    """Operations on workspace invitations: look up, create, accept, revoke, resend."""

    # number of minutes a given invitation is considered valid
    EXPIRATION_LIMIT_MINUTES = 360

    @classmethod
    def _get(cls, token):
        """Return the invitation whose token equals ``token``.

        Raises:
            NotFoundError: when the query finds no matching invitation.
        """
        matching = db.session.query(Invitation).filter_by(token=token)
        try:
            return matching.one()
        except NoResultFound:
            raise NotFoundError("invite")

    @classmethod
    def create(cls, inviter, workspace_role, email):
        """Persist and return a new PENDING invitation.

        The invited user is taken from ``workspace_role.user`` and the
        expiration time from ``current_expiration_time()``. The new row is
        added to the session and committed before returning.
        """
        invitee = workspace_role.user
        expires_at = Invitations.current_expiration_time()

        new_invite = Invitation(
            workspace_role=workspace_role,
            inviter=inviter,
            user=invitee,
            status=InvitationStatus.PENDING,
            expiration_time=expires_at,
            email=email,
        )

        session = db.session
        session.add(new_invite)
        session.commit()

        return new_invite

    @classmethod
    def accept(cls, user, token):
        """Accept the invitation identified by ``token`` on behalf of ``user``.

        On success the invitation is marked ACCEPTED, ``WorkspaceRoles.enable``
        is called on its workspace role, and the invitation is returned.

        Raises:
            NotFoundError: no invitation has this token.
            WrongUserError: ``user.dod_id`` differs from the invited user's; a
                still-pending invitation is first marked REJECTED_WRONG_USER.
            ExpiredError: the invitation is expired; it is first marked
                REJECTED_EXPIRED.
            InvitationError: the invitation is already accepted, revoked or
                rejected.
        """
        invite = Invitations._get(token)

        # Every failing check below raises, so the checks are written as
        # sequential guards rather than one if/elif chain.
        if invite.user.dod_id != user.dod_id:
            if invite.is_pending:
                Invitations._update_status(invite, InvitationStatus.REJECTED_WRONG_USER)
            raise WrongUserError(user, invite)

        if invite.is_expired:
            Invitations._update_status(invite, InvitationStatus.REJECTED_EXPIRED)
            raise ExpiredError(invite)

        already_resolved = invite.is_accepted or invite.is_revoked or invite.is_rejected
        if already_resolved:
            raise InvitationError(invite)

        if invite.is_pending:
            Invitations._update_status(invite, InvitationStatus.ACCEPTED)
            WorkspaceRoles.enable(invite.workspace_role)
            return invite
        # Falls through (returning None) only if the invitation matches none
        # of the states checked above.

    @classmethod
    def current_expiration_time(cls):
        """Return the current time plus ``EXPIRATION_LIMIT_MINUTES`` minutes."""
        # NOTE(review): datetime.now() is naive local time -- confirm this
        # matches how expiration_time is stored and compared.
        issued_at = datetime.datetime.now()
        validity = datetime.timedelta(minutes=Invitations.EXPIRATION_LIMIT_MINUTES)
        return issued_at + validity

    @classmethod
    def _update_status(cls, invite, new_status):
        """Set ``invite.status`` to ``new_status``, commit, and return the invite."""
        invite.status = new_status

        session = db.session
        session.add(invite)
        session.commit()

        return invite

    @classmethod
    def revoke(cls, token):
        """Mark the invitation identified by ``token`` as REVOKED and return it.

        Raises:
            NotFoundError: no invitation has this token.
        """
        target = Invitations._get(token)
        return Invitations._update_status(target, InvitationStatus.REVOKED)

    @classmethod
    def resend(cls, user, workspace_id, token):
        """Revoke the invitation for ``token`` and issue a replacement.

        ``user`` must hold ASSIGN_AND_UNASSIGN_ATAT_ROLE on the workspace
        fetched via ``Workspaces.get(user, workspace_id)``. The replacement
        reuses the old invitation's workspace role and email, with ``user``
        recorded as the inviter.

        Raises:
            NotFoundError: no invitation has this token.
        """
        workspace = Workspaces.get(user, workspace_id)
        Authorization.check_workspace_permission(
            user,
            workspace,
            Permissions.ASSIGN_AND_UNASSIGN_ATAT_ROLE,
            "resend a workspace invitation",
        )

        # NOTE(review): nothing visible here checks that the invitation found
        # by token belongs to `workspace` -- confirm callers guarantee it.
        stale_invite = Invitations._get(token)
        Invitations._update_status(stale_invite, InvitationStatus.REVOKED)

        return Invitations.create(user, stale_invite.workspace_role, stale_invite.email)
|