From f2799fb734ab7a78fe2901b6af5fed7a68dad853 Mon Sep 17 00:00:00 2001 From: Adrian Fernandez De La Torre Date: Thu, 26 Mar 2026 02:22:16 +0100 Subject: [PATCH] Migrate event recorder to event/v1 Encapsulated under fluxcd/pkg/runtime/events Signed-off-by: Adrian Fernandez De La Torre --- internal/controller/bucket_controller.go | 20 ++--- internal/controller/bucket_controller_test.go | 53 ++++++++---- .../controller/gitrepository_controller.go | 20 ++--- .../gitrepository_controller_test.go | 81 ++++++++++++------- internal/controller/helmchart_controller.go | 22 ++--- .../controller/helmchart_controller_test.go | 66 +++++++++------ .../controller/helmrepository_controller.go | 20 ++--- .../helmrepository_controller_test.go | 57 +++++++++---- .../controller/ocirepository_controller.go | 20 ++--- .../ocirepository_controller_test.go | 64 ++++++++++----- internal/controller/suite_test.go | 12 +-- internal/reconcile/summarize/processor.go | 18 ++--- .../reconcile/summarize/processor_test.go | 4 +- internal/reconcile/summarize/summary.go | 6 +- internal/reconcile/summarize/summary_test.go | 6 +- main.go | 17 ++-- 16 files changed, 301 insertions(+), 185 deletions(-) diff --git a/internal/controller/bucket_controller.go b/internal/controller/bucket_controller.go index dbd163dcb..a83047d07 100644 --- a/internal/controller/bucket_controller.go +++ b/internal/controller/bucket_controller.go @@ -33,7 +33,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -42,7 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" @@ -50,6 +49,7 @@ import ( "github.com/fluxcd/pkg/cache" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -125,7 +125,7 @@ var bucketFailConditions = []string{ // BucketReconciler reconciles a v1.Bucket object. type BucketReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder helper.Metrics Storage *storage.Storage @@ -347,13 +347,13 @@ func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1. // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - "NewArtifact", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + "NewArtifact", eventv1.ActionApplied, message) ctrl.LoggerFrom(ctx).Info(message) } else { if sreconcile.FailureRecovery(oldObj, newObj, bucketFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - meta.SucceededReason, message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + meta.SucceededReason, eventv1.ActionReconciled, message) ctrl.LoggerFrom(ctx).Info(message) } } @@ -387,7 +387,7 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.Seria // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -665,13 +665,15 @@ func (r *BucketReconciler) eventLogf(ctx context.Context, obj runtime.Object, ev func (r *BucketReconciler) annotatedEventLogf(ctx context.Context, obj runtime.Object, annotations map[string]string, eventType string, reason string, messageFmt string, args ...interface{}) { msg := fmt.Sprintf(messageFmt, args...) + action := eventv1.ActionReconciled // Log and emit event. if eventType == corev1.EventTypeWarning { + action = eventv1.ActionFailed ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) } else { ctrl.LoggerFrom(ctx).Info(msg) } - r.AnnotatedEventf(obj, annotations, eventType, reason, msg) + r.AnnotatedEventf(obj, nil, annotations, eventType, reason, action, messageFmt, args...) } // fetchEtagIndex fetches the current etagIndex for the in the obj specified diff --git a/internal/controller/bucket_controller_test.go b/internal/controller/bucket_controller_test.go index 00ed46cb7..9529162f1 100644 --- a/internal/controller/bucket_controller_test.go +++ b/internal/controller/bucket_controller_test.go @@ -30,19 +30,22 @@ import ( . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" @@ -85,7 +88,7 @@ func TestBucketReconciler_deleteBeforeFinalizer(t *testing.T) { r := &BucketReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, } // NOTE: Only a real API server responds with an error in this scenario. @@ -383,7 +386,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.Bucket{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), } @@ -918,7 +921,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { } r := &BucketReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), Storage: testStorage, patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), @@ -1385,7 +1388,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { } r := &BucketReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), Storage: testStorage, patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), @@ -1589,7 +1592,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { r := &BucketReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), } @@ -1712,7 +1715,7 @@ func TestBucketReconciler_statusConditions(t *testing.T) { } ctx := context.TODO() - summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), serialPatcher) + summarizeHelper := summarize.NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summarizeOpts := []summarize.Option{ summarize.WithConditions(bucketReadyCondition), summarize.WithReconcileResult(sreconcile.ResultSuccess), @@ -1739,7 +1742,7 @@ func TestBucketReconciler_notify(t *testing.T) { resErr error oldObjBeforeFunc func(obj *sourcev1.Bucket) newObjBeforeFunc func(obj *sourcev1.Bucket) - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -1753,7 +1756,12 @@ func TestBucketReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.Bucket) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} }, - wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact with 2 fetched files from", + }, }, { name: "recovery from failure", @@ -1768,7 +1776,12 @@ func TestBucketReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal Succeeded stored artifact with 2 fetched files from", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored artifact with 2 fetched files from", + }, }, { name: "recovery and new artifact", @@ -1783,7 +1796,12 @@ func TestBucketReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact with 2 fetched files from", + }, }, { name: "no updates", @@ -1804,7 +1822,7 @@ func TestBucketReconciler_notify(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.Bucket{ Spec: sourcev1.BucketSpec{ @@ -1832,12 +1850,15 @@ func TestBucketReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } diff --git a/internal/controller/gitrepository_controller.go b/internal/controller/gitrepository_controller.go index cf36de22c..5181915ab 100644 --- a/internal/controller/gitrepository_controller.go +++ b/internal/controller/gitrepository_controller.go @@ -30,13 +30,13 @@ import ( "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/auth/githubapp" authutils "github.com/fluxcd/pkg/auth/utils" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/logger" "github.com/fluxcd/pkg/runtime/secrets" "github.com/go-git/go-git/v5/plumbing/transport" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" @@ -47,7 +47,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/cache" @@ -129,7 +129,7 @@ func getPatchOptions(ownedConditions []string, controllerName string) []patch.Op // GitRepositoryReconciler reconciles a v1.GitRepository object. type GitRepositoryReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder helper.Metrics Storage *storage.Storage @@ -341,13 +341,13 @@ func (r *GitRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *so // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - "NewArtifact", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + "NewArtifact", eventv1.ActionApplied, message) ctrl.LoggerFrom(ctx).Info(message) } else { if sreconcile.FailureRecovery(oldObj, newObj, gitRepositoryFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - meta.SucceededReason, message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + meta.SucceededReason, eventv1.ActionReconciled, message) ctrl.LoggerFrom(ctx).Info(message) } } @@ -401,7 +401,7 @@ func (r *GitRepositoryReconciler) reconcileStorage(ctx context.Context, sp *patc // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -1229,13 +1229,15 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc // about the event. func (r *GitRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { msg := fmt.Sprintf(messageFmt, args...) + action := eventv1.ActionReconciled // Log and emit event. if eventType == corev1.EventTypeWarning { + action = eventv1.ActionFailed ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) } else { ctrl.LoggerFrom(ctx).Info(msg) } - r.Eventf(obj, eventType, reason, msg) + r.Eventf(obj, nil, eventType, reason, action, msg) } // gitContentConfigChanged evaluates the current spec with the observations of diff --git a/internal/controller/gitrepository_controller_test.go b/internal/controller/gitrepository_controller_test.go index 46835e5d7..c1c145ea8 100644 --- a/internal/controller/gitrepository_controller_test.go +++ b/internal/controller/gitrepository_controller_test.go @@ -38,8 +38,8 @@ import ( . "github.com/onsi/gomega" sshtestdata "golang.org/x/crypto/ssh/testdata" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -47,6 +47,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" @@ -55,6 +56,7 @@ import ( "github.com/fluxcd/pkg/gittestserver" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/ssh" @@ -198,7 +200,7 @@ func TestGitRepositoryReconciler_deleteBeforeFinalizer(t *testing.T) { r := &GitRepositoryReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, } // NOTE: Only a real API server responds with an error in this scenario. @@ -311,7 +313,7 @@ func TestGitRepositoryReconciler_reconcileSource_emptyRepository(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), } @@ -868,7 +870,7 @@ func TestGitRepositoryReconciler_reconcileSource_authStrategy(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), } @@ -1033,7 +1035,7 @@ func TestGitRepositoryReconciler_getAuthOpts_provider(t *testing.T) { obj := &sourcev1.GitRepository{} r := &GitRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -1254,7 +1256,7 @@ func TestGitRepositoryReconciler_reconcileSource_checkoutStrategy(t *testing.T) WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.GitRepository{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), } @@ -1458,7 +1460,7 @@ func TestGitRepositoryReconciler_reconcileArtifact(t *testing.T) { resetChmod(tt.dir, 0o750, 0o600) r := &GitRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -1609,7 +1611,7 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: storage, requeueDependency: dependencyInterval, features: features.FeatureGates(), @@ -1866,7 +1868,7 @@ func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.GitRepository{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -1920,7 +1922,7 @@ func TestGitRepositoryReconciler_reconcileDelete(t *testing.T) { g := NewWithT(t) r := &GitRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -2345,7 +2347,7 @@ func TestGitRepositoryReconciler_verifySignature(t *testing.T) { } r := &GitRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -2498,7 +2500,7 @@ func TestGitRepositoryReconciler_ConditionsUpdate(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -2748,7 +2750,7 @@ func TestGitRepositoryReconciler_statusConditions(t *testing.T) { } ctx := context.TODO() - summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), serialPatcher) + summarizeHelper := summarize.NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summarizeOpts := []summarize.Option{ summarize.WithConditions(gitRepositoryReadyCondition), summarize.WithBiPolarityConditionTypes(sourcev1.SourceVerifiedCondition), @@ -2789,7 +2791,7 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { oldObjBeforeFunc func(obj *sourcev1.GitRepository) newObjBeforeFunc func(obj *sourcev1.GitRepository) commit git.Commit - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -2803,8 +2805,13 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.GitRepository) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} }, - commit: concreteCommit, - wantEvent: "Normal NewArtifact stored artifact for commit 'test commit'", + commit: concreteCommit, + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact for commit 'test commit'", + }, }, { name: "recovery from failure", @@ -2819,8 +2826,13 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - commit: concreteCommit, - wantEvent: "Normal Succeeded stored artifact for commit 'test commit'", + commit: concreteCommit, + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored artifact for commit 'test commit'", + }, }, { name: "recovery and new artifact", @@ -2835,8 +2847,13 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - commit: concreteCommit, - wantEvent: "Normal NewArtifact stored artifact for commit 'test commit'", + commit: concreteCommit, + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact for commit 'test commit'", + }, }, { name: "no updates", @@ -2864,15 +2881,20 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - commit: partialCommit, // no-op will always result in partial commit. - wantEvent: "Normal Succeeded stored artifact for commit 'sha1:b9b3feadba509cb9b22e968a5d27e96c2bc2ff91'", + commit: partialCommit, // no-op will always result in partial commit. + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored artifact for commit 'sha1:b9b3feadba509cb9b22e968a5d27e96c2bc2ff91'", + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.GitRepository{} newObj := oldObj.DeepCopy() @@ -2893,12 +2915,15 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } @@ -3029,7 +3054,7 @@ func TestGitRepositoryReconciler_fetchIncludes(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), } diff --git a/internal/controller/helmchart_controller.go b/internal/controller/helmchart_controller.go index 963d75dde..7c974b120 100644 --- a/internal/controller/helmchart_controller.go +++ b/internal/controller/helmchart_controller.go @@ -40,7 +40,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -51,12 +50,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/git" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -128,7 +128,7 @@ var helmChartFailConditions = []string{ // HelmChartReconciler reconciles a HelmChart object type HelmChartReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder helper.Metrics Storage *storage.Storage @@ -348,13 +348,13 @@ func (r *HelmChartReconciler) notify(ctx context.Context, oldObj, newObj *source // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - reasonForBuild(build), build.Summary()) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + reasonForBuild(build), eventv1.ActionApplied, build.Summary()) ctrl.LoggerFrom(ctx).Info(build.Summary()) } else { if sreconcile.FailureRecovery(oldObj, newObj, helmChartFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - reasonForBuild(build), build.Summary()) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + reasonForBuild(build), eventv1.ActionApplied, build.Summary()) ctrl.LoggerFrom(ctx).Info(build.Summary()) } } @@ -388,7 +388,7 @@ func (r *HelmChartReconciler) reconcileStorage(ctx context.Context, sp *patch.Se // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -482,7 +482,7 @@ func (r *HelmChartReconciler) reconcileSource(ctx context.Context, sp *patch.Ser // a sudden (partial) disappearance of observed state. // TODO(hidde): include specific name/version information? if depNum := build.ResolvedDependencies; build.Complete() && depNum > 0 { - r.Eventf(obj, eventv1.EventTypeTrace, "ResolvedDependencies", "resolved %d chart dependencies", depNum) + r.Eventf(obj, nil, eventv1.EventTypeTrace, "ResolvedDependencies", eventv1.ActionReconciled, "resolved %d chart dependencies", depNum) } // Handle any build error @@ -1209,13 +1209,15 @@ func (r *HelmChartReconciler) requestsForBucketChange(ctx context.Context, o cli // about the event. func (r *HelmChartReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { msg := fmt.Sprintf(messageFmt, args...) + action := eventv1.ActionReconciled // Log and emit event. if eventType == corev1.EventTypeWarning { + action = eventv1.ActionFailed ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) } else { ctrl.LoggerFrom(ctx).Info(msg) } - r.Eventf(obj, eventType, reason, msg) + r.Eventf(obj, nil, eventType, reason, action, msg) } // observeChartBuild records the observation on the given given build and error on the object. diff --git a/internal/controller/helmchart_controller_test.go b/internal/controller/helmchart_controller_test.go index 23188e968..9fcaab0f6 100644 --- a/internal/controller/helmchart_controller_test.go +++ b/internal/controller/helmchart_controller_test.go @@ -50,11 +50,11 @@ import ( "helm.sh/helm/v4/pkg/chart/v2/loader" helmreg "helm.sh/helm/v4/pkg/registry" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" oras "oras.land/oras-go/v2/registry/remote" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -62,11 +62,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/helmtestserver" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/testserver" @@ -112,7 +114,7 @@ func TestHelmChartReconciler_deleteBeforeFinalizer(t *testing.T) { r := &HelmChartReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, } @@ -520,7 +522,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.HelmChart{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -794,7 +796,7 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) { r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: st, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), @@ -1131,7 +1133,7 @@ func TestHelmChartReconciler_buildFromHelmRepository(t *testing.T) { r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, @@ -1384,7 +1386,7 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) { r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: st, CosignVerifierFactory: testCosignVerifierFactory, @@ -1627,7 +1629,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) { WithScheme(testEnv.Scheme()). WithStatusSubresource(&sourcev1.HelmChart{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: st, Getters: testGetters, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), @@ -1838,7 +1840,7 @@ func TestHelmChartReconciler_reconcileArtifact(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.HelmChart{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -2028,7 +2030,7 @@ func TestHelmChartReconciler_reconcileDelete(t *testing.T) { g := NewWithT(t) r := &HelmChartReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), @@ -2298,7 +2300,7 @@ func TestHelmChartReconciler_statusConditions(t *testing.T) { } ctx := context.TODO() - summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), serialPatcher) + summarizeHelper := summarize.NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summarizeOpts := []summarize.Option{ summarize.WithConditions(helmChartReadyCondition), summarize.WithBiPolarityConditionTypes(sourcev1.SourceVerifiedCondition), @@ -2326,7 +2328,7 @@ func TestHelmChartReconciler_notify(t *testing.T) { resErr error oldObjBeforeFunc func(obj *sourcev1.HelmChart) newObjBeforeFunc func(obj *sourcev1.HelmChart) - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -2340,7 +2342,12 @@ func TestHelmChartReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.HelmChart) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} }, - wantEvent: "Normal ChartPackageSucceeded packaged", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "ChartPackageSucceeded", + Action: eventv1.ActionApplied, + Note: "packaged", + }, }, { name: "recovery from failure", @@ -2355,7 +2362,12 @@ func TestHelmChartReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal ChartPackageSucceeded packaged", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "ChartPackageSucceeded", + Action: eventv1.ActionApplied, + Note: "packaged", + }, }, { name: "recovery and new artifact", @@ -2370,7 +2382,12 @@ func TestHelmChartReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal ChartPackageSucceeded packaged", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "ChartPackageSucceeded", + Action: eventv1.ActionApplied, + Note: "packaged", + }, }, { name: "no updates", @@ -2390,7 +2407,7 @@ func TestHelmChartReconciler_notify(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.HelmChart{} newObj := oldObj.DeepCopy() @@ -2416,12 +2433,15 @@ func TestHelmChartReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } @@ -2726,7 +2746,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_authStrategy(t *testing.T) { r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -2885,7 +2905,7 @@ func TestHelmChartRepository_reconcileSource_verifyOCISourceSignature_keyless(t r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, @@ -3191,7 +3211,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureNotation(t *t r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, @@ -3443,7 +3463,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureCosign(t *tes r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: st, CosignVerifierFactory: testCosignVerifierFactory, diff --git a/internal/controller/helmrepository_controller.go b/internal/controller/helmrepository_controller.go index 0fd7eedc2..6ebbc07b7 100644 --- a/internal/controller/helmrepository_controller.go +++ b/internal/controller/helmrepository_controller.go @@ -30,7 +30,6 @@ import ( helmreg "helm.sh/helm/v4/pkg/registry" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -39,12 +38,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -105,7 +105,7 @@ var helmRepositoryFailConditions = []string{ // HelmRepositoryReconciler reconciles a v1.HelmRepository object. type HelmRepositoryReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder helper.Metrics Getters helmgetter.Providers @@ -299,13 +299,13 @@ func (r *HelmRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *s // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - "NewArtifact", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + "NewArtifact", eventv1.ActionApplied, message) ctrl.LoggerFrom(ctx).Info(message) } else { if sreconcile.FailureRecovery(oldObj, newObj, helmRepositoryFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - meta.SucceededReason, message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + meta.SucceededReason, eventv1.ActionReconciled, message) ctrl.LoggerFrom(ctx).Info(message) } } @@ -340,7 +340,7 @@ func (r *HelmRepositoryReconciler) reconcileStorage(ctx context.Context, sp *pat // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -687,13 +687,15 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour // about the event. func (r *HelmRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { msg := fmt.Sprintf(messageFmt, args...) + action := eventv1.ActionReconciled // Log and emit event. if eventType == corev1.EventTypeWarning { ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) + action = eventv1.ActionFailed } else { ctrl.LoggerFrom(ctx).Info(msg) } - r.Eventf(obj, eventType, reason, msg) + r.Eventf(obj, nil, eventType, reason, action, msg) } // migrateToStatic is HelmRepository OCI migration to static object. diff --git a/internal/controller/helmrepository_controller_test.go b/internal/controller/helmrepository_controller_test.go index f76d4f221..12f75edb1 100644 --- a/internal/controller/helmrepository_controller_test.go +++ b/internal/controller/helmrepository_controller_test.go @@ -33,21 +33,23 @@ import ( helmgetter "helm.sh/helm/v4/pkg/getter" repo "helm.sh/helm/v4/pkg/repo/v1" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/helmtestserver" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/secrets" @@ -86,7 +88,7 @@ func TestHelmRepositoryReconciler_deleteBeforeFinalizer(t *testing.T) { r := &HelmRepositoryReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, } // NOTE: Only a real API server responds with an error in this scenario. @@ -354,7 +356,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.HelmRepository{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(helmRepositoryReadyCondition.Owned, "sc"), } @@ -1028,7 +1030,7 @@ func TestHelmRepositoryReconciler_reconcileSource(t *testing.T) { } r := &HelmRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), Storage: testStorage, Getters: testGetters, @@ -1171,7 +1173,7 @@ func TestHelmRepositoryReconciler_reconcileArtifact(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.HelmRepository{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, Cache: tt.cache, TTL: 1 * time.Minute, @@ -1443,7 +1445,7 @@ func TestHelmRepositoryReconciler_statusConditions(t *testing.T) { } ctx := context.TODO() - summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), serialPatcher) + summarizeHelper := summarize.NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summarizeOpts := []summarize.Option{ summarize.WithConditions(helmRepositoryReadyCondition), summarize.WithReconcileResult(sreconcile.ResultSuccess), @@ -1469,7 +1471,7 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { resErr error oldObjBeforeFunc func(obj *sourcev1.HelmRepository) newObjBeforeFunc func(obj *sourcev1.HelmRepository) - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -1483,7 +1485,12 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy", Size: nil} }, - wantEvent: "Normal NewArtifact stored fetched index of unknown size", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored fetched index of unknown size", + }, }, { name: "new artifact", @@ -1492,7 +1499,12 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy", Size: &aSize} }, - wantEvent: "Normal NewArtifact stored fetched index of size", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored fetched index of size", + }, }, { name: "recovery from failure", @@ -1507,7 +1519,12 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy", Size: &aSize} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal Succeeded stored fetched index of size", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored fetched index of size", + }, }, { name: "recovery and new artifact", @@ -1522,7 +1539,12 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb", Size: &aSize} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal NewArtifact stored fetched index of size", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored fetched index of size", + }, }, { name: "no updates", @@ -1542,7 +1564,7 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.HelmRepository{} newObj := oldObj.DeepCopy() @@ -1565,12 +1587,15 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } diff --git a/internal/controller/ocirepository_controller.go b/internal/controller/ocirepository_controller.go index ebde8aa2d..a8b233ea3 100644 --- a/internal/controller/ocirepository_controller.go +++ b/internal/controller/ocirepository_controller.go @@ -43,12 +43,11 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" @@ -56,6 +55,7 @@ import ( "github.com/fluxcd/pkg/oci" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -138,7 +138,7 @@ type ociRepositoryReconcileFunc func(ctx context.Context, sp *patch.SerialPatche type OCIRepositoryReconciler struct { client.Client helper.Metrics - kuberecorder.EventRecorder + events.EventRecorder Storage *storage.Storage ControllerName string @@ -1035,7 +1035,7 @@ func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context, sp *patc // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -1256,13 +1256,15 @@ func (r *OCIRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc // about the event. func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { msg := fmt.Sprintf(messageFmt, args...) + action := eventv1.ActionReconciled // Log and emit event. if eventType == corev1.EventTypeWarning { + action = eventv1.ActionFailed ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) } else { ctrl.LoggerFrom(ctx).Info(msg) } - r.Eventf(obj, eventType, reason, msg) + r.Eventf(obj, nil, eventType, reason, action, msg) } // notify emits notification related to the reconciliation. @@ -1293,13 +1295,13 @@ func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *so // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - "NewArtifact", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + "NewArtifact", eventv1.ActionApplied, message) ctrl.LoggerFrom(ctx).Info(message) } else { if sreconcile.FailureRecovery(oldObj, newObj, ociRepositoryFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - meta.SucceededReason, message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + meta.SucceededReason, eventv1.ActionReconciled, message) ctrl.LoggerFrom(ctx).Info(message) } } diff --git a/internal/controller/ocirepository_controller_test.go b/internal/controller/ocirepository_controller_test.go index 0755ff8c7..1a5fd8d80 100644 --- a/internal/controller/ocirepository_controller_test.go +++ b/internal/controller/ocirepository_controller_test.go @@ -48,9 +48,9 @@ import ( "github.com/sigstore/cosign/v3/cmd/cosign/cli/sign" "github.com/sigstore/cosign/v3/pkg/cosign" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" oras "oras.land/oras-go/v2/registry/remote" ctrl "sigs.k8s.io/controller-runtime" @@ -59,6 +59,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" @@ -67,6 +68,7 @@ import ( "github.com/fluxcd/pkg/oci" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/tar" @@ -109,7 +111,7 @@ func TestOCIRepositoryReconciler_deleteBeforeFinalizer(t *testing.T) { r := &OCIRepositoryReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, } @@ -805,7 +807,7 @@ func TestOCIRepository_reconcileSource_authStrategy(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -1265,7 +1267,7 @@ func TestOCIRepository_reconcileSource_remoteReference(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -1468,7 +1470,7 @@ func TestOCIRepository_reconcileSource_verifyOCISourceSignatureNotation(t *testi r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -1832,7 +1834,7 @@ func TestOCIRepository_reconcileSource_verifyOCISourceTrustPolicyNotation(t *tes r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -2129,7 +2131,7 @@ func TestOCIRepository_reconcileSource_verifyOCISourceSignatureCosign(t *testing r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -2396,7 +2398,7 @@ func TestOCIRepository_reconcileSource_verifyOCISourceSignature_keyless(t *testi r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -2582,7 +2584,7 @@ func TestOCIRepository_reconcileSource_noop(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), } @@ -2814,7 +2816,7 @@ func TestOCIRepository_reconcileArtifact(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), } @@ -2962,7 +2964,7 @@ func TestOCIRepository_getArtifactRef(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), } @@ -3293,7 +3295,7 @@ func TestOCIRepository_reconcileStorage(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), } @@ -3356,7 +3358,7 @@ func TestOCIRepository_ReconcileDelete(t *testing.T) { g := NewWithT(t) r := &OCIRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -3395,7 +3397,7 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { oldObjBeforeFunc func(obj *sourcev1.OCIRepository) newObjBeforeFunc func(obj *sourcev1.OCIRepository) commit git.Commit - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -3417,7 +3419,12 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { }, } }, - wantEvent: "Normal NewArtifact stored artifact with revision 'xxx' from 'oci://newurl.io', origin source 'https://github.com/stefanprodan/podinfo', origin revision '6.1.8/b3b00fe35424a45d373bf4c7214178bc36fd7872'", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact with revision 'xxx' from 'oci://newurl.io', origin source 'https://github.com/stefanprodan/podinfo', origin revision '6.1.8/b3b00fe35424a45d373bf4c7214178bc36fd7872'", + }, }, { name: "recovery from failure", @@ -3433,7 +3440,12 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal Succeeded stored artifact with revision 'xxx' from 'oci://newurl.io'", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored artifact with revision 'xxx' from 'oci://newurl.io'", + }, }, { name: "recovery and new artifact", @@ -3449,7 +3461,12 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal NewArtifact stored artifact with revision 'aaa' from 'oci://newurl.io'", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact with revision 'aaa' from 'oci://newurl.io'", + }, }, { name: "no updates", @@ -3478,7 +3495,7 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.OCIRepository{} newObj := oldObj.DeepCopy() @@ -3498,12 +3515,15 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 53da2f74e..d2ad3a0c5 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -43,7 +43,6 @@ import ( helmreg "helm.sh/helm/v4/pkg/registry" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" @@ -52,6 +51,7 @@ import ( "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/testenv" "github.com/fluxcd/pkg/testserver" @@ -322,7 +322,7 @@ func TestMain(m *testing.M) { if err := (&GitRepositoryReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Storage: testStorage, }).SetupWithManager(testEnv, GitRepositoryReconcilerOptions{ @@ -333,7 +333,7 @@ func TestMain(m *testing.M) { if err := (&BucketReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Storage: testStorage, }).SetupWithManager(testEnv, BucketReconcilerOptions{ @@ -347,7 +347,7 @@ func TestMain(m *testing.M) { if err := (&OCIRepositoryReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Storage: testStorage, }).SetupWithManager(testEnv, OCIRepositoryReconcilerOptions{ @@ -358,7 +358,7 @@ func TestMain(m *testing.M) { if err := (&HelmRepositoryReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Getters: testGetters, Storage: testStorage, @@ -373,7 +373,7 @@ func TestMain(m *testing.M) { if err := (&HelmChartReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Getters: testGetters, Storage: testStorage, diff --git a/internal/reconcile/summarize/processor.go b/internal/reconcile/summarize/processor.go index 746ca7c8e..e4f064c85 100644 --- a/internal/reconcile/summarize/processor.go +++ b/internal/reconcile/summarize/processor.go @@ -20,12 +20,12 @@ import ( "context" corev1 "k8s.io/api/core/v1" - kuberecorder "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/events" serror "github.com/fluxcd/source-controller/internal/error" "github.com/fluxcd/source-controller/internal/object" "github.com/fluxcd/source-controller/internal/reconcile" @@ -34,12 +34,12 @@ import ( // ResultProcessor processes the results of reconciliation (the object, result // and error). Any errors during processing need not result in the // reconciliation failure. The errors can be recorded as logs and events. -type ResultProcessor func(context.Context, kuberecorder.EventRecorder, client.Object, reconcile.Result, error) +type ResultProcessor func(context.Context, events.EventRecorder, client.Object, reconcile.Result, error) // RecordReconcileReq is a ResultProcessor that checks the reconcile // annotation value and sets it in the object status as // status.lastHandledReconcileAt. -func RecordReconcileReq(ctx context.Context, recorder kuberecorder.EventRecorder, obj client.Object, _ reconcile.Result, _ error) { +func RecordReconcileReq(ctx context.Context, recorder events.EventRecorder, obj client.Object, _ reconcile.Result, _ error) { if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok { object.SetStatusLastHandledReconcileAt(obj, v) } @@ -49,7 +49,7 @@ func RecordReconcileReq(ctx context.Context, recorder kuberecorder.EventRecorder // configured in the given error. Logging and event recording are the handled // actions at present. As more configurations are added to serror.Config, more // action handlers can be added here. -func ErrorActionHandler(ctx context.Context, recorder kuberecorder.EventRecorder, obj client.Object, _ reconcile.Result, err error) { +func ErrorActionHandler(ctx context.Context, recorder events.EventRecorder, obj client.Object, _ reconcile.Result, err error) { switch e := err.(type) { case *serror.Generic: if e.Log { @@ -80,7 +80,7 @@ func logError(ctx context.Context, eventType string, err error, msg string, keys } // recordEvent records events based on the passed error configurations. -func recordEvent(recorder kuberecorder.EventRecorder, obj client.Object, eventType string, notification bool, err error, reason string) { +func recordEvent(recorder events.EventRecorder, obj client.Object, eventType string, notification bool, err error, reason string) { if eventType == serror.EventTypeNone { return } @@ -88,16 +88,16 @@ func recordEvent(recorder kuberecorder.EventRecorder, obj client.Object, eventTy case corev1.EventTypeNormal: if notification { // K8s native event and notification-controller event. - recorder.Eventf(obj, corev1.EventTypeNormal, reason, err.Error()) + recorder.Eventf(obj, nil, corev1.EventTypeNormal, reason, eventv1.ActionApplied, err.Error()) } else { // K8s native event only. - recorder.Eventf(obj, eventv1.EventTypeTrace, reason, err.Error()) + recorder.Eventf(obj, nil, eventv1.EventTypeTrace, reason, eventv1.ActionApplied, err.Error()) } case corev1.EventTypeWarning: // TODO: Due to the current implementation of the event recorder, all // the K8s warning events are also sent as notification controller // notifications. Once the recorder becomes capable of separating the // two, conditionally record events. - recorder.Eventf(obj, corev1.EventTypeWarning, reason, err.Error()) + recorder.Eventf(obj, nil, corev1.EventTypeWarning, reason, eventv1.ActionFailed, err.Error()) } } diff --git a/internal/reconcile/summarize/processor_test.go b/internal/reconcile/summarize/processor_test.go index 44f68b5bf..d7af0a00f 100644 --- a/internal/reconcile/summarize/processor_test.go +++ b/internal/reconcile/summarize/processor_test.go @@ -22,10 +22,10 @@ import ( . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/events" sourcev1 "github.com/fluxcd/source-controller/api/v1" "github.com/fluxcd/source-controller/internal/object" @@ -119,7 +119,7 @@ func TestRecordReconcileReq(t *testing.T) { } ctx := context.TODO() - RecordReconcileReq(ctx, record.NewFakeRecorder(32), obj, reconcile.ResultEmpty, nil) + RecordReconcileReq(ctx, events.NewFakeRecorder(32, false), obj, reconcile.ResultEmpty, nil) if tt.afterFunc != nil { tt.afterFunc(g, obj) diff --git a/internal/reconcile/summarize/summary.go b/internal/reconcile/summarize/summary.go index 8650a0907..7a7ea8762 100644 --- a/internal/reconcile/summarize/summary.go +++ b/internal/reconcile/summarize/summary.go @@ -22,11 +22,11 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" kerrors "k8s.io/apimachinery/pkg/util/errors" - kuberecorder "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/conditions" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/source-controller/internal/reconcile" @@ -50,12 +50,12 @@ type Conditions struct { // Helper is SummarizeAndPatch helper. type Helper struct { - recorder kuberecorder.EventRecorder + recorder events.EventRecorder serialPatcher *patch.SerialPatcher } // NewHelper returns an initialized Helper. -func NewHelper(recorder kuberecorder.EventRecorder, serialPatcher *patch.SerialPatcher) *Helper { +func NewHelper(recorder events.EventRecorder, serialPatcher *patch.SerialPatcher) *Helper { return &Helper{ recorder: recorder, serialPatcher: serialPatcher, diff --git a/internal/reconcile/summarize/summary_test.go b/internal/reconcile/summarize/summary_test.go index c4c16e4eb..c9acbca38 100644 --- a/internal/reconcile/summarize/summary_test.go +++ b/internal/reconcile/summarize/summary_test.go @@ -26,7 +26,6 @@ import ( . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -34,6 +33,7 @@ import ( "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" sourcev1 "github.com/fluxcd/source-controller/api/v1" @@ -351,7 +351,7 @@ func TestSummarizeAndPatch(t *testing.T) { serialPatcher := patch.NewSerialPatcher(obj, c) - summaryHelper := NewHelper(record.NewFakeRecorder(32), serialPatcher) + summaryHelper := NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summaryOpts := []Option{ WithReconcileResult(tt.result), WithReconcileError(tt.reconcileErr), @@ -479,7 +479,7 @@ func TestSummarizeAndPatch_Intermediate(t *testing.T) { g.Expect(c.Create(ctx, obj)).To(Succeed()) serialPatcher := patch.NewSerialPatcher(obj, c) - summaryHelper := NewHelper(record.NewFakeRecorder(32), serialPatcher) + summaryHelper := NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summaryOpts := []Option{ WithConditions(tt.conditions...), WithResultBuilder(reconcile.AlwaysRequeueResultBuilder{RequeueAfter: interval}), diff --git a/main.go b/main.go index 75d897bd8..271ae2de6 100644 --- a/main.go +++ b/main.go @@ -28,7 +28,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" @@ -195,7 +194,12 @@ func main() { metrics := helper.NewMetrics(mgr, metrics.MustMakeRecorder(), sourcev1.SourceFinalizer) cacheRecorder := cache.MustMakeMetrics() - eventRecorder := mustSetupEventRecorder(mgr, eventsAddr, controllerName) + + eventRecorder, err := events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName) + if err != nil { + setupLog.Error(err, "unable to create event recorder") + os.Exit(1) + } algo, err := artdigest.AlgorithmForName(artifactOptions.ArtifactDigestAlgo) if err != nil { @@ -328,15 +332,6 @@ func main() { } } -func mustSetupEventRecorder(mgr ctrl.Manager, eventsAddr, controllerName string) record.EventRecorder { - eventRecorder, err := events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName) - if err != nil { - setupLog.Error(err, "unable to create event recorder") - os.Exit(1) - } - return eventRecorder -} - func mustSetupManager(metricsAddr, healthAddr string, maxConcurrent int, watchOpts helper.WatchOptions, clientOpts client.Options, leaderOpts leaderelection.Options) ctrl.Manager {