
Worker crashed with invalid file descriptor error when used with celery and gevent #355

justlovediaodiao commented Jan 6, 2022

Environment

os: Ubuntu 20.04 LTS
broker: redis 4.0.9
python: 3.8
celery: 5.1.2
billiard: 3.6.4.0
gevent: 21.1.2

Celery startup config

supervisor

[program:celery_worker_email]
environment = C_FORCE_ROOT="true"
command = /var/www/backend/venv/bin/celery -A run_celery.celery worker -Q "email" -c 20 -n email@%%h
directory = /var/www/backend/
stdout_logfile = /var/log/backend/worker_email.log
redirect_stderr = true
stopasgroup = true
stopwaitsecs = 30

run_celery.py

import gevent.monkey; gevent.monkey.patch_all()

from app import create_app, celery
app = create_app()

Log

[2022-01-05 01:20:00,086: ERROR/ForkPoolWorker-21] Thread 'ResultHandler' crashed: ValueError('invalid file descriptor 20')
Traceback (most recent call last):
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 504, in run
    return self.body()
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 899, in body
    for _ in self._process_result(1.0):  # blocking
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 864, in _process_result
    ready, task = poll(timeout)
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 1370, in _poll_result
    if self._outqueue._reader.poll(timeout):
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 292, in poll
    return self._poll(timeout)
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 470, in _poll
    r = wait([self], timeout)
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 1003, in wait
    return _poll(object_list, timeout)
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 983, in _poll
    raise ValueError('invalid file descriptor %i' % fd)
ValueError: invalid file descriptor 20

[2022-01-05 01:20:00,550: ERROR/MainProcess] Process 'ForkPoolWorker-21' pid:155045 exited with 'exitcode 1'

[2022-01-05 01:20:10,978: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: exitcode 1 Job: 74.')
Traceback (most recent call last):
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 1265, in mark_as_worker_lost
    raise WorkerLostError(
billiard.exceptions.WorkerLostError: Worker exited prematurely: exitcode 1 Job: 74.

How to reproduce

  1. Use the above configs to start celery (gevent.monkey.patch_all() must be applied).
  2. Use kill <pid> to kill some worker processes. The main process will create new worker processes to replace the killed ones.
  3. Execute xxx_task.delay() in a Python shell to send some tasks (see the sketch below).
  4. Once any newly created worker process receives a task, it triggers the invalid file descriptor error and crashes.

I'm not sure these steps will reproduce the issue every time, but I've reproduced it several times.
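
For step 3, here is a minimal sketch of what such a task could look like. The task body and the module name are assumptions; only the app module, the celery object, and the "email" queue come from the configs above.

# Hypothetical task module for step 3 (body is an assumption).
from app import celery

@celery.task
def xxx_task():
    # placeholder body; the real task is not shown in the report
    return 'ok'

# In a Python shell, send a few tasks to the worker's queue:
#   >>> from tasks import xxx_task          # hypothetical module name
#   >>> xxx_task.apply_async(queue='email')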

Analysis

ResultHandler is used by the main process to read results from the worker processes. It should run only in the main process, but the worker processes ended up executing it.

billiard/pool.py:

class Pool(object):

    def __init__(self, ...):
        # ...

        self._setup_queues()

        # ...

        for i in range(self._processes):
            self._create_worker_process(i)

        # ...

        # Thread processing results in the outqueue.
        self._result_handler = self.create_result_handler()
        self.handle_result_event = self._result_handler.handle_event

        if threads:
            self._result_handler.start()
    
    # ...

    def create_result_handler(self, **extra_kwargs):
        return self.ResultHandler(
            self._outqueue, self._quick_get, self._cache,
            self._poll_result, self._join_exited_workers,
            self._putlock, self.restart_state, self.check_timeouts,
            self.on_job_ready, on_ready_counters=self._on_ready_counters,
            **extra_kwargs
        )
    
    # ...

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

        def _poll_result(timeout):
            if self._outqueue._reader.poll(timeout):
                return True, self._quick_get()
            return False, None
        self._poll_result = _poll_result
    
    # ...

    def get_process_queues(self):
        return self._inqueue, self._outqueue, None

    def _create_worker_process(self, i):
        sentinel = self._ctx.Event() if self.allow_restart else None
        inq, outq, synq = self.get_process_queues()
        on_ready_counter = self._ctx.Value('i')
        w = self.WorkerProcess(self.Worker(
            inq, outq, synq, self._initializer, self._initargs,
            self._maxtasksperchild, sentinel, self._on_process_exit,
            # Need to handle all signals if using the ipc semaphore,
            # to make sure the semaphore is released.
            sigprotection=self.threads,
            wrap_exception=self._wrap_exception,
            max_memory_per_child=self._max_memory_per_child,
            on_ready_counter=on_ready_counter,
        ))
        self._pool.append(w)

        # ...

# ...

class Worker(object):

    def after_fork(self):
        if hasattr(self.inq, '_writer'):
            self.inq._writer.close()
        if hasattr(self.outq, '_reader'):
            self.outq._reader.close()
        # ...

As the code above shows, on startup the main process creates _outqueue and then forks the worker processes. After the fork, each worker process closes _outqueue._reader in after_fork().
The main process then starts a thread to run ResultHandler, and ResultHandler calls _poll_result, which polls _outqueue._reader.

If a worker process crashes, the main process forks a new worker process to replace it.

As the fork(2) Linux man page says, after a fork the child process keeps only the thread that called fork(); all other threads disappear.

So there is no ResultHandler thread in the newly forked worker processes.

The child process is created with a single thread--the one that called fork(). The entire virtual address space of the parent is replicated in the child, including the states of mutexes, condition variables, and other pthreads objects; the use of pthread_atfork(3) may be helpful for dealing with problems that this can cause.
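
A standalone sketch (assumed, not billiard code) illustrating this: after os.fork(), only the thread that called fork() exists in the child, so a thread started before the fork is gone.

# Sketch: threads started before fork() do not exist in the child.
import os
import threading
import time

def background():
    while True:
        time.sleep(1)

threading.Thread(target=background, daemon=True).start()

pid = os.fork()
if pid == 0:
    # Child: only the thread that called fork() survives.
    print('child active threads:', threading.active_count())   # typically 1
    os._exit(0)

os.waitpid(pid, 0)
print('parent active threads:', threading.active_count())      # 2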

When gevent.monkey.patch_all() is used, threads are replaced with coroutines (that's my understanding, though I'm not sure), so ResultHandler is not running in a separate OS thread.
The fork therefore copies the in-progress _poll_result call into the child: the new worker process keeps running _poll_result on _outqueue._reader even though it closes _outqueue._reader in after_fork().
Polling a closed file descriptor is invalid, so _poll_result raises the invalid file descriptor exception.

billiard/connection.py:

if hasattr(select, 'poll'):
    def _poll(fds, timeout):
        if timeout is not None:
            timeout = int(timeout * 1000)  # timeout is in milliseconds
        fd_map = {}
        pollster = select.poll()
        for fd in fds:
            pollster.register(fd, select.POLLIN)
            if hasattr(fd, 'fileno'):
                fd_map[fd.fileno()] = fd
            else:
                fd_map[fd] = fd
        ls = []
        for fd, event in pollster.poll(timeout):
            if event & select.POLLNVAL:
                raise ValueError('invalid file descriptor %i' % fd)
            ls.append(fd_map[fd])
        return ls
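
The failure can be shown in isolation with the same select.poll() pattern. This is a standalone sketch, not billiard code: registering a file descriptor that has already been closed makes poll() report POLLNVAL, which the code above converts to ValueError.

# Sketch: poll() on a closed file descriptor reports POLLNVAL.
import os
import select

r, w = os.pipe()
os.close(r)  # simulate the worker closing _outqueue._reader after fork

pollster = select.poll()
pollster.register(r, select.POLLIN)
for fd, event in pollster.poll(100):
    if event & select.POLLNVAL:
        raise ValueError('invalid file descriptor %i' % fd)
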
@rikthevik

Thanks for digging this out. I had the same issue on celery==4.1.1

This was breaking my --pool=prefork workers when I used --max-tasks-per-child. When I removed gevent monkey patching from my celery entry point, the prefork workers restarted correctly.

Good luck!
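
One way to apply that workaround in the reporter's run_celery.py is to monkey-patch only when the gevent pool is actually requested. This is a hedged sketch: the command-line check is an assumption about how the worker is launched, not a Celery API.

# Sketch of a conditional monkey patch in run_celery.py (assumption:
# the pool is selected on the command line via -P/--pool).
import sys

def _uses_gevent_pool(argv):
    return 'gevent' in argv or '--pool=gevent' in argv

if _uses_gevent_pool(sys.argv):
    import gevent.monkey
    gevent.monkey.patch_all()

from app import create_app, celery
app = create_app()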
