Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions tests/integration/clientv3/concurrency/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
Expand Down Expand Up @@ -49,16 +51,12 @@ func TestResumeElection(t *testing.T) {
defer cancel()

// become leader
if err = e.Campaign(ctx, "candidate1"); err != nil {
t.Fatalf("Campaign() returned non nil err: %s", err)
}
require.NoErrorf(t, e.Campaign(ctx, "candidate1"), "Campaign() returned non nil err")

// get the leadership details of the current election
var leader *clientv3.GetResponse
leader, err = e.Leader(ctx)
if err != nil {
t.Fatalf("Leader() returned non nil err: %s", err)
}
require.NoErrorf(t, err, "Leader() returned non nil err")

// Recreate the election
e = concurrency.ResumeElection(s, prefix,
Expand Down Expand Up @@ -86,19 +84,13 @@ func TestResumeElection(t *testing.T) {
// put some random data to generate a change event, this put should be
// ignored by Observe() because it is not under the election prefix.
_, err = cli.Put(ctx, "foo", "bar")
if err != nil {
t.Fatalf("Put('foo') returned non nil err: %s", err)
}
require.NoErrorf(t, err, "Put('foo') returned non nil err")

// resign as leader
if err := e.Resign(ctx); err != nil {
t.Fatalf("Resign() returned non nil err: %s", err)
}
require.NoErrorf(t, e.Resign(ctx), "Resign() returned non nil err")

// elect a different candidate
if err := e.Campaign(ctx, "candidate2"); err != nil {
t.Fatalf("Campaign() returned non nil err: %s", err)
}
require.NoErrorf(t, e.Campaign(ctx, "candidate2"), "Campaign() returned non nil err")

// wait for observed leader change
resp := <-respChan
Expand Down
5 changes: 2 additions & 3 deletions tests/integration/clientv3/connectivity/black_hole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
defer cli.Close()

wch := cli.Watch(context.Background(), "foo", clientv3.WithCreatedNotify())
if _, ok := <-wch; !ok {
t.Fatalf("watch failed on creation")
}
_, ok := <-wch
require.Truef(t, ok, "watch failed on creation")

// endpoint can switch to eps[1] when it detects the failure of eps[0]
cli.SetEndpoints(eps...)
Expand Down
8 changes: 2 additions & 6 deletions tests/integration/clientv3/connectivity/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ func TestDialTLSExpired(t *testing.T) {
DialOptions: []grpc.DialOption{grpc.WithBlock()},
TLS: tls,
})
if !clientv3test.IsClientTimeout(err) {
t.Fatalf("expected dial timeout error, got %v", err)
}
require.Truef(t, clientv3test.IsClientTimeout(err), "expected dial timeout error")
}

// TestDialTLSNoConfig ensures the client fails to dial / times out
Expand All @@ -85,9 +83,7 @@ func TestDialTLSNoConfig(t *testing.T) {
c.Close()
}
}()
if !clientv3test.IsClientTimeout(err) {
t.Fatalf("expected dial timeout error, got %v", err)
}
require.Truef(t, clientv3test.IsClientTimeout(err), "expected dial timeout error")
}

// TestDialSetEndpointsBeforeFail ensures SetEndpoints can replace unavailable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
if len(ev.Events) != 0 {
t.Fatal("expected no event")
}
if err = ev.Err(); !errors.Is(err, rpctypes.ErrNoLeader) {
t.Fatalf("expected %v, got %v", rpctypes.ErrNoLeader, err)
}
require.ErrorIs(t, ev.Err(), rpctypes.ErrNoLeader)
case <-time.After(integration2.RequestWaitTimeout): // enough time to detect leader lost
t.Fatal("took too long to detect leader lost")
}
Expand Down Expand Up @@ -302,9 +300,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
_, err = kvc.Get(ctx, "a")
cancel()
if !errors.Is(err, rpctypes.ErrLeaderChanged) {
t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err)
}
require.ErrorIsf(t, err, rpctypes.ErrLeaderChanged, "expected %v, got %v", rpctypes.ErrLeaderChanged, err)

for i := 0; i < 5; i++ {
ctx, cancel = context.WithTimeout(context.TODO(), 10*time.Second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

clientv3 "go.etcd.io/etcd/client/v3"
recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
Expand All @@ -40,12 +42,8 @@ func TestBarrierMultiNode(t *testing.T) {

func testBarrier(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
b := recipe.NewBarrier(chooseClient(), "test-barrier")
if err := b.Hold(); err != nil {
t.Fatalf("could not hold barrier (%v)", err)
}
if err := b.Hold(); err == nil {
t.Fatalf("able to double-hold barrier")
}
require.NoErrorf(t, b.Hold(), "could not hold barrier")
require.Errorf(t, b.Hold(), "able to double-hold barrier")

// put a random key to move the revision forward
if _, err := chooseClient().Put(context.Background(), "x", ""); err != nil {
Expand Down Expand Up @@ -75,9 +73,7 @@ func testBarrier(t *testing.T, waiters int, chooseClient func() *clientv3.Client
default:
}

if err := b.Release(); err != nil {
t.Fatalf("could not release barrier (%v)", err)
}
require.NoErrorf(t, b.Release(), "could not release barrier")

timerC := time.After(time.Duration(waiters*100) * time.Millisecond)
for i := 0; i < waiters; i++ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ func TestDoubleBarrier(t *testing.T) {
default:
}

if err := b.Enter(); err != nil {
t.Fatalf("could not enter last barrier (%v)", err)
}
require.NoErrorf(t, b.Enter(), "could not enter last barrier")

timerC := time.After(time.Duration(waiters*100) * time.Millisecond)
for i := 0; i < waiters-1; i++ {
Expand Down
34 changes: 10 additions & 24 deletions tests/integration/clientv3/experimental/recipes/v3_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ func testMutexLock(t *testing.T, waiters int, chooseClient func() *clientv3.Clie
t.Fatalf("lock %d followers did not wait", i)
default:
}
if err := m.Unlock(context.TODO()); err != nil {
t.Fatalf("could not release lock (%v)", err)
}
require.NoErrorf(t, m.Unlock(context.TODO()), "could not release lock")
}
}
wg.Wait()
Expand Down Expand Up @@ -233,9 +231,7 @@ func TestMutexWaitsOnCurrentHolder(t *testing.T) {
t.Fatal("failed to receive watch response")
}
}
if putCounts != 2 {
t.Fatalf("expect 2 put events, but got %v", putCounts)
}
require.Equalf(t, 2, putCounts, "expect 2 put events, but got %v", putCounts)

newOwnerSession, err := concurrency.NewSession(cli)
if err != nil {
Expand All @@ -250,12 +246,9 @@ func TestMutexWaitsOnCurrentHolder(t *testing.T) {

select {
case wrp := <-wch:
if len(wrp.Events) != 1 {
t.Fatalf("expect a event, but got %v events", len(wrp.Events))
}
if e := wrp.Events[0]; e.Type != mvccpb.PUT {
t.Fatalf("expect a put event on prefix test-mutex, but got event type %v", e.Type)
}
require.Lenf(t, wrp.Events, 1, "expect a event, but got %v events", len(wrp.Events))
e := wrp.Events[0]
require.Equalf(t, mvccpb.PUT, e.Type, "expect a put event on prefix test-mutex, but got event type %v", e.Type)
case <-time.After(time.Second):
t.Fatalf("failed to receive a watch response")
}
Expand All @@ -266,12 +259,9 @@ func TestMutexWaitsOnCurrentHolder(t *testing.T) {
// ensures the deletion of victim waiter from server side.
select {
case wrp := <-wch:
if len(wrp.Events) != 1 {
t.Fatalf("expect a event, but got %v events", len(wrp.Events))
}
if e := wrp.Events[0]; e.Type != mvccpb.DELETE {
t.Fatalf("expect a delete event on prefix test-mutex, but got event type %v", e.Type)
}
require.Lenf(t, wrp.Events, 1, "expect a event, but got %v events", len(wrp.Events))
e := wrp.Events[0]
require.Equalf(t, mvccpb.DELETE, e.Type, "expect a delete event on prefix test-mutex, but got event type %v", e.Type)
case <-time.After(time.Second):
t.Fatal("failed to receive a watch response")
}
Expand Down Expand Up @@ -357,18 +347,14 @@ func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client
t.Fatalf("rlock %d readers did not wait", i)
default:
}
if err := wl.Unlock(); err != nil {
t.Fatalf("could not release lock (%v)", err)
}
require.NoErrorf(t, wl.Unlock(), "could not release lock")
case rl := <-rlockedC:
select {
case <-wlockedC:
t.Fatalf("rlock %d writers did not wait", i)
default:
}
if err := rl.RUnlock(); err != nil {
t.Fatalf("could not release rlock (%v)", err)
}
require.NoErrorf(t, rl.RUnlock(), "could not release rlock")
}
}
}
27 changes: 9 additions & 18 deletions tests/integration/clientv3/experimental/recipes/v3_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"

recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
)
Expand Down Expand Up @@ -57,12 +59,8 @@ func TestQueueOneReaderOneWriter(t *testing.T) {
q := recipe.NewQueue(etcdc, "testq")
for i := 0; i < 5; i++ {
s, err := q.Dequeue()
if err != nil {
t.Fatalf("error dequeueing (%v)", err)
}
if s != fmt.Sprintf("%d", i) {
t.Fatalf("expected dequeue value %v, got %v", s, i)
}
require.NoErrorf(t, err, "error dequeueing (%v)", err)
require.Equalf(t, s, fmt.Sprintf("%d", i), "expected dequeue value %v, got %v", s, i)
}
}

Expand Down Expand Up @@ -103,25 +101,18 @@ func TestPrQueueOneReaderOneWriter(t *testing.T) {
for i := 0; i < 5; i++ {
// [0, 2] priority for priority collision to test seq keys
pr := uint16(rand.Intn(3))
if err := q.Enqueue(fmt.Sprintf("%d", pr), pr); err != nil {
t.Fatalf("error enqueuing (%v)", err)
}
require.NoErrorf(t, q.Enqueue(fmt.Sprintf("%d", pr), pr), "error enqueuing")
}

// read back items; confirm priority order is respected
lastPr := -1
for i := 0; i < 5; i++ {
s, err := q.Dequeue()
if err != nil {
t.Fatalf("error dequeueing (%v)", err)
}
require.NoErrorf(t, err, "error dequeueing (%v)", err)
curPr := 0
if _, err := fmt.Sscanf(s, "%d", &curPr); err != nil {
t.Fatalf(`error parsing item "%s" (%v)`, s, err)
}
if lastPr > curPr {
t.Fatalf("expected priority %v > %v", curPr, lastPr)
}
_, err = fmt.Sscanf(s, "%d", &curPr)
require.NoErrorf(t, err, `error parsing item "%s" (%v)`, s, err)
require.LessOrEqualf(t, lastPr, curPr, "expected priority %v > %v", curPr, lastPr)
}
}

Expand Down
Loading