Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions ClickHouse.Driver.Tests/ADO/ConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ public async Task TimeoutShouldCancelConnection()
_ = await task;
Assert.Fail("The task should have been cancelled before completion");
}
#if NETFRAMEWORK
catch (WebException ex) when (ex.Status == WebExceptionStatus.RequestCanceled)
{
/* Expected: request cancelled */
}
#endif
catch (TaskCanceledException)
{
/* Expected: task cancelled */
Expand All @@ -99,7 +105,7 @@ public async Task ServerShouldSetQueryId()
[Test]
public async Task ClientShouldSetQueryId()
{
string queryId = "MyQueryId123456";
string queryId = GetUniqueQueryId("MyQueryId123456");
var command = connection.CreateCommand();
command.CommandText = "SELECT 1";
command.QueryId = queryId;
Expand All @@ -108,7 +114,7 @@ public async Task ClientShouldSetQueryId()
}

[Test]
public async Task ClientShouldSetUserAgent()
public void ClientShouldSetUserAgent()
{
var headers = new HttpRequestMessage().Headers;
connection.AddDefaultHttpHeaders(headers);
Expand All @@ -121,7 +127,7 @@ public async Task ClientShouldSetUserAgent()
public async Task ReplaceRunningQuerySettingShouldReplace()
{
connection.CustomSettings.Add("replace_running_query", 1);
string queryId = "MyQueryId123456";
string queryId = GetUniqueQueryId("MyQueryId123456");

var command1 = connection.CreateCommand();
var command2 = connection.CreateCommand();
Expand Down Expand Up @@ -220,7 +226,7 @@ public void ShouldSaveProtocolAtConnectionString(string protocol)
[Test]
public async Task ShouldPostDynamicallyGeneratedRawStream()
{
var targetTable = "test.raw_stream";
var targetTable = $"test.{SanitizeTableName("raw_stream")}";

await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value Int32) ENGINE Null");
Expand Down
30 changes: 29 additions & 1 deletion ClickHouse.Driver.Tests/AbstractConnectionTestFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace ClickHouse.Driver.Tests;

[TestFixture]
public class AbstractConnectionTestFixture : IDisposable
public abstract class AbstractConnectionTestFixture : IDisposable
{
protected readonly ClickHouseConnection connection;

Expand All @@ -27,9 +27,37 @@ protected static string SanitizeTableName(string input)
builder.Append(c);
}

// When running in parallel, we need to avoid false failures due to running against the same tables
var frameworkSuffix = GetFrameworkSuffix();
if (!string.IsNullOrEmpty(frameworkSuffix))
builder.Append('_').Append(frameworkSuffix);

return builder.ToString();
}

private static string GetFrameworkSuffix()
{
#if NET462
return "net462";
#elif NET48
return "net48";
#elif NET6_0
return "net6";
#elif NET8_0
return "net8";
#elif NET9_0
return "net9";
#else
return "";
#endif
}

protected static string GetUniqueQueryId(string baseId)
{
var suffix = GetFrameworkSuffix();
return !string.IsNullOrEmpty(suffix) ? $"{baseId}_{suffix}" : baseId;
}

[OneTimeTearDown]
public void Dispose() => connection?.Dispose();
}
34 changes: 17 additions & 17 deletions ClickHouse.Driver.Tests/BulkCopy/BulkCopyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static IEnumerable<TestCaseData> GetInsertSingleValueTestCases()
[TestCaseSource(typeof(BulkCopyTests), nameof(GetInsertSingleValueTestCases))]
public async Task ShouldExecuteSingleValueInsertViaBulkCopy(string clickHouseType, object insertedValue)
{
var targetTable = "test." + SanitizeTableName($"bulk_single_{clickHouseType}");
var targetTable = $"test.{SanitizeTableName($"bulk_single_{clickHouseType}")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value {clickHouseType}) ENGINE Memory");
Expand Down Expand Up @@ -70,7 +70,7 @@ public async Task ShouldExecuteSingleValueInsertViaBulkCopy(string clickHouseTyp
[RequiredFeature(Feature.Date32)]
public async Task ShouldInsertDateOnly()
{
var targetTable = "test.bulk_dateonly";
var targetTable = "test." + SanitizeTableName("bulk_dateonly");

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value Date32) ENGINE Memory");
Expand Down Expand Up @@ -102,7 +102,7 @@ public async Task ShouldExecuteMultipleBulkInsertions()
var sw = new Stopwatch();
var duration = TimeSpan.FromMinutes(5);

var targetTable = "test." + SanitizeTableName($"bulk_load_test");
var targetTable = $"test.{SanitizeTableName($"bulk_load_test")}";

await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (int Int32, str String, dt DateTime) ENGINE Null");
Expand Down Expand Up @@ -138,7 +138,7 @@ public async Task ShouldExecuteMultipleBulkInsertions()
[Test]
public async Task ShouldExecuteInsertWithLessColumns()
{
var targetTable = $"test.multiple_columns";
var targetTable = $"test.{SanitizeTableName("multiple_columns")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value1 Nullable(UInt8), value2 Nullable(Float32), value3 Nullable(Int8)) ENGINE Memory");
Expand All @@ -157,7 +157,7 @@ public async Task ShouldExecuteInsertWithLessColumns()
[Test]
public async Task ShouldExecuteInsertWithBacktickedColumns()
{
var targetTable = $"test.backticked_columns";
var targetTable = $"test.{SanitizeTableName("backticked_columns")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`field.id` Nullable(UInt8), `@value` Nullable(UInt8)) ENGINE Memory");
Expand All @@ -176,7 +176,7 @@ public async Task ShouldExecuteInsertWithBacktickedColumns()
[Test]
public async Task ShouldDetectColumnsAutomaticallyOnInit()
{
var targetTable = $"test.auto_detect_columns";
var targetTable = $"test.{SanitizeTableName("auto_detect_columns")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (field1 UInt8, field2 Int8, field3 String) ENGINE Memory");
Expand Down Expand Up @@ -208,7 +208,7 @@ public async Task ShouldDetectColumnsAutomaticallyOnInit()
[TestCase("with!exclamation")]
public async Task ShouldExecuteBulkInsertWithComplexColumnName(string columnName)
{
var targetTable = "test." + SanitizeTableName($"bulk_complex_{columnName}");
var targetTable = $"test.{SanitizeTableName($"bulk_complex_{columnName}")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`{columnName.Replace("`", "\\`")}` Int32) ENGINE Memory");
Expand All @@ -229,7 +229,7 @@ public async Task ShouldExecuteBulkInsertWithComplexColumnName(string columnName
[Test]
public async Task ShouldInsertIntoTableWithLotsOfColumns()
{
var tableName = "test.bulk_long_columns";
var tableName = $"test.{SanitizeTableName("bulk_long_columns")}";
var columnCount = 3900;

//Generating create tbl statement with a lot of columns
Expand All @@ -252,7 +252,7 @@ public async Task ShouldInsertIntoTableWithLotsOfColumns()
[Test]
public async Task ShouldThrowSpecialExceptionOnSerializationFailure()
{
var targetTable = "test." + SanitizeTableName($"bulk_exception_uint8");
var targetTable = $"test.{SanitizeTableName($"bulk_exception_uint8")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value UInt8) ENGINE Memory");
Expand All @@ -276,7 +276,7 @@ public async Task ShouldThrowSpecialExceptionOnSerializationFailure()
[Test]
public async Task ShouldExecuteBulkInsertIntoSimpleAggregatedFunctionColumn()
{
var targetTable = "test." + SanitizeTableName($"bulk_simple_aggregated_function");
var targetTable = $"test.{SanitizeTableName($"bulk_simple_aggregated_function")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value SimpleAggregateFunction(anyLast,Nullable(Float64))) ENGINE Memory");
Expand All @@ -303,7 +303,7 @@ public async Task ShouldExecuteBulkInsertIntoSimpleAggregatedFunctionColumn()
[Test]
public async Task ShouldNotLoseRowsOnMultipleBatches()
{
var targetTable = "test.bulk_multiple_batches"; ;
var targetTable = $"test.{SanitizeTableName("bulk_multiple_batches")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value Int32) ENGINE Memory");
Expand Down Expand Up @@ -331,7 +331,7 @@ public async Task ShouldNotLoseRowsOnMultipleBatches()
[Test]
public async Task ShouldExecuteWithDBNullArrays()
{
var targetTable = $"test.bulk_dbnull_array";
var targetTable = $"test.{SanitizeTableName("bulk_dbnull_array")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (stringValue Array(String), intValue Array(Int32)) ENGINE Memory");
Expand All @@ -355,7 +355,7 @@ await bulkCopy.WriteToServerAsync(new List<object[]>
[Test]
public async Task ShouldInsertNestedTable()
{
var targetTable = "test.bulk_nested";
var targetTable = $"test.{SanitizeTableName("bulk_nested")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`_id` UUID, `Comments` Nested(Id Nullable(String), Comment Nullable(String))) ENGINE Memory");
Expand All @@ -381,7 +381,7 @@ public async Task ShouldInsertNestedTable()
[Test]
public async Task ShouldInsertDoubleNestedTable()
{
var targetTable = "test.bulk_double_nested";
var targetTable = $"test.{SanitizeTableName("bulk_double_nested")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (Id Int64, Threads Nested(Id Int64, Comments Nested(Id Int64, Text String))) ENGINE Memory");
Expand Down Expand Up @@ -427,7 +427,7 @@ public async Task ShouldThrowExceptionOnInnerException(double fraction)
const int setSize = 3000000;
int dbNullIndex = (int)(setSize * fraction);

var targetTable = "test." + SanitizeTableName($"bulk_million_inserts");
var targetTable = $"test.{SanitizeTableName($"bulk_million_inserts")}";


var data = Enumerable.Repeat(new object[] { 1 }, setSize).ToArray();
Expand All @@ -451,7 +451,7 @@ public async Task ShouldThrowExceptionOnInnerException(double fraction)
[Test]
public async Task ShouldNotAffectSharedArrayPool()
{
var targetTable = "test." + SanitizeTableName($"array_pool");
var targetTable = $"test.{SanitizeTableName($"array_pool")}";

await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (int Int32, str String, dt DateTime) ENGINE Null");
Expand All @@ -475,7 +475,7 @@ public async Task ShouldNotAffectSharedArrayPool()
[RequiredFeature(Feature.Json)]
public async Task ShouldInsertJson()
{
var targetTable = "test." + SanitizeTableName($"bulk_json");
var targetTable = $"test.{SanitizeTableName($"bulk_json")}";
await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value JSON) ENGINE Memory");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ private static IEnumerable<TestCaseData> Get()
[TestCaseSource(typeof(BulkCopyWithDefaultsTests), nameof(Get))]
public async Task ShouldExecuteSingleValueInsertViaBulkCopyWithDefaults(string clickhouseType, object insertValue, object expectedValue, string tableName)
{
var targetTable = "test." + SanitizeTableName($"bulk_single_default_{tableName}");
var targetTable = $"test.{SanitizeTableName($"bulk_single_default_{tableName}")}";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync(
$"CREATE TABLE IF NOT EXISTS {targetTable} (`value` {clickhouseType}) ENGINE Memory");

// Use server time, otherwise a mismatch between the backing instance and the running client can cause false test failures
if (clickhouseType.Contains("toDate(now())") && insertValue == DBDefault.Value && expectedValue is DateTime time && time == DateTime.Today)
expectedValue = await connection.ExecuteScalarAsync("SELECT toDate(now())");

using var bulkCopyWithDefaults = new ClickHouseBulkCopy(connection, RowBinaryFormat.RowBinaryWithDefaults)
{
DestinationTableName = targetTable,
Expand Down
18 changes: 15 additions & 3 deletions ClickHouse.Driver.Tests/ClickHouse.Driver.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net462;net48;net6.0;net8.0;net9.0</TargetFrameworks>
Expand All @@ -17,8 +17,6 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Dapper" Version="2.1.66" />
<PackageReference Include="Dapper.Contrib" Version="2.0.78" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand All @@ -38,6 +36,20 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net6.0'">
<PackageReference Include="Dapper" Version="2.0.123" />
<PackageReference Include="Dapper.Contrib" Version="2.0.78" />
<PackageReference Include="OpenTelemetry" Version="1.6.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' != 'net6.0'">
<PackageReference Include="Dapper" Version="2.1.66" />
<PackageReference Include="Dapper.Contrib" Version="2.0.78" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0' OR '$(TargetFramework)' == 'net9.0' OR '$(TargetFramework)' == 'net48' OR '$(TargetFramework)' == 'net462'">
<PackageReference Include="OpenTelemetry" Version="1.12.0" />
</ItemGroup>

Expand Down
38 changes: 38 additions & 0 deletions ClickHouse.Driver.Tests/Extensions/AssertionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Globalization;

namespace ClickHouse.Driver.Tests.Extensions;

internal static class AssertionExtensions
{
private const double DefaultEpsilon = 1e-7;

public static void AssertFloatingPointEquals(this string actualResult, object expectedValue, double epsilon = DefaultEpsilon)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a global DefaultFloatingPointTolerance for the whole project now, we can just parse the string and compare using the regular NUnit method.

{
switch (expectedValue)
{
case float @float:
float.Parse(actualResult, CultureInfo.InvariantCulture).AssertFloatingPointEquals(@float, (float)epsilon);
break;
case double @double:
double.Parse(actualResult, CultureInfo.InvariantCulture).AssertFloatingPointEquals(@double, epsilon);
break;
default:
var expected = Convert.ToString(expectedValue, CultureInfo.InvariantCulture);
Assert.That(actualResult, Is.EqualTo(expected));
break;
}
}

public static void AssertFloatingPointEquals(this double actual, double expected, double epsilon = DefaultEpsilon)
{
Assert.That(Math.Abs(actual - expected), Is.LessThan(epsilon),
$"Expected: {expected}, Actual: {actual}");
}

public static void AssertFloatingPointEquals(this float actual, float expected, float epsilon = (float)DefaultEpsilon)
{
Assert.That(Math.Abs(actual - expected), Is.LessThan(epsilon),
$"Expected: {expected}, Actual: {actual}");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Http;
using System.Threading;

namespace ClickHouse.Driver.Tests.Infrastructure;

/// <summary>
/// HttpClientFactory that uses connection pooling to prevent port exhaustion during heavy parallel load in .NET Framework TFMs.
/// </summary>
internal sealed class TestPoolHttpClientFactory : IHttpClientFactory
{
#if NETFRAMEWORK
private const int PoolSize = 16;
Copy link
Collaborator

@alex-clickhouse alex-clickhouse Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a pool of Handlers here? If port exhaustion is an issue I don't see how this fixes it...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that on first look, we shouldn't need this, but the way these connections are managed in this TFM, it's not possible to get a clean test run end to end (after multiple repetitions) without it. Easiest confirmation is to remove this to see the reproduction eventually.

I can take another look at the internals to try to avoid this, but my instinct is very few actual users are using this TFM. Most will use .NET 4.8 and I believe this TFM should be dropped to enable much better performance and availability of lower level memory primitives, which is why I decided to build my own driver.

But I can review this to complete the PR.

I do want to get in touch with you directly, if possible, about this work.

private static readonly ConcurrentBag<HttpClientHandler> HandlerPool = new ConcurrentBag<HttpClientHandler>();
private static readonly int[] Slots = new int[1];
private static readonly HttpClientHandler[] Handlers;

static TestPoolHttpClientFactory()
{
Handlers = new HttpClientHandler[PoolSize];
for (int i = 0; i < PoolSize; i++)
{
var handler = new HttpClientHandler()
{
AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate,
MaxConnectionsPerServer = 100,
UseProxy = false
};
Handlers[i] = handler;
HandlerPool.Add(handler);
}
}

public TimeSpan Timeout { get; init; } = TimeSpan.FromMinutes(2);

public HttpClient CreateClient(string name)
{
var index = (uint)Interlocked.Increment(ref Slots[0]) % PoolSize;
return new HttpClient(Handlers[index], false) { Timeout = Timeout };
}
#else
private static readonly HttpClientHandler DefaultHttpClientHandler = new() { AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate };

public TimeSpan Timeout { get; init; } = TimeSpan.FromMinutes(2);

public HttpClient CreateClient(string name) => new(DefaultHttpClientHandler, false) { Timeout = Timeout };
#endif
}
Loading
Loading