125 changes: 62 additions & 63 deletions test/cases/nvidia/main_test.go
@@ -5,7 +5,6 @@ package nvidia
import (
"context"
_ "embed"
"flag"
"fmt"
"log"
"os"
@@ -14,8 +13,8 @@ import (
"testing"

fwext "github.com/aws/aws-k8s-tester/internal/e2e"
"github.com/aws/aws-k8s-tester/test/common"
"github.com/aws/aws-k8s-tester/test/manifests"
"github.com/aws/aws-sdk-go-v2/aws"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -26,17 +25,22 @@ import (
"sigs.k8s.io/e2e-framework/pkg/envconf"
)

type Config struct {
common.MetricOps
NodeType string `flag:"nodeType" desc:"node type for the tests"`
InstallDevicePlugin bool `flag:"installDevicePlugin" desc:"install nvidia device plugin"`
EfaEnabled bool `flag:"efaEnabled" desc:"enable efa tests"`
NvidiaTestImage string `flag:"nvidiaTestImage" desc:"nccl test image for nccl tests"`
PytorchImage string `flag:"pytorchImage" desc:"pytorch cuda image for single node tests"`
SkipUnitTestSubcommand string `flag:"skipUnitTestSubcommand" desc:"optional command to skip specified unit test"`
}

var (
testenv env.Environment
nodeType *string
installDevicePlugin *bool
efaEnabled *bool
nvidiaTestImage *string
pytorchImage *string
skipUnitTestSubcommand *string
nodeCount int
gpuPerNode int
efaPerNode int
testenv env.Environment
testConfig Config
nodeCount int
gpuPerNode int
efaPerNode int
)

func deployMPIOperator(ctx context.Context, config *envconf.Config) (context.Context, error) {
@@ -51,31 +55,6 @@ func deployMPIOperator(ctx context.Context, config *envconf.Config) (context.Con
return ctx, nil
}

func deployNvidiaDevicePlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "nvidia-device-plugin-daemonset", Namespace: "kube-system"},
}
err := wait.For(fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
wait.WithContext(ctx))
if err != nil {
return ctx, fmt.Errorf("failed to deploy nvidia-device-plugin: %v", err)
}
return ctx, nil
}

func deployEFAPlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "aws-efa-k8s-device-plugin-daemonset", Namespace: "kube-system"},
}
err := wait.For(fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
wait.WithContext(ctx))
if err != nil {
return ctx, fmt.Errorf("failed to deploy efa-device-plugin: %v", err)
}

return ctx, nil
}

func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Context, error) {
clientset, err := kubernetes.NewForConfig(config.Client().RESTConfig())
if err != nil {
@@ -93,9 +72,9 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
}
}

if *nodeType != "" {
if testConfig.NodeType != "" {
for _, v := range nodes.Items {
if v.Labels["node.kubernetes.io/instance-type"] == *nodeType {
if v.Labels["node.kubernetes.io/instance-type"] == testConfig.NodeType {
nodeCount++
gpu := v.Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
@@ -105,7 +84,7 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
}
} else {
log.Printf("No node type specified. Using the node type %s in the node groups.", nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
nodeType = aws.String(nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
testConfig.NodeType = nodes.Items[0].Labels["node.kubernetes.io/instance-type"]
nodeCount = len(nodes.Items)
gpu := nodes.Items[0].Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
@@ -117,28 +96,31 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
}

func TestMain(m *testing.M) {
nodeType = flag.String("nodeType", "", "node type for the tests")
nvidiaTestImage = flag.String("nvidiaTestImage", "", "nccl test image for nccl tests")
pytorchImage = flag.String("pytorchImage", "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.1.0-gpu-py310-cu121-ubuntu20.04-ec2", "pytorch cuda image for single node tests")
efaEnabled = flag.Bool("efaEnabled", false, "enable efa tests")
installDevicePlugin = flag.Bool("installDevicePlugin", true, "install nvidia device plugin")
skipUnitTestSubcommand = flag.String("skipUnitTestSubcommand", "", "optional command to skip specified unit test, `-s test1|test2|...`")
testConfig = Config{
InstallDevicePlugin: true,
PytorchImage: "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.1.0-gpu-py310-cu121-ubuntu20.04-ec2",
}

_, err := common.ParseFlags(&testConfig)
if err != nil {
log.Fatalf("failed to parse flags: %v", err)
}
cfg, err := envconf.NewFromFlags()
if err != nil {
log.Fatalf("failed to initialize test environment: %v", err)
}
testenv = env.NewWithConfig(cfg)

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
testenv = testenv.WithContext(ctx)
testenv = env.NewWithConfig(cfg).WithContext(ctx)

// all NVIDIA tests require the device plugin and MPI operator
deploymentManifests := [][]byte{
manifestsList := [][]byte{
manifests.MpiOperatorManifest,
}

setUpFunctions := []env.Func{
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
err := fwext.ApplyManifests(config.Client().RESTConfig(), deploymentManifests...)
err := fwext.ApplyManifests(config.Client().RESTConfig(), manifestsList...)
if err != nil {
return ctx, err
}
@@ -147,27 +129,44 @@ func TestMain(m *testing.M) {
deployMPIOperator,
}

if *installDevicePlugin {
deploymentManifests = append(deploymentManifests, manifests.NvidiaDevicePluginManifest)
setUpFunctions = append(setUpFunctions, deployNvidiaDevicePlugin)
if testConfig.InstallDevicePlugin {
manifestsList = append(manifestsList, manifests.NvidiaDevicePluginManifest)
setUpFunctions = append(setUpFunctions, func(ctx context.Context, config *envconf.Config) (context.Context, error) {
return common.DeployDaemonSet("nvidia-device-plugin-daemonset", "kube-system")(ctx, config)
})
}

if *efaEnabled {
deploymentManifests = append(deploymentManifests, manifests.EfaDevicePluginManifest)
setUpFunctions = append(setUpFunctions, deployEFAPlugin)
if testConfig.EfaEnabled {
manifestsList = append(manifestsList, manifests.EfaDevicePluginManifest)
setUpFunctions = append(setUpFunctions, func(ctx context.Context, config *envconf.Config) (context.Context, error) {
return common.DeployDaemonSet("aws-efa-k8s-device-plugin-daemonset", "kube-system")(ctx, config)
})
}

if len(testConfig.MetricDimensions) > 0 {
renderedCloudWatchAgentManifest, err := manifests.RenderCloudWatchAgentManifest(testConfig.MetricDimensions)
if err != nil {
log.Printf("Warning: failed to render CloudWatch Agent manifest: %v", err)
}
manifestsList = append(manifestsList, manifests.DCGMExporterManifest, renderedCloudWatchAgentManifest)
setUpFunctions = append(setUpFunctions, func(ctx context.Context, config *envconf.Config) (context.Context, error) {
if ctx, err := common.DeployDaemonSet("dcgm-exporter", "kube-system")(ctx, config); err != nil {
return ctx, err
}
if ctx, err := common.DeployDaemonSet("cwagent", "amazon-cloudwatch")(ctx, config); err != nil {
return ctx, err
}
return ctx, nil
})
}

setUpFunctions = append(setUpFunctions, checkNodeTypes)
testenv.Setup(setUpFunctions...)

testenv.Finish(
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
err := fwext.DeleteManifests(cfg.Client().RESTConfig(), manifests.EfaDevicePluginManifest)
if err != nil {
return ctx, err
}
slices.Reverse(deploymentManifests)
err = fwext.DeleteManifests(config.Client().RESTConfig(), deploymentManifests...)
slices.Reverse(manifestsList)
err := fwext.DeleteManifests(config.Client().RESTConfig(), manifestsList...)
if err != nil {
return ctx, err
}
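Note: the per-plugin helpers deployNvidiaDevicePlugin and deployEFAPlugin removed above are replaced by calls to common.DeployDaemonSet(name, namespace). That helper's implementation is not part of this diff; the following is a minimal sketch of what such a factory could look like, assuming it simply generalizes the removed wait-for-DaemonSet logic (the name and signature come from the call sites; everything else is illustrative).

```go
// Hedged sketch: common.DeployDaemonSet is assumed to generalize the removed
// deployNvidiaDevicePlugin/deployEFAPlugin helpers into a reusable env.Func
// factory. Its real implementation is not shown in this diff.
package common

import (
	"context"
	"fmt"

	fwext "github.com/aws/aws-k8s-tester/internal/e2e"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/e2e-framework/klient/wait"
	"sigs.k8s.io/e2e-framework/pkg/env"
	"sigs.k8s.io/e2e-framework/pkg/envconf"
)

// DeployDaemonSet returns an env.Func that blocks until the named DaemonSet
// reports ready, mirroring the wait.For + DaemonSetReady pattern used by the
// deleted helpers.
func DeployDaemonSet(name, namespace string) env.Func {
	return func(ctx context.Context, config *envconf.Config) (context.Context, error) {
		ds := appsv1.DaemonSet{
			ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		}
		err := wait.For(
			fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
			wait.WithContext(ctx),
		)
		if err != nil {
			return ctx, fmt.Errorf("daemonset %s/%s not ready: %w", namespace, name, err)
		}
		return ctx, nil
	}
}
```

With this shape, the call sites in TestMain could also append common.DeployDaemonSet(...) directly instead of wrapping it in an anonymous function, since it already returns an env.Func.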
12 changes: 6 additions & 6 deletions test/cases/nvidia/mpi_test.go
@@ -63,12 +63,12 @@ func multiNode(testName string) features.Feature {
WithLabel("hardware", "gpu").
WithLabel("hardware", "efa").
Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
if *nvidiaTestImage == "" {
if testConfig.NvidiaTestImage == "" {
t.Fatal(fmt.Errorf("nvidiaTestImage must be set to run unit test, use https://github.com/aws/aws-k8s-tester/blob/main/test/images/nvidia/Dockerfile to build the image and -nvidiaTestImage to set the image url"))
}
maxBytes := "2G"
ncclBuffSize := "4194304"
if slices.Contains(instanceSupportsRdmaRead, *nodeType) {
if slices.Contains(instanceSupportsRdmaRead, testConfig.NodeType) {
t.Log("Instance supports RDMA")
maxBytes = "16G"
ncclBuffSize = "8388608"
@@ -79,7 +79,7 @@ func multiNode(testName string) features.Feature {
WorkerNodeCount: nodeCount,
WorkerNodeGpuCount: nodeCount * gpuPerNode,
GpuPerNode: gpuPerNode,
NvidiaTestImage: *nvidiaTestImage,
NvidiaTestImage: testConfig.NvidiaTestImage,
EfaInterfacePerNode: efaPerNode,
MaxBytes: maxBytes,
NcclBuffSize: ncclBuffSize,
@@ -118,10 +118,10 @@ func multiNode(testName string) features.Feature {
if !t.Failed() {
t.Log("Multi node job completed")
// Verify GPU Direct RDMA is used on P4/P5
if *efaEnabled && slices.Contains(instanceSupportsRdmaRead, *nodeType) {
if testConfig.EfaEnabled && slices.Contains(instanceSupportsRdmaRead, testConfig.NodeType) {
pattern := regexp.MustCompile(`\[send\] via NET/.*Libfabric/.*/GDRDMA`)
if !pattern.MatchString(log) {
t.Errorf("GPU Direct RDMA is not utilized for inter-node communication in NCCL tests on instances that support GDRDMA: %s", *nodeType)
t.Errorf("GPU Direct RDMA is not utilized for inter-node communication in NCCL tests on instances that support GDRDMA: %s", testConfig.NodeType)
}
}
}
@@ -149,7 +149,7 @@ func singleNode() features.Feature {
renderedSingleNodeManifest, err = fwext.RenderManifests(mpiJobPytorchTrainingSingleNodeManifest, struct {
PytorchTestImage string
}{
PytorchTestImage: *pytorchImage,
PytorchTestImage: testConfig.PytorchImage,
})
if err != nil {
t.Fatal(err)
8 changes: 4 additions & 4 deletions test/cases/nvidia/unit_test.go
@@ -42,15 +42,15 @@ func TestSingleNodeUnitTest(t *testing.T) {
WithLabel("suite", "nvidia").
WithLabel("hardware", "gpu").
Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
if *nvidiaTestImage == "" {
if testConfig.NvidiaTestImage == "" {
t.Fatal(fmt.Errorf("nvidiaTestImage must be set to run unit test, use https://github.com/aws/aws-k8s-tester/blob/main/test/images/nvidia/Dockerfile to build the image and -nvidiaTestImage to set the image url"))
}
var err error
renderedJobUnitTestSingleNodeManifest, err = fwext.RenderManifests(jobUnitTestSingleNodeManifest, unitTestManifestTplVars{
NvidiaTestImage: *nvidiaTestImage,
SkipTestSubcommand: *skipUnitTestSubcommand,
NvidiaTestImage: testConfig.NvidiaTestImage,
SkipTestSubcommand: testConfig.SkipUnitTestSubcommand,
GpuPerNode: gpuPerNode,
NodeType: *nodeType,
NodeType: testConfig.NodeType,
})
if err != nil {
t.Fatal(err)
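Note: across all three files, the old package-level flag pointers are replaced by fields of the new Config struct, which common.ParseFlags registers from its `flag:`/`desc:` struct tags, with defaults taken from the values preassigned in TestMain (InstallDevicePlugin, PytorchImage). ParseFlags itself is not shown in this diff; below is a minimal reflection-based sketch of how such a helper might work, assuming only string and bool fields (the function name and call shape come from main_test.go; the internals, including how it coordinates with envconf.NewFromFlags, are assumptions).

```go
// Hedged sketch: a struct-tag-driven flag parser in the spirit of
// common.ParseFlags as it is called in main_test.go. The real helper is not
// part of this diff and may behave differently (e.g. defer parsing until
// envconf.NewFromFlags has registered its own flags).
package common

import (
	"flag"
	"reflect"
)

// ParseFlags registers one command-line flag per `flag`-tagged field of cfg,
// using the field's current value as the default, then parses the arguments.
func ParseFlags(cfg interface{}) (*flag.FlagSet, error) {
	fs := flag.CommandLine
	v := reflect.ValueOf(cfg).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		name := field.Tag.Get("flag")
		if name == "" {
			continue // embedded/untagged fields (e.g. MetricOps) are skipped in this sketch
		}
		desc := field.Tag.Get("desc")
		switch field.Type.Kind() {
		case reflect.String:
			fs.StringVar(v.Field(i).Addr().Interface().(*string), name, v.Field(i).String(), desc)
		case reflect.Bool:
			fs.BoolVar(v.Field(i).Addr().Interface().(*bool), name, v.Field(i).Bool(), desc)
		}
	}
	flag.Parse()
	return fs, nil
}
```

This pattern keeps flag names, defaults, and help text next to the fields they populate, which is why the test no longer needs the standalone flag.String/flag.Bool calls or the "flag" import removed at the top of main_test.go.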