4141#include < Processors/QueryPlan/ExpressionStep.h>
4242#include < Processors/QueryPlan/QueryPlan.h>
4343#include < Processors/QueryPlan/ReadFromMergeTree.h>
44+ #include < Processors/Sinks/SinkToStorage.h>
4445#include < Processors/Sources/NullSource.h>
4546#include < Processors/Transforms/FilterTransform.h>
4647#include < Processors/Transforms/MaterializingTransform.h>
@@ -83,6 +84,9 @@ extern const int SAMPLING_NOT_SUPPORTED;
8384extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
8485extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
8586extern const int STORAGE_REQUIRES_PARAMETER;
87+ extern const int UNKNOWN_TABLE;
88+ extern const int ACCESS_DENIED;
89+ extern const int TABLE_IS_READ_ONLY;
8690}
8791
8892namespace
@@ -143,6 +147,8 @@ StorageMerge::StorageMerge(
143147 const String & source_database_name_or_regexp_,
144148 bool database_is_regexp_,
145149 const DBToTableSetMap & source_databases_and_tables_,
150+ const std::optional<String> & table_to_write_,
151+ bool table_to_write_auto_,
146152 ContextPtr context_)
147153 : IStorage(table_id_)
148154 , WithContext(context_->getGlobalContext ())
@@ -151,6 +157,7 @@ StorageMerge::StorageMerge(
151157 database_is_regexp_,
152158 source_database_name_or_regexp_, {},
153159 source_databases_and_tables_)
160+ , table_to_write_auto(table_to_write_auto_)
154161{
155162 StorageInMemoryMetadata storage_metadata;
156163 storage_metadata.setColumns (columns_.empty ()
@@ -159,6 +166,8 @@ StorageMerge::StorageMerge(
159166 storage_metadata.setComment (comment);
160167 setInMemoryMetadata (storage_metadata);
161168 setVirtuals (createVirtuals ());
169+ if (!table_to_write_auto)
170+ setTableToWrite (table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
162171}
163172
164173StorageMerge::StorageMerge (
@@ -168,6 +177,8 @@ StorageMerge::StorageMerge(
168177 const String & source_database_name_or_regexp_,
169178 bool database_is_regexp_,
170179 const String & source_table_regexp_,
180+ const std::optional<String> & table_to_write_,
181+ bool table_to_write_auto_,
171182 ContextPtr context_)
172183 : IStorage(table_id_)
173184 , WithContext(context_->getGlobalContext ())
@@ -176,6 +187,7 @@ StorageMerge::StorageMerge(
176187 database_is_regexp_,
177188 source_database_name_or_regexp_,
178189 source_table_regexp_, {})
190+ , table_to_write_auto(table_to_write_auto_)
179191{
180192 StorageInMemoryMetadata storage_metadata;
181193 storage_metadata.setColumns (columns_.empty ()
@@ -184,6 +196,8 @@ StorageMerge::StorageMerge(
184196 storage_metadata.setComment (comment);
185197 setInMemoryMetadata (storage_metadata);
186198 setVirtuals (createVirtuals ());
199+ if (!table_to_write_auto)
200+ setTableToWrite (table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
187201}
188202
189203StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators (ContextPtr context_) const
@@ -293,6 +307,29 @@ void StorageMerge::forEachTable(F && func) const
293307 });
294308}
295309
310+ template <typename F>
311+ void StorageMerge::forEachTableName (F && func) const
312+ {
313+ auto database_table_iterators = database_name_or_regexp.getDatabaseIterators (getContext ());
314+
315+ for (auto & iterator : database_table_iterators)
316+ {
317+ while (iterator->isValid ())
318+ {
319+ const auto & table = iterator->table ();
320+ if (table.get () != this )
321+ {
322+ QualifiedTableName table_name;
323+ table_name.database = iterator->databaseName ();
324+ table_name.table = iterator->name ();
325+ func (table_name);
326+ }
327+
328+ iterator->next ();
329+ }
330+ }
331+ }
332+
296333bool StorageMerge::isRemote () const
297334{
298335 auto first_remote_table = traverseTablesUntil ([](const StoragePtr & table) { return table && table->isRemote (); });
@@ -1702,6 +1739,77 @@ std::optional<UInt64> StorageMerge::totalRowsOrBytes(F && func) const
17021739 return first_table ? std::nullopt : std::make_optional (total_rows_or_bytes);
17031740}
17041741
1742+ void StorageMerge::setTableToWrite (
1743+ const std::optional<String> & table_to_write_,
1744+ const String & source_database_name_or_regexp_,
1745+ bool database_is_regexp_)
1746+ {
1747+ if (!table_to_write_.has_value ())
1748+ {
1749+ table_to_write = std::nullopt ;
1750+ return ;
1751+ }
1752+
1753+ auto qualified_name = QualifiedTableName::parseFromString (*table_to_write_);
1754+
1755+ if (qualified_name.database .empty ())
1756+ {
1757+ if (database_is_regexp_)
1758+ throw Exception (ErrorCodes::BAD_ARGUMENTS, " Argument 'table_to_write' must contain database if 'db_name' is regular expression" );
1759+
1760+ qualified_name.database = source_database_name_or_regexp_;
1761+ }
1762+
1763+ table_to_write = qualified_name;
1764+ }
1765+
1766+ SinkToStoragePtr StorageMerge::write (
1767+ const ASTPtr & query,
1768+ const StorageMetadataPtr & metadata_snapshot,
1769+ ContextPtr context_,
1770+ bool async_insert)
1771+ {
1772+ const auto & access = context_->getAccess ();
1773+
1774+ if (table_to_write_auto)
1775+ {
1776+ table_to_write = std::nullopt ;
1777+ bool any_table_found = false ;
1778+ forEachTableName ([&](const auto & table_name)
1779+ {
1780+ any_table_found = true ;
1781+ if (!table_to_write.has_value () || table_to_write->getFullName () < table_name.getFullName ())
1782+ {
1783+ if (access->isGranted (AccessType::INSERT, table_name.database , table_name.table ))
1784+ table_to_write = table_name;
1785+ }
1786+ });
1787+ if (!table_to_write.has_value ())
1788+ {
1789+ if (any_table_found)
1790+ throw Exception (ErrorCodes::ACCESS_DENIED, " Not allowed to write in any suitable table for storage {}" , getName ());
1791+ else
1792+ throw Exception (ErrorCodes::UNKNOWN_TABLE, " Can't find any table to write for storage {}" , getName ());
1793+ }
1794+ }
1795+ else
1796+ {
1797+ if (!table_to_write.has_value ())
1798+ throw Exception (ErrorCodes::TABLE_IS_READ_ONLY, " Method write is not allowed in storage {} without described table to write" , getName ());
1799+
1800+ access->checkAccess (AccessType::INSERT, table_to_write->database , table_to_write->table );
1801+ }
1802+
1803+ auto database = DatabaseCatalog::instance ().getDatabase (table_to_write->database );
1804+ auto table = database->getTable (table_to_write->table , context_);
1805+ auto table_lock = table->lockForShare (
1806+ context_->getInitialQueryId (),
1807+ context_->getSettingsRef ()[Setting::lock_acquire_timeout]);
1808+ auto sink = table->write (query, metadata_snapshot, context_, async_insert);
1809+ sink->addTableLock (table_lock);
1810+ return sink;
1811+ }
1812+
17051813void registerStorageMerge (StorageFactory & factory)
17061814{
17071815 factory.registerStorage (" Merge" , [](const StorageFactory::Arguments & args)
@@ -1712,10 +1820,12 @@ void registerStorageMerge(StorageFactory & factory)
17121820
17131821 ASTs & engine_args = args.engine_args ;
17141822
1715- if (engine_args.size () != 2 )
1823+ size_t size = engine_args.size ();
1824+
1825+ if (size < 2 || size > 3 )
17161826 throw Exception (ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
1717- " Storage Merge requires exactly 2 parameters - name "
1718- " of source database and regexp for table names. " );
1827+ " Storage Merge requires 2 or 3 parameters - name "
1828+ " of source database, regexp for table names, and optional table name for writing " );
17191829
17201830 auto [is_regexp, database_ast] = StorageMerge::evaluateDatabaseName (engine_args[0 ], args.getLocalContext ());
17211831
@@ -1727,8 +1837,24 @@ void registerStorageMerge(StorageFactory & factory)
17271837 engine_args[1 ] = evaluateConstantExpressionAsLiteral (engine_args[1 ], args.getLocalContext ());
17281838 String table_name_regexp = checkAndGetLiteralArgument<String>(engine_args[1 ], " table_name_regexp" );
17291839
1840+ std::optional<String> table_to_write = std::nullopt ;
1841+ bool table_to_write_auto = false ;
1842+ if (size == 3 )
1843+ {
1844+ bool is_identifier = engine_args[2 ]->as <ASTIdentifier>();
1845+ engine_args[2 ] = evaluateConstantExpressionOrIdentifierAsLiteral (engine_args[2 ], args.getLocalContext ());
1846+ table_to_write = checkAndGetLiteralArgument<String>(engine_args[2 ], " table_to_write" );
1847+ if (is_identifier && table_to_write == " auto" )
1848+ {
1849+ if (is_regexp)
1850+ throw Exception (ErrorCodes::BAD_ARGUMENTS, " RegExp for database with auto table_to_write is forbidden" );
1851+ table_to_write_auto = true ;
1852+ }
1853+ }
1854+
17301855 return std::make_shared<StorageMerge>(
1731- args.table_id , args.columns , args.comment , source_database_name_or_regexp, is_regexp, table_name_regexp, args.getLocalContext ());
1856+ args.table_id , args.columns , args.comment , source_database_name_or_regexp, is_regexp,
1857+ table_name_regexp, table_to_write, table_to_write_auto, args.getLocalContext ());
17321858 },
17331859 {
17341860 .supports_schema_inference = true
0 commit comments