Add a beat processing schedule for environment provisioning jobs.
The beat schedule is set to once per minute for each of the three environment provisioning tasks. Adding a beat schedule surfaced two problems that are addressed here with the following changes: - Commit the SQLALchemy session in order to release the environment lock. Otherwise the change to the `claimed_until` field is not persisted. - Set `none_as_null` on the JSOB fields on the `Environment`. This avoids problems with querying on Postgres JSON fields that are empty. This also adds a small change to the development command for the Celery worker. Multiple child processes were executing the beat jobs, which lead to exceptions for environment locks and confusing log output. This contrains the dev command to a single Celery worker.
This commit is contained in:
parent
71befc96ef
commit
3a23c54723
@ -1,4 +1,4 @@
|
||||
from sqlalchemy import text, func, or_
|
||||
from sqlalchemy import func, or_
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
from typing import List
|
||||
from uuid import UUID
|
||||
@ -130,7 +130,7 @@ class Environments(object):
|
||||
results = (
|
||||
cls.base_provision_query(now)
|
||||
.filter(Environment.cloud_id != None)
|
||||
.filter(Environment.root_user_info == text("'null'"))
|
||||
.filter(Environment.root_user_info == None)
|
||||
).all()
|
||||
return [id_ for id_, in results]
|
||||
|
||||
@ -143,7 +143,7 @@ class Environments(object):
|
||||
results = (
|
||||
cls.base_provision_query(now)
|
||||
.filter(Environment.cloud_id != None)
|
||||
.filter(Environment.root_user_info != text("'null'"))
|
||||
.filter(Environment.baseline_info == text("'null'"))
|
||||
.filter(Environment.root_user_info != None)
|
||||
.filter(Environment.baseline_info == None)
|
||||
).all()
|
||||
return [id_ for id_, in results]
|
||||
|
@ -26,8 +26,8 @@ class Environment(
|
||||
creator = relationship("User")
|
||||
|
||||
cloud_id = Column(String)
|
||||
root_user_info = Column(JSONB)
|
||||
baseline_info = Column(JSONB)
|
||||
root_user_info = Column(JSONB(none_as_null=True))
|
||||
baseline_info = Column(JSONB(none_as_null=True))
|
||||
|
||||
claimed_until = Column(TIMESTAMP(timezone=True))
|
||||
|
||||
|
@ -47,3 +47,4 @@ def claim_for_update(resource, minutes=30):
|
||||
db.session.query(Model).filter(Model.id == resource.id).filter(
|
||||
Model.claimed_until != None
|
||||
).update({"claimed_until": None}, synchronize_session="fetch")
|
||||
db.session.commit()
|
||||
|
@ -5,7 +5,20 @@ celery = Celery(__name__)
|
||||
|
||||
def update_celery(celery, app):
|
||||
celery.conf.update(app.config)
|
||||
celery.conf.CELERYBEAT_SCHEDULE = {}
|
||||
celery.conf.CELERYBEAT_SCHEDULE = {
|
||||
"beat-dispatch_create_environment": {
|
||||
"task": "atst.jobs.dispatch_create_environment",
|
||||
"schedule": 60,
|
||||
},
|
||||
"beat-dispatch_create_atat_admin_user": {
|
||||
"task": "atst.jobs.dispatch_create_atat_admin_user",
|
||||
"schedule": 60,
|
||||
},
|
||||
"beat-dispatch_create_environment_baseline": {
|
||||
"task": "atst.jobs.dispatch_create_environment_baseline",
|
||||
"schedule": 60,
|
||||
},
|
||||
}
|
||||
|
||||
class ContextTask(celery.Task):
|
||||
def __call__(self, *args, **kwargs):
|
||||
|
@ -4,7 +4,7 @@
|
||||
|
||||
set -e
|
||||
|
||||
WORKER="pipenv run celery -A celery_worker.celery worker --loglevel=info -B"
|
||||
WORKER="pipenv run celery -A celery_worker.celery worker --loglevel=info -B -c 1"
|
||||
|
||||
if [[ `command -v entr` ]]; then
|
||||
find atst | entr -r $WORKER
|
||||
|
Loading…
x
Reference in New Issue
Block a user