Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 77 additions & 8 deletions src/jni/duckdb_java.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,36 @@ void _duckdb_jdbc_disconnect(JNIEnv *env, jclass, jobject conn_ref_buf) {
}
}

#include "duckdb/common/constants.hpp"
#include "duckdb/common/enums/statement_type.hpp"
#include "utf8proc_wrapper.hpp"

namespace {

//! When a query is expanded into several statements (e.g. dynamic PIVOT creating a temp enum, or transaction policy SET
//! wrappers), the JDBC driver must prepare the statement that actually returns the result set — typically the last
//! SELECT — not merely the syntactically last statement.
idx_t FindLastSelectStatementIndex(const duckdb::vector<duckdb::unique_ptr<SQLStatement>> &statements) {
for (idx_t i = statements.size(); i > 0; i--) {
if (statements[i - 1]->type == StatementType::SELECT_STATEMENT) {
return i - 1;
}
}
return DConstants::INVALID_INDEX;
}

void ExecuteTrailingQueries(ClientContext &context, const duckdb::vector<string> &trailing_queries) {
QueryParameters parameters;
for (const auto &trailing_sql : trailing_queries) {
auto res = context.Query(trailing_sql, parameters);
if (res->HasError()) {
res->ThrowError();
}
}
}

} // namespace

jobject _duckdb_jdbc_prepare(JNIEnv *env, jclass, jobject conn_ref_buf, jbyteArray query_j) {
auto conn_ref = get_connection(env, conn_ref_buf);
if (!conn_ref) {
Expand All @@ -205,17 +233,26 @@ jobject _duckdb_jdbc_prepare(JNIEnv *env, jclass, jobject conn_ref_buf, jbyteArr
throw InvalidInputException("No statements to execute.");
}

// if there are multiple statements, we directly execute the statements besides the last one
// we only return the result of the last statement to the user, unless one of the previous statements fails
for (idx_t i = 0; i + 1 < statements.size(); i++) {
idx_t prepare_idx = FindLastSelectStatementIndex(statements);
if (prepare_idx == DConstants::INVALID_INDEX) {
prepare_idx = statements.size() - 1;
}

duckdb::vector<string> trailing_queries;
for (idx_t i = prepare_idx + 1; i < statements.size(); i++) {
trailing_queries.push_back(statements[i]->ToString());
}

for (idx_t i = 0; i < prepare_idx; i++) {
auto res = conn_ref->Query(std::move(statements[i]));
if (res->HasError()) {
res->ThrowError();
}
}

auto stmt_ref = make_uniq<StatementHolder>();
stmt_ref->stmt = conn_ref->Prepare(std::move(statements.back()));
stmt_ref->stmt = conn_ref->Prepare(std::move(statements[prepare_idx]));
stmt_ref->trailing_queries_after_execute = std::move(trailing_queries);
if (stmt_ref->stmt->HasError()) {
string error_msg = string(stmt_ref->stmt->GetError());
stmt_ref->stmt = nullptr;
Expand All @@ -238,9 +275,17 @@ jobject _duckdb_jdbc_pending_query(JNIEnv *env, jclass, jobject conn_ref_buf, jb
throw InvalidInputException("No statements to execute.");
}

// if there are multiple statements, we directly execute the statements besides the last one
// we only return the result of the last statement to the user, unless one of the previous statements fails
for (idx_t i = 0; i + 1 < statements.size(); i++) {
idx_t prepare_idx = FindLastSelectStatementIndex(statements);
if (prepare_idx == DConstants::INVALID_INDEX) {
prepare_idx = statements.size() - 1;
}

duckdb::vector<string> trailing_queries;
for (idx_t i = prepare_idx + 1; i < statements.size(); i++) {
trailing_queries.push_back(statements[i]->ToString());
}

for (idx_t i = 0; i < prepare_idx; i++) {
auto res = conn_ref->Query(std::move(statements[i]));
if (res->HasError()) {
res->ThrowError();
Expand All @@ -255,7 +300,9 @@ jobject _duckdb_jdbc_pending_query(JNIEnv *env, jclass, jobject conn_ref_buf, jb
stream_results ? QueryResultOutputType::ALLOW_STREAMING : QueryResultOutputType::FORCE_MATERIALIZED;

auto pending_ref = make_uniq<PendingHolder>();
pending_ref->pending = conn_ref->PendingQuery(std::move(statements.back()), query_parameters);
pending_ref->pending = conn_ref->PendingQuery(std::move(statements[prepare_idx]), query_parameters);
pending_ref->trailing_queries_after_execute = std::move(trailing_queries);
pending_ref->connection_for_trailing = conn_ref;

return env->NewDirectByteBuffer(pending_ref.release(), 0);
}
Expand Down Expand Up @@ -298,6 +345,13 @@ jobject _duckdb_jdbc_execute(JNIEnv *env, jclass, jobject stmt_ref_buf, jobjectA
env->ThrowNew(exc_type, error_msg.c_str());
return nullptr;
}
try {
ExecuteTrailingQueries(*stmt_ref->stmt->context, stmt_ref->trailing_queries_after_execute);
} catch (const std::exception &ex) {
res_ref->res = nullptr;
ThrowJNI(env, ex.what());
return nullptr;
}
return env->NewDirectByteBuffer(res_ref.release(), 0);
}

Expand All @@ -317,6 +371,21 @@ jobject _duckdb_jdbc_execute_pending(JNIEnv *env, jclass, jobject pending_ref_bu
env->ThrowNew(exc_type, error_msg.c_str());
return nullptr;
}
if (!pending_ref->trailing_queries_after_execute.empty()) {
D_ASSERT(pending_ref->connection_for_trailing);
try {
for (const auto &trailing_sql : pending_ref->trailing_queries_after_execute) {
auto tres = pending_ref->connection_for_trailing->Query(trailing_sql);
if (tres->HasError()) {
tres->ThrowError();
}
}
} catch (const std::exception &ex) {
res_ref->res = nullptr;
ThrowJNI(env, ex.what());
return nullptr;
}
}
return env->NewDirectByteBuffer(res_ref.release(), 0);
}

Expand Down
6 changes: 6 additions & 0 deletions src/jni/holders.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,16 @@ struct ConnectionHolder {

struct StatementHolder {
duckdb::unique_ptr<duckdb::PreparedStatement> stmt;
//! When the preprocessor expands a statement into several (e.g. PIVOT + transaction policy SETs), we prepare the
//! last SELECT and run any trailing statements (typically SET current_transaction_invalidation_policy) after each
//! successful execute. Stored as SQL text so repeated execute() on the PreparedStatement remains correct.
duckdb::vector<std::string> trailing_queries_after_execute;
};

struct PendingHolder {
duckdb::unique_ptr<duckdb::PendingQueryResult> pending;
duckdb::vector<std::string> trailing_queries_after_execute;
duckdb::Connection *connection_for_trailing = nullptr;
};

struct ResultHolder {
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/org/duckdb/DuckDBPreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ private void prepare(String sql) throws SQLException {
try {
conn.checkOpen();

if (!isConnAutoCommit()) {
startTransaction();
}

stmtRef = DuckDBNative.duckdb_jdbc_prepare(conn.connRef, sql.getBytes(UTF_8));
// Track prepared statement inside the parent connection
conn.preparedStatements.add(this);
Expand Down
29 changes: 29 additions & 0 deletions src/test/java/org/duckdb/TestDuckDBJDBC.java
Original file line number Diff line number Diff line change
Expand Up @@ -2226,6 +2226,35 @@ public static void test_ignore_unsupported_options() throws Exception {
DriverManager.getConnection("jdbc:duckdb:;foo=bar;jdbc_ignore_unsupported_options=yes;", config).close();
}

public static void test_issue_22042_pivot_autocommit_false() throws Exception {
// Dynamic PIVOT expands to CREATE TYPE + SELECT; inside a JDBC transaction the preprocessor can also inject
// leading/trailing SET statements. The driver must still prepare the SELECT and return a ResultSet.
try (Connection conn = DriverManager.getConnection(JDBC_URL)) {
conn.setAutoCommit(false);
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("pivot values(1,2),(1,3) x(y,z) on x using first(y)")) {
int rows = 0;
boolean sawPivot2Col1 = false;
boolean sawPivot3Null = false;
while (rs.next()) {
rows++;
int c1 = rs.getInt(1);
Object c2 = rs.getObject(2);
if (c1 == 2 && c2 != null && ((Number) c2).intValue() == 1) {
sawPivot2Col1 = true;
}
if (c1 == 3 && c2 == null) {
sawPivot3Null = true;
}
}
assertEquals(rows, 2);
assertTrue(sawPivot2Col1);
assertTrue(sawPivot3Null);
}
conn.rollback();
}
}

public static void test_extension_excel() throws Exception {
// Check whether the Excel extension can be installed and loaded automatically
try (Connection conn = DriverManager.getConnection(JDBC_URL); Statement stmt = conn.createStatement();
Expand Down
Loading