Snippets Python / Asyncio Base Event Loop

Asyncio Base Event Loop

By Marcelo Fernandes Sep 10, 2017

Event Loop

The event loop is the central execution device, it provides multiple facilities:

  • Registering, executing and cancelling delayed calls (timeouts).
  • Creating client and server transports for various kinds of communication.
  • Launching subprocesses and the associated transports for communication with an external program
  • Delegating costly function calls to a pool of threads.

Click here if you want to skip to the examples


Shortcuts:



class ayncio.AbstractEventLoop

This is the abstract base class of event loops


class AbstractEventLoop:
    """Abstract event loop."""

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.
        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.
        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Returns True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.
        The loop should not be running.
        This is idempotent and irreversible.
        No other methods should be called after this one.
        """
        raise NotImplementedError

    def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args):
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None,
                      reuse_port=None):
        """A coroutine which creates a TCP server bound to host and port.
        The return value is a Server object which can be used to stop
        the service.
        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6). The host parameter can also be a
        sequence (e.g. list) of hosts to bind to.
        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).
        flags is a bitmask for getaddrinfo().
        sock can optionally be specified in order to use a preexisting
        socket object.
        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).
        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.
        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.
        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows.
        """
        raise NotImplementedError

    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        raise NotImplementedError

    def create_unix_server(self, protocol_factory, path, *,
                           sock=None, backlog=100, ssl=None):
        """A coroutine which creates a UNIX Domain Socket server.
        The return value is a Server object, which can be used to stop
        the service.
        path is a str, representing a file systsem path to bind the
        server socket to.
        sock can optionally be specified in order to use a preexisting
        socket object.
        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).
        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0,
                                 reuse_address=None, reuse_port=None,
                                 allow_broadcast=None, sock=None):
        """A coroutine which creates a datagram endpoint.
        This method will try to establish the endpoint in the background.
        When successful, the coroutine returns a (transport, protocol) pair.
        protocol_factory must be a callable returning a protocol instance.
        socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_DGRAM.
        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.
        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows and some UNIX's. If the
        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
        capability is unsupported.
        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.
        sock can optionally be specified in order to use a preexisting
        socket object.
        """
        raise NotImplementedError

    # Pipes and subprocesses.

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.
        protocol_factory should instantiate object with Protocol interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # The reason to accept file-like object instead of just file descriptor
        # is: we need to own pipe and close it at transport finishing
        # Can got complicated errors if pass f.fileno(),
        # close fd in pipe transport then close f and vise versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.
        protocol_factory should instantiate object with BaseProtocol interface.
        Pipe is file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport support
        WriteTransport interface."""
        # The reason to accept file-like object instead of just file descriptor
        # is: we need to own pipe and close it at transport finishing
        # Can got complicated errors if pass f.fileno(),
        # close fd in pipe transport then close f and vise versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        raise NotImplementedError

    def sock_connect(self, sock, address):
        raise NotImplementedError

    def sock_accept(self, sock):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # Task factory.

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    # Error handlers.

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError

This abstract class will have the contract to all the methods that we are going to write about in the next topics.



Running an event loop


AbstractEventLoop.run_forever()

Run until stop() is called. If stop() is called before run_forever() is called, the I/O selector once with a timeout of zero, runs all callbacks scheduled in response to I/O events (and those that were already scheduled), and then exits. If stop() is called while run_forever() is running, this will run the current batch of callbacks and then exit. Note that callbacks scheduled by callbacks will not run in that case; they will run the next time run_forever() is called


AbstractEventLoop.run_until_complete(future)

Run until the Future is done. If the argument is a coroutine object, it is wrapped by ensure_future(), that schedules the execution of a coroutine object wrapping it in a future and returning a Task object. In the end, it will return the Future's result, or will raise its exception.


AbstractEventLoop.is_running()

Returns running status of event loop.


AbstractEventLoop.stop()

Stop running the event loop. This causes run_forever() to exit at the next suitable opportunity


AbstractEventLoop.is_closed()

Returns True if the event loop was closed.


AbstractEventLoop.close()

Close the event loop. The loop must not be running. Pending callbacks will be lost.
This clears the queues and shuts down the executor, but does not wait for the executor to finish. This is idempotent and irreversible. No other methods should be called after this one.


AbstractEventLoop.shutdown_asyncgens()

Schedule all currently open asynchronous generator objects to close with an aclose() call. After calling this method, the event loop will issue a warning whenever a new asynchronous generator is iterated. Should be used to finalize all scheduled asynchronous generators reliably. Example:


try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()



Calls


Most asyncio functions don't accept keywords.


AbstractEventLoop.call_soon(callback, *args)

Arrange for a callback to be called as soon as possible. The callback is called after call_soon() returns, when control returns to the event loop.
This operates as a FIFO queue, callbacks are called in the order in which they are registered. Each callback will be called exactly once. Any positional arguments after the callback will be passed tot he callback when it is called. An instance of asyncio.Handle is returned, which can be used to cancel the callback.
Use functools.partial to pass keywords to the callback.

AbstractEventLoop.call_soon_threadsafe(callback, *args)

Like call_soon() but thread safe.



Delayed calls


The event loop has its own internal clock for computing timeouts. Which clock is used depends on the (platform-specific) event loop implementation; ideally it is a monotonic clock. This will generally be a different clock than time.time()


AbstractEventLoop.call_later(delay, callback, *args)

Arrange for the callback to be called after the given delay seconds (either int or float)
An instance of asyncio.Handle is returned, which can be used to cancel the callback
callback will be called exactly once per call to call_later(). If two callbacks are scheduled for exactly the same time, it is undefined which will be called first. The optional positional args are gonna be passed to the callback when it is called.
If you want to call with named arguments (keyword arguments), than you should use a closure for functools.partial()

AbstractEventLoop.call_at(when, callback, *args)

Arrange for the callback to be called at the given absolute timestamp when (an int or float), using the same time reference as AbstractEventLoop.time()
This method's behavior is the same as call_later().
An instance of asyncio.Handle is returned, which can be used to cancel the callback.

AbstractEventLoop.time()

Return the current time, as float value, according to the event loop's internal clock.



Futures


AbstractEventLoop.create_future()

Create an asyncio.Future object attached to the loop.
This is a preferred way to create futures in asyncio, as event loop implementations can provide alternative implementations of the Future class (with better performance or instrumentation).



Tasks


AbstractEventLoop.create_task(coro)

Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.
Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.

AbstractEventLoop.set_task_factory(factory)

Set a task factory that will be used by AbstractEventLoop.create_task()
If factory is None the default task factory will be set.
If factory is a callable, it should have a signature matching (loop, coro), where loop will be a reference to the active event loop, coro will be a coroutine object. The callable must return an asyncio.Future compatible object.

AbstractEventLoop.get_task_factory()

Return a task factory, or None if the default one is in use.



Executor


Call a function in an concurrent.futures.Executor concrete class (pool of threads or pool of processes).
By default, an event loop uses a thread pool executor concurrent.futures.ThreadPoolExecutor

AbstractEventLoop.run_in_executor(executor, func, *args)

Arrange for a func to be called in the specified executor.
The executor argument should be an concurrent.futures.Executor instance. The default executor is used if executor is None This method is a coroutine.

AbstractEventLoop.set_default_executor(executor)

Set the default executor used by run_in_executor()



Debug Mode


AbstractEventLoop.get_debug()

Get the debug mode (bool) of the event loop.
The default value is True if the environment variable PYTHONASYNCIODEBUG is set to a non-empty string, False otherwise.

AbstractEventLoop.set_debug(enabled:bool)

Set the debug mode of the event loop




Examples


Hello World with call_soon()


Example usign .call_soon() method to schedule a callback.
The callback displays "Hello World" and then stops the event loop:



import asyncio

def hello_world(loop):
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()


Display the current date with call_later()

Example of callback displaying the current date every second. The callback uses the AbstractEventLoop.call_later() method to reschedule itself during 5 seconds, and then stops the event loop:


import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()


Notes


References:


link 1