Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions csharp/src/Drivers/Databricks/DatabricksConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
using Apache.Arrow.Adbc.Drivers.Apache.Hive2.Client;
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Adbc.Drivers.Databricks.Auth;
using Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch;
using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry;
using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Enums;
using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Model;
using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
Expand Down Expand Up @@ -66,13 +71,20 @@ internal class DatabricksConnection : SparkHttpConnection
private string _traceParentHeaderName = "traceparent";
private bool _traceStateEnabled = false;

private DriverConnectionParameters _connectionParams;
private HostDetails _hostDetails;
private ClientContext _clientContext;

// Default namespace
private TNamespace? _defaultNamespace;

private HttpClient? _authHttpClient;

/// <summary>
/// Creates a Databricks connection from the supplied connection properties.
/// </summary>
/// <param name="properties">Connection properties; forwarded unchanged to the base connection.</param>
public DatabricksConnection(IReadOnlyDictionary<string, string> properties)
    : base(properties)
{
    // Allocate the telemetry holders first: ValidateProperties() fills them in.
    _connectionParams = new();
    _hostDetails = new();
    _clientContext = new();

    ValidateProperties();
}

Expand Down Expand Up @@ -264,6 +276,38 @@ private void ValidateProperties()
// Default QueryTimeSeconds in Hive2Connection is only 60s, which is too small for lots of long running query
QueryTimeoutSeconds = DefaultQueryTimeSeconds;
}

//Telemetry
if (Properties.TryGetValue(SparkParameters.AuthType, out string? authType))
{
_connectionParams.AuthMech = Util.StringToAuthMech(authType);
}

if (Properties.TryGetValue(SparkParameters.HostName, out string? host))
{
_hostDetails.HostUrl = host;
}
if (Properties.TryGetValue(SparkParameters.Port, out string? port))
{
_hostDetails.Port = Int32.Parse(port);
}
_connectionParams.HostInfo = _hostDetails;

if (Properties.TryGetValue(SparkParameters.UserAgentEntry, out string? userAgent))
{
_clientContext.UserAgent = userAgent;
}

string? token = null;
if(Properties.TryGetValue(SparkParameters.AccessToken, out string? accessToken))
{
token = accessToken;
}
else if(Properties.TryGetValue(SparkParameters.Token, out string? accesstoken))
{
token = accesstoken;
}
TelemetryHelper.SetParameters(_connectionParams, _clientContext, token);
}

/// <summary>
Expand Down Expand Up @@ -403,6 +447,7 @@ protected override HttpMessageHandler CreateHttpHandler()
}
}

TelemetryHelper.InitializeTelemetryClient(_authHttpClient);
return baseHandler;
}

Expand Down Expand Up @@ -737,6 +782,15 @@ protected override void Dispose(bool disposing)
if (disposing)
{
_authHttpClient?.Dispose();

try
{
TelemetryHelper.ForceFlushAsync().Wait(TimeSpan.FromSeconds(2));
}
catch (Exception ex)
{
Debug.WriteLine($"Failed to flush telemetry on dispose: {ex.Message}");
}
}
base.Dispose(disposing);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a readme file in this folder, on how is the file generated.

* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Diagnostics;
using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Model;
using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Enums;
using System;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
{
/// <summary>
/// Subscribes to <see cref="Activity"/> events and converts selected stopped activities
/// into SQL execution events that are handed to <see cref="TelemetryHelper"/>.
/// </summary>
public class DatabricksActivityListener : IDisposable
{
    private readonly ActivityListener _activityListener;

    /// <summary>
    /// Creates the listener and registers it through <see cref="ActivitySource.AddActivityListener"/>.
    /// </summary>
    public DatabricksActivityListener()
    {
        _activityListener = new ActivityListener
        {
            // NOTE(review): this accepts every activity source in the process and samples all data.
            // Consider filtering on the driver's own source name once it is available here.
            ShouldListenTo = _ => true,
            Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
            ActivityStarted = OnActivityStarted,
            ActivityStopped = OnActivityStopped,
        };
        ActivitySource.AddActivityListener(_activityListener);
    }

    /// <summary>
    /// Called when an activity starts. Intentionally does nothing.
    /// </summary>
    /// <param name="activity">The activity that started.</param>
    private void OnActivityStarted(Activity activity)
    {
    }

    /// <summary>
    /// Called when an activity stops; records telemetry for it on a best-effort basis.
    /// </summary>
    /// <param name="activity">The activity that stopped.</param>
    private void OnActivityStopped(Activity activity)
    {
        // This callback runs on the code path that stops the activity, so a telemetry
        // failure must never surface there. Swallow and trace it instead.
        try
        {
            RecordTelemetry(activity);
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"Failed to record telemetry for activity: {ex.Message}");
        }
    }

    /// <summary>
    /// Builds SQL execution events from the activity's operation name and tags and
    /// passes each one to <see cref="TelemetryHelper"/>.
    /// </summary>
    /// <param name="activity">The stopped activity to inspect.</param>
    private static void RecordTelemetry(Activity activity)
    {
        if (activity.OperationName == "ExecuteStatementAsync")
        {
            var sqlExecutionEvent = new SqlExecutionEvent();
            var operationDetail = new OperationDetail();
            operationDetail.OperationType = Util.StringToOperationType("EXECUTE_STATEMENT_ASYNC");
            sqlExecutionEvent.OperationDetail = operationDetail;
            TelemetryHelper.AddSqlExecutionEvent(sqlExecutionEvent);
        }

        // Iterate over the tags and create a telemetry event for each recognized key.
        foreach (var tag in activity.Tags)
        {
            // Example tag and handling.
            if (tag.Key == "sql.query")
            {
                var sqlExecutionEvent = new SqlExecutionEvent();
                var operationDetail = new OperationDetail();
                operationDetail.OperationType = Util.StringToOperationType(tag.Value);
                sqlExecutionEvent.StatementType = StatementType.QUERY;
                sqlExecutionEvent.OperationDetail = operationDetail;
                TelemetryHelper.AddSqlExecutionEvent(sqlExecutionEvent);
            }
        }
    }

    /// <summary>
    /// Disposes the underlying <see cref="ActivityListener"/>.
    /// </summary>
    public void Dispose()
    {
        _activityListener.Dispose();
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry;

/// <summary>
/// Holds fixed configuration values declared for the Databricks telemetry namespace.
/// </summary>
public class DatabricksConnectionConfig
{
    /// <summary>
    /// Batch size limit (200).
    /// NOTE(review): presumably the maximum number of telemetry events per upload; confirm at the call site.
    /// </summary>
    public static readonly int MAX_BATCH_SIZE = 200;

    /// <summary>
    /// Flush interval in milliseconds (5 minutes).
    /// </summary>
    public static readonly int FLUSH_INTERVAL_MILLIS = 5 * 60 * 1000;
}
3 changes: 3 additions & 0 deletions csharp/src/Drivers/Databricks/Telemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Databricks Telemetry

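Telemetry events are recorded by `DatabricksActivityListener` (see `DatabricksActivityListener.cs`), which listens for stopped activities and forwards SQL execution events to `TelemetryHelper`.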
124 changes: 124 additions & 0 deletions csharp/src/Drivers/Databricks/Telemetry/TelemetryClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text.Json;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Model;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
{
public class TelemetryClient
{
private readonly HttpClient _httpClient;
private readonly string? _telemetryUrl;
private readonly string? _accessToken;

/// <summary>
/// Creates a telemetry client and derives the telemetry endpoint from the host name.
/// </summary>
/// <param name="httpClient">HTTP client used to send telemetry requests.</param>
/// <param name="hostUrl">Host name used to build the telemetry URL; when null or empty no URL is set and the send methods return false.</param>
/// <param name="accessToken">Optional bearer token; when non-null the authenticated endpoint is used and the token is sent in the Authorization header.</param>
public TelemetryClient(HttpClient httpClient, string? hostUrl, string? accessToken)
{
_httpClient = httpClient;
_accessToken = accessToken;
// Endpoint selection: no host -> null; token present -> /telemetry; otherwise -> /telemetry-unauth.
_telemetryUrl = !string.IsNullOrEmpty(hostUrl) ? accessToken != null ? $"https://{hostUrl}/telemetry" : $"https://{hostUrl}/telemetry-unauth" : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be included with the driver. We have OpenTelemetry implemented, and the way to get this is via the OTEL logger. This is basically forcing extra traffic (especially on the service side) that isn't needed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I was unaware of this. Can you point me to how to access the OTEL logger? Thank you!

}

/// <summary>
/// Sends a batch of telemetry events asynchronously.
/// </summary>
/// <param name="telemetryBatch">List of telemetry events to send.</param>
/// <returns>
/// <c>true</c> when the service responds with a success status code; <c>false</c> when no
/// telemetry URL is configured, the batch is null or empty, or sending fails.
/// </returns>
public async Task<bool> SendTelemetryBatchAsync(List<TelemetryFrontendLog> telemetryBatch)
{
    // Nothing to do without an endpoint or without events; a null batch is treated like an empty one.
    if (string.IsNullOrEmpty(_telemetryUrl) || telemetryBatch is null || telemetryBatch.Count == 0)
    {
        return false;
    }

    try
    {
        // Each event is serialized on its own; the request carries the resulting JSON strings.
        var telemetryRequest = new TelemetryRequest();
        telemetryRequest.UploadTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        telemetryRequest.ProtoLogs = telemetryBatch.Select(x => JsonSerializer.Serialize(x)).ToList();

        // Dispose the request (and its content) and the response once the status has been read.
        using var request = new HttpRequestMessage(HttpMethod.Post, _telemetryUrl);
        request.Content = new StringContent(JsonSerializer.Serialize(telemetryRequest));

        // Set headers
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
        if (_accessToken != null)
        {
            request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _accessToken);
        }

        // ConfigureAwait(false): callers may block on this task (e.g. a timed Wait during dispose),
        // so the continuation must not require the caller's synchronization context.
        using var response = await _httpClient.SendAsync(request).ConfigureAwait(false);
        return response.IsSuccessStatusCode;
    }
    catch (Exception ex)
    {
        // Log the exception but don't throw to prevent telemetry failures from affecting main functionality
        System.Diagnostics.Debug.WriteLine($"Failed to send telemetry: {ex.Message}");
        return false;
    }
}

/// <summary>
/// Sends a single telemetry event asynchronously.
/// </summary>
/// <param name="telemetryEvent">Single telemetry event to send.</param>
/// <returns>
/// <c>true</c> when the service responds with a success status code; <c>false</c> when no
/// telemetry URL is configured or sending fails.
/// </returns>
public async Task<bool> SendTelemetryAsync(TelemetryFrontendLog telemetryEvent)
{
    if (string.IsNullOrEmpty(_telemetryUrl))
    {
        return false;
    }

    try
    {
        // The event is serialized on its own and wrapped in a one-element log list.
        var telemetryRequest = new TelemetryRequest();
        telemetryRequest.UploadTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        telemetryRequest.ProtoLogs = new List<string> { JsonSerializer.Serialize(telemetryEvent) };

        // Dispose the request (and its content) and the response once the status has been read.
        using var request = new HttpRequestMessage(HttpMethod.Post, _telemetryUrl);
        request.Content = new StringContent(JsonSerializer.Serialize(telemetryRequest));

        // Set headers
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
        if (_accessToken != null)
        {
            request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _accessToken);
        }

        // ConfigureAwait(false): callers may block on this task, so the continuation
        // must not require the caller's synchronization context.
        using var response = await _httpClient.SendAsync(request).ConfigureAwait(false);
        return response.IsSuccessStatusCode;
    }
    catch (Exception ex)
    {
        // Log the exception but don't throw to prevent telemetry failures from affecting main functionality
        System.Diagnostics.Debug.WriteLine($"Failed to send telemetry: {ex.Message}");
        return false;
    }
}
}
}
Loading