From b85d7932229e1ee007915d0c161bd2e31d35238f Mon Sep 17 00:00:00 2001 From: Kyle Rockman Date: Fri, 18 Jul 2025 11:54:59 -0400 Subject: [PATCH 1/3] Add ability to pipe the agent to a custom integration for data mapping --- cmd/root.go | 12 +++++++++++- workers/k8s.go | 52 ++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 16d2b54..730f0dc 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "github.com/go-resty/resty/v2" "os" "runtime" "strings" @@ -54,7 +55,7 @@ opslevel-agent commit: %s (%s) var wg sync.WaitGroup ctx := signal.Init(context.Background()) // go workers.NewWebhookWorker().Run(ctx, &wg) - go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newClient(), resync, flush) + go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newClient(), newRESTClient(), resync, flush) time.Sleep(1 * time.Second) wg.Wait() @@ -165,6 +166,15 @@ func LoadConfig() (*config.Configuration, error) { return &output, nil } +func newRESTClient() *resty.Client { + client := opslevel.NewRestClient( + opslevel.SetURL(viper.GetString("api-url")), + opslevel.SetUserAgentExtra(fmt.Sprintf("agent-%s", _version)), + opslevel.SetTimeout(time.Second*time.Duration(viper.GetInt("api-timeout"))), + ) + return client +} + func newClient() *opslevel.Client { client := opslevel.NewGQLClient( opslevel.SetAPIToken(viper.GetString("api-token")), diff --git a/workers/k8s.go b/workers/k8s.go index 8c58eeb..2cf9626 100644 --- a/workers/k8s.go +++ b/workers/k8s.go @@ -3,6 +3,9 @@ package workers import ( "context" "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + "strings" "sync" "time" @@ -18,13 +21,15 @@ type K8SWorker struct { cluster string integration string client *opslevel.Client + rest *resty.Client } -func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, client *opslevel.Client, resync, flush time.Duration) { +func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, client *opslevel.Client, rest *resty.Client, resync, flush time.Duration) { controller.Run(ctx, wg, selectors, resync, flush, &K8SWorker{ - client: client, cluster: cluster, integration: integration, + client: client, + rest: rest, }) } @@ -32,15 +37,22 @@ func (s *K8SWorker) Handle(evt controller.Event) { kind := evt.ExternalKind() id := evt.ExternalID(s.cluster) - switch evt.Op { - case controller.OpCreate, controller.OpUpdate: - value, err := s.parse(evt.New) - if err != nil { - log.Error().Err(err).Msgf("failed to convert k8s resource") + if strings.Contains(s.integration, "integrations/custom/webhook") { + switch evt.Op { + case controller.OpCreate, controller.OpUpdate: + s.sendEvent(kind, id, evt.New) + } + } else { + switch evt.Op { + case controller.OpCreate, controller.OpUpdate: + value, err := s.parse(evt.New) + if err != nil { + log.Error().Err(err).Msgf("failed to convert k8s resource") + } + s.sendUpsert(kind, id, value) + case controller.OpDelete: + s.sendDelete(kind, id) } - s.sendUpsert(kind, id, value) - case controller.OpDelete: - s.sendDelete(kind, id) } } @@ -61,6 +73,26 @@ func (s *K8SWorker) parse(item *unstructured.Unstructured) (opslevel.JSON, error return data, nil } +func (s *K8SWorker) sendEvent(kind string, id string, value *unstructured.Unstructured) { + kind = strings.Replace(kind, "/", "_", -1) + if viper.GetBool("dry-run") { + log.Info().Msgf("[DRYRUN] POST %s | %s", kind, id) + log.Debug().Msgf("\t%#v", value) + } else { + url := fmt.Sprintf("%s?external_kind=%s", s.integration, kind) + resp, err := s.rest.R().SetBody(value).Post(url) + if err != nil { + log.Error().Err(err).Msgf("error during post") + return + } + if resp.StatusCode() > 299 { + log.Error().Msgf("%v", resp) + return + } + log.Info().Msgf("POST %s | %s", kind, id) + } +} + func (s *K8SWorker) sendUpsert(kind string, id string, value opslevel.JSON) { var m struct { Payload struct { From f763f51c23b4b2081028f3af87940ffd87d69ba7 Mon Sep 17 00:00:00 2001 From: Kyle Rockman Date: Tue, 22 Jul 2025 08:57:16 -0400 Subject: [PATCH 2/3] review feedback --- README.md | 18 ++++++++++++++++++ cmd/root.go | 17 +++++++---------- go.mod | 2 +- workers/k8s.go | 19 ++++++++++--------- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 30c55cb..cd7b188 100644 --- a/README.md +++ b/README.md @@ -32,3 +32,21 @@ functionalities for OpsLevel. The current functionalities are: | Name | Type | Description | |---------------------------------|-------------|---------------------------------------------------------------| + + + + + +--- +extractors: +- external_kind: apps_v1_Deployment + external_id: ".metadata.uid" +--- +transforms: +- external_kind: apps_v1_Deployment + opslevel_kind: service + opslevel_identifier: ".metadata.name" + on_component_not_found: suggest + properties: + namespace: ".metadata.namespace" + containers: ".spec.template.spec | .containers + .initContainers | map(.image)" \ No newline at end of file diff --git a/cmd/root.go b/cmd/root.go index 730f0dc..40674fd 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,23 +3,22 @@ package cmd import ( "context" "fmt" - "github.com/go-resty/resty/v2" "os" "runtime" "strings" "sync" "time" - "gopkg.in/yaml.v3" - "k8s.io/utils/path" - "opslevel-agent/config" - + "github.com/go-resty/resty/v2" "github.com/opslevel/opslevel-go/v2025" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/automaxprocs/maxprocs" + "gopkg.in/yaml.v3" + "k8s.io/utils/path" + "opslevel-agent/config" "opslevel-agent/signal" "opslevel-agent/workers" ) @@ -55,7 +54,7 @@ opslevel-agent commit: %s (%s) var wg sync.WaitGroup ctx := signal.Init(context.Background()) // go workers.NewWebhookWorker().Run(ctx, &wg) - go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newClient(), newRESTClient(), resync, flush) + go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newGraphClient(), newRESTClient(), resync, flush) time.Sleep(1 * time.Second) wg.Wait() @@ -167,15 +166,13 @@ func LoadConfig() (*config.Configuration, error) { } func newRESTClient() *resty.Client { - client := opslevel.NewRestClient( - opslevel.SetURL(viper.GetString("api-url")), + return opslevel.NewRestClient( opslevel.SetUserAgentExtra(fmt.Sprintf("agent-%s", _version)), opslevel.SetTimeout(time.Second*time.Duration(viper.GetInt("api-timeout"))), ) - return client } -func newClient() *opslevel.Client { +func newGraphClient() *opslevel.Client { client := opslevel.NewGQLClient( opslevel.SetAPIToken(viper.GetString("api-token")), opslevel.SetURL(viper.GetString("api-url")), diff --git a/go.mod b/go.mod index fe7955a..2e788ce 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.0 toolchain go1.24.2 require ( + github.com/go-resty/resty/v2 v2.16.5 github.com/opslevel/opslevel-go/v2025 v2025.6.13 github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.9.1 @@ -31,7 +32,6 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.26.0 // indirect - github.com/go-resty/resty/v2 v2.16.5 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/gnostic-models v0.6.9 // indirect diff --git a/workers/k8s.go b/workers/k8s.go index 2cf9626..688da69 100644 --- a/workers/k8s.go +++ b/workers/k8s.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "fmt" - "github.com/go-resty/resty/v2" "strings" "sync" "time" + "github.com/go-resty/resty/v2" + "github.com/spf13/viper" "github.com/opslevel/opslevel-go/v2025" @@ -20,16 +21,16 @@ import ( type K8SWorker struct { cluster string integration string - client *opslevel.Client - rest *resty.Client + gqlClient *opslevel.Client + restClient *resty.Client } -func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, client *opslevel.Client, rest *resty.Client, resync, flush time.Duration) { +func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, gqlClient *opslevel.Client, restClient *resty.Client, resync, flush time.Duration) { controller.Run(ctx, wg, selectors, resync, flush, &K8SWorker{ cluster: cluster, integration: integration, - client: client, - rest: rest, + gqlClient: gqlClient, + restClient: restClient, }) } @@ -80,7 +81,7 @@ func (s *K8SWorker) sendEvent(kind string, id string, value *unstructured.Unstru log.Debug().Msgf("\t%#v", value) } else { url := fmt.Sprintf("%s?external_kind=%s", s.integration, kind) - resp, err := s.rest.R().SetBody(value).Post(url) + resp, err := s.restClient.R().SetBody(value).Post(url) if err != nil { log.Error().Err(err).Msgf("error during post") return @@ -110,7 +111,7 @@ func (s *K8SWorker) sendUpsert(kind string, id string, value opslevel.JSON) { log.Debug().Msgf("\t%#v", value) } else { log.Info().Msgf("UPSERT %s | %s", kind, id) - err := s.client.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectUpsert")) + err := s.gqlClient.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectUpsert")) if err != nil { log.Error().Err(err).Msgf("error during upsert mutate") } @@ -132,7 +133,7 @@ func (s *K8SWorker) sendDelete(kind string, id string) { log.Info().Msgf("[DRYRUN] DELETE %s | %s ", kind, id) } else { log.Info().Msgf("DELETE %s | %s ", kind, id) - err := s.client.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectDelete")) + err := s.gqlClient.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectDelete")) if err != nil { log.Error().Err(err).Msgf("error during delete mutate") } From f3fcf707f87e88731289e78ac0cad2b48fd0a54a Mon Sep 17 00:00:00 2001 From: Kyle Rockman Date: Tue, 22 Jul 2025 09:02:08 -0400 Subject: [PATCH 3/3] fix readme --- README.md | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/README.md b/README.md index cd7b188..30c55cb 100644 --- a/README.md +++ b/README.md @@ -32,21 +32,3 @@ functionalities for OpsLevel. The current functionalities are: | Name | Type | Description | |---------------------------------|-------------|---------------------------------------------------------------| - - - - - ---- -extractors: -- external_kind: apps_v1_Deployment - external_id: ".metadata.uid" ---- -transforms: -- external_kind: apps_v1_Deployment - opslevel_kind: service - opslevel_identifier: ".metadata.name" - on_component_not_found: suggest - properties: - namespace: ".metadata.namespace" - containers: ".spec.template.spec | .containers + .initContainers | map(.image)" \ No newline at end of file