From 3a23c547238af9410042e09c34d6f822093649e9 Mon Sep 17 00:00:00 2001 From: dandds Date: Wed, 18 Sep 2019 11:12:48 -0400 Subject: [PATCH] Add a beat processing schedule for environment provisioning jobs. The beat schedule is set to once per minute for each of the three environment provisioning tasks. Adding a beat schedule surfaced two problems that are addressed here with the following changes: - Commit the SQLALchemy session in order to release the environment lock. Otherwise the change to the `claimed_until` field is not persisted. - Set `none_as_null` on the JSOB fields on the `Environment`. This avoids problems with querying on Postgres JSON fields that are empty. This also adds a small change to the development command for the Celery worker. Multiple child processes were executing the beat jobs, which lead to exceptions for environment locks and confusing log output. This contrains the dev command to a single Celery worker. --- atst/domain/environments.py | 8 ++++---- atst/models/environment.py | 4 ++-- atst/models/utils.py | 1 + atst/queue.py | 15 ++++++++++++++- script/dev_queue | 2 +- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/atst/domain/environments.py b/atst/domain/environments.py index 829fe3fa..ba723cc5 100644 --- a/atst/domain/environments.py +++ b/atst/domain/environments.py @@ -1,4 +1,4 @@ -from sqlalchemy import text, func, or_ +from sqlalchemy import func, or_ from sqlalchemy.orm.exc import NoResultFound from typing import List from uuid import UUID @@ -130,7 +130,7 @@ class Environments(object): results = ( cls.base_provision_query(now) .filter(Environment.cloud_id != None) - .filter(Environment.root_user_info == text("'null'")) + .filter(Environment.root_user_info == None) ).all() return [id_ for id_, in results] @@ -143,7 +143,7 @@ class Environments(object): results = ( cls.base_provision_query(now) .filter(Environment.cloud_id != None) - .filter(Environment.root_user_info != text("'null'")) - .filter(Environment.baseline_info == text("'null'")) + .filter(Environment.root_user_info != None) + .filter(Environment.baseline_info == None) ).all() return [id_ for id_, in results] diff --git a/atst/models/environment.py b/atst/models/environment.py index fb30e6d4..4fa11bfc 100644 --- a/atst/models/environment.py +++ b/atst/models/environment.py @@ -26,8 +26,8 @@ class Environment( creator = relationship("User") cloud_id = Column(String) - root_user_info = Column(JSONB) - baseline_info = Column(JSONB) + root_user_info = Column(JSONB(none_as_null=True)) + baseline_info = Column(JSONB(none_as_null=True)) claimed_until = Column(TIMESTAMP(timezone=True)) diff --git a/atst/models/utils.py b/atst/models/utils.py index a3df73e3..6059d33e 100644 --- a/atst/models/utils.py +++ b/atst/models/utils.py @@ -47,3 +47,4 @@ def claim_for_update(resource, minutes=30): db.session.query(Model).filter(Model.id == resource.id).filter( Model.claimed_until != None ).update({"claimed_until": None}, synchronize_session="fetch") + db.session.commit() diff --git a/atst/queue.py b/atst/queue.py index d71532b1..5fc12a3f 100644 --- a/atst/queue.py +++ b/atst/queue.py @@ -5,7 +5,20 @@ celery = Celery(__name__) def update_celery(celery, app): celery.conf.update(app.config) - celery.conf.CELERYBEAT_SCHEDULE = {} + celery.conf.CELERYBEAT_SCHEDULE = { + "beat-dispatch_create_environment": { + "task": "atst.jobs.dispatch_create_environment", + "schedule": 60, + }, + "beat-dispatch_create_atat_admin_user": { + "task": "atst.jobs.dispatch_create_atat_admin_user", + "schedule": 60, + }, + "beat-dispatch_create_environment_baseline": { + "task": "atst.jobs.dispatch_create_environment_baseline", + "schedule": 60, + }, + } class ContextTask(celery.Task): def __call__(self, *args, **kwargs): diff --git a/script/dev_queue b/script/dev_queue index 4e8720ee..ef0d5ccc 100755 --- a/script/dev_queue +++ b/script/dev_queue @@ -4,7 +4,7 @@ set -e -WORKER="pipenv run celery -A celery_worker.celery worker --loglevel=info -B" +WORKER="pipenv run celery -A celery_worker.celery worker --loglevel=info -B -c 1" if [[ `command -v entr` ]]; then find atst | entr -r $WORKER