diff --git a/internal/evaluator/logger.go b/internal/evaluator/logger.go index fd0ffa7200..c02764ee39 100644 --- a/internal/evaluator/logger.go +++ b/internal/evaluator/logger.go @@ -18,8 +18,11 @@ package evaluator import ( + "context" + "github.com/elastic/elastic-agent-libs/logp" "github.com/open-policy-agent/opa/v1/logging" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -54,7 +57,7 @@ func (l *logger) Info(fmt string, a ...any) { } func (l *logger) Error(fmt string, a ...any) { - l.log.Errorf(fmt, a...) + l.log.Errorf(context.TODO(), fmt, a...) } func (l *logger) Warn(fmt string, a ...any) { @@ -84,12 +87,12 @@ func mapToArray(m map[string]any) []any { return ret } -func newLogger() logging.Logger { +func newLogger(ctx context.Context) logging.Logger { lvl := zap.NewAtomicLevelAt(logp.GetLevel()) log := clog.NewLogger("opa").WithOptions( zap.IncreaseLevel(lvl), zap.AddCallerSkip(1), - ) + ).WithSpanContext(trace.SpanContextFromContext(ctx)) return &logger{ log: log, diff --git a/internal/evaluator/logger_test.go b/internal/evaluator/logger_test.go index 51368afe7e..d68f6e2702 100644 --- a/internal/evaluator/logger_test.go +++ b/internal/evaluator/logger_test.go @@ -46,7 +46,7 @@ func (s *LoggerTestSuite) SetupSuite() { } func (s *LoggerTestSuite) TestLogFormat() { - logger := newLogger() + logger := newLogger(s.T().Context()) logger.SetLevel(logging.Warn) logger.Warn("warn %s", "warn") logs := logp.ObserverLogs().TakeAll() @@ -56,7 +56,7 @@ func (s *LoggerTestSuite) TestLogFormat() { } func (s *LoggerTestSuite) TestLogFields() { - logger := newLogger() + logger := newLogger(s.T().Context()) logger.SetLevel(logging.Debug) logger = logger.WithFields(map[string]any{ "key": "val", @@ -72,7 +72,7 @@ func (s *LoggerTestSuite) TestLogFields() { } func (s *LoggerTestSuite) TestLogMultipleFields() { - logger := newLogger() + logger := newLogger(s.T().Context()) logger.SetLevel(logging.Debug) logger = logger.WithFields(map[string]any{ "key1": "val1", @@ -93,7 +93,7 @@ func (s *LoggerTestSuite) TestLogMultipleFields() { } func (s *LoggerTestSuite) TestLoggerGetLevel() { - logger := newLogger() + logger := newLogger(s.T().Context()) tests := []logging.Level{ logging.Debug, logging.Info, @@ -108,7 +108,7 @@ func (s *LoggerTestSuite) TestLoggerGetLevel() { } func (s *LoggerTestSuite) TestLoggerSetLevel() { - logger := newLogger() + logger := newLogger(s.T().Context()) logger.SetLevel(logging.Debug) logger.Debug("debug") logs := logp.ObserverLogs().TakeAll() diff --git a/internal/evaluator/opa.go b/internal/evaluator/opa.go index b5854ca67d..e61300448f 100644 --- a/internal/evaluator/opa.go +++ b/internal/evaluator/opa.go @@ -72,14 +72,11 @@ func NewOpaEvaluator(ctx context.Context, log *clog.Logger, cfg *config.Config) plugin := fmt.Sprintf(logPlugin, dlogger.PluginName, dlogger.PluginName) opaCfg := fmt.Sprintf(opaConfig, cfg.BundlePath, plugin) - decisonLogger := newLogger() - stdLogger := newLogger() - // create an instance of the OPA object opa, err := sdk.New(ctx, sdk.Options{ Config: bytes.NewReader([]byte(opaCfg)), - Logger: stdLogger, - ConsoleLogger: decisonLogger, + Logger: newLogger(ctx), + ConsoleLogger: newLogger(ctx), Plugins: map[string]plugins.Factory{ dlogger.PluginName: &dlogger.Factory{}, }, diff --git a/internal/flavors/benchmark/aws_org.go b/internal/flavors/benchmark/aws_org.go index 1a56c23650..e973cc5467 100644 --- a/internal/flavors/benchmark/aws_org.go +++ b/internal/flavors/benchmark/aws_org.go @@ -161,7 +161,7 @@ func (a *AWSOrg) getAwsAccounts(ctx context.Context, log *clog.Logger, cfgCloudb if identity.Account == rootIdentity.Account { cfg, err := a.pickManagementAccountRole(ctx, log, stsClient, cfgCloudbeatRoot, identity) if err != nil { - log.Errorf("error picking roles for account %s: %s", identity.Account, err) + log.Errorf(ctx, "error picking roles for account %s: %s", identity.Account, err) continue } awsConfig = cfg @@ -218,7 +218,7 @@ func (a *AWSOrg) pickManagementAccountRole(ctx context.Context, log *clog.Logger if foundTagValue == scanSettingTagValue { _, err := a.IAMProvider.GetRole(ctx, memberRole) if err != nil { - log.Errorf("Management Account should be scanned (%s: %s), but %q role is missing: %s", scanSettingTagKey, foundTagValue, memberRole, err) + log.Errorf(ctx, "Management Account should be scanned (%s: %s), but %q role is missing: %s", scanSettingTagKey, foundTagValue, memberRole, err) } } diff --git a/internal/flavors/benchmark/k8s_helper.go b/internal/flavors/benchmark/k8s_helper.go index 13cbd88fcd..08f1e75152 100644 --- a/internal/flavors/benchmark/k8s_helper.go +++ b/internal/flavors/benchmark/k8s_helper.go @@ -48,7 +48,7 @@ func NewK8sBenchmarkHelper(log *clog.Logger, cfg *config.Config, client client_g func (h *K8SBenchmarkHelper) GetK8sDataProvider(ctx context.Context, clusterNameProvider k8s.ClusterNameProviderAPI) (dataprovider.CommonDataProvider, error) { clusterName, err := clusterNameProvider.GetClusterName(ctx, h.cfg) if err != nil { - h.log.Errorf("failed to get cluster name: %v", err) + h.log.Errorf(ctx, "failed to get cluster name: %v", err) } serverVersion, err := h.client.Discovery().ServerVersion() diff --git a/internal/flavors/vulnerability.go b/internal/flavors/vulnerability.go index 30fe0666f9..410f705047 100644 --- a/internal/flavors/vulnerability.go +++ b/internal/flavors/vulnerability.go @@ -129,7 +129,7 @@ func (bt *vulnerability) Run(*beat.Beat) error { func (bt *vulnerability) runIteration() error { worker, err := vuln.NewVulnerabilityWorker(bt.ctx, bt.log, bt.config, bt.bdp, bt.cdp) if err != nil { - bt.log.Error("vulnerability.runIteration worker creation failed") + bt.log.Error(context.TODO(), "vulnerability.runIteration worker creation failed") bt.cancel() return err } diff --git a/internal/infra/clog/clog.go b/internal/infra/clog/clog.go index d220a56d77..9b3dfd2c4c 100644 --- a/internal/infra/clog/clog.go +++ b/internal/infra/clog/clog.go @@ -31,22 +31,24 @@ type Logger struct { *logp.Logger } -func (l *Logger) Errorf(template string, args ...any) { +func (l *Logger) Errorf(ctx context.Context, template string, args ...any) { + spanCtx := trace.SpanContextFromContext(ctx) // Downgrade context.Canceled errors to warning level if hasErrorType(context.Canceled, args...) { - l.Warnf(template, args...) + l.WithSpanContext(spanCtx).Warnf(template, args...) return } - l.Logger.Errorf(template, args...) + l.WithSpanContext(spanCtx).Logger.Errorf(template, args...) } -func (l *Logger) Error(args ...any) { +func (l *Logger) Error(ctx context.Context, args ...any) { + spanCtx := trace.SpanContextFromContext(ctx) // Downgrade context.Canceled errors to warning level if hasErrorType(context.Canceled, args...) { - l.Warn(args...) + l.WithSpanContext(spanCtx).Warn(args...) return } - l.Logger.Error(args...) + l.WithSpanContext(spanCtx).Logger.Error(args...) } func (l *Logger) Named(name string) *Logger { diff --git a/internal/infra/clog/clog_test.go b/internal/infra/clog/clog_test.go index 92469e86b3..32cbc417b1 100644 --- a/internal/infra/clog/clog_test.go +++ b/internal/infra/clog/clog_test.go @@ -46,8 +46,8 @@ func (s *LoggerTestSuite) TestErrorfWithContextCanceled() { logger := NewLogger("test") err := context.Canceled - logger.Errorf("some error: %s", err) // error with context.Canceled - logger.Errorf("some error: %s", err.Error()) // error string with context Canceled + logger.Errorf(s.T().Context(), "some error: %s", err) // error with context.Canceled + logger.Errorf(s.T().Context(), "some error: %s", err.Error()) // error string with context Canceled logs := logp.ObserverLogs().TakeAll() if s.Len(logs, 2) { @@ -62,7 +62,7 @@ func (s *LoggerTestSuite) TestLogErrorfWithoutContextCanceled() { logger := NewLogger("test") err := errors.New("oops") - logger.Errorf("some error: %s", err) + logger.Errorf(s.T().Context(), "some error: %s", err) logs := logp.ObserverLogs().TakeAll() if s.Len(logs, 1) { diff --git a/internal/inventory/awsfetcher/fetcher_ec2_instance.go b/internal/inventory/awsfetcher/fetcher_ec2_instance.go index 3377f86930..7def1664b3 100644 --- a/internal/inventory/awsfetcher/fetcher_ec2_instance.go +++ b/internal/inventory/awsfetcher/fetcher_ec2_instance.go @@ -57,7 +57,7 @@ func (e *ec2InstanceFetcher) Fetch(ctx context.Context, assetChannel chan<- inve instances, err := e.provider.DescribeInstances(ctx) if err != nil { - e.logger.Errorf("Could not list ec2 instances: %v", err) + e.logger.Errorf(ctx, "Could not list ec2 instances: %v", err) awslib.ReportMissingPermission(e.statusHandler, err) return } diff --git a/internal/inventory/awsfetcher/fetcher_elb.go b/internal/inventory/awsfetcher/fetcher_elb.go index 6e96ffc5b4..586b8e1bab 100644 --- a/internal/inventory/awsfetcher/fetcher_elb.go +++ b/internal/inventory/awsfetcher/fetcher_elb.go @@ -76,8 +76,8 @@ func (f *elbFetcher) fetch(ctx context.Context, resourceName string, function el awsResources, err := function(ctx) if err != nil { + f.logger.Errorf(ctx, "Could not fetch %s: %v", resourceName, err) awslib.ReportMissingPermission(f.statusHandler, err) - f.logger.Errorf("Could not fetch %s: %v", resourceName, err) return } diff --git a/internal/inventory/awsfetcher/fetcher_iam_policy.go b/internal/inventory/awsfetcher/fetcher_iam_policy.go index 11045debba..9995991337 100644 --- a/internal/inventory/awsfetcher/fetcher_iam_policy.go +++ b/internal/inventory/awsfetcher/fetcher_iam_policy.go @@ -57,7 +57,7 @@ func (i *iamPolicyFetcher) Fetch(ctx context.Context, assetChannel chan<- invent policies, err := i.provider.GetPolicies(ctx) if err != nil { - i.logger.Errorf("Could not list policies: %v", err) + i.logger.Errorf(ctx, "Could not list policies: %v", err) if len(policies) == 0 { return } @@ -71,7 +71,7 @@ func (i *iamPolicyFetcher) Fetch(ctx context.Context, assetChannel chan<- invent policy, ok := resource.(iam.Policy) if !ok { - i.logger.Errorf("Could not get info about policy: %s", resource.GetResourceArn()) + i.logger.Errorf(ctx, "Could not get info about policy: %s", resource.GetResourceArn()) continue } diff --git a/internal/inventory/awsfetcher/fetcher_iam_role.go b/internal/inventory/awsfetcher/fetcher_iam_role.go index d18666ac7b..1722a28f2b 100644 --- a/internal/inventory/awsfetcher/fetcher_iam_role.go +++ b/internal/inventory/awsfetcher/fetcher_iam_role.go @@ -57,7 +57,7 @@ func (i *iamRoleFetcher) Fetch(ctx context.Context, assetChannel chan<- inventor roles, err := i.provider.ListRoles(ctx) if err != nil { - i.logger.Errorf("Could not list roles: %v", err) + i.logger.Errorf(ctx, "Could not list roles: %v", err) if len(roles) == 0 { return } diff --git a/internal/inventory/awsfetcher/fetcher_iam_user.go b/internal/inventory/awsfetcher/fetcher_iam_user.go index 201534d261..1075691b2c 100644 --- a/internal/inventory/awsfetcher/fetcher_iam_user.go +++ b/internal/inventory/awsfetcher/fetcher_iam_user.go @@ -56,8 +56,8 @@ func (i *iamUserFetcher) Fetch(ctx context.Context, assetChannel chan<- inventor users, err := i.provider.GetUsers(ctx) if err != nil { + i.logger.Errorf(ctx, "Could not list users: %v", err) awslib.ReportMissingPermission(i.statusHandler, err) - i.logger.Errorf("Could not list users: %v", err) if len(users) == 0 { return } @@ -70,7 +70,7 @@ func (i *iamUserFetcher) Fetch(ctx context.Context, assetChannel chan<- inventor user, ok := resource.(iam.User) if !ok { - i.logger.Errorf("Could not get info about user: %s", resource.GetResourceArn()) + i.logger.Errorf(ctx, "Could not get info about user: %s", resource.GetResourceArn()) continue } diff --git a/internal/inventory/awsfetcher/fetcher_lambda.go b/internal/inventory/awsfetcher/fetcher_lambda.go index 784c06c4d1..2b5a30d633 100644 --- a/internal/inventory/awsfetcher/fetcher_lambda.go +++ b/internal/inventory/awsfetcher/fetcher_lambda.go @@ -76,8 +76,8 @@ func (s *lambdaFetcher) fetch(ctx context.Context, resourceName string, function awsResources, err := function(ctx) if err != nil { + s.logger.Errorf(ctx, "Could not fetch %s: %v", resourceName, err) awslib.ReportMissingPermission(s.statusHandler, err) - s.logger.Errorf("Could not fetch %s: %v", resourceName, err) return } diff --git a/internal/inventory/awsfetcher/fetcher_networking.go b/internal/inventory/awsfetcher/fetcher_networking.go index bf384f20b5..bf2241902f 100644 --- a/internal/inventory/awsfetcher/fetcher_networking.go +++ b/internal/inventory/awsfetcher/fetcher_networking.go @@ -91,8 +91,8 @@ func (s *networkingFetcher) fetch(ctx context.Context, resourceName string, func awsResources, err := function(ctx) if err != nil { + s.logger.Errorf(ctx, "Could not fetch %s: %v", resourceName, err) awslib.ReportMissingPermission(s.statusHandler, err) - s.logger.Errorf("Could not fetch %s: %v", resourceName, err) return } diff --git a/internal/inventory/awsfetcher/fetcher_rds.go b/internal/inventory/awsfetcher/fetcher_rds.go index 52840cac38..d784a97b94 100644 --- a/internal/inventory/awsfetcher/fetcher_rds.go +++ b/internal/inventory/awsfetcher/fetcher_rds.go @@ -58,7 +58,7 @@ func (s *rdsFetcher) Fetch(ctx context.Context, assetChannel chan<- inventory.As awsResources, err := s.provider.DescribeDBInstances(ctx) if err != nil { - s.logger.Errorf("Could not list RDS Instances: %v", err) + s.logger.Errorf(ctx, "Could not list RDS Instances: %v", err) if len(awsResources) == 0 { return } diff --git a/internal/inventory/awsfetcher/fetcher_s3_bucket.go b/internal/inventory/awsfetcher/fetcher_s3_bucket.go index c28b71be13..af6b8289bf 100644 --- a/internal/inventory/awsfetcher/fetcher_s3_bucket.go +++ b/internal/inventory/awsfetcher/fetcher_s3_bucket.go @@ -58,7 +58,7 @@ func (s *s3BucketFetcher) Fetch(ctx context.Context, assetChannel chan<- invento awsBuckets, err := s.provider.DescribeBuckets(ctx) if err != nil { - s.logger.Errorf("Could not list s3 buckets: %v", err) + s.logger.Errorf(ctx, "Could not list s3 buckets: %v", err) if len(awsBuckets) == 0 { return } diff --git a/internal/inventory/awsfetcher/fetcher_sns.go b/internal/inventory/awsfetcher/fetcher_sns.go index 658febf43b..57ac1a7831 100644 --- a/internal/inventory/awsfetcher/fetcher_sns.go +++ b/internal/inventory/awsfetcher/fetcher_sns.go @@ -55,7 +55,7 @@ func (s *snsFetcher) Fetch(ctx context.Context, assetChannel chan<- inventory.As awsResources, err := s.provider.ListTopicsWithSubscriptions(ctx) if err != nil { - s.logger.Errorf("Could not fetch SNS Topics: %v", err) + s.logger.Errorf(ctx, "Could not fetch SNS Topics: %v", err) awslib.ReportMissingPermission(s.statusHandler, err) return } diff --git a/internal/inventory/azurefetcher/fetcher_account.go b/internal/inventory/azurefetcher/fetcher_account.go index 77b2aa46e1..54e089388f 100644 --- a/internal/inventory/azurefetcher/fetcher_account.go +++ b/internal/inventory/azurefetcher/fetcher_account.go @@ -67,7 +67,7 @@ func (f *accountFetcher) fetch(ctx context.Context, resourceName string, functio azureAssets, err := function(ctx) if err != nil { - f.logger.Errorf("Could not fetch %s: %v", resourceName, err) + f.logger.Errorf(ctx, "Could not fetch %s: %v", resourceName, err) return } diff --git a/internal/inventory/azurefetcher/fetcher_activedirectory.go b/internal/inventory/azurefetcher/fetcher_activedirectory.go index dd2958c020..e07c5c4c06 100644 --- a/internal/inventory/azurefetcher/fetcher_activedirectory.go +++ b/internal/inventory/azurefetcher/fetcher_activedirectory.go @@ -63,7 +63,7 @@ func (f *activedirectoryFetcher) fetchServicePrincipals(ctx context.Context, ass items, err := f.provider.ListServicePrincipals(ctx) if err != nil { - f.logger.Errorf("Could not fetch Service Principals: %v", err) + f.logger.Errorf(ctx, "Could not fetch Service Principals: %v", err) } for _, item := range items { @@ -94,7 +94,7 @@ func (f *activedirectoryFetcher) fetchDirectoryRoles(ctx context.Context, assetC items, err := f.provider.ListDirectoryRoles(ctx) if err != nil { - f.logger.Errorf("Could not fetch Directory Roles: %v", err) + f.logger.Errorf(ctx, "Could not fetch Directory Roles: %v", err) } for _, item := range items { @@ -124,7 +124,7 @@ func (f *activedirectoryFetcher) fetchGroups(ctx context.Context, assetChan chan items, err := f.provider.ListGroups(ctx) if err != nil { - f.logger.Errorf("Could not fetch Groups: %v", err) + f.logger.Errorf(ctx, "Could not fetch Groups: %v", err) } for _, item := range items { @@ -154,7 +154,7 @@ func (f *activedirectoryFetcher) fetchUsers(ctx context.Context, assetChan chan< items, err := f.provider.ListUsers(ctx) if err != nil { - f.logger.Errorf("Could not fetch Users: %v", err) + f.logger.Errorf(ctx, "Could not fetch Users: %v", err) } for _, item := range items { diff --git a/internal/inventory/azurefetcher/fetcher_resource_graph.go b/internal/inventory/azurefetcher/fetcher_resource_graph.go index f46b6d7201..d7a4066da8 100644 --- a/internal/inventory/azurefetcher/fetcher_resource_graph.go +++ b/internal/inventory/azurefetcher/fetcher_resource_graph.go @@ -79,7 +79,7 @@ func (f *resourceGraphFetcher) fetch(ctx context.Context, resourceName, serviceN azureAssets, err := f.provider.ListAllAssetTypesByName(ctx, resourceGroup, []string{resourceType}) if err != nil { - f.logger.Errorf("Could not fetch %s: %v", resourceName, err) + f.logger.Errorf(ctx, "Could not fetch %s: %v", resourceName, err) return } diff --git a/internal/inventory/azurefetcher/fetcher_storage.go b/internal/inventory/azurefetcher/fetcher_storage.go index 18baec2ad1..63c6a4e183 100644 --- a/internal/inventory/azurefetcher/fetcher_storage.go +++ b/internal/inventory/azurefetcher/fetcher_storage.go @@ -74,7 +74,7 @@ func (f *storageFetcher) Fetch(ctx context.Context, assetChan chan<- inventory.A storageAccounts, err := f.listStorageAccounts(ctx) if err != nil { - f.logger.Errorf("Could not fetch anything: %v", err) + f.logger.Errorf(ctx, "Could not fetch anything: %v", err) return } @@ -108,7 +108,7 @@ func (f *storageFetcher) fetch(ctx context.Context, storageAccounts []azurelib.A azureAssets, err := function(ctx, storageAccounts) if err != nil { - f.logger.Errorf("Could not fetch %s: %v", resourceName, err) + f.logger.Errorf(ctx, "Could not fetch %s: %v", resourceName, err) return } diff --git a/internal/launcher/launcher.go b/internal/launcher/launcher.go index 8670dbf357..f2066b729d 100644 --- a/internal/launcher/launcher.go +++ b/internal/launcher/launcher.go @@ -21,6 +21,7 @@ package launcher import ( + "context" "errors" "fmt" "sync" @@ -93,13 +94,16 @@ func (l *launcher) Run(b *beat.Beat) error { return err } + // Create a root context for the launcher's execution. + ctx := context.Background() + // Wait for Fleet-side reconfiguration only if beater is running in Agent-managed mode. if b.Manager.Enabled() { defer b.Manager.Stop() l.log.Infof("Waiting for initial reconfiguration from Fleet server...") - update, err := l.reconfigureWait(reconfigureWaitTimeout) + update, err := l.reconfigureWait(ctx, reconfigureWaitTimeout) if err != nil { - l.log.Errorf("Failed while waiting for the initial reconfiguration from Fleet server: %v", err) + l.log.Errorf(ctx, "Failed while waiting for the initial reconfiguration from Fleet server: %v", err) return err } @@ -108,12 +112,12 @@ func (l *launcher) Run(b *beat.Beat) error { } } - err := l.run() + err := l.run(ctx) return err } -func (l *launcher) run() error { - err := l.runLoop() +func (l *launcher) run(ctx context.Context) error { + err := l.runLoop(ctx) switch { case errors.Is(err, ErrGracefulExit): @@ -122,7 +126,7 @@ func (l *launcher) run() error { l.log.Info("Launcher stopped after timeout") case err == nil: // unexpected default: - l.log.Errorf("Launcher stopped by error: %v", err) + l.log.Errorf(ctx, "Launcher stopped by error: %v", err) } l.reloader.Stop() @@ -130,7 +134,7 @@ func (l *launcher) run() error { } // runLoop is the loop that keeps the launcher alive -func (l *launcher) runLoop() error { +func (l *launcher) runLoop(ctx context.Context) error { l.log.Info("Launcher is running") for { // Run a new beater @@ -143,7 +147,7 @@ func (l *launcher) runLoop() error { // config update (val, nil) // stop signal (nil, ErrStopSignal) // beater error (nil, err) - cfg, err := l.waitForUpdates() + cfg, err := l.waitForUpdates(ctx) if isConfigUpdate(cfg, err) { l.stopBeater() @@ -237,8 +241,10 @@ func (l *launcher) stopBeaterWithTimeout(duration time.Duration) error { // 1. The Stop function got called (nil, ErrStopSignal) // 2. The beater run has returned (nil, err) // 3. A config update received (val, nil) -func (l *launcher) waitForUpdates() (*config.C, error) { +func (l *launcher) waitForUpdates(ctx context.Context) (*config.C, error) { select { + case <-ctx.Done(): + return nil, ctx.Err() case err, ok := <-l.beaterErr: if !ok { l.log.Infof("Launcher received a stop signal") @@ -278,7 +284,7 @@ func (l *launcher) configUpdate(update *config.C) error { // reconfigureWait will wait for and consume incoming reconfiguration from the Fleet server, and keep // discarding them until the incoming config contains the necessary information to start beater // properly, thereafter returning the valid config. -func (l *launcher) reconfigureWait(timeout time.Duration) (*config.C, error) { +func (l *launcher) reconfigureWait(ctx context.Context, timeout time.Duration) (*config.C, error) { start := time.Now() timer := time.After(timeout) @@ -298,7 +304,7 @@ func (l *launcher) reconfigureWait(timeout time.Duration) (*config.C, error) { if l.validator != nil { err := l.validator.Validate(update) if err != nil { - l.log.Errorf("Config update validation failed: %v", err) + l.log.Errorf(ctx, "Config update validation failed: %v", err) healthErr := &BeaterUnhealthyError{} if errors.As(err, healthErr) { l.beat.Manager.UpdateStatus(status.Degraded, healthErr.Error()) diff --git a/internal/launcher/launcher_test.go b/internal/launcher/launcher_test.go index 49358112ba..212784ddfe 100644 --- a/internal/launcher/launcher_test.go +++ b/internal/launcher/launcher_test.go @@ -299,7 +299,7 @@ func (s *LauncherTestSuite) TestWaitForUpdates() { sut.Stop() }(tt.configs) - err := sut.run() + err := sut.run(s.T().Context()) s.Require().ErrorIs(err, ErrGracefulExit) beater, ok := sut.beater.(*beaterMock) s.Require().True(ok) @@ -322,7 +322,7 @@ func (s *LauncherTestSuite) TestErrorWaitForUpdates() { mocks.reloader.ch <- configErr }() - err := sut.run() + err := sut.run(s.T().Context()) s.Require().Error(err) } @@ -417,7 +417,7 @@ func (s *LauncherTestSuite) TestLauncherValidator() { } }(tt.configs) - cfg, err := sut.reconfigureWait(tt.timeout) + cfg, err := sut.reconfigureWait(s.T().Context(), tt.timeout) if tt.expected == nil { s.Require().Error(err) } else { @@ -430,12 +430,12 @@ func (s *LauncherTestSuite) TestLauncherValidator() { // TestLauncherErrorBeater should not call sut.Stop as the launcher should stop without calling it func (s *LauncherTestSuite) TestLauncherErrorBeater() { - s.Require().Error(s.newLauncher(s.initMocks(), errorBeaterMockCreator).run()) + s.Require().Error(s.newLauncher(s.initMocks(), errorBeaterMockCreator).run(s.T().Context())) } // TestLauncherPanicBeater should not call sut.Stop as the launcher should stop without calling it func (s *LauncherTestSuite) TestLauncherPanicBeater() { - s.Require().ErrorContains(s.newLauncher(s.initMocks(), panicBeaterMockCreator).run(), "panicBeaterMock panics") + s.Require().ErrorContains(s.newLauncher(s.initMocks(), panicBeaterMockCreator).run(s.T().Context()), "panicBeaterMock panics") } func (s *LauncherTestSuite) TestLauncherUpdateAndStop() { @@ -445,7 +445,7 @@ func (s *LauncherTestSuite) TestLauncherUpdateAndStop() { mocks.reloader.ch <- config.NewConfig() sut.Stop() }() - err := sut.run() + err := sut.run(s.T().Context()) s.Require().ErrorIs(err, ErrGracefulExit) } @@ -456,7 +456,7 @@ func (s *LauncherTestSuite) TestLauncherStopTwicePanics() { mocks.reloader.ch <- config.NewConfig() sut.Stop() }() - err := sut.run() + err := sut.run(s.T().Context()) s.Require().ErrorIs(err, ErrGracefulExit) s.Panics(func() { @@ -466,7 +466,7 @@ func (s *LauncherTestSuite) TestLauncherStopTwicePanics() { // TestLauncherErrorBeaterCreation should not call sut.Stop as the launcher should stop without calling it func (s *LauncherTestSuite) TestLauncherErrorBeaterCreation() { - s.Require().Error(s.newLauncher(s.initMocks(), errorBeaterCreator).run()) + s.Require().Error(s.newLauncher(s.initMocks(), errorBeaterCreator).run(s.T().Context())) } func (s *LauncherTestSuite) TestLauncherStop() { @@ -476,7 +476,7 @@ func (s *LauncherTestSuite) TestLauncherStop() { sut.Stop() }() - err := sut.run() + err := sut.run(s.T().Context()) s.Require().ErrorIs(err, ErrGracefulExit) } @@ -491,7 +491,7 @@ func (s *LauncherTestSuite) TestLauncherStopTimeout() { time.Sleep(shutdownGracePeriod + 100*time.Millisecond) }() - err := sut.run() + err := sut.run(s.T().Context()) s.Require().ErrorIs(err, ErrTimeoutExit) } diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 531131c4f6..aaca295380 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -38,7 +38,7 @@ func Step[In any, Out any](ctx context.Context, log *clog.Logger, inputChannel c for s := range inputChannel { val, err := fn(ctx, s) if err != nil { - log.Error(err) + log.Error(ctx, err) continue } outputCh <- val diff --git a/internal/resources/fetching/cycle/cache.go b/internal/resources/fetching/cycle/cache.go index 3e2b13cd1b..040185e466 100644 --- a/internal/resources/fetching/cycle/cache.go +++ b/internal/resources/fetching/cycle/cache.go @@ -57,7 +57,7 @@ func (c *Cache[T]) GetValue(ctx context.Context, cycle Metadata, fetch func(cont if c.lastCycle.Sequence < 0 { return result, err } - c.log.Errorf("Failed to renew, using cached value: %v", err) + c.log.Errorf(ctx, "Failed to renew, using cached value: %v", err) } else { c.cachedValue = result c.lastCycle = cycle diff --git a/internal/resources/fetching/fetchers/aws/ecr_fetcher.go b/internal/resources/fetching/fetchers/aws/ecr_fetcher.go index d9af08df61..2dd775910d 100644 --- a/internal/resources/fetching/fetchers/aws/ecr_fetcher.go +++ b/internal/resources/fetching/fetchers/aws/ecr_fetcher.go @@ -74,7 +74,7 @@ func (f *EcrFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) er podsList, err := f.kubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{}) if err != nil { - f.log.Errorf("failed to get pods - %v", err) + f.log.Errorf(ctx, "failed to get pods - %v", err) return err } @@ -96,7 +96,7 @@ func (f *EcrFetcher) describePodImagesRepositories(ctx context.Context, podsList // Add configuration describedRepo, err := describer.Provider.DescribeRepositories(ctx, repositories, region) if err != nil { - f.log.Errorf("could not retrieve pod's aws repositories for region %s: %v", region, err) + f.log.Errorf(ctx, "could not retrieve pod's aws repositories for region %s: %v", region, err) } else { awsRepositories = append(awsRepositories, describedRepo...) } diff --git a/internal/resources/fetching/fetchers/aws/iam_fetcher.go b/internal/resources/fetching/fetchers/aws/iam_fetcher.go index ce884da105..8c9aca58dc 100644 --- a/internal/resources/fetching/fetchers/aws/iam_fetcher.go +++ b/internal/resources/fetching/fetchers/aws/iam_fetcher.go @@ -65,7 +65,7 @@ func (f IAMFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) err pwdPolicy, err := f.iamProvider.GetPasswordPolicy(ctx) if err != nil { - f.log.Errorf("Unable to fetch PasswordPolicy, error: %v", err) + f.log.Errorf(ctx, "Unable to fetch PasswordPolicy, error: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } else { iamResources = append(iamResources, pwdPolicy) @@ -73,7 +73,7 @@ func (f IAMFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) err users, err := f.iamProvider.GetUsers(ctx) if err != nil { - f.log.Errorf("Unable to fetch IAM users, error: %v", err) + f.log.Errorf(ctx, "Unable to fetch IAM users, error: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } else { iamResources = append(iamResources, users...) @@ -81,7 +81,7 @@ func (f IAMFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) err policies, err := f.iamProvider.GetPolicies(ctx) if err != nil { - f.log.Errorf("Unable to fetch IAM policies, error: %v", err) + f.log.Errorf(ctx, "Unable to fetch IAM policies, error: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } else { iamResources = append(iamResources, policies...) @@ -89,7 +89,7 @@ func (f IAMFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) err serverCertificates, err := f.iamProvider.ListServerCertificates(ctx) if err != nil { - f.log.Errorf("Unable to fetch IAM server certificates, error: %v", err) + f.log.Errorf(ctx, "Unable to fetch IAM server certificates, error: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } else { iamResources = append(iamResources, serverCertificates) @@ -97,7 +97,7 @@ func (f IAMFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) err accessAnalyzers, err := f.iamProvider.GetAccessAnalyzers(ctx) if err != nil { - f.log.Errorf("Unable to fetch access access analyzers, error: %v", err) + f.log.Errorf(ctx, "Unable to fetch access access analyzers, error: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } else { iamResources = append(iamResources, accessAnalyzers) diff --git a/internal/resources/fetching/fetchers/aws/kms_fetcher.go b/internal/resources/fetching/fetchers/aws/kms_fetcher.go index 1242755543..7507585e63 100644 --- a/internal/resources/fetching/fetchers/aws/kms_fetcher.go +++ b/internal/resources/fetching/fetchers/aws/kms_fetcher.go @@ -55,7 +55,7 @@ func (f *KmsFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) er keys, err := f.kms.DescribeSymmetricKeys(ctx) if err != nil { - f.log.Errorf("failed to describe keys from KMS: %v", err) + f.log.Errorf(ctx, "failed to describe keys from KMS: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) return nil } diff --git a/internal/resources/fetching/fetchers/aws/logging_fetcher.go b/internal/resources/fetching/fetchers/aws/logging_fetcher.go index 18342093e9..e00737f480 100644 --- a/internal/resources/fetching/fetchers/aws/logging_fetcher.go +++ b/internal/resources/fetching/fetchers/aws/logging_fetcher.go @@ -71,7 +71,7 @@ func (f LoggingFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) f.log.Debug("Starting LoggingFetcher.Fetch") trails, err := f.loggingProvider.DescribeTrails(ctx) if err != nil { - f.log.Errorf("failed to describe trails: %v", err) + f.log.Errorf(ctx, "failed to describe trails: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } @@ -86,7 +86,7 @@ func (f LoggingFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) configs, err := f.configserviceProvider.DescribeConfigRecorders(ctx) if err != nil { - f.log.Errorf("failed to describe config recorders: %v", err) + f.log.Errorf(ctx, "failed to describe config recorders: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) return nil } diff --git a/internal/resources/fetching/fetchers/aws/monitoring_fetcher.go b/internal/resources/fetching/fetchers/aws/monitoring_fetcher.go index aa16100f66..6c1c8f8293 100644 --- a/internal/resources/fetching/fetchers/aws/monitoring_fetcher.go +++ b/internal/resources/fetching/fetchers/aws/monitoring_fetcher.go @@ -64,7 +64,7 @@ func (m MonitoringFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metada m.log.Debug("Starting MonitoringFetcher.Fetch") out, err := m.provider.AggregateResources(ctx) if err != nil { - m.log.Errorf("failed to aggregate monitoring resources: %v", err) + m.log.Errorf(ctx, "failed to aggregate monitoring resources: %v", err) awslib.ReportMissingPermission(m.statusHandler, err) } if out != nil { @@ -75,7 +75,7 @@ func (m MonitoringFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metada } hubs, err := m.securityhub.Describe(ctx) if err != nil { - m.log.Errorf("failed to describe security hub: %v", err) + m.log.Errorf(ctx, "failed to describe security hub: %v", err) awslib.ReportMissingPermission(m.statusHandler, err) return nil } diff --git a/internal/resources/fetching/fetchers/aws/network_fetcher.go b/internal/resources/fetching/fetchers/aws/network_fetcher.go index be77c3453e..8a03bb02fa 100644 --- a/internal/resources/fetching/fetchers/aws/network_fetcher.go +++ b/internal/resources/fetching/fetchers/aws/network_fetcher.go @@ -100,26 +100,26 @@ func (f NetworkFetcher) aggregateResources(ctx context.Context, client ec2.Elast var resources []awslib.AwsResource nacl, err := client.DescribeNetworkAcl(ctx) if err != nil { - f.log.Errorf("failed to describe network acl: %v", err) + f.log.Errorf(ctx, "failed to describe network acl: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } resources = append(resources, nacl...) securityGroups, err := client.DescribeSecurityGroups(ctx) if err != nil { - f.log.Errorf("failed to describe security groups: %v", err) + f.log.Errorf(ctx, "failed to describe security groups: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } resources = append(resources, securityGroups...) vpcs, err := client.DescribeVpcs(ctx) if err != nil { - f.log.Errorf("failed to describe vpcs: %v", err) + f.log.Errorf(ctx, "failed to describe vpcs: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } resources = append(resources, vpcs...) ebsEncryption, err := client.GetEbsEncryptionByDefault(ctx) if err != nil { - f.log.Errorf("failed to get ebs encryption by default: %v", err) + f.log.Errorf(ctx, "failed to get ebs encryption by default: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } diff --git a/internal/resources/fetching/fetchers/aws/rds_fetcher.go b/internal/resources/fetching/fetchers/aws/rds_fetcher.go index f57a51c270..a6dbf4a38b 100644 --- a/internal/resources/fetching/fetchers/aws/rds_fetcher.go +++ b/internal/resources/fetching/fetchers/aws/rds_fetcher.go @@ -56,7 +56,7 @@ func (f *RdsFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) er f.log.Info("Starting RdsFetcher.Fetch") dbInstances, err := f.provider.DescribeDBInstances(ctx) if err != nil { - f.log.Errorf("failed to load some DB instances from rds: %v", err) + f.log.Errorf(ctx, "failed to load some DB instances from rds: %v", err) awslib.ReportMissingPermission(f.statusHandler, err) } diff --git a/internal/resources/fetching/fetchers/aws/s3_fetcher.go b/internal/resources/fetching/fetchers/aws/s3_fetcher.go index c28a259904..8225956a01 100644 --- a/internal/resources/fetching/fetchers/aws/s3_fetcher.go +++ b/internal/resources/fetching/fetchers/aws/s3_fetcher.go @@ -49,7 +49,7 @@ func (f *S3Fetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) err f.log.Info("Starting S3Fetcher.Fetch") buckets, err := f.s3.DescribeBuckets(ctx) if err != nil { - f.log.Errorf("failed to load buckets from S3: %v", err) + f.log.Errorf(ctx, "failed to load buckets from S3: %v", err) return nil } diff --git a/internal/resources/fetching/fetchers/azure/assets_fetcher.go b/internal/resources/fetching/fetchers/azure/assets_fetcher.go index 648518afc0..4135e8d181 100644 --- a/internal/resources/fetching/fetchers/azure/assets_fetcher.go +++ b/internal/resources/fetching/fetchers/azure/assets_fetcher.go @@ -92,7 +92,7 @@ func (f *AzureAssetsFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Meta // Fetching all types even if non-existent in asset group for simplicity r, err := f.provider.ListAllAssetTypesByName(ctx, assetGroup, slices.Collect(maps.Keys(AzureAssetTypeToTypePair))) if err != nil { - f.log.Errorf("AzureAssetsFetcher.Fetch failed to fetch asset group %s: %s", assetGroup, err.Error()) + f.log.Errorf(ctx, "AzureAssetsFetcher.Fetch failed to fetch asset group %s: %s", assetGroup, err.Error()) errAgg = errors.Join(errAgg, err) continue } @@ -101,7 +101,7 @@ func (f *AzureAssetsFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Meta subscriptions, err := f.provider.GetSubscriptions(ctx, cycleMetadata) if err != nil { - f.log.Errorf("Error fetching subscription information: %v", err) + f.log.Errorf(ctx, "Error fetching subscription information: %v", err) } for _, e := range f.enrichers { diff --git a/internal/resources/fetching/fetchers/azure/batch_fetcher.go b/internal/resources/fetching/fetchers/azure/batch_fetcher.go index 7496552b3c..182fdf09ce 100644 --- a/internal/resources/fetching/fetchers/azure/batch_fetcher.go +++ b/internal/resources/fetching/fetchers/azure/batch_fetcher.go @@ -70,7 +70,7 @@ func (f *AzureBatchAssetFetcher) Fetch(ctx context.Context, cycleMetadata cycle. for _, assetGroup := range AzureBatchAssetGroups { r, err := f.provider.ListAllAssetTypesByName(ctx, assetGroup, slices.Collect(maps.Keys(AzureBatchAssets))) if err != nil { - f.log.Errorf("AzureBatchAssetFetcher.Fetch failed to fetch asset group %s: %s", assetGroup, err.Error()) + f.log.Errorf(ctx, "AzureBatchAssetFetcher.Fetch failed to fetch asset group %s: %s", assetGroup, err.Error()) errAgg = errors.Join(errAgg, err) continue } diff --git a/internal/resources/fetching/fetchers/azure/security_fetcher.go b/internal/resources/fetching/fetchers/azure/security_fetcher.go index 8e69ca5f2e..4c27efff34 100644 --- a/internal/resources/fetching/fetchers/azure/security_fetcher.go +++ b/internal/resources/fetching/fetchers/azure/security_fetcher.go @@ -66,7 +66,7 @@ func (f *AzureSecurityAssetFetcher) Fetch(ctx context.Context, cycleMetadata cyc for assetType, fn := range fetches { securityContacts, err := fn(ctx, sub.ShortID) if err != nil { - f.log.Errorf("AzureSecurityAssetFetcher.Fetch failed to fetch %s for subscription %s: %s", assetType, sub.ShortID, err.Error()) + f.log.Errorf(ctx, "AzureSecurityAssetFetcher.Fetch failed to fetch %s for subscription %s: %s", assetType, sub.ShortID, err.Error()) errs = append(errs, err) continue } diff --git a/internal/resources/fetching/fetchers/k8s/file_system_fetcher.go b/internal/resources/fetching/fetchers/k8s/file_system_fetcher.go index 0bbd9bf870..fd79d81370 100644 --- a/internal/resources/fetching/fetchers/k8s/file_system_fetcher.go +++ b/internal/resources/fetching/fetchers/k8s/file_system_fetcher.go @@ -99,20 +99,22 @@ func NewFsFetcher(log *clog.Logger, ch chan fetching.ResourceInfo, patterns []st } } -func (f *FileSystemFetcher) Fetch(_ context.Context, cycleMetadata cycle.Metadata) error { +func (f *FileSystemFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error { f.log.Debug("Starting FileSystemFetcher.Fetch") // Input files might contain glob pattern for _, filePattern := range f.patterns { matchedFiles, err := Glob(filePattern) if err != nil { - f.log.Errorf("Failed to find matched glob for %s, error: %+v", filePattern, err) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "Failed to find matched glob for %s, error: %+v", filePattern, err) } for _, file := range matchedFiles { - resource, err := f.fetchSystemResource(file) + resource, err := f.fetchSystemResource(ctx, file) if err != nil { - f.log.Errorf("Unable to fetch fileSystemResource for file %v", file) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "Unable to fetch fileSystemResource for file %v", file) continue } @@ -123,17 +125,17 @@ func (f *FileSystemFetcher) Fetch(_ context.Context, cycleMetadata cycle.Metadat return nil } -func (f *FileSystemFetcher) fetchSystemResource(filePath string) (*FSResource, error) { +func (f *FileSystemFetcher) fetchSystemResource(ctx context.Context, filePath string) (*FSResource, error) { info, err := os.Stat(filePath) if err != nil { return nil, fmt.Errorf("failed to fetch %s, error: %w", filePath, err) } - resourceInfo, _ := f.fromFileInfo(info, filePath) + resourceInfo, _ := f.fromFileInfo(ctx, info, filePath) return resourceInfo, nil } -func (f *FileSystemFetcher) fromFileInfo(info os.FileInfo, path string) (*FSResource, error) { +func (f *FileSystemFetcher) fromFileInfo(ctx context.Context, info os.FileInfo, path string) (*FSResource, error) { if info == nil { return nil, nil } @@ -172,7 +174,7 @@ func (f *FileSystemFetcher) fromFileInfo(info os.FileInfo, path string) (*FSReso return &FSResource{ EvalResource: data, - ElasticCommon: f.createFileCommonData(stat, data, path), + ElasticCommon: f.createFileCommonData(ctx, stat, data, path), }, nil } @@ -232,7 +234,7 @@ func getFSSubType(fileInfo os.FileInfo) string { return FileSubType } -func (f *FileSystemFetcher) createFileCommonData(stat *syscall.Stat_t, data EvalFSResource, path string) FileCommonData { +func (f *FileSystemFetcher) createFileCommonData(ctx context.Context, stat *syscall.Stat_t, data EvalFSResource, path string) FileCommonData { cd := FileCommonData{ Name: data.Name, Mode: data.Mode, @@ -250,7 +252,8 @@ func (f *FileSystemFetcher) createFileCommonData(stat *syscall.Stat_t, data Eval t, err := times.Stat(path) if err != nil { - f.log.Errorf("failed to get file time data (file %s), error - %s", path, err.Error()) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "failed to get file time data (file %s), error - %s", path, err.Error()) } else { cd.Accessed = t.AccessTime() cd.Mtime = t.ModTime() diff --git a/internal/resources/fetching/fetchers/k8s/kube_fetcher.go b/internal/resources/fetching/fetchers/k8s/kube_fetcher.go index ae43039e2b..80938eedd4 100644 --- a/internal/resources/fetching/fetchers/k8s/kube_fetcher.go +++ b/internal/resources/fetching/fetchers/k8s/kube_fetcher.go @@ -150,7 +150,7 @@ func (f *KubeFetcher) initWatchers() error { return nil } -func (f *KubeFetcher) Fetch(_ context.Context, cycleMetadata cycle.Metadata) error { +func (f *KubeFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error { f.log.Debug("Starting KubeFetcher.Fetch") var err error @@ -163,7 +163,7 @@ func (f *KubeFetcher) Fetch(_ context.Context, cycleMetadata cycle.Metadata) err return fmt.Errorf("could not initate Kubernetes watchers: %w", err) } - getKubeData(f.log, f.watchers, f.resourceCh, cycleMetadata) + getKubeData(ctx, f.log, f.watchers, f.resourceCh, cycleMetadata) return nil } diff --git a/internal/resources/fetching/fetchers/k8s/kube_provider.go b/internal/resources/fetching/fetchers/k8s/kube_provider.go index c2ca3ad5da..0d70da417f 100644 --- a/internal/resources/fetching/fetchers/k8s/kube_provider.go +++ b/internal/resources/fetching/fetchers/k8s/kube_provider.go @@ -18,6 +18,7 @@ package fetchers import ( + "context" "reflect" "github.com/elastic/elastic-agent-autodiscover/kubernetes" @@ -43,7 +44,7 @@ const ( ecsResourceNameField = "orchestrator.resource.name" ) -func getKubeData(log *clog.Logger, watchers []kubernetes.Watcher, resCh chan fetching.ResourceInfo, cycleMetadata cycle.Metadata) { +func getKubeData(ctx context.Context, log *clog.Logger, watchers []kubernetes.Watcher, resCh chan fetching.ResourceInfo, cycleMetadata cycle.Metadata) { log.Debug("Starting getKubeData") for _, watcher := range watchers { @@ -54,13 +55,13 @@ func getKubeData(log *clog.Logger, watchers []kubernetes.Watcher, resCh chan fet resource, ok := r.(kubernetes.Resource) if !ok { - log.Errorf("Bad resource: %#v does not implement kubernetes.Resource", r) + log.Errorf(ctx, "Bad resource: %#v does not implement kubernetes.Resource", r) continue } err := addTypeInformationToKubeResource(resource) if err != nil { - log.Errorf("Bad resource: %v", err) + log.Errorf(ctx, "Bad resource: %v", err) continue } // See https://github.com/kubernetes/kubernetes/issues/3030 resCh <- fetching.ResourceInfo{Resource: K8sResource{log, resource}, CycleMetadata: cycleMetadata} @@ -108,7 +109,9 @@ func (r K8sResource) GetElasticCommonData() (map[string]any, error) { func getK8sObjectMeta(log *clog.Logger, k8sObj reflect.Value) metav1.ObjectMeta { metadata, ok := k8sObj.FieldByName(k8sObjMetadataField).Interface().(metav1.ObjectMeta) if !ok { - log.Errorf("Failed to retrieve object metadata, Resource: %#v", k8sObj) + // Bypassing clog's context-aware Errorf to avoid using context.TODO(). + // This means these logs won't be enriched with tracing information. + log.Logger.Errorf("Failed to retrieve object metadata, Resource: %#v", k8sObj) return metav1.ObjectMeta{} } @@ -118,7 +121,9 @@ func getK8sObjectMeta(log *clog.Logger, k8sObj reflect.Value) metav1.ObjectMeta func getK8sSubType(log *clog.Logger, k8sObj reflect.Value) string { typeMeta, ok := k8sObj.FieldByName(k8sTypeMetadataField).Interface().(metav1.TypeMeta) if !ok { - log.Errorf("Failed to retrieve type metadata, Resource: %#v", k8sObj) + // Bypassing clog's context-aware Errorf to avoid using context.TODO(). + // This means these logs won't be enriched with tracing information. + log.Logger.Errorf("Failed to retrieve type metadata, Resource: %#v", k8sObj) return "" } diff --git a/internal/resources/fetching/fetchers/k8s/process_fetcher.go b/internal/resources/fetching/fetchers/k8s/process_fetcher.go index d1b931afcb..2fdbcf59e5 100644 --- a/internal/resources/fetching/fetchers/k8s/process_fetcher.go +++ b/internal/resources/fetching/fetchers/k8s/process_fetcher.go @@ -129,7 +129,7 @@ func NewProcessFetcher(log *clog.Logger, ch chan fetching.ResourceInfo, processe } } -func (f *ProcessesFetcher) Fetch(_ context.Context, cycleMetadata cycle.Metadata) error { +func (f *ProcessesFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error { f.log.Debug("Starting ProcessesFetcher.Fetch") pids, err := proc.ListFS(f.Fs) @@ -142,14 +142,15 @@ func (f *ProcessesFetcher) Fetch(_ context.Context, cycleMetadata cycle.Metadata for _, p := range pids { stat, err := proc.ReadStatFS(f.Fs, p) if err != nil { - f.log.Errorf("error while reading /proc//stat for process %s: %s", p, err.Error()) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "error while reading /proc//stat for process %s: %s", p, err.Error()) continue } // Get the full command line name and not the /proc/pid/status one which might be silently truncated. cmd, err := proc.ReadCmdLineFS(f.Fs, p) if err != nil { - f.log.Error("error while reading /proc//cmdline for process %s: %s", p, err.Error()) + f.log.Error(ctx, "error while reading /proc//cmdline for process %s: %s", p, err.Error()) continue } name := extractCommandName(cmd) @@ -159,44 +160,48 @@ func (f *ProcessesFetcher) Fetch(_ context.Context, cycleMetadata cycle.Metadata continue } - fetchedResource := f.fetchProcessData(stat, processConfig, p, cmd) + fetchedResource := f.fetchProcessData(ctx, stat, processConfig, p, cmd) f.resourceCh <- fetching.ResourceInfo{Resource: fetchedResource, CycleMetadata: cycleMetadata} } return nil } -func (f *ProcessesFetcher) fetchProcessData(procStat proc.ProcStat, processConf ProcessInputConfiguration, processId string, cmd string) fetching.Resource { - configMap := f.getProcessConfigurationFile(processConf, cmd, procStat.Name) +func (f *ProcessesFetcher) fetchProcessData(ctx context.Context, procStat proc.ProcStat, processConf ProcessInputConfiguration, processId string, cmd string) fetching.Resource { + configMap := f.getProcessConfigurationFile(ctx, processConf, cmd, procStat.Name) evalRes := EvalProcResource{PID: processId, Cmd: cmd, Stat: procStat, ExternalData: configMap} - procCd := f.createProcCommonData(procStat, cmd, processId) + procCd := f.createProcCommonData(ctx, procStat, cmd, processId) return ProcResource{EvalResource: evalRes, ElasticCommon: procCd} } -func (f *ProcessesFetcher) createProcCommonData(stat proc.ProcStat, cmd string, pid string) ProcCommonData { +func (f *ProcessesFetcher) createProcCommonData(ctx context.Context, stat proc.ProcStat, cmd string, pid string) ProcCommonData { processID, err := strconv.ParseInt(pid, 10, 64) if err != nil { - f.log.Errorf("Couldn't parse PID, pid: %s", pid) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "Couldn't parse PID, pid: %s", pid) } startTime, err := strconv.ParseUint(stat.StartTime, 10, 64) if err != nil { - f.log.Errorf("Couldn't parse stat.StartTime, startTime: %s", stat.StartTime) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "Couldn't parse stat.StartTime, startTime: %s", stat.StartTime) } pgid, err := strconv.ParseInt(stat.Group, 10, 64) if err != nil { - f.log.Errorf("Couldn't parse stat.Group, Group: %s, Error: %v", stat.Group, err) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "Couldn't parse stat.Group, Group: %s, Error: %v", stat.Group, err) } ppid, err := strconv.ParseInt(stat.Parent, 10, 64) if err != nil { - f.log.Errorf("Couldn't parse stat.Parent, Parent: %s, Error: %v", stat.Parent, err) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "Couldn't parse stat.Parent, Parent: %s, Error: %v", stat.Parent, err) } sysUptime, err := proc.ReadUptimeFS(f.Fs) if err != nil { - f.log.Error("couldn't read system boot time", err) + f.log.Error(ctx, "couldn't read system boot time", err) } uptimeDate := time.Now().Add(-time.Duration(sysUptime) * time.Second) @@ -219,7 +224,7 @@ func (f *ProcessesFetcher) createProcCommonData(stat proc.ProcStat, cmd string, // getProcessConfigurationFile - reads the configuration file associated with a process. // As an input this function receives a ProcessInputConfiguration that contains ConfigFileArguments, a string array that represents some process flags // The function extracts the configuration file associated with each flag and returns it. -func (f *ProcessesFetcher) getProcessConfigurationFile(processConfig ProcessInputConfiguration, cmd string, processName string) map[string]any { +func (f *ProcessesFetcher) getProcessConfigurationFile(ctx context.Context, processConfig ProcessInputConfiguration, cmd string, processName string) map[string]any { configMap := make(map[string]any) for _, argument := range processConfig.ConfigFileArguments { // The regex extracts the cmd line flag(argument) value @@ -232,7 +237,8 @@ func (f *ProcessesFetcher) getProcessConfigurationFile(processConfig ProcessInpu groupMatches := matcher.FindStringSubmatch(cmd) if len(groupMatches) < 2 { - f.log.Errorf("Couldn't find a configuration file associated with flag %s for process %s", argument, processName) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "Couldn't find a configuration file associated with flag %s for process %s", argument, processName) continue } argValue := matcher.FindStringSubmatch(cmd)[1] @@ -240,12 +246,14 @@ func (f *ProcessesFetcher) getProcessConfigurationFile(processConfig ProcessInpu data, err := fs.ReadFile(f.Fs, argValue) if err != nil { - f.log.Errorf("Failed to read file configuration for process %s, error - %+v", processName, err) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "Failed to read file configuration for process %s, error - %+v", processName, err) continue } configFile, err := f.readConfigurationFile(argValue, data) if err != nil { - f.log.Errorf("Failed to parse file configuration for process %s, error - %+v", processName, err) + // FIXME: This should be a context from the function signature. + f.log.Errorf(ctx, "Failed to parse file configuration for process %s, error - %+v", processName, err) continue } configMap[argument] = configFile diff --git a/internal/resources/fetching/manager/manager.go b/internal/resources/fetching/manager/manager.go index 219c2f71c0..4070817edd 100644 --- a/internal/resources/fetching/manager/manager.go +++ b/internal/resources/fetching/manager/manager.go @@ -84,7 +84,7 @@ func (m *Manager) Stop() { func (m *Manager) fetchAndSleep(ctx context.Context) { counter, err := meter.Int64Counter("cloudbeat.fetcher.manager.cycles") if err != nil { - m.log.Errorf("Failed to create fetcher manager cycles counter: %v", err) + m.log.Errorf(ctx, "Failed to create fetcher manager cycles counter: %v", err) } // set immediate exec for first time run @@ -134,7 +134,7 @@ func (m *Manager) fetchIteration(ctx context.Context) { defer wg.Done() err := m.fetchSingle(ctx, k, cycle.Metadata{Sequence: seq}) if err != nil { - logger.Errorf("Error running fetcher for key %s: %v", k, err) + logger.Errorf(ctx, "Error running fetcher for key %s: %v", k, err) } }(key) } diff --git a/internal/resources/fetching/registry/registry.go b/internal/resources/fetching/registry/registry.go index 281bbea99a..a8de1bfd75 100644 --- a/internal/resources/fetching/registry/registry.go +++ b/internal/resources/fetching/registry/registry.go @@ -123,7 +123,7 @@ func (r *registry) Update(ctx context.Context) { } fm, err := r.updater(ctx) if err != nil { - r.log.Errorf("Failed to update registry: %v", err) + r.log.Errorf(ctx, "Failed to update registry: %v", err) return } r.reg = fm diff --git a/internal/resources/providers/aws_cis/logging/provider.go b/internal/resources/providers/aws_cis/logging/provider.go index 79d365af72..5204729479 100644 --- a/internal/resources/providers/aws_cis/logging/provider.go +++ b/internal/resources/providers/aws_cis/logging/provider.go @@ -52,17 +52,17 @@ func (p *Provider) DescribeTrails(ctx context.Context) ([]awslib.AwsResource, er } bucketPolicy, policyErr := p.s3Provider.GetBucketPolicy(ctx, info.Trail.S3BucketName, *info.Trail.HomeRegion) if policyErr != nil { - p.log.Errorf("Error getting bucket policy for bucket %s: %v", *info.Trail.S3BucketName, policyErr) + p.log.Errorf(ctx, "Error getting bucket policy for bucket %s: %v", *info.Trail.S3BucketName, policyErr) } aclGrants, aclErr := p.s3Provider.GetBucketACL(ctx, info.Trail.S3BucketName, *info.Trail.HomeRegion) if aclErr != nil { - p.log.Errorf("Error getting bucket ACL for bucket %s: %v", *info.Trail.S3BucketName, aclErr) + p.log.Errorf(ctx, "Error getting bucket ACL for bucket %s: %v", *info.Trail.S3BucketName, aclErr) } bucketLogging, loggingErr := p.s3Provider.GetBucketLogging(ctx, info.Trail.S3BucketName, *info.Trail.HomeRegion) if loggingErr != nil { - p.log.Errorf("Error getting bucket logging for bucket %s: %v", *info.Trail.S3BucketName, loggingErr) + p.log.Errorf(ctx, "Error getting bucket logging for bucket %s: %v", *info.Trail.S3BucketName, loggingErr) } enrichedTrails = append(enrichedTrails, EnrichedTrail{ diff --git a/internal/resources/providers/aws_cis/monitoring/monitoring.go b/internal/resources/providers/aws_cis/monitoring/monitoring.go index 3a0076ab76..9bab32031a 100644 --- a/internal/resources/providers/aws_cis/monitoring/monitoring.go +++ b/internal/resources/providers/aws_cis/monitoring/monitoring.go @@ -98,11 +98,11 @@ func (p *Provider) AggregateResources(ctx context.Context) (*Resource, error) { } metrics, err := p.Cloudwatchlogs.DescribeMetricFilters(ctx, info.Trail.HomeRegion, logGroup) if err != nil { - p.Log.Errorf("failed to describe metric filters for cloudwatchlog log group arn %s: %v", *info.Trail.CloudWatchLogsLogGroupArn, err) + p.Log.Errorf(ctx, "failed to describe metric filters for cloudwatchlog log group arn %s: %v", *info.Trail.CloudWatchLogsLogGroupArn, err) continue } - parsedMetrics := p.parserMetrics(metrics) + parsedMetrics := p.parserMetrics(ctx, metrics) names := filterNamesFromMetrics(metrics) if len(names) == 0 { @@ -117,7 +117,7 @@ func (p *Provider) AggregateResources(ctx context.Context) (*Resource, error) { for _, name := range names { alarms, err := p.Cloudwatch.DescribeAlarms(ctx, info.Trail.HomeRegion, []string{name}) if err != nil { - p.Log.Errorf("failed to describe alarms for cloudwatch filter %v: %v", names, err) + p.Log.Errorf(ctx, "failed to describe alarms for cloudwatch filter %v: %v", names, err) continue } topics := p.getSubscriptionForAlarms(ctx, info.Trail.HomeRegion, alarms) @@ -133,7 +133,7 @@ func (p *Provider) AggregateResources(ctx context.Context) (*Resource, error) { return &Resource{Items: items}, nil } -func (p *Provider) parserMetrics(metrics []cloudwatchlogs_types.MetricFilter) []MetricFilter { +func (p *Provider) parserMetrics(ctx context.Context, metrics []cloudwatchlogs_types.MetricFilter) []MetricFilter { parsedMetrics := make([]MetricFilter, 0, len(metrics)) for _, m := range metrics { if m.FilterPattern == nil { @@ -145,7 +145,8 @@ func (p *Provider) parserMetrics(metrics []cloudwatchlogs_types.MetricFilter) [] exp, err := parseFilterPattern(*m.FilterPattern) if err != nil { - p.Log.Errorf("failed to parse metric filter pattern: %v (pattern: %s)", err, *m.FilterPattern) + // FIXME: This should be a context from the function signature. + p.Log.Errorf(ctx, "failed to parse metric filter pattern: %v (pattern: %s)", err, *m.FilterPattern) parsedMetrics = append(parsedMetrics, MetricFilter{ MetricFilter: m, }) @@ -166,7 +167,7 @@ func (p *Provider) getSubscriptionForAlarms(ctx context.Context, region *string, for _, action := range alarm.AlarmActions { subscriptions, err := p.Sns.ListSubscriptionsByTopic(ctx, pointers.Deref(region), action) if err != nil { - p.Log.Errorf("failed to list subscriptions for topic %s: %v", action, err) + p.Log.Errorf(ctx, "failed to list subscriptions for topic %s: %v", action, err) continue } for _, topic := range subscriptions { diff --git a/internal/resources/providers/awslib/account_provider.go b/internal/resources/providers/awslib/account_provider.go index 8ccb8cd67d..815bfa2994 100644 --- a/internal/resources/providers/awslib/account_provider.go +++ b/internal/resources/providers/awslib/account_provider.go @@ -64,7 +64,7 @@ func listAccounts(ctx context.Context, log *clog.Logger, client organizationsAPI organization, err := getOUInfoForAccount(ctx, client, organizationIdToName, account.Id) if err != nil { - log.Errorf("failed to get organizational unit info for account %s: %v", *account.Id, err) + log.Errorf(ctx, "failed to get organizational unit info for account %s: %v", *account.Id, err) } accounts = append(accounts, cloud.Identity{ Provider: "aws", diff --git a/internal/resources/providers/awslib/all_region_selector.go b/internal/resources/providers/awslib/all_region_selector.go index 14bad6e1ed..5ef32be459 100644 --- a/internal/resources/providers/awslib/all_region_selector.go +++ b/internal/resources/providers/awslib/all_region_selector.go @@ -47,7 +47,7 @@ func (s *allRegionSelector) Regions(ctx context.Context, cfg aws.Config) ([]stri output, err := s.client.DescribeRegions(ctx, nil) if err != nil { - log.Errorf("Failed getting available regions: %v", err) + log.Errorf(ctx, "Failed getting available regions: %v", err) return nil, err } diff --git a/internal/resources/providers/awslib/cached_region_selector.go b/internal/resources/providers/awslib/cached_region_selector.go index 07a8feae7d..a7a870adc9 100644 --- a/internal/resources/providers/awslib/cached_region_selector.go +++ b/internal/resources/providers/awslib/cached_region_selector.go @@ -94,12 +94,12 @@ func (s *cachedRegionSelector) Regions(ctx context.Context, cfg aws.Config) ([]s var output []string output, err := s.client.Regions(ctx, cfg) if err != nil { - log.Errorf("Failed getting regions: %v", err) + log.Errorf(ctx, "Failed getting regions: %v", err) return nil, err } if !s.setCache(output) { - log.Errorf("Failed setting regions cache") + log.Errorf(ctx, "Failed setting regions cache") } return output, nil } diff --git a/internal/resources/providers/awslib/cloudtrail/provider.go b/internal/resources/providers/awslib/cloudtrail/provider.go index 02b2102a2b..6dc8d78dc9 100644 --- a/internal/resources/providers/awslib/cloudtrail/provider.go +++ b/internal/resources/providers/awslib/cloudtrail/provider.go @@ -57,12 +57,12 @@ func (p Provider) DescribeTrails(ctx context.Context) ([]TrailInfo, error) { } status, err := p.getTrailStatus(ctx, trail) if err != nil { - p.log.Errorf("failed to get trail status %s %v", *trail.TrailARN, err.Error()) + p.log.Errorf(ctx, "failed to get trail status %s %v", *trail.TrailARN, err.Error()) } selectors, err := p.getEventSelectors(ctx, trail) if err != nil { - p.log.Errorf("failed to get trail event selector %s %v", *trail.TrailARN, err.Error()) + p.log.Errorf(ctx, "failed to get trail event selector %s %v", *trail.TrailARN, err.Error()) } result = append(result, TrailInfo{ diff --git a/internal/resources/providers/awslib/config.go b/internal/resources/providers/awslib/config.go index 84ea1b76d1..d51d143f20 100644 --- a/internal/resources/providers/awslib/config.go +++ b/internal/resources/providers/awslib/config.go @@ -138,7 +138,7 @@ func CredentialsValid(ctx context.Context, cnf aws.Config, log *clog.Logger) boo } if !strings.Contains(err.Error(), "not authorized to perform: sts:AssumeRole") { - log.Errorf("Expected a 403 authorization error, but got: %v", err) + log.Errorf(ctx, "Expected a 403 authorization error, but got: %v", err) } return false diff --git a/internal/resources/providers/awslib/configservice/provider.go b/internal/resources/providers/awslib/configservice/provider.go index 04a42f0e59..8f2fc2ad61 100644 --- a/internal/resources/providers/awslib/configservice/provider.go +++ b/internal/resources/providers/awslib/configservice/provider.go @@ -29,7 +29,7 @@ func (p *Provider) DescribeConfigRecorders(ctx context.Context) ([]awslib.AwsRes configs, err := awslib.MultiRegionFetch(ctx, p.clients, func(ctx context.Context, region string, c Client) (awslib.AwsResource, error) { recorderList, err := c.DescribeConfigurationRecorders(ctx, nil) if err != nil { - p.log.Errorf("Error fetching AWS Config recorders: %v", err) + p.log.Errorf(ctx, "Error fetching AWS Config recorders: %v", err) return nil, err } @@ -40,7 +40,7 @@ func (p *Provider) DescribeConfigRecorders(ctx context.Context) ([]awslib.AwsRes }) if err != nil { - p.log.Error("Error fetching recorder status, recorder: %v , Error: %v:", recorder, err) + p.log.Error(ctx, "Error fetching recorder status, recorder: %v , Error: %v:", recorder, err) return nil, err } result = append(result, Recorder{ diff --git a/internal/resources/providers/awslib/current_region_selector.go b/internal/resources/providers/awslib/current_region_selector.go index d58ee76ed8..c3c3d29fb1 100644 --- a/internal/resources/providers/awslib/current_region_selector.go +++ b/internal/resources/providers/awslib/current_region_selector.go @@ -39,7 +39,7 @@ func (s *currentRegionSelector) Regions(ctx context.Context, cfg aws.Config) ([] metadata, err := s.client.GetMetadata(ctx, cfg) if err != nil { - log.Errorf("Failed getting current region: %v", err) + log.Errorf(ctx, "Failed getting current region: %v", err) return nil, err } diff --git a/internal/resources/providers/awslib/ec2/provider.go b/internal/resources/providers/awslib/ec2/provider.go index 31d0bb7d05..e7a07c0cc7 100644 --- a/internal/resources/providers/awslib/ec2/provider.go +++ b/internal/resources/providers/awslib/ec2/provider.go @@ -362,7 +362,7 @@ func (p *Provider) IterOwnedSnapshots(ctx context.Context, before time.Time) ite return nil, nil }) if err != nil { - p.log.Errorf("Error listing owned snapshots: %v", err) + p.log.Errorf(ctx, "Error listing owned snapshots: %v", err) } } } @@ -492,7 +492,7 @@ func (p *Provider) DescribeVolumes(ctx context.Context, instances []*Ec2Instance var result []*Volume for _, vol := range allVolumes { if len(vol.Attachments) != 1 { - p.log.Errorf("Volume %s has %d attachments", *vol.VolumeId, len(vol.Attachments)) + p.log.Errorf(ctx, "Volume %s has %d attachments", *vol.VolumeId, len(vol.Attachments)) continue } @@ -564,7 +564,7 @@ func (p *Provider) DescribeVpcs(ctx context.Context) ([]awslib.AwsResource, erro }, }}) if err != nil { - p.log.Errorf("Error fetching flow logs for VPC %s: %v", *vpc.VpcId, err.Error()) + p.log.Errorf(ctx, "Error fetching flow logs for VPC %s: %v", *vpc.VpcId, err.Error()) continue } diff --git a/internal/resources/providers/awslib/elb_v2/provider_v2.go b/internal/resources/providers/awslib/elb_v2/provider_v2.go index d7e8699df2..e719b85c6e 100644 --- a/internal/resources/providers/awslib/elb_v2/provider_v2.go +++ b/internal/resources/providers/awslib/elb_v2/provider_v2.go @@ -54,7 +54,7 @@ func (p *Provider) DescribeLoadBalancers(ctx context.Context) ([]awslib.AwsResou } listeners, err := p.describeListeners(ctx, region, loadBalancer.GetResourceArn()) if err != nil { - p.log.Errorf("Error fetching listeners for %s: %v", loadBalancer.GetResourceArn(), err) + p.log.Errorf(ctx, "Error fetching listeners for %s: %v", loadBalancer.GetResourceArn(), err) } else { loadBalancer.Listeners = listeners } diff --git a/internal/resources/providers/awslib/iam/policy.go b/internal/resources/providers/awslib/iam/policy.go index f406ea220a..9cec7eb17c 100644 --- a/internal/resources/providers/awslib/iam/policy.go +++ b/internal/resources/providers/awslib/iam/policy.go @@ -223,7 +223,7 @@ func (p Provider) listInlinePolicies(ctx context.Context, identity *string) ([]P UserName: identity, }) if err != nil { - p.log.Errorf("fail to get inline policy for user: %s, policy name: %s", *identity, policyNames[i]) + p.log.Errorf(ctx, "fail to get inline policy for user: %s, policy name: %s", *identity, policyNames[i]) policies = append(policies, PolicyDocument{PolicyName: policyNames[i]}) continue } diff --git a/internal/resources/providers/awslib/iam/role_policy.go b/internal/resources/providers/awslib/iam/role_policy.go index 65d683c92e..a5c0f4acfe 100644 --- a/internal/resources/providers/awslib/iam/role_policy.go +++ b/internal/resources/providers/awslib/iam/role_policy.go @@ -41,7 +41,7 @@ func (p Provider) GetIAMRolePermissions(ctx context.Context, roleName string) ([ policy, err := p.client.GetRolePolicy(ctx, input) if err != nil { - p.log.Errorf("Failed to get policy %s: %v", *policyId.PolicyName, err) + p.log.Errorf(ctx, "Failed to get policy %s: %v", *policyId.PolicyName, err) continue } diff --git a/internal/resources/providers/awslib/iam/root_account.go b/internal/resources/providers/awslib/iam/root_account.go index a2400bf23d..05c907f0ec 100644 --- a/internal/resources/providers/awslib/iam/root_account.go +++ b/internal/resources/providers/awslib/iam/root_account.go @@ -27,15 +27,15 @@ import ( "github.com/aws/aws-sdk-go-v2/service/iam/types" ) -func (p Provider) getRootAccountUser(rootAccount *CredentialReport) *types.User { +func (p Provider) getRootAccountUser(ctx context.Context, rootAccount *CredentialReport) *types.User { if rootAccount == nil { - p.log.Error("no root account entry was provided") + p.log.Error(ctx, "no root account entry was provided") return nil } rootDate, err := time.Parse(time.RFC3339, rootAccount.UserCreation) if err != nil { - p.log.Errorf("fail to parse root account user creation, error: %v", err) + p.log.Errorf(ctx, "fail to parse root account user creation, error: %v", err) return nil } @@ -45,7 +45,7 @@ func (p Provider) getRootAccountUser(rootAccount *CredentialReport) *types.User if rootAccount.PasswordLastUsed != "no_information" && rootAccount.PasswordLastUsed != "N/A" { pwdLastUsed, err = time.Parse(time.RFC3339, rootAccount.PasswordLastUsed) if err != nil { - p.log.Errorf("fail to parse root account password last used, error: %v", err) + p.log.Errorf(ctx, "fail to parse root account password last used, error: %v", err) return nil } } diff --git a/internal/resources/providers/awslib/iam/user.go b/internal/resources/providers/awslib/iam/user.go index 3305bedfd8..34617a8635 100644 --- a/internal/resources/providers/awslib/iam/user.go +++ b/internal/resources/providers/awslib/iam/user.go @@ -54,7 +54,7 @@ func (p Provider) GetUsers(ctx context.Context) ([]awslib.AwsResource, error) { return nil, err } - rootUser := p.getRootAccountUser(credentialReport[rootAccount]) + rootUser := p.getRootAccountUser(ctx, credentialReport[rootAccount]) if rootUser != nil { apiUsers = append(apiUsers, *rootUser) } @@ -80,23 +80,23 @@ func (p Provider) GetUsers(ctx context.Context) ([]awslib.AwsResource, error) { mfaDevices, err := p.getMFADevices(ctx, apiUser, userAccount) if err != nil { - p.log.Errorf("fail to list mfa device for user: %s, error: %v", username, err) + p.log.Errorf(ctx, "fail to list mfa device for user: %s, error: %v", username, err) } pwdEnabled, err := isPasswordEnabled(userAccount) if err != nil { - p.log.Errorf("fail to parse PasswordEnabled for user: %s, error: %v", username, err) + p.log.Errorf(ctx, "fail to parse PasswordEnabled for user: %s, error: %v", username, err) pwdEnabled = false } inlinePolicies, err := p.listInlinePolicies(ctx, apiUser.UserName) if err != nil && !isRootUser(username) { - p.log.Errorf("fail to list inline policies for user: %s, error: %v", username, err) + p.log.Errorf(ctx, "fail to list inline policies for user: %s, error: %v", username, err) } attachedPolicies, err := p.listAttachedPolicies(ctx, apiUser.UserName) if err != nil && !isRootUser(username) { - p.log.Errorf("fail to list attached policies for user: %s, error: %v", username, err) + p.log.Errorf(ctx, "fail to list attached policies for user: %s, error: %v", username, err) } users = append(users, User{ diff --git a/internal/resources/providers/awslib/kms/provider.go b/internal/resources/providers/awslib/kms/provider.go index e9a9f1135c..f595084f61 100644 --- a/internal/resources/providers/awslib/kms/provider.go +++ b/internal/resources/providers/awslib/kms/provider.go @@ -62,7 +62,7 @@ func (p *Provider) DescribeSymmetricKeys(ctx context.Context) ([]awslib.AwsResou KeyId: keyEntry.KeyId, }) if err != nil { - p.log.Error(err.Error()) + p.log.Error(ctx, err.Error()) continue } @@ -78,7 +78,7 @@ func (p *Provider) DescribeSymmetricKeys(ctx context.Context) ([]awslib.AwsResou KeyId: keyEntry.KeyId, }) if err != nil { - p.log.Error(err.Error()) + p.log.Error(ctx, err.Error()) continue } diff --git a/internal/resources/providers/awslib/multi_region.go b/internal/resources/providers/awslib/multi_region.go index 4548009b38..b09b11e241 100644 --- a/internal/resources/providers/awslib/multi_region.go +++ b/internal/resources/providers/awslib/multi_region.go @@ -53,7 +53,7 @@ func (w *MultiRegionClientFactory[T]) NewMultiRegionClients(ctx context.Context, clientsMap := make(map[string]T, 0) regionList, err := selector.Regions(ctx, cfg) if err != nil { - log.Errorf("Region '%s' selected after failure to retrieve aws regions: %v", cfg.Region, err) + log.Errorf(ctx, "Region '%s' selected after failure to retrieve aws regions: %v", cfg.Region, err) regionList = []string{cfg.Region} } for _, region := range regionList { diff --git a/internal/resources/providers/awslib/rds/provider.go b/internal/resources/providers/awslib/rds/provider.go index f1d133d6e5..7c0c90caa8 100644 --- a/internal/resources/providers/awslib/rds/provider.go +++ b/internal/resources/providers/awslib/rds/provider.go @@ -52,7 +52,7 @@ func (p Provider) DescribeDBInstances(ctx context.Context) ([]awslib.AwsResource for { output, err := c.DescribeDBInstances(ctx, dbInstancesInput) if err != nil { - p.log.Errorf("Could not describe DB instances. Error: %v", err) + p.log.Errorf(ctx, "Could not describe DB instances. Error: %v", err) return result, err } @@ -89,7 +89,7 @@ func (p Provider) getDBInstanceSubnets(ctx context.Context, region string, dbIns resultSubnet := Subnet{ID: *subnet.SubnetIdentifier, RouteTable: nil} routeTableForSubnet, err := p.ec2.GetRouteTableForSubnet(ctx, region, *subnet.SubnetIdentifier, *dbInstance.DBSubnetGroup.VpcId) if err != nil { - p.log.Errorf("Could not get route table for subnet %s of DB %s. Error: %v", *subnet.SubnetIdentifier, *dbInstance.DBInstanceIdentifier, err) + p.log.Errorf(ctx, "Could not get route table for subnet %s of DB %s. Error: %v", *subnet.SubnetIdentifier, *dbInstance.DBInstanceIdentifier, err) } else { var routes []Route for _, route := range routeTableForSubnet.Routes { diff --git a/internal/resources/providers/awslib/s3/provider.go b/internal/resources/providers/awslib/s3/provider.go index 4d176bafd5..6f871a8c90 100644 --- a/internal/resources/providers/awslib/s3/provider.go +++ b/internal/resources/providers/awslib/s3/provider.go @@ -65,7 +65,7 @@ func (p Provider) DescribeBuckets(ctx context.Context) ([]awslib.AwsResource, er } clientBuckets, err := defaultClient.ListBuckets(ctx, &s3Client.ListBucketsInput{}) if err != nil { - p.log.Errorf("Could not list s3 buckets: %v", err) + p.log.Errorf(ctx, "Could not list s3 buckets: %v", err) return nil, err } @@ -77,7 +77,7 @@ func (p Provider) DescribeBuckets(ctx context.Context) ([]awslib.AwsResource, er accountPublicAccessBlockConfig, accountPublicAccessBlockErr := p.getAccountPublicAccessBlock(ctx) if accountPublicAccessBlockErr != nil { - p.log.Errorf("Could not get account public access block configuration. Err: %v", accountPublicAccessBlockErr) + p.log.Errorf(ctx, "Could not get account public access block configuration. Err: %v", accountPublicAccessBlockErr) } bucketsRegionsMapping := p.getBucketsRegionMapping(ctx, clientBuckets.Buckets) @@ -87,22 +87,22 @@ func (p Provider) DescribeBuckets(ctx context.Context) ([]awslib.AwsResource, er // of the flow, so we should keep describing the bucket even if getting these objects fails. sseAlgorithm, encryptionErr := p.getBucketEncryptionAlgorithm(ctx, bucket.Name, region) if encryptionErr != nil { - p.log.Errorf("Could not get encryption for bucket %s. Error: %v", *bucket.Name, encryptionErr) + p.log.Errorf(ctx, "Could not get encryption for bucket %s. Error: %v", *bucket.Name, encryptionErr) } bucketPolicy, policyErr := p.GetBucketPolicy(ctx, bucket.Name, region) if policyErr != nil { - p.log.Errorf("Could not get bucket policy for bucket %s. Error: %v", *bucket.Name, policyErr) + p.log.Errorf(ctx, "Could not get bucket policy for bucket %s. Error: %v", *bucket.Name, policyErr) } bucketVersioning, versioningErr := p.getBucketVersioning(ctx, bucket.Name, region) if versioningErr != nil { - p.log.Errorf("Could not get bucket versioning for bucket %s. Err: %v", *bucket.Name, versioningErr) + p.log.Errorf(ctx, "Could not get bucket versioning for bucket %s. Err: %v", *bucket.Name, versioningErr) } publicAccessBlockConfiguration, publicAccessBlockErr := p.getPublicAccessBlock(ctx, bucket.Name, region) if publicAccessBlockErr != nil { - p.log.Errorf("Could not get public access block configuration for bucket %s. Err: %v", *bucket.Name, publicAccessBlockErr) + p.log.Errorf(ctx, "Could not get public access block configuration for bucket %s. Err: %v", *bucket.Name, publicAccessBlockErr) } result = append(result, BucketDescription{ @@ -191,7 +191,7 @@ func (p Provider) getBucketsRegionMapping(ctx context.Context, buckets []types.B // If we could not get the Region for a bucket, additional API calls for resources will probably fail, we should // not describe this bucket. if regionErr != nil { - p.log.Errorf("Could not get bucket location for bucket %s. Not describing this bucket. Error: %v", *clientBucket.Name, regionErr) + p.log.Errorf(ctx, "Could not get bucket location for bucket %s. Not describing this bucket. Error: %v", *clientBucket.Name, regionErr) continue } diff --git a/internal/resources/providers/awslib/sns/provider.go b/internal/resources/providers/awslib/sns/provider.go index e6d1c5c2ad..f17b440794 100644 --- a/internal/resources/providers/awslib/sns/provider.go +++ b/internal/resources/providers/awslib/sns/provider.go @@ -48,7 +48,7 @@ func (p *Provider) ListTopics(ctx context.Context) ([]types.Topic, error) { for { output, err := c.ListTopics(ctx, input) if err != nil { - p.log.Errorf("Could not list SNS Topics. Error: %s", err) + p.log.Errorf(ctx, "Could not list SNS Topics. Error: %s", err) return nil, err } all = append(all, output.Topics...) @@ -93,7 +93,7 @@ func (p *Provider) ListTopicsWithSubscriptions(ctx context.Context) ([]awslib.Aw for { output, err := c.ListTopics(ctx, input) if err != nil { - p.log.Errorf("Could not list SNS Topics. Error: %s", err) + p.log.Errorf(ctx, "Could not list SNS Topics. Error: %s", err) return nil, err } @@ -104,7 +104,7 @@ func (p *Provider) ListTopicsWithSubscriptions(ctx context.Context) ([]awslib.Aw } subscriptions, err := p.ListSubscriptionsByTopic(ctx, region, topicInfo.GetResourceArn()) if err != nil { - p.log.Errorf("Could not list SNS Subscriptions for Topic %q. Error: %s", topicInfo.GetResourceArn(), err) + p.log.Errorf(ctx, "Could not list SNS Subscriptions for Topic %q. Error: %s", topicInfo.GetResourceArn(), err) } else { topicInfo.Subscriptions = subscriptions } diff --git a/internal/resources/providers/azurelib/inventory/storage_provider.go b/internal/resources/providers/azurelib/inventory/storage_provider.go index 2f9da7e6f2..d94279d8c2 100644 --- a/internal/resources/providers/azurelib/inventory/storage_provider.go +++ b/internal/resources/providers/azurelib/inventory/storage_provider.go @@ -155,12 +155,12 @@ func (p *storageAccountProvider) ListStorageAccounts(ctx context.Context, storag for _, saID := range storageAccountsSubscriptionsIds { res, err := p.client.AssetAccountStorage(ctx, saID, nil) if err != nil { - p.log.Errorf("error while fetching storage accounts for subscriptionId: %s, error: %v", saID, err) + p.log.Errorf(ctx, "error while fetching storage accounts for subscriptionId: %s, error: %v", saID, err) continue } storageAccountsAssets, err := transformStorageAccounts(res, saID) if err != nil { - p.log.Errorf("error while transforming storage for subscriptionId: %s, error: %v", saID, err) + p.log.Errorf(ctx, "error while transforming storage for subscriptionId: %s, error: %v", saID, err) continue } assets = append(assets, storageAccountsAssets...) @@ -256,7 +256,7 @@ func (p *storageAccountProvider) ListStorageAccountFileServices(ctx context.Cont for _, item := range response.Value { properties, err := maps.AsMapStringAny(item.FileServiceProperties) if err != nil { - p.log.Errorf("error while transforming azure queue services for storage accounts %s: %v", sa.Id, err) + p.log.Errorf(ctx, "error while transforming azure queue services for storage accounts %s: %v", sa.Id, err) } assets = append(assets, AzureAsset{ @@ -288,7 +288,7 @@ func (p *storageAccountProvider) ListStorageAccountFileShares(ctx context.Contex fileShares, err := transformFileShares(responses, sa) if err != nil { - p.log.Errorf("error while transforming azure file share for storage accounts %s: %v", sa.Id, err) + p.log.Errorf(ctx, "error while transforming azure file share for storage accounts %s: %v", sa.Id, err) } assets = append(assets, fileShares...) @@ -307,7 +307,7 @@ func (p *storageAccountProvider) ListStorageAccountQueues(ctx context.Context, s queues, err := transformQueues(responses, sa) if err != nil { - p.log.Errorf("error while transforming azure queues for storage accounts %s: %v", sa.Id, err) + p.log.Errorf(ctx, "error while transforming azure queues for storage accounts %s: %v", sa.Id, err) } assets = append(assets, queues...) @@ -327,7 +327,7 @@ func (p *storageAccountProvider) ListStorageAccountQueueServices(ctx context.Con for _, item := range response.Value { properties, err := maps.AsMapStringAny(item.QueueServiceProperties) if err != nil { - p.log.Errorf("error while transforming azure queue services for storage accounts %s: %v", sa.Id, err) + p.log.Errorf(ctx, "error while transforming azure queue services for storage accounts %s: %v", sa.Id, err) } assets = append(assets, AzureAsset{ @@ -358,7 +358,7 @@ func (p *storageAccountProvider) ListStorageAccountTables(ctx context.Context, s tables, err := transformTables(responses, sa) if err != nil { - p.log.Errorf("error while transforming azure tables for storage accounts %s: %v", sa.Id, err) + p.log.Errorf(ctx, "error while transforming azure tables for storage accounts %s: %v", sa.Id, err) } assets = append(assets, tables...) @@ -377,7 +377,7 @@ func (p *storageAccountProvider) ListStorageAccountTableServices(ctx context.Con for _, item := range response.Value { properties, err := maps.AsMapStringAny(item.TableServiceProperties) if err != nil { - p.log.Errorf("error while transforming azure table services for storage accounts %s: %v", sa.Id, err) + p.log.Errorf(ctx, "error while transforming azure table services for storage accounts %s: %v", sa.Id, err) } assets = append(assets, AzureAsset{ diff --git a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go index 3bafd14d2f..e9e07a9df0 100644 --- a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go +++ b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go @@ -97,7 +97,7 @@ func (rl *AssetsInventoryRateLimiter) Wait(ctx context.Context, method string, r if limiter != nil { err := limiter.Wait(ctx) if err != nil { - rl.log.Errorf("Failed to wait for project quota on method: %s, request: %v, error: %v", method, req, err) + rl.log.Errorf(ctx, "Failed to wait for project quota on method: %s, request: %v, error: %v", method, req, err) } } } diff --git a/internal/resources/providers/gcplib/inventory/provider.go b/internal/resources/providers/gcplib/inventory/provider.go index 4422807019..21bcb44105 100644 --- a/internal/resources/providers/gcplib/inventory/provider.go +++ b/internal/resources/providers/gcplib/inventory/provider.go @@ -413,7 +413,7 @@ func (p *Provider) getAllAssets(ctx context.Context, out chan<- *ExtendedGcpAsse } if err != nil { - p.log.Errorf("Error fetching GCP %v of types: %v for %v: %v\n", req.ContentType, req.AssetTypes, req.Parent, err) + p.log.Errorf(ctx, "Error fetching GCP %v of types: %v for %v: %v\n", req.ContentType, req.AssetTypes, req.Parent, err) return } diff --git a/internal/resources/providers/gcplib/inventory/resource_manager.go b/internal/resources/providers/gcplib/inventory/resource_manager.go index 4dc0ae5569..de8159e6f1 100644 --- a/internal/resources/providers/gcplib/inventory/resource_manager.go +++ b/internal/resources/providers/gcplib/inventory/resource_manager.go @@ -55,7 +55,7 @@ func NewResourceManagerWrapper(ctx context.Context, log *clog.Logger, gcpConfig getProjectDisplayName: func(ctx context.Context, parent string) string { prj, err := crmService.Projects.Get(parent).Context(ctx).Do() if err != nil { - log.Errorf("error fetching GCP Project: %s, error: %s", parent, err) + log.Errorf(ctx, "error fetching GCP Project: %s, error: %s", parent, err) return "" } return prj.DisplayName @@ -63,7 +63,7 @@ func NewResourceManagerWrapper(ctx context.Context, log *clog.Logger, gcpConfig getOrganizationDisplayName: func(ctx context.Context, parent string) string { org, err := crmService.Organizations.Get(parent).Context(ctx).Do() if err != nil { - log.Errorf("error fetching GCP Org: %s, error: %s", parent, err) + log.Errorf(ctx, "error fetching GCP Org: %s, error: %s", parent, err) return "" } return org.DisplayName @@ -81,7 +81,7 @@ func (c *ResourceManagerWrapper) GetCloudMetadata(ctx context.Context, asset *as if valid { return cloudAccountMetadata } - c.log.Errorf("error casting cloud account metadata for key: %s", key) + c.log.Errorf(ctx, "error casting cloud account metadata for key: %s", key) } cloudAccountMetadata := c.getMetadata(ctx, orgId, projectId) c.accountMetadataCache.Store(key, cloudAccountMetadata) diff --git a/internal/resources/providers/msgraph/provider.go b/internal/resources/providers/msgraph/provider.go index e0d5f1fbd2..7a57282718 100644 --- a/internal/resources/providers/msgraph/provider.go +++ b/internal/resources/providers/msgraph/provider.go @@ -94,7 +94,7 @@ func (p *provider) ListServicePrincipals(ctx context.Context) ([]*models.Service return true // to continue the iteration }) if err != nil { - p.log.Errorf("error iterating over Service Principals: %v", err) + p.log.Errorf(ctx, "error iterating over Service Principals: %v", err) } return items, nil } @@ -122,7 +122,7 @@ func (p *provider) ListDirectoryRoles(ctx context.Context) ([]*models.DirectoryR return true // to continue the iteration }) if err != nil { - p.log.Errorf("error iterating over Directory Roles: %v", err) + p.log.Errorf(ctx, "error iterating over Directory Roles: %v", err) } return items, nil } @@ -150,7 +150,7 @@ func (p *provider) ListGroups(ctx context.Context) ([]*models.Group, error) { return true // to continue the iteration }) if err != nil { - p.log.Errorf("error iterating over Groups: %v", err) + p.log.Errorf(ctx, "error iterating over Groups: %v", err) } return items, nil } @@ -178,7 +178,7 @@ func (p *provider) ListUsers(ctx context.Context) ([]*models.User, error) { return true // to continue the iteration }) if err != nil { - p.log.Errorf("error iterating over Users: %v", err) + p.log.Errorf(ctx, "error iterating over Users: %v", err) } return items, nil } diff --git a/internal/uniqueness/leaderelection.go b/internal/uniqueness/leaderelection.go index da0acf3b85..8b8fcc288c 100644 --- a/internal/uniqueness/leaderelection.go +++ b/internal/uniqueness/leaderelection.go @@ -74,13 +74,13 @@ func (m *LeaderelectionManager) Run(ctx context.Context) error { leConfig, err := m.buildConfig(newCtx) if err != nil { - m.log.Errorf("Fail building leader election config: %v", err) + m.log.Errorf(ctx, "Fail building leader election config: %v", err) return err } m.leader, err = le.NewLeaderElector(leConfig) if err != nil { - m.log.Errorf("Fail to create a new leader elector: %v", err) + m.log.Errorf(ctx, "Fail to create a new leader elector: %v", err) return err } diff --git a/internal/uniqueness/leaderelection_test.go b/internal/uniqueness/leaderelection_test.go index 5bfa291156..7ed8f03b16 100644 --- a/internal/uniqueness/leaderelection_test.go +++ b/internal/uniqueness/leaderelection_test.go @@ -18,7 +18,6 @@ package uniqueness import ( - "context" "fmt" "os" "strings" @@ -169,7 +168,7 @@ func (s *LeaderElectionTestSuite) TestManager_buildConfig() { s.Require().NoError(os.Setenv(PodNameEnvar, podId)) } - got, err := s.manager.buildConfig(context.TODO()) + got, err := s.manager.buildConfig(s.T().Context()) if (err != nil) != tt.wantErr { s.FailNow("unexpected error", "error: %v", err) } diff --git a/internal/vulnerability/events_creator.go b/internal/vulnerability/events_creator.go index 1d55e83053..ec0a12a334 100644 --- a/internal/vulnerability/events_creator.go +++ b/internal/vulnerability/events_creator.go @@ -203,7 +203,7 @@ func (e EventsCreator) CreateEvents(ctx context.Context, scanResults chan []Resu events := make([]beat.Event, 0, len(data)) for _, res := range data { - events = append(events, e.generateEvent(res.reportResult, res.vulnerability, res.snapshot.Instance, res.seq)) + events = append(events, e.generateEvent(ctx, res.reportResult, res.vulnerability, res.snapshot.Instance, res.seq)) } select { @@ -220,7 +220,7 @@ func (e EventsCreator) GetChan() chan []beat.Event { return e.ch } -func (e EventsCreator) generateEvent(reportResult trivyTypes.Result, vul trivyTypes.DetectedVulnerability, instance ec2.Ec2Instance, seq time.Time) beat.Event { +func (e EventsCreator) generateEvent(ctx context.Context, reportResult trivyTypes.Result, vul trivyTypes.DetectedVulnerability, instance ec2.Ec2Instance, seq time.Time) beat.Event { timestamp := time.Now().UTC() sequence := seq.Unix() @@ -250,7 +250,7 @@ func (e EventsCreator) generateEvent(reportResult trivyTypes.Result, vul trivyTy }) // TODO: Should we fail the event if we can't enrich the cloud section? if err != nil { - e.log.Errorf("failed to enrich cloud section: %v", err) + e.log.Errorf(ctx, "failed to enrich cloud section: %v", err) } hostSec, err := convertStructToMapStr(HostSection{ @@ -267,7 +267,7 @@ func (e EventsCreator) generateEvent(reportResult trivyTypes.Result, vul trivyTy }) // TODO: Should we fail the event if we can't enrich the host section? if err != nil { - e.log.Errorf("failed to enrich host section: %v", err) + e.log.Errorf(ctx, "failed to enrich host section: %v", err) } networkSec, err := convertStructToMapStr(NetworkSection{ @@ -277,7 +277,7 @@ func (e EventsCreator) generateEvent(reportResult trivyTypes.Result, vul trivyTy }) // TODO: Should we fail the event if we can't enrich the network section? if err != nil { - e.log.Errorf("failed to enrich network section: %v", err) + e.log.Errorf(ctx, "failed to enrich network section: %v", err) } event := beat.Event{ @@ -341,12 +341,12 @@ func (e EventsCreator) generateEvent(reportResult trivyTypes.Result, vul trivyTy err = e.cloudDataProvider.EnrichEvent(&event, fetching.ResourceMetadata{Region: instance.Region}) if err != nil { - e.log.Errorf("failed to enrich event with benchmark data provider: %v", err) + e.log.Errorf(ctx, "failed to enrich event with benchmark data provider: %v", err) } err = e.commonDataProvider.EnrichEvent(&event) if err != nil { - e.log.Errorf("failed to enrich event with global data provider: %v", err) + e.log.Errorf(ctx, "failed to enrich event with global data provider: %v", err) } return event diff --git a/internal/vulnerability/fetcher.go b/internal/vulnerability/fetcher.go index cdd275dd78..dd66db8d2b 100644 --- a/internal/vulnerability/fetcher.go +++ b/internal/vulnerability/fetcher.go @@ -51,14 +51,14 @@ func (f VulnerabilityFetcher) FetchInstances(ctx context.Context) error { f.log.Info("Starting VulnerabilityFetcher.FetchInstances") ins, err := f.provider.DescribeInstances(ctx) if err != nil { - f.log.Errorf("VulnerabilityFetcher.FetchInstances DescribeInstances failed: %v", err) + f.log.Errorf(ctx, "VulnerabilityFetcher.FetchInstances DescribeInstances failed: %v", err) return err } f.log.Infof("VulnerabilityFetcher.FetchInstances found %d results", len(ins)) err = f.attachRootVolumes(ctx, ins) if err != nil { - f.log.Errorf("VulnerabilityFetcher.FetchInstances attachRootVolumes failed: %v", err) + f.log.Errorf(ctx, "VulnerabilityFetcher.FetchInstances attachRootVolumes failed: %v", err) } else { f.sortByRootVolumeSize(ins) } diff --git a/internal/vulnerability/replicator.go b/internal/vulnerability/replicator.go index 342a9f25e8..7a17a88ea1 100644 --- a/internal/vulnerability/replicator.go +++ b/internal/vulnerability/replicator.go @@ -55,7 +55,7 @@ func (f VulnerabilityReplicator) SnapshotInstance(ctx context.Context, insCh cha } sp, err := f.manager.CreateSnapshots(ctx, data) if err != nil { - f.log.Errorf("VulnerabilityReplicator.SnapshotInstance.CreateSnapshots failed: %v", err) + f.log.Errorf(ctx, "VulnerabilityReplicator.SnapshotInstance.CreateSnapshots failed: %v", err) continue } diff --git a/internal/vulnerability/runner.go b/internal/vulnerability/runner.go index 86649e4dd3..bffac5f5d1 100644 --- a/internal/vulnerability/runner.go +++ b/internal/vulnerability/runner.go @@ -40,7 +40,7 @@ func NewVulnerabilityRunner(ctx context.Context, log *clog.Logger) (Vulnerabilit log.Debug("NewVulnerabilityRunner: New") if err := clearTrivyCache(ctx, log); err != nil { - log.Errorf("error during runner cache clearing %s", err.Error()) + log.Errorf(ctx, "error during runner cache clearing %s", err.Error()) } opts := flag.Options{ @@ -69,7 +69,7 @@ func NewVulnerabilityRunner(ctx context.Context, log *clog.Logger) (Vulnerabilit runner, err := artifact.NewRunner(ctx, opts, artifact.TargetFilesystem) if err != nil { - log.Error("NewVulnerabilityRunner: NewRunner error: ", err) + log.Error(ctx, "NewVulnerabilityRunner: NewRunner error: ", err) return VulnerabilityRunner{}, err } diff --git a/internal/vulnerability/scanner.go b/internal/vulnerability/scanner.go index 94540bcc4a..e734d920ba 100644 --- a/internal/vulnerability/scanner.go +++ b/internal/vulnerability/scanner.go @@ -99,15 +99,15 @@ func (f VulnerabilityScanner) ScanSnapshot(ctx context.Context, snapCh chan ec2. func (f VulnerabilityScanner) scan(ctx context.Context, snap ec2.EBSSnapshot) { f.log.Infof("Starting VulnerabilityScanner.scan, %s", snap.SnapshotId) - defer func() { + defer func(ctx context.Context) { if r := recover(); r != nil { - f.log.Errorf("vulnerability scanner recovered from panic: %v", r) + f.log.Errorf(ctx, "vulnerability scanner recovered from panic: %v", r) } - }() + }(ctx) o, err := os.CreateTemp("", "") if err != nil { - f.log.Error("VulnerabilityScanner.scan.TempFile error: ", err) + f.log.Error(ctx, "VulnerabilityScanner.scan.TempFile error: ", err) return } defer func(name string) { @@ -158,28 +158,28 @@ func (f VulnerabilityScanner) scan(ctx context.Context, snap ec2.EBSSnapshot) { ) if err != nil { - f.log.Errorf("VulnerabilityScanner.scan.ScanVM, snapshotId: %s, instanceId: %s, error: %v", snap.SnapshotId, *snap.Instance.InstanceId, err) + f.log.Errorf(ctx, "VulnerabilityScanner.scan.ScanVM, snapshotId: %s, instanceId: %s, error: %v", snap.SnapshotId, *snap.Instance.InstanceId, err) return } f.log.Info("VulnerabilityScanner.scan.Filter") report, err = f.runner.Filter(ctx, opts, report) if err != nil { - f.log.Error("VulnerabilityScanner.scan.Filter error: ", err) + f.log.Error(ctx, "VulnerabilityScanner.scan.Filter error: ", err) return } f.log.Info("VulnerabilityScanner.scan.Report") err = f.runner.Report(ctx, opts, report) if err != nil { - f.log.Error("VulnerabilityScanner.scan.Report error: ", err) + f.log.Error(ctx, "VulnerabilityScanner.scan.Report error: ", err) return } f.log.Info("VulnerabilityScanner.scan.jsonFile") jsonFile, err := os.Open(o.Name()) if err != nil { - f.log.Error("VulnerabilityScanner.scan.jsonFile error: ", err) + f.log.Error(ctx, "VulnerabilityScanner.scan.jsonFile error: ", err) return } @@ -189,7 +189,7 @@ func (f VulnerabilityScanner) scan(ctx context.Context, snap ec2.EBSSnapshot) { var unmarshalledReport trivy_types.Report err = json.Unmarshal(byteValue, &unmarshalledReport) if err != nil { - f.log.Error("VulnerabilityScanner.scan.Unmarshal error: ", err) + f.log.Error(ctx, "VulnerabilityScanner.scan.Unmarshal error: ", err) return } diff --git a/internal/vulnerability/snapshot.go b/internal/vulnerability/snapshot.go index 01c78ec114..d1df2a043f 100644 --- a/internal/vulnerability/snapshot.go +++ b/internal/vulnerability/snapshot.go @@ -127,7 +127,7 @@ func (s *SnapshotManager) delete(ctx context.Context, snapshot ec2.EBSSnapshot, s.logger.Infof("VulnerabilityScanner.manager.%s %s", message, snapshot.SnapshotId) err := s.provider.DeleteSnapshot(ctx, snapshot) if err != nil { - s.logger.Errorf("VulnerabilityScanner.manager.%s %s error: %s", message, snapshot.SnapshotId, err) + s.logger.Errorf(ctx, "VulnerabilityScanner.manager.%s %s error: %s", message, snapshot.SnapshotId, err) } } diff --git a/internal/vulnerability/verifier.go b/internal/vulnerability/verifier.go index 1c9926339a..d5b5f856e5 100644 --- a/internal/vulnerability/verifier.go +++ b/internal/vulnerability/verifier.go @@ -102,7 +102,7 @@ func (f VulnerabilityVerifier) verify(ctx context.Context, snap ec2.EBSSnapshot) case <-time.After(f.interval): sp, err := f.provider.DescribeSnapshots(ctx, snap) if err != nil { - f.log.Errorf("VulnerabilityVerifier.verify.DescribeSnapshots failed: %v", err) + f.log.Errorf(ctx, "VulnerabilityVerifier.verify.DescribeSnapshots failed: %v", err) continue } // TODO: Add a layer of "smart" cache to avoid checking and sending the same snapshot diff --git a/internal/vulnerability/worker.go b/internal/vulnerability/worker.go index 443ee6998a..84921d166d 100644 --- a/internal/vulnerability/worker.go +++ b/internal/vulnerability/worker.go @@ -145,7 +145,7 @@ func (f *VulnerabilityWorker) Run(ctx context.Context) { defer wg.Done() err := job.fn(ctx) if err != nil { - f.log.Errorf("VulnerabilityWorker.work job %s failed: %s", job.name, err.Error()) + f.log.Errorf(ctx, "VulnerabilityWorker.work job %s failed: %s", job.name, err.Error()) } else { f.log.Infof("VulnerabilityWorker.work job %s finished", job.name) }