import authz domain module

This commit is contained in:
dandds 2018-07-30 11:44:56 -04:00
parent 8c75a5239d
commit 06c2c205a1
4 changed files with 155 additions and 0 deletions

19
atst/domain/roles.py Normal file
View File

@ -0,0 +1,19 @@
from sqlalchemy.orm.exc import NoResultFound
from atst.models import Role
from .exceptions import NotFoundError
class Roles(object):
@classmethod
def get(cls, role_name):
try:
role = Role.query.filter_by(name=role_name).one()
except NoResultFound:
raise NotFoundError("role")
return role
@classmethod
def get_all(cls):
return Role.query.all()

58
atst/domain/users.py Normal file
View File

@ -0,0 +1,58 @@
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.exc import IntegrityError
from atst.models import User
from .roles import Roles
from .exceptions import NotFoundError, AlreadyExistsError
class Users(object):
def __init__(self, db_session):
self.db_session = db_session
def get(self, user_id):
try:
user = User.query.filter_by(id=user_id).one()
except NoResultFound:
raise NotFoundError("user")
return user
def create(self, user_id, atat_role_name):
atat_role = Roles.get(atat_role_name)
try:
user = User(id=user_id, atat_role=atat_role)
self.db_session.add(user)
self.db_session.commit()
except IntegrityError:
raise AlreadyExistsError("user")
return user
def get_or_create(self, user_id, *args, **kwargs):
created = False
try:
user = Users.get(user_id)
except NotFoundError:
user = Users.create(user_id, *args, **kwargs)
self.db_session.add(user)
self.db_session.commit()
created = True
return user, created
def update(self, user_id, atat_role_name):
user = Users.get(user_id)
atat_role = Roles.get(atat_role_name)
user.atat_role = atat_role
self.db_session.add(user)
self.db_session.commit()
return user

View File

@ -0,0 +1,64 @@
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.dialects.postgresql import insert
from atst.models import User, WorkspaceRole, Role
from .exceptions import NotFoundError
class WorkspaceUsers(object):
def __init__(self, db_session):
self.db_session = db_session
def get(self, workspace_id, user_id):
try:
user = User.query.filter_by(id=user_id).one()
except NoResultFound:
raise NotFoundError("user")
try:
workspace_role = (
WorkspaceRole.query.join(User)
.filter(User.id == user_id, WorkspaceRole.workspace_id == workspace_id)
.one()
)
except NoResultFound:
workspace_role = None
return WorkspaceUser(user, workspace_role)
def add_many(self, workspace_id, workspace_user_dicts):
workspace_users = []
for user_dict in workspace_user_dicts:
try:
user = User.query.filter_by(id=user_dict["id"]).one()
except NoResultFound:
default_role = Role.query.filter_by(name="developer").one_or_none()
user = User(id=user_dict["id"], atat_role=default_role)
try:
role = Role.query.filter_by(name=user_dict["workspace_role"]).one()
except NoResultFound:
raise NotFoundError("role")
try:
existing_workspace_role = WorkspaceRole.query.filter(
WorkspaceRole.user == user,
WorkspaceRole.workspace_id == workspace_id,
).one()
new_workspace_role = existing_workspace_role
new_workspace_role.role = role
except NoResultFound:
new_workspace_role = WorkspaceRole(
user=user, role_id=role.id, workspace_id=workspace_id
)
user.workspace_roles.append(new_workspace_role)
workspace_user = WorkspaceUser(user, new_workspace_role)
workspace_users.append(workspace_user)
self.db_session.add(user)
self.db_session.commit()
return workspace_users

View File

@ -0,0 +1,14 @@
class WorkspaceUser(object):
def __init__(self, user, workspace_role):
self.user = user
self.workspace_role = workspace_role
def permissions(self):
atat_permissions = set(self.user.atat_role.permissions)
workspace_permissions = (
[] if self.workspace_role is None else self.workspace_role.role.permissions
)
return set(workspace_permissions).union(atat_permissions)
def workspace_id(self):
return self.workspace_role.workspace_id