A Fast Method to Bulk Insert a Pandas DataFrame into Postgres
Aug 8, 2020 · 774 words · 4 minutes read
There can be times when you want to write a Pandas DataFrame directly to a database without writing to disk. Unfortunately the default methods in Pandas are slow. In this post I compare several methods for inserting the DataFrame into a Postgres database – skip here to see the fastest.
Dataset
For this post, the dataset is daily stock quotes for companies in the S&P500 for the period of 1998 through July 31st, 2013.
The dataset contains 1,808,278 records of the date, time (always 0), open, close, high, low, volume, and ticker symbol.
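For context, reading the quotes into a DataFrame might look like the sketch below; the file name and column order are assumptions for illustration, not details given in this post.
import pandas as pd

# Hypothetical source file; column order mirrors the destination table below.
df = pd.read_csv(
    "sp500_quotes.csv",
    names=["date", "time", "open", "high", "low", "close", "volume", "ticker"],
    parse_dates=["date"],
)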
The destination Postgres table schema is defined by:
CREATE TABLE IF NOT EXISTS public.dest (
date TIMESTAMP,
time INT,
open NUMERIC,
high NUMERIC,
low NUMERIC,
close NUMERIC,
volume NUMERIC,
ticker TEXT
);
You can see most fields are numbers (NUMERIC or INT) but there is also the date column (TIMESTAMP) and the ticker symbol (TEXT).
Timing
The timing of each method was measured using timeit.repeat and taking the minimum time, which should be a good estimate of the fastest the code can execute.
Variability in timing is attributed to other processes running in the background on the system.
In order to measure fairly I included processing steps specific to each implementation in the timing.
The built-in DataFrame.to_sql method has no additional processing but the psycopg2 methods do (converting to a list of dictionaries or string buffer).
Method-specific processing could be timed separately from the insert, but this way each approach starts from the same place – a processed DataFrame that is ready to be inserted into the database.
Cleanup
Before each iteration of timeit.repeat I run setup code to TRUNCATE the table.
This setup is excluded from the timing.
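Putting those two pieces together, the harness looked roughly like this sketch; insert_method is a hypothetical wrapper around whichever insert approach is being measured, and conn is assumed to be a psycopg2 connection.
import timeit

# Setup: truncate the destination table before each timed run (not timed).
setup = """
with conn.cursor() as c:
    c.execute("TRUNCATE TABLE public.dest;")
conn.commit()
"""

# Run each insert once per repetition and keep the minimum time.
times = timeit.repeat(
    stmt="insert_method(df)",  # hypothetical wrapper around one approach
    setup=setup,
    globals=globals(),
    repeat=5,
    number=1,
)
print(f"best time: {min(times):.2f}s")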
pandas.DataFrame.to_sql
If you’re looking to insert a Pandas DataFrame into a database, the to_sql method is likely the first thing you think of.
Simply call the to_sql method on your DataFrame (e.g. df.to_sql), give the name of the destination table (dest), and provide a SQLAlchemy engine (engine).
If the table already exists (this one does), tell Pandas to append rather than fail (if_exists="append").
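The engine below is a standard SQLAlchemy engine; a minimal sketch of creating one (the connection string is a placeholder):
from sqlalchemy import create_engine

# Placeholder credentials, host, and database name -- substitute your own.
engine = create_engine("postgresql://user:password@localhost:5432/stocks")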
df.to_sql(
name="dest",
con=engine,
if_exists="append",
index=False
)
This, however, is painfully slow. On my system, the best time was 142.67s (2min 22s).
pandas.DataFrame.to_sql with method="multi"
Given that you tried the previous approach and were unsatisfied, you might dig into the documentation looking for options that may improve performance.
One choice is to set method="multi", which inserts multiple records at a time in a single INSERT statement, i.e. it performs a batch insert.
df.to_sql(
name="dest",
con=engine,
if_exists="append",
index=False,
method="multi"
)
This actually performs worse than the default method!
The best time was 196.56s (3min 17s).
pandas.DataFrame.to_sql with method=callable
Another choice is to set method to a callable function with signature (pd_table, conn, keys, data_iter). The Pandas documentation shows an example of a callable that uses the Postgres COPY statement.
import csv
from io import StringIO


def psql_insert_copy(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        # write the rows to an in-memory CSV buffer
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        # COPY the buffer into the destination table
        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)
df.to_sql(
name="dest",
con=engine,
if_exists="append",
index=False,
method=psql_insert_copy
)
This improves performance quite a bit.
The best time was 14.83s.
psycopg2 using execute_values
Another option is to use the psycopg2 module instead of SQLAlchemy. Similar to to_sql with method="multi", psycopg2 provides the execute_values function, which performs batch inserts into the database. However, we first have to convert the DataFrame into another format (e.g. a list of dictionaries).
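The conn used in this and the following snippets is a plain psycopg2 connection; a minimal sketch of setting it up (connection parameters are placeholders):
import psycopg2
from psycopg2.extras import execute_values

# Placeholder connection parameters -- substitute your own.
conn = psycopg2.connect(
    host="localhost",
    dbname="stocks",
    user="user",
    password="password",
)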
with conn.cursor() as c:
execute_values(
cur=c,
sql="""
INSERT INTO dest
(date, time, open, high, low, close, volume, ticker)
VALUES %s;
""",
argslist=df.to_dict(orient="records"),
template="""
(
%(date)s, %(time)s, %(open)s,
%(high)s, %(low)s, %(close)s,
%(volume)s, %(ticker)s
)
"""
)
conn.commit()
Despite converting the DataFrame, this method is still pretty fast – the best time was 46.86s. Yet there are other options too.
psycopg2 using copy_from and a string buffer
This next approach uses the psycopg2 copy_from function to copy a CSV-formatted string buffer, produced with DataFrame.to_csv, directly into the table.
sio = StringIO()
sio.write(df.to_csv(index=False, header=False))
sio.seek(0)
with conn.cursor() as c:
c.copy_from(
file=sio,
table="dest",
columns=[
"date",
"time",
"open",
"high",
"low",
"close",
"volume",
"ticker"
],
sep=","
)
conn.commit()
The best time was 17.33s.
psycopg2 using copy_from and csv
This final approach also uses copy_from, but builds the string buffer with csv.writer instead of DataFrame.to_csv.
sio = StringIO()
writer = csv.writer(sio)
writer.writerows(df.values)
sio.seek(0)
with conn.cursor() as c:
c.copy_from(
file=sio,
table="dest",
columns=[
"date",
"time",
"open",
"high",
"low",
"close",
"volume",
"ticker"
],
sep=","
)
conn.commit()
Results
