FastAPI and Celery
- https://testdriven.io/blog/fastapi-and-celery/
- FastAPI is built on top of Starlette.
- Flower is an open-source web application for monitoring and managing Celery clusters. It provides real-time information about the status of Celery workers and tasks.
Steps:

```sh
brew install redis
brew services start redis
redis-cli keys "*"
celery -A worker.celery worker --loglevel=info --logfile=logs/celery.log --pool threads
celery --broker=redis://localhost:6379/0 flower --port=5555
```
- Go to http://0.0.0.0:5555/ to see the dashboard, powered by Flower.
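To tie FastAPI and Celery together, here is a minimal sketch of an API that enqueues a task and reports its status. It assumes the `worker.py` sample file shown at the end of this post; the endpoint paths are illustrative, not from the original setup.

```python
# main.py - minimal FastAPI app that hands work off to Celery (sketch)
from celery.result import AsyncResult
from fastapi import FastAPI

from worker import celery, create_task  # from the sample worker.py below

app = FastAPI()

@app.post("/tasks/{task_type}")
def run_task(task_type: str):
    # .delay() pushes the task onto the Redis broker and returns immediately
    task = create_task.delay(task_type)
    return {"task_id": task.id}

@app.get("/tasks/{task_id}")
def get_status(task_id: str):
    # AsyncResult reads the task state from the Redis result backend
    result = AsyncResult(task_id, app=celery)
    return {"task_id": task_id, "status": result.status, "result": result.result}
```

Run it with `uvicorn main:app`; a POST to `/tasks/1` returns a task id that can then be polled at `/tasks/{task_id}`.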
- You cannot define an async Celery task directly, because Celery does not support asyncio. Instead, create the async work inside a regular (sync) task function and run it there via a normal function call.
- On macOS, always start the worker with the `--pool threads` flag: `celery -A worker.celery worker --loglevel=info --logfile=logs/celery.log --pool threads`. Otherwise tasks exit for no apparent reason (see stackoverflow).
- Use the following code to wrap the async work:
```python
@celery.task(name="html")
def html_async(source_name, data_cls="HotUndergraduateProgramData"):
    print(celery.conf)
    print(f"Current working directory: {os.getcwd()}")

    async def asyncfunc():
        s = Scrape(source_name, data_cls, max_concurrent_tasks=50)
        await s.html()

    asyncio.run(asyncfunc())
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(asyncfunc())
    # run_coroutine_sync(asyncfunc(), 1000)
    return {"status": "Done"}
```
- `attempted relative import with no known parent package`
  - Referencing `main.py` from `worker.py` creates a circular import. But `ModuleNotFoundError: No module named 'tasks'` showed that sub-packages could not be imported either, so this was not just a circular-import problem.
  - Root cause found: hoisting the imports from inside the method body to the top of the file fixed it (see the sketch below). The circular import itself still needs to be resolved separately.
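As an illustration of that fix, a minimal sketch with hypothetical module names (`tasks.scrape` and `Scrape` are placeholders, not from the original project):

```python
# worker.py (sketch) - the import is hoisted from the task body to module level
from celery import Celery
from tasks.scrape import Scrape  # moved here; inside html_async() this import
                                 # raised "ModuleNotFoundError: No module named 'tasks'"

celery = Celery(__name__)

@celery.task(name="html")
def html_async(source_name):
    # with the import at module level, the worker resolves it at load time
    s = Scrape(source_name)
    ...
```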
- `billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 5`: the task code raised an error. Run the failing code from the command line first to fix the error, then re-run the job; or add a try/except inside the job (see the sketch below).
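A minimal sketch of the try/except option, so an exception is logged and reported instead of killing the worker process (`safe_task` and the return shape are illustrative):

```python
import logging

from worker import celery  # the Celery app from the sample file below

logger = logging.getLogger(__name__)

def risky_work():
    raise RuntimeError("simulated failure")  # stand-in for the real job body

@celery.task(name="safe_task")
def safe_task():
    try:
        risky_work()
        return {"status": "Done"}
    except Exception:
        # full traceback lands in logs/celery.log instead of crashing the worker
        logger.exception("safe_task failed")
        return {"status": "Failed"}
```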
Sample task file:

```python
# worker.py
import os
import time

from celery import Celery

celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")

@celery.task(name="create_task")
def create_task(task_type):
    time.sleep(int(task_type) * 10)
    return True
```
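For a quick smoke test of this file (with Redis and the worker from the steps above running), the task can be enqueued from a Python shell; `task.get()` blocks until the worker finishes:

```python
from worker import create_task

task = create_task.delay("1")  # returns immediately; the worker sleeps 10s
print(task.id)                 # UUID Celery assigned to this run
print(task.get(timeout=15))    # waits for the result stored in Redis -> True
```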