Source code for pika.adapters.tornado_connection
"""Use pika with the Tornado IOLoop
"""
from __future__ import annotations
import logging
from typing import Any, Callable, Optional, Sequence, Union, TYPE_CHECKING
from tornado import ioloop
from pika.adapters.utils import nbio_interface, selector_ioloop_adapter
from pika.adapters import base_connection
if TYPE_CHECKING:
from pika import connection
from pika.adapters.utils import connection_workflow
LOGGER = logging.getLogger(__name__)
[docs]
class TornadoConnection(base_connection.BaseConnection):
"""The TornadoConnection runs on the Tornado IOLoop.
"""
def __init__(self,
parameters: Optional[connection.Parameters] = None,
on_open_callback: Optional[Callable[[connection.Connection],
None]] = None,
on_open_error_callback: Optional[Callable[
[connection.Connection, BaseException], None]] = None,
on_close_callback: Optional[Callable[
[connection.Connection, BaseException], None]] = None,
custom_ioloop: Optional[Union[
ioloop.IOLoop, nbio_interface.AbstractIOServices]] = None,
internal_connection_workflow: bool = True):
"""Create a new instance of the TornadoConnection class, connecting
to RabbitMQ automatically.
:param pika.connection.Parameters|None parameters: The connection
parameters
:param callable|None on_open_callback: The method to call when the
connection is open
:param callable|None on_open_error_callback: Called if the connection
can't be established or connection establishment is interrupted by
`Connection.close()`:
on_open_error_callback(Connection, exception)
:param callable|None on_close_callback: Called when a previously fully
open connection is closed:
`on_close_callback(Connection, exception)`, where `exception` is
either an instance of `exceptions.ConnectionClosed` if closed by
user or broker or exception of another type that describes the
cause of connection failure
:param ioloop.IOLoop|nbio_interface.AbstractIOServices|None custom_ioloop:
Override using the global IOLoop in Tornado
:param bool internal_connection_workflow: True for autonomous connection
establishment which is default; False for externally-managed
connection workflow via the `create_connection()` factory
"""
if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
nbio = custom_ioloop
else:
nbio = (selector_ioloop_adapter.SelectorIOServicesAdapter(
custom_ioloop or ioloop.IOLoop.instance())) # type: ignore
super().__init__(
parameters,
on_open_callback,
on_open_error_callback,
on_close_callback,
nbio,
internal_connection_workflow=internal_connection_workflow)
[docs]
@classmethod
def create_connection(
cls,
connection_configs: Sequence[connection.Parameters],
on_done: Callable[[
Union[connection.Connection,
connection_workflow.AMQPConnectorException]
], None],
custom_ioloop: Optional[Any] = None,
workflow: Optional[
connection_workflow.AbstractAMQPConnectionWorkflow] = None
) -> connection_workflow.AbstractAMQPConnectionWorkflow:
"""Implement
:py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
"""
nbio = selector_ioloop_adapter.SelectorIOServicesAdapter(
custom_ioloop or ioloop.IOLoop.instance()) # type: ignore
def connection_factory(
params: Optional[connection.Parameters]) -> 'TornadoConnection':
"""Connection factory."""
if params is None:
raise ValueError('Expected pika.connection.Parameters '
'instance, but got None in params arg.')
return cls(parameters=params,
custom_ioloop=nbio,
internal_connection_workflow=False)
return cls._start_connection_workflow(
connection_configs=connection_configs,
connection_factory=connection_factory,
nbio=nbio,
workflow=workflow,
on_done=on_done)