Skip to content

Commit 7d21c39

Browse files
committed
Trigger connection established and closed events at switch layer
Send connection_established and connection_closed directly from the switch when MAC is learned or connection with the VM closes. Assisted-by: Claude (Anthropic AI) Signed-off-by: Gunjan Vyas <[email protected]>
1 parent 6e003ff commit 7d21c39

File tree

5 files changed

+42
-7
lines changed

5 files changed

+42
-7
lines changed

cmd/gvproxy/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"slices"
1313
"strings"
1414

15+
"github.com/containers/gvisor-tap-vsock/pkg/notification"
1516
"github.com/containers/gvisor-tap-vsock/pkg/types"
1617
log "github.com/sirupsen/logrus"
1718
yaml "gopkg.in/yaml.v3"
@@ -65,6 +66,7 @@ type GvproxyConfig struct {
6566
Services string `yaml:"services,omitempty"`
6667
Ec2MetadataAccess bool `yaml:"ec2-metadata-access,omitempty"`
6768
NotificationSocket string `yaml:"notification,omitempty"`
69+
NotificationSender *notification.NotificationSender
6870
}
6971

7072
type GvproxyConfigForward struct {
@@ -241,6 +243,7 @@ func GvproxyConfigure(config *GvproxyConfig, args *GvproxyArgs, version string)
241243
config.PIDFile = args.pidFile
242244
}
243245
if args.notificationSocket != "" {
246+
log.Debugf("notification socket: %s", args.notificationSocket)
244247
uri, err := url.Parse(args.notificationSocket)
245248
if err != nil {
246249
return config, fmt.Errorf("invalid value for notification listen address: %w", err)

cmd/gvproxy/main.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,15 @@ func run(ctx context.Context, g *errgroup.Group, config *GvproxyConfig) error {
128128
}
129129
log.Info("waiting for clients...")
130130

131+
// Start the notification sender in a goroutine
131132
notificationSender := notification.NewNotificationSender(config.NotificationSocket)
132133
if config.NotificationSocket != "" {
133134
g.Go(func() error {
134135
notificationSender.Start(ctx)
135136
return nil
136137
})
137138
}
139+
vn.SetNotificationSender(notificationSender)
138140
for _, endpoint := range config.Listen {
139141
log.Infof("listening %s", endpoint)
140142
ln, err := transport.Listen(endpoint)
@@ -182,6 +184,7 @@ func run(ctx context.Context, g *errgroup.Group, config *GvproxyConfig) error {
182184
if config.Interfaces.VPNKit != "" {
183185
vpnkitListener, err := transport.Listen(config.Interfaces.VPNKit)
184186
if err != nil {
187+
notificationSender.Send(types.NotificationMessage{NotificationType: types.HypervisorError})
185188
return fmt.Errorf("vpnkit listen error: %w", err)
186189
}
187190
g.Go(func() error {
@@ -195,10 +198,10 @@ func run(ctx context.Context, g *errgroup.Group, config *GvproxyConfig) error {
195198
}
196199
conn, err := vpnkitListener.Accept()
197200
if err != nil {
201+
notificationSender.Send(types.NotificationMessage{NotificationType: types.HypervisorError})
198202
log.Errorf("vpnkit accept error: %s", err)
199203
continue
200204
}
201-
notificationSender.Send(types.NotificationMessage{NotificationType: types.ConnectionEstablished})
202205
g.Go(func() error {
203206
return vn.AcceptVpnKit(conn)
204207
})
@@ -210,6 +213,7 @@ func run(ctx context.Context, g *errgroup.Group, config *GvproxyConfig) error {
210213
if config.Interfaces.Qemu != "" {
211214
qemuListener, err := transport.Listen(config.Interfaces.Qemu)
212215
if err != nil {
216+
notificationSender.Send(types.NotificationMessage{NotificationType: types.HypervisorError})
213217
return fmt.Errorf("qemu listen error: %w", err)
214218
}
215219

@@ -227,14 +231,14 @@ func run(ctx context.Context, g *errgroup.Group, config *GvproxyConfig) error {
227231
notificationSender.Send(types.NotificationMessage{NotificationType: types.HypervisorError})
228232
return fmt.Errorf("qemu accept error: %w", err)
229233
}
230-
notificationSender.Send(types.NotificationMessage{NotificationType: types.ConnectionEstablished})
231234
return vn.AcceptQemu(ctx, conn)
232235
})
233236
}
234237

235238
if config.Interfaces.Bess != "" {
236239
bessListener, err := transport.Listen(config.Interfaces.Bess)
237240
if err != nil {
241+
notificationSender.Send(types.NotificationMessage{NotificationType: types.HypervisorError})
238242
return fmt.Errorf("bess listen error: %w", err)
239243
}
240244

@@ -252,7 +256,6 @@ func run(ctx context.Context, g *errgroup.Group, config *GvproxyConfig) error {
252256
notificationSender.Send(types.NotificationMessage{NotificationType: types.HypervisorError})
253257
return fmt.Errorf("bess accept error: %w", err)
254258
}
255-
notificationSender.Send(types.NotificationMessage{NotificationType: types.ConnectionEstablished})
256259
return vn.AcceptBess(ctx, conn)
257260
})
258261
}
@@ -276,20 +279,21 @@ func run(ctx context.Context, g *errgroup.Group, config *GvproxyConfig) error {
276279
g.Go(func() error {
277280
vfkitConn, err := transport.AcceptVfkit(conn)
278281
if err != nil {
279-
notificationSender.Send(types.NotificationMessage{NotificationType: types.HypervisorError})
280282
return fmt.Errorf("vfkit accept error: %w", err)
281283
}
282284

283-
notificationSender.Send(types.NotificationMessage{NotificationType: types.ConnectionEstablished})
284285
return vn.AcceptVfkit(ctx, vfkitConn)
285286
})
286287
}
287288

288289
if config.Interfaces.Stdio != "" {
289290
g.Go(func() error {
290291
conn := stdio.GetStdioConn()
291-
notificationSender.Send(types.NotificationMessage{NotificationType: types.ConnectionEstablished})
292-
return vn.AcceptStdio(ctx, conn)
292+
err := vn.AcceptStdio(ctx, conn)
293+
if err != nil {
294+
notificationSender.Send(types.NotificationMessage{NotificationType: types.HypervisorError})
295+
}
296+
return err
293297
})
294298
}
295299

pkg/tap/switch.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync/atomic"
1212
"syscall"
1313

14+
"github.com/containers/gvisor-tap-vsock/pkg/notification"
1415
"github.com/containers/gvisor-tap-vsock/pkg/types"
1516
"github.com/google/gopacket"
1617
"github.com/google/gopacket/layers"
@@ -48,6 +49,8 @@ type Switch struct {
4849
writeLock sync.Mutex
4950

5051
gateway VirtualDevice
52+
53+
notificationSender *notification.NotificationSender
5154
}
5255

5356
func NewSwitch(debug bool, mtu int) *Switch {
@@ -197,6 +200,12 @@ func (e *Switch) disconnect(id int, conn net.Conn) {
197200

198201
for address, targetConn := range e.cam {
199202
if targetConn == id {
203+
if e.notificationSender != nil {
204+
e.notificationSender.Send(types.NotificationMessage{
205+
NotificationType: types.ConnectionClosed,
206+
MacAddress: address.String(),
207+
})
208+
}
200209
delete(e.cam, address)
201210
}
202211
}
@@ -267,9 +276,17 @@ func (e *Switch) rxBuf(_ context.Context, id int, buf []byte) {
267276
eth := header.Ethernet(buf)
268277

269278
e.camLock.Lock()
279+
_, exists := e.cam[eth.SourceAddress()]
270280
e.cam[eth.SourceAddress()] = id
271281
e.camLock.Unlock()
272282

283+
if !exists && e.notificationSender != nil {
284+
e.notificationSender.Send(types.NotificationMessage{
285+
NotificationType: types.ConnectionEstablished,
286+
MacAddress: eth.SourceAddress().String(),
287+
})
288+
}
289+
273290
if eth.DestinationAddress() != e.gateway.LinkAddress() {
274291
pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
275292
Payload: buffer.MakeWithData(buf),
@@ -304,3 +321,7 @@ func protocolImplementation(protocol types.Protocol) protocol {
304321
return &hyperkitProtocol{}
305322
}
306323
}
324+
325+
func (e *Switch) SetNotificationSender(notificationSender *notification.NotificationSender) {
326+
e.notificationSender = notificationSender
327+
}

pkg/types/handshake.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type UnexposeRequest struct {
2222

2323
type NotificationMessage struct {
2424
NotificationType NotificationType `json:"notification_type"`
25+
MacAddress string `json:"mac_address,omitempty"`
2526
}
2627

2728
type NotificationType string
@@ -31,4 +32,5 @@ const (
3132
ConnectionEstablished NotificationType = "connection_established"
3233
HypervisorWarning NotificationType = "hypervisor_warning"
3334
HypervisorError NotificationType = "hypervisor_error"
35+
ConnectionClosed NotificationType = "connection_closed"
3436
)

pkg/virtualnetwork/virtualnetwork.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http"
99
"os"
1010

11+
"github.com/containers/gvisor-tap-vsock/pkg/notification"
1112
"github.com/containers/gvisor-tap-vsock/pkg/tap"
1213
"github.com/containers/gvisor-tap-vsock/pkg/types"
1314
"gvisor.dev/gvisor/pkg/tcpip"
@@ -28,6 +29,10 @@ type VirtualNetwork struct {
2829
ipPool *tap.IPPool
2930
}
3031

32+
func (n *VirtualNetwork) SetNotificationSender(notificationSender *notification.NotificationSender) {
33+
n.networkSwitch.SetNotificationSender(notificationSender)
34+
}
35+
3136
func New(configuration *types.Configuration) (*VirtualNetwork, error) {
3237
_, subnet, err := net.ParseCIDR(configuration.Subnet)
3338
if err != nil {

0 commit comments

Comments
 (0)