@@ -143,6 +143,7 @@ StorageMerge::StorageMerge(
143143 const String & source_database_name_or_regexp_,
144144 bool database_is_regexp_,
145145 const DBToTableSetMap & source_databases_and_tables_,
146+ const std::optional<String> & table_to_write_,
146147 ContextPtr context_)
147148 : IStorage(table_id_)
148149 , WithContext(context_->getGlobalContext ())
@@ -157,6 +158,7 @@ StorageMerge::StorageMerge(
157158 storage_metadata.setComment (comment);
158159 setInMemoryMetadata (storage_metadata);
159160 setVirtuals (createVirtuals ());
161+ setTableToWrite (table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
160162}
161163
162164StorageMerge::StorageMerge (
@@ -166,6 +168,7 @@ StorageMerge::StorageMerge(
166168 const String & source_database_name_or_regexp_,
167169 bool database_is_regexp_,
168170 const String & source_table_regexp_,
171+ const std::optional<String> & table_to_write_,
169172 ContextPtr context_)
170173 : IStorage(table_id_)
171174 , WithContext(context_->getGlobalContext ())
@@ -180,6 +183,7 @@ StorageMerge::StorageMerge(
180183 storage_metadata.setComment (comment);
181184 setInMemoryMetadata (storage_metadata);
182185 setVirtuals (createVirtuals ());
186+ setTableToWrite (table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
183187}
184188
185189StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators (ContextPtr context_) const
@@ -1668,6 +1672,44 @@ std::optional<UInt64> StorageMerge::totalRowsOrBytes(F && func) const
16681672 return first_table ? std::nullopt : std::make_optional (total_rows_or_bytes);
16691673}
16701674
1675+ void StorageMerge::setTableToWrite (
1676+ const std::optional<String> & table_to_write_,
1677+ const String & source_database_name_or_regexp_,
1678+ bool database_is_regexp_)
1679+ {
1680+ if (!table_to_write_.has_value ())
1681+ {
1682+ table_to_write = std::nullopt ;
1683+ return ;
1684+ }
1685+
1686+ auto qualified_name = QualifiedTableName::parseFromString (*table_to_write_);
1687+
1688+ if (qualified_name.database .empty ())
1689+ {
1690+ if (database_is_regexp_)
1691+ throw Exception (ErrorCodes::BAD_ARGUMENTS, " Argument 'table_to_write' must contain database if 'db_name' is regular expression." );
1692+
1693+ qualified_name.database = source_database_name_or_regexp_;
1694+ }
1695+
1696+ table_to_write = qualified_name;
1697+ }
1698+
1699+ SinkToStoragePtr StorageMerge::write (
1700+ const ASTPtr & query,
1701+ const StorageMetadataPtr & metadata_snapshot,
1702+ ContextPtr context_,
1703+ bool async_insert)
1704+ {
1705+ if (!table_to_write.has_value ())
1706+ throw Exception (ErrorCodes::NOT_IMPLEMENTED, " Method write is not allowed in storage {} without described table to write." , getName ());
1707+
1708+ auto database = DatabaseCatalog::instance ().getDatabase (table_to_write->database );
1709+ auto table = database->getTable (table_to_write->table , context_);
1710+ return table->write (query, metadata_snapshot, context_, async_insert);
1711+ }
1712+
16711713void registerStorageMerge (StorageFactory & factory)
16721714{
16731715 factory.registerStorage (" Merge" , [](const StorageFactory::Arguments & args)
@@ -1678,10 +1720,12 @@ void registerStorageMerge(StorageFactory & factory)
16781720
16791721 ASTs & engine_args = args.engine_args ;
16801722
1681- if (engine_args.size () != 2 )
1723+ size_t size = engine_args.size ();
1724+
1725+ if (size < 2 || size > 3 )
16821726 throw Exception (ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
1683- " Storage Merge requires exactly 2 parameters - name "
1684- " of source database and regexp for table names." );
1727+ " Storage Merge requires 2 or 3 parameters - name "
1728+ " of source database, regexp for table names, and optional table name for writing ." );
16851729
16861730 auto [is_regexp, database_ast] = StorageMerge::evaluateDatabaseName (engine_args[0 ], args.getLocalContext ());
16871731
@@ -1693,8 +1737,15 @@ void registerStorageMerge(StorageFactory & factory)
16931737 engine_args[1 ] = evaluateConstantExpressionAsLiteral (engine_args[1 ], args.getLocalContext ());
16941738 String table_name_regexp = checkAndGetLiteralArgument<String>(engine_args[1 ], " table_name_regexp" );
16951739
1740+ std::optional<String> table_to_write = std::nullopt ;
1741+ if (size == 3 )
1742+ {
1743+ engine_args[2 ] = evaluateConstantExpressionOrIdentifierAsLiteral (engine_args[2 ], args.getLocalContext ());
1744+ table_to_write = checkAndGetLiteralArgument<String>(engine_args[2 ], " table_to_write" );
1745+ }
1746+
16961747 return std::make_shared<StorageMerge>(
1697- args.table_id , args.columns , args.comment , source_database_name_or_regexp, is_regexp, table_name_regexp, args.getContext ());
1748+ args.table_id , args.columns , args.comment , source_database_name_or_regexp, is_regexp, table_name_regexp, table_to_write, args.getContext ());
16981749 },
16991750 {
17001751 .supports_schema_inference = true
0 commit comments