feat(storage): implement transport, connector, worker#5956
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces the appendable_object_writer module and supporting bidi_write infrastructure to enable bidirectional streaming writes for Google Cloud Storage. Key feedback includes avoiding .expect() on parameter conversions to prevent panics, simplifying complex string-folding logic, removing an unnecessary clone of the request object, and ensuring the background worker fails loudly with an explicit error if the stream closes unexpectedly while flushes are pending.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #5956 +/- ##
==========================================
+ Coverage 97.72% 97.73% +0.01%
==========================================
Files 242 246 +4
Lines 60882 61921 +1039
==========================================
+ Hits 59495 60519 +1024
- Misses 1387 1402 +15 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
joshuatants
left a comment
There was a problem hiding this comment.
We're gone through this extensively offline.
f3e8345 to
4c4b7be
Compare
4c4b7be to
ce34c25
Compare
1713d73 to
9cca986
Compare
coryan
left a comment
There was a problem hiding this comment.
This is still too large. Aim for at most 200 lines of new code, which probably means 400 to 600 lines per PR. 1600 lines is simply too large.
| use std::sync::{Arc, Mutex}; | ||
| use std::time::Duration; | ||
| use tokio::sync::mpsc::Sender; | ||
| /// Represents a bidirectional streaming connection. |
There was a problem hiding this comment.
nit: blank line after the use declarations.
| let resource = match req.spec.resource { | ||
| Some(r) => { | ||
| let object: crate::google::storage::v2::Object = gaxi::prost::ToProto::to_proto(r) | ||
| .map_err(|e: gaxi::prost::ConvertError| Error::io(e.to_string()))?; |
There was a problem hiding this comment.
This is not an I/O error, this is a deserialization error, so you want:
| .map_err(|e: gaxi::prost::ConvertError| Error::io(e.to_string()))?; | |
| .map_err(Error::deser)?; |
Please don't use e.to_string() when mapping errors, we want to preserve the error types, not just the error messages.
| }; | ||
| self.params = req | ||
| .params | ||
| .map(|p| p.to_proto().map_err(|e| Error::io(e.to_string()))) |
There was a problem hiding this comment.
Here too:
| .map(|p| p.to_proto().map_err(|e| Error::io(e.to_string()))) | |
| .map(|p| p.to_proto().map_err(|Error::ser) |
|
|
||
| //! Internal traits and types for Appendable Object Write (Bidi Write). | ||
|
|
||
| #![allow(dead_code)] |
There was a problem hiding this comment.
Is there any way to have a smaller scope for this?
| match self.handle_response(m) { | ||
| // Successful end of stream, return without error. | ||
| None => break None, | ||
| // An unrecoverable in the stream or its data, return |
There was a problem hiding this comment.
| // An unrecoverable in the stream or its data, return | |
| // An unrecoverable error in the stream or its data, return |
| use tokio::sync::oneshot; | ||
|
|
||
| type WriteResult<T> = std::result::Result<T, WriteError>; | ||
| type LoopResult<T> = std::result::Result<T, Arc<crate::Error>>; |
There was a problem hiding this comment.
Why is this an Arc<crate::Error> ? That is only needed if more than one caller needs to receive the same error, which happens in bidi streaming reads, but it is not clear it could happen on bidi writes. I would expect there is only one writer at a time?
Ok. I am going to split this PR into smaller PRs. I'll update once done. Thank you and sorry about this! |
This PR comes after PR #5932.