Django Celery socket.error [Errno 61] В соединении отказано

Я использую Redis, Celery 4.0 и Django 1.10, но получаю сообщение об отказе соединения [Errrno 61] при запуске задачи «тест» из оболочки. Это моя структура проекта:

myproj
│
├── app1
    ├── __init__.py
    ├── tasks.py
    myproj
    ├── __init__.py
    ├── urls.py
    ├── settings
    │   ├── __init__.py
    │   ├── base.py
    │   ├── local.py
    ├── local
    │   ├── __init__.py
    │   ├── celery.py
    │   ├── wsgi.py

myproj/app1/tasks.py:

from __future__ import absolute_import
from celery import task

@task(name='app1.tasks.test')
def test():
    print('this is a test')

myproj/myproj/local/celery.py:

from __future__ import absolute_import
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings.local')
app = Celery('myproj_local')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks()

мой проект/мой проект/местный/__init__.py:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']

Я думаю, что что-то не так в этом файле инициализации, потому что задача запускается из оболочки, когда я перемещаю содержимое в myproj/myproj/__init__.py:

from __future__ import absolute_import, unicode_literals

from .local.celery import app as celery_app

__all__ = ['celery_app']

Celery запускается в каталоге myproj командой:

celery -A myproj.local.celery worker -l info

Полная ошибка:

python manage.py shell --settings=myproj.settings.local
(InteractiveConsole)
>>> from app1.tasks import test
>>> test.delay()
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File env/lib/python2.7/site-packages/celery/app/task.py", line 413, in delay
return self.apply_async(args, kwargs)
  File env/lib/python2.7/site-packages/celery/app/task.py", line 536, in apply_async
**options
  File env/lib/python2.7/site-packages/celery/app/base.py", line 717, in send_task
amqp.send_task_message(P, name, message, **options)
  File env/lib/python2.7/site-packages/celery/app/amqp.py", line 554, in send_task_message
**properties
  File env/lib/python2.7/site-packages/kombu/messaging.py", line 178, in publish
exchange_name, declare,
 File env/lib/python2.7/site-packages/kombu/connection.py", line 527, in _ensured
errback and errback(exc, 0)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 35, in __exit__
self.gen.throw(type, value, traceback)
  File env/lib/python2.7/site-packages/kombu/connection.py", line 419, in _reraise_as_library_errors
sys.exc_info()[2])
  File env/lib/python2.7/site-packages/kombu/connection.py", line 414, in _reraise_as_library_errors
yield
  File env/lib/python2.7/site-packages/kombu/connection.py", line 515, in _ensured
reraise_as_library_errors=False,
  File env/lib/python2.7/site-packages/kombu/connection.py", line 405, in ensure_connection
callback)
  File env/lib/python2.7/site-packages/kombu/utils/functional.py", line 333, in retry_over_time
return fun(*args, **kwargs)
  File env/lib/python2.7/site-packages/kombu/connection.py", line 261, in connect
return self.connection
  File env/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
self._connection = self._establish_connection()
  File env/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
conn = self.transport.establish_connection()
  File env/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
conn.connect()
  File env/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
self.transport.connect()
  File env/lib/python2.7/site-packages/amqp/transport.py", line 103, in connect
self._connect(self.host, self.port, self.connect_timeout)
  File env/lib/python2.7/site-packages/amqp/transport.py", line 144, in _connect
self.sock.connect(sa)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
OperationalError: [Errno 61] Connection refused

person Toon Gijbels    schedule 26.11.2016    source источник


Ответы (2)


Ага! Я понял! У меня была такая же проблема. В файле myproj/app1/tasks.py:

from __future__ import absolute_import
from celery import task
app = Celery('myproj_local')
app.config_from_object('django.conf:settings')
@app.task(name='random_name')
def test():
    print('this is a test')

Затем сельдерей будет инициализирован в «приложении», а конфиг будет загружен в «приложение», и ваша задача будет отложена.

Но это будет работать, только если ваш конфиг действителен.

person Владимир Закотнев    schedule 07.12.2016
comment
Спасибо, я тоже не смотрел на очевидное! :) - person Bunny Rabbit; 25.12.2016

Вам нужно установить BROKER_URL так, чтобы он указывал на REDIS.

Если у вас есть только одна очередь, убедитесь, что ваш воркер подключен к очереди по умолчанию.

Если в настройках вы указали очередь по умолчанию, вам необходимо настроить воркера на получение задачи, как показано ниже:

celery -A myproj.local.celery worker -l info -Q queue_name

person kkiat    schedule 26.11.2016
comment
BROKER_URL задан правильно и есть только одна очередь. Моя установка работает, когда я перемещаю файл celery.py из его подкаталога. - person Toon Gijbels; 26.11.2016