Skip to content

Commit ce79211

Browse files
committed
NIFI-14869: Add MSSQL MERGE-based upsert via new DatabaseDialectService
Implement supportsUpsert() and getUpsertStatement() in MSSQLDatabaseDialectService to produce a MERGE statement for UPSERT operations on SQL Server. - Added new controller service: MSSQLDatabaseDialectService Provides SELECT with paging, UPSERT using MERGE, and basic ALTER/CREATE DDL generation for SQL Server 2012+. - Added new controller service: MSSQL2008DatabaseDialectService Extends MSSQLDatabaseDialectService to support SQL Server 2008, including SELECT with ROW_NUMBER() paging. - Registered both services in META-INF/services/org.apache.nifi.controller.ControllerService Enables NiFi to discover and use the new MSSQL dialect services. - Added unit tests: - TestMSSQLDatabaseDialectService.java: Verifies SQL generation and UPSERT logic for MSSQLDatabaseDialectService. - TestMSSQL2008DatabaseDialectService.java: Verifies paging and SELECT logic for MSSQL2008DatabaseDialectService. Signed-off-by: Adel Wageih <[email protected]>
1 parent 7839892 commit ce79211

File tree

5 files changed

+788
-0
lines changed

5 files changed

+788
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.processors.standard.db.impl;
18+
19+
import org.apache.nifi.annotation.documentation.CapabilityDescription;
20+
import org.apache.nifi.annotation.documentation.Tags;
21+
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
22+
import org.apache.nifi.database.dialect.service.api.PageRequest;
23+
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
24+
import org.apache.nifi.database.dialect.service.api.StandardStatementResponse;
25+
import org.apache.nifi.database.dialect.service.api.StatementRequest;
26+
import org.apache.nifi.database.dialect.service.api.StatementResponse;
27+
import org.apache.nifi.database.dialect.service.api.StatementType;
28+
import org.apache.nifi.database.dialect.service.api.TableDefinition;
29+
30+
import java.util.List;
31+
import java.util.Optional;
32+
33+
@Tags({"mssql", "sqlserver", "database", "dialect"})
34+
@CapabilityDescription("Microsoft SQL Server 2008 Database Dialect Service providing SELECT with ROW_NUMBER() paging, UPSERT using MERGE, and basic ALTER/CREATE DDL generation.")
35+
public class MSSQL2008DatabaseDialectService extends MSSQLDatabaseDialectService {
36+
37+
@Override
38+
public StatementResponse getStatement(final StatementRequest statementRequest) {
39+
if (statementRequest.statementType() == StatementType.SELECT) {
40+
return new StandardStatementResponse(buildSelect2008(statementRequest));
41+
}
42+
return super.getStatement(statementRequest);
43+
}
44+
45+
private String buildSelect2008(final StatementRequest statementRequest) {
46+
if (!(statementRequest instanceof QueryStatementRequest query)) {
47+
throw new IllegalArgumentException("Query Statement Request not found [" + statementRequest.getClass() + "]");
48+
}
49+
50+
final TableDefinition table = statementRequest.tableDefinition();
51+
final String qualifiedTableName = qualifyTableName(table);
52+
53+
final Optional<String> derivedTable = query.derivedTable();
54+
if (derivedTable.isPresent()) {
55+
final String tableAlias = "AS " + table.tableName();
56+
return "SELECT * FROM (" + derivedTable.get() + ") " + tableAlias;
57+
}
58+
59+
final String selectColumns = buildSelectColumns(table.columns());
60+
61+
final Optional<PageRequest> page = query.pageRequest();
62+
final Long limit;
63+
final Long offset;
64+
final String indexColumnName;
65+
if (page.isPresent()) {
66+
final PageRequest p = page.get();
67+
limit = p.limit().isPresent() ? p.limit().getAsLong() : null;
68+
offset = p.offset();
69+
indexColumnName = p.indexColumnName().orElse(null);
70+
} else {
71+
limit = null;
72+
offset = null;
73+
indexColumnName = null;
74+
}
75+
76+
final String whereClause = query.whereClause().orElse(null);
77+
final String orderByClause = query.orderByClause().orElse(null);
78+
79+
final boolean partitioned = indexColumnName != null && !indexColumnName.isBlank();
80+
final boolean hasOrder = orderByClause != null && !orderByClause.isBlank();
81+
final boolean useWindowPaging = limit != null && !partitioned && offset != null && (offset > 0 || hasOrder);
82+
83+
final StringBuilder sql = new StringBuilder("SELECT ");
84+
85+
if (limit != null && !partitioned) {
86+
if (useWindowPaging) {
87+
sql.append("* FROM (SELECT ");
88+
}
89+
final long effectiveOffset = (offset == null) ? 0 : offset;
90+
if (effectiveOffset + limit > 0) {
91+
sql.append("TOP ").append(effectiveOffset + limit).append(' ');
92+
}
93+
}
94+
95+
sql.append(selectColumns);
96+
97+
if (useWindowPaging && hasOrder) {
98+
sql.append(", ROW_NUMBER() OVER(ORDER BY ")
99+
.append(orderByClause)
100+
.append(" asc) rnum");
101+
}
102+
103+
sql.append(" FROM ").append(qualifiedTableName);
104+
105+
if (whereClause != null && !whereClause.isBlank()) {
106+
sql.append(" WHERE ").append(whereClause);
107+
if (partitioned) {
108+
sql.append(" AND ")
109+
.append(indexColumnName)
110+
.append(" >= ")
111+
.append(offset != null ? offset : 0);
112+
if (limit != null) {
113+
sql.append(" AND ")
114+
.append(indexColumnName)
115+
.append(" < ")
116+
.append((offset == null ? 0 : offset) + limit);
117+
}
118+
}
119+
}
120+
121+
if (orderByClause != null && !orderByClause.isBlank() && !partitioned) {
122+
sql.append(" ORDER BY ").append(orderByClause);
123+
}
124+
125+
if (useWindowPaging) {
126+
sql.append(") A WHERE rnum > ")
127+
.append(offset)
128+
.append(" AND rnum <= ")
129+
.append(offset + limit);
130+
}
131+
132+
return sql.toString();
133+
}
134+
135+
private String buildSelectColumns(final List<ColumnDefinition> columns) {
136+
if (columns == null || columns.isEmpty()) {
137+
return "*";
138+
}
139+
final StringBuilder sb = new StringBuilder();
140+
for (int i = 0; i < columns.size(); i++) {
141+
if (i > 0) sb.append(", ");
142+
sb.append(columns.get(i).columnName());
143+
}
144+
return sb.toString();
145+
}
146+
147+
private String qualifyTableName(final TableDefinition table) {
148+
final StringBuilder name = new StringBuilder();
149+
table.catalog().ifPresent(c -> name.append(c).append('.'));
150+
table.schemaName().ifPresent(s -> name.append(s).append('.'));
151+
name.append(table.tableName());
152+
return name.toString();
153+
}
154+
}
155+
156+

0 commit comments

Comments
 (0)