"""Use pika with the Asyncio EventLoop"""
from __future__ import annotations
import asyncio
import logging
import sys
from typing import Any, Awaitable, Callable, Optional, TYPE_CHECKING, Sequence, Union
from pika.adapters import base_connection
from pika.adapters.utils import connection_workflow, nbio_interface, io_services_utils
if TYPE_CHECKING:
from pika import connection
LOGGER = logging.getLogger(__name__)
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
[docs]
class AsyncioConnection(base_connection.BaseConnection):
""" The AsyncioConnection runs on the Asyncio EventLoop.
"""
def __init__(self,
parameters: Optional[connection.Parameters] = None,
on_open_callback: Optional[Callable[[connection.Connection],
None]] = None,
on_open_error_callback: Optional[Callable[
[connection.Connection, BaseException], None]] = None,
on_close_callback: Optional[Callable[
[connection.Connection, BaseException], None]] = None,
custom_ioloop: Optional[
Union[asyncio.AbstractEventLoop,
nbio_interface.AbstractIOServices]] = None,
internal_connection_workflow: bool = True) -> None:
""" Create a new instance of the AsyncioConnection class, connecting
to RabbitMQ automatically
:param pika.connection.Parameters parameters: Connection parameters
:param callable on_open_callback: The method to call when the connection
is open
:param None | method on_open_error_callback: Called if the connection
can't be established or connection establishment is interrupted by
`Connection.close()`: on_open_error_callback(Connection, exception).
:param None | method on_close_callback: Called when a previously fully
open connection is closed:
`on_close_callback(Connection, exception)`, where `exception` is
either an instance of `exceptions.ConnectionClosed` if closed by
user or broker or exception of another type that describes the cause
of connection failure.
:param None | asyncio.AbstractEventLoop |
nbio_interface.AbstractIOServices custom_ioloop:
Defaults to the running event loop, or a new event loop when
none is running.
:param bool internal_connection_workflow: True for autonomous connection
establishment which is default; False for externally-managed
connection workflow via the `create_connection()` factory.
"""
if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
nbio = custom_ioloop
else:
nbio = _AsyncioIOServicesAdapter(custom_ioloop)
super().__init__(
parameters,
on_open_callback,
on_open_error_callback,
on_close_callback,
nbio,
internal_connection_workflow=internal_connection_workflow)
[docs]
@classmethod
def create_connection(
cls,
connection_configs: Sequence[connection.Parameters],
on_done: Callable[[
Union[connection.Connection,
connection_workflow.AMQPConnectorException]
], None],
custom_ioloop: Optional[asyncio.AbstractEventLoop] = None,
workflow: Optional[
connection_workflow.AbstractAMQPConnectionWorkflow] = None
) -> connection_workflow.AbstractAMQPConnectionWorkflow:
"""Implement
:py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
"""
nbio = _AsyncioIOServicesAdapter(custom_ioloop)
def connection_factory(params):
"""Connection factory."""
if params is None:
raise ValueError('Expected pika.connection.Parameters '
'instance, but got None in params arg.')
return cls(parameters=params,
custom_ioloop=nbio,
internal_connection_workflow=False)
return cls._start_connection_workflow(
connection_configs=connection_configs,
connection_factory=connection_factory,
nbio=nbio,
workflow=workflow,
on_done=on_done)
class _AsyncioIOServicesAdapter(io_services_utils.SocketConnectionMixin,
io_services_utils.StreamingConnectionMixin,
nbio_interface.AbstractIOServices,
nbio_interface.AbstractFileDescriptorServices):
"""Implements
:py:class:`.utils.nbio_interface.AbstractIOServices` interface
on top of `asyncio`.
NOTE:
:py:class:`.utils.nbio_interface.AbstractFileDescriptorServices`
interface is only required by the mixins.
"""
def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None):
"""
:param asyncio.AbstractEventLoop | None loop: If None, uses the
running event loop via asyncio.get_running_loop(), or creates
a new one via asyncio.new_event_loop() when no loop is running
(e.g. when called from a non-async thread).
"""
if loop is None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
self._loop = loop
def get_native_ioloop(self) -> asyncio.AbstractEventLoop:
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.get_native_ioloop()`.
"""
return self._loop
def close(self) -> None:
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.close()`.
"""
self._loop.close()
def run(self) -> None:
"""Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.run()`.
"""
self._loop.run_forever()
def stop(self) -> None:
"""Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.stop()`.
"""
self._loop.stop()
def add_callback_threadsafe(self, callback: Callable[[], None]) -> None:
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.
"""
self._loop.call_soon_threadsafe(callback)
def call_later(self, delay: float,
callback: Callable[[], None]) -> _TimerHandle:
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.call_later()`.
"""
return _TimerHandle(self._loop.call_later(delay, callback))
def getaddrinfo(self,
host: str,
port: int,
on_done: Callable[..., None],
family: int = 0,
socktype: int = 0,
proto: int = 0,
flags: int = 0) -> nbio_interface.AbstractIOReference:
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.getaddrinfo()`.
"""
return self._schedule_and_wrap_in_io_ref(
self._loop.getaddrinfo(host,
port,
family=family,
type=socktype,
proto=proto,
flags=flags), on_done)
def set_reader(self, fd: int, on_readable: Callable[[], None]) -> None:
"""Implement
:py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_reader()`.
"""
self._loop.add_reader(fd, on_readable)
LOGGER.debug('set_reader(%s, _)', fd)
def remove_reader(self, fd: int) -> bool:
"""Implement
:py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.
"""
LOGGER.debug('remove_reader(%s)', fd)
return self._loop.remove_reader(fd)
def set_writer(self, fd: int, on_writable: Callable[[], None]) -> None:
"""Implement
:py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_writer()`.
"""
self._loop.add_writer(fd, on_writable)
LOGGER.debug('set_writer(%s, _)', fd)
def remove_writer(self, fd: int) -> bool:
"""Implement
:py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.
"""
LOGGER.debug('remove_writer(%s)', fd)
return self._loop.remove_writer(fd)
def _schedule_and_wrap_in_io_ref(
self, coro: Awaitable[Any], on_done: Callable[
[Union[base_connection.BaseConnection, BaseException]], None]
) -> _AsyncioIOReference:
"""Schedule the coroutine to run and return _AsyncioIOReference
:param coroutine-obj coro:
:param callable on_done: user callback that takes the completion result
or exception as its only arg. It will not be called if the operation
was cancelled.
:rtype: _AsyncioIOReference which is derived from
nbio_interface.AbstractIOReference
"""
if not callable(on_done):
raise TypeError(
f'on_done arg must be callable, but got {on_done!r}')
return _AsyncioIOReference(asyncio.ensure_future(coro, loop=self._loop),
on_done)
class _TimerHandle(nbio_interface.AbstractTimerReference):
"""This module's adaptation of `nbio_interface.AbstractTimerReference`.
"""
def __init__(self, handle: asyncio.Handle) -> None:
"""
:param asyncio.Handle handle:
"""
self._handle: Optional[asyncio.Handle] = handle
def cancel(self) -> None:
if self._handle is not None:
self._handle.cancel()
self._handle = None
class _AsyncioIOReference(nbio_interface.AbstractIOReference):
"""This module's adaptation of `nbio_interface.AbstractIOReference`.
"""
def __init__(
self, future: asyncio.Future, on_done: Callable[
[Union[base_connection.BaseConnection, BaseException]], None]
) -> None:
"""
:param asyncio.Future future:
:param callable on_done: user callback that takes the completion result
or exception as its only arg. It will not be called if the operation
was cancelled.
"""
if not callable(on_done):
raise TypeError(
f'on_done arg must be callable, but got {on_done!r}')
self._future = future
def on_done_adapter(future: asyncio.Future) -> None:
"""Handle completion callback from the future instance"""
# NOTE: Asyncio schedules callback for cancelled futures, but pika
# doesn't want that
if not future.cancelled():
on_done(future.exception() or future.result())
future.add_done_callback(on_done_adapter)
def cancel(self) -> bool:
"""Cancel pending operation
:returns: False if was already done or cancelled; True otherwise
:rtype: bool
"""
return self._future.cancel()