diff --git a/cmd/main.go b/cmd/main.go index 7644988..0541ae8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -53,6 +53,7 @@ import ( swapi "github.com/fluxcd/source-watcher/api/v2/v1beta1" "github.com/fluxcd/source-watcher/v2/internal/controller" + "github.com/fluxcd/source-watcher/v2/internal/features" // +kubebuilder:scaffold:imports ) @@ -119,6 +120,11 @@ func main() { ctrlruntime.SetLogger(gotklogger.NewLogger(logOptions)) + if err := featureGates.WithLogger(setupLog).SupportedFeatures(features.FeatureGates()); err != nil { + setupLog.Error(err, "unable to load feature gates") + os.Exit(1) + } + digestAlgo, err := gotkdigest.AlgorithmForName(artifactOptions.ArtifactDigestAlgo) if err != nil { setupLog.Error(err, "unable to configure canonical digest algorithm") @@ -182,6 +188,15 @@ func main() { os.Exit(1) } + directSourceFetch, err := features.Enabled(gotkctrl.FeatureGateDirectSourceFetch) + if err != nil { + setupLog.Error(err, "unable to check feature gate "+gotkctrl.FeatureGateDirectSourceFetch) + os.Exit(1) + } + if directSourceFetch { + setupLog.Info("DirectSourceFetch feature gate is enabled, sources will be fetched directly from the API server bypassing the cache") + } + // Note that the liveness check will pass beyond this point, but the readiness // check will continue to fail until this controller instance is elected leader. gotkprobes.SetupChecks(mgr, setupLog) @@ -198,6 +213,7 @@ func main() { Storage: artifactStorage, ArtifactFetchRetries: httpRetry, DependencyRequeueInterval: requeueDependency, + DirectSourceFetch: directSourceFetch, NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs, }).SetupWithManager(ctx, mgr, controller.ArtifactGeneratorReconcilerOptions{ RateLimiter: gotkctrl.GetRateLimiter(rateLimiterOptions), diff --git a/go.mod b/go.mod index 04b5b3b..7c5225a 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/fluxcd/pkg/apis/meta v1.25.0 github.com/fluxcd/pkg/artifact v0.8.0 github.com/fluxcd/pkg/http/fetch v0.22.0 - github.com/fluxcd/pkg/runtime v0.100.0 + github.com/fluxcd/pkg/runtime v0.100.1 github.com/fluxcd/pkg/tar v0.17.0 github.com/fluxcd/pkg/testserver v0.13.0 github.com/fluxcd/source-controller/api v1.7.2 diff --git a/go.sum b/go.sum index 535e459..4f3257b 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/fluxcd/pkg/lockedfile v0.7.0 h1:tmzW2GeMGuJMiCcVloXVd1vKZ92anm9WGkRgO github.com/fluxcd/pkg/lockedfile v0.7.0/go.mod h1:AzCV/h1N3hi/KtUDUCUgS8hl1+a1y+I6pmRo25dxdK0= github.com/fluxcd/pkg/oci v0.60.0 h1:uyAoYoj0i9rxFYQchThwfe4i/X0eb5l9wJuDbSAbqGs= github.com/fluxcd/pkg/oci v0.60.0/go.mod h1:5NT4IaYZocOsXLV3IGgj4FRQtSae46DL8Lq3EcDUqME= -github.com/fluxcd/pkg/runtime v0.100.0 h1:7k2T/zlOLZ+knVr5fGB6cqq3Dr9D1k2jEe6AJo91JlI= -github.com/fluxcd/pkg/runtime v0.100.0/go.mod h1:SctSsHvFwUfiOVP1zirP6mo7I8wQtXeWVl2lNQWal88= +github.com/fluxcd/pkg/runtime v0.100.1 h1:UiPmgY8Yv7UF06MT5T8AG9uDGNszm75/DQtK6JEhnrM= +github.com/fluxcd/pkg/runtime v0.100.1/go.mod h1:SctSsHvFwUfiOVP1zirP6mo7I8wQtXeWVl2lNQWal88= github.com/fluxcd/pkg/sourceignore v0.17.0 h1:Z72nruRMhC15zIEpWoDrAcJcJ1El6QDnP/aRDfE4WOA= github.com/fluxcd/pkg/sourceignore v0.17.0/go.mod h1:3e/VmYLId0pI/H5sK7W9Ibif+j0Ahns9RxNjDMtTTfY= github.com/fluxcd/pkg/tar v0.17.0 h1:uNxbFXy8ly8C7fJ8D7w3rjTNJFrb4Hp1aY/30XkfvxY= diff --git a/internal/controller/artifactgenerator_controller.go b/internal/controller/artifactgenerator_controller.go index d267cf4..ddd6e08 100644 --- a/internal/controller/artifactgenerator_controller.go +++ b/internal/controller/artifactgenerator_controller.go @@ -61,6 +61,7 @@ type ArtifactGeneratorReconciler struct { ArtifactFetchRetries int DependencyRequeueInterval time.Duration NoCrossNamespaceRefs bool + DirectSourceFetch bool } // +kubebuilder:rbac:groups=source.extensions.fluxcd.io,resources=artifactgenerators,verbs=get;list;watch;create;update;patchStatus;delete @@ -286,6 +287,12 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, // Map of source alias to observed state. observedSources := make(map[string]swapi.ObservedSource) + // Use APIReader to bypass the cache when DirectSourceFetch is enabled. + var reader client.Reader = r.Client + if r.DirectSourceFetch { + reader = r.APIReader + } + // Get the source objects referenced in the ArtifactGenerator spec. for _, src := range obj.Spec.Sources { namespacedName := client.ObjectKey{ @@ -301,7 +308,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, switch src.Kind { case sourcev1.OCIRepositoryKind: var repository sourcev1.OCIRepository - err := r.Get(ctx, namespacedName, &repository) + err := reader.Get(ctx, namespacedName, &repository) if err != nil { if apierrors.IsNotFound(err) { return nil, err @@ -311,7 +318,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, source = &repository case sourcev1.GitRepositoryKind: var repository sourcev1.GitRepository - err := r.Get(ctx, namespacedName, &repository) + err := reader.Get(ctx, namespacedName, &repository) if err != nil { if apierrors.IsNotFound(err) { return nil, err @@ -321,7 +328,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, source = &repository case sourcev1.BucketKind: var bucket sourcev1.Bucket - err := r.Get(ctx, namespacedName, &bucket) + err := reader.Get(ctx, namespacedName, &bucket) if err != nil { if apierrors.IsNotFound(err) { return nil, err @@ -331,7 +338,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, source = &bucket case sourcev1.HelmChartKind: var chart sourcev1.HelmChart - err := r.Get(ctx, namespacedName, &chart) + err := reader.Get(ctx, namespacedName, &chart) if err != nil { if apierrors.IsNotFound(err) { return nil, err @@ -340,15 +347,15 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context, } source = &chart case sourcev1.ExternalArtifactKind: - var chart sourcev1.ExternalArtifact - err := r.Get(ctx, namespacedName, &chart) + var ea sourcev1.ExternalArtifact + err := reader.Get(ctx, namespacedName, &ea) if err != nil { if apierrors.IsNotFound(err) { return nil, err } return nil, fmt.Errorf("unable to get source '%s': %w", namespacedName, err) } - source = &chart + source = &ea default: return nil, fmt.Errorf("source `%s` kind '%s' not supported", src.Name, src.Kind) diff --git a/internal/controller/artifactgenerator_direct_source_fetch_test.go b/internal/controller/artifactgenerator_direct_source_fetch_test.go new file mode 100644 index 0000000..f55da7a --- /dev/null +++ b/internal/controller/artifactgenerator_direct_source_fetch_test.go @@ -0,0 +1,131 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "testing" + "time" + + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + gotkmeta "github.com/fluxcd/pkg/apis/meta" + gotkconditions "github.com/fluxcd/pkg/runtime/conditions" + gotktestsrv "github.com/fluxcd/pkg/testserver" + sourcev1 "github.com/fluxcd/source-controller/api/v1" + + swapi "github.com/fluxcd/source-watcher/api/v2/v1beta1" +) + +func TestArtifactGeneratorReconciler_DirectSourceFetch(t *testing.T) { + g := NewWithT(t) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Create a namespace + ns, err := testEnv.CreateNamespace(ctx, "direct-fetch-test") + g.Expect(err).ToNot(HaveOccurred()) + + t.Run("reconciles with DirectSourceFetch enabled (uses APIReader)", func(t *testing.T) { + g := NewWithT(t) + + // Create reconciler with DirectSourceFetch enabled + reconciler := &ArtifactGeneratorReconciler{ + ControllerName: controllerName, + Client: testClient, + APIReader: testClient, + Scheme: testEnv.Scheme(), + EventRecorder: testEnv.GetEventRecorderFor(controllerName), + Storage: testStorage, + ArtifactFetchRetries: 1, + DependencyRequeueInterval: 5 * time.Second, + NoCrossNamespaceRefs: true, + DirectSourceFetch: true, // Enable DirectSourceFetch + } + + // Create the ArtifactGenerator object + objKey := client.ObjectKey{ + Name: "direct-fetch-enabled", + Namespace: ns.Name, + } + obj := &swapi.ArtifactGenerator{ + TypeMeta: metav1.TypeMeta{ + Kind: swapi.ArtifactGeneratorKind, + APIVersion: swapi.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: objKey.Name, + Namespace: objKey.Namespace, + }, + Spec: swapi.ArtifactGeneratorSpec{ + Sources: []swapi.SourceReference{ + { + Alias: fmt.Sprintf("%s-git", objKey.Name), + Kind: sourcev1.GitRepositoryKind, + Name: objKey.Name, + }, + }, + OutputArtifacts: []swapi.OutputArtifact{ + { + Name: fmt.Sprintf("%s-git", objKey.Name), + Copy: []swapi.CopyOperation{ + { + From: fmt.Sprintf("@%s-git/**", objKey.Name), + To: "@artifact/", + }, + }, + }, + }, + }, + } + err := testClient.Create(ctx, obj) + g.Expect(err).ToNot(HaveOccurred()) + + // Create the GitRepository source + gitFiles := []gotktestsrv.File{ + {Name: "app.yaml", Body: "apiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: direct-fetch-test"}, + } + err = applyGitRepository(objKey, "main@sha256:directfetch123", gitFiles) + g.Expect(err).ToNot(HaveOccurred()) + + // Initialize the ArtifactGenerator with the finalizer + r, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: objKey, + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(r.RequeueAfter).To(BeEquivalentTo(time.Millisecond)) + + // Reconcile to process the sources and build artifacts + r, err = reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: objKey, + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(r.RequeueAfter).To(Equal(obj.GetRequeueAfter())) + + // Verify the ArtifactGenerator status + err = testClient.Get(ctx, objKey, obj) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(gotkconditions.IsReady(obj)).To(BeTrue()) + g.Expect(gotkconditions.GetReason(obj, gotkmeta.ReadyCondition)).To(Equal(gotkmeta.SucceededReason)) + + t.Log(objToYaml(obj)) + }) +} diff --git a/internal/features/features.go b/internal/features/features.go new file mode 100644 index 0000000..81607d8 --- /dev/null +++ b/internal/features/features.go @@ -0,0 +1,49 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package features sets the feature gates that source-watcher supports, +// and their default states. +package features + +import ( + "github.com/fluxcd/pkg/runtime/controller" + feathelper "github.com/fluxcd/pkg/runtime/features" +) + +var features = map[string]bool{ + // DirectSourceFetch + // opt-in from v2.1 + controller.FeatureGateDirectSourceFetch: false, +} + +// FeatureGates contains a list of all supported feature gates and +// their default values. +func FeatureGates() map[string]bool { + return features +} + +// Enabled verifies whether the feature is enabled or not. +func Enabled(feature string) (bool, error) { + return feathelper.Enabled(feature) +} + +// Disable disables the specified feature. If the feature is not +// present, it's a no-op. +func Disable(feature string) { + if _, ok := features[feature]; ok { + features[feature] = false + } +}