Files
atst/atst/domain/invitations.py
dandds 4f304d747e Small tweaks for adding a new application member:
- raise specific invitation type if invite not found in invitation domain classes
- more terse assignments of defaults in invitation service, smh
- terser margin expression for inline input fields
- sass formatting
- use translation for cancel link
- oxford comma for app team management permission explanation
- do not format environment roles with hyphens for role selection
- generalize some additional methods in the invitation domain base class
- use plain atst.models import path
2019-04-30 17:14:58 -04:00

138 lines
3.9 KiB
Python

import datetime
from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
from atst.models import ApplicationInvitation, InvitationStatus, PortfolioInvitation
from atst.domain.portfolio_roles import PortfolioRoles
from atst.domain.application_roles import ApplicationRoles
from .exceptions import NotFoundError
class WrongUserError(Exception):
def __init__(self, user, invite):
self.user = user
self.invite = invite
@property
def message(self):
return "User {} with DOD ID {} does not match expected DOD ID {} for invitation {}".format(
self.user.id, self.user.dod_id, self.invite.user.dod_id, self.invite.id
)
class ExpiredError(Exception):
def __init__(self, invite):
self.invite = invite
@property
def message(self):
return "Invitation {} has expired.".format(self.invite.id)
class InvitationError(Exception):
def __init__(self, invite):
self.invite = invite
@property
def message(self):
return "{} has a status of {}".format(self.invite.id, self.invite.status.value)
class BaseInvitations(object):
model = None
role_domain_class = None
# number of minutes a given invitation is considered valid
EXPIRATION_LIMIT_MINUTES = 360
@classmethod
def _get(cls, token):
try:
invite = db.session.query(cls.model).filter_by(token=token).one()
except NoResultFound:
raise NotFoundError(cls.model.__tablename__)
return invite
@classmethod
def create(cls, inviter, role, email):
# pylint: disable=not-callable
invite = cls.model(
role=role,
inviter=inviter,
user=role.user,
status=InvitationStatus.PENDING,
expiration_time=cls.current_expiration_time(),
email=email,
)
db.session.add(invite)
db.session.commit()
return invite
@classmethod
def accept(cls, user, token):
invite = cls._get(token)
if invite.user.dod_id != user.dod_id:
if invite.is_pending:
cls._update_status(invite, InvitationStatus.REJECTED_WRONG_USER)
raise WrongUserError(user, invite)
elif invite.is_expired:
cls._update_status(invite, InvitationStatus.REJECTED_EXPIRED)
raise ExpiredError(invite)
elif invite.is_accepted or invite.is_revoked or invite.is_rejected:
raise InvitationError(invite)
elif invite.is_pending: # pragma: no branch
cls._update_status(invite, InvitationStatus.ACCEPTED)
cls.role_domain_class.enable(invite.role)
return invite
@classmethod
def current_expiration_time(cls):
return datetime.datetime.now() + datetime.timedelta(
minutes=cls.EXPIRATION_LIMIT_MINUTES
)
@classmethod
def _update_status(cls, invite, new_status):
invite.status = new_status
db.session.add(invite)
db.session.commit()
return invite
@classmethod
def revoke(cls, token):
invite = cls._get(token)
return cls._update_status(invite, InvitationStatus.REVOKED)
@classmethod
def lookup_by_resource_and_user(cls, resource, user):
role = cls.role_domain_class.get(resource.id, user.id)
if role.latest_invitation is None:
raise NotFoundError(cls.model.__tablename__)
return role.latest_invitation
@classmethod
def resend(cls, user, token):
previous_invitation = cls._get(token)
cls._update_status(previous_invitation, InvitationStatus.REVOKED)
return cls.create(user, previous_invitation.role, previous_invitation.email)
class PortfolioInvitations(BaseInvitations):
model = PortfolioInvitation
role_domain_class = PortfolioRoles
class ApplicationInvitations(BaseInvitations):
model = ApplicationInvitation
role_domain_class = ApplicationRoles