diff --git a/cmd/root.go b/cmd/root.go index 16d2b54..40674fd 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -9,16 +9,16 @@ import ( "sync" "time" - "gopkg.in/yaml.v3" - "k8s.io/utils/path" - "opslevel-agent/config" - + "github.com/go-resty/resty/v2" "github.com/opslevel/opslevel-go/v2025" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/automaxprocs/maxprocs" + "gopkg.in/yaml.v3" + "k8s.io/utils/path" + "opslevel-agent/config" "opslevel-agent/signal" "opslevel-agent/workers" ) @@ -54,7 +54,7 @@ opslevel-agent commit: %s (%s) var wg sync.WaitGroup ctx := signal.Init(context.Background()) // go workers.NewWebhookWorker().Run(ctx, &wg) - go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newClient(), resync, flush) + go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newGraphClient(), newRESTClient(), resync, flush) time.Sleep(1 * time.Second) wg.Wait() @@ -165,7 +165,14 @@ func LoadConfig() (*config.Configuration, error) { return &output, nil } -func newClient() *opslevel.Client { +func newRESTClient() *resty.Client { + return opslevel.NewRestClient( + opslevel.SetUserAgentExtra(fmt.Sprintf("agent-%s", _version)), + opslevel.SetTimeout(time.Second*time.Duration(viper.GetInt("api-timeout"))), + ) +} + +func newGraphClient() *opslevel.Client { client := opslevel.NewGQLClient( opslevel.SetAPIToken(viper.GetString("api-token")), opslevel.SetURL(viper.GetString("api-url")), diff --git a/go.mod b/go.mod index fe7955a..2e788ce 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.0 toolchain go1.24.2 require ( + github.com/go-resty/resty/v2 v2.16.5 github.com/opslevel/opslevel-go/v2025 v2025.6.13 github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.9.1 @@ -31,7 +32,6 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.26.0 // indirect - github.com/go-resty/resty/v2 v2.16.5 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/gnostic-models v0.6.9 // indirect diff --git a/workers/k8s.go b/workers/k8s.go index 8c58eeb..688da69 100644 --- a/workers/k8s.go +++ b/workers/k8s.go @@ -3,9 +3,13 @@ package workers import ( "context" "encoding/json" + "fmt" + "strings" "sync" "time" + "github.com/go-resty/resty/v2" + "github.com/spf13/viper" "github.com/opslevel/opslevel-go/v2025" @@ -17,14 +21,16 @@ import ( type K8SWorker struct { cluster string integration string - client *opslevel.Client + gqlClient *opslevel.Client + restClient *resty.Client } -func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, client *opslevel.Client, resync, flush time.Duration) { +func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, gqlClient *opslevel.Client, restClient *resty.Client, resync, flush time.Duration) { controller.Run(ctx, wg, selectors, resync, flush, &K8SWorker{ - client: client, cluster: cluster, integration: integration, + gqlClient: gqlClient, + restClient: restClient, }) } @@ -32,15 +38,22 @@ func (s *K8SWorker) Handle(evt controller.Event) { kind := evt.ExternalKind() id := evt.ExternalID(s.cluster) - switch evt.Op { - case controller.OpCreate, controller.OpUpdate: - value, err := s.parse(evt.New) - if err != nil { - log.Error().Err(err).Msgf("failed to convert k8s resource") + if strings.Contains(s.integration, "integrations/custom/webhook") { + switch evt.Op { + case controller.OpCreate, controller.OpUpdate: + s.sendEvent(kind, id, evt.New) + } + } else { + switch evt.Op { + case controller.OpCreate, controller.OpUpdate: + value, err := s.parse(evt.New) + if err != nil { + log.Error().Err(err).Msgf("failed to convert k8s resource") + } + s.sendUpsert(kind, id, value) + case controller.OpDelete: + s.sendDelete(kind, id) } - s.sendUpsert(kind, id, value) - case controller.OpDelete: - s.sendDelete(kind, id) } } @@ -61,6 +74,26 @@ func (s *K8SWorker) parse(item *unstructured.Unstructured) (opslevel.JSON, error return data, nil } +func (s *K8SWorker) sendEvent(kind string, id string, value *unstructured.Unstructured) { + kind = strings.Replace(kind, "/", "_", -1) + if viper.GetBool("dry-run") { + log.Info().Msgf("[DRYRUN] POST %s | %s", kind, id) + log.Debug().Msgf("\t%#v", value) + } else { + url := fmt.Sprintf("%s?external_kind=%s", s.integration, kind) + resp, err := s.restClient.R().SetBody(value).Post(url) + if err != nil { + log.Error().Err(err).Msgf("error during post") + return + } + if resp.StatusCode() > 299 { + log.Error().Msgf("%v", resp) + return + } + log.Info().Msgf("POST %s | %s", kind, id) + } +} + func (s *K8SWorker) sendUpsert(kind string, id string, value opslevel.JSON) { var m struct { Payload struct { @@ -78,7 +111,7 @@ func (s *K8SWorker) sendUpsert(kind string, id string, value opslevel.JSON) { log.Debug().Msgf("\t%#v", value) } else { log.Info().Msgf("UPSERT %s | %s", kind, id) - err := s.client.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectUpsert")) + err := s.gqlClient.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectUpsert")) if err != nil { log.Error().Err(err).Msgf("error during upsert mutate") } @@ -100,7 +133,7 @@ func (s *K8SWorker) sendDelete(kind string, id string) { log.Info().Msgf("[DRYRUN] DELETE %s | %s ", kind, id) } else { log.Info().Msgf("DELETE %s | %s ", kind, id) - err := s.client.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectDelete")) + err := s.gqlClient.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectDelete")) if err != nil { log.Error().Err(err).Msgf("error during delete mutate") }