diff --git a/network/benchmarks/netperf/launch.go b/network/benchmarks/netperf/launch.go index a70478bd3b..b2b8c903a8 100644 --- a/network/benchmarks/netperf/launch.go +++ b/network/benchmarks/netperf/launch.go @@ -27,7 +27,6 @@ limitations under the License. package main import ( - "context" "flag" "fmt" "os" @@ -78,365 +77,7 @@ func init() { "(boolean) Run the cleanup resources phase only (use this flag to clean up orphaned resources from a test run)") flag.IntVar(&testFrom, "testFrom", 0, "start from test number testFrom") flag.IntVar(&testTo, "testTo", 5, "end at test number testTo") -<<<<<<< HEAD flag.BoolVar(&jsonOutput, "json", false, "Output JSON data along with CSV data") -======= -} - -func setupClient() *kubernetes.Clientset { - config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) - if err != nil { - panic(err) - } - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - panic(err) - } - - return clientset -} - -// getMinions : Only return schedulable/worker nodes -func getMinionNodes(ctx context.Context, c *kubernetes.Clientset) *api.NodeList { - nodes, err := c.CoreV1().Nodes().List(ctx, - metav1.ListOptions{ - FieldSelector: "spec.unschedulable=false", - }) - if err != nil { - fmt.Println("Failed to fetch nodes", err) - return nil - } - return nodes -} - -func cleanup(ctx context.Context, c *kubernetes.Clientset) { - // Cleanup existing rcs, pods and services in our namespace - rcs, err := c.CoreV1().ReplicationControllers(testNamespace).List(ctx, everythingSelector) - if err != nil { - fmt.Println("Failed to get replication controllers", err) - return - } - for _, rc := range rcs.Items { - fmt.Println("Deleting rc", rc.GetName()) - if err := c.CoreV1().ReplicationControllers(testNamespace).Delete( - ctx, rc.GetName(), metav1.DeleteOptions{}); err != nil { - fmt.Println("Failed to delete rc", rc.GetName(), err) - } - } - pods, err := c.CoreV1().Pods(testNamespace).List(ctx, everythingSelector) - if err != nil { - fmt.Println("Failed to get pods", err) - return - } - for _, pod := range pods.Items { - fmt.Println("Deleting pod", pod.GetName()) - if err := c.CoreV1().Pods(testNamespace).Delete(ctx, pod.GetName(), metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil { - fmt.Println("Failed to delete pod", pod.GetName(), err) - } - } - svcs, err := c.CoreV1().Services(testNamespace).List(ctx, everythingSelector) - if err != nil { - fmt.Println("Failed to get services", err) - return - } - for _, svc := range svcs.Items { - fmt.Println("Deleting svc", svc.GetName()) - err := c.CoreV1().Services(testNamespace).Delete( - ctx, svc.GetName(), metav1.DeleteOptions{}) - if err != nil { - fmt.Println("Failed to get service", err) - } - } -} - -// createServices: Long-winded function to programmatically create our two services -func createServices(ctx context.Context, c *kubernetes.Clientset) bool { - // Create our namespace if not present - if _, err := c.CoreV1().Namespaces().Get(ctx, testNamespace, metav1.GetOptions{}); err != nil { - _, err := c.CoreV1().Namespaces().Create(ctx, &api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}, metav1.CreateOptions{}) - if err != nil { - fmt.Println("Failed to create service", err) - } - } - - // Create the orchestrator service that points to the coordinator pod - orchLabels := map[string]string{"app": "netperf-orch"} - orchService := &api.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "netperf-orch", - }, - Spec: api.ServiceSpec{ - Selector: orchLabels, - Ports: []api.ServicePort{{ - Name: "netperf-orch", - Protocol: api.ProtocolTCP, - Port: orchestratorPort, - TargetPort: intstr.FromInt(orchestratorPort), - }}, - Type: api.ServiceTypeClusterIP, - }, - } - if _, err := c.CoreV1().Services(testNamespace).Create(ctx, orchService, metav1.CreateOptions{}); err != nil { - fmt.Println("Failed to create orchestrator service", err) - return false - } - fmt.Println("Created orchestrator service") - - // Create the netperf-w2 service that points a clusterIP at the worker 2 pod - netperfW2Labels := map[string]string{"app": "netperf-w2"} - netperfW2Service := &api.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "netperf-w2", - }, - Spec: api.ServiceSpec{ - Selector: netperfW2Labels, - Ports: []api.ServicePort{ - { - Name: "netperf-w2", - Protocol: api.ProtocolTCP, - Port: iperf3Port, - TargetPort: intstr.FromInt(iperf3Port), - }, - { - Name: "netperf-w2-qperf19766", - Protocol: api.ProtocolTCP, - Port: qperf19766, - TargetPort: intstr.FromInt(qperf19766), - }, - { - Name: "netperf-w2-qperf19765", - Protocol: api.ProtocolTCP, - Port: qperf19765, - TargetPort: intstr.FromInt(qperf19765), - }, - { - Name: "netperf-w2-sctp", - Protocol: api.ProtocolSCTP, - Port: iperf3Port, - TargetPort: intstr.FromInt(iperf3Port), - }, - { - Name: "netperf-w2-udp", - Protocol: api.ProtocolUDP, - Port: iperf3Port, - TargetPort: intstr.FromInt(iperf3Port), - }, - { - Name: "netperf-w2-netperf", - Protocol: api.ProtocolTCP, - Port: netperfPort, - TargetPort: intstr.FromInt(netperfPort), - }, - }, - Type: api.ServiceTypeClusterIP, - }, - } - if _, err := c.CoreV1().Services(testNamespace).Create(ctx, netperfW2Service, metav1.CreateOptions{}); err != nil { - fmt.Println("Failed to create netperf-w2 service", err) - return false - } - fmt.Println("Created netperf-w2 service") - return true -} - -// createRCs - Create replication controllers for all workers and the orchestrator -func createRCs(ctx context.Context, c *kubernetes.Clientset) bool { - // Create the orchestrator RC - name := "netperf-orch" - fmt.Println("Creating replication controller", name) - replicas := int32(1) - - _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(ctx, &api.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Spec: api.ReplicationControllerSpec{ - Replicas: &replicas, - Selector: map[string]string{"app": name}, - Template: &api.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"app": name}, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: name, - Image: netperfImage, - Ports: []api.ContainerPort{{ContainerPort: orchestratorPort}}, - Args: []string{ - "--mode=orchestrator", - fmt.Sprintf("--testFrom=%d", testFrom), - fmt.Sprintf("--testTo=%d", testTo), - }, - ImagePullPolicy: "Always", - }, - }, - TerminationGracePeriodSeconds: new(int64), - }, - }, - }, - }, metav1.CreateOptions{}) - if err != nil { - fmt.Println("Error creating orchestrator replication controller", err) - return false - } - fmt.Println("Created orchestrator replication controller") - for i := 1; i <= 3; i++ { - // Bring up pods slowly - time.Sleep(3 * time.Second) - kubeNode := primaryNode.GetName() - if i == 3 { - kubeNode = secondaryNode.GetName() - } - name = fmt.Sprintf("netperf-w%d", i) - fmt.Println("Creating replication controller", name) - portSpec := []api.ContainerPort{} - if i > 1 { - // Worker W1 is a client-only pod - no ports are exposed - portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolTCP}) - portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolSCTP}) - } - - workerEnv := []api.EnvVar{ - {Name: "worker", Value: name}, - {Name: "kubeNode", Value: kubeNode}, - {Name: "podname", Value: name}, - } - - replicas := int32(1) - - _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(ctx, &api.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Spec: api.ReplicationControllerSpec{ - Replicas: &replicas, - Selector: map[string]string{"app": name}, - Template: &api.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"app": name}, - }, - Spec: api.PodSpec{ - NodeName: kubeNode, - Containers: []api.Container{ - { - Name: name, - Image: netperfImage, - Ports: portSpec, - Args: []string{"--mode=worker"}, - Env: workerEnv, - ImagePullPolicy: "Always", - }, - }, - TerminationGracePeriodSeconds: new(int64), - }, - }, - }, - }, metav1.CreateOptions{}) - if err != nil { - fmt.Println("Error creating orchestrator replication controller", name, ":", err) - return false - } - } - - return true -} - -func getOrchestratorPodName(pods *api.PodList) string { - for _, pod := range pods.Items { - if strings.Contains(pod.GetName(), "netperf-orch-") { - return pod.GetName() - } - } - return "" -} - -// Retrieve the logs for the pod/container and check if csv data has been generated -func getCsvResultsFromPod(ctx context.Context, c *kubernetes.Clientset, podName string) *string { - body, err := c.CoreV1().Pods(testNamespace).GetLogs(podName, &api.PodLogOptions{Timestamps: false}).DoRaw(ctx) - if err != nil { - fmt.Printf("Error (%s) reading logs from pod %s", err, podName) - return nil - } - logData := string(body) - index := strings.Index(logData, csvDataMarker) - endIndex := strings.Index(logData, csvEndDataMarker) - if index == -1 || endIndex == -1 { - return nil - } - csvData := string(body[index+len(csvDataMarker)+1 : endIndex]) - return &csvData -} - -// processCsvData : Process the CSV datafile and generate line and bar graphs -func processCsvData(csvData *string) bool { - t := time.Now().UTC() - outputFileDirectory := fmt.Sprintf("results_%s-%s", testNamespace, tag) - outputFilePrefix := fmt.Sprintf("%s-%s_%s.", testNamespace, tag, t.Format("20060102150405")) - fmt.Printf("Test concluded - CSV raw data written to %s/%scsv\n", outputFileDirectory, outputFilePrefix) - if _, err := os.Stat(outputFileDirectory); os.IsNotExist(err) { - err := os.Mkdir(outputFileDirectory, 0766) - if err != nil { - fmt.Println("Error creating directory", err) - return false - } - - } - fd, err := os.OpenFile(fmt.Sprintf("%s/%scsv", outputFileDirectory, outputFilePrefix), os.O_RDWR|os.O_CREATE, 0666) - if err != nil { - fmt.Println("ERROR writing output CSV datafile", err) - return false - } - _, err = fd.WriteString(*csvData) - if err != nil { - fmt.Println("Error writing string", err) - return false - } - fd.Close() - return true -} - -func executeTests(ctx context.Context, c *kubernetes.Clientset) bool { - for i := 0; i < iterations; i++ { - cleanup(ctx, c) - if !createServices(ctx, c) { - fmt.Println("Failed to create services - aborting test") - return false - } - time.Sleep(3 * time.Second) - if !createRCs(ctx, c) { - fmt.Println("Failed to create replication controllers - aborting test") - return false - } - fmt.Println("Waiting for netperf pods to start up") - - var orchestratorPodName string - for len(orchestratorPodName) == 0 { - fmt.Println("Waiting for orchestrator pod creation") - time.Sleep(60 * time.Second) - var pods *api.PodList - var err error - if pods, err = c.CoreV1().Pods(testNamespace).List(ctx, everythingSelector); err != nil { - fmt.Println("Failed to fetch pods - waiting for pod creation", err) - continue - } - orchestratorPodName = getOrchestratorPodName(pods) - } - fmt.Println("Orchestrator Pod is", orchestratorPodName) - - // The pods orchestrate themselves, we just wait for the results file to show up in the orchestrator container - for { - // Monitor the orchestrator pod for the CSV results file - csvdata := getCsvResultsFromPod(ctx, c, orchestratorPodName) - if csvdata == nil { - fmt.Println("Scanned orchestrator pod filesystem - no results file found yet...waiting for orchestrator to write CSV file...") - time.Sleep(60 * time.Second) - continue - } - if processCsvData(csvdata) { - break - } - } - fmt.Printf("TEST RUN (Iteration %d) FINISHED - cleaning up services and pods\n", i) - } - return false ->>>>>>> origin/master } func main() { @@ -448,7 +89,6 @@ func main() { fmt.Println("Docker image : ", netperfImage) fmt.Println("------------------------------------------------------------") -<<<<<<< HEAD testParams := lib.TestParams{ Iterations: iterations, Tag: tag, @@ -470,30 +110,4 @@ func main() { fmt.Println("CSV Result File : ", result.CsvResultFile) fmt.Println("JSON Result File : ", result.JsonResultFile) } -======= - ctx := context.Background() - - var c *kubernetes.Clientset - if c = setupClient(); c == nil { - fmt.Println("Failed to setup REST client to Kubernetes cluster") - return - } - if cleanupOnly { - cleanup(ctx, c) - return - } - nodes := getMinionNodes(ctx, c) - if nodes == nil { - return - } - if len(nodes.Items) < 2 { - fmt.Println("Insufficient number of nodes for test (need minimum 2 nodes)") - return - } - primaryNode = nodes.Items[0] - secondaryNode = nodes.Items[1] - fmt.Printf("Selected primary,secondary nodes = (%s, %s)\n", primaryNode.GetName(), secondaryNode.GetName()) - executeTests(ctx, c) - cleanup(ctx, c) ->>>>>>> origin/master }