
concurrent.futures - A case study.

By Marcelo Fernandes, Sep 3, 2017

Problem Proposal

We have seen in the previous post about concurrent.futures that thread pools are a good way to tackle problems that demand a lot of I/O, or that are simply slow, since the Python standard library classes often release the GIL while an I/O operation is waiting to complete, freeing resources to run other threads in the meantime. In this post we will cover a problem like that, trying to simulate a real-life issue involving database I/O.

During web software development we sometimes end up with big tables in our database, and we query those tables to retrieve useful information for the client accessing them. This is a time-consuming task, and the biggest issue to solve is the delay between the information request and the information retrieval, which makes it the perfect problem for this case study. Let's get to it.
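
To make that concrete before we dive in, here is a toy sketch that is not part of the case study: fake_io is a made-up stand-in for any blocking call, such as a socket read or a database query. With a thread pool, four one-second waits finish in roughly one second instead of four.


from concurrent.futures import ThreadPoolExecutor
from time import sleep, time


def fake_io(n):
    # time.sleep releases the GIL while waiting, just like a
    # blocking socket read or database query would.
    sleep(1)
    return n


start = time()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fake_io, range(4)))

print(results)         # [0, 1, 2, 3]
print(time() - start)  # ~1 second instead of ~4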



Computer Specifications

This example will be running on an i5-4670 CPU @ 3.40GHz with 4 cores and 24 GiB of system memory @ 1600 MHz.



Setting Up the Database

In this problem we will be using Postgres, which is probably the right database for this example; the natural option (SQLite3) handles concurrent access poorly, since a write locks the whole database.


Making the db


sudo -i -u postgres
psql
CREATE DATABASE case_study;
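
Equivalently, the createdb wrapper that ships with Postgres does the same thing in one line:


sudo -u postgres createdb case_study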

Let's create some tables:


import psycopg2


# Create a connection with a database.
dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"
conn = psycopg2.connect(dbinfo)

# Instantiate a cursor, which is going to be our
# main tool for running SQL against our db.
c = conn.cursor()

# sql command for creating some generic tables.
first_table = """CREATE TABLE first_table
                 (value integer)"""

second_table = """CREATE TABLE second_table
                  (value integer)"""

third_table = """CREATE TABLE third_table
                 (value integer)"""

fourth_table = """CREATE TABLE fourth_table
                 (value integer)"""

c.execute(first_table)
c.execute(second_table)
c.execute(third_table)
c.execute(fourth_table)

# Save (commit) the changes
conn.commit()

# Close the connection.
conn.close()

This example is so simple that it hurts... We basically created four tables, each with a single column called "value" in which we are going to store an integer.
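
If you want to double-check that the tables were created, a quick peek at the information schema does the trick (a small sketch reusing the same connection settings):


import psycopg2


dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"
conn = psycopg2.connect(dbinfo)
c = conn.cursor()

# List the tables that now live in the default 'public' schema.
c.execute("""SELECT table_name FROM information_schema.tables
             WHERE table_schema = 'public'""")
print(c.fetchall())
# [('first_table',), ('second_table',), ('third_table',), ('fourth_table',)]

conn.close()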


Now we have to populate these tables with some data (this may take a while):


from random import randrange

import psycopg2


# Create a connection with a database.
dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"
conn = psycopg2.connect(dbinfo)

c = conn.cursor()

# We are going to create random integers
# between 0 and 9 inclusive.
RANDOM_RANGE = 10

# sql commands for inserting some random values.
for i in range(60000000):
    # Let psycopg2 bind the parameters instead of building
    # the SQL string by hand with % formatting.
    c.execute("INSERT INTO first_table VALUES (%s)", (randrange(RANDOM_RANGE),))
    c.execute("INSERT INTO second_table VALUES (%s)", (randrange(RANDOM_RANGE),))
    c.execute("INSERT INTO third_table VALUES (%s)", (randrange(RANDOM_RANGE),))
    c.execute("INSERT INTO fourth_table VALUES (%s)", (randrange(RANDOM_RANGE),))

# Save (commit) the changes
conn.commit()

# Close the connection.
conn.close()
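
Row-by-row inserts are about the slowest way to load this much data. If the wait bothers you, batching the inserts cuts it down dramatically. Here is a sketch using psycopg2.extras.execute_values (available since psycopg2 2.7); the batch size and batch count are just illustrative numbers:


from random import randrange

import psycopg2
from psycopg2.extras import execute_values


dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"
conn = psycopg2.connect(dbinfo)
c = conn.cursor()

RANDOM_RANGE = 10
BATCH_SIZE = 10000

for table in ('first_table', 'second_table', 'third_table', 'fourth_table'):
    for _ in range(100):  # illustrative: 100 batches of 10000 rows per table
        rows = [(randrange(RANDOM_RANGE),) for _ in range(BATCH_SIZE)]
        # execute_values expands the batch into one multi-row INSERT.
        query = "INSERT INTO {} VALUES %s".format(table)
        execute_values(c, query, rows, page_size=BATCH_SIZE)

conn.commit()
conn.close()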




Defining the functions

Our functions are going to call some basic DB procedures. They will be the tools that our thread pool uses to reach out to the DB.


import psycopg2


# Create a connection with a database.
dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"


def calculate_summation(table):

    conn = psycopg2.connect(dbinfo)
    c = conn.cursor()
    c.execute("SELECT SUM(value) FROM %s" % table)
    result = c.fetchone()[0]

    print("--> The Sum for the table '{}', is: {}".format(table, result))
    conn.close()


def calculate_average(table):

    conn = psycopg2.connect(dbinfo)
    c = conn.cursor()
    c.execute("SELECT AVG(value) FROM %s" % table)
    result = c.fetchone()[0]

    print("----> The Average for the table '{}', is: {}".format(table, result))
    conn.close()

Note: we have to create a connection and cursor inside each function. psycopg2 cursors are not safe to share between threads, and even though connections are thread-safe, queries sent over a single connection are serialized, so sharing one would defeat the concurrency we are after.
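
One aside about the queries above: the table name is spliced in with % formatting because identifiers (table and column names) cannot be passed as query parameters. With our own hard-coded table list that is harmless, but if the names ever came from outside, psycopg2's sql module (psycopg2 2.7+) composes identifiers safely. A sketch of calculate_summation rewritten that way:


from psycopg2 import sql


def calculate_summation(table):

    conn = psycopg2.connect(dbinfo)
    c = conn.cursor()
    # sql.Identifier quotes the table name safely instead of
    # splicing it into the query string by hand.
    query = sql.SQL("SELECT SUM(value) FROM {}").format(sql.Identifier(table))
    c.execute(query)
    result = c.fetchone()[0]

    print("--> The Sum for the table '{}', is: {}".format(table, result))
    conn.close()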


We are also interested in declaring a manager to help distribute the jobs to those two functions (it will be pretty simple for now, but we will specialize it once we use thread pools):


def manager(tables):

    for table in tables:
        calculate_average(table)
        calculate_summation(table)



Measuring the time for the synchronous usage

Now we want to calculate the summation and the average value for each of the four tables we created. For that, we will call our manager and check how long it takes to finish.


if __name__ == '__main__':

    from time import time

    tables = ['first_table', 'second_table',
              'third_table', 'fourth_table']

    start = time()
    manager(tables)

    print("\n\n*** It took ", time() - start, ' seconds to finish.')

# Output:
#----> The Average for the table 'first_table', is: 4.500336930693069
#--> The Sum for the table 'first_table', is: 45453403
#----> The Average for the table 'second_table', is: 4.500577029702971
#--> The Sum for the table 'second_table', is: 45455828
#----> The Average for the table 'third_table', is: 4.501210495049505
#--> The Sum for the table 'third_table', is: 45462226
#----> The Average for the table 'fourth_table', is: 4.500959603960396
#--> The Sum for the table 'fourth_table', is: 45459692


#*** It took  19.662637233734131  seconds to finish.

On my computer it takes around 19-20 seconds on average to finish the calculations, which is very slow and not practical.
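
A small aside on the measurement itself: time.time() reads the wall clock, which can jump if the system clock adjusts, while time.perf_counter() is the clock intended for timing code. It is a drop-in replacement here:


from time import perf_counter

start = perf_counter()
manager(tables)
print("\n\n*** It took ", perf_counter() - start, ' seconds to finish.')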



Introducing concurrent.futures

Now we are going to make some changes to our manager so that it works with threads.


from concurrent.futures import ThreadPoolExecutor

def manager(tables):

    with ThreadPoolExecutor(max_workers=8) as executor:
        executor.map(calculate_average, tables)
        executor.map(calculate_summation, tables)

That is the only change we need, and we end up with the same number of lines as the original code.
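
If we wanted the computed values back in the caller instead of printing them inside the workers, executor.submit together with as_completed is the usual variation. A sketch, assuming calculate_average and calculate_summation were changed to return their results instead of printing them:


from concurrent.futures import ThreadPoolExecutor, as_completed


def manager(tables):

    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {}
        for table in tables:
            futures[executor.submit(calculate_average, table)] = ('avg', table)
            futures[executor.submit(calculate_summation, table)] = ('sum', table)

        # Results come back in completion order, not submission order.
        for future in as_completed(futures):
            kind, table = futures[future]
            print(kind, table, future.result())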



Measuring the time for the asynchronous usage


if __name__ == '__main__':

    from time import time

    tables = ['first_table', 'second_table',
              'third_table', 'fourth_table']

    start = time()
    manager(tables)

    print("\n\n*** It took ", time() - start, ' seconds to finish.')

#--> The Sum for the table 'third_table', is: 13508026
#--> The Sum for the table 'fourth_table', is: 13499095
#----> The Average for the table 'first_table', is: 4.4974343812248838
#----> The Average for the table 'fourth_table', is: 4.4996143405323101
#--> The Sum for the table 'first_table', is: 13492555
#--> The Sum for the table 'second_table', is: 13501726
#----> The Average for the table 'third_table', is: 4.5025912849626807
#----> The Average for the table 'second_table', is: 4.5004913241619490


#*** It took  5.911668062210083  seconds to finish.

Conclusions

The result seems very good: we dropped our time from 19.6 seconds to 5.9. We can also see that the work ran asynchronously, since the output does not follow the order in which the functions were called.


If you ask me what would happen with larger databases, I would say this gap would be even bigger.



The code

The running code:


from concurrent.futures import ThreadPoolExecutor

import psycopg2


# Create a connection with a database.
dbinfo = "dbname='case_study' user='postgres' host='localhost' password='postgres'"


def calculate_summation(table):

    conn = psycopg2.connect(dbinfo)
    c = conn.cursor()
    c.execute("SELECT SUM(value) FROM %s" % table)
    result = c.fetchone()[0]

    print("--> The Sum for the table '{}', is: {}".format(table, result))


def calculate_average(table):

    conn = psycopg2.connect(dbinfo)
    c = conn.cursor()
    c.execute("SELECT AVG(value) FROM %s" % table)
    result = c.fetchone()[0]

    print("----> The Average for the table '{}', is: {}".format(table, result))


def manager(tables):

    with ThreadPoolExecutor(max_workers=8) as executor:
        executor.map(calculate_average, tables)
        executor.map(calculate_summation, tables)


if __name__ == '__main__':

    from time import time

    tables = ['first_table', 'second_table',
              'third_table', 'fourth_table']

    start = time()
    manager(tables)

    print("\n\n*** It took ", time() - start, ' seconds to finish.')

