Merge pull request #1041 from dod-ccpo/use-celery
Use Celery instead of RQ.
This commit is contained in:
@@ -25,10 +25,10 @@ from atst.domain.authz import Authorization
|
||||
from atst.domain.csp import make_csp_provider
|
||||
from atst.domain.portfolios import Portfolios
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.queue import celery, update_celery
|
||||
from atst.utils import mailer
|
||||
from atst.utils.form_cache import FormCache
|
||||
from atst.utils.json import CustomJSONEncoder
|
||||
from atst.queue import queue
|
||||
from atst.utils.notification_sender import NotificationSender
|
||||
from atst.utils.session_limiter import SessionLimiter
|
||||
|
||||
@@ -59,13 +59,14 @@ def make_app(config):
|
||||
app.config.update(config)
|
||||
app.config.update({"SESSION_REDIS": app.redis})
|
||||
|
||||
update_celery(celery, app)
|
||||
|
||||
make_flask_callbacks(app)
|
||||
register_filters(app)
|
||||
make_csp_provider(app, config.get("CSP", "mock"))
|
||||
make_crl_validator(app)
|
||||
make_mailer(app)
|
||||
make_notification_sender(app)
|
||||
queue.init_app(app)
|
||||
|
||||
db.init_app(app)
|
||||
csrf.init_app(app)
|
||||
@@ -149,6 +150,7 @@ def map_config(config):
|
||||
return {
|
||||
**config["default"],
|
||||
"ENV": config["default"]["ENVIRONMENT"],
|
||||
"BROKER_URL": config["default"]["REDIS_URI"],
|
||||
"DEBUG": config["default"].getboolean("DEBUG"),
|
||||
"SQLALCHEMY_ECHO": config["default"].getboolean("SQLALCHEMY_ECHO"),
|
||||
"CLASSIFIED": config["default"].getboolean("CLASSIFIED"),
|
||||
@@ -248,7 +250,7 @@ def make_mailer(app):
|
||||
|
||||
|
||||
def make_notification_sender(app):
|
||||
app.notification_sender = NotificationSender(queue)
|
||||
app.notification_sender = NotificationSender()
|
||||
|
||||
|
||||
def make_session_limiter(app, session, config):
|
||||
|
18
atst/jobs.py
Normal file
18
atst/jobs.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from flask import current_app as app
|
||||
|
||||
from atst.queue import celery
|
||||
|
||||
|
||||
@celery.task()
|
||||
def send_mail(recipients, subject, body):
|
||||
app.mailer.send(recipients, subject, body)
|
||||
|
||||
|
||||
@celery.task()
|
||||
def send_notification_mail(recipients, subject, body):
|
||||
app.logger.info(
|
||||
"Sending a notification to these recipients: {}\n\nSubject: {}\n\n{}".format(
|
||||
recipients, subject, body
|
||||
)
|
||||
)
|
||||
app.mailer.send(recipients, subject, body)
|
@@ -1,57 +1,15 @@
|
||||
from flask_rq2 import RQ
|
||||
from flask import current_app as app
|
||||
from celery import Celery
|
||||
|
||||
celery = Celery(__name__)
|
||||
|
||||
|
||||
class ATSTQueue(RQ):
|
||||
def update_celery(celery, app):
|
||||
celery.conf.update(app.config)
|
||||
|
||||
"""Internal helpers to get the queue that actually does the work.
|
||||
class ContextTask(celery.Task):
|
||||
def __call__(self, *args, **kwargs):
|
||||
with app.app_context():
|
||||
return self.run(*args, **kwargs)
|
||||
|
||||
The RQ object always uses the "default" queue, unless we explicitly request
|
||||
otherwise. These helpers allow us to use `.queue_name` to get the name of
|
||||
the configured queue and `_queue_job` will use the appropriate queue.
|
||||
|
||||
"""
|
||||
|
||||
@property
|
||||
def queue_name(self):
|
||||
return self.queues[0]
|
||||
|
||||
def get_queue(self, name=None):
|
||||
if not name:
|
||||
name = self.queue_name
|
||||
return super().get_queue(name)
|
||||
|
||||
def _queue_job(self, function, *args, **kwargs):
|
||||
self.get_queue().enqueue(function, *args, **kwargs)
|
||||
|
||||
# pylint: disable=pointless-string-statement
|
||||
"""Instance methods to queue up application-specific jobs."""
|
||||
|
||||
def send_mail(self, recipients, subject, body):
|
||||
self._queue_job(ATSTQueue._send_mail, recipients, subject, body)
|
||||
|
||||
def send_notification_mail(self, recipients, subject, body):
|
||||
self._queue_job(ATSTQueue._send_notification_mail, recipients, subject, body)
|
||||
|
||||
# pylint: disable=pointless-string-statement
|
||||
"""Class methods to actually perform the work.
|
||||
|
||||
Must be a class method (or a module-level function) because we being able
|
||||
to pickle the class is more effort than its worth.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def _send_mail(self, recipients, subject, body):
|
||||
app.mailer.send(recipients, subject, body)
|
||||
|
||||
@classmethod
|
||||
def _send_notification_mail(self, recipients, subject, body):
|
||||
app.logger.info(
|
||||
"Sending a notification to these recipients: {}\n\nSubject: {}\n\n{}".format(
|
||||
recipients, subject, body
|
||||
)
|
||||
)
|
||||
app.mailer.send(recipients, subject, body)
|
||||
|
||||
|
||||
queue = ATSTQueue()
|
||||
celery.Task = ContextTask
|
||||
return celery
|
||||
|
@@ -14,7 +14,7 @@ from atst.forms.team import TeamForm
|
||||
from atst.models import Permissions
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
from atst.utils.localization import translate
|
||||
from atst.queue import queue
|
||||
from atst.jobs import send_mail
|
||||
|
||||
|
||||
def get_form_permission_value(member, edit_perm_set):
|
||||
@@ -129,7 +129,7 @@ def send_application_invitation(invitee_email, inviter_name, token):
|
||||
body = render_template(
|
||||
"emails/application/invitation.txt", owner=inviter_name, token=token
|
||||
)
|
||||
queue.send_mail(
|
||||
send_mail.delay(
|
||||
[invitee_email],
|
||||
translate("email.application_invite", {"inviter_name": inviter_name}),
|
||||
body,
|
||||
|
@@ -15,7 +15,7 @@ from atst.domain.exceptions import AlreadyExistsError, NotFoundError
|
||||
from atst.domain.users import Users
|
||||
from atst.domain.permission_sets import PermissionSets
|
||||
from atst.forms.data import SERVICE_BRANCHES
|
||||
from atst.queue import queue
|
||||
from atst.jobs import send_mail
|
||||
from atst.utils import pick
|
||||
|
||||
|
||||
@@ -174,7 +174,7 @@ def dev_new_user():
|
||||
|
||||
@bp.route("/test-email")
|
||||
def test_email():
|
||||
queue.send_mail(
|
||||
send_mail.delay(
|
||||
[request.args.get("to")], request.args.get("subject"), request.args.get("body")
|
||||
)
|
||||
|
||||
|
@@ -6,7 +6,7 @@ from atst.domain.exceptions import AlreadyExistsError
|
||||
from atst.domain.invitations import PortfolioInvitations
|
||||
from atst.domain.portfolios import Portfolios
|
||||
from atst.models import Permissions
|
||||
from atst.queue import queue
|
||||
from atst.jobs import send_mail
|
||||
from atst.utils.flash import formatted_flash as flash
|
||||
from atst.utils.localization import translate
|
||||
import atst.forms.portfolio_member as member_forms
|
||||
@@ -16,7 +16,7 @@ def send_portfolio_invitation(invitee_email, inviter_name, token):
|
||||
body = render_template(
|
||||
"emails/portfolio/invitation.txt", owner=inviter_name, token=token
|
||||
)
|
||||
queue.send_mail(
|
||||
send_mail.delay(
|
||||
[invitee_email],
|
||||
translate("email.portfolio_invite", {"inviter_name": inviter_name}),
|
||||
body,
|
||||
|
@@ -1,6 +1,6 @@
|
||||
from sqlalchemy import select
|
||||
|
||||
from atst.queue import ATSTQueue
|
||||
from atst.jobs import send_notification_mail
|
||||
from atst.database import db
|
||||
from atst.models import NotificationRecipient
|
||||
|
||||
@@ -8,12 +8,9 @@ from atst.models import NotificationRecipient
|
||||
class NotificationSender(object):
|
||||
EMAIL_SUBJECT = "ATST notification"
|
||||
|
||||
def __init__(self, queue: ATSTQueue):
|
||||
self.queue = queue
|
||||
|
||||
def send(self, body, type_=None):
|
||||
recipients = self._get_recipients(type_)
|
||||
self.queue.send_notification_mail(recipients, self.EMAIL_SUBJECT, body)
|
||||
send_notification_mail.delay(recipients, self.EMAIL_SUBJECT, body)
|
||||
|
||||
def _get_recipients(self, type_):
|
||||
query = select([NotificationRecipient.email])
|
||||
|
Reference in New Issue
Block a user