diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 4cbf5832..df5b0aff 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -25,7 +25,6 @@ import ( "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/aws/session" @@ -133,9 +132,9 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey: timeKey, fmtStrftime: timeFormatter, lastInvalidPartitionKeyIndex: -1, - timer: timer, - PluginID: pluginID, - random: random, + timer: timer, + PluginID: pluginID, + random: random, }, nil } @@ -262,21 +261,14 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) { } outputPlugin.timer.Check() + logrus.Debugf("[kinesis %d] Sending %d events to Kinesis\n", outputPlugin.PluginID, len(outputPlugin.records)) response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ Records: outputPlugin.records, StreamName: aws.String(outputPlugin.stream), }) if err != nil { logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) - outputPlugin.timer.Start() - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException { - logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) - } - } - return fluentbit.FLB_RETRY, err } - logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(outputPlugin.records)) return outputPlugin.processAPIResponse(response) } @@ -284,6 +276,7 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) { // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) no progress has been made func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) (int, error) { + returnCode := fluentbit.FLB_OK if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) if aws.Int64Value(response.FailedRecordCount) == int64(len(outputPlugin.records)) { @@ -294,15 +287,12 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord logrus.Warnf("[kinesis %d] %d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount)) failedRecords := make([]*kinesis.PutRecordsRequestEntry, 0, aws.Int64Value(response.FailedRecordCount)) // try to resend failed records + returnCode = fluentbit.FLB_RETRY for i, record := range response.Records { if record.ErrorMessage != nil { logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) failedRecords = append(failedRecords, outputPlugin.records[i]) } - if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { - logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) - return fluentbit.FLB_RETRY, nil - } } outputPlugin.records = outputPlugin.records[:0] @@ -317,7 +307,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord outputPlugin.records = outputPlugin.records[:0] outputPlugin.dataLength = 0 } - return fluentbit.FLB_OK, nil + return returnCode, nil } // randomString generates a random string of length 8 diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go index 1cff4317..18cfc5ec 100644 --- a/kinesis/kinesis_test.go +++ b/kinesis/kinesis_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis/mock_kinesis" "github.com/aws/aws-sdk-go/aws" @@ -40,9 +42,9 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlug dataKeys: "", partitionKey: "", lastInvalidPartitionKeyIndex: -1, - timer: timer, - PluginID: 0, - random: random, + timer: timer, + PluginID: 0, + random: random, }, nil } @@ -100,3 +102,69 @@ func TestAddRecordAndFlush(t *testing.T) { retCode = outputPlugin.Flush() assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") } + +func TestFlushRetries(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl) + outputPlugin, _ := newMockOutputPlugin(mockKinesis) + + record1 := map[interface{}]interface{}{ + "testkey1": []byte("test value 1"), + } + record2 := map[interface{}]interface{}{ + "testkey2": []byte("test value 2"), + } + timeStamp := time.Now() + + outputPlugin.AddRecord(record1, &timeStamp) + outputPlugin.AddRecord(record2, &timeStamp) + + gomock.InOrder( + // First request that cannot flush whole buffer + mockKinesis.EXPECT(). + PutRecords(&kinesis.PutRecordsInput{ + Records: outputPlugin.records, + StreamName: aws.String(outputPlugin.stream), + }). + Return( + &kinesis.PutRecordsOutput{ + FailedRecordCount: aws.Int64(1), + Records: []*kinesis.PutRecordsResultEntry{ // successfully processed record + &kinesis.PutRecordsResultEntry{ + ErrorMessage: nil, + }, + &kinesis.PutRecordsResultEntry{ // failed record + ErrorCode: aws.String(kinesis.ErrCodeProvisionedThroughputExceededException), + ErrorMessage: aws.String("Some error message"), + }, + }, + }, + awserr.New( + kinesis.ErrCodeProvisionedThroughputExceededException, + "ProvisionedThroughputExceededException", + nil), + ), + + // Second request which retries only one record which hasn't been put successfully + mockKinesis.EXPECT(). + PutRecords(&kinesis.PutRecordsInput{ + // only second record is failed, so should be retried + Records: outputPlugin.records[1:2], + StreamName: aws.String(outputPlugin.stream), + }). + Return( + &kinesis.PutRecordsOutput{ + FailedRecordCount: aws.Int64(0), + }, + nil, + ), + ) + + retCode := outputPlugin.Flush() + assert.Equal(t, retCode, fluentbit.FLB_RETRY, "Expected return code to be FLB_RETRY") + + retCode = outputPlugin.Flush() + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") +}