This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

pykafka producer deadlock on nosetests' integration tests #1004

Open
hoathenguyen85 opened this issue May 18, 2020 · 0 comments

Comments

@hoathenguyen85

I posted this question at https://stackoverflow.com/questions/61800654/pykafka-deadlock-on-nosetests-integration-tests. The problem only happens on Python 3.6. We are using pykafka 2.7.0 with Kafka 1.0.0.

The hang shows up when nosetests runs tests that go through the following code:

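   def publish(self, topic, message):
       # Topic names are normalised to lower case before lookup.
       topic = topic.lower()
       self._log.info('publish to topic %s', topic)

       # Create one producer per topic on first use and cache it.
       if topic not in self.producer_map:
           # pykafka looks topics up by bytes, hence the encode().
           k_topic = self.__messenger.topics[topic.encode()]
           self._log.info(k_topic)
           new_producer = k_topic.get_producer()
           self.producer_map[topic] = new_producer

       # Cached producers are never stopped explicitly in this method.
       self.producer_map[topic].produce(message)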

When the tests finish, the process hangs while these producers try to stop. I used gdb to see where it hangs, and it shows this:

#9 Frame 0x7f4824fe7958, for file /usr/lib64/python3.6/threading.py, line 1072, in _wait_for_tstate_lock (self=<Thread(_target=<function at remote 0x7f4824ef4ae8>, _name='3: pykafka.OwnedBroker.queue_reader for broker 0', _args=(), _kwargs={}, _daemonic=True, _ident=139948465846016, _tstate_lock=<_thread.lock at remote 0x7f4824de78a0>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7670>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7670>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7670>, _waiters=<collections.deque at remote 0x7f4824ddc2b8>) at remote 0x7f4824de3128>, _flag=True) at remote 0x7f4824de31d0>, _is_stopped=False, _initialized=True, _stderr=<redacted>, <redacted>, <_io.TextIOWrapper at remote 0x7f48549b1708>)) at remote 0x7f48275474a8>)) at remote 0x7f4827547518>)) at remote 0x7f48275476d8>) at remote 0x7f4824de3160>, block=True, timeout=-1, lock=<_thread.lock at remote 0x7f4824de78a0>)
    elif lock.acquire(block, timeout):
#13 Frame 0x7f4824e045a0, for file /usr/lib64/python3.6/threading.py, line 1056, in join (self=<Thread(_target=<function at remote 0x7f4824ef4ae8>, _name='3: pykafka.OwnedBroker.queue_reader for broker 0', _args=(), _kwargs={}, _daemonic=True, _ident=139948465846016, _tstate_lock=<_thread.lock at remote 0x7f4824de78a0>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7670>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7670>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7670>, _waiters=<collections.deque at remote 0x7f4824ddc2b8>) at remote 0x7f4824de3128>, _flag=True) at remote 0x7f4824de31d0>, _is_stopped=False, _initialized=True, _stderr=<redacted>, <redacted>, <redacted>, <redacted>)) at remote 0x7f4827547518>)) at remote 0x7f48275476d8>) at remote 0x7f4824de3160>, timeout=None)
    self._wait_for_tstate_lock()
#17 (frame information optimized out)
#21 Frame 0x7f484d8bee08, for file /usr/local/lib/python3.6/site-packages/pykafka/producer.py, line 235, in __del__ (self=<Producer(_cluster=<Cluster(_seed_hosts='<redacted>:9092', _socket_timeout_ms=30000, _offsets_channel_socket_timeout_ms=10000, _handler=<ThreadingHandler at remote 0x7f4824e81da0>, _brokers={0: <Broker(_connection=<BrokerConnection(_buff=<bytearray at remote 0x7f4824fdb228>, host=b'<redacted>', port=9092, _handler=<...>, _socket=<socket at remote 0x7f4824dff048>, source_host='', source_port=0, _wrap_socket=<function at remote 0x7f4824dfba60>) at remote 0x7f4824fd5898>, _offsets_channel_connection=None, _id=0, _host=b'<redacted>', _port=9092, _source_host='', _source_port=0, _ssl_config=None, _handler=<...>, _req_handler=<RequestHandler(handler=<...>, shared=<Shared at remote 0x7f4824dd0048>, t=<Thread(_target=<function at remote 0x7f4824dfbae8>, _name="1: pykafka.RequestHandler.worker for b'<redacted>':9092", _args=(), _kwargs={}, _daemonic=True, _ident=139948381951744, _tstate_lock=<_thread.lock at remote 0x7f4824de7120>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de71e8>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de71e8>, release=<built-in method release of _thread.lock object at remote 0x7f4824de71e8>, _waiters=<collections.deque at remote 0x7f4824e079a0>) at remote 0x7f4835420198>, _flag=True) at remote 0x7f4835424358>, _is_stopped=False, _initialized=True, _stderr=<redacted>, <redacted>, <redacted>, <_io.TextIOWrapper at remote 0x7f48549b1708>)) at remote 0x7f48275474a8>)) at remote 0x7f4827547518>)) at remote 0x7f48275476d8>) at remote 0x7f4835424940>) at remote 0x7f4824f62438>, _offsets_channel_req_handler=None, _socket_timeout_ms=30000, _offsets_channel_socket_timeout_ms=10000, _buffer_size=1048576, _req_handlers={}, _broker_version='0.9.0', _api_versions={0: <ApiVersionsSpec at remote 0x7f4824e88408>, 1: <ApiVersionsSpec at remote 0x7f4824e88458>, 2: <ApiVersionsSpec at remote 
0x7f4824e884a8>, 3: <ApiVersionsSpec at remote 0x7f4824e884f8>, 4: <ApiVersionsSpec at remote 0x7f4824e88548>, 5: <ApiVersionsSpec at remote 0x7f4824e88598>, 6: <ApiVersionsSpec at remote 0x7f4824e885e8>, 7: <ApiVersionsSpec at remote 0x7f4824e88638>, 8: <ApiVersionsSpec at remote 0x7f4824e88688>, 9: <ApiVersionsSpec at remote 0x7f4824e886d8>, 10: <ApiVersionsSpec at remote 0x7f4824e88728>, 11: <ApiVersionsSpec at remote 0x7f4824e88778>, 12: <ApiVersionsSpec at remote 0x7f4824e887c8>, 13: <ApiVersionsSpec at remote 0x7f4824e88818>, 14: <ApiVersionsSpec at remote 0x7f4824e88868>, 15: <ApiVersionsSpec at remote 0x7f4824e888b8>, 16: <ApiVersionsSpec at remote 0x7f4824e88908>}) at remote 0x7f482500f4e0>}, _topics=<TopicDict(_cluster=<weakref at remote 0x7f4824f9c4f8>, _exclude_internal_topics=True) at remote 0x7f4824fcd888>, _source_address='', _source_host='', _source_port=0, _ssl_config=None, _zookeeper_connect=None, _max_connection_retries=3, _max_connection_retries_offset_mgr=8, _broker_version='0.9.0', _api_versions={...}, controller_broker=None) at remote 0x7f4824ed8198>, _protocol_version=0, _topic=<Topic(_name=b'<redacted>', _cluster=<...>, _partitions={0: <Partition(_id=0, _leader=<...>, _replicas=[<...>], _isr=[<...>], _topic=<weakref at remote 0x7f4824e0cdb8>) at remote 0x7f48353ffbe0>}) at remote 0x7f4824deedd8>, _partitioner=<RandomPartitioner(idx=0) at remote 0x7f48526baba8>, _compression=0, _max_retries=3, _retry_backoff_ms=100, _required_acks=1, _ack_timeout_ms=10000, _max_queued_messages=100000, _min_queued_messages=70000, _linger_ms=5000, _queue_empty_timeout_ms=0, _block_on_queue_full=True, _max_request_size=1000012, _synchronous=False, _worker_exception=None, _owned_brokers={0: <OwnedBroker(producer=<weakproxy at remote 0x7f4824e0ce08>, broker=<...>, lock=<_thread.RLock at remote 0x7f4824fd8420>, flush_ready=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7698>, acquire=<built-in method acquire of _thread.lock object at remote 
0x7f4824de7698>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7698>, _waiters=<collections.deque at remote 0x7f4824ddc118>) at remote 0x7f4824de3320>, _flag=False) at remote 0x7f4824de33c8>, has_message=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7710>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7710>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7710>, _waiters=<collections.deque at remote 0x7f4824ddc0b0>) at remote 0x7f4824de3240>, _flag=True) at remote 0x7f4824de3390>, slot_available=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7788>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7788>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7788>, _waiters=<collections.deque at remote 0x7f4824ddc180>) at remote 0x7f4824de3208>, _flag=True) at remote 0x7f4824de3278>, queue=<collections.deque at remote 0x7f4824ddc1e8>, messages_pending=2, running=False, _auto_start=True, _queue_reader_worker=<Thread(_target=<function at remote 0x7f4824ef4ae8>, _name='3: pykafka.OwnedBroker.queue_reader for broker 0', _args=(...), _kwargs={}, _daemonic=True, _ident=139948465846016, _tstate_lock=<_thread.lock at remote 0x7f4824de78a0>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7670>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7670>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7670>, _waiters=<collections.deque at remote 0x7f4824ddc2b8>) at remote 0x7f4824de3128>, _flag=True) at remote 0x7f4824de31d0>, _is_stopped=False, _initialized=True, _stderr=<...>) at remote 0x7f4824de3160>) at remote 0x7f4824e81cc0>}, _delivery_reports=<_DeliveryReportNone(queue=None) at remote 0x7f4824e814a8>, _pending_timeout_ms=5000, _auto_start=True, _serializer=None, _running=True, _update_lock=<_thread.lock at remote 
0x7f4824de7620>) at remote 0x7f4824fdb208>)
    self.stop()
#30 Garbage-collecting
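
For anyone reproducing this without gdb, a rough Python-side check might look like the sketch below. It uses only the standard library (`threading.enumerate` and `faulthandler.dump_traceback_later`). The helper names `pykafka_threads` and `arm_hang_dump` are made up for illustration, and matching on the substring `pykafka` assumes the worker threads are named as in the trace above.

```python
import faulthandler
import sys
import threading


def pykafka_threads():
    """Return the names of live threads whose name mentions pykafka.

    Assumption: worker threads carry 'pykafka' in their name, as the
    'pykafka.OwnedBroker.queue_reader' thread in the gdb trace does.
    """
    return [t.name for t in threading.enumerate() if "pykafka" in t.name]


def arm_hang_dump(timeout_s=60):
    """Ask faulthandler to dump every thread's Python stack to stderr
    if the process is still running after timeout_s seconds.

    Call faulthandler.cancel_dump_traceback_later() to disarm it.
    """
    faulthandler.dump_traceback_later(timeout_s, exit=False, file=sys.stderr)
```

Calling `arm_hang_dump()` at the start of the test run and printing `pykafka_threads()` in the last teardown should show which worker threads are still alive when the hang begins.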

My current workaround is to stop the producer right after it produces each message, which is not ideal because it wastes time when sending a lot of messages.
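
An alternative that might avoid stopping after every message is to keep the cached producers and stop them all once, explicitly, from test teardown, so that shutdown does not depend on `Producer.__del__` running during garbage collection. The following is a minimal sketch and has not been verified against this hang. The `Publisher` class and its `close` method are hypothetical names, and `client` is assumed to be a `pykafka.KafkaClient` (or anything with a `topics` mapping whose values offer `get_producer()`).

```python
import threading


class Publisher:
    """Hypothetical wrapper that caches one producer per topic."""

    def __init__(self, client):
        # Assumption: `client` behaves like pykafka.KafkaClient, i.e. it has
        # a `topics` mapping keyed by bytes whose values offer get_producer().
        self._client = client
        self._lock = threading.Lock()
        self.producer_map = {}

    def publish(self, topic, message):
        topic = topic.lower()
        with self._lock:
            producer = self.producer_map.get(topic)
            if producer is None:
                # Create the producer on first use and reuse it afterwards.
                producer = self._client.topics[topic.encode()].get_producer()
                self.producer_map[topic] = producer
        producer.produce(message)

    def close(self):
        """Stop every cached producer once; safe to call repeatedly."""
        with self._lock:
            producers = list(self.producer_map.values())
            self.producer_map.clear()
        for producer in producers:
            # Producer.stop() is the same call that __del__ makes in the trace.
            producer.stop()
```

Calling `close()` from a teardown hook (for example unittest's `tearDownClass`) would stop each producer exactly once, while the test process is still fully alive.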
