diff --git a/ci/resources/stemcell-version-bump/go.mod b/ci/resources/stemcell-version-bump/go.mod index 90fe0e21..de7fd366 100644 --- a/ci/resources/stemcell-version-bump/go.mod +++ b/ci/resources/stemcell-version-bump/go.mod @@ -3,7 +3,7 @@ module stemcell-version-bump go 1.25.0 require ( - cloud.google.com/go/storage v1.60.0 + cloud.google.com/go/storage v1.61.3 github.com/stretchr/testify v1.11.1 google.golang.org/api v0.272.0 ) diff --git a/ci/resources/stemcell-version-bump/go.sum b/ci/resources/stemcell-version-bump/go.sum index b46fff57..7c3a7d7f 100644 --- a/ci/resources/stemcell-version-bump/go.sum +++ b/ci/resources/stemcell-version-bump/go.sum @@ -16,8 +16,8 @@ cloud.google.com/go/longrunning v0.8.0 h1:LiKK77J3bx5gDLi4SMViHixjD2ohlkwBi+mKA7 cloud.google.com/go/longrunning v0.8.0/go.mod h1:UmErU2Onzi+fKDg2gR7dusz11Pe26aknR4kHmJJqIfk= cloud.google.com/go/monitoring v1.24.3 h1:dde+gMNc0UhPZD1Azu6at2e79bfdztVDS5lvhOdsgaE= cloud.google.com/go/monitoring v1.24.3/go.mod h1:nYP6W0tm3N9H/bOw8am7t62YTzZY+zUeQ+Bi6+2eonI= -cloud.google.com/go/storage v1.60.0 h1:oBfZrSOCimggVNz9Y/bXY35uUcts7OViubeddTTVzQ8= -cloud.google.com/go/storage v1.60.0/go.mod h1:q+5196hXfejkctrnx+VYU8RKQr/L3c0cBIlrjmiAKE0= +cloud.google.com/go/storage v1.61.3 h1:VS//ZfBuPGDvakfD9xyPW1RGF1Vy3BWUoVZXgW1KMOg= +cloud.google.com/go/storage v1.61.3/go.mod h1:JtqK8BBB7TWv0HVGHubtUdzYYrakOQIsMLffZ2Z/HWk= cloud.google.com/go/trace v1.11.7 h1:kDNDX8JkaAG3R2nq1lIdkb7FCSi1rCmsEtKVsty7p+U= cloud.google.com/go/trace v1.11.7/go.mod h1:TNn9d5V3fQVf6s4SCveVMIBS2LJUqo73GACmq/Tky0s= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 h1:sBEjpZlNHzK1voKq9695PJSX2o5NEXl7/OL3coiIY0c= @@ -89,8 +89,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6h go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q= go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= -go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.39.0 h1:5gn2urDL/FBnK8OkCfD1j3/ER79rUuTYmCvlXBKeYL8= -go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.39.0/go.mod h1:0fBG6ZJxhqByfFZDwSwpZGzJU671HkwpWaNe2t4VUPI= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.40.0 h1:ZrPRak/kS4xI3AVXy8F7pipuDXmDsrO8Lg+yQjBLjw0= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.40.0/go.mod h1:3y6kQCWztq6hyW8Z9YxQDDm0Je9AJoFar2G0yDcmhRk= go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md index c6621c63..90c34066 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md @@ -1,6 +1,35 @@ # Changes +## [1.61.3](https://github.com/googleapis/google-cloud-go/releases/tag/storage%2Fv1.61.3) (2026-03-13) + +### Documentation + +* Fix godoc formatting (#14169) ([428b228](https://github.com/googleapis/google-cloud-go/commit/428b228814dc78b1b92c83868e1a58bb32ea71f0)) + +## 
[1.61.2](https://github.com/googleapis/google-cloud-go/releases/tag/storage%2Fv1.61.2) (2026-03-12) + +### Bug Fixes + +* fix dependency version for auth library (#14156) ([cb354ba](https://github.com/googleapis/google-cloud-go/commit/cb354ba5691eade26d14ace3c04d79f3363c0526)) + +## [1.61.1](https://github.com/googleapis/google-cloud-go/releases/tag/storage%2Fv1.61.1) (2026-03-11) + +## [1.61.0](https://github.com/googleapis/google-cloud-go/releases/tag/storage%2Fv1.61.0) (2026-03-10) + +### Features + +* add a DeleteFolderRecursive API definition ([6f31019](https://github.com/googleapis/google-cloud-go/commit/6f310199e136b133bb4fadaa353e264e809db6d7)) +* add bucket encryption enforcement configuration (#13874) ([245c8d7](https://github.com/googleapis/google-cloud-go/commit/245c8d7638756f3ec7c55b32b8a0f924814e8547)) +* add multistream options to MRD (#13758) ([4557675](https://github.com/googleapis/google-cloud-go/commit/4557675e058e042c614a46b55c1b2346d378204b)) +* add multistream support to MRD (#13792) ([ffa7268](https://github.com/googleapis/google-cloud-go/commit/ffa7268c6195f11f404b0c976b3c7c836df26294)) + +### Bug Fixes + +* Fix TM download dir corner case (#14142) ([87cdcc9](https://github.com/googleapis/google-cloud-go/commit/87cdcc9f756881f90337d222fe3707234d1a2c71)) +* Omit auto checksum in final request when MD5 is given (#14024) ([d404777](https://github.com/googleapis/google-cloud-go/commit/d40477749f0d54c88dd92fd992c9401732ef8d50)) +* optimize gRPC writer with zero-copy and lazy allocation (#13481) ([df64147](https://github.com/googleapis/google-cloud-go/commit/df64147605e961803c7ea839bc080ffd1b814ac9)) + ## [1.60.0](https://github.com/googleapis/google-cloud-go/releases/tag/storage%2Fv1.60.0) (2026-02-10) ### Features diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/acl.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/acl.go index a894db60..7447725a 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/acl.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/acl.go @@ -34,8 +34,8 @@ const ( // They are sometimes referred to as grantees. // // It could be in the form of: -// "user-", "user-", "group-", "group-", -// "domain-" and "project-team-". +// "user-{userId}", "user-{email}", "group-{groupId}", "group-{email}", +// "domain-{domain}" and "project-team-{projectId}". // // Or one of the predefined constants: AllUsers, AllAuthenticatedUsers. type ACLEntity string diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/bucket.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/bucket.go index 509d8693..947932f7 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/bucket.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/bucket.go @@ -467,6 +467,21 @@ type BucketAttrs struct { // The encryption configuration used by default for newly inserted objects. Encryption *BucketEncryption + // GoogleManagedEncryptionEnforcementConfig specifies the enforcement config + // for Google Managed Encryption. Pass NotRestricted in Restriction mode + // to unset the configuration. + GoogleManagedEncryptionEnforcementConfig *EncryptionEnforcementConfig + + // CustomerManagedEncryptionEnforcementConfig specifies the enforcement config + // for Customer Managed Encryption. Pass NotRestricted in Restriction mode + // to unset the configuration. 
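+	// For example, clearing an existing enforcement amounts to passing
+	// &EncryptionEnforcementConfig{RestrictionMode: NotRestricted}.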
+ CustomerManagedEncryptionEnforcementConfig *EncryptionEnforcementConfig + + // CustomerSuppliedEncryptionEnforcementConfig specifies the enforcement config + // for Customer Supplied Encryption. Pass NotRestricted in Restriction mode + // to unset the configuration. + CustomerSuppliedEncryptionEnforcementConfig *EncryptionEnforcementConfig + // The logging configuration. Logging *BucketLogging @@ -842,23 +857,26 @@ func newBucket(b *raw.Bucket) (*BucketAttrs, error) { } return &BucketAttrs{ - Name: b.Name, - Location: b.Location, - MetaGeneration: b.Metageneration, - DefaultEventBasedHold: b.DefaultEventBasedHold, - StorageClass: b.StorageClass, - Created: convertTime(b.TimeCreated), - Updated: convertTime(b.Updated), - VersioningEnabled: b.Versioning != nil && b.Versioning.Enabled, - ACL: toBucketACLRules(b.Acl), - DefaultObjectACL: toObjectACLRules(b.DefaultObjectAcl), - Labels: b.Labels, - RequesterPays: b.Billing != nil && b.Billing.RequesterPays, - Lifecycle: toLifecycle(b.Lifecycle), - RetentionPolicy: rp, - ObjectRetentionMode: toBucketObjectRetention(b.ObjectRetention), - CORS: toCORS(b.Cors), - Encryption: toBucketEncryption(b.Encryption), + Name: b.Name, + Location: b.Location, + MetaGeneration: b.Metageneration, + DefaultEventBasedHold: b.DefaultEventBasedHold, + StorageClass: b.StorageClass, + Created: convertTime(b.TimeCreated), + Updated: convertTime(b.Updated), + VersioningEnabled: b.Versioning != nil && b.Versioning.Enabled, + ACL: toBucketACLRules(b.Acl), + DefaultObjectACL: toObjectACLRules(b.DefaultObjectAcl), + Labels: b.Labels, + RequesterPays: b.Billing != nil && b.Billing.RequesterPays, + Lifecycle: toLifecycle(b.Lifecycle), + RetentionPolicy: rp, + ObjectRetentionMode: toBucketObjectRetention(b.ObjectRetention), + CORS: toCORS(b.Cors), + Encryption: toBucketEncryption(b.Encryption), + GoogleManagedEncryptionEnforcementConfig: toGoogleManagedEncryptionEnforcementConfig(b.Encryption), + CustomerManagedEncryptionEnforcementConfig: toCustomerManagedEncryptionEnforcementConfig(b.Encryption), + CustomerSuppliedEncryptionEnforcementConfig: toCustomerSuppliedEncryptionEnforcementConfig(b.Encryption), Logging: toBucketLogging(b.Logging), Website: toBucketWebsite(b.Website), BucketPolicyOnly: toBucketPolicyOnly(b.IamConfiguration), @@ -881,22 +899,25 @@ func newBucketFromProto(b *storagepb.Bucket) *BucketAttrs { return nil } return &BucketAttrs{ - Name: parseBucketName(b.GetName()), - Location: b.GetLocation(), - MetaGeneration: b.GetMetageneration(), - DefaultEventBasedHold: b.GetDefaultEventBasedHold(), - StorageClass: b.GetStorageClass(), - Created: b.GetCreateTime().AsTime(), - Updated: b.GetUpdateTime().AsTime(), - VersioningEnabled: b.GetVersioning().GetEnabled(), - ACL: toBucketACLRulesFromProto(b.GetAcl()), - DefaultObjectACL: toObjectACLRulesFromProto(b.GetDefaultObjectAcl()), - Labels: b.GetLabels(), - RequesterPays: b.GetBilling().GetRequesterPays(), - Lifecycle: toLifecycleFromProto(b.GetLifecycle()), - RetentionPolicy: toRetentionPolicyFromProto(b.GetRetentionPolicy()), - CORS: toCORSFromProto(b.GetCors()), - Encryption: toBucketEncryptionFromProto(b.GetEncryption()), + Name: parseBucketName(b.GetName()), + Location: b.GetLocation(), + MetaGeneration: b.GetMetageneration(), + DefaultEventBasedHold: b.GetDefaultEventBasedHold(), + StorageClass: b.GetStorageClass(), + Created: b.GetCreateTime().AsTime(), + Updated: b.GetUpdateTime().AsTime(), + VersioningEnabled: b.GetVersioning().GetEnabled(), + ACL: toBucketACLRulesFromProto(b.GetAcl()), + DefaultObjectACL: 
toObjectACLRulesFromProto(b.GetDefaultObjectAcl()), + Labels: b.GetLabels(), + RequesterPays: b.GetBilling().GetRequesterPays(), + Lifecycle: toLifecycleFromProto(b.GetLifecycle()), + RetentionPolicy: toRetentionPolicyFromProto(b.GetRetentionPolicy()), + CORS: toCORSFromProto(b.GetCors()), + Encryption: toBucketEncryptionFromProto(b.GetEncryption()), + GoogleManagedEncryptionEnforcementConfig: toGoogleManagedEncryptionEnforcementConfigFromProto(b.GetEncryption()), + CustomerManagedEncryptionEnforcementConfig: toCustomerManagedEncryptionEnforcementConfigFromProto(b.GetEncryption()), + CustomerSuppliedEncryptionEnforcementConfig: toCustomerSuppliedEncryptionEnforcementConfigFromProto(b.GetEncryption()), Logging: toBucketLoggingFromProto(b.GetLogging()), Website: toBucketWebsiteFromProto(b.GetWebsite()), BucketPolicyOnly: toBucketPolicyOnlyFromProto(b.GetIamConfig()), @@ -958,7 +979,7 @@ func (b *BucketAttrs) toRawBucket() *raw.Bucket { Lifecycle: toRawLifecycle(b.Lifecycle), RetentionPolicy: b.RetentionPolicy.toRawRetentionPolicy(), Cors: toRawCORS(b.CORS), - Encryption: b.Encryption.toRawBucketEncryption(), + Encryption: b.toRawBucketEncryption(), Logging: b.Logging.toRawBucketLogging(), Website: b.Website.toRawBucketWebsite(), IamConfiguration: bktIAM, @@ -1020,7 +1041,7 @@ func (b *BucketAttrs) toProtoBucket() *storagepb.Bucket { Lifecycle: toProtoLifecycle(b.Lifecycle), RetentionPolicy: b.RetentionPolicy.toProtoRetentionPolicy(), Cors: toProtoCORS(b.CORS), - Encryption: b.Encryption.toProtoBucketEncryption(), + Encryption: b.toProtoBucketEncryption(), Logging: b.Logging.toProtoBucketLogging(), Website: b.Website.toProtoBucketWebsite(), IamConfig: bktIAM, @@ -1104,7 +1125,7 @@ func (ua *BucketAttrsToUpdate) toProtoBucket() *storagepb.Bucket { Lifecycle: toProtoLifecycle(lifecycle), RetentionPolicy: ua.RetentionPolicy.toProtoRetentionPolicy(), Cors: toProtoCORS(ua.CORS), - Encryption: ua.Encryption.toProtoBucketEncryption(), + Encryption: ua.toProtoBucketEncryption(), Logging: ua.Logging.toProtoBucketLogging(), Website: ua.Website.toProtoBucketWebsite(), IamConfig: bktIAM, @@ -1146,6 +1167,27 @@ type BucketEncryption struct { DefaultKMSKeyName string } +// EncryptionEnforcementConfig specifies the enforcement config for encryption. +type EncryptionEnforcementConfig struct { + // RestrictionMode specifies the restriction mode for encryption. + // Valid values are "NotRestricted" and "FullyRestricted". + RestrictionMode RestrictionMode + + // EffectiveTime is the time from which the policy was enforced and + // effective. This field is read-only. + EffectiveTime time.Time +} + +// RestrictionMode is the restriction mode for encryption. +// It should be either "NotRestricted" or "FullyRestricted". +type RestrictionMode string + +// RestrictionMode constants. +const ( + NotRestricted RestrictionMode = "NotRestricted" + FullyRestricted RestrictionMode = "FullyRestricted" +) + // BucketAttrsToUpdate define the attributes to update during an Update call. type BucketAttrsToUpdate struct { // If set, updates whether the bucket uses versioning. @@ -1204,6 +1246,21 @@ type BucketAttrsToUpdate struct { // configuration. Encryption *BucketEncryption + // GoogleManagedEncryptionEnforcementConfig specifies the enforcement config + // for Google Managed Encryption. Pass NotRestricted in Restriction mode + // to unset the configuration. 
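+	// Note that only RestrictionMode is sent on update; EffectiveTime is
+	// reported by the server and is ignored if set here.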
+ GoogleManagedEncryptionEnforcementConfig *EncryptionEnforcementConfig + + // CustomerManagedEncryptionEnforcementConfig specifies the enforcement config + // for Customer Managed Encryption. Pass NotRestricted in Restriction mode + // to unset the configuration. + CustomerManagedEncryptionEnforcementConfig *EncryptionEnforcementConfig + + // CustomerSuppliedEncryptionEnforcementConfig specifies the enforcement config + // for Customer Supplied Encryption. Pass NotRestricted in Restriction mode + // to unset the configuration. + CustomerSuppliedEncryptionEnforcementConfig *EncryptionEnforcementConfig + // If set, replaces the lifecycle configuration of the bucket. Lifecycle *Lifecycle @@ -1322,14 +1379,7 @@ func (ua *BucketAttrsToUpdate) toRawBucket() *raw.Bucket { } rb.IamConfiguration.PublicAccessPrevention = ua.PublicAccessPrevention.String() } - if ua.Encryption != nil { - if ua.Encryption.DefaultKMSKeyName == "" { - rb.NullFields = append(rb.NullFields, "Encryption") - rb.Encryption = nil - } else { - rb.Encryption = ua.Encryption.toRawBucketEncryption() - } - } + rb.Encryption = ua.toRawBucketEncryption() if ua.Lifecycle != nil { rb.Lifecycle = toRawLifecycle(*ua.Lifecycle) rb.ForceSendFields = append(rb.ForceSendFields, "Lifecycle") @@ -1838,22 +1888,76 @@ func toLifecycleFromProto(rl *storagepb.Bucket_Lifecycle) Lifecycle { return l } -func (e *BucketEncryption) toRawBucketEncryption() *raw.BucketEncryption { - if e == nil { +func (b *BucketAttrs) toRawBucketEncryption() *raw.BucketEncryption { + return toRawBucketEncryption(b.Encryption, b.GoogleManagedEncryptionEnforcementConfig, b.CustomerManagedEncryptionEnforcementConfig, b.CustomerSuppliedEncryptionEnforcementConfig) +} + +func (b *BucketAttrsToUpdate) toRawBucketEncryption() *raw.BucketEncryption { + return toRawBucketEncryption(b.Encryption, b.GoogleManagedEncryptionEnforcementConfig, b.CustomerManagedEncryptionEnforcementConfig, b.CustomerSuppliedEncryptionEnforcementConfig) +} + +func toRawBucketEncryption(e *BucketEncryption, gme, cme, cse *EncryptionEnforcementConfig) *raw.BucketEncryption { + if e == nil && gme == nil && cme == nil && cse == nil { return nil } - return &raw.BucketEncryption{ - DefaultKmsKeyName: e.DefaultKMSKeyName, + ret := &raw.BucketEncryption{} + if e != nil { + if e.DefaultKMSKeyName != "" { + ret.DefaultKmsKeyName = e.DefaultKMSKeyName + } else { + ret.NullFields = append(ret.NullFields, "DefaultKmsKeyName") + } + } + if gme != nil { + ret.GoogleManagedEncryptionEnforcementConfig = &raw.BucketEncryptionGoogleManagedEncryptionEnforcementConfig{ + RestrictionMode: string(gme.RestrictionMode), + } + } + if cme != nil { + ret.CustomerManagedEncryptionEnforcementConfig = &raw.BucketEncryptionCustomerManagedEncryptionEnforcementConfig{ + RestrictionMode: string(cme.RestrictionMode), + } + } + if cse != nil { + ret.CustomerSuppliedEncryptionEnforcementConfig = &raw.BucketEncryptionCustomerSuppliedEncryptionEnforcementConfig{ + RestrictionMode: string(cse.RestrictionMode), + } } + return ret } -func (e *BucketEncryption) toProtoBucketEncryption() *storagepb.Bucket_Encryption { - if e == nil { +func (b *BucketAttrs) toProtoBucketEncryption() *storagepb.Bucket_Encryption { + return toProtoEncryption(b.Encryption, b.GoogleManagedEncryptionEnforcementConfig, b.CustomerManagedEncryptionEnforcementConfig, b.CustomerSuppliedEncryptionEnforcementConfig) +} + +func (b *BucketAttrsToUpdate) toProtoBucketEncryption() *storagepb.Bucket_Encryption { + return toProtoEncryption(b.Encryption, 
b.GoogleManagedEncryptionEnforcementConfig, b.CustomerManagedEncryptionEnforcementConfig, b.CustomerSuppliedEncryptionEnforcementConfig) +} + +func toProtoEncryption(e *BucketEncryption, gme, cme, cse *EncryptionEnforcementConfig) *storagepb.Bucket_Encryption { + if e == nil && gme == nil && cme == nil && cse == nil { return nil } - return &storagepb.Bucket_Encryption{ - DefaultKmsKey: e.DefaultKMSKeyName, + ret := &storagepb.Bucket_Encryption{} + if e != nil { + ret.DefaultKmsKey = e.DefaultKMSKeyName + } + if gme != nil { + ret.GoogleManagedEncryptionEnforcementConfig = &storagepb.Bucket_Encryption_GoogleManagedEncryptionEnforcementConfig{ + RestrictionMode: toProtoRestrictionMode(gme.RestrictionMode), + } + } + if cme != nil { + ret.CustomerManagedEncryptionEnforcementConfig = &storagepb.Bucket_Encryption_CustomerManagedEncryptionEnforcementConfig{ + RestrictionMode: toProtoRestrictionMode(cme.RestrictionMode), + } + } + if cse != nil { + ret.CustomerSuppliedEncryptionEnforcementConfig = &storagepb.Bucket_Encryption_CustomerSuppliedEncryptionEnforcementConfig{ + RestrictionMode: toProtoRestrictionMode(cse.RestrictionMode), + } } + return ret } func toBucketEncryption(e *raw.BucketEncryption) *BucketEncryption { @@ -1863,6 +1967,36 @@ func toBucketEncryption(e *raw.BucketEncryption) *BucketEncryption { return &BucketEncryption{DefaultKMSKeyName: e.DefaultKmsKeyName} } +func toGoogleManagedEncryptionEnforcementConfig(e *raw.BucketEncryption) *EncryptionEnforcementConfig { + if e == nil || e.GoogleManagedEncryptionEnforcementConfig == nil { + return nil + } + return &EncryptionEnforcementConfig{ + RestrictionMode: RestrictionMode(e.GoogleManagedEncryptionEnforcementConfig.RestrictionMode), + EffectiveTime: convertTime(e.GoogleManagedEncryptionEnforcementConfig.EffectiveTime), + } +} + +func toCustomerManagedEncryptionEnforcementConfig(e *raw.BucketEncryption) *EncryptionEnforcementConfig { + if e == nil || e.CustomerManagedEncryptionEnforcementConfig == nil { + return nil + } + return &EncryptionEnforcementConfig{ + RestrictionMode: RestrictionMode(e.CustomerManagedEncryptionEnforcementConfig.RestrictionMode), + EffectiveTime: convertTime(e.CustomerManagedEncryptionEnforcementConfig.EffectiveTime), + } +} + +func toCustomerSuppliedEncryptionEnforcementConfig(e *raw.BucketEncryption) *EncryptionEnforcementConfig { + if e == nil || e.CustomerSuppliedEncryptionEnforcementConfig == nil { + return nil + } + return &EncryptionEnforcementConfig{ + RestrictionMode: RestrictionMode(e.CustomerSuppliedEncryptionEnforcementConfig.RestrictionMode), + EffectiveTime: convertTime(e.CustomerSuppliedEncryptionEnforcementConfig.EffectiveTime), + } +} + func toBucketEncryptionFromProto(e *storagepb.Bucket_Encryption) *BucketEncryption { if e == nil { return nil @@ -1870,6 +2004,46 @@ func toBucketEncryptionFromProto(e *storagepb.Bucket_Encryption) *BucketEncrypti return &BucketEncryption{DefaultKMSKeyName: e.GetDefaultKmsKey()} } +func toGoogleManagedEncryptionEnforcementConfigFromProto(e *storagepb.Bucket_Encryption) *EncryptionEnforcementConfig { + if e == nil { + return nil + } + x := e.GetGoogleManagedEncryptionEnforcementConfig() + if x == nil { + return nil + } + return &EncryptionEnforcementConfig{ + RestrictionMode: RestrictionMode(x.GetRestrictionMode()), + EffectiveTime: x.GetEffectiveTime().AsTime(), + } +} +func toCustomerManagedEncryptionEnforcementConfigFromProto(e *storagepb.Bucket_Encryption) *EncryptionEnforcementConfig { + if e == nil { + return nil + } + x := 
e.GetCustomerManagedEncryptionEnforcementConfig() + if x == nil { + return nil + } + return &EncryptionEnforcementConfig{ + RestrictionMode: RestrictionMode(x.GetRestrictionMode()), + EffectiveTime: x.GetEffectiveTime().AsTime(), + } +} +func toCustomerSuppliedEncryptionEnforcementConfigFromProto(e *storagepb.Bucket_Encryption) *EncryptionEnforcementConfig { + if e == nil { + return nil + } + x := e.GetCustomerSuppliedEncryptionEnforcementConfig() + if x == nil { + return nil + } + return &EncryptionEnforcementConfig{ + RestrictionMode: RestrictionMode(x.GetRestrictionMode()), + EffectiveTime: x.GetEffectiveTime().AsTime(), + } +} + func (b *BucketLogging) toRawBucketLogging() *raw.BucketLogging { if b == nil { return nil @@ -2430,6 +2604,14 @@ func protoDateToUTCTime(d *dpb.Date) time.Time { return protoDateToTime(d, time.UTC) } +func toProtoRestrictionMode(rm RestrictionMode) *string { + if rm == "" { + return nil + } + s := string(rm) + return &s +} + // protoDateToTime returns a new Time based on the google.type.Date and provided // *time.Location. // diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/client.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/client.go index 8af94b90..1eb06f8f 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/client.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/client.go @@ -303,6 +303,12 @@ type newMultiRangeDownloaderParams struct { gen int64 object string handle *ReadHandle + + // Multistream settings. + minConnections int + maxConnections int + targetPendingRanges int + targetPendingBytes int } type newRangeReaderParams struct { diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go index cf295b4c..752682f7 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go @@ -69,11 +69,11 @@ const ( // which only does a single read per stream. defaultReadID = 1 - forceDirectConnectivityEnforced = "ENFORCED" - forceDirectConnectivityOptedOut = "OPTED_OUT" - directConnectivityHeaderKey = "force_direct_connectivity" - requestParamsHeaderKey = "x-goog-request-params" - directPathEndpointPrefix = "google-c2p:///" + forceDirectConnectivityEnforced = "ENFORCED" + directConnectivityHeaderKey = "force_direct_connectivity" + directConnectivityDiagnosticHeaderKey = "direct_connectivity_diagnostic" + requestParamsHeaderKey = "x-goog-request-params" + directPathEndpointPrefix = "google-c2p:///" ) // defaultGRPCOptions returns a set of the default client options @@ -123,6 +123,7 @@ type grpcStorageClient struct { raw *gapic.Client settings *settings config *storageConfig + dpDiag string } func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) { @@ -177,6 +178,7 @@ func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (*grpcStor option.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(ui)), option.WithGRPCDialOption(grpc.WithChainStreamInterceptor(si)), ) + c.dpDiag = directPathDiagnostic(ctx, s.clientOption...) g, err := gapic.NewClient(ctx, s.clientOption...) 
if err != nil { return nil, err @@ -206,12 +208,9 @@ func (c *grpcStorageClient) routingInterceptors() (grpc.UnaryClientInterceptor, } func (c *grpcStorageClient) prepareDirectPathMetadata(ctx context.Context, target string) (context.Context, error) { - // Check if the connection target supports DirectPath. - isDirectPath := true - // Target should not be empty in a normal scenario, but treat empty target - // as DirectPath compatible for safety. - if target != "" && !strings.HasPrefix(target, directPathEndpointPrefix) { - isDirectPath = false + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.MD{} } // Determine the intended mode based on user configuration. @@ -220,19 +219,8 @@ func (c *grpcStorageClient) prepareDirectPathMetadata(ctx context.Context, targe value = forceDirectConnectivityEnforced } - // Downgrade based on connection status. - if !isDirectPath { - // Downgrade to OPTED_OUT for server-side monitoring. - value = forceDirectConnectivityOptedOut - } - dc := directConnectivityHeaderKey + "=" + value - md, ok := metadata.FromOutgoingContext(ctx) - if !ok { - md = metadata.MD{} - } - // Inject the header only if we have a value to set. if value != "" { if vals := md.Get(requestParamsHeaderKey); len(vals) > 0 { @@ -241,7 +229,17 @@ func (c *grpcStorageClient) prepareDirectPathMetadata(ctx context.Context, targe md.Set(requestParamsHeaderKey, dc) } } - + // Check if the connection target supports DirectPath. + // Target should not be empty in a normal scenario, but treat empty target + // as DirectPath incompatible. + if !strings.HasPrefix(target, directPathEndpointPrefix) { + reason := directConnectivityDiagnosticHeaderKey + "=" + c.dpDiag + if vals := md.Get(requestParamsHeaderKey); len(vals) > 0 { + md.Set(requestParamsHeaderKey, vals[0]+"&"+reason) + } else { + md.Set(requestParamsHeaderKey, reason) + } + } return metadata.NewOutgoingContext(ctx, md), nil } @@ -428,7 +426,16 @@ func (c *grpcStorageClient) UpdateBucket(ctx context.Context, bucket string, uat fieldMask.Paths = append(fieldMask.Paths, "iam_config") } if uattrs.Encryption != nil { - fieldMask.Paths = append(fieldMask.Paths, "encryption") + fieldMask.Paths = append(fieldMask.Paths, "encryption.default_kms_key") + } + if uattrs.GoogleManagedEncryptionEnforcementConfig != nil { + fieldMask.Paths = append(fieldMask.Paths, "encryption.google_managed_encryption_enforcement_config") + } + if uattrs.CustomerManagedEncryptionEnforcementConfig != nil { + fieldMask.Paths = append(fieldMask.Paths, "encryption.customer_managed_encryption_enforcement_config") + } + if uattrs.CustomerSuppliedEncryptionEnforcementConfig != nil { + fieldMask.Paths = append(fieldMask.Paths, "encryption.customer_supplied_encryption_enforcement_config") } if uattrs.Lifecycle != nil { fieldMask.Paths = append(fieldMask.Paths, "lifecycle") diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_dp_diag.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_dp_diag.go new file mode 100644 index 00000000..1ba4a3f0 --- /dev/null +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_dp_diag.go @@ -0,0 +1,126 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "os" + "strings" + + "cloud.google.com/go/compute/metadata" + "google.golang.org/api/option" + "google.golang.org/api/option/internaloption" + _ "google.golang.org/grpc/balancer/rls" + _ "google.golang.org/grpc/xds/googledirectpath" +) + +const ( + reasonCustomGRPCConn = "custom_grpc_conn" + reasonCustomGRPCConnPool = "custom_grpc_conn_pool" + reasonEndpointFetchError = "endpoint_fetch_error" + reasonXDSNotEnabled = "xds_not_enabled" + reasonOptionDisabled = "option_disabled" + reasonUnsupportedEndpoint = "unsupported_endpoint" + reasonEnvVarDisabled = "env_disabled" + reasonNotOnGCE = "not_on_gce" + reasonNoAuth = "no_auth" + reasonAPIKey = "api_key" + reasonTokenFetchError = "token_fetch_error" + reasonNotDefaultServiceAccount = "not_default_service_account" + reasonCustomHTTPClient = "custom_http_client" + reasonUndetermined = "undetermined" + reasonInternalError = "internal_error" + + directPathDisableEnvVar = "GOOGLE_CLOUD_DISABLE_DIRECT_PATH" + + defaultKey = "default" + serviceAccountTokenKey = "instance/service-accounts/default/token" +) + +// directPathDiagnostic evaluates the provided options and environment to determine +// why gRPC DirectPath (high-throughput VPC routing) is not being utilized. +func directPathDiagnostic(ctx context.Context, opts ...option.ClientOption) string { + if strings.EqualFold(os.Getenv(directPathDisableEnvVar), "true") { + return reasonEnvVarDisabled + } + + res, err := internaloption.NewUnsafeResolver(opts...) + if err != nil { + return reasonInternalError + } + + if !res.ResolvedEnableDirectPath() { + return reasonOptionDisabled + } + + endpoint, err := res.ResolvedGRPCEndpoint() + if err != nil { + return reasonEndpointFetchError + } + + if !isDirectPathCompatible(endpoint) { + return reasonUnsupportedEndpoint + } + + if !res.ResolvedEnableDirectPathXds() { + return reasonXDSNotEnabled + } + + if res.ResolvedGRPCConnIsCustom() { + return reasonCustomGRPCConn + } + + if res.ResolvedHTTPClientIsCustom() { + return reasonCustomHTTPClient + } + + if !metadata.OnGCE() { + return reasonNotOnGCE + } + + return authDiagnostic(res) +} + +func authDiagnostic(res *internaloption.UnsafeResolver) string { + if res.ResolvedWithoutAuthentication() { + return reasonNoAuth + } + if res.ResolvedWithAPIKeyIsCustom() { + return reasonAPIKey + } + + // Verify that a default service account is attached. + if _, err := metadata.Email(defaultKey); err != nil { + return reasonNotDefaultServiceAccount + } + + // Verify that a token can be fetched. + if _, err := metadata.Get(serviceAccountTokenKey); err != nil { + return reasonTokenFetchError + } + + return reasonUndetermined +} + +func isDirectPathCompatible(endpoint string) bool { + if endpoint == "" { + return false + } + // DirectPath requires no scheme or the dns:/// scheme specifically. 
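+	// e.g. "storage.googleapis.com:443" (no scheme) and
+	// "dns:///storage.googleapis.com" pass; "https://storage.googleapis.com"
+	// does not.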
+ if strings.Contains(endpoint, "://") && !strings.HasPrefix(endpoint, "dns:///") { + return false + } + return true +} diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go index 1e3f4fd1..bdc2896d 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go @@ -33,12 +33,16 @@ import ( ) const ( - mrdCommandChannelSize = 1 - mrdResponseChannelSize = 100 + mrdCommandChannelSize = 1 + mrdResponseChannelSize = 100 + mrdSendChannelSize = 100 + mrdAddStreamsChannelSize = 100 // This should never be hit in practice, but is a safety valve to prevent // unbounded memory usage if the user is adding ranges faster than they // can be processed. mrdAddInternalQueueMaxSize = 50000 + defaultTargetPendingBytes = 1 << 30 // 1 GiB + defaultTargetPendingRanges = 500 ) // --- internalMultiRangeDownloader Interface --- @@ -55,6 +59,77 @@ type internalMultiRangeDownloader interface { getSpanCtx() context.Context } +// streamPickerStrategy is an interface which each stream picker must implement. +type streamPickerStrategy interface { + pick(streams map[int]*mrdStream) int +} + +// weightedPicker picks the stream with the minimum combined score of +// pending ranges and pending bytes, normalized by their respective targets. +type weightedPicker struct { + targetPendingRanges int + targetPendingBytes int +} + +func (p *weightedPicker) pick(streams map[int]*mrdStream) int { + minScore := -1.0 + returnID := -1 + + for id, stream := range streams { + if stream.reconnecting || stream.session == nil { + continue + } + // If the stream's request channel is full, skip it to avoid blocking the event loop. + // This ensures we only attempt a send if it can likely proceed immediately. + if len(stream.session.reqC) >= cap(stream.session.reqC) { + continue + } + + // Calculate normalized score. + // Score = (PendingRanges / TargetRanges) + (PendingBytes / TargetBytes) + // Lower score is better (least loaded). + score := float64(stream.totalRanges)/float64(p.targetPendingRanges) + + float64(stream.totalRangeBytes)/float64(p.targetPendingBytes) + + if returnID == -1 || score < minScore { + minScore = score + returnID = id + } + } + return returnID +} + +// mrdStream holds all the relevant information of a single +// bidi stream. +type mrdStream struct { + id int + pendingRanges map[int64]*rangeRequest + session *bidiReadStreamSession + reconnecting bool + atCapacity bool + totalRanges int + totalRangeBytes int64 + // statsRanges and statsRangeBytes help understand the + // distribution of ranges on different streams. + statsRanges uint + statsRangeBytes int64 +} + +func (s *mrdStream) updateCapacity(m *multiRangeDownloaderManager, deltaRanges int, deltaBytes int64) { + s.totalRanges = s.totalRanges + deltaRanges + m.pendingRangesCount += deltaRanges + s.totalRangeBytes += deltaBytes + + wasAtCapacity := s.atCapacity + s.atCapacity = s.totalRanges >= m.params.targetPendingRanges || s.totalRangeBytes >= int64(m.params.targetPendingBytes) + + if wasAtCapacity && !s.atCapacity { + m.atCapacityCount-- + } else if !wasAtCapacity && s.atCapacity { + m.atCapacityCount++ + } +} + // --- grpcStorageClient method --- // Top level entry point into the MultiRangeDownloader via the storageClient interface. 
func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params *newMultiRangeDownloaderParams, opts ...storageOption) (*MultiRangeDownloader, error) { @@ -62,27 +137,20 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params return nil, errors.New("storage: MultiRangeDownloader requires the experimental.WithGRPCBidiReads option") } s := callSettings(c.settings, opts...) + // Force the use of the custom codec to enable zero-copy reads. + s.gax = append(s.gax, gax.WithGRPCOptions( + grpc.ForceCodecV2(bytesCodecV2{}), + )) + if s.userProject != "" { ctx = setUserProjectMetadata(ctx, s.userProject) } if s.retry == nil { s.retry = defaultRetry } + params.defaults() - b := bucketResourceName(globalProjectAlias, params.bucket) - readSpec := &storagepb.BidiReadObjectSpec{ - Bucket: b, - Object: params.object, - CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey), - } - if params.gen >= 0 { - readSpec.Generation = params.gen - } - if params.handle != nil && len(*params.handle) > 0 { - readSpec.ReadHandle = &storagepb.BidiReadHandle{ - Handle: *params.handle, - } - } + readSpec := makeBidiReadObjectSpec(params) mCtx, cancel := context.WithCancel(ctx) @@ -95,18 +163,35 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params params: params, cmds: make(chan mrdCommand, mrdCommandChannelSize), sessionResps: make(chan mrdSessionResult, mrdResponseChannelSize), - pendingRanges: make(map[int64]*rangeRequest), readIDCounter: 1, readSpec: readSpec, attrsReady: make(chan struct{}), spanCtx: ctx, + streams: make(map[int]*mrdStream), + streamPicker: &weightedPicker{targetPendingRanges: params.targetPendingRanges, targetPendingBytes: params.targetPendingBytes}, unsentRequests: newRequestQueue(), + addStreams: make(chan mrdCommand, mrdAddStreamsChannelSize), } mrd := &MultiRangeDownloader{ impl: manager, } + // Blocking call to establish the first session and get attributes. + initialStreamID := manager.streamIDCounter + manager.streamIDCounter++ + manager.streams[initialStreamID] = &mrdStream{ + id: initialStreamID, + pendingRanges: make(map[int64]*rangeRequest), + } + session, finalSpec, err := manager.createNewSession(initialStreamID, readSpec, true) + if err != nil { + manager.setPermanentError(err) + return nil, err + } + // Update the manager's readSpec with any changes (like routing token) from the first session. 
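+	// Later streams clone this spec, so they reuse the routing token and
+	// read handle instead of triggering another redirect.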
+ manager.readSpec = finalSpec + manager.streams[initialStreamID].session = session manager.wg.Add(1) go func() { defer manager.wg.Done() @@ -116,10 +201,10 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params // Wait for attributes to be ready select { case <-manager.attrsReady: - if manager.permanentErr != nil { + if pErr := manager.getPermanentError(); pErr != nil { cancel() manager.wg.Wait() - return nil, manager.permanentErr + return nil, pErr } if manager.attrs != nil { mrd.Attrs = *manager.attrs @@ -132,6 +217,39 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params } } +func makeBidiReadObjectSpec(params *newMultiRangeDownloaderParams) *storagepb.BidiReadObjectSpec { + b := bucketResourceName(globalProjectAlias, params.bucket) + readSpec := &storagepb.BidiReadObjectSpec{ + Bucket: b, + Object: params.object, + CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey), + } + if params.gen >= 0 { + readSpec.Generation = params.gen + } + if params.handle != nil && len(*params.handle) > 0 { + readSpec.ReadHandle = &storagepb.BidiReadHandle{ + Handle: *params.handle, + } + } + return readSpec +} + +func (m *newMultiRangeDownloaderParams) defaults() { + if m.minConnections <= 0 { + m.minConnections = 1 + } + if m.maxConnections < m.minConnections { + m.maxConnections = m.minConnections + } + if m.targetPendingRanges <= 0 { + m.targetPendingRanges = defaultTargetPendingRanges + } + if m.targetPendingBytes <= 0 { + m.targetPendingBytes = defaultTargetPendingBytes + } +} + // --- mrdCommand Interface and Implementations --- // Used to pass commands from the user-facing code to the MRD manager. // mrdCommand handlers are applied sequentially in the event loop. Therefore, it's okay @@ -183,15 +301,42 @@ func (c *mrdGetHandleCmd) apply(ctx context.Context, m *multiRangeDownloaderMana } } -type mrdErrorCmd struct { - respC chan error +type addStreamCmd struct { + id int + spec *storagepb.BidiReadObjectSpec + stream *mrdStream } -func (c *mrdErrorCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { - select { - case c.respC <- m.permanentErr: - case <-ctx.Done(): - close(c.respC) +func (c *addStreamCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + m.handleAddStreamCmd(ctx, c) +} + +type reconnectStreamCmd struct { + id int + session *bidiReadStreamSession + spec *storagepb.BidiReadObjectSpec + err error +} + +func (c *reconnectStreamCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + m.handleReconnectStreamCmd(ctx, c) +} + +type mrdAddStreamErrorCmd struct { + err error +} + +func (c *mrdAddStreamErrorCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + m.streamCreating = false + if len(m.streams) == 0 { + var err error + if c.err != nil { + err = fmt.Errorf("no streams available. Last observed error: %w", c.err) + } else { + err = errors.New("no streams available") + } + m.setPermanentError(err) + m.failAllPending(m.getPermanentError()) } } @@ -200,12 +345,17 @@ func (c *mrdErrorCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) // back up to the multiRangeDownloadManager for processing, or to pass // an error if the session failed. 
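+// The id and session fields identify the originating stream so that
+// stale results from a replaced session can be detected and dropped.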
type mrdSessionResult struct { + id int decoder *readResponseDecoder err error + session *bidiReadStreamSession redirect *storagepb.BidiReadObjectRedirectedError } -var errClosed = errors.New("downloader closed") +var ( + errClosed = errors.New("downloader closed") + errNoStreams = errors.New("no streams available") +) // --- multiRangeDownloaderManager --- // Manages main event loop for MRD commands and processing responses. @@ -221,19 +371,25 @@ type multiRangeDownloaderManager struct { sessionResps chan mrdSessionResult // State - currentSession *bidiReadStreamSession - readIDCounter int64 - pendingRanges map[int64]*rangeRequest - permanentErr error - waiters []chan struct{} - readSpec *storagepb.BidiReadObjectSpec - lastReadHandle []byte - attrs *ReaderObjectAttrs - attrsReady chan struct{} - attrsOnce sync.Once - spanCtx context.Context - callbackWg sync.WaitGroup - unsentRequests *requestQueue + mu sync.Mutex + readIDCounter int64 + permanentErr error + waiters []chan struct{} + readSpec *storagepb.BidiReadObjectSpec + lastReadHandle []byte + pendingRangesCount int + attrs *ReaderObjectAttrs + attrsReady chan struct{} + attrsOnce sync.Once + spanCtx context.Context + callbackWg sync.WaitGroup + streamCreating bool + streamPicker streamPickerStrategy + streamIDCounter int + streams map[int]*mrdStream + unsentRequests *requestQueue + addStreams chan mrdCommand + atCapacityCount int } type rangeRequest struct { @@ -253,8 +409,8 @@ type rangeRequest struct { // Methods implementing internalMultiRangeDownloader func (m *multiRangeDownloaderManager) add(output io.Writer, offset, length int64, callback func(int64, int64, error)) { if err := m.ctx.Err(); err != nil { - if m.permanentErr != nil { - err = m.permanentErr + if pErr := m.getPermanentError(); pErr != nil { + err = pErr } m.runCallback(offset, length, err, callback) return @@ -269,8 +425,8 @@ func (m *multiRangeDownloaderManager) add(output io.Writer, offset, length int64 case m.cmds <- cmd: case <-m.ctx.Done(): err := m.ctx.Err() - if m.permanentErr != nil { - err = m.permanentErr + if pErr := m.getPermanentError(); pErr != nil { + err = pErr } m.runCallback(offset, length, err, callback) } @@ -282,12 +438,15 @@ func (m *multiRangeDownloaderManager) close(err error) error { case m.cmds <- cmd: <-m.ctx.Done() m.wg.Wait() - if m.permanentErr != nil && !errors.Is(m.permanentErr, errClosed) { - return m.permanentErr + if pErr := m.getPermanentError(); pErr != nil && !errors.Is(pErr, errClosed) { + return pErr } return nil case <-m.ctx.Done(): m.wg.Wait() + if m.getPermanentError() != nil { + return m.getPermanentError() + } return m.ctx.Err() } } @@ -337,6 +496,8 @@ func (m *multiRangeDownloaderManager) getHandle() []byte { } func (m *multiRangeDownloaderManager) getPermanentError() error { + m.mu.Lock() + defer m.mu.Unlock() return m.permanentErr } @@ -352,45 +513,45 @@ func (m *multiRangeDownloaderManager) runCallback(origOffset, numBytes int64, er }() } -func (m *multiRangeDownloaderManager) eventLoop() { - defer func() { - if m.currentSession != nil { - m.currentSession.Shutdown() - } - finalErr := m.permanentErr - if finalErr == nil { - if ctxErr := m.ctx.Err(); ctxErr != nil { - finalErr = ctxErr - } - } - if finalErr == nil { - finalErr = errClosed - } - m.failAllPending(finalErr) - for _, waiter := range m.waiters { - close(waiter) - } - m.attrsOnce.Do(func() { close(m.attrsReady) }) - m.callbackWg.Wait() - }() - - // Blocking call to establish the first session and get attributes. 
- if err := m.establishInitialSession(); err != nil { - // permanentErr is set within establishInitialSession if necessary. - return // Exit eventLoop if we can't start. +func (m *multiRangeDownloaderManager) getReqAndTargetStream(req *rangeRequest) (*storagepb.BidiReadObjectRequest, *mrdStream) { + streamID := m.streamPicker.pick(m.streams) + if streamID == -1 { + return nil, nil + } + stream := m.streams[streamID] + if stream == nil { + return nil, nil } + protoReq := &storagepb.BidiReadObjectRequest{ + ReadRanges: []*storagepb.ReadRange{{ + ReadOffset: req.offset, + ReadLength: req.length, + ReadId: req.readID, + }}, + } + return protoReq, stream +} + +func (m *multiRangeDownloaderManager) eventLoop() { + defer m.cleanup() for { var nextReq *storagepb.BidiReadObjectRequest - var targetChan chan<- *storagepb.BidiReadObjectRequest + var nextRangeReq *rangeRequest + var targetStream *mrdStream + var targetChan chan *storagepb.BidiReadObjectRequest // Only try to send if we have queued requests - if m.unsentRequests.Len() > 0 && m.currentSession != nil { - nextReq = m.unsentRequests.Front() - if nextReq != nil { - targetChan = m.currentSession.reqC + if m.unsentRequests.Len() > 0 { + nextRangeReq = m.unsentRequests.Front() + if nextRangeReq != nil { + nextReq, targetStream = m.getReqAndTargetStream(nextRangeReq) } } + if targetStream != nil && targetStream.session != nil { + targetChan = targetStream.session.reqC + } + // Only read from cmds if we have space in the unsentRequests queue. var cmdsChan chan mrdCommand if m.unsentRequests.Len() < mrdAddInternalQueueMaxSize { @@ -402,7 +563,15 @@ func (m *multiRangeDownloaderManager) eventLoop() { // This path only triggers if space is available in the channel. // It never blocks the eventLoop. case targetChan <- nextReq: + targetStream.pendingRanges[nextRangeReq.readID] = nextRangeReq + targetStream.updateCapacity(m, 1, nextRangeReq.length) + + targetStream.statsRanges++ + targetStream.statsRangeBytes += nextRangeReq.length + m.unsentRequests.RemoveFront() + case cmd := <-m.addStreams: + cmd.apply(m.ctx, m) case cmd := <-cmdsChan: cmd.apply(m.ctx, m) if _, ok := cmd.(*mrdCloseCmd); ok { @@ -411,8 +580,12 @@ func (m *multiRangeDownloaderManager) eventLoop() { case result := <-m.sessionResps: m.processSessionResult(result) } - - if len(m.pendingRanges) == 0 && m.unsentRequests.Len() == 0 { + // Check if new stream has to be added. + if m.shouldAddStream() { + m.addNewStream() + } + // Notify waiters if all ranges are done. + if m.pendingRangesCount == 0 && m.unsentRequests.Len() == 0 { for _, waiter := range m.waiters { close(waiter) } @@ -421,48 +594,108 @@ func (m *multiRangeDownloaderManager) eventLoop() { } } -func (m *multiRangeDownloaderManager) establishInitialSession() error { - retry := m.settings.retry +func (m *multiRangeDownloaderManager) cleanup() { + for id, stream := range m.streams { + if stream.session != nil { + stream.session.Shutdown() + } + delete(m.streams, id) + } - var firstResult mrdSessionResult + // Drain and free any remaining responses to prevent buffer leaks. 
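+	// Closing the channel bounds the drain loop below: it consumes
+	// whatever is already buffered and then terminates.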
+ close(m.sessionResps) + for result := range m.sessionResps { + if result.decoder != nil { + result.decoder.databufs.Free() + } + } - openStreamAndReceiveFirst := func(ctx context.Context, spec *storagepb.BidiReadObjectSpec) (*bidiReadStreamSession, mrdSessionResult) { - session, err := newBidiReadStreamSession(m.ctx, m.sessionResps, m.client, m.settings, m.params, spec) - if err != nil { - return nil, mrdSessionResult{err: err} + finalErr := m.getPermanentError() + if finalErr == nil { + if ctxErr := m.ctx.Err(); ctxErr != nil { + finalErr = ctxErr } + } + if finalErr == nil { + finalErr = errClosed + } + m.failAllPending(finalErr) + for _, waiter := range m.waiters { + close(waiter) + } + m.attrsOnce.Do(func() { close(m.attrsReady) }) + m.callbackWg.Wait() +} +func (m *multiRangeDownloaderManager) addNewStream() { + m.streamCreating = true + id := int(m.streamIDCounter) + m.streamIDCounter++ + // Clone the spec within the event loop. + clonedSpec := proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec) + if m.ctx.Err() != nil { + m.streamCreating = false + return + } + go func(id int, readSpec *storagepb.BidiReadObjectSpec) { + newSession, newSpec, err := m.createNewSession(id, readSpec, false) + if err != nil || newSession == nil { + // If we can't create a stream, the handler checks the health of + // manager and then decides to kill the manager. + select { + case m.addStreams <- &mrdAddStreamErrorCmd{err: err}: + case <-m.ctx.Done(): + if newSession != nil { + newSession.Shutdown() + } + return + } + return + } select { - case result := <-m.sessionResps: - return session, result - case <-ctx.Done(): - session.Shutdown() - return nil, mrdSessionResult{err: ctx.Err()} + case m.addStreams <- &addStreamCmd{ + id: id, + spec: newSpec, + stream: &mrdStream{ + id: id, + session: newSession, + pendingRanges: make(map[int64]*rangeRequest), + }, + }: + case <-m.ctx.Done(): + newSession.Shutdown() + return } - } + + }(id, clonedSpec) +} + +func (m *multiRangeDownloaderManager) createNewSession(id int, readSpec *storagepb.BidiReadObjectSpec, waitForResult bool) (*bidiReadStreamSession, *storagepb.BidiReadObjectSpec, error) { + retry := m.settings.retry + + var firstResult mrdSessionResult + var newSession *bidiReadStreamSession err := run(m.ctx, func(ctx context.Context) error { - if m.currentSession != nil { - m.currentSession.Shutdown() - m.currentSession = nil + if newSession != nil { + newSession.Shutdown() + newSession = nil } - currentSpec := proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec) - session, result := openStreamAndReceiveFirst(ctx, currentSpec) + session, result := m.openAndInitializeSession(ctx, id, readSpec, waitForResult) if result.err != nil { + if session != nil { + session.Shutdown() + } if result.redirect != nil { - m.readSpec.RoutingToken = result.redirect.RoutingToken - m.readSpec.ReadHandle = result.redirect.ReadHandle - if session != nil { - session.Shutdown() - } + readSpec.RoutingToken = result.redirect.RoutingToken + readSpec.ReadHandle = result.redirect.ReadHandle // We might get a redirect error here for an out-of-region request. // Add the routing token and read handle to the request and do one // retry. 
- currentSpec = proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec) - session, result = openStreamAndReceiveFirst(ctx, currentSpec) + session, result = m.openAndInitializeSession(ctx, id, readSpec, waitForResult) if result.err != nil { if session != nil { @@ -472,35 +705,59 @@ func (m *multiRangeDownloaderManager) establishInitialSession() error { } } else { // Not a redirect error, return to run() - if session != nil { - session.Shutdown() - } return result.err } } // Success - m.currentSession = session + newSession = session firstResult = result return nil }, retry, true) if err != nil { - m.setPermanentError(err) - return m.permanentErr + return nil, nil, err } - // Process the successful first result - m.processSessionResult(firstResult) - if m.permanentErr != nil { - return m.permanentErr + if waitForResult { + // Process the successful first result + m.processSessionResult(firstResult) + if pErr := m.getPermanentError(); pErr != nil { + return nil, nil, pErr + } + } + return newSession, readSpec, nil +} + +func (m *multiRangeDownloaderManager) openAndInitializeSession(ctx context.Context, id int, spec *storagepb.BidiReadObjectSpec, waitForResult bool) (*bidiReadStreamSession, mrdSessionResult) { + session, err := newBidiReadStreamSession(m.ctx, id, m.sessionResps, m.client, m.settings, m.params, spec) + if err != nil { + return nil, mrdSessionResult{err: err} + } + if !waitForResult { + return session, mrdSessionResult{} + } + for { + select { + case result := <-m.sessionResps: + if result.session != session { + // Stale session result, free it if it has data. + if result.decoder != nil { + result.decoder.databufs.Free() + } + continue + } + return session, result + case <-ctx.Done(): + session.Shutdown() + return nil, mrdSessionResult{err: ctx.Err()} + } } - return nil } func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrdAddCmd) { - if m.permanentErr != nil { - m.runCallback(cmd.offset, cmd.length, m.permanentErr, cmd.callback) + if pErr := m.getPermanentError(); pErr != nil { + m.runCallback(cmd.offset, cmd.length, pErr, cmd.callback) return } @@ -517,31 +774,28 @@ func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrd // Convert to positive offset only if attributes are available. if m.attrs != nil && req.offset < 0 { - err := m.convertToPositiveOffset(req) + err := m.convertToPositiveOffset(nil, req) if err != nil { return } } - if m.currentSession == nil { - // This should not happen if establishInitialSession was successful - m.failRange(req, errors.New("storage: session not available")) - return - } - - m.pendingRanges[req.readID] = req + m.unsentRequests.PushBack(req) +} - protoReq := &storagepb.BidiReadObjectRequest{ - ReadRanges: []*storagepb.ReadRange{{ - ReadOffset: req.offset, - ReadLength: req.length, - ReadId: req.readID, - }}, +func (m *multiRangeDownloaderManager) shouldAddStream() bool { + if m.streamCreating || len(m.streams) >= m.params.maxConnections { + return false } - m.unsentRequests.PushBack(protoReq) + if len(m.streams) < m.params.minConnections { + return true + } + + // Beyond minConnections, we only add if all existing active streams are at capacity. 
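+	// atCapacityCount is maintained incrementally by updateCapacity, so
+	// this check is O(1) rather than a scan over all streams.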
+ return m.atCapacityCount >= len(m.streams) } -func (m *multiRangeDownloaderManager) convertToPositiveOffset(req *rangeRequest) error { +func (m *multiRangeDownloaderManager) convertToPositiveOffset(mrdStream *mrdStream, req *rangeRequest) error { if req.offset >= 0 { return nil } @@ -551,13 +805,17 @@ func (m *multiRangeDownloaderManager) convertToPositiveOffset(req *rangeRequest) } if objSize <= 0 { err := errors.New("storage: cannot resolve negative offset with object size as 0") - m.failRange(req, err) + m.failRange(mrdStream, req, err) return err } start := max(objSize+req.offset, 0) req.offset = start if req.length == 0 { + diff := (objSize - start) req.length = objSize - start + if mrdStream != nil { + mrdStream.updateCapacity(m, 0, diff) + } } return nil } @@ -568,179 +826,276 @@ func (m *multiRangeDownloaderManager) handleCloseCmd(ctx context.Context, cmd *m err = cmd.err } else { err = errClosed - } m.setPermanentError(err) m.cancel() } func (m *multiRangeDownloaderManager) handleWaitCmd(ctx context.Context, cmd *mrdWaitCmd) { - if len(m.pendingRanges) == 0 { + // unsentRequests could be non-empty when eventLoop is busy + // in select statements other than Add commands and cleared up + // existing pending ranges. + if m.pendingRangesCount == 0 && m.unsentRequests.Len() == 0 { close(cmd.doneC) } else { m.waiters = append(m.waiters, cmd.doneC) } } +func (m *multiRangeDownloaderManager) handleAddStreamCmd(ctx context.Context, cmd *addStreamCmd) { + m.streams[cmd.id] = cmd.stream + if cmd.spec != nil { + m.readSpec = cmd.spec + } + m.streamCreating = false +} + +func (m *multiRangeDownloaderManager) handleReconnectStreamCmd(ctx context.Context, cmd *reconnectStreamCmd) { + stream, ok := m.streams[cmd.id] + if !ok || stream == nil { + // Stream might have been removed during shutdown. + if cmd.session != nil { + cmd.session.Shutdown() + } + return + } + stream.reconnecting = false + + if cmd.err != nil { + m.failStream(stream, cmd.err) + if len(m.streams) == 0 && !m.streamCreating { + err := fmt.Errorf("no streams available. Last observed error: %w", cmd.err) + m.setPermanentError(err) + m.failAllPending(m.getPermanentError()) + } + return + } + + stream.session = cmd.session + if cmd.spec != nil { + m.readSpec = cmd.spec + } + + var rangesToResend []*storagepb.ReadRange + for _, req := range stream.pendingRanges { + if !req.completed { + readLength := req.length + if req.length > 0 { + readLength -= req.bytesWritten + } + if readLength < 0 { + readLength = 0 + } + + if req.length == 0 || readLength > 0 { + rangesToResend = append(rangesToResend, &storagepb.ReadRange{ + ReadOffset: req.offset + req.bytesWritten, + ReadLength: readLength, + ReadId: req.readID, + }) + } + } + } + if len(rangesToResend) > 0 { + retryReq := &storagepb.BidiReadObjectRequest{ReadRanges: rangesToResend} + stream.session.SendRequest(retryReq) + } +} + func (m *multiRangeDownloaderManager) processSessionResult(result mrdSessionResult) { + if result.decoder != nil { + defer result.decoder.databufs.Free() + } + + mrdStream := m.streams[result.id] + // Shutdown any stale streams sending responses. 
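+	// A result may arrive from a session that has since been replaced
+	// (e.g. after a reconnect); only the current session's results are used.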
+ if mrdStream != nil && + mrdStream.session != nil && + mrdStream.session != result.session { + result.session.Shutdown() + return + } + if result.err != nil { - m.handleStreamEnd(result) + m.handleStreamEnd(result, m.streams[result.id]) return } resp := result.decoder.msg if handle := resp.GetReadHandle().GetHandle(); len(handle) > 0 { m.lastReadHandle = handle + if m.readSpec.ReadHandle == nil { + m.readSpec.ReadHandle = &storagepb.BidiReadHandle{ + Handle: handle, + } + } else { + m.readSpec.ReadHandle.Handle = handle + } } - m.attrsOnce.Do(func() { defer close(m.attrsReady) if meta := resp.GetMetadata(); meta != nil { obj := newObjectFromProto(meta) attrs := readerAttrsFromObject(obj) m.attrs = &attrs - for _, req := range m.pendingRanges { - if req.offset < 0 { - _ = m.convertToPositiveOffset(req) + for _, stream := range m.streams { + for _, req := range stream.pendingRanges { + if req.offset < 0 { + _ = m.convertToPositiveOffset(stream, req) + } + } + } + // Iterate unsent requests. + for req := m.unsentRequests.l.Front(); req != nil; req = req.Next() { + rangeReq := req.Value.(*rangeRequest) + if rangeReq.offset < 0 { + _ = m.convertToPositiveOffset(nil, rangeReq) } } } }) + if mrdStream != nil { + m.processDataRanges(result, mrdStream, resp) + } +} + +func (m *multiRangeDownloaderManager) processDataRanges(result mrdSessionResult, mrdStream *mrdStream, resp *storagepb.BidiReadObjectResponse) { for _, dataRange := range resp.GetObjectDataRanges() { readID := dataRange.GetReadRange().GetReadId() - req, exists := m.pendingRanges[readID] + req, exists := mrdStream.pendingRanges[readID] if !exists || req.completed { continue } written, _, err := result.decoder.writeToAndUpdateCRC(req.output, readID, nil) req.bytesWritten += written + mrdStream.updateCapacity(m, 0, -written) if err != nil { - m.failRange(req, err) + m.failRange(mrdStream, req, err) continue } if dataRange.GetRangeEnd() { req.completed = true - delete(m.pendingRanges, req.readID) + delete(mrdStream.pendingRanges, req.readID) + mrdStream.updateCapacity(m, -1, 0) m.runCallback(req.origOffset, req.bytesWritten, nil, req.callback) } } - // Once all data in the initial response has been read out, free buffers. - result.decoder.databufs.Free() } // ensureSession is now only for reconnecting *after* the initial session is up. -func (m *multiRangeDownloaderManager) ensureSession(ctx context.Context) error { - if m.currentSession != nil { - return nil +func (m *multiRangeDownloaderManager) ensureSession(ctx context.Context, stream *mrdStream) { + if stream.session != nil || stream.reconnecting { + return } - if m.permanentErr != nil { - return m.permanentErr + if pErr := m.getPermanentError(); pErr != nil { + return } - // Using run for retries - return run(ctx, func(ctx context.Context) error { - if m.currentSession != nil { - return nil - } - if m.permanentErr != nil { - return m.permanentErr - } - - session, err := newBidiReadStreamSession(m.ctx, m.sessionResps, m.client, m.settings, m.params, proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec)) - if err != nil { - redirectErr, isRedirect := isRedirectError(err) - if isRedirect { - m.readSpec.RoutingToken = redirectErr.RoutingToken - m.readSpec.ReadHandle = redirectErr.ReadHandle - return fmt.Errorf("%w: %v", errBidiReadRedirect, err) - } - return err + stream.reconnecting = true + // Clone the spec within the event loop. 
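+	// Cloning here, not in the goroutine, avoids racing with the event
+	// loop, which is the only place m.readSpec is mutated.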
+ clonedSpec := proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec) + go func(id int, readSpec *storagepb.BidiReadObjectSpec) { + session, finalSpec, err := m.createNewSession(id, readSpec, false) + select { + case <-ctx.Done(): + return + case m.addStreams <- &reconnectStreamCmd{ + id: id, + session: session, + spec: finalSpec, + err: err, + }: } - m.currentSession = session - var rangesToResend []*storagepb.ReadRange - for _, req := range m.pendingRanges { - if !req.completed { - readLength := req.length - if req.length > 0 { - readLength -= req.bytesWritten - } - if readLength < 0 { - readLength = 0 - } - - if req.length == 0 || readLength > 0 { - rangesToResend = append(rangesToResend, &storagepb.ReadRange{ - ReadOffset: req.offset + req.bytesWritten, - ReadLength: readLength, - ReadId: req.readID, - }) - } - } - } - if len(rangesToResend) > 0 { - retryReq := &storagepb.BidiReadObjectRequest{ReadRanges: rangesToResend} - m.unsentRequests.PushFront(retryReq) - } - return nil - }, m.settings.retry, true) + }(stream.id, clonedSpec) } var errBidiReadRedirect = errors.New("bidi read object redirected") -func (m *multiRangeDownloaderManager) handleStreamEnd(result mrdSessionResult) { - if m.currentSession != nil { - m.currentSession.Shutdown() - m.currentSession = nil +func (m *multiRangeDownloaderManager) handleStreamEnd(result mrdSessionResult, stream *mrdStream) { + if stream == nil || stream.session != result.session { + result.session.Shutdown() + return + } + if stream.session != nil { + stream.session.Shutdown() + stream.session = nil } err := result.err - var ensureErr error - if result.redirect != nil { m.readSpec.RoutingToken = result.redirect.RoutingToken m.readSpec.ReadHandle = result.redirect.ReadHandle - ensureErr = m.ensureSession(m.ctx) + m.ensureSession(m.ctx, stream) } else if m.settings.retry != nil && m.settings.retry.runShouldRetry(err) { - ensureErr = m.ensureSession(m.ctx) + m.ensureSession(m.ctx, stream) } else { - if !errors.Is(err, context.Canceled) && !errors.Is(err, errClosed) { + m.failStream(stream, err) + if len(m.streams) == 0 && !m.streamCreating { + err := fmt.Errorf("no streams available. Last observed error: %w", err) m.setPermanentError(err) - } else if m.permanentErr == nil { - m.setPermanentError(errClosed) + m.failAllPending(m.getPermanentError()) } - m.failAllPending(m.permanentErr) - } - - // Handle error from ensureSession. 
- if ensureErr != nil { - m.setPermanentError(ensureErr) - m.failAllPending(m.permanentErr) } } -func (m *multiRangeDownloaderManager) failRange(req *rangeRequest, err error) { +func (m *multiRangeDownloaderManager) failRange(mrdStream *mrdStream, req *rangeRequest, err error) { if req.completed { return } req.completed = true - delete(m.pendingRanges, req.readID) + if mrdStream != nil { + if _, ok := mrdStream.pendingRanges[req.readID]; ok { + delete(mrdStream.pendingRanges, req.readID) + mrdStream.updateCapacity(m, -1, -(req.length - req.bytesWritten)) + } + } m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) } -func (m *multiRangeDownloaderManager) failAllPending(err error) { - for _, req := range m.pendingRanges { +func (m *multiRangeDownloaderManager) failStream(mrdStream *mrdStream, err error) { + totalBytes := int64(0) + pendingRanges := 0 + for _, req := range mrdStream.pendingRanges { if !req.completed { + totalBytes += req.length - req.bytesWritten + pendingRanges++ req.completed = true m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) } } - m.pendingRanges = make(map[int64]*rangeRequest) + mrdStream.updateCapacity(m, -pendingRanges, -totalBytes) + mrdStream.pendingRanges = make(map[int64]*rangeRequest) + delete(m.streams, mrdStream.id) +} + +func (m *multiRangeDownloaderManager) failAllPending(err error) { + for _, stream := range m.streams { + for _, req := range stream.pendingRanges { + if !req.completed { + req.completed = true + m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) + } + } + stream.pendingRanges = make(map[int64]*rangeRequest) + stream.totalRanges = 0 + stream.totalRangeBytes = 0 + stream.atCapacity = false + } + // fail all the ranges in unsent requests + for req := m.unsentRequests.l.Front(); req != nil; req = req.Next() { + m.failRange(nil, req.Value.(*rangeRequest), err) + } + m.unsentRequests.Clear() + m.pendingRangesCount = 0 + m.atCapacityCount = 0 } // Set permanent error to the provided error, if it hasn't been set already. func (m *multiRangeDownloaderManager) setPermanentError(err error) { + m.mu.Lock() + defer m.mu.Unlock() if m.permanentErr == nil { m.permanentErr = err } @@ -751,8 +1106,10 @@ func (m *multiRangeDownloaderManager) setPermanentError(err error) { // object in GCS. Spins up goroutines for the read and write sides of the // stream. 
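 // Each session is tagged with an id and sends results on respC along with
 // its own pointer, so the manager can recognize and discard responses that
 // arrive from a stale session after a reconnect.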
 type bidiReadStreamSession struct {
-	ctx    context.Context
-	cancel context.CancelFunc
+	id         int
+	managerCtx context.Context
+	ctx        context.Context
+	cancel     context.CancelFunc
 
 	stream storagepb.Storage_BidiReadObjectClient
 	client *grpcStorageClient
@@ -768,28 +1125,26 @@ type bidiReadStreamSession struct {
 	streamErr error
 }
 
-func newBidiReadStreamSession(ctx context.Context, respC chan<- mrdSessionResult, client *grpcStorageClient, settings *settings, params *newMultiRangeDownloaderParams, readSpec *storagepb.BidiReadObjectSpec) (*bidiReadStreamSession, error) {
+func newBidiReadStreamSession(ctx context.Context, id int, respC chan<- mrdSessionResult, client *grpcStorageClient, settings *settings, params *newMultiRangeDownloaderParams, readSpec *storagepb.BidiReadObjectSpec) (*bidiReadStreamSession, error) {
 	sCtx, cancel := context.WithCancel(ctx)
 	s := &bidiReadStreamSession{
-		ctx:      sCtx,
-		cancel:   cancel,
-		client:   client,
-		settings: settings,
-		params:   params,
-		readSpec: readSpec,
-		reqC:     make(chan *storagepb.BidiReadObjectRequest, 100),
-		respC:    respC,
+		id:         id,
+		ctx:        sCtx,
+		managerCtx: ctx,
+		cancel:     cancel,
+		client:     client,
+		settings:   settings,
+		params:     params,
+		readSpec:   readSpec,
+		reqC:       make(chan *storagepb.BidiReadObjectRequest, mrdSendChannelSize),
+		respC:      respC,
 	}
 
 	initialReq := &storagepb.BidiReadObjectRequest{
 		ReadObjectSpec: s.readSpec,
 	}
 	reqCtx := gax.InsertMetadataIntoOutgoingContext(s.ctx, contextMetadataFromBidiReadObject(initialReq)...)
 
-	// Force the use of the custom codec to enable zero-copy reads.
-	s.settings.gax = append(s.settings.gax, gax.WithGRPCOptions(
-		grpc.ForceCodecV2(bytesCodecV2{}),
-	))
 
 	var err error
 	s.stream, err = client.raw.BidiReadObject(reqCtx, s.settings.gax...)
@@ -841,7 +1196,9 @@ func (s *bidiReadStreamSession) sendLoop() {
 			}
 			if err := s.stream.Send(req); err != nil {
 				s.setError(err)
-				s.cancel()
+				if err != io.EOF {
+					s.cancel()
+				}
 				return
 			}
 		case <-s.ctx.Done():
@@ -853,10 +1210,6 @@ func (s *bidiReadStreamSession) receiveLoop() {
 	defer s.wg.Done()
 	defer s.cancel()
 	for {
-		if err := s.ctx.Err(); err != nil {
-			return
-		}
-
 		// Receive message without a copy.
 		databufs := mem.BufferSlice{}
 		err := s.stream.RecvMsg(&databufs)
@@ -872,24 +1225,56 @@
 		if err != nil {
 			databufs.Free()
 			redirectErr, isRedirect := isRedirectError(err)
-			result := mrdSessionResult{err: err}
+			result := mrdSessionResult{
+				err:     err,
+				id:      s.id,
+				session: s,
+			}
 			if isRedirect {
 				result.redirect = redirectErr
 				err = fmt.Errorf("%w: %v", errBidiReadRedirect, err)
 				result.err = err
 			}
 			s.setError(err)
-
+			if s.managerCtx.Err() != nil {
+				return
+			}
 			select {
 			case s.respC <- result:
-			case <-s.ctx.Done():
+			case <-s.managerCtx.Done():
 			}
 			return
 		}
-
+		if s.managerCtx.Err() != nil {
+			databufs.Free()
+			return
+		}
 		select {
-		case s.respC <- mrdSessionResult{decoder: decoder}:
+		case s.respC <- mrdSessionResult{
+			decoder: decoder,
+			id:      s.id,
+			session: s,
+		}:
+		case <-s.ctx.Done():
+			// If the context is cancelled unexpectedly, notify the event loop
+			// before returning.
+			err := s.streamErr
+			if err == nil {
+				err = s.ctx.Err()
+			}
+			// Verify the event loop is still active before sending the error,
+			// so that we do not send on a closed respC channel during a
+			// normal MRD close.
+ if s.managerCtx.Err() != nil { + databufs.Free() + return + } + select { + case s.respC <- mrdSessionResult{id: s.id, session: s, err: err}: + case <-s.managerCtx.Done(): + } + databufs.Free() return } } @@ -936,13 +1321,13 @@ func newRequestQueue() *requestQueue { return &requestQueue{l: list.New()} } -func (q *requestQueue) PushBack(r *storagepb.BidiReadObjectRequest) { q.l.PushBack(r) } -func (q *requestQueue) PushFront(r *storagepb.BidiReadObjectRequest) { q.l.PushFront(r) } -func (q *requestQueue) Len() int { return q.l.Len() } +func (q *requestQueue) PushBack(r *rangeRequest) { q.l.PushBack(r) } +func (q *requestQueue) Len() int { return q.l.Len() } +func (q *requestQueue) Clear() { q.l.Init() } -func (q *requestQueue) Front() *storagepb.BidiReadObjectRequest { +func (q *requestQueue) Front() *rangeRequest { if f := q.l.Front(); f != nil { - return f.Value.(*storagepb.BidiReadObjectRequest) + return f.Value.(*rangeRequest) } return nil } diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go index e6adfbab..14522957 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go @@ -23,6 +23,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" gapic "cloud.google.com/go/storage/internal/apiv2" @@ -52,8 +53,12 @@ func (w *gRPCWriter) Write(p []byte) (n int, err error) { case <-w.donec: return 0, w.streamResult case w.writesChan <- cmd: - // update fullObjectChecksum on every write and send it on finalWrite - if !w.disableAutoChecksum { + md5Provided := w.attrs != nil && w.attrs.MD5 != nil + // Update fullObjectChecksum on every write and send it on finalWrite if not disabled. + // Skip checksum calculation if user configures MD5 or CRC32C themselves. + if !w.disableAutoChecksum && + !w.sendCRC32C && + !md5Provided { w.fullObjectChecksum = crc32.Update(w.fullObjectChecksum, crc32cTable, p) } // write command successfully delivered to sender. We no longer own cmd. @@ -182,7 +187,8 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage appendGen: params.appendGen, finalizeOnClose: params.finalizeOnClose, - buf: make([]byte, 0, chunkSize), + buf: nil, // Allocated lazily on first buffered write. + chunkSize: chunkSize, writeQuantum: writeQuantum, lastSegmentStart: lastSegmentStart, sendableUnits: sendableUnits, @@ -264,7 +270,8 @@ type gRPCWriter struct { appendGen int64 finalizeOnClose bool - buf []byte + buf []byte + chunkSize int // A writeQuantum is the largest quantity of data which can be sent to the // service in a single message. writeQuantum int @@ -381,21 +388,26 @@ func (w *gRPCWriter) gatherFirstBuffer() error { for cmd := range w.writesChan { switch v := cmd.(type) { case *gRPCWriterCommandWrite: - if len(w.buf)+len(v.p) <= cap(w.buf) { - // We have not started sending yet, and we can stage all data without - // starting a send. Compare against cap(w.buf) instead of - // w.writeQuantum: that way we can perform a oneshot upload for objects - // which fit in one chunk, even though we will cut the request into - // w.writeQuantum units when we do start sending. - origLen := len(w.buf) - w.buf = w.buf[:origLen+len(v.p)] - copy(w.buf[origLen:], v.p) - close(v.done) - } else { - // Too large. Handle it in writeLoop. 
+ // If zero-copy one-shot is requested, OR the payload is larger than the buffer, + // bypass buffering entirely and hand off to the writeLoop immediately. + if w.forceOneShot || len(w.buf)+len(v.p) > w.chunkSize { w.currentCommand = cmd return nil } + + // Otherwise, lazily allocate and stage the small write (normal buffered path) + if w.buf == nil { + w.buf = make([]byte, 0, w.chunkSize) + } + // We have not started sending yet, and we can stage all data without + // starting a send. Compare against w.chunkSize instead of + // w.writeQuantum: that way we can perform a oneshot upload for objects + // which fit in one chunk, even though we will cut the request into + // w.writeQuantum units when we do start sending. + origLen := len(w.buf) + w.buf = w.buf[:origLen+len(v.p)] + copy(w.buf[origLen:], v.p) + close(v.done) break case *gRPCWriterCommandClose: // If we get here, data (if any) fits in w.buf, so we can force oneshot. @@ -565,17 +577,33 @@ type gRPCWriterCommand interface { } type gRPCWriterCommandWrite struct { - p []byte - done chan struct{} + p []byte + done chan struct{} + initialOffset int64 + hasStarted bool + closeOnce sync.Once } func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandleChans) error { if len(c.p) == 0 { // No data to write. - close(c.done) + c.markDone() + return nil + } + + // Zero-Copy send. + if w.forceOneShot { + err := c.zeroCopyWrite(w, cs) + if err != nil { + return err + } + // If zeroCopyWrite returns without error, the write is done. return nil } + if w.buf == nil { + w.buf = make([]byte, 0, w.chunkSize) + } wblen := len(w.buf) allKnownBytes := wblen + len(c.p) fullBufs := allKnownBytes / cap(w.buf) @@ -602,7 +630,7 @@ func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandl return w.streamSender.err() } w.bufUnsentIdx = int(sentOffset - w.bufBaseOffset) - close(c.done) + c.markDone() return nil } @@ -695,10 +723,53 @@ func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandl w.buf = w.buf[:len(toCopyIn)] copy(w.buf, toCopyIn) w.bufUnsentIdx = int(sentOffset - w.bufBaseOffset) - close(c.done) + c.markDone() + return nil +} + +func (c *gRPCWriterCommandWrite) zeroCopyWrite(w *gRPCWriter, cs gRPCWriterCommandHandleChans) error { + // Pre-emptively get the context channel to avoid closure overhead in the loop. + ctxDone := w.preRunCtx.Done() + + // sendBufferToTarget handles the quantum breakdown. + newOffset, ok := w.sendBufferToTarget(cs, c.p, w.bufBaseOffset, len(c.p), w.handleCompletion) + if !ok { + return w.streamSender.err() + } + + // Request an ack from the sender goroutine to ensure the buffer has been + // dispatched to gRPC and is safe for the user to reuse. + if !cs.deliverRequestUnlessCompleted(gRPCBidiWriteRequest{requestAck: true}, w.handleCompletion) { + return w.streamSender.err() + } + + ackOutstanding := true + + // Wait for server acknowledgement and sender transmissions to enable incremental progress. + for ackOutstanding || w.bufBaseOffset < newOffset { + select { + case completion, ok := <-cs.completions: + if !ok { + return w.streamSender.err() + } + w.handleCompletion(completion) + case <-cs.requestAcks: + ackOutstanding = false + case <-ctxDone: + return w.preRunCtx.Err() + } + } + + c.p = nil + c.markDone() return nil } +// Helper to ensure we don't close done twice and keep the main logic clean. 
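+// markDone may be reached from several exit points (empty write, zero-copy
+// send, buffered send), so the done channel is closed through sync.Once.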
+func (c *gRPCWriterCommandWrite) markDone() {
+	c.closeOnce.Do(func() { close(c.done) })
+}
+
 type gRPCWriterCommandFlush struct {
 	done chan int64
 }
@@ -848,13 +919,16 @@ func getObjectChecksums(params *getObjectChecksumsParams) *storagepb.ObjectCheck
 	}
 
 	// send user's checksum on last write op if available
-	if params.sendCRC32C {
+	if params.sendCRC32C || (params.objectAttrs != nil && params.objectAttrs.MD5 != nil) {
 		return toProtoChecksums(params.sendCRC32C, params.objectAttrs)
 	}
 	// TODO(b/461982277): Enable checksum validation for appendable takeover writer gRPC
 	if params.disableAutoChecksum || params.takeoverWriter {
 		return nil
 	}
+	if params.fullObjectChecksum == nil {
+		return nil
+	}
 	return &storagepb.ObjectChecksums{
 		Crc32C: proto.Uint32(params.fullObjectChecksum()),
 	}
diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go
index af64d0b7..807ccc90 100644
--- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go
+++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go
@@ -17,4 +17,4 @@ package internal
 
 // Version is the current tagged release of the library.
-const Version = "1.60.0"
+const Version = "1.61.3"
diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/pcu.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/pcu.go
index 0078d264..5a86e776 100644
--- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/pcu.go
+++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/pcu.go
@@ -16,170 +16,93 @@ package storage
 
 import (
 	"context"
+	"crypto/rand"
+	"encoding/hex"
 	"errors"
 	"fmt"
-	"maps"
-	"math/rand"
 	"path"
 	"runtime"
-	"strconv"
+	"sort"
 	"sync"
 	"time"
+)
 
-	gax "github.com/googleapis/gax-go/v2"
+const (
+	defaultPartSize      = 16 * 1024 * 1024 // 16 MiB
+	minPartSize          = 5 * 1024 * 1024  // 5 MiB
+	baseWorkers          = 4
+	maxWorkers           = 16
+	tmpObjectPrefix      = "gcs-go-sdk-pu-tmp/"
+	maxComposeComponents = 32
+	defaultMaxRetries    = 3
+	defaultBaseDelay     = 100 * time.Millisecond
+	defaultMaxDelay      = 5 * time.Second
 )
 
-// parallelUploadConfig holds configuration for Parallel Composite Uploads.
-// Setting this config and EnableParallelUpload flag on Writer enables PCU.
+// parallelUploadConfig holds configuration for Parallel Uploads.
+// Setting this config and the EnableParallelUpload flag on Writer enables parallel uploads.
 //
 // **Note:** This feature is currently experimental and its API surface may change
 // in future releases. It is not yet recommended for production use.
 type parallelUploadConfig struct {
-	// minSize is the minimum size of an object in bytes to use PCU.
-	// If an object's size is less than this value, a simple upload is performed.
-	// If this is not set, a default of 64 MiB will be used.
-	// To enable PCU for all uploads regardless of size, set this to 0.
-	minSize *int64
-
 	// partSize is the size of each part to be uploaded in parallel.
-	// Defaults to 16MiB. Must be a multiple of 256KiB.
+	// Defaults to 16MiB. Must be a multiple of 256KiB and at least 5MiB.
 	partSize int
 
-	// numWorkers is the number of goroutines to use for uploading parts in parallel.
+	// maxConcurrency is the number of goroutines to use for uploading parts in parallel.
 	// Defaults to a dynamic value based on the number of CPUs (min(4 + NumCPU/2, 16)).
- numWorkers int - - // bufferPoolSize is the number of PartSize buffers to pool. - // Defaults to NumWorkers + 1. - bufferPoolSize int - - // tmpObjectPrefix is the prefix for temporary object names. - // Defaults to "gcs-go-sdk-pcu-tmp/". - tmpObjectPrefix string - - // retryOptions defines the retry behavior for uploading parts. - // Defaults to a sensible policy for part uploads (e.g., max 3 retries). - retryOptions []RetryOption - - // cleanupStrategy dictates how temporary parts are cleaned up. - // Defaults to CleanupAlways. - cleanupStrategy partCleanupStrategy - - // namingStrategy provides a strategy for naming temporary part objects. - // Defaults to a strategy that includes a random element to avoid hotspotting. - namingStrategy partNamingStrategy - - // metadataDecorator allows adding custom metadata to temporary part objects. - metadataDecorator partMetadataDecorator -} - -// partCleanupStrategy defines when temporary objects are deleted. -type partCleanupStrategy int - -const ( - // cleanupAlways clean up temporary parts on both success and failure. - cleanupAlways partCleanupStrategy = iota - // cleanupOnSuccess clean up temporary parts only on successful final composition. - cleanupOnSuccess - // cleanupNever means the application is responsible for cleaning up temporary parts. - cleanupNever -) - -func (s partCleanupStrategy) String() string { - switch s { - case cleanupAlways: - return "always" - case cleanupOnSuccess: - return "on_success" - case cleanupNever: - return "never" - default: - return fmt.Sprintf("PartCleanupStrategy(%d)", s) - } -} - -// partNamingStrategy interface for generating temporary object names. -type partNamingStrategy interface { - newPartName(bucket, prefix, finalName string, partNumber int) string -} - -// defaultNamingStrategy provides a default implementation for naming temporary parts. -type defaultNamingStrategy struct{} - -// newPartName creates a unique name for a temporary part to avoid hotspotting. -func (d *defaultNamingStrategy) newPartName(bucket, prefix, finalName string, partNumber int) string { - rnd := rand.Uint64() - return path.Join(prefix, fmt.Sprintf("%x-%s-part-%d", rnd, finalName, partNumber)) -} - -// partMetadataDecorator interface for modifying temporary object metadata. -type partMetadataDecorator interface { - Decorate(attrs *ObjectAttrs) + maxConcurrency int } -const ( - defaultPartSize = 16 * 1024 * 1024 // 16 MiB - defaultMinSize = 64 * 1024 * 1024 // 64 MiB - baseWorkers = 4 - maxWorkers = 16 - defaultTmpObjectPrefix = "gcs-go-sdk-pcu-tmp/" - maxComposeComponents = 32 - defaultMaxRetries = 3 - defaultBaseDelay = 100 * time.Millisecond - defaultMaxDelay = 5 * time.Second - pcuPartNumberMetadataKey = "x-goog-meta-gcs-pcu-part-number" - pcuFinalObjectMetadataKey = "x-goog-meta-gcs-pcu-final-object" -) - +// defaults fills in values for the eventually-public configuration options. func (c *parallelUploadConfig) defaults() { - if c.minSize == nil { - c.minSize = new(int64) - *c.minSize = defaultMinSize - } if c.partSize == 0 { c.partSize = defaultPartSize + } else if c.partSize < minPartSize { + c.partSize = minPartSize } // Use a heuristic for the number of workers: start with 4, add 1 for // every 2 CPUs, but don't exceed a cap of 16. This provides a // balance between parallelism and resource contention. 
- if c.numWorkers == 0 { - c.numWorkers = min(baseWorkers+(runtime.NumCPU()/2), maxWorkers) + if c.maxConcurrency == 0 { + c.maxConcurrency = min(baseWorkers+(runtime.NumCPU()/2), maxWorkers) } +} + +type pcuSettings struct { + // bufferPoolSize is the number of PartSize buffers to pool + // and is set to MaxConcurrency + 1. + bufferPoolSize int +} + +func newPCUSettings(maxConcurrency int) *pcuSettings { + c := &pcuSettings{} + if c.bufferPoolSize == 0 { - c.bufferPoolSize = c.numWorkers + 1 - } - if c.tmpObjectPrefix == "" { - c.tmpObjectPrefix = defaultTmpObjectPrefix - } - if c.retryOptions == nil { - c.retryOptions = []RetryOption{ - WithMaxAttempts(defaultMaxRetries), - WithBackoff(gax.Backoff{ - Initial: defaultBaseDelay, - Max: defaultMaxDelay, - }), - } - } - if c.cleanupStrategy == 0 { - c.cleanupStrategy = cleanupAlways - } - if c.namingStrategy == nil { - c.namingStrategy = &defaultNamingStrategy{} + c.bufferPoolSize = maxConcurrency + 1 } + return c +} + +// newPartName creates a unique name for a temporary part to avoid hotspotting. +func newPartName(bucket, prefix, finalName string, partNumber int) string { + rnd := generateRandomBytes(4) + return path.Join(prefix, fmt.Sprintf("%x-%s-part-%d", rnd, finalName, partNumber)) } type pcuState struct { - ctx context.Context - cancel context.CancelFunc - w *Writer - config *parallelUploadConfig + ctx context.Context + cancel context.CancelFunc + w *Writer + config *parallelUploadConfig + settings *pcuSettings mu sync.Mutex // Handles to the uploaded temporary parts, keyed by partNumber. partMap map[int]*ObjectHandle // Handles to intermediate composite objects, keyed by their object name. intermediateMap map[string]*ObjectHandle - failedDeletes []*ObjectHandle errOnce sync.Once firstErr error errors []error @@ -193,9 +116,18 @@ type pcuState struct { workerWG sync.WaitGroup collectorWG sync.WaitGroup started bool + closeOnce sync.Once // Function to upload a part; can be overridden for testing. uploadPartFn func(s *pcuState, task uploadTask) (*ObjectHandle, *ObjectAttrs, error) + // Function to delete an object; can be overridden for testing. + deleteFn func(ctx context.Context, h *ObjectHandle) error + // Function to perform cleanup; can be overridden for testing. + doCleanupFn func(s *pcuState) + // Function to compose parts; can be overridden for testing. + composePartsFn func(s *pcuState) error + // Function to run the compose operation; can be overridden for testing. + composeFn func(ctx context.Context, composer *Composer) (*ObjectAttrs, error) } type uploadTask struct { @@ -214,6 +146,10 @@ type uploadResult struct { func (w *Writer) initPCU(ctx context.Context) error { // TODO: Check if PCU is enabled on the Writer. + // Sanity check: If these are nil, something has gone fundamentally wrong in the Writer lifecycle. + if w.o == nil || w.o.c == nil || w.o.bucket == "" { + return fmt.Errorf("upload requires a non-nil ObjectHandle with a bucket name and a client") + } // TODO: Get the config from the Writer. cfg := ¶llelUploadConfig{} cfg.defaults() @@ -221,6 +157,8 @@ func (w *Writer) initPCU(ctx context.Context) error { // Ensure PartSize is a multiple of googleapi.MinUploadChunkSize. 
cfg.partSize = gRPCChunkSize(cfg.partSize) + s := newPCUSettings(cfg.maxConcurrency) + pCtx, cancel := context.WithCancel(ctx) state := &pcuState{ @@ -228,21 +166,30 @@ func (w *Writer) initPCU(ctx context.Context) error { cancel: cancel, w: w, config: cfg, - bufferCh: make(chan []byte, cfg.bufferPoolSize), - uploadCh: make(chan uploadTask), + settings: s, + bufferCh: make(chan []byte, s.bufferPoolSize), + uploadCh: make(chan uploadTask, cfg.maxConcurrency), // Buffered to prevent worker starvation resultCh: make(chan uploadResult), partMap: make(map[int]*ObjectHandle), intermediateMap: make(map[string]*ObjectHandle), uploadPartFn: (*pcuState).uploadPart, + deleteFn: func(ctx context.Context, h *ObjectHandle) error { + return h.Delete(ctx) + }, + doCleanupFn: (*pcuState).doCleanup, + composePartsFn: (*pcuState).composeParts, + composeFn: func(ctx context.Context, c *Composer) (*ObjectAttrs, error) { + return c.Run(ctx) + }, } // TODO: Assign the state to the Writer - for i := 0; i < cfg.bufferPoolSize; i++ { + for i := 0; i < s.bufferPoolSize; i++ { state.bufferCh <- make([]byte, cfg.partSize) } - state.workerWG.Add(cfg.numWorkers) - for i := 0; i < cfg.numWorkers; i++ { + state.workerWG.Add(cfg.maxConcurrency) + for i := 0; i < cfg.maxConcurrency; i++ { go state.worker() } @@ -274,42 +221,45 @@ func (s *pcuState) worker() { } func(t uploadTask) { // Ensure the buffer is returned to the pool. - defer func() { s.bufferCh <- t.buffer }() + defer func() { + select { + case s.bufferCh <- t.buffer: + default: + // If the buffer pool is full, drop the buffer. + // This is a safety measure to avoid blocking indefinitely. + } + }() // This handles the case where cancellation happens before we begin upload. - select { - case <-s.ctx.Done(): - s.resultCh <- uploadResult{partNumber: t.partNumber, err: s.ctx.Err()} + if err := s.ctx.Err(); err != nil { return - default: } handle, attrs, err := s.uploadPartFn(s, t) - // Always send a result to the collector. - s.resultCh <- uploadResult{partNumber: t.partNumber, obj: attrs, handle: handle, err: err} + if err := s.ctx.Err(); err != nil { + return + } + select { + // Always send a result to the collector if the context is not cancelled. + case s.resultCh <- uploadResult{partNumber: t.partNumber, obj: attrs, handle: handle, err: err}: + case <-s.ctx.Done(): + } }(task) } } } -// TODO: add retry logic. func (s *pcuState) uploadPart(task uploadTask) (*ObjectHandle, *ObjectAttrs, error) { - partName := s.config.namingStrategy.newPartName(s.w.o.bucket, s.config.tmpObjectPrefix, s.w.o.object, task.partNumber) + partName := newPartName(s.w.o.bucket, tmpObjectPrefix, s.w.o.object, task.partNumber) partHandle := s.w.o.c.Bucket(s.w.o.bucket).Object(partName) pw := partHandle.NewWriter(s.ctx) pw.ObjectAttrs.Name = partName pw.ObjectAttrs.Size = task.size - pw.SendCRC32C = s.w.SendCRC32C + pw.DisableAutoChecksum = s.w.DisableAutoChecksum pw.ChunkSize = 0 // Force single-shot upload for parts. - // Clear fields not applicable to parts or that are set by compose. 
- pw.ObjectAttrs.CRC32C = 0 - pw.ObjectAttrs.MD5 = nil - setPartMetadata(pw, s, task) - _, err := pw.Write(task.buffer[:task.size]) - if err != nil { - pw.CloseWithError(err) + if _, err := pw.Write(task.buffer[:task.size]); err != nil { return nil, nil, fmt.Errorf("failed to write part %d: %w", task.partNumber, err) } @@ -320,22 +270,6 @@ func (s *pcuState) uploadPart(task uploadTask) (*ObjectHandle, *ObjectAttrs, err return partHandle, pw.Attrs(), nil } -func setPartMetadata(pw *Writer, s *pcuState, task uploadTask) { - partNumberStr := strconv.Itoa(task.partNumber) - var md map[string]string - if s.w.ObjectAttrs.Metadata != nil { - md = maps.Clone(s.w.ObjectAttrs.Metadata) - } else { - md = make(map[string]string) - } - pw.ObjectAttrs.Metadata = md - pw.ObjectAttrs.Metadata[pcuPartNumberMetadataKey] = partNumberStr - pw.ObjectAttrs.Metadata[pcuFinalObjectMetadataKey] = s.w.o.object - if s.config.metadataDecorator != nil { - s.config.metadataDecorator.Decorate(&pw.ObjectAttrs) - } -} - func (s *pcuState) resultCollector() { defer s.collectorWG.Done() for result := range s.resultCh { @@ -345,6 +279,10 @@ func (s *pcuState) resultCollector() { s.mu.Lock() s.partMap[result.partNumber] = result.handle s.mu.Unlock() + } else { + // Both are nil: this is an impossible state that indicates a logical error. + // Setting an error to prevent silent data corruption. + s.setError(fmt.Errorf("upload result missing both error and handle for part %d", result.partNumber)) } } } @@ -362,3 +300,266 @@ func (s *pcuState) setError(err error) { s.cancel() // Cancel context on first error. }) } + +func (s *pcuState) write(p []byte) (int, error) { + if !s.started { + return 0, fmt.Errorf("pcuState not started") + } + s.mu.Lock() + err := s.firstErr + s.mu.Unlock() + if err != nil { + return 0, err + } + + total := len(p) + for len(p) > 0 { + // Acquire a buffer from the pool if we don't have one. + if s.currentBuffer == nil { + // Fail-fast check before taking a new buffer. + s.mu.Lock() + err = s.firstErr + s.mu.Unlock() + if err != nil { + return total - len(p), err + } + + select { + case <-s.ctx.Done(): + return total - len(p), s.ctx.Err() + case s.currentBuffer = <-s.bufferCh: + s.bytesBuffered = 0 + } + } + + n := copy(s.currentBuffer[s.bytesBuffered:], p) + s.bytesBuffered += int64(n) + p = p[n:] + + // If the buffer is full, dispatch it to a worker. + if s.bytesBuffered == int64(s.config.partSize) { + if err := s.flushCurrentBuffer(); err != nil { + return total - len(p), err + } + } + } + return total, nil +} + +func (s *pcuState) flushCurrentBuffer() error { + if s.bytesBuffered == 0 { + return nil + } + + // Capture state for the task while under lock, then release immediately. + s.mu.Lock() + if s.firstErr != nil { + s.mu.Unlock() + return s.firstErr + } + s.partNum++ + pNum := s.partNum + s.mu.Unlock() + + task := uploadTask{ + partNumber: pNum, + buffer: s.currentBuffer, + size: s.bytesBuffered, + } + // Clear current state so the next Write call picks up a fresh buffer. + s.currentBuffer = nil + s.bytesBuffered = 0 + + // Dispatch the task. Using a select ensures we don't hang indefinitely + // if the context is cancelled while the upload queue is full. + select { + case <-s.ctx.Done(): + // Return buffer to pool if we couldn't dispatch. + select { + case s.bufferCh <- task.buffer: + default: + // Discard the buffer if we can't return it, to avoid blocking. 
+ } + return s.ctx.Err() + case s.uploadCh <- task: + return nil + } +} + +func (s *pcuState) close() error { + if !s.started { + return nil + } + var err error + s.closeOnce.Do(func() { + + // Flush the final partial buffer if it exists. + if err := s.flushCurrentBuffer(); err != nil { + s.setError(err) + } + + // Wait for workers, then close resultCh. + // This prevents "send on closed channel" panics. + close(s.uploadCh) + s.workerWG.Wait() + close(s.resultCh) + s.collectorWG.Wait() + + // Cleanup is always attempted. + defer s.doCleanupFn(s) + + s.mu.Lock() + err = s.firstErr + s.mu.Unlock() + + if err != nil { + return + } + + // If no parts were actually uploaded (e.g. empty file), + // fall back to a standard empty object creation. + if len(s.partMap) == 0 { + ow := s.w.o.NewWriter(s.w.ctx) + if ow == nil { + err = fmt.Errorf("failed to create writer for empty object") + s.setError(err) + return + } + ow.ObjectAttrs = s.w.ObjectAttrs + err = ow.Close() + if err != nil { + s.setError(err) + } + return + } + + // Perform the recursive composition of parts. + if err = s.composePartsFn(s); err != nil { + s.setError(err) + return + } + }) + return err +} + +// getSortedParts returns the uploaded parts sorted by part number. +func (s *pcuState) getSortedParts() []*ObjectHandle { + keys := make([]int, 0, len(s.partMap)) + for k := range s.partMap { + keys = append(keys, k) + } + sort.Ints(keys) + + parts := make([]*ObjectHandle, 0, len(keys)) + for _, k := range keys { + parts = append(parts, s.partMap[k]) + } + return parts +} + +// composeParts performs the multi-level compose operation to create the final object. +func (s *pcuState) composeParts() error { + finalComps := s.getSortedParts() + level := 0 + + for len(finalComps) > maxComposeComponents { + level++ + numIntermediates := (len(finalComps) + maxComposeComponents - 1) / maxComposeComponents + nextLevel := make([]*ObjectHandle, numIntermediates) + + var wg sync.WaitGroup + // Use a thread-safe way to capture the first error at this level. + var levelErr error + var errOnce sync.Once + + for i := 0; i < numIntermediates; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + start := idx * maxComposeComponents + end := min(start+maxComposeComponents, len(finalComps)) + + // Level-based naming with hash to prevent exceeding 1024 bytes. + h := hex.EncodeToString(generateRandomBytes(4)) + compName := path.Join(tmpObjectPrefix, fmt.Sprintf("int-%s-lv%d-%d", h, level, idx)) + + interHandle := s.w.o.c.Bucket(s.w.o.bucket).Object(compName) + composer := interHandle.ComposerFrom(finalComps[start:end]...) + + _, err := s.composeFn(s.ctx, composer) + if err != nil { + errOnce.Do(func() { levelErr = err }) + return + } + nextLevel[idx] = interHandle + }(i) + } + wg.Wait() + // Do a batch insert of intermediate handles to the map to minimize lock contention. + s.mu.Lock() + for _, h := range nextLevel { + if h != nil { + s.intermediateMap[h.object] = h + } + } + s.mu.Unlock() + + if levelErr != nil { + return levelErr + } + finalComps = nextLevel + } + + // Final Compose + composer := s.w.o.ComposerFrom(finalComps...) 
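+	// The final compose writes directly to the destination object, carrying
+	// over the user's ObjectAttrs and checksum settings from the Writer.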
+	composer.ObjectAttrs = s.w.ObjectAttrs
+	composer.KMSKeyName = s.w.ObjectAttrs.KMSKeyName
+	composer.SendCRC32C = s.w.SendCRC32C
+
+	attrs, err := s.composeFn(s.ctx, composer)
+	if err != nil {
+		return err
+	}
+	s.w.obj = attrs
+	return nil
+}
+
+func (s *pcuState) doCleanup() {
+	if len(s.partMap) == 0 && len(s.intermediateMap) == 0 {
+		return
+	}
+
+	var wg sync.WaitGroup
+
+	// Semaphore to avoid spawning too many goroutines for deletion.
+	sem := make(chan struct{}, s.config.maxConcurrency)
+
+	runDelete := func(h *ObjectHandle) {
+		defer wg.Done()
+		sem <- struct{}{}
+		defer func() { <-sem }()
+
+		// Use WithoutCancel so cleanup isn't killed by parent context cancellation.
+		// Ignore cleanup errors here: deletion is best effort, and we rely on
+		// bucket lifecycle policies if cleanup fails.
+		_ = s.deleteFn(context.WithoutCancel(s.ctx), h)
+	}
+
+	for _, h := range s.partMap {
+		wg.Add(1)
+		go runDelete(h)
+	}
+	for _, h := range s.intermediateMap {
+		wg.Add(1)
+		go runDelete(h)
+	}
+	wg.Wait()
+}
+
+// generateRandomBytes returns n random bytes read from crypto/rand.
+func generateRandomBytes(n int) []byte {
+	b := make([]byte, n)
+	_, _ = rand.Read(b)
+	return b
+}
diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/post_policy_v4.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/post_policy_v4.go
index 4b2feea9..17008a81 100644
--- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/post_policy_v4.go
+++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/post_policy_v4.go
@@ -40,7 +40,7 @@ type PostPolicyV4Options struct {
 
 	// PrivateKey is the Google service account private key. It is obtainable
 	// from the Google Developers Console.
-	// At https://console.developers.google.com/project//apiui/credential,
+	// At https://console.developers.google.com/project/{your-project-id}/apiui/credential,
 	// create a service account client ID or reuse one of your existing service account
 	// credentials. Click on the "Generate new P12 key" to generate and download
 	// a new private key. Once you download the P12 file, use the following command
diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/reader.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/reader.go
index 9be40dfa..043cb467 100644
--- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/reader.go
+++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/reader.go
@@ -154,6 +154,73 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
 	return r, err
 }
 
+// MRDOption is an option for MultiRangeDownloader.
+type MRDOption interface {
+	apply(*newMultiRangeDownloaderParams)
+}
+
+type minConnections int
+
+func (c minConnections) apply(params *newMultiRangeDownloaderParams) {
+	params.minConnections = int(c)
+}
+
+// WithMinConnections returns an MRDOption which sets the minimum number of
+// connections on the MRD to c. The call to NewMultiRangeDownloader will create
+// one connection and return with an MRD. The remaining connections will be
+// created in the background to avoid connection-open latency.
+func WithMinConnections(c int) MRDOption {
+	return minConnections(c)
+}
+
+type maxConnections int
+
+func (c maxConnections) apply(params *newMultiRangeDownloaderParams) {
+	params.maxConnections = int(c)
+}
+
+// WithMaxConnections returns an MRDOption which sets the maximum number of
+// connections on the MRD to c. The number of connections will not exceed this number.
+// The number of open connections will range between the configured minimum
+// and maximum based on load.
+func WithMaxConnections(c int) MRDOption {
+	return maxConnections(c)
+}
+
+type targetPendingRanges int
+
+func (c targetPendingRanges) apply(params *newMultiRangeDownloaderParams) {
+	params.targetPendingRanges = int(c)
+}
+
+// WithTargetPendingRanges returns an MRDOption which sets the target pending
+// ranges on the MRD to c. If the number of connections in the MRD is less than
+// the maximum, the MRD will trigger creation of a new connection when the
+// pending ranges on all existing streams exceed c.
+//
+// Note: A new connection can be triggered by either the pending byte threshold
+// (WithTargetPendingBytes) or the pending range threshold (WithTargetPendingRanges).
+func WithTargetPendingRanges(c int) MRDOption {
+	return targetPendingRanges(c)
+}
+
+type targetPendingBytes int
+
+func (c targetPendingBytes) apply(params *newMultiRangeDownloaderParams) {
+	params.targetPendingBytes = int(c)
+}
+
+// WithTargetPendingBytes returns an MRDOption that sets the target pending
+// bytes on the MRD to c. If the number of connections in the MRD is less than
+// the maximum, the MRD will trigger creation of a new connection when the
+// outstanding bytes on all existing streams exceed c.
+//
+// Note: A new connection can be triggered by either the pending byte threshold
+// (WithTargetPendingBytes) or the pending range threshold (WithTargetPendingRanges).
+func WithTargetPendingBytes(c int) MRDOption {
+	return targetPendingBytes(c)
+}
+
 // NewMultiRangeDownloader creates a multi-range reader for an object.
 // Must be called on a gRPC client created using [NewGRPCClient].
 //
@@ -164,7 +231,7 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
 
 // NewMultiRangeDownloader creates a multi-range reader for an object.
 // Must be called on a gRPC client created using [NewGRPCClient].
-func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context) (mrd *MultiRangeDownloader, err error) {
+func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context, opts ...MRDOption) (mrd *MultiRangeDownloader, err error) {
 	// This span covers the life of the MRD. It is closed via the context
 	// in MultiRangeDownloader.Close.
 	var spanCtx context.Context
@@ -184,7 +251,7 @@ func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context) (mrd *MultiR
 		}
 	}
 
-	opts := makeStorageOpts(true, o.retry, o.userProject)
+	storageOpts := makeStorageOpts(true, o.retry, o.userProject)
 
 	params := &newMultiRangeDownloaderParams{
 		bucket: o.bucket,
@@ -194,9 +261,13 @@ func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context) (mrd *MultiR
 		object: o.object,
 		handle: &o.readHandle,
 	}
+	// Apply the configured options.
+	for _, opt := range opts {
+		opt.apply(params)
+	}
 
 	// This call will return the *MultiRangeDownloader with the .impl field set.
-	return o.c.tc.NewMultiRangeDownloader(spanCtx, params, opts...)
+	return o.c.tc.NewMultiRangeDownloader(spanCtx, params, storageOpts...)
} // decompressiveTranscoding returns true if the request was served decompressed diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/storage.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/storage.go index 3f71ab58..9533df6c 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/storage.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/storage.go @@ -408,7 +408,7 @@ func (s bucketBoundHostname) path(bucket, object string) string { } // PathStyle is the default style, and will generate a URL of the form -// "//". By default, is +// "{host-name}/{bucket-name}/{object-name}". By default, {host-name} is // storage.googleapis.com, but setting an endpoint on the storage Client or // through STORAGE_EMULATOR_HOST overrides this. Setting Hostname on // SignedURLOptions or PostPolicyV4Options overrides everything else. @@ -417,7 +417,7 @@ func PathStyle() URLStyle { } // VirtualHostedStyle generates a URL relative to the bucket's virtual -// hostname, e.g. ".storage.googleapis.com/". +// hostname, e.g. "{bucket-name}.storage.googleapis.com/{object-name}". func VirtualHostedStyle() URLStyle { return virtualHostedStyle{} } @@ -425,7 +425,7 @@ func VirtualHostedStyle() URLStyle { // BucketBoundHostname generates a URL with a custom hostname tied to a // specific GCS bucket. The desired hostname should be passed in using the // hostname argument. Generated urls will be of the form -// "/". See +// "{bucket-bound-hostname}/{object-name}". See // https://cloud.google.com/storage/docs/request-endpoints#cname and // https://cloud.google.com/load-balancing/docs/https/adding-backend-buckets-to-load-balancers // for details. Note that for CNAMEs, only HTTP is supported, so Insecure must @@ -452,7 +452,7 @@ type SignedURLOptions struct { // PrivateKey is the Google service account private key. It is obtainable // from the Google Developers Console. - // At https://console.developers.google.com/project//apiui/credential, + // At https://console.developers.google.com/project/{your-project-id}/apiui/credential, // create a service account client ID or reuse one of your existing service account // credentials. Click on the "Generate new P12 key" to generate and download // a new private key. Once you download the P12 file, use the following command @@ -1550,7 +1550,7 @@ type ObjectAttrs struct { // Owner is the owner of the object. This field is read-only. // - // If non-zero, it is in the form of "user-". + // If non-zero, it is in the form of "user-{userId}". Owner string // Size is the length of the object's content. This field is read-only. diff --git a/ci/resources/stemcell-version-bump/vendor/modules.txt b/ci/resources/stemcell-version-bump/vendor/modules.txt index 0053461a..0058afca 100644 --- a/ci/resources/stemcell-version-bump/vendor/modules.txt +++ b/ci/resources/stemcell-version-bump/vendor/modules.txt @@ -42,8 +42,8 @@ cloud.google.com/go/iam/apiv1/iampb cloud.google.com/go/monitoring/apiv3/v2 cloud.google.com/go/monitoring/apiv3/v2/monitoringpb cloud.google.com/go/monitoring/internal -# cloud.google.com/go/storage v1.60.0 -## explicit; go 1.24.0 +# cloud.google.com/go/storage v1.61.3 +## explicit; go 1.25.0 cloud.google.com/go/storage cloud.google.com/go/storage/experimental cloud.google.com/go/storage/internal
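
Editor's note, not part of the diff: the reader.go hunks above add functional options to NewMultiRangeDownloader. Below is a minimal usage sketch of how those options compose; the bucket/object names and tuning values are illustrative assumptions, and the Add callback signature (offset, bytesWritten, err) follows the runCallback invocation visible in the manager code above.

package main

import (
	"bytes"
	"context"
	"log"

	"cloud.google.com/go/storage"
)

func main() {
	ctx := context.Background()

	// The multi-range downloader requires the gRPC-based client.
	client, err := storage.NewGRPCClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// "my-bucket" and "my-object" are placeholder names.
	mrd, err := client.Bucket("my-bucket").Object("my-object").NewMultiRangeDownloader(ctx,
		storage.WithMinConnections(2),          // open a second stream in the background
		storage.WithMaxConnections(8),          // hard cap on concurrent streams
		storage.WithTargetPendingRanges(16),    // scale out past 16 pending ranges per stream
		storage.WithTargetPendingBytes(64<<20), // ...or past 64 MiB of outstanding bytes
	)
	if err != nil {
		log.Fatal(err)
	}

	// Each Add enqueues one range; the callback reports (offset, bytesWritten, err).
	var buf bytes.Buffer
	mrd.Add(&buf, 0, 1024, func(offset, n int64, err error) {
		if err != nil {
			log.Printf("range at offset %d failed after %d bytes: %v", offset, n, err)
		}
	})

	mrd.Wait() // block until all added ranges complete
	if err := mrd.Close(); err != nil {
		log.Fatal(err)
	}
}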