Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import (
"sync"
"time"

"gopkg.in/yaml.v3"
"k8s.io/utils/path"
"opslevel-agent/config"

"github.com/go-resty/resty/v2"
"github.com/opslevel/opslevel-go/v2025"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/automaxprocs/maxprocs"
"gopkg.in/yaml.v3"
"k8s.io/utils/path"
"opslevel-agent/config"
"opslevel-agent/signal"
"opslevel-agent/workers"
)
Expand Down Expand Up @@ -54,7 +54,7 @@ opslevel-agent commit: %s (%s)
var wg sync.WaitGroup
ctx := signal.Init(context.Background())
// go workers.NewWebhookWorker().Run(ctx, &wg)
go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newClient(), resync, flush)
go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newGraphClient(), newRESTClient(), resync, flush)

time.Sleep(1 * time.Second)
wg.Wait()
Expand Down Expand Up @@ -165,7 +165,14 @@ func LoadConfig() (*config.Configuration, error) {
return &output, nil
}

func newClient() *opslevel.Client {
func newRESTClient() *resty.Client {
return opslevel.NewRestClient(
opslevel.SetUserAgentExtra(fmt.Sprintf("agent-%s", _version)),
opslevel.SetTimeout(time.Second*time.Duration(viper.GetInt("api-timeout"))),
)
}

func newGraphClient() *opslevel.Client {
client := opslevel.NewGQLClient(
opslevel.SetAPIToken(viper.GetString("api-token")),
opslevel.SetURL(viper.GetString("api-url")),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.24.0
toolchain go1.24.2

require (
github.com/go-resty/resty/v2 v2.16.5
github.com/opslevel/opslevel-go/v2025 v2025.6.13
github.com/rs/zerolog v1.34.0
github.com/spf13/cobra v1.9.1
Expand All @@ -31,7 +32,6 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.26.0 // indirect
github.com/go-resty/resty/v2 v2.16.5 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
Expand Down
59 changes: 46 additions & 13 deletions workers/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package workers
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/go-resty/resty/v2"

"github.com/spf13/viper"

"github.com/opslevel/opslevel-go/v2025"
Expand All @@ -17,30 +21,39 @@ import (
type K8SWorker struct {
cluster string
integration string
client *opslevel.Client
gqlClient *opslevel.Client
restClient *resty.Client
}

func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, client *opslevel.Client, resync, flush time.Duration) {
func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, gqlClient *opslevel.Client, restClient *resty.Client, resync, flush time.Duration) {
controller.Run(ctx, wg, selectors, resync, flush, &K8SWorker{
client: client,
cluster: cluster,
integration: integration,
gqlClient: gqlClient,
restClient: restClient,
})
}

func (s *K8SWorker) Handle(evt controller.Event) {
kind := evt.ExternalKind()
id := evt.ExternalID(s.cluster)

switch evt.Op {
case controller.OpCreate, controller.OpUpdate:
value, err := s.parse(evt.New)
if err != nil {
log.Error().Err(err).Msgf("failed to convert k8s resource")
if strings.Contains(s.integration, "integrations/custom/webhook") {
switch evt.Op {
case controller.OpCreate, controller.OpUpdate:
s.sendEvent(kind, id, evt.New)
}
} else {
switch evt.Op {
case controller.OpCreate, controller.OpUpdate:
value, err := s.parse(evt.New)
if err != nil {
log.Error().Err(err).Msgf("failed to convert k8s resource")
}
s.sendUpsert(kind, id, value)
case controller.OpDelete:
s.sendDelete(kind, id)
}
s.sendUpsert(kind, id, value)
case controller.OpDelete:
s.sendDelete(kind, id)
}
}

Expand All @@ -61,6 +74,26 @@ func (s *K8SWorker) parse(item *unstructured.Unstructured) (opslevel.JSON, error
return data, nil
}

func (s *K8SWorker) sendEvent(kind string, id string, value *unstructured.Unstructured) {
kind = strings.Replace(kind, "/", "_", -1)
if viper.GetBool("dry-run") {
log.Info().Msgf("[DRYRUN] POST %s | %s", kind, id)
log.Debug().Msgf("\t%#v", value)
} else {
url := fmt.Sprintf("%s?external_kind=%s", s.integration, kind)
resp, err := s.restClient.R().SetBody(value).Post(url)
if err != nil {
log.Error().Err(err).Msgf("error during post")
return
}
if resp.StatusCode() > 299 {
log.Error().Msgf("%v", resp)
return
}
log.Info().Msgf("POST %s | %s", kind, id)
}
}

func (s *K8SWorker) sendUpsert(kind string, id string, value opslevel.JSON) {
var m struct {
Payload struct {
Expand All @@ -78,7 +111,7 @@ func (s *K8SWorker) sendUpsert(kind string, id string, value opslevel.JSON) {
log.Debug().Msgf("\t%#v", value)
} else {
log.Info().Msgf("UPSERT %s | %s", kind, id)
err := s.client.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectUpsert"))
err := s.gqlClient.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectUpsert"))
if err != nil {
log.Error().Err(err).Msgf("error during upsert mutate")
}
Expand All @@ -100,7 +133,7 @@ func (s *K8SWorker) sendDelete(kind string, id string) {
log.Info().Msgf("[DRYRUN] DELETE %s | %s ", kind, id)
} else {
log.Info().Msgf("DELETE %s | %s ", kind, id)
err := s.client.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectDelete"))
err := s.gqlClient.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectDelete"))
if err != nil {
log.Error().Err(err).Msgf("error during delete mutate")
}
Expand Down
Loading