From 9b1cf7889c35585f09d9f9ecb88aa67c4666f021 Mon Sep 17 00:00:00 2001 From: Dipti Pai Date: Fri, 13 Feb 2026 15:25:49 -0800 Subject: [PATCH] Add DirectSourceFetch feature gate to bypass cache for source objects This feature gate enables fetching source objects (GitRepository, OCIRepository, Bucket) directly from the API server using APIReader, bypassing the controller's cache. This can be useful when immediate consistency is required for source object reads. When enabled via --feature-gates=DirectSourceFetch=true: Source objects are fetched using r.APIReader instead of r.Client A log message is emitted at startup indicating the feature is active Changes: Add DirectSourceFetch field to ArtifactGeneratorReconciler struct Update getSource() to use APIReader when feature is enabled Add internal/features package with feature gate definitions Register feature gate with default value false (opt-in) Add unit tests for sources Update pkg/runtime dependency to v0.100.1 Signed-off-by: Dipti Pai --- cmd/main.go | 16 +++ go.mod | 2 +- go.sum | 4 +- .../artifactgenerator_controller.go | 21 ++- ...ifactgenerator_direct_source_fetch_test.go | 131 ++++++++++++++++++ internal/features/features.go | 49 +++++++ 6 files changed, 213 insertions(+), 10 deletions(-) create mode 100644 internal/controller/artifactgenerator_direct_source_fetch_test.go create mode 100644 internal/features/features.go diff --git a/cmd/main.go b/cmd/main.go index 7644988..0541ae8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -53,6 +53,7 @@ import ( swapi "github.com/fluxcd/source-watcher/api/v2/v1beta1" "github.com/fluxcd/source-watcher/v2/internal/controller" + "github.com/fluxcd/source-watcher/v2/internal/features" // +kubebuilder:scaffold:imports ) @@ -119,6 +120,11 @@ func main() { ctrlruntime.SetLogger(gotklogger.NewLogger(logOptions)) + if err := featureGates.WithLogger(setupLog).SupportedFeatures(features.FeatureGates()); err != nil { + setupLog.Error(err, "unable to load feature gates") + os.Exit(1) + } + digestAlgo, err := gotkdigest.AlgorithmForName(artifactOptions.ArtifactDigestAlgo) if err != nil { setupLog.Error(err, "unable to configure canonical digest algorithm") @@ -182,6 +188,15 @@ func main() { os.Exit(1) } + directSourceFetch, err := features.Enabled(gotkctrl.FeatureGateDirectSourceFetch) + if err != nil { + setupLog.Error(err, "unable to check feature gate "+gotkctrl.FeatureGateDirectSourceFetch) + os.Exit(1) + } + if directSourceFetch { + setupLog.Info("DirectSourceFetch feature gate is enabled, sources will be fetched directly from the API server bypassing the cache") + } + // Note that the liveness check will pass beyond this point, but the readiness // check will continue to fail until this controller instance is elected leader. gotkprobes.SetupChecks(mgr, setupLog) @@ -198,6 +213,7 @@ func main() { Storage: artifactStorage, ArtifactFetchRetries: httpRetry, DependencyRequeueInterval: requeueDependency, + DirectSourceFetch: directSourceFetch, NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs, }).SetupWithManager(ctx, mgr, controller.ArtifactGeneratorReconcilerOptions{ RateLimiter: gotkctrl.GetRateLimiter(rateLimiterOptions), diff --git a/go.mod b/go.mod index 04b5b3b..7c5225a 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/fluxcd/pkg/apis/meta v1.25.0 github.com/fluxcd/pkg/artifact v0.8.0 github.com/fluxcd/pkg/http/fetch v0.22.0 - github.com/fluxcd/pkg/runtime v0.100.0 + github.com/fluxcd/pkg/runtime v0.100.1 github.com/fluxcd/pkg/tar v0.17.0 github.com/fluxcd/pkg/testserver v0.13.0 github.com/fluxcd/source-controller/api v1.7.2 diff --git a/go.sum b/go.sum index 535e459..4f3257b 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/fluxcd/pkg/lockedfile v0.7.0 h1:tmzW2GeMGuJMiCcVloXVd1vKZ92anm9WGkRgO github.com/fluxcd/pkg/lockedfile v0.7.0/go.mod h1:AzCV/h1N3hi/KtUDUCUgS8hl1+a1y+I6pmRo25dxdK0= github.com/fluxcd/pkg/oci v0.60.0 h1:uyAoYoj0i9rxFYQchThwfe4i/X0eb5l9wJuDbSAbqGs= github.com/fluxcd/pkg/oci v0.60.0/go.mod h1:5NT4IaYZocOsXLV3IGgj4FRQtSae46DL8Lq3EcDUqME= -github.com/fluxcd/pkg/runtime v0.100.0 h1:7k2T/zlOLZ+knVr5fGB6cqq3Dr9D1k2jEe6AJo91JlI= -github.com/fluxcd/pkg/runtime v0.100.0/go.mod h1:SctSsHvFwUfiOVP1zirP6mo7I8wQtXeWVl2lNQWal88= +github.com/fluxcd/pkg/runtime v0.100.1 h1:UiPmgY8Yv7UF06MT5T8AG9uDGNszm75/DQtK6JEhnrM= +github.com/fluxcd/pkg/runtime v0.100.1/go.mod h1:SctSsHvFwUfiOVP1zirP6mo7I8wQtXeWVl2lNQWal88= github.com/fluxcd/pkg/sourceignore v0.17.0 h1:Z72nruRMhC15zIEpWoDrAcJcJ1El6QDnP/aRDfE4WOA= github.com/fluxcd/pkg/sourceignore v0.17.0/go.mod h1:3e/VmYLId0pI/H5sK7W9Ibif+j0Ahns9RxNjDMtTTfY= github.com/fluxcd/pkg/tar v0.17.0 h1:uNxbFXy8ly8C7fJ8D7w3rjTNJFrb4Hp1aY/30XkfvxY= diff --git a/internal/controller/artifactgenerator_controller.go b/internal/controller/artifactgenerator_controller.go index d267cf4..ddd6e08 100644 --- a/internal/controller/artifactgenerator_controller.go +++ b/internal/controller/artifactgenerator_controller.go @@ -61,6 +61,7 @@ type ArtifactGeneratorReconciler struct { ArtifactFetchRetries int DependencyRequeueInterval time.Duration NoCrossNamespaceRefs bool + DirectSourceFetch bool } // +kubebuilder:rbac:groups=source.extensions.fluxcd.io,resources=artifactgenerators,verbs=get;list;watch;create;update;patchStatus;delete @@ -286,6 +287,12 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, // Map of source alias to observed state. observedSources := make(map[string]swapi.ObservedSource) + // Use APIReader to bypass the cache when DirectSourceFetch is enabled. + var reader client.Reader = r.Client + if r.DirectSourceFetch { + reader = r.APIReader + } + // Get the source objects referenced in the ArtifactGenerator spec. for _, src := range obj.Spec.Sources { namespacedName := client.ObjectKey{ @@ -301,7 +308,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, switch src.Kind { case sourcev1.OCIRepositoryKind: var repository sourcev1.OCIRepository - err := r.Get(ctx, namespacedName, &repository) + err := reader.Get(ctx, namespacedName, &repository) if err != nil { if apierrors.IsNotFound(err) { return nil, err @@ -311,7 +318,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, source = &repository case sourcev1.GitRepositoryKind: var repository sourcev1.GitRepository - err := r.Get(ctx, namespacedName, &repository) + err := reader.Get(ctx, namespacedName, &repository) if err != nil { if apierrors.IsNotFound(err) { return nil, err @@ -321,7 +328,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, source = &repository case sourcev1.BucketKind: var bucket sourcev1.Bucket - err := r.Get(ctx, namespacedName, &bucket) + err := reader.Get(ctx, namespacedName, &bucket) if err != nil { if apierrors.IsNotFound(err) { return nil, err @@ -331,7 +338,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, source = &bucket case sourcev1.HelmChartKind: var chart sourcev1.HelmChart - err := r.Get(ctx, namespacedName, &chart) + err := reader.Get(ctx, namespacedName, &chart) if err != nil { if apierrors.IsNotFound(err) { return nil, err @@ -340,15 +347,15 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, } source = &chart case sourcev1.ExternalArtifactKind: - var chart sourcev1.ExternalArtifact - err := r.Get(ctx, namespacedName, &chart) + var ea sourcev1.ExternalArtifact + err := reader.Get(ctx, namespacedName, &ea) if err != nil { if apierrors.IsNotFound(err) { return nil, err } return nil, fmt.Errorf("unable to get source '%s': %w", namespacedName, err) } - source = &chart + source = &ea default: return nil, fmt.Errorf("source `%s` kind '%s' not supported", src.Name, src.Kind) diff --git a/internal/controller/artifactgenerator_direct_source_fetch_test.go b/internal/controller/artifactgenerator_direct_source_fetch_test.go new file mode 100644 index 0000000..f55da7a --- /dev/null +++ b/internal/controller/artifactgenerator_direct_source_fetch_test.go @@ -0,0 +1,131 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "testing" + "time" + + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + gotkmeta "github.com/fluxcd/pkg/apis/meta" + gotkconditions "github.com/fluxcd/pkg/runtime/conditions" + gotktestsrv "github.com/fluxcd/pkg/testserver" + sourcev1 "github.com/fluxcd/source-controller/api/v1" + + swapi "github.com/fluxcd/source-watcher/api/v2/v1beta1" +) + +func TestArtifactGeneratorReconciler_DirectSourceFetch(t *testing.T) { + g := NewWithT(t) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Create a namespace + ns, err := testEnv.CreateNamespace(ctx, "direct-fetch-test") + g.Expect(err).ToNot(HaveOccurred()) + + t.Run("reconciles with DirectSourceFetch enabled (uses APIReader)", func(t *testing.T) { + g := NewWithT(t) + + // Create reconciler with DirectSourceFetch enabled + reconciler := &ArtifactGeneratorReconciler{ + ControllerName: controllerName, + Client: testClient, + APIReader: testClient, + Scheme: testEnv.Scheme(), + EventRecorder: testEnv.GetEventRecorderFor(controllerName), + Storage: testStorage, + ArtifactFetchRetries: 1, + DependencyRequeueInterval: 5 * time.Second, + NoCrossNamespaceRefs: true, + DirectSourceFetch: true, // Enable DirectSourceFetch + } + + // Create the ArtifactGenerator object + objKey := client.ObjectKey{ + Name: "direct-fetch-enabled", + Namespace: ns.Name, + } + obj := &swapi.ArtifactGenerator{ + TypeMeta: metav1.TypeMeta{ + Kind: swapi.ArtifactGeneratorKind, + APIVersion: swapi.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: objKey.Name, + Namespace: objKey.Namespace, + }, + Spec: swapi.ArtifactGeneratorSpec{ + Sources: []swapi.SourceReference{ + { + Alias: fmt.Sprintf("%s-git", objKey.Name), + Kind: sourcev1.GitRepositoryKind, + Name: objKey.Name, + }, + }, + OutputArtifacts: []swapi.OutputArtifact{ + { + Name: fmt.Sprintf("%s-git", objKey.Name), + Copy: []swapi.CopyOperation{ + { + From: fmt.Sprintf("@%s-git/**", objKey.Name), + To: "@artifact/", + }, + }, + }, + }, + }, + } + err := testClient.Create(ctx, obj) + g.Expect(err).ToNot(HaveOccurred()) + + // Create the GitRepository source + gitFiles := []gotktestsrv.File{ + {Name: "app.yaml", Body: "apiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: direct-fetch-test"}, + } + err = applyGitRepository(objKey, "main@sha256:directfetch123", gitFiles) + g.Expect(err).ToNot(HaveOccurred()) + + // Initialize the ArtifactGenerator with the finalizer + r, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: objKey, + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(r.RequeueAfter).To(BeEquivalentTo(time.Millisecond)) + + // Reconcile to process the sources and build artifacts + r, err = reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: objKey, + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(r.RequeueAfter).To(Equal(obj.GetRequeueAfter())) + + // Verify the ArtifactGenerator status + err = testClient.Get(ctx, objKey, obj) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(gotkconditions.IsReady(obj)).To(BeTrue()) + g.Expect(gotkconditions.GetReason(obj, gotkmeta.ReadyCondition)).To(Equal(gotkmeta.SucceededReason)) + + t.Log(objToYaml(obj)) + }) +} diff --git a/internal/features/features.go b/internal/features/features.go new file mode 100644 index 0000000..81607d8 --- /dev/null +++ b/internal/features/features.go @@ -0,0 +1,49 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package features sets the feature gates that source-watcher supports, +// and their default states. +package features + +import ( + "github.com/fluxcd/pkg/runtime/controller" + feathelper "github.com/fluxcd/pkg/runtime/features" +) + +var features = map[string]bool{ + // DirectSourceFetch + // opt-in from v2.1 + controller.FeatureGateDirectSourceFetch: false, +} + +// FeatureGates contains a list of all supported feature gates and +// their default values. +func FeatureGates() map[string]bool { + return features +} + +// Enabled verifies whether the feature is enabled or not. +func Enabled(feature string) (bool, error) { + return feathelper.Enabled(feature) +} + +// Disable disables the specified feature. If the feature is not +// present, it's a no-op. +func Disable(feature string) { + if _, ok := features[feature]; ok { + features[feature] = false + } +}