Multithreaded Python MERGE INTO operations hang DuckLake Postgres database connection #417

@heathwinning

Description

What happens?

When I run multiple Python threads that read from and write to DuckLake tables, the database connection gets stuck and all remaining database interactions hang. If I limit the application to a single worker, the connection almost never gets stuck.

I am using Postgres as the catalog to enable multi-threaded reads/writes and multiple connections to the database, but the issue appears even with a single multi-threaded read/write process. It may also be reproducible with write operations alone, but I've found that reads make it appear more quickly.

In the use case where this issue appeared I use MERGE INTO upserts, so I have produced a minimal example using that operation. I have not tested whether it occurs with other write operations.

This was an issue on version 0.3 and nightlies, and is still an issue on 0.4.

I am open to the possibility that I've done something wrong; please let me know if I've made a mistake.
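For context, the only reliable mitigation I've found is the single-worker setup mentioned above. That pattern can be sketched with a stdlib queue; the dict and the job tuples below are illustrative stand-ins for the real table and MERGE, not my actual application code:

```python
import queue
import threading

# Illustrative single-writer pattern: every upsert goes through one worker
# thread, so only one database write is ever in flight at a time.
# The dict stands in for my_ducklake.my_table.
table = {}
jobs = queue.Queue()

def writer():
    while True:
        job = jobs.get()
        if job is None:  # sentinel: shut the worker down
            break
        record_id, x, upsert_id = job
        existing = table.get(record_id)
        # mirror the "update only if upsert_id is newer" rule
        if existing is None or upsert_id > existing[1]:
            table[record_id] = (x, upsert_id)

worker = threading.Thread(target=writer)
worker.start()

# any number of producer threads can enqueue jobs safely
for upsert_id in range(3):
    jobs.put((1, f"value-{upsert_id}", upsert_id))
jobs.put(None)
worker.join()
print(table)  # {1: ('value-2', 2)}
```

This serialises all writes, which avoids the hang at the cost of write concurrency.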

To Reproduce

In this example, a DuckLake database my_ducklake is initialised and a table my_table is created. Records in my_table consist of an id between 0 and 20,000, a text value x, and an upsert_id that tracks which operation upserted the record.

Calling the function upsert_data upserts 10,000 randomly selected ids with random x values, updating a record only if the new upsert_id is greater than the existing one. The function also receives a wait parameter to sleep before beginning the upsert; this makes some upserts occur out of order, so that no update takes place.

I've set two different wait regimes for the upserts:

  • the first results in few simultaneous and out-of-order upserts - this occasionally gets stuck.
  • the second is currently commented out, and allows many simultaneous and out-of-order upserts - this often gets stuck.

(It's possible that some tweaking of the wait timing will be needed to reproduce the issue, depending on the speed of the machine)

import shutil
import threading
import random
from time import sleep
import duckdb
import numpy as np
import pandas as pd

# start from a clean data path before attaching
shutil.rmtree("my_ducklake_storage", ignore_errors=True)

connection = duckdb.connect()
connection.execute(
    """
    INSTALL ducklake FROM core;
    INSTALL postgres;
    ATTACH 'ducklake:postgres:dbname=ducklake_catalog host=localhost user=ducklake' AS my_ducklake (DATA_PATH 'my_ducklake_storage');
    """
)
connection.execute(
    """
    DROP TABLE IF EXISTS my_ducklake.my_table;
    CREATE TABLE my_ducklake.my_table (id INTEGER, x TEXT, upsert_id INTEGER);
    """
)

n_unique_ids = 20_000
n_records_to_upsert = 10_000

def upsert_data(connection: duckdb.DuckDBPyConnection, upsert_id: int, wait: float):
    sleep(wait)
    print(f"upsert start: {upsert_id}")

    df = pd.DataFrame(
        dict(
            # randomly choose 10_000 of the ids between 0 and 20_000 to upsert
            id=np.random.permutation(n_unique_ids)[:n_records_to_upsert],
            x=pd.Series(np.random.rand(n_records_to_upsert)).astype(str),
            upsert_id=upsert_id,
        )
    )
    connection.execute(
        """
        MERGE INTO my_ducklake.my_table
        USING (SELECT * FROM df) AS df
        ON (my_ducklake.my_table.id = df.id)
        WHEN MATCHED
            AND df.upsert_id > my_ducklake.my_table.upsert_id
            THEN UPDATE BY NAME
        WHEN NOT MATCHED
            THEN INSERT BY NAME
        """
    )
    records_upserted = connection.sql(
        f"""select count(*) from my_ducklake.my_table where upsert_id = {upsert_id}""",
    ).fetchone()[0]
    print(f"upsert done: {upsert_id} records_upserted: {records_upserted}")

n_upserts = 30
threads = []
for upsert_id in range(n_upserts):
    # set each thread to start 2s apart on average, with a random jitter between 0-3
    # occasionally upserts will start out-of-order and try to run simultaneously
    wait = (upsert_id*2 + random.random()*3)

    # set each thread to start 1s apart on average, with a random jitter between 0-5
    # many upserts will start out-of-order and try to run simultaneously
    # wait = upsert_id + random.random() * 5

    threads.append(
        threading.Thread(
            target=upsert_data,
            args=(connection, upsert_id, wait),
        )
    )
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
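As a side note on the intended semantics: the conditional upsert that the MERGE above performs can be illustrated with stdlib sqlite3 (ON CONFLICT stands in for MERGE INTO here; this only demonstrates the upsert_id rule and does not involve ducklake at all):

```python
import sqlite3

# In-memory stand-in for my_ducklake.my_table, purely to show the
# "update only when the incoming upsert_id is newer" rule.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE my_table (id INTEGER PRIMARY KEY, x TEXT, upsert_id INTEGER)")

def upsert(rows):
    # mirrors WHEN MATCHED AND df.upsert_id > my_table.upsert_id THEN UPDATE
    con.executemany(
        """
        INSERT INTO my_table (id, x, upsert_id) VALUES (?, ?, ?)
        ON CONFLICT(id) DO UPDATE
            SET x = excluded.x, upsert_id = excluded.upsert_id
            WHERE excluded.upsert_id > my_table.upsert_id
        """,
        rows,
    )

upsert([(1, "a", 0), (2, "b", 0)])  # both inserted
upsert([(1, "c", 1)])               # newer upsert_id: id 1 updated
upsert([(2, "d", 0)])               # not newer: id 2 unchanged
print(con.execute("SELECT id, x, upsert_id FROM my_table ORDER BY id").fetchall())
# [(1, 'c', 1), (2, 'b', 0)]
```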

When the process gets stuck, running the following query on the Postgres database reveals sessions in state idle in transaction that never complete. I assume these idle transactions are part of the issue, but they might just be a symptom - I'm not familiar with the underlying DuckLake implementation.

SELECT
    pid,
    datname,
    usename,
    client_addr,
    now() - pg_stat_activity.query_start AS duration,
    query,
    state
FROM
    pg_stat_activity
WHERE datname = 'ducklake_catalog';
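To surface the stuck sessions directly, the same view can be filtered to the idle-in-transaction state and sorted by transaction age (state and xact_start are standard pg_stat_activity columns):

```sql
SELECT
    pid,
    now() - xact_start AS xact_age,
    query,
    state
FROM pg_stat_activity
WHERE datname = 'ducklake_catalog'
  AND state = 'idle in transaction'
ORDER BY xact_age DESC;
```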

In my complete application, the idle queries I almost always find are of the form below, referring to various ducklake_inlined_delete_<n> relnames. These are less common in the minimal example, but can still be reproduced.

SELECT pg_namespace.oid AS namespace_id, relname, relpages, attname,
    pg_type.typname type_name, atttypmod type_modifier, pg_attribute.attndims ndim,
    attnum, pg_attribute.attnotnull AS notnull, NULL constraint_id,
    NULL constraint_type, NULL constraint_key
FROM pg_class
JOIN pg_namespace ON relnamespace = pg_namespace.oid
JOIN pg_attribute ON pg_class.oid=pg_attribute.attrelid
JOIN pg_type ON atttypid=pg_type.oid
WHERE attnum > 0 AND relkind IN ('r', 'v', 'm', 'f', 'p') AND pg_namespace.nspname='public' AND relname='ducklake_inlined_delete_51'
UNION ALL
SELECT pg_namespace.oid AS namespace_id, relname, NULL relpages, NULL attname, NULL type_name,
    NULL type_modifier, NULL ndim, NULL attnum, NULL AS notnull,
    pg_constraint.oid AS constraint_id, contype AS constraint_type,
    conkey AS constraint_key
FROM pg_class
JOIN pg_namespace ON relnamespace = pg_namespace.oid
JOIN pg_constraint ON (pg_class.oid=pg_constraint.conrelid)
WHERE relkind IN ('r', 'v', 'm', 'f', 'p') AND contype IN ('p', 'u')

In this minimal example, I often find idle transactions of the form below, but I have not seen these in my complete application - I'm not sure if these are relevant.

COPY (SELECT "snapshot_id", "schema_version", "next_catalog_id", "next_file_id"
    FROM (
        SELECT snapshot_id, schema_version, next_catalog_id, next_file_id
        FROM "public".ducklake_snapshot WHERE snapshot_id = (
            SELECT MAX(snapshot_id) FROM "public".ducklake_snapshot
        )
    ) AS __unnamed_subquery ) TO STDOUT (FORMAT "binary");

OS:

osx_arm64

DuckDB Version:

1.5.0

DuckLake Version:

0.4

DuckDB Client:

Python 3.11.10

Hardware:

No response

Full Name:

Heath Winning

Affiliation:

LakeSight Data

What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.

I have tested with a stable release

Did you include all relevant data sets for reproducing the issue?

Yes

Did you include all code required to reproduce the issue?

  • Yes, I have

Did you include all relevant configuration (e.g., CPU architecture, Python version, Linux distribution) to reproduce the issue?

  • Yes, I have
