Skip to content
Merged
115 changes: 89 additions & 26 deletions cmd/tx.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package cmd

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/cosmos/cosmos-sdk/client/flags"
sdk "github.com/cosmos/cosmos-sdk/types"
clienttypes "github.com/cosmos/ibc-go/v8/modules/core/02-client/types"
chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
"github.com/cosmos/ibc-go/v8/modules/core/exported"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)

// transactionCmd represents the tx command
Expand Down Expand Up @@ -457,23 +460,53 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command {
return err
}

msgs := core.NewRelayMsgs()
relay := func(ctx context.Context, isSrcToDst bool, packets core.PacketInfoList, sh core.SyncHeaders, doExecuteRelay, doExecuteAck, doRefresh bool) ([]sdk.Msg, error) {
msgs := make([]sdk.Msg, 0, len(packets)+1)

doExecuteRelaySrc := len(sp.Dst) > 0
doExecuteRelayDst := len(sp.Src) > 0
doExecuteAckSrc := false
doExecuteAckDst := false
if m, err := st.UpdateClients(ctx, c[src], c[dst], isSrcToDst, doExecuteRelay, doExecuteAck, sh, doRefresh); err != nil {
return nil, err
} else {
msgs = append(msgs, m...)
}

if m, err := st.UpdateClients(cmd.Context(), c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil {
return err
} else {
msgs.Merge(m)
if m, err := st.RelayPackets(ctx, c[src], c[dst], isSrcToDst, packets, sh, doExecuteRelay); err != nil {
return nil, err
} else {
msgs = append(msgs, m...)
}
return msgs, nil
}

if m, err := st.RelayPackets(cmd.Context(), c[src], c[dst], sp, sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil {
return err
} else {
msgs.Merge(m)
msgs := core.NewRelayMsgs()
{
var eg = new(errgroup.Group)

doExecuteRelaySrc := len(sp.Dst) > 0
doExecuteRelayDst := len(sp.Src) > 0
doExecuteAckSrc := false
doExecuteAckDst := false
doRefresh := viper.GetBool(flagDoRefresh)

eg.Go(func() error {
m, err := relay(cmd.Context(), true, sp.Src, sh, doExecuteRelayDst, doExecuteAckDst, doRefresh)
if err != nil {
return err
}
msgs.Dst = m
return nil
})
eg.Go(func() error {
m, err := relay(cmd.Context(), false, sp.Dst, sh, doExecuteRelaySrc, doExecuteAckSrc, doRefresh)
if err != nil {
return err
}
msgs.Src = m
return nil
})

if err := eg.Wait(); err != nil {
return err
}
}

st.Send(cmd.Context(), c[src], c[dst], msgs)
Expand Down Expand Up @@ -532,23 +565,53 @@ func relayAcksCmd(ctx *config.Context) *cobra.Command {
return err
}

msgs := core.NewRelayMsgs()
relay := func(ctx context.Context, isSrcToDst bool, acks core.PacketInfoList, sh core.SyncHeaders, doExecuteRelay, doExecuteAck, doRefresh bool) ([]sdk.Msg, error) {
msgs := make([]sdk.Msg, 0, len(acks)+1)

doExecuteRelaySrc := false
doExecuteRelayDst := false
doExecuteAckSrc := len(sp.Dst) > 0
doExecuteAckDst := len(sp.Src) > 0
if m, err := st.UpdateClients(ctx, c[src], c[dst], isSrcToDst, doExecuteRelay, doExecuteAck, sh, doRefresh); err != nil {
return nil, err
} else {
msgs = append(msgs, m...)
}

if m, err := st.UpdateClients(cmd.Context(), c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil {
return err
} else {
msgs.Merge(m)
if m, err := st.RelayAcknowledgements(ctx, c[src], c[dst], isSrcToDst, acks, sh, doExecuteAck); err != nil {
return nil, err
} else {
msgs = append(msgs, m...)
}
return msgs, nil
}

if m, err := st.RelayAcknowledgements(cmd.Context(), c[src], c[dst], sp, sh, doExecuteAckSrc, doExecuteAckDst); err != nil {
return err
} else {
msgs.Merge(m)
msgs := core.NewRelayMsgs()
{
var eg = new(errgroup.Group)

doExecuteRelaySrc := false
doExecuteRelayDst := false
doExecuteAckSrc := len(sp.Dst) > 0
doExecuteAckDst := len(sp.Src) > 0
doRefresh := viper.GetBool(flagDoRefresh)

eg.Go(func() error {
m, err := relay(cmd.Context(), true, sp.Src, sh, doExecuteRelayDst, doExecuteAckDst, doRefresh)
if err != nil {
return err
}
msgs.Dst = m
return nil
})
eg.Go(func() error {
m, err := relay(cmd.Context(), false, sp.Dst, sh, doExecuteRelaySrc, doExecuteAckSrc, doRefresh)
if err != nil {
return err
}
msgs.Src = m
return nil
})

if err := eg.Wait(); err != nil {
return err
}
}

st.Send(cmd.Context(), c[src], c[dst], msgs)
Expand Down
168 changes: 104 additions & 64 deletions core/channel-upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
"github.com/hyperledger-labs/yui-relayer/log"
"go.opentelemetry.io/otel/codes"
"golang.org/x/sync/errgroup"
)

type UpgradeState int
Expand Down Expand Up @@ -320,26 +321,11 @@ func upgradeChannelStep(ctx context.Context, src, dst *ProvableChain, targetSrcS
logger.ErrorContext(ctx, "failed to create SyncHeaders", err)
return nil, err
}

// Query a number of things all at once
var srcUpdateHeaders, dstUpdateHeaders []Header
if err := retry.Do(func() error {
srcUpdateHeaders, dstUpdateHeaders, err = sh.SetupBothHeadersForUpdate(ctx, src, dst)
return err
}, rtyAtt, rtyDel, rtyErr, retry.Context(ctx), retry.OnRetry(func(uint, error) {
if err := sh.Updates(ctx, src, dst); err != nil {
panic(err)
}
})); err != nil {
logger.ErrorContext(ctx, "failed to set up headers for LC update on both chains", err)
return nil, err
}

srcCtx := sh.GetQueryContext(ctx, src.ChainID())
dstCtx := sh.GetQueryContext(ctx, dst.ChainID())

// query finalized channels with proofs
srcChan, dstChan, settled, err := querySettledChannelPair(srcCtx, dstCtx, src, dst, true)
srcChan, dstChan, settled, err := querySettledChannelPair(srcCtx, dstCtx, src, dst, false)
if err != nil {
logger.ErrorContext(ctx, "failed to query the channel pair with proofs", err)
return nil, err
Expand All @@ -353,7 +339,7 @@ func upgradeChannelStep(ctx context.Context, src, dst *ProvableChain, targetSrcS
dstCtx,
src,
dst,
true,
false,
)
if err != nil {
logger.ErrorContext(ctx, "failed to query the channel upgrade pair with proofs", err)
Expand Down Expand Up @@ -563,54 +549,84 @@ func upgradeChannelStep(ctx context.Context, src, dst *ProvableChain, targetSrcS
),
)}

if srcAction != UPGRADE_ACTION_NONE {
addr := mustGetAddress(src)

if len(dstUpdateHeaders) > 0 {
out.Src = append(out.Src, src.Path().UpdateClients(dstUpdateHeaders, addr)...)
}

msg, err := buildActionMsg(
src,
srcAction,
srcChan,
addr,
dstCtx,
dst,
dstChan,
dstChanUpg,
)
if err != nil {
logger.ErrorContext(ctx, "failed to build Msg for the src chain", err)
return nil, err
}

out.Src = append(out.Src, msg)
}

if dstAction != UPGRADE_ACTION_NONE {
addr := mustGetAddress(dst)

if len(srcUpdateHeaders) > 0 {
out.Dst = append(out.Dst, dst.Path().UpdateClients(srcUpdateHeaders, addr)...)
}

msg, err := buildActionMsg(
dst,
dstAction,
dstChan,
addr,
srcCtx,
src,
srcChan,
srcChanUpg,
)
if err != nil {
logger.ErrorContext(ctx, "failed to build Msg for the dst chain", err)
return nil, err
if err := retry.Do(func() error {
var eg = new(errgroup.Group)

if srcAction != UPGRADE_ACTION_NONE {
eg.Go(func() error {

addr := mustGetAddress(src)

dstUpdateHeaders, err := sh.SetupHeadersForUpdate(ctx, dst, src)
if err != nil {
logger.ErrorContext(ctx, "failed to set up headers for LC update on dst chain", err)
return err
}

if len(dstUpdateHeaders) > 0 {
out.Src = append(out.Src, src.Path().UpdateClients(dstUpdateHeaders, addr)...)
}

msg, err := buildActionMsg(
src,
srcAction,
srcChan,
addr,
dstCtx,
dst,
dstChan,
dstChanUpg,
)
if err != nil {
logger.ErrorContext(ctx, "failed to build Msg for the src chain", err)
return err
}

out.Src = append(out.Src, msg)
return nil
})
}

if dstAction != UPGRADE_ACTION_NONE {
eg.Go(func() error {
addr := mustGetAddress(dst)

srcUpdateHeaders, err := sh.SetupHeadersForUpdate(ctx, src, dst)
if err != nil {
logger.ErrorContext(ctx, "failed to set up headers for LC update on src chain", err)
return err
}

if len(srcUpdateHeaders) > 0 {
out.Dst = append(out.Dst, dst.Path().UpdateClients(srcUpdateHeaders, addr)...)
}

msg, err := buildActionMsg(
dst,
dstAction,
dstChan,
addr,
srcCtx,
src,
srcChan,
srcChanUpg,
)
if err != nil {
logger.ErrorContext(ctx, "failed to build Msg for the dst chain", err)
return err
}

out.Dst = append(out.Dst, msg)
return nil
})
}
if err := eg.Wait(); err != nil {
return err
}

out.Dst = append(out.Dst, msg)
return nil
}, rtyAtt, rtyDel, rtyErr, retry.Context(ctx), retry.OnRetry(func(n uint, err error) {
})); err != nil {
return nil, err
}

logger.InfoContext(ctx, "successfully generates the next step of the channel upgrade")
Expand Down Expand Up @@ -750,16 +766,37 @@ func buildActionMsg(

switch action {
case UPGRADE_ACTION_TRY:
if err := ProveChannel(cpCtx, cp, cpChan); err != nil {
return nil, err
}
if err := ProveChannelUpgrade(cpCtx, cp, cpUpg); err != nil {
return nil, err
}
proposedConnectionID, err := queryProposedConnectionID(cpCtx, cp, cpUpg)
if err != nil {
return nil, err
}
return pathEnd.ChanUpgradeTry(proposedConnectionID, cpChan, cpUpg, addr), nil
case UPGRADE_ACTION_ACK:
if err := ProveChannel(cpCtx, cp, cpChan); err != nil {
return nil, err
}
if err := ProveChannelUpgrade(cpCtx, cp, cpUpg); err != nil {
return nil, err
}
return pathEnd.ChanUpgradeAck(cpChan, cpUpg, addr), nil
case UPGRADE_ACTION_CONFIRM:
if err := ProveChannel(cpCtx, cp, cpChan); err != nil {
return nil, err
}
if err := ProveChannelUpgrade(cpCtx, cp, cpUpg); err != nil {
return nil, err
}
return pathEnd.ChanUpgradeConfirm(cpChan, cpUpg, addr), nil
case UPGRADE_ACTION_OPEN:
if err := ProveChannel(cpCtx, cp, cpChan); err != nil {
return nil, err
}
return pathEnd.ChanUpgradeOpen(cpChan, addr), nil
case UPGRADE_ACTION_CANCEL:
upgErr, err := QueryChannelUpgradeError(cpCtx, cp, true)
Expand All @@ -785,6 +822,9 @@ func buildActionMsg(
}
return pathEnd.ChanUpgradeCancel(upgErr, addr), nil
case UPGRADE_ACTION_TIMEOUT:
if err := ProveChannel(cpCtx, cp, cpChan); err != nil {
return nil, err
}
return pathEnd.ChanUpgradeTimeout(cpChan, addr), nil
default:
panic(fmt.Errorf("unexpected action: %s", action))
Expand Down
Loading
Loading