Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/storage/mvcc/kvstore_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal

tx := s.b.BatchTx()
tx.LockOutsideApply()
// gofail: var compactAfterAcquiredBatchTxLock struct{}
keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
for i := range keys {
rev = BytesToRev(keys[i])
Expand Down
24 changes: 1 addition & 23 deletions tests/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@
package e2e

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"testing"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/require"

"go.etcd.io/etcd/api/v3/version"
Expand Down Expand Up @@ -325,7 +320,7 @@ func TestNoMetricsMissing(t *testing.T) {
metricsURL, err := url.JoinPath(epc.Procs[0].Config().ClientURL, "metrics")
require.NoError(t, err)

mfs, err := getMetrics(metricsURL)
mfs, err := e2e.GetMetrics(metricsURL)
require.NoError(t, err)

var missingMetrics []string
Expand All @@ -342,23 +337,6 @@ func TestNoMetricsMissing(t *testing.T) {
}
}

// getMetrics fetches the Prometheus text-format metrics served at metricsURL
// and returns them parsed into metric families keyed by metric name.
//
// It returns an error if the request fails or times out, if the endpoint
// answers with a status other than 200 OK, if the body cannot be read, or if
// the payload cannot be parsed.
func getMetrics(metricsURL string) (map[string]*dto.MetricFamily, error) {
	// Every call builds its own transport, so bound the request with a
	// timeout and drop the transport's idle connections on return; otherwise
	// each call leaves a keep-alive connection (and its goroutines) behind.
	httpClient := http.Client{
		Transport: &http.Transport{},
		Timeout:   30 * time.Second,
	}
	defer httpClient.CloseIdleConnections()

	resp, err := httpClient.Get(metricsURL)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// An error page is not a metrics payload; report it instead of parsing it.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("fetching metrics from %q: unexpected status %s", metricsURL, resp.Status)
	}

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	var parser expfmt.TextParser
	return parser.TextToMetricFamilies(bytes.NewReader(data))
}

// formatMetrics is only for test purpose
/*func formatMetrics(metrics []string) string {
quoted := make([]string, len(metrics))
Expand Down
113 changes: 113 additions & 0 deletions tests/e2e/reproduce_19406_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"context"
"fmt"
"net/url"
"testing"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.etcd.io/etcd/tests/v3/framework/e2e"

"github.com/stretchr/testify/require"
)

// TestReproduce19406 reproduces the issue: https://github.com/etcd-io/etcd/issues/19406
//
// It starts a single-member cluster with a compaction batch limit of 1 and a
// fixed compaction sleep interval, writes a handful of keys, arms the
// compactAfterAcquiredBatchTxLock failpoint, runs a physical compaction, and
// then checks the compaction duration metrics against the expected total
// sleep time.
func TestReproduce19406(t *testing.T) {
e2e.BeforeTest(t)

compactionSleepInterval := 100 * time.Millisecond
ctx := context.TODO()

// Single member, failpoints enabled, one key per compaction batch, and a
// fixed sleep between batches.
clus, cerr := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(1),
e2e.WithCompactionSleepInterval(compactionSleepInterval),
)
require.NoError(t, cerr)
t.Cleanup(func() { require.NoError(t, clus.Stop()) })

// Produce some data.
cli := newClient(t, clus.EndpointsGRPC(), e2e.ClientConfig{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can reuse the v3 client in generateTrafficAndVerifyPutLatency?

valueSize := 10
var latestRevision int64

// The loop bound is inclusive, so produceKeyNum+1 keys are written. The
// revision of the last put is kept as the compaction target.
produceKeyNum := 20
for i := 0; i <= produceKeyNum; i++ {
resp, err := cli.Put(ctx, fmt.Sprintf("%d", i), stringutil.RandString(uint(valueSize)))
require.NoError(t, err)
latestRevision = resp.Header.Revision
}

// Make the compactAfterAcquiredBatchTxLock failpoint sleep for
// PerCompactionInterationInterval, to simulate a single iteration of
// compaction lasting at least this duration.
PerCompactionInterationInterval := compactionSleepInterval
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactAfterAcquiredBatchTxLock",
fmt.Sprintf(`sleep("%s")`, PerCompactionInterationInterval)))

// Start the compaction. WithCompactPhysical is passed so the metrics read
// below are taken after the Compact call has returned.
t.Log("start compaction...")
_, err := cli.Compact(ctx, latestRevision, clientv3.WithCompactPhysical())
require.NoError(t, err)
t.Log("finished compaction...")

// Validate the total compaction sleep interval.
// Compaction runs in batches. During each batch, it acquires a lock, releases it at the end,
// and then waits for compactionSleepInterval before starting the next batch. This pause
// allows PUT requests to be processed.
// Therefore, the total compaction sleep interval should be larger than or equal to
// (compaction iteration number - 1) * compactionSleepInterval.
// NOTE(review): the assertion below uses totalKeys * compactionSleepInterval
// rather than (iterations - 1) * compactionSleepInterval; confirm which bound
// is intended.
httpEndpoint := clus.EndpointsHTTP()[0]
totalKeys := produceKeyNum + 1
pauseDuration, totalDuration := getEtcdCompactionMetrics(t, httpEndpoint)
// NOTE(review): err still holds the result of cli.Compact, which was already
// checked above; getEtcdCompactionMetrics returns no error, so this check is
// redundant.
require.NoError(t, err)
// The metric sums are float64 milliseconds; converting the difference to
// time.Duration drops any fractional millisecond.
actualSleepInterval := time.Duration(totalDuration-pauseDuration) * time.Millisecond
expectSleepInterval := compactionSleepInterval * time.Duration(totalKeys)
t.Logf("db_compaction_pause_duration: %.2f db_compaction_total_duration: %.2f, totalKeys: %d",
pauseDuration, totalDuration, totalKeys)
require.GreaterOrEqualf(t, actualSleepInterval, expectSleepInterval,
"expect total compact sleep interval larger than (%v) but got (%v)",
expectSleepInterval, actualSleepInterval)
}

// getEtcdCompactionMetrics scrapes the "metrics" path under httpEndpoint and
// returns the sample sums of the compaction pause-duration and total-duration
// histograms (both metrics are named in milliseconds).
//
// Every failure (bad URL, fetch error, missing metric, metric without data or
// without a histogram) is reported through require on t.
func getEtcdCompactionMetrics(t *testing.T, httpEndpoint string) (pauseDuration, totalDuration float64) {
	target, err := url.JoinPath(httpEndpoint, "metrics")
	require.NoError(t, err)

	families, err := e2e.GetMetrics(target)
	require.NoError(t, err)

	// sumOf returns the histogram sample sum of the first series recorded
	// for the named metric family.
	sumOf := func(metricName string) float64 {
		family, found := families[metricName]
		require.Truef(t, found, "metric %q not found", metricName)
		require.NotEmptyf(t, family.Metric, "metric %q has no data", metricName)

		histogram := family.Metric[0].GetHistogram()
		require.NotEmptyf(t, histogram, "metric %q is not a histogram", metricName)
		return histogram.GetSampleSum()
	}

	// Calls are evaluated left to right, so pause is read before total.
	return sumOf("etcd_debugging_mvcc_db_compaction_pause_duration_milliseconds"),
		sumOf("etcd_debugging_mvcc_db_compaction_total_duration_milliseconds")
}
41 changes: 41 additions & 0 deletions tests/framework/e2e/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"time"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

// GetMetrics fetches the Prometheus text-format metrics served at metricsURL
// and returns them parsed into metric families keyed by metric name.
//
// It returns an error if the request fails or times out, if the endpoint
// answers with a status other than 200 OK, if the body cannot be read, or if
// the payload cannot be parsed.
func GetMetrics(metricsURL string) (map[string]*dto.MetricFamily, error) {
	// Every call builds its own transport, so bound the request with a
	// timeout and drop the transport's idle connections on return; otherwise
	// each call leaves a keep-alive connection (and its goroutines) behind.
	httpClient := http.Client{
		Transport: &http.Transport{},
		Timeout:   30 * time.Second,
	}
	defer httpClient.CloseIdleConnections()

	resp, err := httpClient.Get(metricsURL)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// An error page is not a metrics payload; report it instead of parsing it.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("fetching metrics from %q: unexpected status %s", metricsURL, resp.Status)
	}

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	var parser expfmt.TextParser
	return parser.TextToMetricFamilies(bytes.NewReader(data))
}