
Commit 0042145

embed file in cf map and create notebook
Signed-off-by: Kevin <[email protected]>
1 parent f1d4eeb commit 0042145

File tree

4 files changed: +266 −55 lines


tests/kfto/kfto_mnist_sdk_test.go

Lines changed: 39 additions & 11 deletions
@@ -17,37 +17,65 @@ limitations under the License.
 package kfto

 import (
+	"strings"
 	"testing"
 	"time"

 	. "github.com/onsi/gomega"
 	. "github.com/project-codeflare/codeflare-common/support"
+
+	v1 "k8s.io/api/core/v1"
 )

 func TestMnistSDK(t *testing.T) {
 	test := With(t)

 	// Create a namespace
 	namespace := test.NewTestNamespace()
-
-	jupyterNotebookConfigMapFileName := "mnist_kfto.ipynb"
-	mnist := readMnistScriptTemplate(test, "resources/mnist.py")
-
-	jupyterNotebook := ReadFile(test, "resources/mnist_kfto.ipynb")
-	config := CreateConfigMap(test, namespace.Name, map[string][]byte{
-		jupyterNotebookConfigMapFileName: jupyterNotebook,
-		"mnist.py":                       mnist,
-	})
-
-	// Define the regular(non-admin) user
 	userName := GetNotebookUserName(test)
 	userToken := GetNotebookUserToken(test)
+	jupyterNotebookConfigMapFileName := "mnist_kfto.ipynb"
+	mnist := readMnistScriptTemplate(test, "resources/kfto_sdk_train.py")

 	// Create role binding with Namespace specific admin cluster role
 	CreateUserRoleBindingWithClusterRole(test, userName, namespace.Name, "admin")

+	requiredChangesInNotebook := map[string]string{
+		"${api_url}":        GetOpenShiftApiUrl(test),
+		"${train_function}": "train_func_2",
+		"${password}":       userToken,
+		"${num_gpus}":       "2",
+		"${namespace}":      namespace.Name,
+	}
+
+	jupyterNotebook := string(ReadFile(test, "resources/mnist_kfto.ipynb"))
+	for oldValue, newValue := range requiredChangesInNotebook {
+		jupyterNotebook = strings.Replace(jupyterNotebook, oldValue, newValue, -1)
+	}
+
+	config := CreateConfigMap(test, namespace.Name, map[string][]byte{
+		jupyterNotebookConfigMapFileName: []byte(jupyterNotebook),
+		"kfto_sdk_mnist.py":              mnist,
+	})
+
 	// Create Notebook CR
 	createNotebook(test, namespace, userToken, config.Name, jupyterNotebookConfigMapFileName, 0)
+
+	// Gracefully clean up the Notebook
+	defer func() {
+		deleteNotebook(test, namespace)
+		test.Eventually(listNotebooks(test, namespace), TestTimeoutGpuProvisioning).Should(HaveLen(0))
+	}()
+
+	// Make sure the PyTorch job is created
+	Eventually(PyTorchJob(test, namespace.Name, "pytorch-ddp")).
+		Should(WithTransform(PyTorchJobConditionRunning, Equal(v1.ConditionTrue)))
+
+	// Make sure the job eventually succeeds
+	Eventually(PyTorchJob(test, namespace.Name, "pytorch-ddp")).
+		Should(WithTransform(PyTorchJobConditionSucceeded, Equal(v1.ConditionTrue)))
+
+	// TODO: write torch job logs?
 	time.Sleep(60 * time.Second)
 }
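The substitution map above hints at what the notebook does once its placeholders are filled in: authenticate against the OpenShift API with the user token, then submit train_func_2 as a 2-GPU PyTorchJob named "pytorch-ddp" in the test namespace. A minimal sketch of such a cell follows; TrainingClient and create_job come from the kubeflow-training SDK, and the exact wiring, variable names, and sample values are assumptions for illustration, not code taken from this commit:

# Hypothetical notebook cell after placeholder substitution (assumed wiring,
# not part of this commit). Values mirror requiredChangesInNotebook above.
from kubernetes import client
from kubeflow.training import TrainingClient
from kfto_sdk_mnist import train_func_2  # script mounted from the ConfigMap

api_url = "https://api.cluster.example:6443"  # ${api_url} (sample value)
token = "dummy-user-token"                    # ${password} (sample value)
namespace = "test-namespace"                  # ${namespace} (sample value)
num_gpus = "2"                                # ${num_gpus}

# Authenticate the Kubernetes client against the cluster API.
configuration = client.Configuration()
configuration.host = api_url
configuration.api_key = {"authorization": f"Bearer {token}"}
api_client = client.ApiClient(configuration)

training_client = TrainingClient(client_configuration=api_client.configuration)

# Submit the training function as a distributed PyTorchJob; the Go test
# polls for this job name and waits for Running, then Succeeded.
training_client.create_job(
    name="pytorch-ddp",
    namespace=namespace,
    train_func=train_func_2,  # ${train_function}
    num_workers=2,
    resources_per_worker={"gpu": num_gpus},
)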

tests/kfto/notebook.go

Lines changed: 17 additions & 0 deletions
@@ -125,3 +125,20 @@ func GetNotebookImage(t Test) string {
 	}
 	return notebook_image
 }
+
+func deleteNotebook(test Test, namespace *corev1.Namespace) {
+	err := test.Client().Dynamic().Resource(notebookResource).Namespace(namespace.Name).Delete(test.Ctx(), "jupyter-nb-kube-3aadmin", metav1.DeleteOptions{})
+	test.Expect(err).NotTo(gomega.HaveOccurred())
+}
+
+func listNotebooks(test Test, namespace *corev1.Namespace) []*unstructured.Unstructured {
+	ntbs, err := test.Client().Dynamic().Resource(notebookResource).Namespace(namespace.Name).List(test.Ctx(), metav1.ListOptions{})
+	test.Expect(err).NotTo(gomega.HaveOccurred())
+
+	ntbsp := []*unstructured.Unstructured{}
+	// Take the address of the slice element rather than of the loop
+	// variable, so every pointer refers to a distinct notebook.
+	for i := range ntbs.Items {
+		ntbsp = append(ntbsp, &ntbs.Items[i])
+	}
+
+	return ntbsp
+}
tests/kfto/resources/kfto_sdk_train.py

Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
+def train_func():
+    import os
+    import torch
+    import torch.distributed as dist
+    import torch.nn as nn
+    import torch.optim as optim
+    from torchvision import datasets, transforms
+    from torch.utils.data import DataLoader, DistributedSampler
+
+    # Initialize distributed process group
+    dist.init_process_group(backend="nccl" if torch.cuda.is_available() else "gloo")
+    rank = dist.get_rank()
+    world_size = dist.get_world_size()
+    local_rank = int(os.getenv("LOCAL_RANK", 0))
+    torch.cuda.set_device(local_rank)
+
+    # Configuration
+    batch_size = 64
+    epochs = 5
+    learning_rate = 0.01
+
+    # Dataset and DataLoader
+    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
+    train_dataset = datasets.MNIST(root="/tmp/datasets/mnist", train=True, download=True, transform=transform)
+    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
+    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, sampler=train_sampler)
+
+    # Model, Loss, and Optimizer
+    model = nn.Sequential(
+        nn.Flatten(),
+        nn.Linear(28 * 28, 128),
+        nn.ReLU(),
+        nn.Linear(128, 10)
+    ).cuda(local_rank)
+
+    model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
+    criterion = nn.CrossEntropyLoss().cuda(local_rank)
+    optimizer = optim.SGD(model.parameters(), lr=learning_rate)
+
+    # Training loop
+    for epoch in range(epochs):
+        model.train()
+        epoch_loss = 0
+        for batch_idx, (data, target) in enumerate(train_loader):
+            data, target = data.cuda(local_rank, non_blocking=True), target.cuda(local_rank, non_blocking=True)
+
+            optimizer.zero_grad()
+            output = model(data)
+            loss = criterion(output, target)
+            loss.backward()
+            optimizer.step()
+
+            epoch_loss += loss.item()
+
+        # Log epoch stats
+        print(f"Rank {rank} | Epoch {epoch + 1}/{epochs} | Loss: {epoch_loss / len(train_loader)}")
+
+    # Cleanup
+    dist.destroy_process_group()
+
+def train_func_2():
+    import os
+    import torch
+    import torch.nn.functional as F
+    from torch.utils.data import DistributedSampler
+    from torchvision import datasets, transforms
+    import torch.distributed as dist
+
+    # [1] Setup PyTorch DDP. Distributed environment will be set automatically by Training Operator.
+    dist.init_process_group(backend="nccl" if torch.cuda.is_available() else "gloo")
+    Distributor = torch.nn.parallel.DistributedDataParallel
+    local_rank = int(os.getenv("LOCAL_RANK", 0))
+    print(
+        "Distributed Training for WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}".format(
+            dist.get_world_size(),
+            dist.get_rank(),
+            local_rank,
+        )
+    )
+
+    # [2] Create PyTorch CNN Model.
+    class Net(torch.nn.Module):
+        def __init__(self):
+            super(Net, self).__init__()
+            self.conv1 = torch.nn.Conv2d(1, 20, 5, 1)
+            self.conv2 = torch.nn.Conv2d(20, 50, 5, 1)
+            self.fc1 = torch.nn.Linear(4 * 4 * 50, 500)
+            self.fc2 = torch.nn.Linear(500, 10)
+
+        def forward(self, x):
+            x = F.relu(self.conv1(x))
+            x = F.max_pool2d(x, 2, 2)
+            x = F.relu(self.conv2(x))
+            x = F.max_pool2d(x, 2, 2)
+            x = x.view(-1, 4 * 4 * 50)
+            x = F.relu(self.fc1(x))
+            x = self.fc2(x)
+            return F.log_softmax(x, dim=1)
+
+    # [3] Attach model to the correct GPU device and distributor.
+    device = torch.device(f"cuda:{local_rank}")
+    model = Net().to(device)
+    model = Distributor(model)
+    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
+
+    # [4] Setup FashionMNIST dataloader and distribute data across PyTorchJob workers.
+    dataset = datasets.FashionMNIST(
+        "./data",
+        download=True,
+        train=True,
+        transform=transforms.Compose([transforms.ToTensor()]),
+    )
+    train_loader = torch.utils.data.DataLoader(
+        dataset=dataset,
+        batch_size=128,
+        sampler=DistributedSampler(dataset),
+    )
+
+    # [5] Start model Training.
+    for epoch in range(3):
+        for batch_idx, (data, target) in enumerate(train_loader):
+            # Attach Tensors to the device.
+            data = data.to(device)
+            target = target.to(device)
+
+            optimizer.zero_grad()
+            output = model(data)
+            loss = F.nll_loss(output, target)
+            loss.backward()
+            optimizer.step()
+            if batch_idx % 10 == 0 and dist.get_rank() == 0:
+                print(
+                    "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
+                        epoch,
+                        batch_idx * len(data),
+                        len(train_loader.dataset),
+                        100.0 * batch_idx / len(train_loader),
+                        loss.item(),
+                    )
+                )
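Both functions read their distributed configuration (WORLD_SIZE, RANK, LOCAL_RANK) from environment variables, which the Training Operator injects into every PyTorchJob replica. The same script can therefore be smoke-tested outside the cluster with torchrun, which sets those variables itself. The entry point below is a hypothetical addition for illustration and is not part of the committed file:

# Hypothetical local smoke test (requires CUDA devices, since train_func_2
# pins the model to cuda:LOCAL_RANK):
#
#   torchrun --nproc_per_node=2 kfto_sdk_train.py
#
# torchrun exports WORLD_SIZE, RANK and LOCAL_RANK for each process, so
# dist.init_process_group() resolves the same rendezvous it would inside a
# PyTorchJob.
if __name__ == "__main__":
    train_func_2()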
