Skip to content

Commit 7fe9f9f

Browse files
committed
[WIP] Templates share image
1 parent 6b018bc commit 7fe9f9f

File tree

39 files changed

+345
-197
lines changed

39 files changed

+345
-197
lines changed

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.google.api.services.dataflow.model.Job;
2727
import com.google.auth.Credentials;
2828
import com.google.cloud.bigquery.TableId;
29+
import com.google.cloud.storage.Blob;
30+
import com.google.cloud.storage.Storage;
2931
import com.google.cloud.teleport.metadata.DirectRunnerTest;
3032
import com.google.cloud.teleport.metadata.MultiTemplateIntegrationTest;
3133
import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
@@ -52,6 +54,7 @@
5254
import java.util.Date;
5355
import java.util.List;
5456
import java.util.Random;
57+
import java.util.UUID;
5558
import java.util.concurrent.ExecutionException;
5659
import org.apache.beam.it.common.PipelineLauncher;
5760
import org.apache.beam.it.common.PipelineLauncher.JobState;
@@ -75,6 +78,7 @@
7578
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
7679
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
7780
import org.apache.commons.lang3.RandomStringUtils;
81+
import org.apache.parquet.Strings;
7882
import org.checkerframework.checker.nullness.qual.Nullable;
7983
import org.junit.After;
8084
import org.junit.Before;
@@ -142,7 +146,14 @@ protected void starting(Description description) {
142146
})
143147
.build();
144148

149+
public static final String STAGING_PREFIX;
150+
145151
static {
152+
STAGING_PREFIX =
153+
new SimpleDateFormat("yyyy-MM-dd-HH-mm").format(new Date())
154+
+ "-"
155+
+ UUID.randomUUID().toString().substring(0, 6)
156+
+ "_IT";
146157
Runtime.getRuntime().addShutdownHook(new Thread(stagedTemplates::invalidateAll));
147158
}
148159

@@ -308,8 +319,6 @@ private String getSpecPath(
308319
() -> {
309320
LOG.info("Preparing test for {} ({})", templateMetadata.name(), dataflowTemplateClass);
310321

311-
String prefix = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date()) + "_IT";
312-
313322
File pom = new File(pomPath).getAbsoluteFile();
314323
if (!pom.exists()) {
315324
throw new IllegalArgumentException(
@@ -332,7 +341,26 @@ private String getSpecPath(
332341
+ " -DspecPath or provide a proper -DstageBucket for automatic staging.");
333342
}
334343

335-
String[] mavenCmd = buildMavenStageCommand(prefix, pom, bucketName, template);
344+
boolean flex =
345+
templateMetadata.flexContainerName() != null
346+
&& !templateMetadata.flexContainerName().isEmpty();
347+
String blobPath =
348+
String.format(
349+
"%s/%s%s", STAGING_PREFIX, flex ? "flex/" : "", templateMetadata.name());
350+
String stagePath = String.format("gs://%s/%s", bucketName, blobPath);
351+
352+
// Check template metadata file existence
353+
try (Storage storage = ArtifactUtils.createStorageClient(credentials)) {
354+
Blob blob =
355+
storage.get(
356+
bucketName, blobPath, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
357+
if (blob != null && blob.exists() && blob.getSize() > 0) {
358+
LOG.info("Find templates at {}", stagePath);
359+
return stagePath;
360+
}
361+
}
362+
363+
String[] mavenCmd = buildMavenStageCommand(STAGING_PREFIX, pom, bucketName, template);
336364
LOG.info("Running command to stage templates: {}", String.join(" ", mavenCmd));
337365

338366
try {
@@ -344,13 +372,7 @@ private String getSpecPath(
344372
throw new RuntimeException("Error staging template, check Maven logs.");
345373
}
346374

347-
boolean flex =
348-
templateMetadata.flexContainerName() != null
349-
&& !templateMetadata.flexContainerName().isEmpty();
350-
return String.format(
351-
"gs://%s/%s/%s%s",
352-
bucketName, prefix, flex ? "flex/" : "", templateMetadata.name());
353-
375+
return stagePath;
354376
} catch (Exception e) {
355377
throw new IllegalArgumentException("Error staging template", e);
356378
}
@@ -417,6 +439,14 @@ private String[] buildMavenStageCommand(
417439
// that will copy only the shaded jar to the docker image.
418440
boolean skipShade = templateMetadata.type() != TemplateType.XLANG;
419441

442+
String templateOrContainer;
443+
@Nullable String flexContainerName = templateMetadata.flexContainerName();
444+
if (Strings.isNullOrEmpty(flexContainerName)) {
445+
templateOrContainer = "-DtemplateName=" + templateMetadata.name();
446+
} else {
447+
templateOrContainer = "-DflexContainerName=" + flexContainerName;
448+
}
449+
420450
return new String[] {
421451
"mvn",
422452
"compile",
@@ -442,7 +472,7 @@ private String[] buildMavenStageCommand(
442472
"-DbucketName=" + bucketName,
443473
"-DgcpTempLocation=" + bucketName,
444474
"-DstagePrefix=" + prefix,
445-
"-DtemplateName=" + templateMetadata.name(),
475+
templateOrContainer,
446476
"-DunifiedWorker=" + System.getProperty("unifiedWorker"),
447477
// Print stacktrace when command fails
448478
"-e"
@@ -814,7 +844,7 @@ private static void cleanUpTemplates(String metafileName) {
814844
if (cmd != null) {
815845
Process exec = Runtime.getRuntime().exec(cmd);
816846
if (exec.waitFor() != 0) {
817-
LOG.warn("Error deleting staged image {}", imgName);
847+
LOG.warn("Error deleting staged image {}. It might already be deleted.", imgName);
818848
}
819849
}
820850
} else {

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/FlexTemplateDataflowJobResourceManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@
3636
import org.slf4j.Logger;
3737
import org.slf4j.LoggerFactory;
3838

39+
/**
40+
* Used when additional flex template is needed for integration tests (e.g. using another template
41+
* to generate data). For generic template integration test, Use TemplateTestBase's subclasses to
42+
* manage the templates.
43+
*/
3944
public class FlexTemplateDataflowJobResourceManager implements ResourceManager {
4045

4146
private static final Logger LOG =
@@ -49,6 +54,9 @@ public class FlexTemplateDataflowJobResourceManager implements ResourceManager {
4954
private static final String PROJECT = TestProperties.project();
5055
private static final String REGION = TestProperties.region();
5156
private static final Credentials CREDENTIALS = TestProperties.googleCredentials();
57+
// TODO(yathu): we should use TemplateTestBase.stagedTemplates to managed all staged templates
58+
// during workflow run.
59+
// Currently templates involved here get compiled and staged twice.
5260
private static Map<String, String> specPaths = new HashMap<>();
5361

5462
private FlexTemplateDataflowJobResourceManager(Builder builder) {
@@ -173,6 +181,11 @@ public FlexTemplateDataflowJobResourceManager build() {
173181
}
174182
}
175183

184+
// TODO(yathu) this method was forked and diverged from TemplateTestBase.buildAndStageTemplate,
185+
// causing involved
186+
// templates get compiled and staged twice. We should use TemplateTestBase.stagedTemplates to
187+
// managed all staged
188+
// templates during workflow run.
176189
private void buildAndStageTemplate(
177190
String templateName, String modulePath, String additionalMavenProfile) {
178191
LOG.info("Building and Staging {} template", templateName);

plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/DockerfileGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@
4141
* includes Python, Yaml and Xlang templates
4242
*/
4343
public class DockerfileGenerator {
44-
44+
// TODO(DO NOT MERGE) - testing!
4545
public static final String BASE_CONTAINER_IMAGE =
46-
"gcr.io/dataflow-templates-base/java17-template-launcher-base-distroless:latest";
46+
"gcr.io/dataflow-build/yathu/java17-template-launcher-base-distroless:850175030";
4747
// Keep in sync with python version used in
4848
// https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/python/generate_dependencies.sh
4949
public static final String BASE_PYTHON_CONTAINER_IMAGE =

plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/TemplateSpecsGenerator.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.io.File;
2727
import java.io.FileWriter;
2828
import java.io.IOException;
29-
import java.nio.file.Path;
3029
import java.util.List;
3130
import java.util.logging.Logger;
3231
import org.apache.commons.lang3.StringUtils;
@@ -121,14 +120,6 @@ public File saveMetadata(
121120
}
122121

123122
String imageName = templateDash.toLowerCase();
124-
if (StringUtils.isNotEmpty(templateAnnotation.flexContainerName())) {
125-
imageName = Path.of(templateAnnotation.flexContainerName()).getFileName().toString();
126-
}
127-
128-
if (!targetDirectory.exists()) {
129-
targetDirectory.mkdirs();
130-
}
131-
132123
File file = new File(targetDirectory, imageName + "-generated-metadata.json");
133124
LOG.info("Saving image spec metadata " + file.getAbsolutePath());
134125

plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/TemplateSpecsGeneratorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ public void saveMetadataNestedFlex() {
105105
assertNotNull(saveMetadata);
106106
assertTrue(saveMetadata.exists());
107107
assertEquals(
108-
saveMetadata.getPath(),
109-
outputFolder.toPath().resolve("AtoBNestedFlex-generated-metadata.json").toString());
108+
outputFolder.toPath().resolve("atobnestedflex-generated-metadata.json").toString(),
109+
saveMetadata.getPath());
110110
}
111111

112112
@Test

plugins/templates-maven-plugin/src/main/java/com/google/cloud/teleport/plugin/maven/TemplatesReleaseMojo.java

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ public class TemplatesReleaseMojo extends TemplatesBaseMojo {
5757
@Parameter(defaultValue = "${templateName}", readonly = true, required = false)
5858
protected String templateName;
5959

60+
@Parameter(defaultValue = "${flexContainerName}", readonly = true, required = false)
61+
protected String flexContainerName;
62+
6063
@Parameter(defaultValue = "${bucketName}", readonly = true, required = true)
6164
protected String bucketName;
6265

@@ -170,6 +173,36 @@ public void execute() throws MojoExecutionException {
170173
return;
171174
}
172175

176+
String useRegion = StringUtils.isNotEmpty(region) ? region : "us-central1";
177+
TemplatesStageMojo configuredMojo =
178+
new TemplatesStageMojo(
179+
project,
180+
session,
181+
outputDirectory,
182+
outputClassesDirectory,
183+
resourcesDirectory,
184+
targetDirectory,
185+
projectId,
186+
templateName,
187+
flexContainerName,
188+
bucketName,
189+
librariesBucketName,
190+
stagePrefix,
191+
useRegion,
192+
artifactRegion,
193+
gcpTempLocation,
194+
baseContainerImage,
195+
basePythonContainerImage,
196+
pythonTemplateLauncherEntryPoint,
197+
javaTemplateLauncherEntryPoint,
198+
pythonVersion,
199+
beamVersion,
200+
artifactRegistry,
201+
stagingArtifactRegistry,
202+
unifiedWorker,
203+
generateSBOM);
204+
configuredMojo.stageCommandSpecs(templateDefinitions);
205+
173206
for (TemplateDefinitions definition : templateDefinitions) {
174207

175208
ImageSpec imageSpec = definition.buildSpecModel(true);
@@ -181,36 +214,6 @@ public void execute() throws MojoExecutionException {
181214

182215
LOG.info("Staging template {}...", currentTemplateName);
183216

184-
String useRegion = StringUtils.isNotEmpty(region) ? region : "us-central1";
185-
186-
// TODO: is there a better way to get the plugin on the _same project_?
187-
TemplatesStageMojo configuredMojo =
188-
new TemplatesStageMojo(
189-
project,
190-
session,
191-
outputDirectory,
192-
outputClassesDirectory,
193-
resourcesDirectory,
194-
targetDirectory,
195-
projectId,
196-
templateName,
197-
bucketName,
198-
librariesBucketName,
199-
stagePrefix,
200-
useRegion,
201-
artifactRegion,
202-
gcpTempLocation,
203-
baseContainerImage,
204-
basePythonContainerImage,
205-
pythonTemplateLauncherEntryPoint,
206-
javaTemplateLauncherEntryPoint,
207-
pythonVersion,
208-
beamVersion,
209-
artifactRegistry,
210-
stagingArtifactRegistry,
211-
unifiedWorker,
212-
generateSBOM);
213-
214217
String templatePath = configuredMojo.stageTemplate(definition, imageSpec, pluginManager);
215218

216219
if (!definition.getTemplateAnnotation().stageImageOnly()) {

plugins/templates-maven-plugin/src/main/java/com/google/cloud/teleport/plugin/maven/TemplatesRunMojo.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ public class TemplatesRunMojo extends TemplatesBaseMojo {
7474
@Parameter(defaultValue = "${templateName}", readonly = true, required = false)
7575
protected String templateName;
7676

77+
@Parameter(defaultValue = "${flexContainerName}", readonly = true, required = false)
78+
protected String flexContainerName;
79+
7780
@Parameter(defaultValue = "${bucketName}", readonly = true, required = true)
7881
protected String bucketName;
7982

@@ -203,6 +206,7 @@ public void execute() throws MojoExecutionException {
203206
targetDirectory,
204207
projectId,
205208
templateName,
209+
flexContainerName,
206210
bucketName,
207211
bucketName,
208212
stagePrefix,

0 commit comments

Comments
 (0)