Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.flink.api.java.utils.ParameterTool;

import java.io.Serializable;
import java.util.Arrays;

public class Configuration implements Serializable {
private final ParameterTool param;
Expand All @@ -23,6 +24,15 @@ public String getString(String configKey, String defaultValue) {
return param.get(configKey, defaultValue);
}

public String[] getStringArray(String configKey, String[] defaultValue) {
String value = param.get(configKey);
if (value == null || value.trim().isEmpty()) {
return defaultValue;
}

return Arrays.stream(value.split(",")).map(String::trim).toArray(String[]::new);
}

public Integer getInteger(String configKey, Integer defaultValue) {
return param.getInt(configKey, defaultValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.junit.Test;
import org.mockito.Mock;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -36,6 +37,28 @@ public void shouldGetNullIfParamIsNotSet() {
assertNull(configuration.getString("config_not_exist"));
}

@Test
public void shouldGetStringArrayFromParamTool() {
when(parameterTool.get("config_array_key")).thenReturn("test_value, test_value_2");

assertArrayEquals(new String[]{"test_value", "test_value_2"}, configuration.getStringArray("config_array_key", new String[]{"default_not_used"}));
}

@Test
public void shouldGetNullStringArrayIfParamIsNotSet() {
String[] defaultValue = new String[]{"default"};

assertArrayEquals(defaultValue, configuration.getStringArray("config_not_exist", defaultValue));
}

@Test
public void shouldGetEmptyStringArrayForBlankValue() {
String[] defaultValue = new String[]{"default"};
when(parameterTool.get("config_array_key")).thenReturn(" ");

assertArrayEquals(defaultValue, configuration.getStringArray("config_array_key", defaultValue));
}

@Test
public void shouldGetIntegerFromParamTool() {
when(parameterTool.getInt("test_config", 1)).thenReturn(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,5 +232,4 @@ private void addSink(StreamInfo streamInfo) {
sinkOrchestrator.addSubscriber(telemetryExporter);
streamInfo.getDataStream().sinkTo(sinkOrchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator, daggerStatsDReporter));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public SinkOrchestrator(MetricsTelemetryExporter telemetryExporter) {
* @columnNames columnNames the column names
* @StencilClientOrchestrator stencilClientOrchestrator the stencil client orchestrator
*/
public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator, DaggerStatsDReporter daggerStatsDReporter) {
public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator,
DaggerStatsDReporter daggerStatsDReporter, String influxMeasurementOverrideName) {
String sinkType = configuration.getString("SINK_TYPE", "influx");
addMetric(TelemetryTypes.SINK_TYPE.getValue(), sinkType);
Sink sink;
Expand Down Expand Up @@ -85,12 +86,18 @@ public Sink getSink(Configuration configuration, String[] columnNames, StencilCl
.build();
break;
default:
sink = new InfluxDBSink(new InfluxDBFactoryWrapper(), configuration, columnNames, new ErrorHandler());
sink = new InfluxDBSink(new InfluxDBFactoryWrapper(), configuration, columnNames, new ErrorHandler(), influxMeasurementOverrideName);
}
notifySubscriber();
return sink;
}

public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator,
DaggerStatsDReporter daggerStatsDReporter) {
String influxMeasurementOverrideName = null;
return getSink(configuration, columnNames, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName);
}

private void reportTelemetry(KafkaSerializerBuilder kafkaSchemaBuilder) {
TelemetryPublisher pub = (TelemetryPublisher) kafkaSchemaBuilder;
pub.addSubscriber(telemetryExporter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ public class InfluxDBSink implements Sink<Row, Void, Void, Void> {
private String[] columnNames;
private ErrorHandler errorHandler;
private ErrorReporter errorReporter;
private final String influxMeasurementOverrideName;

public InfluxDBSink(InfluxDBFactoryWrapper influxDBFactory, Configuration configuration, String[] columnNames, ErrorHandler errorHandler) {
public InfluxDBSink(InfluxDBFactoryWrapper influxDBFactory, Configuration configuration, String[] columnNames,
ErrorHandler errorHandler, String influxMeasurementOverrideName) {
this.influxDBFactory = influxDBFactory;
this.configuration = configuration;
this.columnNames = columnNames;
this.errorHandler = errorHandler;
this.influxMeasurementOverrideName = influxMeasurementOverrideName;
}

@Override
Expand All @@ -46,7 +49,7 @@ public SinkWriter<Row, Void, Void> createWriter(InitContext context, List<Void>
errorReporter = ErrorReporterFactory.getErrorReporter(context.metricGroup(), configuration);
}

InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDB, columnNames, errorHandler, errorReporter);
InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDB, columnNames, errorHandler, errorReporter, influxMeasurementOverrideName);
return influxDBWriter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ public class InfluxDBWriter implements SinkWriter<Row, Void, Void> {
private ErrorReporter errorReporter;
private boolean useRowFieldNames;

public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] columnNames, ErrorHandler errorHandler, ErrorReporter errorReporter) {
public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] columnNames, ErrorHandler errorHandler,
ErrorReporter errorReporter, String influxMeasurementOverrideName) {
databaseName = configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT);
retentionPolicy = configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT);
measurementName = configuration.getString(Constants.SINK_INFLUX_MEASUREMENT_NAME_KEY, Constants.SINK_INFLUX_MEASUREMENT_NAME_DEFAULT);
if (Strings.isNullOrEmpty(influxMeasurementOverrideName)) {
measurementName = configuration.getString(Constants.SINK_INFLUX_MEASUREMENT_NAME_KEY, Constants.SINK_INFLUX_MEASUREMENT_NAME_DEFAULT);
} else {
measurementName = influxMeasurementOverrideName;
}
useRowFieldNames = configuration.getBoolean(Constants.SINK_INFLUX_USING_ROW_FIELD_NAMES_KEY, Constants.SINK_INFLUX_USING_ROW_FIELD_NAMES_DEFAULT);
this.influxDB = influxDB;
this.columnNames = columnNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ public class Constants {
public static final String SINK_INFLUX_RETENTION_POLICY_DEFAULT = "";
public static final String SINK_INFLUX_MEASUREMENT_NAME_KEY = "SINK_INFLUX_MEASUREMENT_NAME";
public static final String SINK_INFLUX_MEASUREMENT_NAME_DEFAULT = "";

// A custom job can use this configuration to get all Influx measurement names as a list
// and configure them in the job builder pipeline accordingly.
// The initial design assumed custom job authors would know the sink targets and hardcode them.
// If measurement names need to change, they can now be updated through configuration without changing the code.
public static final String SINK_INFLUX_MEASUREMENTS_LIST_KEY = "SINK_INFLUX_MEASUREMENTS_LIST";

public static final String SINK_INFLUX_URL_KEY = "SINK_INFLUX_URL";
public static final String SINK_INFLUX_URL_DEFAULT = "";
public static final String SINK_INFLUX_USERNAME_KEY = "SINK_INFLUX_USERNAME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.MockitoAnnotations.initMocks;

public class SinkOrchestratorTest {
private final String influxMeasurementOverrideName = "";

private static final String SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS = "SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS";
private static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE = "com.gotocompany.dagger.core.utils.SinkKafkaConfigUtil";
Expand All @@ -51,23 +52,23 @@ public void setup() {
@Test
public void shouldGiveInfluxSinkWhenConfiguredToUseInflux() throws Exception {
when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("influx");
Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter);
Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName);

assertThat(sinkFunction, instanceOf(InfluxDBSink.class));
}

@Test
public void shouldGiveLogSinkWhenConfiguredToUseLog() throws Exception {
when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("log");
Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter);
Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName);

assertThat(sinkFunction, instanceOf(LogSink.class));
}

@Test
public void shouldGiveInfluxWhenConfiguredToUseNothing() throws Exception {
when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("");
Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter);
Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName);

assertThat(sinkFunction, instanceOf(InfluxDBSink.class));
}
Expand Down Expand Up @@ -107,7 +108,7 @@ public void shouldReturnSinkMetrics() {

when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("influx");

sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter);
sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName);
assertEquals(expectedMetrics, sinkOrchestrator.getTelemetry());
}

Expand All @@ -116,7 +117,7 @@ public void shouldReturnBigQuerySink() {
when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("bigquery");
when(configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "")).thenReturn("some.class");
when(configuration.getParam()).thenReturn(ParameterTool.fromMap(Collections.emptyMap()));
Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter);
Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName);
assertThat(sinkFunction, instanceOf(BigQuerySink.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import static org.mockito.MockitoAnnotations.initMocks;

public class InfluxDBSinkTest {

private final String influxMeasurementOverrideName = "";
private static final int SINK_INFLUX_BATCH_SIZE = 100;
private static final int INFLUX_FLUSH_DURATION = 1000;

Expand Down Expand Up @@ -74,7 +74,7 @@ public void setUp() throws Exception {

@Test
public void shouldCallInfluxDbFactoryWhileCreatingWriter() throws Exception {
InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler);
InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler, influxMeasurementOverrideName);
List<Void> state = new ArrayList<>();
influxDBSink.createWriter(context, state);

Expand All @@ -83,7 +83,7 @@ public void shouldCallInfluxDbFactoryWhileCreatingWriter() throws Exception {

@Test
public void shouldCreateInfluxWriter() throws IOException {
InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler);
InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler, influxMeasurementOverrideName);
List<Void> state = new ArrayList<>();
SinkWriter<Row, Void, Void> writer = influxDBSink.createWriter(context, state);

Expand All @@ -92,7 +92,7 @@ public void shouldCreateInfluxWriter() throws IOException {

@Test
public void shouldCallBatchModeOnInfluxWhenBatchSettingsExist() throws Exception {
InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler);
InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler, influxMeasurementOverrideName);
List<Void> state = new ArrayList<>();
influxDBSink.createWriter(context, state);
verify(influxDb).enableBatch(eq(SINK_INFLUX_BATCH_SIZE), eq(INFLUX_FLUSH_DURATION), eq(TimeUnit.MILLISECONDS), any(ThreadFactory.class), any(BiConsumer.class));
Expand Down
Loading
Loading