Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (

swapi "github.com/fluxcd/source-watcher/api/v2/v1beta1"
"github.com/fluxcd/source-watcher/v2/internal/controller"
"github.com/fluxcd/source-watcher/v2/internal/features"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -119,6 +120,11 @@ func main() {

ctrlruntime.SetLogger(gotklogger.NewLogger(logOptions))

if err := featureGates.WithLogger(setupLog).SupportedFeatures(features.FeatureGates()); err != nil {
setupLog.Error(err, "unable to load feature gates")
os.Exit(1)
}

digestAlgo, err := gotkdigest.AlgorithmForName(artifactOptions.ArtifactDigestAlgo)
if err != nil {
setupLog.Error(err, "unable to configure canonical digest algorithm")
Expand Down Expand Up @@ -182,6 +188,15 @@ func main() {
os.Exit(1)
}

directSourceFetch, err := features.Enabled(gotkctrl.FeatureGateDirectSourceFetch)
if err != nil {
setupLog.Error(err, "unable to check feature gate "+gotkctrl.FeatureGateDirectSourceFetch)
os.Exit(1)
}
if directSourceFetch {
setupLog.Info("DirectSourceFetch feature gate is enabled, sources will be fetched directly from the API server bypassing the cache")
}

// Note that the liveness check will pass beyond this point, but the readiness
// check will continue to fail until this controller instance is elected leader.
gotkprobes.SetupChecks(mgr, setupLog)
Expand All @@ -198,6 +213,7 @@ func main() {
Storage: artifactStorage,
ArtifactFetchRetries: httpRetry,
DependencyRequeueInterval: requeueDependency,
DirectSourceFetch: directSourceFetch,
NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs,
}).SetupWithManager(ctx, mgr, controller.ArtifactGeneratorReconcilerOptions{
RateLimiter: gotkctrl.GetRateLimiter(rateLimiterOptions),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/fluxcd/pkg/apis/meta v1.25.0
github.com/fluxcd/pkg/artifact v0.8.0
github.com/fluxcd/pkg/http/fetch v0.22.0
github.com/fluxcd/pkg/runtime v0.100.0
github.com/fluxcd/pkg/runtime v0.100.1
github.com/fluxcd/pkg/tar v0.17.0
github.com/fluxcd/pkg/testserver v0.13.0
github.com/fluxcd/source-controller/api v1.7.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ github.com/fluxcd/pkg/lockedfile v0.7.0 h1:tmzW2GeMGuJMiCcVloXVd1vKZ92anm9WGkRgO
github.com/fluxcd/pkg/lockedfile v0.7.0/go.mod h1:AzCV/h1N3hi/KtUDUCUgS8hl1+a1y+I6pmRo25dxdK0=
github.com/fluxcd/pkg/oci v0.60.0 h1:uyAoYoj0i9rxFYQchThwfe4i/X0eb5l9wJuDbSAbqGs=
github.com/fluxcd/pkg/oci v0.60.0/go.mod h1:5NT4IaYZocOsXLV3IGgj4FRQtSae46DL8Lq3EcDUqME=
github.com/fluxcd/pkg/runtime v0.100.0 h1:7k2T/zlOLZ+knVr5fGB6cqq3Dr9D1k2jEe6AJo91JlI=
github.com/fluxcd/pkg/runtime v0.100.0/go.mod h1:SctSsHvFwUfiOVP1zirP6mo7I8wQtXeWVl2lNQWal88=
github.com/fluxcd/pkg/runtime v0.100.1 h1:UiPmgY8Yv7UF06MT5T8AG9uDGNszm75/DQtK6JEhnrM=
github.com/fluxcd/pkg/runtime v0.100.1/go.mod h1:SctSsHvFwUfiOVP1zirP6mo7I8wQtXeWVl2lNQWal88=
github.com/fluxcd/pkg/sourceignore v0.17.0 h1:Z72nruRMhC15zIEpWoDrAcJcJ1El6QDnP/aRDfE4WOA=
github.com/fluxcd/pkg/sourceignore v0.17.0/go.mod h1:3e/VmYLId0pI/H5sK7W9Ibif+j0Ahns9RxNjDMtTTfY=
github.com/fluxcd/pkg/tar v0.17.0 h1:uNxbFXy8ly8C7fJ8D7w3rjTNJFrb4Hp1aY/30XkfvxY=
Expand Down
21 changes: 14 additions & 7 deletions internal/controller/artifactgenerator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type ArtifactGeneratorReconciler struct {
ArtifactFetchRetries int
DependencyRequeueInterval time.Duration
NoCrossNamespaceRefs bool
DirectSourceFetch bool
}

// +kubebuilder:rbac:groups=source.extensions.fluxcd.io,resources=artifactgenerators,verbs=get;list;watch;create;update;patchStatus;delete
Expand Down Expand Up @@ -286,6 +287,12 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
// Map of source alias to observed state.
observedSources := make(map[string]swapi.ObservedSource)

// Use APIReader to bypass the cache when DirectSourceFetch is enabled.
var reader client.Reader = r.Client
if r.DirectSourceFetch {
reader = r.APIReader
}

// Get the source objects referenced in the ArtifactGenerator spec.
for _, src := range obj.Spec.Sources {
namespacedName := client.ObjectKey{
Expand All @@ -301,7 +308,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
switch src.Kind {
case sourcev1.OCIRepositoryKind:
var repository sourcev1.OCIRepository
err := r.Get(ctx, namespacedName, &repository)
err := reader.Get(ctx, namespacedName, &repository)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, err
Expand All @@ -311,7 +318,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
source = &repository
case sourcev1.GitRepositoryKind:
var repository sourcev1.GitRepository
err := r.Get(ctx, namespacedName, &repository)
err := reader.Get(ctx, namespacedName, &repository)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, err
Expand All @@ -321,7 +328,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
source = &repository
case sourcev1.BucketKind:
var bucket sourcev1.Bucket
err := r.Get(ctx, namespacedName, &bucket)
err := reader.Get(ctx, namespacedName, &bucket)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, err
Expand All @@ -331,7 +338,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
source = &bucket
case sourcev1.HelmChartKind:
var chart sourcev1.HelmChart
err := r.Get(ctx, namespacedName, &chart)
err := reader.Get(ctx, namespacedName, &chart)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, err
Expand All @@ -340,15 +347,15 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
}
source = &chart
case sourcev1.ExternalArtifactKind:
var chart sourcev1.ExternalArtifact
err := r.Get(ctx, namespacedName, &chart)
var ea sourcev1.ExternalArtifact
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

err := reader.Get(ctx, namespacedName, &ea)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, err
}
return nil, fmt.Errorf("unable to get source '%s': %w", namespacedName, err)
}
source = &chart
source = &ea
default:
return nil, fmt.Errorf("source `%s` kind '%s' not supported",
src.Name, src.Kind)
Expand Down
131 changes: 131 additions & 0 deletions internal/controller/artifactgenerator_direct_source_fetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
Copyright 2026 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"fmt"
"testing"
"time"

. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

gotkmeta "github.com/fluxcd/pkg/apis/meta"
gotkconditions "github.com/fluxcd/pkg/runtime/conditions"
gotktestsrv "github.com/fluxcd/pkg/testserver"
sourcev1 "github.com/fluxcd/source-controller/api/v1"

swapi "github.com/fluxcd/source-watcher/api/v2/v1beta1"
)

func TestArtifactGeneratorReconciler_DirectSourceFetch(t *testing.T) {
g := NewWithT(t)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Create a namespace
ns, err := testEnv.CreateNamespace(ctx, "direct-fetch-test")
g.Expect(err).ToNot(HaveOccurred())

t.Run("reconciles with DirectSourceFetch enabled (uses APIReader)", func(t *testing.T) {
g := NewWithT(t)

// Create reconciler with DirectSourceFetch enabled
reconciler := &ArtifactGeneratorReconciler{
ControllerName: controllerName,
Client: testClient,
APIReader: testClient,
Scheme: testEnv.Scheme(),
EventRecorder: testEnv.GetEventRecorderFor(controllerName),
Storage: testStorage,
ArtifactFetchRetries: 1,
DependencyRequeueInterval: 5 * time.Second,
NoCrossNamespaceRefs: true,
DirectSourceFetch: true, // Enable DirectSourceFetch
}

// Create the ArtifactGenerator object
objKey := client.ObjectKey{
Name: "direct-fetch-enabled",
Namespace: ns.Name,
}
obj := &swapi.ArtifactGenerator{
TypeMeta: metav1.TypeMeta{
Kind: swapi.ArtifactGeneratorKind,
APIVersion: swapi.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: objKey.Name,
Namespace: objKey.Namespace,
},
Spec: swapi.ArtifactGeneratorSpec{
Sources: []swapi.SourceReference{
{
Alias: fmt.Sprintf("%s-git", objKey.Name),
Kind: sourcev1.GitRepositoryKind,
Name: objKey.Name,
},
},
OutputArtifacts: []swapi.OutputArtifact{
{
Name: fmt.Sprintf("%s-git", objKey.Name),
Copy: []swapi.CopyOperation{
{
From: fmt.Sprintf("@%s-git/**", objKey.Name),
To: "@artifact/",
},
},
},
},
},
}
err := testClient.Create(ctx, obj)
g.Expect(err).ToNot(HaveOccurred())

// Create the GitRepository source
gitFiles := []gotktestsrv.File{
{Name: "app.yaml", Body: "apiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: direct-fetch-test"},
}
err = applyGitRepository(objKey, "main@sha256:directfetch123", gitFiles)
g.Expect(err).ToNot(HaveOccurred())

// Initialize the ArtifactGenerator with the finalizer
r, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: objKey,
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(r.RequeueAfter).To(BeEquivalentTo(time.Millisecond))

// Reconcile to process the sources and build artifacts
r, err = reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: objKey,
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(r.RequeueAfter).To(Equal(obj.GetRequeueAfter()))

// Verify the ArtifactGenerator status
err = testClient.Get(ctx, objKey, obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(gotkconditions.IsReady(obj)).To(BeTrue())
g.Expect(gotkconditions.GetReason(obj, gotkmeta.ReadyCondition)).To(Equal(gotkmeta.SucceededReason))

t.Log(objToYaml(obj))
})
}
49 changes: 49 additions & 0 deletions internal/features/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2026 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package features sets the feature gates that source-watcher supports,
// and their default states.
package features

import (
"github.com/fluxcd/pkg/runtime/controller"
feathelper "github.com/fluxcd/pkg/runtime/features"
)

var features = map[string]bool{
// DirectSourceFetch
// opt-in from v2.1
controller.FeatureGateDirectSourceFetch: false,
}

// FeatureGates contains a list of all supported feature gates and
// their default values.
func FeatureGates() map[string]bool {
return features
}

// Enabled verifies whether the feature is enabled or not.
func Enabled(feature string) (bool, error) {
return feathelper.Enabled(feature)
}

// Disable disables the specified feature. If the feature is not
// present, it's a no-op.
func Disable(feature string) {
if _, ok := features[feature]; ok {
features[feature] = false
}
}
Loading