Skip to content
This repository was archived by the owner on Feb 10, 2023. It is now read-only.

Commit 51eb116

Browse files
Merge pull request #1 from Statflo/s3-multipart-upload
Multipart Upload
2 parents 1e2554e + 45fb309 commit 51eb116

16 files changed

+713
-42
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ Map<String, ?> env = ImmutableMap.<String, Object> builder()
5959
FileSystems.newFileSystem("s3:///", env, Thread.currentThread().getContextClassLoader());
6060
```
6161

62+
##### Uploading Objects Using Multipart Upload API
63+
64+
By default s3fs will upload an object to s3 by calling `AmazonS3Client#putObject()`.
65+
If you want to upload files using the Multipart Upload API instead:
66+
67+
```java
68+
Map<String, ?> env = ImmutableMap.<String, Object> builder()
69+
.put(com.upplication.s3fs.S3FileSystemProvider.MULTIPART_UPLOAD_ENABLED, "true")
70+
.build();
71+
FileSystems.newFileSystem("s3:///", env, Thread.currentThread().getContextClassLoader());
72+
```
73+
6274
Complete settings lists:
6375

6476
* s3fs_access_key

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>com.upplication</groupId>
55
<artifactId>s3fs</artifactId>
66
<packaging>jar</packaging>
7-
<version>2.2.2</version>
7+
<version>2.3.0-SNAPSHOT</version>
88
<name>s3fs</name>
99
<description>S3 filesystem provider for Java 7</description>
1010
<url>https://github.com/Upplication/Amazon-S3-FileSystem-NIO2</url>
@@ -62,6 +62,7 @@
6262
<com.google.guava.guava.version>18.0</com.google.guava.guava.version>
6363
<org.apache.tika.tika-core.version>1.5</org.apache.tika.tika-core.version>
6464
<com.google.code.findbugs.jsr305.version>1.3.9</com.google.code.findbugs.jsr305.version>
65+
<s3.stream.upload.version>1.0.1</s3.stream.upload.version>
6566
</properties>
6667

6768
<dependencies>
@@ -101,6 +102,11 @@
101102
<artifactId>jsr305</artifactId>
102103
<version>${com.google.code.findbugs.jsr305.version}</version>
103104
</dependency>
105+
<dependency>
106+
<groupId>com.github.alexmojaki</groupId>
107+
<artifactId>s3-stream-upload</artifactId>
108+
<version>${s3.stream.upload.version}</version>
109+
</dependency>
104110

105111
<!-- for testing -->
106112
<dependency>

src/main/java/com/upplication/s3fs/S3FileChannel.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.amazonaws.services.s3.model.ObjectMetadata;
44
import com.amazonaws.services.s3.model.S3Object;
5-
import com.amazonaws.util.IOUtils;
65
import org.apache.tika.Tika;
76

87
import java.io.*;

src/main/java/com/upplication/s3fs/S3FileSystem.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.amazonaws.services.s3.model.Bucket;
1616
import com.google.common.collect.ImmutableList;
1717
import com.google.common.collect.ImmutableSet;
18+
import java.util.Properties;
1819

1920
/**
2021
* S3FileSystem with a concrete client configured and ready to use.
@@ -24,16 +25,18 @@
2425
public class S3FileSystem extends FileSystem implements Comparable<S3FileSystem> {
2526

2627
private final S3FileSystemProvider provider;
28+
private final Properties properties;
2729
private final String key;
2830
private final AmazonS3 client;
2931
private final String endpoint;
3032
private int cache;
3133

32-
public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint) {
34+
public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint, Properties props) {
3335
this.provider = provider;
3436
this.key = key;
3537
this.client = client;
3638
this.endpoint = endpoint;
39+
this.properties = props;
3740
this.cache = 60000; // 1 minute cache for the s3Path
3841
}
3942

@@ -176,4 +179,8 @@ public int compareTo(S3FileSystem o) {
176179
public int getCache() {
177180
return cache;
178181
}
182+
183+
/**
 * Returns the configuration properties held by this file system.
 *
 * @return the {@code Properties} instance that was passed to the constructor; the field is
 *         returned directly, without copying
 */
public Properties getProperties() {
184+
return properties;
185+
}
179186
}

src/main/java/com/upplication/s3fs/S3FileSystemProvider.java

Lines changed: 115 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.amazonaws.services.s3.model.Bucket;
77
import com.amazonaws.services.s3.model.ObjectMetadata;
88
import com.amazonaws.services.s3.model.S3Object;
9+
import com.amazonaws.services.s3.model.S3ObjectId;
910
import com.google.common.base.Preconditions;
1011
import com.google.common.collect.ImmutableList;
1112
import com.google.common.collect.ImmutableSet;
@@ -31,9 +32,13 @@
3132
import java.util.concurrent.ConcurrentHashMap;
3233
import java.util.concurrent.ConcurrentMap;
3334

35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
3438
import static com.google.common.collect.Sets.difference;
3539
import static com.upplication.s3fs.AmazonS3Factory.*;
3640
import static java.lang.String.format;
41+
import static java.lang.String.format;
3742

3843
/**
3944
* Spec:
@@ -65,13 +70,21 @@
6570
*/
6671
public class S3FileSystemProvider extends FileSystemProvider {
6772

73+
private static final Logger LOGGER = LoggerFactory.getLogger(S3FileSystemProvider.class);
6874
public static final String CHARSET_KEY = "s3fs_charset";
6975
public static final String AMAZON_S3_FACTORY_CLASS = "s3fs_amazon_s3_factory";
76+
public static final String MULTIPART_UPLOAD_ENABLED = "s3fs_multipart_upload_enabled";
77+
public static final String MULTIPART_UPLOAD_PART_SIZE = "s3fs_multipart_upload_part_size";
78+
public static final String MULTIPART_UPLOAD_NUM_STREAMS = "s3fs_multipart_upload_num_streams";
79+
public static final String MULTIPART_UPLOAD_QUEUE_CAPACITY = "s3fs_multipart_upload_queue_capacity";
80+
public static final String MULTIPART_UPLOAD_NUM_UPLOAD_THREADS = "s3fs_multipart_upload_num_upload_threads";
7081

7182
private static final ConcurrentMap<String, S3FileSystem> fileSystems = new ConcurrentHashMap<>();
7283
private static final List<String> PROPS_TO_OVERLOAD = Arrays.asList(ACCESS_KEY, SECRET_KEY, REQUEST_METRIC_COLLECTOR_CLASS, CONNECTION_TIMEOUT, MAX_CONNECTIONS, MAX_ERROR_RETRY, PROTOCOL, PROXY_DOMAIN,
7384
PROXY_HOST, PROXY_PASSWORD, PROXY_PORT, PROXY_USERNAME, PROXY_WORKSTATION, SOCKET_SEND_BUFFER_SIZE_HINT, SOCKET_RECEIVE_BUFFER_SIZE_HINT, SOCKET_TIMEOUT,
74-
USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS);
85+
USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS,
86+
MULTIPART_UPLOAD_ENABLED, MULTIPART_UPLOAD_PART_SIZE, MULTIPART_UPLOAD_NUM_STREAMS,
87+
MULTIPART_UPLOAD_QUEUE_CAPACITY, MULTIPART_UPLOAD_NUM_UPLOAD_THREADS);
7588

7689
private S3Utils s3Utils = new S3Utils();
7790
private Cache cache = new Cache();
@@ -95,6 +108,9 @@ public FileSystem newFileSystem(URI uri, Map<String, ?> env) {
95108
// create the filesystem with the final properties, store and return
96109
S3FileSystem fileSystem = createFileSystem(uri, props);
97110
fileSystems.put(fileSystem.getKey(), fileSystem);
111+
112+
LOGGER.debug("New file system created. url:{}, props:{}", uri, props);
113+
98114
return fileSystem;
99115
}
100116

@@ -302,6 +318,8 @@ public Path getPath(URI uri) {
302318

303319
@Override
304320
public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
321+
LOGGER.debug("New directory stream. path:{}, filter:{}", dir, filter);
322+
305323
final S3Path s3Path = toS3Path(dir);
306324
return new DirectoryStream<Path>() {
307325
@Override
@@ -316,8 +334,47 @@ public Iterator<Path> iterator() {
316334
};
317335
}
318336

337+
private S3MultipartUploadOutputStream createMultipartUploadOutputStream(final S3Path s3Path, Set<? extends OpenOption> opts) throws IOException {
338+
final S3ObjectId objectId = s3Path.toS3ObjectId();
339+
final Set<OpenOption> options = Sets.newHashSet(opts);
340+
final S3FileSystem fileSystem = s3Path.getFileSystem();
341+
final Properties properties = fileSystem.getProperties();
342+
final AmazonS3 client = s3Path.getFileSystem().getClient();
343+
final boolean createOpt = options.remove(StandardOpenOption.CREATE);
344+
final boolean createNewOpt = options.remove(StandardOpenOption.CREATE_NEW);
345+
final S3MultipartUploadOutputStream stream = new S3MultipartUploadOutputStream(client, objectId, properties);
346+
347+
// validate options
348+
if (options.isEmpty()) {
349+
return stream;
350+
}
351+
352+
// Remove irrelevant/ignored options
353+
options.remove(StandardOpenOption.WRITE);
354+
options.remove(StandardOpenOption.SPARSE);
355+
options.remove(StandardOpenOption.TRUNCATE_EXISTING);
356+
357+
if (!options.isEmpty()) {
358+
throw new UnsupportedOperationException(format("Unsupported operation: %s", options));
359+
}
360+
361+
if (createNewOpt && fileSystem.provider().exists(s3Path)) {
362+
fileSystem.provider().delete(s3Path);
363+
}
364+
365+
if (!createOpt && fileSystem.provider().exists(s3Path)) {
366+
throw new FileAlreadyExistsException(format("Target already exists: %s", s3Path));
367+
}
368+
369+
return stream;
370+
}
371+
319372
@Override
320373
public InputStream newInputStream(Path path, OpenOption... options) throws IOException {
374+
LOGGER.debug("New input stream. path:{}, options:{}", path, options);
375+
376+
System.out.println("newInputStream");
377+
321378
S3Path s3Path = toS3Path(path);
322379
String key = s3Path.getKey();
323380

@@ -342,14 +399,46 @@ public InputStream newInputStream(Path path, OpenOption... options) throws IOExc
342399

343400
@Override
344401
public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
345-
S3Path s3Path = toS3Path(path);
346-
return new S3SeekableByteChannel(s3Path, options);
402+
LOGGER.debug("New byte channel. path:{}, options:{}", path, options);
403+
404+
final S3Path s3Path = toS3Path(path);
405+
final boolean multipartEnabled = isMultipartUploadCapable(s3Path, options);
406+
407+
if (!multipartEnabled) {
408+
409+
LOGGER.debug("Using S3SeekableByteChannel");
410+
411+
return new S3SeekableByteChannel(s3Path, options);
412+
}
413+
414+
LOGGER.debug("Using S3MultipartFileChannel");
415+
416+
final S3MultipartUploadOutputStream outputStream = createMultipartUploadOutputStream(s3Path, options);
417+
final FileChannel channel = new S3MultipartUploadChannel(outputStream);
418+
419+
return channel;
347420
}
348421

349422
@Override
350423
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
351-
S3Path s3Path = toS3Path(path);
352-
return new S3FileChannel(s3Path, options);
424+
LOGGER.debug("New file channel. path:{}, filter:{}", path, options);
425+
426+
final S3Path s3Path = toS3Path(path);
427+
final boolean multipartEnabled = isMultipartUploadCapable(s3Path, options);
428+
429+
if (!multipartEnabled) {
430+
431+
LOGGER.debug("Using S3FileChannel");
432+
433+
return new S3FileChannel(s3Path, options);
434+
}
435+
436+
LOGGER.debug("Using S3MultipartFileChannel");
437+
438+
final S3MultipartUploadOutputStream outputStream = createMultipartUploadOutputStream(s3Path, options);
439+
final FileChannel channel = new S3MultipartUploadChannel(outputStream);
440+
441+
return channel;
353442
}
354443

355444
/**
@@ -359,6 +448,8 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
359448
*/
360449
@Override
361450
public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException {
451+
LOGGER.debug("Create directory. path:{}, attrs:{}", dir, attrs);
452+
362453
S3Path s3Path = toS3Path(dir);
363454
Preconditions.checkArgument(attrs.length == 0, "attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO
364455
if (exists(s3Path))
@@ -378,6 +469,8 @@ public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOExcept
378469

379470
@Override
380471
public void delete(Path path) throws IOException {
472+
LOGGER.debug("Delete path:{}", path);
473+
381474
S3Path s3Path = toS3Path(path);
382475
if (Files.notExists(s3Path))
383476
throw new NoSuchFileException("the path: " + this + " not exists");
@@ -393,6 +486,8 @@ public void delete(Path path) throws IOException {
393486

394487
@Override
395488
public void copy(Path source, Path target, CopyOption... options) throws IOException {
489+
LOGGER.debug("Copy {} to target. options:{}", source, target, options);
490+
396491
if (isSameFile(source, target))
397492
return;
398493

@@ -424,6 +519,8 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep
424519

425520
@Override
426521
public void move(Path source, Path target, CopyOption... options) throws IOException {
522+
LOGGER.debug("Move {} to target. options:{}", source, target, options);
523+
427524
if (options != null && Arrays.asList(options).contains(StandardCopyOption.ATOMIC_MOVE))
428525
throw new AtomicMoveNotSupportedException(source.toString(), target.toString(), "Atomic not supported");
429526
copy(source, target, options);
@@ -550,7 +647,7 @@ public void setAttribute(Path path, String attribute, Object value, LinkOption..
550647
* @return S3FileSystem never null
551648
*/
552649
public S3FileSystem createFileSystem(URI uri, Properties props) {
553-
return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost());
650+
return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost(), props);
554651
}
555652

556653
protected AmazonS3 getAmazonS3(URI uri, Properties props) {
@@ -634,4 +731,16 @@ public Cache getCache() {
634731
public void setCache(Cache cache) {
635732
this.cache = cache;
636733
}
734+
735+
private boolean isMultipartUploadCapable(final S3Path s3Path, final Set<? extends OpenOption> options) {
736+
// Not supported options
737+
if (options.contains(StandardOpenOption.READ) || options.contains(StandardOpenOption.APPEND)) {
738+
return false;
739+
}
740+
741+
final S3FileSystem fileSystem = s3Path.getFileSystem();
742+
final Properties properties = fileSystem.getProperties();
743+
744+
return Boolean.parseBoolean(properties.getProperty(MULTIPART_UPLOAD_ENABLED, "false"));
745+
}
637746
}

0 commit comments

Comments
 (0)