Multipart Upload on S3 with jclouds

1. Goal

In the previous article on S3 uploading, we looked at how we can use the generic Blob APIs from jclouds to upload content to S3. In this article, we will use the S3-specific asynchronous API from jclouds to upload content and leverage the multipart upload functionality provided by S3.

2. Preparation

2.1. Set up the custom API

The first part of the upload process is creating the jclouds API – this
is a custom API for Amazon S3:

public AWSS3AsyncClient s3AsyncClient() {
   String identity = ...
   String credentials = ...

   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
      credentials(identity, credentials).buildView(BlobStoreContext.class);

   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}

2.2. Determining the number of parts for the content

Amazon S3 requires each part of a multipart upload (except the last one) to be at least 5 MB in size. As such, the first thing we need to do is determine the right number of parts to split our content into, so that no part falls below this 5 MB minimum:

private static final int fiveMB = 5 * 1024 * 1024;

public static int getMaximumNumberOfParts(byte[] byteArray) {
   int numberOfParts = byteArray.length / fiveMB;
   if (numberOfParts == 0) {
      return 1;
   }
   return numberOfParts;
}

2.3. Breaking the content into parts

We’re going to break the byte array into a set number of parts:

public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize / maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart * i);
      int splitPoint = (int) (dimensionOfPart * (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }

   return parts;
}

We’re going to test the logic of breaking the byte array into parts –
we’re going to generate some bytes (a 16 MB array, which yields 16 / 5 = 3 parts),
split the byte array, recompose it back together using Guava and verify
that we get back the original:

@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
   byte[] byteArray = randomByteData(16);

   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);

   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,
      equalTo(byteArray.length));
   byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}

To generate the data, we simply use the support from Random:

byte[] randomByteData(int mb) {
   byte[] randomBytes = new byte[mb * 1024 * 1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}

2.4. Creating the Payloads

Now that we have determined the correct number of parts for our content and broken it up accordingly, we need to generate the Payload objects for the jclouds API:

public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[] filePart : fileParts) {
      byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}

3. Upload

The upload process is a flexible multi-step process – this means:

  • the upload can be started before having all the data – data can be
    uploaded as it’s coming in

  • data is uploaded in chunks – if one of these operations fails, it
    can simply be retried

  • chunks can be uploaded in parallel – this can greatly increase the
    upload speed, especially in the case of large files

3.1. Initiating the Upload operation

The first step in the Upload operation is to initiate the process.
This request to S3 must contain the standard HTTP headers – the
ContentMD5 header in particular needs to be computed. We’re going to
use the Guava hash function support here:

Hashing.md5().hashBytes(byteArray).asBytes();

This is the MD5 hash of the entire byte array, not of the individual parts.

To initiate the upload, and for all further interactions with S3,
we’re going to use the AWSS3AsyncClient – the asynchronous API we
created earlier:

ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

The key is the handle assigned to the object – this needs to be a
unique identifier specified by the client.

Also notice that, even though we’re using the async version of the API,
we’re blocking for the result of this operation – this is because we
will need the result of the initialize to be able to move forward.

The result of the operation is an upload id returned by S3 – this will
identify the upload throughout its lifecycle and will be present in all
subsequent upload operations.

3.2. Uploading the Parts

The next step is uploading the parts. Our goal here is to send these
requests in parallel, as the part upload operations represent the bulk
of the upload process:

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}

The part numbers need to be consecutive, but the order in which the
requests are sent is not relevant.

After all of the upload part requests have been submitted, we need to
wait for their responses so that we can collect the individual ETag
value of each part:

Function<ListenableFuture<String>, String> getEtagFromOp =
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);

If, for whatever reason, one of the upload part operations fails, the operation can be retried until it succeeds. The logic above does not contain the retry mechanism, but building it in should be straightforward enough.

3.3. Completing the Upload operation

The final step of the upload process is completing the multipart operation. The S3 API requires the responses from the previous part uploads as a Map, which we can now easily create from the list of ETags that we obtained above:

Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}

And finally, send the complete request:

s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();

This will return the final ETag of the finished object and will complete the
entire upload process.
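
To see how the pieces fit together end to end, here is a rough sketch that simply chains the snippets above into one method. It assumes s3AsyncApi is the client we created in section 2.1 and that the helper methods from section 2 live in S3Util; the method name and signature are illustrative:

void uploadMultiPart(String container, String key, byte[] byteArray)
  throws InterruptedException, ExecutionException {
   // 1. split the content and build a payload for each part
   int numberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, numberOfParts);
   List<Payload> payloads = S3Util.createPayloadsOutOfParts(fileParts);

   // 2. initiate the upload with the MD5 of the full content
   byte[] md5Bytes = Hashing.md5().hashBytes(byteArray).asBytes();
   ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
   String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

   // 3. upload all parts in parallel
   List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
   for (int partNumber = 0; partNumber < payloads.size(); partNumber++) {
      ongoingOperations.add(s3AsyncApi.uploadPart(
         container, key, partNumber + 1, uploadId, payloads.get(partNumber)));
   }

   // 4. collect the ETags and complete the upload
   Map<Integer, String> parts = Maps.newHashMap();
   for (int i = 0; i < ongoingOperations.size(); i++) {
      parts.put(i + 1, ongoingOperations.get(i).get());
   }
   s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();
}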

4. Conclusion

In this article, we built a multipart-enabled, fully parallel upload
operation to S3, using the custom S3 jclouds API. This operation is
ready to be used as is, but it can be improved in a few ways.

First, retry logic should be added around the upload operations to
better deal with failures.

Next, for really large files, even though the mechanism sends all the part upload requests in parallel, a throttling mechanism should still limit the number of parallel requests being sent. This is both to avoid bandwidth becoming a bottleneck and to make sure Amazon itself doesn’t flag the upload process as exceeding an allowed limit of requests per second – the Guava RateLimiter can potentially be very well suited for this.
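
For example, a very simple form of throttling could rate-limit how quickly the part requests are submitted, as sketched below; the chosen rate is an arbitrary placeholder, and capping the number of requests actually in flight would additionally need something like a Semaphore or a bounded executor:

// Illustrative sketch: throttle how fast part-upload requests are submitted
RateLimiter rateLimiter = RateLimiter.create(20.0); // arbitrary: 20 requests per second

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < payloads.size(); partNumber++) {
   rateLimiter.acquire(); // blocks until a permit is available
   ongoingOperations.add(s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber)));
}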
