diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java index 4c27501e6340..67ebed51f3c2 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import java.util.function.UnaryOperator; +import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.annotations.SdkProtectedApi; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.core.FileTransformerConfiguration; @@ -162,6 +164,28 @@ default SplitResult split(Consumer split(SplittingTransformerConfiguration splitConfig, + UnaryOperator responseMapper) { + Validate.notNull(splitConfig, "splitConfig must not be null"); + Validate.notNull(responseMapper, "responseMapper must not be null"); + CompletableFuture future = new CompletableFuture<>(); + SdkPublisher> transformer = SplittingTransformer + .builder() + .upstreamResponseTransformer(this) + .maximumBufferSizeInBytes(splitConfig.bufferSizeInBytes()) + .resultFuture(future) + .responseMapper(responseMapper) + .build(); + return AsyncResponseTransformer.SplitResult.builder() + .publisher(transformer) + .resultFuture(future) + .build(); + } + /** * Each AsyncResponseTransformer should return a well-formed name that can be used to identify the implementation. * The Transformer name should only include alphanumeric characters. diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java index e250eab650b0..ab6216fefd61 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import java.util.function.UnaryOperator; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -79,6 +80,19 @@ public SplitResult> split(SplittingTransform .build(); } + @Override + public SplitResult> split( + SplittingTransformerConfiguration splitConfig, + UnaryOperator responseMapper) { + CompletableFuture> future = new CompletableFuture<>(); + SdkPublisher> transformer = + new ByteArraySplittingTransformer<>(this, future, responseMapper); + return AsyncResponseTransformer.SplitResult.>builder() + .publisher(transformer) + .resultFuture(future) + .build(); + } + @Override public String name() { return TransformerType.BYTES.getName(); diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java index 2531f7f8166b..bc3e273aa7ae 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -84,12 +85,22 @@ public class ByteArraySplittingTransformer implements SdkPublisher buffers; + private final UnaryOperator responseMapper; + public ByteArraySplittingTransformer(AsyncResponseTransformer> upstreamResponseTransformer, CompletableFuture> resultFuture) { + this(upstreamResponseTransformer, resultFuture, UnaryOperator.identity()); + } + + public ByteArraySplittingTransformer(AsyncResponseTransformer> + upstreamResponseTransformer, + CompletableFuture> resultFuture, + UnaryOperator responseMapper) { this.upstreamResponseTransformer = upstreamResponseTransformer; this.resultFuture = resultFuture; this.buffers = new ConcurrentHashMap<>(); + this.responseMapper = responseMapper; } @Override @@ -181,7 +192,7 @@ private void handleSubscriptionCancel() { CompletableFuture> upstreamPrepareFuture = upstreamResponseTransformer.prepare(); CompletableFutureUtils.forwardResultTo(upstreamPrepareFuture, resultFuture); - upstreamResponseTransformer.onResponse(responseT.get()); + upstreamResponseTransformer.onResponse(responseMapper.apply(responseT.get())); int totalPartCount = nextPartNumber.get() - 1; if (buffers.size() != totalPartCount) { diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java index d4cf1c7a2356..2eb12399c33b 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java @@ -19,6 +19,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.UnaryOperator; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -112,16 +113,18 @@ public class SplittingTransformer implements SdkPublisher upstreamResponseTransformer, - Long maximumBufferSizeInBytes, - CompletableFuture resultFuture) { + private final UnaryOperator responseMapper; + + private SplittingTransformer(Builder builder) { this.upstreamResponseTransformer = Validate.paramNotNull( - upstreamResponseTransformer, "upstreamResponseTransformer"); - this.resultFuture = Validate.paramNotNull( - resultFuture, "resultFuture"); - Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes"); + builder.upstreamResponseTransformer, "upstreamResponseTransformer"); + this.resultFuture = Validate.paramNotNull(builder.returnFuture, "resultFuture"); + Validate.notNull(builder.maximumBufferSize, "maximumBufferSizeInBytes"); this.maximumBufferInBytes = Validate.isPositive( - maximumBufferSizeInBytes, "maximumBufferSizeInBytes"); + builder.maximumBufferSize, "maximumBufferSizeInBytes"); + this.responseMapper = builder.responseMapper != null + ? builder.responseMapper + : UnaryOperator.identity(); this.resultFuture.whenComplete((r, e) -> { if (e == null) { @@ -296,7 +299,7 @@ public CompletableFuture prepare() { public void onResponse(ResponseT response) { if (onResponseCalled.compareAndSet(false, true)) { log.trace(() -> "calling onResponse on the upstream transformer"); - upstreamResponseTransformer.onResponse(response); + upstreamResponseTransformer.onResponse(responseMapper.apply(response)); } this.response = response; } @@ -393,6 +396,7 @@ public static final class Builder { private Long maximumBufferSize; private CompletableFuture returnFuture; private AsyncResponseTransformer upstreamResponseTransformer; + private UnaryOperator responseMapper; private Builder() { } @@ -437,10 +441,13 @@ public Builder resultFuture(CompletableFuture retur return this; } + public Builder responseMapper(UnaryOperator responseMapper) { + this.responseMapper = responseMapper; + return this; + } + public SplittingTransformer build() { - return new SplittingTransformer<>(this.upstreamResponseTransformer, - this.maximumBufferSize, - this.returnFuture); + return new SplittingTransformer<>(this); } } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java index c86a3fea12fb..7cf914dac662 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java @@ -334,7 +334,7 @@ private CopyObjectRequest attachSdkAttribute(CopyObjectRequest copyObjectRequest } private GetObjectRequest attachSdkAttribute(GetObjectRequest request, - Consumer builderMutation) { + Consumer builderMutation) { AwsRequestOverrideConfiguration modifiedRequestOverrideConfig = request.overrideConfiguration() .map(o -> o.toBuilder().applyMutation(builderMutation).build()) @@ -650,11 +650,18 @@ public final Download downloadWithPresignedUrl( TransferProgressUpdater progressUpdater = new TransferProgressUpdater(presignedDownloadRequest, null); progressUpdater.transferInitiated(); - responseTransformer = isS3ClientMultipartEnabled() - && presignedDownloadRequest.presignedUrlDownloadRequest().range() == null - ? progressUpdater.wrapForNonSerialFileDownload( - responseTransformer, GetObjectRequest.builder().build()) - : progressUpdater.wrapResponseTransformer(responseTransformer); + if (isS3ClientMultipartEnabled() + && presignedDownloadRequest.presignedUrlDownloadRequest().range() == null) { + if (responseTransformer.split(b -> b.bufferSizeInBytes(1L)).parallelSplitSupported()) { + responseTransformer = progressUpdater.wrapForNonSerialFileDownload( + responseTransformer, GetObjectRequest.builder().build()); + } else { + responseTransformer = progressUpdater.wrapResponseTransformerForMultipartDownload( + responseTransformer, GetObjectRequest.builder().build()); + } + } else { + responseTransformer = progressUpdater.wrapResponseTransformer(responseTransformer); + } progressUpdater.registerCompletion(returnFuture); try { diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/internal/multipart/CustomTransformerMultipartIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/internal/multipart/CustomTransformerMultipartIntegrationTest.java new file mode 100644 index 000000000000..5e96ff1c4302 --- /dev/null +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/internal/multipart/CustomTransformerMultipartIntegrationTest.java @@ -0,0 +1,204 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.internal.multipart; + +import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3IntegrationTestBase; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; +import software.amazon.awssdk.services.s3.model.ChecksumMode; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; + +/** + * Integration test verifying that a custom AsyncResponseTransformer receives + * correct full-object response metadata when used with the multipart client. + */ +@Timeout(value = 5, unit = TimeUnit.MINUTES) +public class CustomTransformerMultipartIntegrationTest extends S3IntegrationTestBase { + + private static final String BUCKET = temporaryBucketName(CustomTransformerMultipartIntegrationTest.class); + private static final int MIB = 1024 * 1024; + private static final int PART_SIZE = 5 * MIB; + private static final String LARGE_KEY = "large-object.dat"; + private static final String MPU_CHECKSUM_KEY = "mpu-checksum-object.dat"; + private static final long LARGE_OBJECT_SIZE = 3L * PART_SIZE; // 15MB → 3 parts + + private static S3AsyncClient multipartClient; + + @BeforeAll + static void setup() throws Exception { + setUp(); + createBucket(BUCKET); + + multipartClient = S3AsyncClient.builder() + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .region(DEFAULT_REGION) + .multipartEnabled(true) + .multipartConfiguration(c -> c.minimumPartSizeInBytes((long) PART_SIZE)) + .build(); + + // Upload a large object via multipart + byte[] data = new byte[(int) LARGE_OBJECT_SIZE]; + new Random(42).nextBytes(data); + multipartClient.putObject(r -> r.bucket(BUCKET).key(LARGE_KEY), + AsyncRequestBody.fromBytes(data)).join(); + + // Upload MPU object with CRC32 checksum + uploadMpuWithChecksum(); + } + + @AfterAll + static void tearDown() { + deleteBucketAndAllContents(BUCKET); + if (multipartClient != null) { + multipartClient.close(); + } + } + + @Test + void customTransformer_receivesFullObjectMetadata() { + AsyncResponseTransformer customTransformer = + new AsyncResponseTransformer() { + private CompletableFuture future; + private GetObjectResponse response; + + @Override + public CompletableFuture prepare() { + future = new CompletableFuture<>(); + return future; + } + + @Override + public void onResponse(GetObjectResponse r) { + this.response = r; + } + + @Override + public void onStream(SdkPublisher publisher) { + publisher.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } + @Override public void onNext(ByteBuffer b) { } + @Override public void onError(Throwable t) { future.completeExceptionally(t); } + @Override public void onComplete() { + future.complete( + "contentLength=" + response.contentLength() + + "|contentRange=" + response.contentRange()); + } + }); + } + + @Override + public void exceptionOccurred(Throwable error) { + future.completeExceptionally(error); + } + }; + + String result = multipartClient.getObject( + GetObjectRequest.builder().bucket(BUCKET).key(LARGE_KEY).build(), + customTransformer).join(); + + assertThat(result).contains("contentLength=" + LARGE_OBJECT_SIZE); + assertThat(result).contains("contentRange=bytes 0-" + (LARGE_OBJECT_SIZE - 1) + "/" + LARGE_OBJECT_SIZE); + } + + @Test + void customTransformer_mpuWithChecksumMode_checksumNulled() { + AsyncResponseTransformer customTransformer = + new AsyncResponseTransformer() { + private CompletableFuture future; + private GetObjectResponse response; + + @Override public CompletableFuture prepare() { future = new CompletableFuture<>(); return future; } + @Override public void onResponse(GetObjectResponse r) { this.response = r; } + @Override public void onStream(SdkPublisher publisher) { + publisher.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } + @Override public void onNext(ByteBuffer b) { } + @Override public void onError(Throwable t) { future.completeExceptionally(t); } + @Override public void onComplete() { + future.complete("contentLength=" + response.contentLength() + + "|checksumType=" + response.checksumType() + + "|checksumCRC32=" + response.checksumCRC32()); + } + }); + } + @Override public void exceptionOccurred(Throwable error) { future.completeExceptionally(error); } + }; + + String result = multipartClient.getObject( + GetObjectRequest.builder().bucket(BUCKET).key(MPU_CHECKSUM_KEY) + .checksumMode(ChecksumMode.ENABLED).build(), + customTransformer).join(); + + long expectedSize = 2L * PART_SIZE; + assertThat(result).contains("contentLength=" + expectedSize); + assertThat(result).contains("checksumType=COMPOSITE"); + assertThat(result).contains("checksumCRC32=null"); + } + + private static void uploadMpuWithChecksum() { + S3Client syncClient = S3Client.builder() + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .region(DEFAULT_REGION).build(); + + CreateMultipartUploadResponse createResp = syncClient.createMultipartUpload(b -> b.bucket(BUCKET) + .key(MPU_CHECKSUM_KEY).checksumAlgorithm(ChecksumAlgorithm.CRC32)); + String uploadId = createResp.uploadId(); + List parts = new ArrayList<>(); + + for (int i = 1; i <= 2; i++) { + byte[] data = new byte[PART_SIZE]; + new Random(i).nextBytes(data); + final int partNum = i; + UploadPartResponse uploadResp = syncClient.uploadPart( + b -> b.bucket(BUCKET).key(MPU_CHECKSUM_KEY).uploadId(uploadId).partNumber(partNum) + .checksumAlgorithm(ChecksumAlgorithm.CRC32), + RequestBody.fromBytes(data)); + parts.add(CompletedPart.builder() + .partNumber(partNum).eTag(uploadResp.eTag()).checksumCRC32(uploadResp.checksumCRC32()).build()); + } + + syncClient.completeMultipartUpload(b -> b.bucket(BUCKET).key(MPU_CHECKSUM_KEY).uploadId(uploadId) + .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())); + syncClient.close(); + } +} diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientFileDownloadIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientFileDownloadIntegrationTest.java index 9af2ff6b6617..df8c7aefff25 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientFileDownloadIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientFileDownloadIntegrationTest.java @@ -24,6 +24,7 @@ import java.security.MessageDigest; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -33,7 +34,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import software.amazon.awssdk.core.FileTransformerConfiguration; +import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.SdkRequest; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; @@ -43,6 +47,7 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3IntegrationTestBase; +import software.amazon.awssdk.services.s3.model.ChecksumMode; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.testutils.RandomTempFile; @@ -124,6 +129,70 @@ void download_emptyFile_shouldSucceed() throws Exception { assertThat(downloadedHash).isEqualTo(originalHash); } + @ParameterizedTest(name = "multipartDownload_{0}_hasCorrectFullObjectMetadata") + @MethodSource("transformerTypes") + void multipartDownload_hasCorrectFullObjectMetadata(String type) throws Exception { + if ("toFile".equals(type)) { + Path downloadPath = tmpPath().resolve("metadata-" + UUID.randomUUID() + ".dat"); + GetObjectResponse response = s3Client.getObject( + GetObjectRequest.builder().bucket(TEST_BUCKET).key(TEST_KEY).build(), + AsyncResponseTransformer.toFile(downloadPath)).join(); + + assertThat(response.contentLength()).isEqualTo((long) OBJ_SIZE); + assertThat(response.contentRange()).isEqualTo("bytes 0-" + (OBJ_SIZE - 1) + "/" + OBJ_SIZE); + assertThat(response.sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + Files.deleteIfExists(downloadPath); + } else { + String smallKey = "small-toBytes-" + UUID.randomUUID(); + int smallSize = 15 * MIB; + byte[] data = new byte[smallSize]; + new Random(42).nextBytes(data); + s3Client.putObject(r -> r.bucket(TEST_BUCKET).key(smallKey), + AsyncRequestBody.fromBytes(data)).join(); + + ResponseBytes response = s3Client.getObject( + GetObjectRequest.builder().bucket(TEST_BUCKET).key(smallKey).build(), + AsyncResponseTransformer.toBytes()).join(); + + assertThat(response.response().contentLength()).isEqualTo((long) smallSize); + assertThat(response.response().contentRange()).isEqualTo("bytes 0-" + (smallSize - 1) + "/" + smallSize); + assertThat(response.asByteArray().length).isEqualTo(smallSize); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + s3Client.deleteObject(r -> r.bucket(TEST_BUCKET).key(smallKey)).join(); + } + } + + @Test + void multipartDownload_withRange_preservesPartialMetadata() throws Exception { + Path downloadPath = tmpPath().resolve("metadata-range-" + UUID.randomUUID() + ".dat"); + GetObjectResponse response = s3Client.getObject( + GetObjectRequest.builder().bucket(TEST_BUCKET).key(TEST_KEY) + .range("bytes=0-1048575").build(), + AsyncResponseTransformer.toFile(downloadPath)).join(); + + assertThat(response.contentLength()).isEqualTo(1048576L); + assertThat(response.contentRange()).contains("bytes 0-1048575/"); + assertThat(response.sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + Files.deleteIfExists(downloadPath); + } + + @Test + void multipartDownload_checksumModeEnabled_hasCorrectFullObjectMetadata() throws Exception { + Path downloadPath = tmpPath().resolve("metadata-checksum-" + UUID.randomUUID() + ".dat"); + GetObjectResponse response = s3Client.getObject( + GetObjectRequest.builder().bucket(TEST_BUCKET).key(TEST_KEY) + .checksumMode(ChecksumMode.ENABLED).build(), + AsyncResponseTransformer.toFile(downloadPath)).join(); + + assertThat(response.contentLength()).isEqualTo((long) OBJ_SIZE); + assertThat(response.contentRange()).isEqualTo("bytes 0-" + (OBJ_SIZE - 1) + "/" + OBJ_SIZE); + assertThat(response.sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + Files.deleteIfExists(downloadPath); + } + + static java.util.stream.Stream transformerTypes() { + return java.util.stream.Stream.of("toFile", "toBytes"); + } private Path tmpPath() { return Paths.get(JavaSystemSetting.TEMP_DIRECTORY.getStringValueOrThrow()); diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/presignedurl/AsyncPresignedUrlExtensionTestSuite.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/presignedurl/AsyncPresignedUrlExtensionTestSuite.java index 1ff737b52850..b5a3edbfd8f7 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/presignedurl/AsyncPresignedUrlExtensionTestSuite.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/presignedurl/AsyncPresignedUrlExtensionTestSuite.java @@ -24,7 +24,9 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -41,15 +43,21 @@ import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.metrics.MetricCollection; import software.amazon.awssdk.metrics.MetricPublisher; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3IntegrationTestBase; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; import software.amazon.awssdk.services.s3.model.ChecksumMode; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.presigner.S3Presigner; import software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest; import software.amazon.awssdk.services.s3.presignedurl.model.PresignedUrlDownloadRequest; @@ -70,8 +78,10 @@ public abstract class AsyncPresignedUrlExtensionTestSuite extends S3IntegrationT protected static String testGetObjectKey; protected static String testLargeObjectKey; + protected static String testMpuChecksumKey; protected static String testObjectContent; protected static byte[] testLargeObjectContent; + protected static byte[] testMpuObjectContent; protected static String expectedLargeObjectMd5; protected abstract S3AsyncClient createS3AsyncClient(); @@ -105,6 +115,7 @@ static void setUpTestSuite() throws Exception { .build(), AsyncRequestBody.fromBytes(testLargeObjectContent) ).join(); + uploadMpuObjectWithChecksum(); S3TestUtils.addCleanupTask(AsyncPresignedUrlExtensionTestSuite.class, () -> { s3.deleteObject(DeleteObjectRequest.builder() .bucket(testBucket) @@ -114,6 +125,10 @@ static void setUpTestSuite() throws Exception { .bucket(testBucket) .key(testLargeObjectKey) .build()); + s3.deleteObject(DeleteObjectRequest.builder() + .bucket(testBucket) + .key(testMpuChecksumKey) + .build()); deleteBucketAndAllContents(testBucket); }); } @@ -152,6 +167,8 @@ void getObject_withValidPresignedUrl_returnsContent(String testDescription, assertThat(response.asByteArray().length).isEqualTo(testLargeObjectContent.length); } assertThat(response.response()).isNotNull(); + assertThat(response.response().contentLength()).isEqualTo((long) testLargeObjectContent.length); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); } } @@ -239,6 +256,8 @@ public void close() {} assertThat(response).isNotNull(); assertThat(downloadFile).exists(); assertThat(downloadFile.toFile().length()).isEqualTo(testLargeObjectContent.length); + assertThat(response.contentLength()).isEqualTo((long) testLargeObjectContent.length); + assertThat(response.sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); assertThat(collectedMetrics).isNotEmpty(); } } @@ -359,7 +378,123 @@ static Stream rangeTestData() { ); } + @Test + void getObject_withRangeRequest_preservesPartialMetadata() throws Exception { + PresignedUrlDownloadRequest request = createRequestForKey(testLargeObjectKey, "bytes=0-1048575"); + ResponseBytes response = + presignedUrlExtension.getObject(request, AsyncResponseTransformer.toBytes()) + .get(30, TimeUnit.SECONDS); + + assertThat(response.response().contentLength()).isEqualTo(1048576L); + assertThat(response.response().contentRange()).isNotNull(); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } + + @ParameterizedTest(name = "getObject_largeObject_{0}_hasCorrectFullObjectMetadata") + @MethodSource("transformerTypes") + void getObject_largeObject_hasCorrectFullObjectMetadata(String type) throws Exception { + PresignedUrlDownloadRequest request = createRequestForKey(testLargeObjectKey); + + if ("toFile".equals(type)) { + Path downloadFile = temporaryFolder.resolve("large-metadata-test-" + UUID.randomUUID() + ".bin"); + GetObjectResponse response = + presignedUrlExtension.getObject(request, downloadFile) + .get(60, TimeUnit.SECONDS); + + assertThat(response.contentLength()).isEqualTo((long) testLargeObjectContent.length); + assertThat(downloadFile.toFile().length()).isEqualTo(testLargeObjectContent.length); + assertThat(response.sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } else { + ResponseBytes response = + presignedUrlExtension.getObject(request, AsyncResponseTransformer.toBytes()) + .get(60, TimeUnit.SECONDS); + + assertThat(response.asByteArray().length).isEqualTo(testLargeObjectContent.length); + assertThat(response.response().contentLength()).isEqualTo((long) testLargeObjectContent.length); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } + } + + static Stream transformerTypes() { + return Stream.of("toFile", "toBytes"); + } + + @ParameterizedTest(name = "getObject_mpuObject_{0}_hasCorrectMetadata") + @MethodSource("checksumModes") + void getObject_mpuObject_hasCorrectMetadata(String mode) throws Exception { + PresignedUrlDownloadRequest request; + if ("withChecksumMode".equals(mode)) { + PresignedGetObjectRequest presigned = presigner.presignGetObject(r -> r + .getObjectRequest(req -> req.bucket(testBucket).key(testMpuChecksumKey) + .checksumMode(ChecksumMode.ENABLED)) + .signatureDuration(Duration.ofMinutes(10))); + request = PresignedUrlDownloadRequest.builder().presignedUrl(presigned.url()).build(); + } else { + request = PresignedUrlDownloadRequest.builder() + .presignedUrl(createPresignedUrl(testMpuChecksumKey)) + .build(); + } + + ResponseBytes response = + presignedUrlExtension.getObject(request, AsyncResponseTransformer.toBytes()) + .get(60, TimeUnit.SECONDS); + + assertThat(response.asByteArray().length).isEqualTo(testMpuObjectContent.length); + assertThat(response.response().contentLength()).isEqualTo((long) testMpuObjectContent.length); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } + + static Stream checksumModes() { + return Stream.of("withChecksumMode", "withoutChecksumMode"); + } + + @Test + void getObject_mpuObjectWithRange_preservesPartialMetadata() throws Exception { + PresignedUrlDownloadRequest request = PresignedUrlDownloadRequest.builder() + .presignedUrl(createPresignedUrl(testMpuChecksumKey)) + .range("bytes=0-1048575") + .build(); + + ResponseBytes response = + presignedUrlExtension.getObject(request, AsyncResponseTransformer.toBytes()) + .get(30, TimeUnit.SECONDS); + + assertThat(response.response().contentLength()).isEqualTo(1048576L); + assertThat(response.response().contentRange()).contains("bytes 0-1048575/"); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } + // Helper methods + private static void uploadMpuObjectWithChecksum() { + testMpuChecksumKey = generateRandomObjectKey() + "-mpu-checksum"; + int partSize = 5 * 1024 * 1024; + int numParts = 2; + testMpuObjectContent = new byte[partSize * numParts]; + new Random(42).nextBytes(testMpuObjectContent); + + CreateMultipartUploadResponse createResp = + s3.createMultipartUpload(b -> b.bucket(testBucket).key(testMpuChecksumKey) + .checksumAlgorithm(ChecksumAlgorithm.CRC32)); + String uploadId = createResp.uploadId(); + List parts = new ArrayList<>(); + + for (int i = 0; i < numParts; i++) { + byte[] partData = Arrays.copyOfRange(testMpuObjectContent, i * partSize, (i + 1) * partSize); + final int partNum = i + 1; + UploadPartResponse uploadResp = s3.uploadPart( + b -> b.bucket(testBucket).key(testMpuChecksumKey).uploadId(uploadId).partNumber(partNum) + .checksumAlgorithm(ChecksumAlgorithm.CRC32), + RequestBody.fromBytes(partData)); + parts.add(CompletedPart.builder() + .partNumber(partNum).eTag(uploadResp.eTag()) + .checksumCRC32(uploadResp.checksumCRC32()).build()); + } + + s3.completeMultipartUpload(b -> b.bucket(testBucket).key(testMpuChecksumKey).uploadId(uploadId) + .multipartUpload(CompletedMultipartUpload.builder() + .parts(parts).build())); + } + private static String generateRandomObjectKey() { return "async-presigned-url-extension-test-" + UUID.randomUUID(); } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java index 099ec7f7fb78..8fa1bbceac51 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java @@ -46,15 +46,19 @@ public CompletableFuture downloadObject( logSinglePartMessage(getObjectRequest); return s3AsyncClient.getObject(getObjectRequest, asyncResponseTransformer); } + SplittingTransformerConfiguration splitConfig = SplittingTransformerConfiguration.builder() + .bufferSizeInBytes(bufferSizeInBytes) + .build(); AsyncResponseTransformer.SplitResult split = - asyncResponseTransformer.split(SplittingTransformerConfiguration.builder() - .bufferSizeInBytes(bufferSizeInBytes) - .build()); - if (!split.parallelSplitSupported()) { - return downloadPartsSerially(getObjectRequest, split); + asyncResponseTransformer.split(splitConfig); + + if (split.parallelSplitSupported()) { + return downloadPartsNonSerially(getObjectRequest, split, maxInFlightParts); } - return downloadPartsNonSerially(getObjectRequest, split, maxInFlightParts); + // Serial path: split with a response rewrite so the customer sees full-object metadata + split = MultipartDownloadUtils.splitWithResponseRewrite(asyncResponseTransformer, splitConfig); + return downloadPartsSerially(getObjectRequest, split); } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtils.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtils.java index c7e9885e34d3..f9dc07385092 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtils.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtils.java @@ -20,10 +20,16 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.function.UnaryOperator; import java.util.regex.Matcher; import java.util.regex.Pattern; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.SplittingTransformerConfiguration; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.AsyncResponseTransformer.SplitResult; +import software.amazon.awssdk.services.s3.model.ChecksumType; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.S3Request; @SdkInternalApi @@ -126,4 +132,44 @@ public static long calculateTotalParts(long contentLength, long partSize) { } + /** + * Rewrites a first-part response to represent the full object. + * + * @param firstPartResponse the GetObjectResponse from the first part request + * @return full-object response with total content-length, full content-range, + * and checksum values nulled if checksum type is COMPOSITE + */ + public static GetObjectResponse toFullObjectResponse(GetObjectResponse firstPartResponse) { + String contentRange = firstPartResponse.contentRange(); + Optional totalOpt = parseContentRangeForTotalSize(contentRange); + if (!totalOpt.isPresent()) { + return firstPartResponse; + } + long totalLength = totalOpt.get(); + String fullRange = "bytes 0-" + (totalLength - 1) + "/" + totalLength; + + GetObjectResponse.Builder builder = firstPartResponse.toBuilder() + .contentLength(totalLength) + .contentRange(fullRange); + + if (firstPartResponse.checksumType() == ChecksumType.COMPOSITE) { + builder.sdkFields().stream() + .filter(f -> f.memberName().startsWith("Checksum") && !"ChecksumType".equals(f.memberName())) + .forEach(f -> f.set(builder, null)); + } + + return builder.build(); + } + + /** + * Splits the given transformer with a response mapper that applies {@link #toFullObjectResponse} + * to the first part's response before it reaches the customer's transformer. + */ + public static SplitResult splitWithResponseRewrite( + AsyncResponseTransformer transformer, + SplittingTransformerConfiguration splitConfig) { + UnaryOperator mapper = MultipartDownloadUtils::toFullObjectResponse; + return transformer.split(splitConfig, mapper); + } + } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriber.java index a0550d2a10f1..90fbd7324228 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriber.java @@ -283,7 +283,7 @@ private void sendNextRequest(AsyncResponseTransformer CompletableFuture downloadObject( doMultipartDownload(presignedRequest, asyncResponseTransformer) .whenComplete((result, error) -> { Throwable cause = error instanceof CompletionException ? error.getCause() : error; - if (cause instanceof EmptyObjectRangeNotSatisfiableException) { + // Parallel path wraps it as EmptyObjectRangeNotSatisfiableException; + // serial path (toBytes, custom transformers) surfaces raw S3Exception. + if (cause instanceof EmptyObjectRangeNotSatisfiableException + || isRangeNotSatisfiable(cause)) { log.debug(() -> "Received 416 on first request, falling back to non-range GET for empty object"); asyncPresignedUrlExtension.getObject(presignedRequest, asyncResponseTransformer) .whenComplete((r, e) -> { @@ -99,6 +102,8 @@ private CompletableFuture doMultipartDownload( if (split.parallelSplitSupported()) { return downloadPartsInParallel(presignedRequest, split); } + // Serial path: split with response mapper to convert part response to full-object response + split = MultipartDownloadUtils.splitWithResponseRewrite(asyncResponseTransformer, splittingConfig); return downloadPartsSerially(presignedRequest, split); } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtilsTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtilsTest.java index acc535b686eb..b88c6cd8a33d 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtilsTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtilsTest.java @@ -19,7 +19,10 @@ import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.MULTIPART_DOWNLOAD_RESUME_CONTEXT; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.http.SdkHttpResponse; +import software.amazon.awssdk.services.s3.model.ChecksumType; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; class MultipartDownloadUtilsTest { @@ -99,4 +102,77 @@ void calculateTotalParts_shouldCalculateCorrectly() { assertThat(MultipartDownloadUtils.calculateTotalParts(Long.MAX_VALUE, 2)) .isEqualTo((Long.MAX_VALUE / 2) + 1); } + + @Test + void toFullObjectResponse_setsContentLengthAndContentRange() { + GetObjectResponse response = GetObjectResponse.builder() + .contentLength(1024L) + .contentRange("bytes 0-1023/4096") + .eTag("\"abc123\"") + .build(); + response = setHttpResponse(response, 206, "Partial Content"); + + GetObjectResponse result = MultipartDownloadUtils.toFullObjectResponse(response); + + assertThat(result.contentLength()).isEqualTo(4096L); + assertThat(result.contentRange()).isEqualTo("bytes 0-4095/4096"); + assertThat(result.eTag()).isEqualTo("\"abc123\""); + } + + @Test + void toFullObjectResponse_noContentRange_returnsUnchanged() { + GetObjectResponse response = GetObjectResponse.builder() + .contentLength(1024L) + .build(); + response = setHttpResponse(response, 206, "Partial Content"); + + GetObjectResponse result = MultipartDownloadUtils.toFullObjectResponse(response); + + assertThat(result.contentLength()).isEqualTo(1024L); + } + + @Test + void toFullObjectResponse_compositeChecksum_nullsChecksumValues() { + GetObjectResponse response = GetObjectResponse.builder() + .contentLength(8388608L) + .contentRange("bytes 0-8388607/25165824") + .checksumType(ChecksumType.COMPOSITE) + .checksumCRC32("/g/pWA==") + .build(); + response = setHttpResponse(response, 206, "Partial Content"); + + GetObjectResponse result = MultipartDownloadUtils.toFullObjectResponse(response); + + assertThat(result.checksumType()).isEqualTo(ChecksumType.COMPOSITE); + assertThat(result.checksumCRC32()).isNull(); + assertThat(result.checksumCRC32C()).isNull(); + assertThat(result.checksumCRC64NVME()).isNull(); + assertThat(result.checksumSHA1()).isNull(); + assertThat(result.checksumSHA256()).isNull(); + assertThat(result.contentLength()).isEqualTo(25165824L); + } + + @Test + void toFullObjectResponse_fullObjectChecksum_preservesChecksumValues() { + GetObjectResponse response = GetObjectResponse.builder() + .contentLength(8388608L) + .contentRange("bytes 0-8388607/25165824") + .checksumType(ChecksumType.FULL_OBJECT) + .checksumCRC32("abc123") + .build(); + response = setHttpResponse(response, 206, "Partial Content"); + + GetObjectResponse result = MultipartDownloadUtils.toFullObjectResponse(response); + + assertThat(result.checksumType()).isEqualTo(ChecksumType.FULL_OBJECT); + assertThat(result.checksumCRC32()).isEqualTo("abc123"); + } + + private static GetObjectResponse setHttpResponse(GetObjectResponse response, int statusCode, String statusText) { + SdkHttpResponse httpResponse = SdkHttpResponse.builder() + .statusCode(statusCode) + .statusText(statusText) + .build(); + return (GetObjectResponse) response.toBuilder().sdkHttpResponse(httpResponse).build(); + } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriberWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriberWiremockTest.java index c71534842bb9..850726f634b0 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriberWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriberWiremockTest.java @@ -31,6 +31,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -44,8 +45,11 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -71,6 +75,9 @@ public void setup(WireMockRuntimeInfo wiremock) throws MalformedURLException { .build(); s3AsyncClient = S3AsyncClient.builder() .endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort())) + .credentialsProvider(software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create("accessKey", "secretKey"))) + .region(software.amazon.awssdk.regions.Region.US_EAST_1) .multipartEnabled(true) .multipartConfiguration(multipartConfig) .build(); @@ -339,6 +346,59 @@ void presignedUrlDownload_withRangeHeader_emptyObject_shouldThrow416(String tran .hasRootCauseInstanceOf(S3Exception.class); } + /** + * Verifies that custom serial transformers on the presigned URL path correctly trigger + * the empty-object 416 fallback. + */ + @Test + void presignedUrlDownload_emptyObject_customTransformer_fallbackWorks() { + // Range request → 416 (simulates empty object race) + stubFor(get(urlEqualTo(PRESIGNED_URL_PATH)) + .withHeader("Range", matching("bytes=.*")) + .willReturn(aResponse() + .withStatus(416) + .withBody("InvalidRange" + + "The requested range is not satisfiable"))); + + // Non-range fallback GET → 200 with empty body (the correct fallback for empty object) + stubFor(get(urlEqualTo(PRESIGNED_URL_PATH)) + .withHeader("Range", absent()) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Length", "0") + .withBody(""))); + + AsyncResponseTransformer customTransformer = + new AsyncResponseTransformer() { + private CompletableFuture future; + @Override + public CompletableFuture prepare() { + future = new CompletableFuture<>(); + return future; + } + @Override public void onResponse(GetObjectResponse r) { } + @Override public void onStream(SdkPublisher p) { + p.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } + @Override public void onNext(ByteBuffer b) { } + @Override public void onError(Throwable t) { future.completeExceptionally(t); } + @Override public void onComplete() { future.complete("done"); } + }); + } + @Override public void exceptionOccurred(Throwable e) { future.completeExceptionally(e); } + }; + + PresignedUrlDownloadRequest request = PresignedUrlDownloadRequest.builder() + .presignedUrl(presignedUrl) + .build(); + + // Should succeed: 416 triggers fallback → non-range GET returns 200 + String result = s3AsyncClient.presignedUrlExtension() + .getObject(request, customTransformer) + .join(); + assertThat(result).isEqualTo("done"); + } + @AfterEach void cleanup() { if (tempFile != null && Files.exists(tempFile)) { diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java index cd6a668f4c53..18b23517dfc5 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java @@ -41,6 +41,7 @@ import com.github.tomakehurst.wiremock.junit5.WireMockTest; import com.github.tomakehurst.wiremock.stubbing.Scenario; import java.net.URI; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -50,15 +51,19 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import org.junit.Test; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.SplittingTransformerConfiguration; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer; import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer; @@ -328,4 +333,55 @@ private CompletableFuture mock200Response(S3AsyncClient s3Client, int runNumb .overrideConfiguration(c -> c.putHeader("RunNum", runId)), transformerSupplier.transformer()); } + + @org.junit.jupiter.api.Test + void multipartDownload_customTransformer_hasFullObjectMetadata() { + int numParts = 3; + int partSize = 1024; + util.stubAllParts(BUCKET, KEY, numParts, partSize); + + AsyncResponseTransformer customTransformer = + new AsyncResponseTransformer() { + private CompletableFuture future; + private GetObjectResponse response; + + @Override + public CompletableFuture prepare() { + future = new CompletableFuture<>(); + return future; + } + + @Override + public void onResponse(GetObjectResponse r) { + this.response = r; + } + + @Override + public void onStream(SdkPublisher publisher) { + publisher.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } + @Override public void onNext(ByteBuffer b) { } + @Override public void onError(Throwable t) { future.completeExceptionally(t); } + @Override public void onComplete() { + future.complete("contentLength=" + response.contentLength() + + ",contentRange=" + response.contentRange()); + } + }); + } + + @Override + public void exceptionOccurred(Throwable error) { + future.completeExceptionally(error); + } + }; + + String result = multipartClient.getObject( + GetObjectRequest.builder().bucket(BUCKET).key(KEY).build(), + customTransformer + ).join(); + + long totalSize = (long) numParts * partSize; + assertThat(result).contains("contentLength=" + totalSize); + assertThat(result).contains("contentRange=bytes 0-" + (totalSize - 1) + "/" + totalSize); + } }