Connection Adapters¶
Pika uses connection adapters to provide a flexible method for adapting pika’s
core communication to different IOLoop implementations. In addition to asynchronous adapters, there is the BlockingConnection adapter that provides a more idiomatic procedural approach to using Pika.
Adapters¶
Requesting message acknowledgements from another thread¶
The single-threaded usage constraint of an individual Pika connection adapter instance may result in a dropped AMQP/stream connection due to AMQP heartbeat timeout in consumers that take a long time to process an incoming message. A common solution is to delegate processing of the incoming messages to another thread, while the connection adapter’s thread continues to service its I/O loop’s message pump, permitting AMQP heartbeats and other I/O to be serviced in a timely fashion.
Messages processed in another thread may not be acknowledged directly from that thread, since all accesses to the connection adapter instance must be from a single thread, which is the thread running the adapter’s I/O loop. This is accomplished by requesting a callback to be executed in the adapter’s I/O loop thread. For example, the callback function’s implementation might look like this:
def ack_message(channel, delivery_tag):
"""Note that `channel` must be the same Pika channel instance via which
the message being acknowledged was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't acknowledge this message;
# log and/or do something that makes sense for your app in this case.
pass
The code running in the other thread may request the ack_message() function
to be executed in the connection adapter’s I/O loop thread using an
adapter-specific mechanism:
pika.BlockingConnectionabstracts its I/O loop from the application and thus exposespika.BlockingConnection.add_callback_threadsafe(). Refer to this method’s docstring for additional information. For example:connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
Please see the documentation of other adapters for their specific methods.
This threadsafe callback request mechanism may also be used to delegate publishing of messages, etc., from a background thread to the connection adapter’s thread.
Connection recovery¶
Pika requires connection recovery to be performed by the application code and strives to make it a straightforward process.
Different connection adapters take different approaches to connection recovery.
For pika.BlockingConnection adapter exception handling can be used to check
for connection errors. Here is a very basic example:
import pika
while True:
try:
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
channel.start_consuming()
# Don't recover if connection was closed by broker
except pika.exceptions.ConnectionClosedByBroker:
break
# Don't recover on channel errors
except pika.exceptions.AMQPChannelError:
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
continue
A similar example can be found in examples/blocking_consume_recover_multiple_hosts.py.
Generic operation retry libraries such as retry can be used. Decorators make it possible to configure some additional recovery behaviours, like delays between retries and limiting the number of retries:
from retry import retry
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
try:
channel.start_consuming()
# Don't recover connections closed by server
except pika.exceptions.ConnectionClosedByBroker:
pass
consume()
For asynchronous adapters, use on_close_callback to react to connection
failure events. This callback can be used to clean up and recover the
connection.
An example of recovery using on_close_callback can be found in
examples/asynchronous_consumer_example.py.