Convert Users, Roles, WorkspaceUsers, TaskOrders to use classmethods
This commit is contained in:
parent
7b5d76e260
commit
b72a16569f
@ -1,20 +1,21 @@
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from atst.database import db
|
||||
from atst.models import Role
|
||||
from .exceptions import NotFoundError
|
||||
|
||||
|
||||
class Roles(object):
|
||||
def __init__(self, db_session):
|
||||
self.db_session = db_session
|
||||
|
||||
def get(self, role_name):
|
||||
@classmethod
|
||||
def get(cls, role_name):
|
||||
try:
|
||||
role = self.db_session.query(Role).filter_by(name=role_name).one()
|
||||
role = db.session.query(Role).filter_by(name=role_name).one()
|
||||
except NoResultFound:
|
||||
raise NotFoundError("role")
|
||||
|
||||
return role
|
||||
|
||||
def get_all(self):
|
||||
return self.db_session.query(Role).all()
|
||||
@classmethod
|
||||
def get_all(cls):
|
||||
return db.session.query(Role).all()
|
||||
|
@ -1,17 +1,17 @@
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from atst.database import db
|
||||
from atst.models.task_order import TaskOrder
|
||||
from .exceptions import NotFoundError
|
||||
|
||||
|
||||
class TaskOrders(object):
|
||||
def __init__(self, db_session):
|
||||
self.db_session = db_session
|
||||
|
||||
@classmethod
|
||||
def get(self, order_number):
|
||||
try:
|
||||
task_order = (
|
||||
self.db_session.query(TaskOrder).filter_by(number=order_number).one()
|
||||
db.session.query(TaskOrder).filter_by(number=order_number).one()
|
||||
)
|
||||
except NoResultFound:
|
||||
raise NotFoundError("task_order")
|
||||
|
@ -1,6 +1,7 @@
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from atst.database import db
|
||||
from atst.models import User
|
||||
|
||||
from .roles import Roles
|
||||
@ -8,47 +9,48 @@ from .exceptions import NotFoundError, AlreadyExistsError
|
||||
|
||||
|
||||
class Users(object):
|
||||
def __init__(self, db_session):
|
||||
self.db_session = db_session
|
||||
self.roles_repo = Roles(db_session)
|
||||
|
||||
def get(self, user_id):
|
||||
@classmethod
|
||||
def get(cls, user_id):
|
||||
try:
|
||||
user = self.db_session.query(User).filter_by(id=user_id).one()
|
||||
user = db.session.query(User).filter_by(id=user_id).one()
|
||||
except NoResultFound:
|
||||
raise NotFoundError("user")
|
||||
|
||||
return user
|
||||
|
||||
@classmethod
|
||||
def create(self, user_id, atat_role_name):
|
||||
atat_role = self.roles_repo.get(atat_role_name)
|
||||
atat_role = Roles.get(atat_role_name)
|
||||
|
||||
try:
|
||||
user = User(id=user_id, atat_role=atat_role)
|
||||
self.db_session.add(user)
|
||||
self.db_session.commit()
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
except IntegrityError:
|
||||
raise AlreadyExistsError("user")
|
||||
|
||||
return user
|
||||
|
||||
def get_or_create(self, user_id, *args, **kwargs):
|
||||
@classmethod
|
||||
def get_or_create(cls, user_id, *args, **kwargs):
|
||||
try:
|
||||
user = self.get(user_id)
|
||||
user = Users.get(user_id)
|
||||
except NotFoundError:
|
||||
user = self.create(user_id, *args, **kwargs)
|
||||
self.db_session.add(user)
|
||||
self.db_session.commit()
|
||||
user = Users.create(user_id, *args, **kwargs)
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
|
||||
return user
|
||||
|
||||
@classmethod
|
||||
def update(self, user_id, atat_role_name):
|
||||
|
||||
user = self.get(user_id)
|
||||
atat_role = self.roles_repo.get(atat_role_name)
|
||||
user = Users.get(user_id)
|
||||
atat_role = Roles.get(atat_role_name)
|
||||
user.atat_role = atat_role
|
||||
|
||||
self.db_session.add(user)
|
||||
self.db_session.commit()
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
|
||||
return user
|
||||
|
@ -1,22 +1,21 @@
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from atst.database import db
|
||||
from atst.models.workspace_role import WorkspaceRole
|
||||
from atst.models.workspace_user import WorkspaceUser
|
||||
from atst.models.user import User
|
||||
|
||||
from .roles import Roles
|
||||
from .users import Users
|
||||
from .exceptions import NotFoundError
|
||||
|
||||
|
||||
class WorkspaceUsers(object):
|
||||
def __init__(self, db_session):
|
||||
self.db_session = db_session
|
||||
self.roles_repo = Roles(db_session)
|
||||
self.users_repo = Users(db_session)
|
||||
|
||||
def get(self, workspace_id, user_id):
|
||||
@classmethod
|
||||
def get(cls, workspace_id, user_id):
|
||||
try:
|
||||
user = self.users_repo.get(user_id)
|
||||
user = Users.get(user_id)
|
||||
except NoResultFound:
|
||||
raise NotFoundError("user")
|
||||
|
||||
@ -31,24 +30,25 @@ class WorkspaceUsers(object):
|
||||
|
||||
return WorkspaceUser(user, workspace_role)
|
||||
|
||||
def add_many(self, workspace_id, workspace_user_dicts):
|
||||
@classmethod
|
||||
def add_many(cls, workspace_id, workspace_user_dicts):
|
||||
workspace_users = []
|
||||
|
||||
for user_dict in workspace_user_dicts:
|
||||
try:
|
||||
user = self.users_repo.get(user_dict["id"])
|
||||
user = Users.get(user_dict["id"])
|
||||
except NoResultFound:
|
||||
default_role = self.roles_repo.get("developer")
|
||||
default_role = Roles.get("developer")
|
||||
user = User(id=user_dict["id"], atat_role=default_role)
|
||||
|
||||
try:
|
||||
role = self.roles_repo.get(user_dict["workspace_role"])
|
||||
role = Roles.get(user_dict["workspace_role"])
|
||||
except NoResultFound:
|
||||
raise NotFoundError("role")
|
||||
|
||||
try:
|
||||
existing_workspace_role = (
|
||||
self.db_session.query(WorkspaceRole)
|
||||
db.session.query(WorkspaceRole)
|
||||
.filter(
|
||||
WorkspaceRole.user == user,
|
||||
WorkspaceRole.workspace_id == workspace_id,
|
||||
@ -66,8 +66,8 @@ class WorkspaceUsers(object):
|
||||
workspace_user = WorkspaceUser(user, new_workspace_role)
|
||||
workspace_users.append(workspace_user)
|
||||
|
||||
self.db_session.add(user)
|
||||
db.session.add(user)
|
||||
|
||||
self.db_session.commit()
|
||||
db.session.commit()
|
||||
|
||||
return workspace_users
|
||||
|
@ -3,21 +3,16 @@ from atst.domain.roles import Roles
|
||||
from atst.domain.exceptions import NotFoundError
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def roles_repo(session):
|
||||
return Roles(session)
|
||||
|
||||
|
||||
def test_get_all_roles(roles_repo):
|
||||
roles = roles_repo.get_all()
|
||||
def test_get_all_roles():
|
||||
roles = Roles.get_all()
|
||||
assert roles
|
||||
|
||||
|
||||
def test_get_existing_role(roles_repo):
|
||||
role = roles_repo.get("developer")
|
||||
def test_get_existing_role():
|
||||
role = Roles.get("developer")
|
||||
assert role.name == "developer"
|
||||
|
||||
|
||||
def test_get_nonexistent_role(roles_repo):
|
||||
def test_get_nonexistent_role():
|
||||
with pytest.raises(NotFoundError):
|
||||
roles_repo.get("nonexistent")
|
||||
Roles.get("nonexistent")
|
||||
|
@ -6,10 +6,6 @@ from atst.domain.task_orders import TaskOrders
|
||||
from tests.factories import TaskOrderFactory
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def task_orders(session):
|
||||
return TaskOrders(session)
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def new_task_order(session):
|
||||
def make_task_order(**kwargs):
|
||||
@ -22,13 +18,13 @@ def new_task_order(session):
|
||||
return make_task_order
|
||||
|
||||
|
||||
def test_can_get_task_order(task_orders, new_task_order):
|
||||
def test_can_get_task_order(new_task_order):
|
||||
new_to = new_task_order(number="0101969F")
|
||||
to = task_orders.get(new_to.number)
|
||||
to = TaskOrders.get(new_to.number)
|
||||
|
||||
assert to.id == to.id
|
||||
|
||||
|
||||
def test_nonexistent_task_order_raises(task_orders):
|
||||
def test_nonexistent_task_order_raises():
|
||||
with pytest.raises(NotFoundError):
|
||||
task_orders.get("some fake number")
|
||||
TaskOrders.get("some fake number")
|
||||
|
@ -5,69 +5,64 @@ from atst.domain.users import Users
|
||||
from atst.domain.exceptions import NotFoundError, AlreadyExistsError
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def users_repo(session):
|
||||
return Users(session)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def user_id():
|
||||
return uuid4()
|
||||
|
||||
|
||||
def test_create_user(users_repo, user_id):
|
||||
user = users_repo.create(user_id, "developer")
|
||||
def test_create_user(user_id):
|
||||
user = Users.create(user_id, "developer")
|
||||
assert user.id == user_id
|
||||
|
||||
|
||||
def test_create_user_with_nonexistent_role(users_repo, user_id):
|
||||
def test_create_user_with_nonexistent_role(user_id):
|
||||
with pytest.raises(NotFoundError):
|
||||
users_repo.create(user_id, "nonexistent")
|
||||
Users.create(user_id, "nonexistent")
|
||||
|
||||
|
||||
def test_create_already_existing_user(users_repo, user_id):
|
||||
users_repo.create(user_id, "developer")
|
||||
def test_create_already_existing_user(user_id):
|
||||
Users.create(user_id, "developer")
|
||||
with pytest.raises(AlreadyExistsError):
|
||||
users_repo.create(user_id, "developer")
|
||||
Users.create(user_id, "developer")
|
||||
|
||||
|
||||
def test_get_or_create_nonexistent_user(users_repo, user_id):
|
||||
user = users_repo.get_or_create(user_id, atat_role_name="developer")
|
||||
def test_get_or_create_nonexistent_user(user_id):
|
||||
user = Users.get_or_create(user_id, atat_role_name="developer")
|
||||
assert user.id == user_id
|
||||
|
||||
|
||||
def test_get_or_create_existing_user(users_repo, user_id):
|
||||
users_repo.get_or_create(user_id, atat_role_name="developer")
|
||||
user = users_repo.get_or_create(user_id, atat_role_name="developer")
|
||||
def test_get_or_create_existing_user(user_id):
|
||||
Users.get_or_create(user_id, atat_role_name="developer")
|
||||
user = Users.get_or_create(user_id, atat_role_name="developer")
|
||||
assert user
|
||||
|
||||
|
||||
def test_get_user(users_repo, user_id):
|
||||
users_repo.create(user_id, "developer")
|
||||
user = users_repo.get(user_id)
|
||||
def test_get_user(user_id):
|
||||
Users.create(user_id, "developer")
|
||||
user = Users.get(user_id)
|
||||
assert user.id == user_id
|
||||
|
||||
|
||||
def test_get_nonexistent_user(users_repo, user_id):
|
||||
users_repo.create(user_id, "developer")
|
||||
def test_get_nonexistent_user(user_id):
|
||||
Users.create(user_id, "developer")
|
||||
with pytest.raises(NotFoundError):
|
||||
users_repo.get(uuid4())
|
||||
Users.get(uuid4())
|
||||
|
||||
|
||||
def test_update_user(users_repo, user_id):
|
||||
users_repo.create(user_id, "developer")
|
||||
updated_user = users_repo.update(user_id, "ccpo")
|
||||
def test_update_user(user_id):
|
||||
Users.create(user_id, "developer")
|
||||
updated_user = Users.update(user_id, "ccpo")
|
||||
|
||||
assert updated_user.atat_role.name == "ccpo"
|
||||
|
||||
|
||||
def test_update_nonexistent_user(users_repo, user_id):
|
||||
users_repo.create(user_id, "developer")
|
||||
def test_update_nonexistent_user(user_id):
|
||||
Users.create(user_id, "developer")
|
||||
with pytest.raises(NotFoundError):
|
||||
users_repo.update(uuid4(), "ccpo")
|
||||
Users.update(uuid4(), "ccpo")
|
||||
|
||||
|
||||
def test_update_existing_user_with_nonexistent_role(users_repo, user_id):
|
||||
users_repo.create(user_id, "developer")
|
||||
def test_update_existing_user_with_nonexistent_role(user_id):
|
||||
Users.create(user_id, "developer")
|
||||
with pytest.raises(NotFoundError):
|
||||
users_repo.update(user_id, "nonexistent")
|
||||
Users.update(user_id, "nonexistent")
|
||||
|
@ -5,37 +5,27 @@ from atst.domain.workspace_users import WorkspaceUsers
|
||||
from atst.domain.users import Users
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def users_repo(session):
|
||||
return Users(session)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def workspace_users_repo(session):
|
||||
return WorkspaceUsers(session)
|
||||
|
||||
|
||||
def test_can_create_new_workspace_user(users_repo, workspace_users_repo):
|
||||
def test_can_create_new_workspace_user():
|
||||
workspace_id = uuid4()
|
||||
user = users_repo.create(uuid4(), "developer")
|
||||
user = Users.create(uuid4(), "developer")
|
||||
|
||||
workspace_user_dicts = [{"id": user.id, "workspace_role": "owner"}]
|
||||
|
||||
workspace_users = workspace_users_repo.add_many(workspace_id, workspace_user_dicts)
|
||||
workspace_users = WorkspaceUsers.add_many(workspace_id, workspace_user_dicts)
|
||||
|
||||
assert workspace_users[0].user.id == user.id
|
||||
assert workspace_users[0].user.atat_role.name == "developer"
|
||||
assert workspace_users[0].workspace_role.role.name == "owner"
|
||||
|
||||
|
||||
def test_can_update_existing_workspace_user(users_repo, workspace_users_repo):
|
||||
def test_can_update_existing_workspace_user():
|
||||
workspace_id = uuid4()
|
||||
user = users_repo.create(uuid4(), "developer")
|
||||
user = Users.create(uuid4(), "developer")
|
||||
|
||||
workspace_users_repo.add_many(
|
||||
WorkspaceUsers.add_many(
|
||||
workspace_id, [{"id": user.id, "workspace_role": "owner"}]
|
||||
)
|
||||
workspace_users = workspace_users_repo.add_many(
|
||||
workspace_users = WorkspaceUsers.add_many(
|
||||
workspace_id, [{"id": user.id, "workspace_role": "developer"}]
|
||||
)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user