Add bounded multipart upload parallelism#228
Conversation
Add bounded multipart upload parallelism
|
✅ Action performedReview finished.
|
📝 WalkthroughWalkthrough
ChangesConcurrent multipart upload pipeline
Sequence Diagram(s)sequenceDiagram
participant App
participant PutObject as Client::PutObject
participant AsyncPool as std::async workers
participant Server as MinIO Server
App->>PutObject: PutObject(args{max_inflight_parts=N})
PutObject->>PutObject: Validate() — reject if max_inflight_parts==0
PutObject->>Server: InitiateMultipartUpload → upload_id
rect rgba(70, 130, 180, 0.5)
note over PutObject,AsyncPool: Concurrent path (N > 1)
loop for each part buffer (round-robin)
PutObject->>AsyncPool: std::async UploadPart(buffer, part_num)
AsyncPool->>Server: PUT ?uploadId&partNumber=K
Server-->>AsyncPool: ETag
end
PutObject->>AsyncPool: drain_one() per completed future
AsyncPool-->>PutObject: Part{etag, checksum}
end
PutObject->>Server: CompleteMultipartUpload(parts[])
Server-->>App: result
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
tests/tests.cc (1)
711-721: ⚡ Quick winIncrease part count so bounded inflight behavior is actually exercised.
With ~10 MiB input and auto part sizing, this test usually produces only 2 parts. That validates correctness, but it doesn’t stress the producer/drain cap logic for
max_inflight_parts(especially2and4). Consider forcing a smaller explicit part size and larger payload sopart_count > max_inflight_parts.Suggested patch
- // 10MB data -> auto-calc ~5MiB parts = 2 parts, exercising multipart path - const size_t data_size = 10 * 1024 * 1024; + // Force many parts so inflight cap/drain path is exercised. + const size_t part_size = 5 * 1024 * 1024; // 5 MiB + const size_t data_size = (part_size * 5) + 7; // 6 parts total @@ - minio::s3::PutObjectArgs args(ss, static_cast<long>(data_size), 0); + minio::s3::PutObjectArgs args(ss, static_cast<long>(data_size), + static_cast<long>(part_size));🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/tests.cc` around lines 711 - 721, The test with 10MB data produces only 2 parts due to auto-calculated part sizing, which doesn't adequately stress the bounded inflight behavior for max_inflight_parts values of 2 and 4. Increase the data_size constant or specify a smaller explicit part size in the PutObjectArgs to ensure the part count exceeds the maximum inflight_values being tested (particularly for the values 2 and 4), so the producer/drain cap logic is properly exercised across all inflight test cases.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/client.cc`:
- Line 568: The user-controlled max_inflight_parts value is not properly
constrained before buffer allocation, which can cause memory exhaustion if a
huge value is passed. After the cast assignment of max_inflight_parts from
args.max_inflight_parts on line 568, add clamping logic to constrain
max_inflight_parts to both the known part_count value and a documented maximum
safe limit. This clamping must occur before any reserve() calls or buffer
allocations happen. Apply the same clamping logic to the related code block at
lines 730-744 that also uses max_inflight_parts.
- Line 774: The call to inflight_upload.future.get() at line 774 can throw
std::system_error or other exceptions from the async operation, but these
exceptions are not being caught and converted to error responses as required by
the function's error handling pattern. Wrap the future.get() call in a try-catch
block, catch std::system_error and other relevant exceptions, and convert them
into appropriate error responses instead of allowing them to propagate as
unhandled exceptions. Apply the same exception handling pattern to the similar
future.get() calls mentioned at lines 925-929 to ensure consistent error
handling throughout the function.
---
Nitpick comments:
In `@tests/tests.cc`:
- Around line 711-721: The test with 10MB data produces only 2 parts due to
auto-calculated part sizing, which doesn't adequately stress the bounded
inflight behavior for max_inflight_parts values of 2 and 4. Increase the
data_size constant or specify a smaller explicit part size in the PutObjectArgs
to ensure the part count exceeds the maximum inflight_values being tested
(particularly for the values 2 and 4), so the producer/drain cap logic is
properly exercised across all inflight test cases.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 4f7e0ee8-250d-468d-86aa-9748d3b82b27
📒 Files selected for processing (4)
include/miniocpp/args.hsrc/args.ccsrc/client.cctests/tests.cc
| long part_count = args.part_count; | ||
| double uploaded_bytes = 0; // for progress | ||
| double upload_speed = -1; // for progress | ||
| size_t max_inflight_parts = static_cast<size_t>(args.max_inflight_parts); |
There was a problem hiding this comment.
Clamp max_inflight_parts before allocating buffers.
max_inflight_parts is user-controlled and only validated against zero, but this branch allocates one full part buffer per requested slot. A small upload with an accidental huge value can exhaust memory before any useful work starts; clamp to the known part_count and to a documented maximum before reserve()/allocation.
🛡️ Proposed guard before the branch
size_t max_inflight_parts = static_cast<size_t>(args.max_inflight_parts);
+ if (part_count > 0 &&
+ max_inflight_parts > static_cast<size_t>(part_count)) {
+ max_inflight_parts = static_cast<size_t>(part_count);
+ }
+ if (max_inflight_parts > static_cast<size_t>(utils::kMaxMultipartCount)) {
+ max_inflight_parts = static_cast<size_t>(utils::kMaxMultipartCount);
+ }
if (max_inflight_parts <= 1) {Also applies to: 730-744
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/client.cc` at line 568, The user-controlled max_inflight_parts value is
not properly constrained before buffer allocation, which can cause memory
exhaustion if a huge value is passed. After the cast assignment of
max_inflight_parts from args.max_inflight_parts on line 568, add clamping logic
to constrain max_inflight_parts to both the known part_count value and a
documented maximum safe limit. This clamping must occur before any reserve()
calls or buffer allocations happen. Apply the same clamping logic to the related
code block at lines 730-744 that also uses max_inflight_parts.
| InflightUpload inflight_upload = std::move(inflight.front()); | ||
| inflight.pop_front(); | ||
|
|
||
| UploadPartResponse resp = inflight_upload.future.get(); |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
Can C++ std::async(std::launch::async, ...) throw std::system_error when a new asynchronous task cannot be started, and does std::future::get rethrow exceptions from the asynchronous callable?
💡 Result:
Yes, std::async with the std::launch::async policy can throw std::system_error if the implementation is unable to start a new thread [1][2][3]. According to the C++ standard, when the launch policy is set to std::launch::async, this exception is thrown with an error condition of std::errc::resource_unavailable_try_again [1][2]. Yes, std::future::get rethrows exceptions that occurred within the asynchronous callable [4][5]. If the asynchronous function throws an exception, it is captured and stored in the future's shared state [1][3][5]. When std::future::get is called, it waits for the shared state to become ready and then rethrows that stored exception in the calling thread [4][6][5].
Citations:
- 1: https://en.cppreference.com/cpp/thread/async
- 2: https://www.eel.is/c++draft/futures.async
- 3: https://cplusplus.github.io/LWG/issue3582
- 4: https://en.cppreference.com/cpp/thread/future/get
- 5: https://app.studyraid.com/en/read/12311/397235/exception-propagation-in-async-code
- 6: https://legacy.cplusplus.com/reference/future/future/get/
🏁 Script executed:
wc -l src/client.ccRepository: minio/minio-cpp
Length of output: 78
🏁 Script executed:
sed -n '760,790p' src/client.ccRepository: minio/minio-cpp
Length of output: 1217
🏁 Script executed:
sed -n '915,945p' src/client.ccRepository: minio/minio-cpp
Length of output: 1046
🏁 Script executed:
sed -n '850,880p' src/client.ccRepository: minio/minio-cpp
Length of output: 1085
🏁 Script executed:
sed -n '940,990p' src/client.ccRepository: minio/minio-cpp
Length of output: 1714
🏁 Script executed:
sed -n '700,720p' src/client.ccRepository: minio/minio-cpp
Length of output: 990
🏁 Script executed:
sed -n '720,760p' src/client.ccRepository: minio/minio-cpp
Length of output: 1380
🏁 Script executed:
sed -n '625,660p' src/client.ccRepository: minio/minio-cpp
Length of output: 1244
🏁 Script executed:
sed -n '760,800p' src/client.ccRepository: minio/minio-cpp
Length of output: 1447
Convert async launch/get exceptions into PutObjectResponse errors.
std::async(std::launch::async) can throw std::system_error if unable to start a new thread, and future.get() rethrows exceptions from the callable. Unhandled exceptions at lines 774 and 925-929 would bypass the error-checking loop and escape as thrown exceptions instead of being converted to error responses, violating the function's error handling pattern.
🛈 Proposed exception-to-response handling
- UploadPartResponse resp = inflight_upload.future.get();
+ UploadPartResponse resp;
+ try {
+ resp = inflight_upload.future.get();
+ } catch (const std::exception& e) {
+ return error::make<PutObjectResponse>(
+ std::string("upload part worker failed: ") + e.what());
+ }
if (!resp) {
return resp;
}- inflight_upload.future = std::async(
- std::launch::async,
- [this, up_args = std::move(up_args)]() mutable -> UploadPartResponse {
- return UploadPart(up_args);
- });
+ try {
+ inflight_upload.future = std::async(
+ std::launch::async,
+ [this,
+ up_args = std::move(up_args)]() mutable -> UploadPartResponse {
+ return UploadPart(up_args);
+ });
+ } catch (const std::exception& e) {
+ return error::make<PutObjectResponse>(
+ std::string("unable to start upload part worker: ") + e.what());
+ }
inflight.push_back(std::move(inflight_upload));📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| UploadPartResponse resp = inflight_upload.future.get(); | |
| UploadPartResponse resp; | |
| try { | |
| resp = inflight_upload.future.get(); | |
| } catch (const std::exception& e) { | |
| return error::make<PutObjectResponse>( | |
| std::string("upload part worker failed: ") + e.what()); | |
| } | |
| if (!resp) { | |
| return resp; | |
| } |
| UploadPartResponse resp = inflight_upload.future.get(); | |
| try { | |
| inflight_upload.future = std::async( | |
| std::launch::async, | |
| [this, | |
| up_args = std::move(up_args)]() mutable -> UploadPartResponse { | |
| return UploadPart(up_args); | |
| }); | |
| } catch (const std::exception& e) { | |
| return error::make<PutObjectResponse>( | |
| std::string("unable to start upload part worker: ") + e.what()); | |
| } | |
| inflight.push_back(std::move(inflight_upload)); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/client.cc` at line 774, The call to inflight_upload.future.get() at line
774 can throw std::system_error or other exceptions from the async operation,
but these exceptions are not being caught and converted to error responses as
required by the function's error handling pattern. Wrap the future.get() call in
a try-catch block, catch std::system_error and other relevant exceptions, and
convert them into appropriate error responses instead of allowing them to
propagate as unhandled exceptions. Apply the same exception handling pattern to
the similar future.get() calls mentioned at lines 925-929 to ensure consistent
error handling throughout the function.
fix: #216
Add bounded multipart upload parallelism
PutObjectnow supports concurrentUploadPartcalls via the newPutObjectArgs::max_inflight_partsfield (default1= sequential behavior preserved).Changes
include/miniocpp/args.hunsigned int max_inflight_parts = 1toPutObjectArgssrc/args.ccmax_inflight_parts > 0src/client.ccPutObjectinto sequential + parallel branchestests/tests.ccPutObjectWithInflight()— MD5 integrity testsrc/client.cchighlightsmax_inflight_parts <= 1): Original single-buffer loop, unchanged.max_inflight_parts > 1):AlignedBufferpool.std::asyncdispatchesUploadPart.std::deque+drain_onelambda.CompleteMultipartUploaddoesn't care about completion order.one_byteread-ahead indexing in the unknown-size path (buf[part_size]instead ofbuf[part_size + 1]).Testing
PutObjectWithInflight()generates 10MB of random data, computes the original MD5, uploads withmax_inflight_parts = 1, 2, 4, downloads each object, and verifies MD5 matches the original.Summary by CodeRabbit
Release Notes
New Features
max_inflight_partsconfiguration parameter to control concurrent multipart upload operations (minimum value: 1).Tests