11mod engines;
22
33use std:: collections:: { BTreeMap , BTreeSet , HashSet } ;
4+ use std:: hash:: { DefaultHasher , Hash , Hasher } ;
45use std:: io:: { stdout, Read , Seek , SeekFrom , Write } ;
56use std:: path:: { Path , PathBuf } ;
67use std:: time:: { Duration , Instant } ;
@@ -19,7 +20,7 @@ use rand::seq::SliceRandom;
1920use sqllogictest:: substitution:: well_known;
2021use sqllogictest:: {
2122 default_column_validator, default_normalizer, default_validator, update_record_with_output,
22- AsyncDB , Injected , MakeConnection , Record , Runner ,
23+ AsyncDB , Injected , MakeConnection , Partitioner , Record , Runner ,
2324} ;
2425use tokio_util:: task:: AbortOnDropHandle ;
2526
@@ -32,6 +33,10 @@ pub enum Color {
3233 Never ,
3334}
3435
// Env keys for partitioning.

/// Name of the environment variable that carries the zero-based partition id.
const PARTITION_ID_ENV_KEY: &str = "SLT_PARTITION_ID";

/// Name of the environment variable that carries the total number of partitions.
const PARTITION_COUNT_ENV_KEY: &str = "SLT_PARTITION_COUNT";
39+
3540#[ derive( Parser , Debug , Clone ) ]
3641#[ clap( about, version, author) ]
3742struct Opt {
@@ -112,6 +117,18 @@ struct Opt {
112117 /// The engine name is a label by default.
113118 #[ clap( long = "label" ) ]
114119 labels : Vec < String > ,
120+
121+ /// Partition ID for sharding the test files. When used with `partition_count`,
122+ /// divides the test files into shards based on the hash of the file path.
123+ ///
124+ /// Useful for running tests in parallel across multiple CI jobs. Currently
125+ /// automatically configured in Buildkite.
126+ #[ clap( long, env = PARTITION_ID_ENV_KEY ) ]
127+ partition_id : Option < u64 > ,
128+
129+ /// Total number of partitions for test sharding. More details in `partition_id`.
130+ #[ clap( long, env = PARTITION_COUNT_ENV_KEY ) ]
131+ partition_count : Option < u64 > ,
115132}
116133
117134/// Connection configuration.
@@ -138,10 +155,62 @@ impl DBConfig {
138155 }
139156}
140157
/// Assigns test files to one of `count` shards based on a hash of the file path.
///
/// A file is selected by this partitioner when `hash(path) % count == id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HashPartitioner {
    /// Total number of partitions. Used as a modulus, so it must be non-zero.
    count: u64,
    /// Zero-based index of the partition this process is responsible for.
    id: u64,
}
162+
163+ impl HashPartitioner {
164+ fn new ( count : u64 , id : u64 ) -> Result < Self > {
165+ if count == 0 {
166+ bail ! ( "partition count must be greater than zero" ) ;
167+ }
168+ if id >= count {
169+ bail ! ( "partition id (zero-based) must be less than count" ) ;
170+ }
171+ Ok ( Self { count, id } )
172+ }
173+ }
174+
175+ impl Partitioner for HashPartitioner {
176+ fn matches ( & self , file_name : & str ) -> bool {
177+ let mut hasher = DefaultHasher :: new ( ) ;
178+ file_name. hash ( & mut hasher) ;
179+ hasher. finish ( ) % self . count == self . id
180+ }
181+ }
182+
183+ #[ allow( clippy:: needless_return) ]
184+ fn import_partition_config_from_ci ( ) {
185+ if std:: env:: var_os ( PARTITION_ID_ENV_KEY ) . is_some ( )
186+ || std:: env:: var_os ( PARTITION_COUNT_ENV_KEY ) . is_some ( )
187+ {
188+ // Ignore if already set.
189+ return ;
190+ }
191+
192+ // Buildkite
193+ {
194+ const ID : & str = "BUILDKITE_PARALLEL_JOB" ;
195+ const COUNT : & str = "BUILDKITE_PARALLEL_JOB_COUNT" ;
196+
197+ if let ( Some ( id) , Some ( count) ) = ( std:: env:: var_os ( ID ) , std:: env:: var_os ( COUNT ) ) {
198+ std:: env:: set_var ( PARTITION_ID_ENV_KEY , id) ;
199+ std:: env:: set_var ( PARTITION_COUNT_ENV_KEY , count) ;
200+ eprintln ! ( "Imported partition config from Buildkite." ) ;
201+ return ;
202+ }
203+ }
204+
205+ // TODO: more CI providers
206+ }
207+
141208#[ tokio:: main]
142209pub async fn main ( ) -> Result < ( ) > {
143210 tracing_subscriber:: fmt:: init ( ) ;
144211
212+ import_partition_config_from_ci ( ) ;
213+
145214 let cli = Opt :: command ( ) . disable_help_flag ( true ) . arg (
146215 Arg :: new ( "help" )
147216 . long ( "help" )
@@ -167,6 +236,8 @@ pub async fn main() -> Result<()> {
167236 r#override,
168237 format,
169238 labels,
239+ partition_count,
240+ partition_id,
170241 } = Opt :: from_arg_matches ( & matches)
171242 . map_err ( |err| err. exit ( ) )
172243 . unwrap ( ) ;
@@ -205,17 +276,34 @@ pub async fn main() -> Result<()> {
205276 Color :: Auto => { }
206277 }
207278
279+ let partitioner = if let Some ( count) = partition_count {
280+ let id = partition_id. context ( "parallel job count is specified but job id is not" ) ?;
281+ Some ( HashPartitioner :: new ( count, id) ?)
282+ } else {
283+ None
284+ } ;
285+
208286 let glob_patterns = files;
209- let mut files: Vec < PathBuf > = Vec :: new ( ) ;
210- for glob_pattern in glob_patterns. into_iter ( ) {
211- let pathbufs = glob:: glob ( & glob_pattern) . context ( "failed to read glob pattern" ) ?;
212- for pathbuf in pathbufs. into_iter ( ) . try_collect :: < _ , Vec < _ > , _ > ( ) ? {
213- files. push ( pathbuf)
287+ let mut all_files = Vec :: new ( ) ;
288+
289+ for glob_pattern in glob_patterns {
290+ let mut files: Vec < PathBuf > = glob:: glob ( & glob_pattern)
291+ . context ( "failed to read glob pattern" ) ?
292+ . try_collect ( ) ?;
293+
294+ // Test against partitioner only if there are multiple files matched, e.g., expanded from an `*`.
295+ if files. len ( ) > 1 {
296+ if let Some ( partitioner) = & partitioner {
297+ let len = files. len ( ) ;
298+ files. retain ( |path| partitioner. matches ( path. to_str ( ) . unwrap ( ) ) ) ;
299+ let len_after = files. len ( ) ;
300+ eprintln ! (
301+ "Running {len_after} out of {len} test cases for glob pattern \" {glob_pattern}\" based on partitioning." ,
302+ ) ;
303+ }
214304 }
215- }
216305
217- if files. is_empty ( ) {
218- bail ! ( "no test case found" ) ;
306+ all_files. extend ( files) ;
219307 }
220308
221309 let config = DBConfig {
@@ -227,7 +315,7 @@ pub async fn main() -> Result<()> {
227315 } ;
228316
229317 if r#override || format {
230- return update_test_files ( files , & engine, config, format) . await ;
318+ return update_test_files ( all_files , & engine, config, format) . await ;
231319 }
232320
233321 let mut report = Report :: new ( junit. clone ( ) . unwrap_or_else ( || "sqllogictest" . to_string ( ) ) ) ;
@@ -241,7 +329,7 @@ pub async fn main() -> Result<()> {
241329 jobs,
242330 keep_db_on_failure,
243331 & mut test_suite,
244- files ,
332+ all_files ,
245333 & engine,
246334 config,
247335 & labels,
@@ -252,7 +340,7 @@ pub async fn main() -> Result<()> {
252340 } else {
253341 run_serial (
254342 & mut test_suite,
255- files ,
343+ all_files ,
256344 & engine,
257345 config,
258346 & labels,
0 commit comments