From d7478e322ae52676f3f4937fadb1b10b4e53e338 Mon Sep 17 00:00:00 2001 From: dandds Date: Thu, 22 Aug 2019 11:55:00 -0400 Subject: [PATCH] Use Celery instead of RQ. Celery provides a more robust set of queueing options for both tasks and worker processes. Updates include: - infrastructure necessary to run Celery, including celery entrypoint - backgrounded functions are now imported directly from atst.jobs - update tests as-needed - update kubernetes worker pod command --- Pipfile | 2 +- Pipfile.lock | 191 ++++++++++++-------- atst/app.py | 8 +- atst/jobs.py | 18 ++ atst/queue.py | 64 ++----- atst/routes/applications/team.py | 4 +- atst/routes/dev.py | 4 +- atst/routes/portfolios/invitations.py | 4 +- atst/utils/notification_sender.py | 7 +- celery_worker.py | 7 + deploy/aws/aws.yml | 7 +- deploy/azure/azure.yml | 7 +- script/dev_queue | 6 +- tests/conftest.py | 7 - tests/routes/applications/test_team.py | 10 +- tests/routes/portfolios/test_invitations.py | 34 ++-- tests/test_queue.py | 5 - tests/utils/test_notification_sender.py | 15 +- 18 files changed, 209 insertions(+), 191 deletions(-) create mode 100644 atst/jobs.py create mode 100644 celery_worker.py delete mode 100644 tests/test_queue.py diff --git a/Pipfile b/Pipfile index 233fa6c0..a84969ea 100644 --- a/Pipfile +++ b/Pipfile @@ -20,12 +20,12 @@ pyopenssl = "*" requests = "*" apache-libcloud = "*" lockfile = "*" -"flask-rq2" = "*" werkzeug = "*" PyYAML = "*" azure-storage = "*" azure-storage-common = "*" boto3 = "*" +celery = "*" [dev-packages] bandit = "*" diff --git a/Pipfile.lock b/Pipfile.lock index c9b3b9dd..a3a72fdb 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "f09d90a1b4b86eff2e0ed453ceefcce4f19d54c57fb50ebed1f02aae8ab83c2a" + "sha256": "bf3b598c052193f70249da97ac746bc53aeb72f45a4515e10945a4274aba7b18" }, "pipfile-spec": 6, "requires": { @@ -18,10 +18,17 @@ "default": { "alembic": { "hashes": [ - "sha256:cdb7d98bd5cbf65acd38d70b1c05573c432e6473a82f955cdea541b5c153b0cc" + "sha256:4a4811119efbdc5259d1f4c8f6de977b36ad3bcc919f59a29c2960c5ef9149e4" ], "index": "pypi", - "version": "==1.0.11" + "version": "==1.1.0" + }, + "amqp": { + "hashes": [ + "sha256:19a917e260178b8d410122712bac69cb3e6db010d68f6101e7307508aded5e68", + "sha256:19d851b879a471fcfdcf01df9936cff924f422baa77653289f7095dedd5fb26a" + ], + "version": "==2.5.1" }, "apache-libcloud": { "hashes": [ @@ -69,20 +76,35 @@ "index": "pypi", "version": "==2.1.0" }, + "billiard": { + "hashes": [ + "sha256:01afcb4e7c4fd6480940cfbd4d9edc19d7a7509d6ada533984d0d0f49901ec82", + "sha256:b8809c74f648dfe69b973c8e660bcec00603758c9db8ba89d7719f88d5f01f26" + ], + "version": "==3.6.1.0" + }, "boto3": { "hashes": [ - "sha256:3ec5b520dbd0a430cdd581a8250991fb0f21ee7e668a8928f15006b312fa86dc", - "sha256:8aec0247131a0db1e33d28ad13910e01e6dfa208e8ab8ee5a4095e92dbaabf45" + "sha256:366a1f3ec37b9434f25247cbe876f9ca1b53d35e35af18f74c735445100b4bc4", + "sha256:e7718b48cd073ad59a99a33d14252319dfaf550be3682b0c6a58da052fb05fcc" ], "index": "pypi", - "version": "==1.9.204" + "version": "==1.9.217" }, "botocore": { "hashes": [ - "sha256:56cd1114e0ce35733e890b321160c8c438243f4fa54d3d074dfa6bdce4ee55aa", - "sha256:f86504bcc9c44d5b2e7b019f2f279b70f17b1400d2fc4775bc009ec473530cad" + "sha256:68a0a22ca4e0e7e7ab482f63e21debfe402841fc49b8503dec0a7307b565d774", + "sha256:7a213b876e58b1b5380cf30faa05ba45073692ad4a3cc803ba763082a36436bb" ], - "version": "==1.12.204" + "version": "==1.12.217" + }, + "celery": { + "hashes": [ + "sha256:821d11967f0f3f8fe24bd61ecfc7b6acbb5a926b719f1e8c4d5ff7bc09e18d81", + "sha256:ae4541fb3af5182bd4af749fee9b89c4858f2792d34bb5d034967e662cf9b55c" + ], + "index": "pypi", + "version": "==4.4.0rc3" }, "certifi": { "hashes": [ @@ -138,13 +160,6 @@ ], "version": "==7.0" }, - "croniter": { - "hashes": [ - "sha256:0d905dbe6f131a910fd3dde792f0129788cd2cb3a8048c5f7aaa212670b0cef2", - "sha256:538adeb3a7f7816c3cdec6db974c441620d764c25ff4ed0146ee7296b8a50590" - ], - "version": "==0.3.30" - }, "cryptography": { "hashes": [ "sha256:24b61e5fcb506424d3ec4e18bca995833839bf13c59fc43e530e488f28d46b8c", @@ -168,11 +183,11 @@ }, "docutils": { "hashes": [ - "sha256:02aec4bd92ab067f6ff27a38a38a41173bf01bed8f89157768c1573f53e474a6", - "sha256:51e64ef2ebfb29cae1faa133b3710143496eca21c530f3f71424d77687764274", - "sha256:7a4bd47eaf6596e1295ecb11361139febe29b084a87bf005bf899f9a42edc3c6" + "sha256:6c4f696463b79f1fb8ba0c594b63840ebd41f059e92b31957c46b74a4599b6d0", + "sha256:9e4d7ecfc600058e07ba661411a2b7de2fd0fafa17d1a7f7361cd47b1175c827", + "sha256:a2aeea129088da402665e92e0b25b04b073c04b2dce4ab65caaa38b7ce2e1a99" ], - "version": "==0.14" + "version": "==0.15.2" }, "flask": { "hashes": [ @@ -189,14 +204,6 @@ "index": "pypi", "version": "==0.12" }, - "flask-rq2": { - "hashes": [ - "sha256:3ef6395065255447f8e1516ccca24858ba87da1d71a6975e0e3b55256bf04967", - "sha256:abe1e52d3b98abe37e85830a614ba6af864516f1b6cf2229f352f8500eafc5fd" - ], - "index": "pypi", - "version": "==18.3" - }, "flask-session": { "hashes": [ "sha256:a31c27e0c3287f00c825b3d9625aba585f4df4cccedb1e7dd5a69a215881a731", @@ -228,6 +235,13 @@ ], "version": "==2.8" }, + "importlib-metadata": { + "hashes": [ + "sha256:23d3d873e008a513952355379d93cbcab874c58f4f034ff657c7a87422fa64e8", + "sha256:80d2de76188eabfbfcf27e6a37342c2827801e59c4cc14b0371c56fed43820e3" + ], + "version": "==0.19" + }, "itsdangerous": { "hashes": [ "sha256:321b033d07f2a4136d3ec762eac9f16a10ccd60f53c0c91af90217ace7ba1f19", @@ -249,6 +263,13 @@ ], "version": "==0.9.4" }, + "kombu": { + "hashes": [ + "sha256:55274dc75eb3c3994538b0973a0fadddb236b698a4bc135b8aa4981e0a710b8f", + "sha256:e5f0312dfb9011bebbf528ccaf118a6c2b5c3b8244451f08381fb23e7715809b" + ], + "version": "==4.6.4" + }, "lockfile": { "hashes": [ "sha256:6aed02de03cba24efabcd600b30540140634fc06cfa603822d508d5361e9f799", @@ -296,6 +317,13 @@ ], "version": "==1.1.1" }, + "more-itertools": { + "hashes": [ + "sha256:409cd48d4db7052af495b09dec721011634af3753ae1ef92d2b32f73a745f832", + "sha256:92b8c4b06dac4f0611c0729b2f2ede52b2e1bac1ab48f089c7ddc12e26bb60c4" + ], + "version": "==7.2.0" + }, "pendulum": { "hashes": [ "sha256:1cde6e3c6310fb882c98f373795f807cb2bd6af01f34d2857e6e283b5ee91e09", @@ -372,6 +400,13 @@ ], "version": "==1.0.4" }, + "pytz": { + "hashes": [ + "sha256:26c0b32e437e54a18161324a2fca3c4b9846b74a8dccddd843113109e1116b32", + "sha256:c894d57500a4cd2d5c71114aaab77dbab5eabd9022308ce5ac9bb93a60a6f0c7" + ], + "version": "==2019.2" + }, "pytzdata": { "hashes": [ "sha256:c0c8316eaf6c25ba45816390a1a45c39790767069b3275c5f7de3ddf773eb810", @@ -400,11 +435,11 @@ }, "redis": { "hashes": [ - "sha256:45682ecf226c7611efe731974c4fa3390170ba045b9cdb26f0051114a5c2a68b", - "sha256:f2609a85e5f37f489ba3b5652e1175dc3711c4d7a7818c4f657615810afd23df" + "sha256:98a22fb750c9b9bb46e75e945dc3f61d0ab30d06117cbb21ff9cd1d315fedd3b", + "sha256:c504251769031b0dd7dd5cf786050a6050197c6de0d37778c80c08cb04ae8275" ], "index": "pypi", - "version": "==3.3.6" + "version": "==3.3.8" }, "requests": { "hashes": [ @@ -414,20 +449,6 @@ "index": "pypi", "version": "==2.22.0" }, - "rq": { - "hashes": [ - "sha256:2798d26a7b850e759f23f69695a389d676a9c08f2c14f96f0d34d9648c9d5616", - "sha256:4f27c6a690d1bd02b9157e615d8819555b9b359c0c4ec8ff0013d160c31b40bb" - ], - "version": "==1.1.0" - }, - "rq-scheduler": { - "hashes": [ - "sha256:90eb3915e31cc2032c301e5ab1fd5ad6f23d9500435046995e009f098e18efbe", - "sha256:ff7d45b34a8a39c9c83634f642aeef950641c75c4eeea3fa140bf574bfc6aca2" - ], - "version": "==0.9" - }, "s3transfer": { "hashes": [ "sha256:6efc926738a3cd576c2a79725fed9afde92378aa5c6a957e3af010cb019fac9d", @@ -444,10 +465,10 @@ }, "sqlalchemy": { "hashes": [ - "sha256:217e7fc52199a05851eee9b6a0883190743c4fb9c8ac4313ccfceaffd852b0ff" + "sha256:2f8ff566a4d3a92246d367f2e9cd6ed3edeef670dcd6dda6dfdc9efed88bcd80" ], "index": "pypi", - "version": "==1.3.6" + "version": "==1.3.8" }, "unipath": { "hashes": [ @@ -465,6 +486,13 @@ "markers": "python_version >= '3.4'", "version": "==1.25.3" }, + "vine": { + "hashes": [ + "sha256:133ee6d7a9016f177ddeaf191c1f58421a1dcc6ee9a42c58b34bed40e1d2cd87", + "sha256:ea4947cc56d1fd6f2095c8d543ee25dad966f78692528e68b4fada11ba3f98af" + ], + "version": "==1.3.0" + }, "webassets": { "hashes": [ "sha256:e7d9c8887343123fd5b32309b33167428cb1318cdda97ece12d0907fd69d38db" @@ -486,6 +514,13 @@ "sha256:e3ee092c827582c50877cdbd49e9ce6d2c5c1f6561f849b3b068c1b8029626f1" ], "version": "==2.2.1" + }, + "zipp": { + "hashes": [ + "sha256:3718b1cbcd963c7d4c5511a8240812904164b7f381b647143a89d3b98f9bcd8e", + "sha256:f06903e9f1f43b12d371004b4ac7b06ab39a44adc747266928ae6debfa7b3335" + ], + "version": "==0.6.0" } }, "develop": { @@ -653,10 +688,10 @@ }, "faker": { "hashes": [ - "sha256:96ad7902706f2409a2d0c3de5132f69b413555a419bacec99d3f16e657895b47", - "sha256:b3bb64aff9571510de6812df45122b633dbc6227e870edae3ed9430f94698521" + "sha256:1d3f700e8dfcefd6e657118d71405d53e86974448aba78884f119bbd84c0cddf", + "sha256:d5366e120191c5610fceeebfe1c298dc46da0277096f639c6dd7e2eaee0fa547" ], - "version": "==2.0.0" + "version": "==2.0.1" }, "flask": { "hashes": [ @@ -675,10 +710,10 @@ }, "gitpython": { "hashes": [ - "sha256:c15c55ff890cd3a6a8330059e80885410a328f645551b55a91d858bfb3eb2573", - "sha256:df752b6b6f06f11213e91c4925aea7eaf9e37e88fb71c8a7a1aa0a5c10852120" + "sha256:947cc75913e7b6da108458136607e2ee0e40c20be1e12d4284e7c6c12956c276", + "sha256:d2f4945f8260f6981d724f5957bc076398ada55cb5d25aaee10108bcdc894100" ], - "version": "==2.1.13" + "version": "==3.0.2" }, "honcho": { "hashes": [ @@ -740,10 +775,10 @@ }, "jedi": { "hashes": [ - "sha256:53c850f1a7d3cfcd306cc513e2450a54bdf5cacd7604b74e42dd1f0758eaaf36", - "sha256:e07457174ef7cb2342ff94fa56484fe41cec7ef69b0059f01d3f812379cb6f7c" + "sha256:786b6c3d80e2f06fd77162a07fed81b8baa22dde5d62896a790a331d6ac21a27", + "sha256:ba859c74fa3c966a22f2aeebe1b74ee27e2a462f56d3f5f7ca4a59af61bfe42e" ], - "version": "==0.14.1" + "version": "==0.15.1" }, "jinja2": { "hashes": [ @@ -754,26 +789,26 @@ }, "lazy-object-proxy": { "hashes": [ - "sha256:159a745e61422217881c4de71f9eafd9d703b93af95618635849fe469a283661", - "sha256:23f63c0821cc96a23332e45dfaa83266feff8adc72b9bcaef86c202af765244f", - "sha256:3b11be575475db2e8a6e11215f5aa95b9ec14de658628776e10d96fa0b4dac13", - "sha256:3f447aff8bc61ca8b42b73304f6a44fa0d915487de144652816f950a3f1ab821", - "sha256:4ba73f6089cd9b9478bc0a4fa807b47dbdb8fad1d8f31a0f0a5dbf26a4527a71", - "sha256:4f53eadd9932055eac465bd3ca1bd610e4d7141e1278012bd1f28646aebc1d0e", - "sha256:64483bd7154580158ea90de5b8e5e6fc29a16a9b4db24f10193f0c1ae3f9d1ea", - "sha256:6f72d42b0d04bfee2397aa1862262654b56922c20a9bb66bb76b6f0e5e4f9229", - "sha256:7c7f1ec07b227bdc561299fa2328e85000f90179a2f44ea30579d38e037cb3d4", - "sha256:7c8b1ba1e15c10b13cad4171cfa77f5bb5ec2580abc5a353907780805ebe158e", - "sha256:8559b94b823f85342e10d3d9ca4ba5478168e1ac5658a8a2f18c991ba9c52c20", - "sha256:a262c7dfb046f00e12a2bdd1bafaed2408114a89ac414b0af8755c696eb3fc16", - "sha256:acce4e3267610c4fdb6632b3886fe3f2f7dd641158a843cf6b6a68e4ce81477b", - "sha256:be089bb6b83fac7f29d357b2dc4cf2b8eb8d98fe9d9ff89f9ea6012970a853c7", - "sha256:bfab710d859c779f273cc48fb86af38d6e9210f38287df0069a63e40b45a2f5c", - "sha256:c10d29019927301d524a22ced72706380de7cfc50f767217485a912b4c8bd82a", - "sha256:dd6e2b598849b3d7aee2295ac765a578879830fb8966f70be8cd472e6069932e", - "sha256:e408f1eacc0a68fed0c08da45f31d0ebb38079f043328dce69ff133b95c29dc1" + "sha256:02b260c8deb80db09325b99edf62ae344ce9bc64d68b7a634410b8e9a568edbf", + "sha256:18f9c401083a4ba6e162355873f906315332ea7035803d0fd8166051e3d402e3", + "sha256:1f2c6209a8917c525c1e2b55a716135ca4658a3042b5122d4e3413a4030c26ce", + "sha256:2f06d97f0ca0f414f6b707c974aaf8829c2292c1c497642f63824119d770226f", + "sha256:616c94f8176808f4018b39f9638080ed86f96b55370b5a9463b2ee5c926f6c5f", + "sha256:63b91e30ef47ef68a30f0c3c278fbfe9822319c15f34b7538a829515b84ca2a0", + "sha256:77b454f03860b844f758c5d5c6e5f18d27de899a3db367f4af06bec2e6013a8e", + "sha256:83fe27ba321e4cfac466178606147d3c0aa18e8087507caec78ed5a966a64905", + "sha256:84742532d39f72df959d237912344d8a1764c2d03fe58beba96a87bfa11a76d8", + "sha256:874ebf3caaf55a020aeb08acead813baf5a305927a71ce88c9377970fe7ad3c2", + "sha256:9f5caf2c7436d44f3cec97c2fa7791f8a675170badbfa86e1992ca1b84c37009", + "sha256:a0c8758d01fcdfe7ae8e4b4017b13552efa7f1197dd7358dc9da0576f9d0328a", + "sha256:a4def978d9d28cda2d960c279318d46b327632686d82b4917516c36d4c274512", + "sha256:ad4f4be843dace866af5fc142509e9b9817ca0c59342fdb176ab6ad552c927f5", + "sha256:ae33dd198f772f714420c5ab698ff05ff900150486c648d29951e9c70694338e", + "sha256:b4a2b782b8a8c5522ad35c93e04d60e2ba7f7dcb9271ec8e8c3e08239be6c7b4", + "sha256:c462eb33f6abca3b34cdedbe84d761f31a60b814e173b98ede3c81bb48967c4f", + "sha256:fd135b8d35dfdcdb984828c84d695937e58cc5f49e1c854eb311c4d6aa03f4f1" ], - "version": "==1.4.1" + "version": "==1.4.2" }, "markupsafe": { "hashes": [ @@ -1090,10 +1125,10 @@ }, "zipp": { "hashes": [ - "sha256:4970c3758f4e89a7857a973b1e2a5d75bcdc47794442f2e2dd4fe8e0466e809a", - "sha256:8a5712cfd3bb4248015eb3b0b3c54a5f6ee3f2425963ef2a0125b8bc40aafaec" + "sha256:3718b1cbcd963c7d4c5511a8240812904164b7f381b647143a89d3b98f9bcd8e", + "sha256:f06903e9f1f43b12d371004b4ac7b06ab39a44adc747266928ae6debfa7b3335" ], - "version": "==0.5.2" + "version": "==0.6.0" } } } diff --git a/atst/app.py b/atst/app.py index 4f7bdbff..69c87cbf 100644 --- a/atst/app.py +++ b/atst/app.py @@ -25,10 +25,10 @@ from atst.domain.authz import Authorization from atst.domain.csp import make_csp_provider from atst.domain.portfolios import Portfolios from atst.models.permissions import Permissions +from atst.queue import celery, update_celery from atst.utils import mailer from atst.utils.form_cache import FormCache from atst.utils.json import CustomJSONEncoder -from atst.queue import queue from atst.utils.notification_sender import NotificationSender from atst.utils.session_limiter import SessionLimiter @@ -59,13 +59,14 @@ def make_app(config): app.config.update(config) app.config.update({"SESSION_REDIS": app.redis}) + update_celery(celery, app) + make_flask_callbacks(app) register_filters(app) make_csp_provider(app, config.get("CSP", "mock")) make_crl_validator(app) make_mailer(app) make_notification_sender(app) - queue.init_app(app) db.init_app(app) csrf.init_app(app) @@ -149,6 +150,7 @@ def map_config(config): return { **config["default"], "ENV": config["default"]["ENVIRONMENT"], + "BROKER_URL": config["default"]["REDIS_URI"], "DEBUG": config["default"].getboolean("DEBUG"), "SQLALCHEMY_ECHO": config["default"].getboolean("SQLALCHEMY_ECHO"), "CLASSIFIED": config["default"].getboolean("CLASSIFIED"), @@ -248,7 +250,7 @@ def make_mailer(app): def make_notification_sender(app): - app.notification_sender = NotificationSender(queue) + app.notification_sender = NotificationSender() def make_session_limiter(app, session, config): diff --git a/atst/jobs.py b/atst/jobs.py new file mode 100644 index 00000000..09c35093 --- /dev/null +++ b/atst/jobs.py @@ -0,0 +1,18 @@ +from flask import current_app as app + +from atst.queue import celery + + +@celery.task() +def send_mail(recipients, subject, body): + app.mailer.send(recipients, subject, body) + + +@celery.task() +def send_notification_mail(recipients, subject, body): + app.logger.info( + "Sending a notification to these recipients: {}\n\nSubject: {}\n\n{}".format( + recipients, subject, body + ) + ) + app.mailer.send(recipients, subject, body) diff --git a/atst/queue.py b/atst/queue.py index f519a55c..e3ae049d 100644 --- a/atst/queue.py +++ b/atst/queue.py @@ -1,57 +1,15 @@ -from flask_rq2 import RQ -from flask import current_app as app +from celery import Celery + +celery = Celery(__name__) -class ATSTQueue(RQ): +def update_celery(celery, app): + celery.conf.update(app.config) - """Internal helpers to get the queue that actually does the work. + class ContextTask(celery.Task): + def __call__(self, *args, **kwargs): + with app.app_context(): + return self.run(*args, **kwargs) - The RQ object always uses the "default" queue, unless we explicitly request - otherwise. These helpers allow us to use `.queue_name` to get the name of - the configured queue and `_queue_job` will use the appropriate queue. - - """ - - @property - def queue_name(self): - return self.queues[0] - - def get_queue(self, name=None): - if not name: - name = self.queue_name - return super().get_queue(name) - - def _queue_job(self, function, *args, **kwargs): - self.get_queue().enqueue(function, *args, **kwargs) - - # pylint: disable=pointless-string-statement - """Instance methods to queue up application-specific jobs.""" - - def send_mail(self, recipients, subject, body): - self._queue_job(ATSTQueue._send_mail, recipients, subject, body) - - def send_notification_mail(self, recipients, subject, body): - self._queue_job(ATSTQueue._send_notification_mail, recipients, subject, body) - - # pylint: disable=pointless-string-statement - """Class methods to actually perform the work. - - Must be a class method (or a module-level function) because we being able - to pickle the class is more effort than its worth. - """ - - @classmethod - def _send_mail(self, recipients, subject, body): - app.mailer.send(recipients, subject, body) - - @classmethod - def _send_notification_mail(self, recipients, subject, body): - app.logger.info( - "Sending a notification to these recipients: {}\n\nSubject: {}\n\n{}".format( - recipients, subject, body - ) - ) - app.mailer.send(recipients, subject, body) - - -queue = ATSTQueue() + celery.Task = ContextTask + return celery diff --git a/atst/routes/applications/team.py b/atst/routes/applications/team.py index 81760ddb..be151916 100644 --- a/atst/routes/applications/team.py +++ b/atst/routes/applications/team.py @@ -14,7 +14,7 @@ from atst.forms.team import TeamForm from atst.models import Permissions from atst.utils.flash import formatted_flash as flash from atst.utils.localization import translate -from atst.queue import queue +from atst.jobs import send_mail def get_form_permission_value(member, edit_perm_set): @@ -129,7 +129,7 @@ def send_application_invitation(invitee_email, inviter_name, token): body = render_template( "emails/application/invitation.txt", owner=inviter_name, token=token ) - queue.send_mail( + send_mail.delay( [invitee_email], translate("email.application_invite", {"inviter_name": inviter_name}), body, diff --git a/atst/routes/dev.py b/atst/routes/dev.py index 74c3a943..65163342 100644 --- a/atst/routes/dev.py +++ b/atst/routes/dev.py @@ -15,7 +15,7 @@ from atst.domain.exceptions import AlreadyExistsError, NotFoundError from atst.domain.users import Users from atst.domain.permission_sets import PermissionSets from atst.forms.data import SERVICE_BRANCHES -from atst.queue import queue +from atst.jobs import send_mail from atst.utils import pick @@ -174,7 +174,7 @@ def dev_new_user(): @bp.route("/test-email") def test_email(): - queue.send_mail( + send_mail.delay( [request.args.get("to")], request.args.get("subject"), request.args.get("body") ) diff --git a/atst/routes/portfolios/invitations.py b/atst/routes/portfolios/invitations.py index 1975be18..16fc0bf2 100644 --- a/atst/routes/portfolios/invitations.py +++ b/atst/routes/portfolios/invitations.py @@ -6,7 +6,7 @@ from atst.domain.exceptions import AlreadyExistsError from atst.domain.invitations import PortfolioInvitations from atst.domain.portfolios import Portfolios from atst.models import Permissions -from atst.queue import queue +from atst.jobs import send_mail from atst.utils.flash import formatted_flash as flash from atst.utils.localization import translate import atst.forms.portfolio_member as member_forms @@ -16,7 +16,7 @@ def send_portfolio_invitation(invitee_email, inviter_name, token): body = render_template( "emails/portfolio/invitation.txt", owner=inviter_name, token=token ) - queue.send_mail( + send_mail.delay( [invitee_email], translate("email.portfolio_invite", {"inviter_name": inviter_name}), body, diff --git a/atst/utils/notification_sender.py b/atst/utils/notification_sender.py index cfd8c8d1..9b8e8a25 100644 --- a/atst/utils/notification_sender.py +++ b/atst/utils/notification_sender.py @@ -1,6 +1,6 @@ from sqlalchemy import select -from atst.queue import ATSTQueue +from atst.jobs import send_notification_mail from atst.database import db from atst.models import NotificationRecipient @@ -8,12 +8,9 @@ from atst.models import NotificationRecipient class NotificationSender(object): EMAIL_SUBJECT = "ATST notification" - def __init__(self, queue: ATSTQueue): - self.queue = queue - def send(self, body, type_=None): recipients = self._get_recipients(type_) - self.queue.send_notification_mail(recipients, self.EMAIL_SUBJECT, body) + send_notification_mail.delay(recipients, self.EMAIL_SUBJECT, body) def _get_recipients(self, type_): query = select([NotificationRecipient.email]) diff --git a/celery_worker.py b/celery_worker.py new file mode 100644 index 00000000..f1371182 --- /dev/null +++ b/celery_worker.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python + +from atst.app import celery, make_app, make_config + +config = make_config() +app = make_app(config) +app.app_context().push() diff --git a/deploy/aws/aws.yml b/deploy/aws/aws.yml index 8f8e46d0..a1cfdda2 100644 --- a/deploy/aws/aws.yml +++ b/deploy/aws/aws.yml @@ -142,9 +142,10 @@ spec: image: 904153757533.dkr.ecr.us-east-2.amazonaws.com/atat:884d95ada21a5097f5c07f305d8e4e24d0f2a03f args: [ "/opt/atat/atst/.venv/bin/python", - "/opt/atat/atst/.venv/bin/flask", - "rq", - "worker" + "/opt/atat/atst/.venv/bin/celery", + "-A", + "celery_worker.celery", + "worker", ] resources: requests: diff --git a/deploy/azure/azure.yml b/deploy/azure/azure.yml index ea645df5..a76afb22 100644 --- a/deploy/azure/azure.yml +++ b/deploy/azure/azure.yml @@ -143,9 +143,10 @@ spec: image: pwatat.azurecr.io/atat:884d95ada21a5097f5c07f305d8e4e24d0f2a03f args: [ "/opt/atat/atst/.venv/bin/python", - "/opt/atat/atst/.venv/bin/flask", - "rq", - "worker" + "/opt/atat/atst/.venv/bin/celery", + "-A", + "celery_worker.celery", + "worker", ] resources: requests: diff --git a/script/dev_queue b/script/dev_queue index db171e3e..670dd96b 100755 --- a/script/dev_queue +++ b/script/dev_queue @@ -4,8 +4,10 @@ set -e +WORKER="pipenv run celery -A celery_worker.celery worker --loglevel=info" + if [[ `command -v entr` ]]; then - find atst | entr -r flask rq worker + find atst | entr -r $WORKER else - flask rq worker + $WORKER fi diff --git a/tests/conftest.py b/tests/conftest.py index cede1f3c..fed42f2d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,7 +9,6 @@ from collections import OrderedDict from atst.app import make_app, make_config from atst.database import db as _db -from atst.queue import queue as atst_queue import tests.factories as factories from tests.mocks import PDF_FILENAME, PDF_FILENAME2 from tests.utils import FakeLogger, FakeNotificationSender @@ -158,12 +157,6 @@ def extended_financial_verification_data(pdf_upload): } -@pytest.fixture(scope="function", autouse=True) -def queue(): - yield atst_queue - atst_queue.get_queue().empty() - - @pytest.fixture def crl_failover_open_app(app): app.config.update({"CRL_FAIL_OPEN": True}) diff --git a/tests/routes/applications/test_team.py b/tests/routes/applications/test_team.py index cf01a083..2ba80a9e 100644 --- a/tests/routes/applications/test_team.py +++ b/tests/routes/applications/test_team.py @@ -1,10 +1,11 @@ import uuid +from unittest.mock import Mock + from flask import url_for from atst.domain.permission_sets import PermissionSets from atst.models import CSPRole from atst.forms.data import ENV_ROLE_NO_ACCESS as NO_ACCESS -from atst.queue import queue from tests.factories import * @@ -149,8 +150,9 @@ def test_update_team_revoke_environment_access(client, user_session, db, session assert user not in environment.users -def test_create_member(client, user_session, session): - queue_length = len(queue.get_queue()) +def test_create_member(monkeypatch, client, user_session, session): + job_mock = Mock() + monkeypatch.setattr("atst.jobs.send_mail.delay", job_mock) user = UserFactory.create() application = ApplicationFactory.create( environments=[{"name": "Naboo"}, {"name": "Endor"}] @@ -198,7 +200,7 @@ def test_create_member(client, user_session, session): ) assert invitation.role.application == application - assert len(queue.get_queue()) == queue_length + 1 + assert job_mock.called def test_remove_member_success(client, user_session): diff --git a/tests/routes/portfolios/test_invitations.py b/tests/routes/portfolios/test_invitations.py index 1ec7e9d3..2638a9af 100644 --- a/tests/routes/portfolios/test_invitations.py +++ b/tests/routes/portfolios/test_invitations.py @@ -1,10 +1,11 @@ import datetime +from unittest.mock import Mock + from flask import url_for from atst.domain.portfolios import Portfolios from atst.models import InvitationStatus, PortfolioRoleStatus from atst.domain.permission_sets import PermissionSets -from atst.queue import queue from tests.factories import * @@ -183,7 +184,11 @@ def test_user_can_only_revoke_invites_in_their_portfolio(client, user_session): assert not invite.is_revoked -def test_user_can_only_resend_invites_in_their_portfolio(client, user_session, queue): +def test_user_can_only_resend_invites_in_their_portfolio( + monkeypatch, client, user_session +): + job_mock = Mock() + monkeypatch.setattr("atst.jobs.send_mail.delay", job_mock) portfolio = PortfolioFactory.create() other_portfolio = PortfolioFactory.create() user = UserFactory.create() @@ -206,10 +211,12 @@ def test_user_can_only_resend_invites_in_their_portfolio(client, user_session, q ) assert response.status_code == 404 - assert len(queue.get_queue()) == 0 + assert not job_mock.called -def test_resend_invitation_sends_email(client, user_session, queue): +def test_resend_invitation_sends_email(monkeypatch, client, user_session): + job_mock = Mock() + monkeypatch.setattr("atst.jobs.send_mail.delay", job_mock) user = UserFactory.create() portfolio = PortfolioFactory.create() ws_role = PortfolioRoleFactory.create( @@ -227,12 +234,14 @@ def test_resend_invitation_sends_email(client, user_session, queue): ) ) - assert len(queue.get_queue()) == 1 + assert job_mock.called def test_existing_member_invite_resent_to_email_submitted_in_form( - client, user_session, queue + monkeypatch, client, user_session ): + job_mock = Mock() + monkeypatch.setattr("atst.jobs.send_mail.delay", job_mock) portfolio = PortfolioFactory.create() user = UserFactory.create() ws_role = PortfolioRoleFactory.create( @@ -253,10 +262,10 @@ def test_existing_member_invite_resent_to_email_submitted_in_form( ) ) - send_mail_job = queue.get_queue().jobs[0] assert user.email != "example@example.com" - assert send_mail_job.func.__func__.__name__ == "_send_mail" - assert send_mail_job.args[0] == ["example@example.com"] + ordered_args, _unordered_args = job_mock.call_args + recipients, _subject, _message = ordered_args + assert recipients[0] == "example@example.com" _DEFAULT_PERMS_FORM_DATA = { @@ -278,11 +287,12 @@ def test_user_with_permission_has_add_member_link(client, user_session): ) -def test_invite_member(client, user_session, session): +def test_invite_member(monkeypatch, client, user_session, session): + job_mock = Mock() + monkeypatch.setattr("atst.jobs.send_mail.delay", job_mock) user_data = UserFactory.dictionary() portfolio = PortfolioFactory.create() user_session(portfolio.owner) - queue_length = len(queue.get_queue()) response = client.post( url_for("portfolios.invite_member", portfolio_id=portfolio.id), @@ -307,5 +317,5 @@ def test_invite_member(client, user_session, session): ) assert invitation.role.portfolio == portfolio - assert len(queue.get_queue()) == queue_length + 1 + assert job_mock.called assert len(invitation.role.permission_sets) == 5 diff --git a/tests/test_queue.py b/tests/test_queue.py deleted file mode 100644 index eb377bc9..00000000 --- a/tests/test_queue.py +++ /dev/null @@ -1,5 +0,0 @@ -def test_send_mail(queue): - queue.send_mail( - ["lordvader@geocities.net"], "death start", "how is it coming along?" - ) - assert len(queue.get_queue()) == 1 diff --git a/tests/utils/test_notification_sender.py b/tests/utils/test_notification_sender.py index 5aec0081..7c53a83d 100644 --- a/tests/utils/test_notification_sender.py +++ b/tests/utils/test_notification_sender.py @@ -6,22 +6,19 @@ from atst.utils.notification_sender import NotificationSender @pytest.fixture -def mock_queue(queue): - return Mock(spec=queue) +def notification_sender(): + return NotificationSender() -@pytest.fixture -def notification_sender(mock_queue): - return NotificationSender(mock_queue) - - -def test_can_send_notification(mock_queue, notification_sender): +def test_can_send_notification(monkeypatch, notification_sender): + job_mock = Mock() + monkeypatch.setattr("atst.jobs.send_notification_mail.delay", job_mock) recipient_email = "test@example.com" email_body = "This is a test" NotificationRecipientFactory.create(email=recipient_email) notification_sender.send(email_body) - mock_queue.send_notification_mail.assert_called_once_with( + job_mock.assert_called_once_with( ("test@example.com",), notification_sender.EMAIL_SUBJECT, email_body )