feat(storage): Implement ObjectStoreStorage::S3 #2257
CTTY wants to merge 2 commits into apache:main
Conversation
```rust
if config.allow_anonymous {
    builder = builder.with_skip_signature(true);
}
```
not sure if this is in scope of the PR, but it would be great if we could attach a custom tokio runtime handle to spawn IO tasks, something along the lines of:

```rust
if let Some(handle) = io_runtime {
    builder = builder.with_http_connector(SpawnedReqwestConnector::new(handle));
}
```
There have been related discussions going on within the community. Allowing a custom runtime is the right direction, but I'm not sure storage is the correct place to inject it. We are thinking of allowing a runtime to be set on the Catalog. wdyt?
Attaching a runtime at a Catalog level works great, thanks!
dannycjones left a comment
nice!
I'm new to the Iceberg Rust codebase, so please excuse any naive feedback :)
```rust
#[test]
fn test_parse_s3_url_bucket_only() {
    let (scheme, bucket, relative) = parse_s3_url("s3://my-bucket/").unwrap();
    assert_eq!(scheme, "s3");
    assert_eq!(bucket, "my-bucket");
    assert_eq!(relative, "");
}
```
A few edge cases may be interesting.
- `s3://my-bucket` -> Ok?
- `s3://` -> Err
- `s3://my.bucket/` -> Ok("s3", "my.bucket", "")
- etc.
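These cases could be pinned down with a small parser along these lines (a hypothetical sketch, not the PR's actual implementation; in particular, whether `s3://my-bucket` without a trailing slash should be `Ok` is the reviewer's open question, and this sketch just picks one answer):

```rust
// Hypothetical sketch of an S3 URL parser covering the edge cases above.
// The PR's real parse_s3_url may differ; shown here only to make the
// expected behavior concrete.
fn parse_s3_url(url: &str) -> Result<(String, String, String), String> {
    let (scheme, rest) = url
        .split_once("://")
        .ok_or_else(|| format!("invalid S3 url: {}", url))?;
    // `s3://` has no bucket at all -> error.
    if rest.is_empty() {
        return Err(format!("missing bucket in S3 url: {}", url));
    }
    // Split off the bucket. `s3://my-bucket` (no trailing slash) yields an
    // empty relative path here, and dots in bucket names are preserved.
    let (bucket, relative) = rest.split_once('/').unwrap_or((rest, ""));
    Ok((scheme.to_string(), bucket.to_string(), relative.to_string()))
}

fn main() {
    assert_eq!(
        parse_s3_url("s3://my-bucket").unwrap(),
        ("s3".into(), "my-bucket".into(), "".into())
    );
    assert!(parse_s3_url("s3://").is_err());
    assert_eq!(
        parse_s3_url("s3://my.bucket/").unwrap(),
        ("s3".into(), "my.bucket".into(), "".into())
    );
}
```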
```rust
async fn delete_prefix(&self, path: &str) -> Result<()> {
    let (store, object_path) = self.get_store_and_path(path)?;
    let prefix = if object_path.as_ref().ends_with('/') {
        object_path
    } else {
        ObjectStorePath::from(format!("{}/", object_path.as_ref()))
    };

    let mut list_stream = store.list(Some(&prefix));
    while let Some(entry) = list_stream.next().await {
        let entry = entry.map_err(from_object_store_error)?;
        store
            .delete(&entry.location)
            .await
            .map_err(from_object_store_error)?;
    }
    Ok(())
}

async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
    while let Some(path) = paths.next().await {
        let (store, object_path) = self.get_store_and_path(&path)?;
        store
            .delete(&object_path)
            .await
            .map_err(from_object_store_error)?;
    }
    Ok(())
}
```
For both delete_prefix and delete_stream, why not use ObjectStore::delete_stream? If we did, it could automatically use S3's DeleteObjects API to batch up to 1,000 objects per request, and pick up other optimizations available for other object stores as we add them.
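For illustration, delete_prefix rewritten on top of ObjectStore::delete_stream might look roughly like this (a sketch assuming a recent object_store release where list returns a stream; the free-standing signature and error type are simplified for illustration):

```rust
use futures::stream::{StreamExt, TryStreamExt};
use object_store::{path::Path as ObjectStorePath, ObjectStore};

// Sketch only: delegate deletion to ObjectStore::delete_stream, which lets
// the S3 backend batch keys into DeleteObjects requests instead of issuing
// one DELETE per object.
async fn delete_prefix(
    store: &dyn ObjectStore,
    prefix: &ObjectStorePath,
) -> object_store::Result<()> {
    // Turn the listing into a stream of object paths...
    let locations = store
        .list(Some(prefix))
        .map_ok(|meta| meta.location)
        .boxed();
    // ...and let the store delete them with whatever batching it supports.
    store
        .delete_stream(locations)
        .try_collect::<Vec<_>>()
        .await?;
    Ok(())
}
```

delete_stream could be handled the same way by mapping the incoming path stream into ObjectStorePath values before handing it to ObjectStore::delete_stream, though paths spanning multiple stores would need to be grouped per store first.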
```rust
}

#[cfg(test)]
mod tests {
```
Are there any integration tests we should be extending to test the S3 object store implementation end-to-end? i.e. I can create a file, write some bytes, and then see that file in S3.
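Such a round-trip test might look something like this (a hedged sketch: it assumes the crate's FileIO/FileIOBuilder API, a local S3-compatible endpoint such as MinIO, and placeholder credentials and property names that may not match the test harness exactly):

```rust
// Hypothetical end-to-end test against an S3-compatible store (e.g. MinIO).
// Endpoint, credentials, bucket name, and property keys are assumptions.
use bytes::Bytes;
use iceberg::io::FileIOBuilder;

#[tokio::test]
async fn test_s3_write_read_roundtrip() -> iceberg::Result<()> {
    let file_io = FileIOBuilder::new("s3")
        .with_prop("s3.endpoint", "http://127.0.0.1:9000")
        .with_prop("s3.access-key-id", "admin")
        .with_prop("s3.secret-access-key", "password")
        .build()?;

    let path = "s3://test-bucket/data/hello.txt";
    // Create a file and write some bytes...
    file_io.new_output(path)?.write(Bytes::from("hello")).await?;
    // ...then read it back and check the contents survived the trip.
    let read_back = file_io.new_input(path)?.read().await?;
    assert_eq!(read_back, Bytes::from("hello"));
    Ok(())
}
```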
Which issue does this PR close?
What changes are included in this PR?
Are these changes tested?