
asyncio - A case study.

By Marcelo Fernandes, Sep 17, 2017

Problem Proposal

During web software development we sometimes end up with big tables in our database, and we query those tables to retrieve useful information for the client accessing them. These queries are time consuming, and the biggest issue to solve is the delay between the information request and the information retrieval, which makes it the perfect problem for this case study. Let's get to it.



Computer Specifications

This example will be running on an i5-4670 CPU @ 3.40GHz with 4 cores and 24 GiB of system memory @ 1600 MHz.



Setting Up the Database

In this problem we will be using Postgres; it is probably the right database for this example, since the natural option (SQLite3) does not support concurrent access.


Making the db


sudo -i -u postgres
psql
CREATE DATABASE case_study;

Let's create some tables:


import psycopg2


# Create a connection with a database.
dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"
conn = psycopg2.connect(dbinfo)

# Instantiate a cursor, that is going to be our
# main tool to insert data in our db.
c = conn.cursor()

# sql command for creating some generic tables.
first_table = """CREATE TABLE first_table
                 (value integer)"""

second_table = """CREATE TABLE second_table
                  (value integer)"""

third_table = """CREATE TABLE third_table
                 (value integer)"""

fourth_table = """CREATE TABLE fourth_table
                 (value integer)"""

c.execute(first_table)
c.execute(second_table)
c.execute(third_table)
c.execute(fourth_table)

# Save (commit) the changes
conn.commit()

# Close the connections.
conn.close()

This example is so simple that it hurts... We basically created four tables, each with a single column called "value" that stores an integer. Take a look at the diagram:


Now we have to populate these tables with some data (this may take a while):


from random import randrange

import psycopg2


# Create a connection with a database.
dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"
conn = psycopg2.connect(dbinfo)

c = conn.cursor()

# We are going to create random integers
# between 0 and 9 inclusive.
RANDOM_RANGE = 10

# Insert random values into each table, using parameterized
# queries instead of string formatting.
for i in range(60000000):
    c.execute("INSERT INTO first_table VALUES (%s)", (randrange(RANDOM_RANGE),))
    c.execute("INSERT INTO second_table VALUES (%s)", (randrange(RANDOM_RANGE),))
    c.execute("INSERT INTO third_table VALUES (%s)", (randrange(RANDOM_RANGE),))
    c.execute("INSERT INTO fourth_table VALUES (%s)", (randrange(RANDOM_RANGE),))

# Save (commit) the changes
conn.commit()

# Close the connections.
conn.close()
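Issuing 60 million single-row INSERTs is the slow part of this setup. DB-API drivers expose executemany() for batching rows (psycopg2 additionally offers psycopg2.extras.execute_values). As a hedged, stand-in sketch of the pattern, here it is with the stdlib sqlite3 driver, used only because it needs no server; the same executemany call works on a psycopg2 cursor with %s placeholders:

```python
import sqlite3
from random import randrange

conn = sqlite3.connect(":memory:")
c = conn.cursor()
c.execute("CREATE TABLE first_table (value integer)")

# Build the rows in memory, then hand them to the driver in one call
# instead of issuing one INSERT statement per row.
rows = [(randrange(10),) for _ in range(100000)]
c.executemany("INSERT INTO first_table VALUES (?)", rows)
conn.commit()

count = c.execute("SELECT COUNT(*) FROM first_table").fetchone()[0]
```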




Defining the functions

Our functions are going to call some basic DB procedures. They will be the tools our future coroutines use to reach the DB.


import psycopg2


# Create a connection with a database.
dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"


def calculate_summation(table):

    conn = psycopg2.connect(dbinfo)
    c = conn.cursor()
    # Table names cannot be passed as query parameters; interpolation is
    # only safe here because `table` comes from our own hard-coded list.
    c.execute("SELECT SUM(value) FROM %s" % table)
    result = c.fetchone()[0]

    print("--> The Sum for the table '{}', is: {}".format(table, result))
    conn.close()


def calculate_average(table):

    conn = psycopg2.connect(dbinfo)
    c = conn.cursor()
    # As above, table names cannot be parameterized; this is only safe
    # because `table` comes from our own hard-coded list.
    c.execute("SELECT AVG(value) FROM %s" % table)
    result = c.fetchone()[0]

    print("----> The Average for the table '{}', is: {}".format(table, result))
    conn.close()

Note: we have to create a connection and cursor inside each function, because once we run these queries concurrently, two or more workers cannot safely share the same cursor; doing so would defeat our concurrency.


We are also interested in declaring a manager to help distribute the jobs between those two functions (it will be pretty simple for now, but we will specialize it once we use asyncio):


def manager(tables):

    for table in tables:
        calculate_average(table)
        calculate_summation(table)
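To make the cost of this sequential manager concrete, here is a hedged sketch that replaces the database calls with time.sleep stand-ins: every call must finish before the next one starts, so the total run time is roughly the sum of the individual calls.

```python
import time

def fake_query(delay):
    # Stand-in for a blocking database call.
    time.sleep(delay)
    return delay

def sync_manager(delays):
    # Mirrors the manager above: one call after another.
    return [fake_query(d) for d in delays]

start = time.perf_counter()
results = sync_manager([0.05] * 4)
elapsed = time.perf_counter() - start
# elapsed is roughly 4 * 0.05 = 0.2 seconds: the costs add up.
```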



Measuring the time for the synchronous usage

Now we want to calculate the summation and the average for each of the four tables we created. For that, we call our manager and check how long it takes to finish.


if __name__ == '__main__':

    from time import time

    tables = ['first_table', 'second_table',
              'third_table', 'fourth_table']

    start = time()
    manager(tables)

    print("\n\n*** It took ", time() - start, ' seconds to finish.')

# Output:
#----> The Average for the table 'first_table', is: 4.500336930693069
#--> The Sum for the table 'first_table', is: 45453403
#----> The Average for the table 'second_table', is: 4.500577029702971
#--> The Sum for the table 'second_table', is: 45455828
#----> The Average for the table 'third_table', is: 4.501210495049505
#--> The Sum for the table 'third_table', is: 45462226
#----> The Average for the table 'fourth_table', is: 4.500959603960396
#--> The Sum for the table 'fourth_table', is: 45459692


#*** It took  19.662637233734131  seconds to finish.

On my machine it takes around 19-20 seconds on average to finish the calculations, which is quite slow and not practical.



Introducing asyncio

Now we're going to make some changes to our manager so it works with asyncio.


import asyncio

async def manager(tables):

    average_tasks = [asyncio.ensure_future(calculate_average(table))
                     for table in tables]

    await asyncio.wait(average_tasks)

    sum_tasks = [asyncio.ensure_future(calculate_summation(table))
                 for table in tables]

    await asyncio.wait(sum_tasks)
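To see why ensure_future plus wait helps, here is a self-contained sketch with asyncio.sleep standing in for the database round-trips (and the modern asyncio.run entry point as the driver): the tasks overlap, so the total time stays close to the slowest task rather than the sum.

```python
import asyncio
import time

async def fake_query(delay):
    # Stand-in for an awaitable database call.
    await asyncio.sleep(delay)
    return delay

async def async_manager(delays):
    # Same pattern as the manager above: schedule every task,
    # then wait for all of them at once.
    tasks = [asyncio.ensure_future(fake_query(d)) for d in delays]
    done, _pending = await asyncio.wait(tasks)
    return [t.result() for t in done]

start = time.perf_counter()
results = asyncio.run(async_manager([0.1] * 4))
elapsed = time.perf_counter() - start
# elapsed stays near 0.1 seconds, not 0.4: the four waits overlap.
```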

That is the only change we need in the manager. But we still have to make our connections work asynchronously:


# This works as a wrapper that lets psycopg
# be used as an asynchronous library.
import aiopg

async def sql_runner(sql):

    # We have to acquire connections. Note: creating a pool per query keeps
    # the example short; a real application would create one pool up front
    # and share it across coroutines.
    async with aiopg.create_pool(dbinfo) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as c:
                await c.execute(sql)
                ret = []
                async for row in c:
                    ret.append(row)
                return ret



Measuring the time for the Asynchronous usage


if __name__ == '__main__':

    from time import time

    tables = ['first_table', 'second_table',
              'third_table', 'fourth_table']

    start = time()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(manager(tables))

    print("\n\n*** It took ", time() - start, ' seconds to finish.')

#----> The Average for the table 'third_table', is: [(Decimal('4.4994398333333333'),)]
#----> The Average for the table 'fourth_table', is: [(Decimal('4.5000095000000000'),)]
#----> The Average for the table 'first_table', is: [(Decimal('4.4999212333333333'),)]
#----> The Average for the table 'second_table', is: [(Decimal('4.4999479000000000'),)]
#--> The Sum for the table 'fourth_table', is: [(135000285,)]
#--> The Sum for the table 'second_table', is: [(134998437,)]
#--> The Sum for the table 'third_table', is: [(134983195,)]
#--> The Sum for the table 'first_table', is: [(134997637,)]


#*** It took  5.051668062210083  seconds to finish.

Conclusions

The result looks very good: we dropped our time from 19.6 seconds to 5.05. We can also see that the work ran asynchronously, since the output does not follow the order in which the functions were called.


If you ask me what would happen with larger databases, I would say this gap would be even bigger.



The code

The running code:


import asyncio

# This works as a wrapper that lets psycopg
# be used as an asynchronous library.
import aiopg


# Create a connection with a database.
dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"


async def sql_runner(sql):

    # We have to acquire connections. Note: creating a pool per query keeps
    # the example short; a real application would create one pool up front
    # and share it across coroutines.
    async with aiopg.create_pool(dbinfo) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as c:
                await c.execute(sql)
                ret = []
                async for row in c:
                    ret.append(row)
                return ret


async def calculate_summation(table):

    sql = "SELECT SUM(value) FROM %s" % table

    result = await sql_runner(sql)

    print("--> The Sum for the table '{}', is: {}".format(table, result))


async def calculate_average(table):

    sql = "SELECT AVG(value) FROM %s" % table

    result = await sql_runner(sql)

    print("----> The Average for the table '{}', is: {}".format(table, result))


async def manager(tables):

    average_tasks = [asyncio.ensure_future(calculate_average(table))
                     for table in tables]

    await asyncio.wait(average_tasks)

    sum_tasks = [asyncio.ensure_future(calculate_summation(table))
                 for table in tables]

    await asyncio.wait(sum_tasks)


if __name__ == '__main__':

    from time import time

    tables = ['first_table', 'second_table',
              'third_table', 'fourth_table']

    start = time()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(manager(tables))

    print("\n\n*** It took ", time() - start, ' seconds to finish.')


