Reposting is welcome; please support the original author and keep the link to the source: blog.ilibrary.me
  1. https://testdriven.io/blog/fastapi-and-celery/
  2. FastAPI is built on top of Starlette.
  3. Flower: an open-source web application for monitoring and managing Celery clusters. It provides real-time information about the status of Celery workers and tasks.

Steps:

  1. brew install redis
  2. brew services start redis
  3. redis-cli keys "*"
  4. celery -A worker.celery worker --loglevel=info --logfile=logs/celery.log --pool threads
  5. celery --broker=redis://localhost:6379/0 flower --port=5555
  6. Go to http://0.0.0.0:5555/ to open the monitoring dashboard, served by Flower.
  7. You cannot define an async Celery task directly, since Celery does not support asyncio; instead, create and run the async work from inside an ordinary (sync) task function.
    1. On macOS you must start the worker with the --pool threads flag: celery -A worker.celery worker --loglevel=info --logfile=logs/celery.log --pool threads, otherwise tasks exit for no apparent reason (see stackoverflow).
    2. Use the following code as an async wrapper (a reusable decorator version is sketched after this list):
      import asyncio
      import os

      @celery.task(name="html")
      def html_async(source_name, data_cls="HotUndergraduateProgramData"):
          print(celery.conf)
          print(f"Current working directory: {os.getcwd()}")

          async def asyncfunc():
              # Scrape is this project's scraper class
              s = Scrape(source_name, data_cls, max_concurrent_tasks=50)
              await s.html()

          # asyncio.run() spins up a fresh event loop for each task run
          asyncio.run(asyncfunc())

          # Alternatives that also work inside a sync task:
          # loop = asyncio.get_event_loop()
          # loop.run_until_complete(asyncfunc())
          # run_coroutine_sync(asyncfunc(), 1000)
          return {"status": "Done"}
      
  8. attempted relative import with no known parent package
    1. worker.py imports main.py, which creates a circular import.
    2. ModuleNotFoundError: No module named 'tasks'. The subpackage cannot be imported either, so this is not just a circular-import problem.
    3. Root cause found: moving the imports from inside the function body to the top of the file fixes the error. The circular import still needs to be resolved separately.
  9. billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 5. This means the task code raised an error; reproduce and fix it from the command line first, then rerun the job, or wrap the job body in try/except (the decorator sketch below does this).
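
A reusable variant of the wrapper from step 7, combined with the try/except advice from step 9: a small decorator that registers a coroutine function as an ordinary sync Celery task and reports failures instead of letting the worker die. This is a sketch, not code from the original post; the name async_celery_task and the error payload format are my own assumptions.

import asyncio
import functools

def async_celery_task(celery_app, **task_kwargs):
    # Turn a coroutine function into a regular (sync) Celery task
    def decorator(coro_func):
        @celery_app.task(**task_kwargs)
        @functools.wraps(coro_func)
        def wrapper(*args, **kwargs):
            try:
                # asyncio.run() creates a fresh event loop per invocation,
                # which works under the --pool threads worker
                return asyncio.run(coro_func(*args, **kwargs))
            except Exception as exc:
                # Report the failure instead of crashing the worker
                return {"status": "Error", "detail": repr(exc)}
        return wrapper
    return decorator

Usage (hypothetical, mirroring the html task above):

@async_celery_task(celery, name="html")
async def html_async(source_name, data_cls="HotUndergraduateProgramData"):
    s = Scrape(source_name, data_cls, max_concurrent_tasks=50)
    await s.html()
    return {"status": "Done"}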

Sample task file:

# worker.py
import os
import time

from celery import Celery


celery = Celery(__name__)
# Fall back to a local Redis instance when the env vars are not set
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")


@celery.task(name="create_task")
def create_task(task_type):
    # Simulate a long-running job: sleep 10, 20, or 30 seconds by task_type
    time.sleep(int(task_type) * 10)
    return True
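
Tying this back to FastAPI: the task can be enqueued from an endpoint and polled by id, roughly as in the testdriven.io tutorial linked above. A minimal sketch; the file name main.py and the route paths are assumptions. Note that main.py imports from worker.py, not the other way around, which avoids the circular import described in step 8.

# main.py
from celery.result import AsyncResult
from fastapi import FastAPI

from worker import celery, create_task

app = FastAPI()


@app.post("/tasks/{task_type}")
def run_task(task_type: int):
    # .delay() enqueues the task on the Redis broker and returns immediately
    task = create_task.delay(task_type)
    return {"task_id": task.id}


@app.get("/tasks/{task_id}")
def get_status(task_id: str):
    # Look the task up in the Redis result backend
    result = AsyncResult(task_id, app=celery)
    return {"task_id": task_id, "status": result.status, "result": result.result}

After starting the worker and uvicorn main:app, POST /tasks/1 returns a task_id, and GET /tasks/<task_id> reports PENDING until the 10-second sleep finishes, then SUCCESS.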