
Python Batched SQLite Writes

25 Jun, 2024

import apsw
import time

# Number of rows to insert
NUM_ROWS = 1_000_000

# Open a connection to the SQLite database in WAL mode
connection = apsw.Connection("test.db")

# Create a cursor
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode=WAL")

# Create a table
cursor.execute("""
CREATE TABLE demo_table (
    data TEXT
)
""")

# Start timing
start_time = time.time()

# Begin a transaction
cursor.execute("BEGIN TRANSACTION")

# Insert rows
for i in range(NUM_ROWS):
    cursor.execute("INSERT INTO demo_table (data) VALUES (?)", (f"Row {i}",))

# Commit the transaction
cursor.execute("COMMIT")

# End timing
end_time = time.time()

# Calculate and print the time taken to insert the rows
time_taken = end_time - start_time
print(f"Time taken to insert {NUM_ROWS} rows: {time_taken:.2f} seconds {NUM_ROWS/time_taken:.2f}")

# Close the connection
connection.close()

Running 1 million inserts:

$ python3 sqlitew1.py 
Time taken to insert 1000000 rows: 1.44 seconds (693532.92 rows/sec)
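
That throughput comes from doing all the inserts inside one explicit transaction. For comparison, here's a minimal sketch of the same loop relying on autocommit instead, so every INSERT gets its own transaction; the file name and the smaller row count are just for illustration, not from the benchmark above:

import apsw
import time

# Fewer rows, because committing per statement is much slower
NUM_ROWS = 100_000

connection = apsw.Connection("test_unbatched.db")
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode=WAL")
cursor.execute("CREATE TABLE demo_table (data TEXT)")

start_time = time.time()
for i in range(NUM_ROWS):
    # No surrounding BEGIN/COMMIT: each INSERT is its own transaction
    cursor.execute("INSERT INTO demo_table (data) VALUES (?)", (f"Row {i}",))
time_taken = time.time() - start_time

print(f"Time taken to insert {NUM_ROWS} rows: {time_taken:.2f} seconds ({NUM_ROWS/time_taken:.2f} rows/sec)")
connection.close()

Expect the per-row version to be far slower, since every commit has to sync the write-ahead log to disk.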

Here's a work-in-progress version using asyncio, with an API so that writes get batched:

import asyncio
import time

try:
    import uvloop
    uvloop.install()
except ImportError:
    pass

# Try to import APSW; fall back to the built-in sqlite3 module if it isn't available
try:
    import apsw
    using_apsw = True
except ImportError:
    import sqlite3
    using_apsw = False

class AsyncDatabase:
    def __init__(self, db_path):
        if using_apsw:
            # print('Using APSW')
            self.connection = apsw.Connection(db_path)
            self.cursor = self.connection.cursor()
        else:
            # print('Using built-in SQLite')
            self.connection = sqlite3.connect(db_path, check_same_thread=False)
            self.cursor = self.connection.cursor()
            
        self._commit_event = asyncio.Event()
        self._needs_commit = False
        self._lock = asyncio.Lock()

        pragmas = {
            "journal_mode": "WAL",
            "busy_timeout": "5000",
            "synchronous": "NORMAL",
            "cache_size": "1000000",  # cache_size is in pages. Assuming page size of 1KB
            "foreign_keys": "true",
            "temp_store": "memory",
            "locking_mode": "immediate",
        }

        for key, value in pragmas.items():
            self.cursor.execute(f"PRAGMA {key}={value};")
        if using_apsw:
            self.cursor.execute("BEGIN TRANSACTION")
        else:
            self.connection.commit()  # Ensure the PRAGMA settings are applied
        # Keep a reference so the periodic commit task isn't garbage collected
        self._commit_task = asyncio.create_task(self._periodic_commit(0.03))

    async def write(self, fn, *k, **p):
        async with self._lock:
            await fn(self.cursor, *k, **p)
        self._needs_commit = True
        await self._commit_event.wait()  # Wait for the commit event to be set

    async def read(self, query, *params):
        async with self._lock:
            self.cursor.execute(query, params)
            return self.cursor.fetchall()

    async def _periodic_commit(self, interval):
        while True:
            await asyncio.sleep(interval)
            if self._needs_commit:
                await self._commit()

    async def _commit(self):
        async with self._lock:
            # print('In _commit with a lock')
            if using_apsw:
                self.cursor.execute("COMMIT")
                self.cursor.execute("BEGIN TRANSACTION")
            else:
                self.connection.commit()
            self._needs_commit = False
            self._commit_event.set()  # Signal that a commit has occurred
            self._commit_event.clear()  # Clear the event after a write



async def create_table(cursor):
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS my_table (
            column1 TEXT
        );
    ''')

async def insert_query(cursor, i):
    cursor.execute('INSERT INTO my_table (column1) VALUES (?);', (f'Row {i}',))

async def main():
    db = AsyncDatabase('file.db')
    
    # Create table before running operations
    await db.write(create_table)

    async def task(i):
        await db.write(insert_query, i)

    start_time = time.time()
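    # 20 rounds of 10,000 concurrent writes = 200,000 rows in total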
    for j in range(20):
        tasks = []
        for i in range(10_000):
            tasks.append(task(i))
        await asyncio.gather(*tasks)
    elapsed_time = time.time() - start_time

    results = await db.read('SELECT COUNT(*) FROM my_table;')
    total_inserts = results[0][0]
    print(f"Total rows inserted: {total_inserts}")

    inserts_per_second = total_inserts / elapsed_time
    print(f"Inserts per second: {inserts_per_second:.2f}")

if __name__ == '__main__':
    asyncio.run(main())

Running:

$ rm file.db ; time python3 h.py 
Total rows inserted: 200000
Inserts per second: 144222.65

real	0m1.504s
user	0m1.345s
sys	0m0.032s
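
The trade-off this API makes is latency for throughput: each db.write() awaits the next periodic commit, so a caller waits up to roughly one commit interval (0.03 s here), while the background task folds everything queued during that interval into a single transaction. A minimal usage sketch of what an individual caller looks like (the handle_request name and payload argument are illustrative, not part of the code above):

async def handle_request(db: AsyncDatabase, payload: str):
    # Returns once the background _periodic_commit task has committed the
    # transaction containing this insert, i.e. within about one interval.
    await db.write(insert_query, payload)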
