Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ jobs:
- uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v5
with:
go-version-file: go.mod
cache-dependency-path: go.sum
cache-dependency-path: |
go.sum
addons/processors/skeleton/go.sum
addons/processors/sql-processor/go.sum
addons/processors/iceberg-processor/go.sum

- name: Prepare Go build cache
run: mkdir -p "$GOCACHE"
Expand All @@ -57,6 +61,9 @@ jobs:
- name: Run go test -race ./...
run: go test -race ./...

- name: Run nested module tests
run: make test-nested-modules

go-coverage:
name: Go Coverage Gate
runs-on: ubuntu-latest
Expand Down
17 changes: 8 additions & 9 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ on:
- main
workflow_dispatch:

permissions:
actions: read
contents: read
security-events: write

jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
Expand All @@ -44,10 +43,10 @@ jobs:

steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4

- name: Initialize CodeQL
uses: github/codeql-action/init@v4
uses: github/codeql-action/init@c6f931105cb2c34c8f901cc885ba1e2e259cf745 # v4
with:
languages: ${{ matrix.language }}
queries: security-extended,security-and-quality
Expand All @@ -59,9 +58,9 @@ jobs:
- 'third_party/**'

- name: Autobuild
uses: github/codeql-action/autobuild@v4
uses: github/codeql-action/autobuild@c6f931105cb2c34c8f901cc885ba1e2e259cf745 # v4

- name: Analyze
uses: github/codeql-action/analyze@v4
uses: github/codeql-action/analyze@c6f931105cb2c34c8f901cc885ba1e2e259cf745 # v4
with:
category: "/language:${{ matrix.language }}"
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ on:
- 'v*'
workflow_dispatch:

permissions: read-all
permissions: {}

jobs:
build-and-push:
Expand Down
12 changes: 10 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ LOCAL_NODE_BIN := $(LOCAL_NODE_DIR)/bin
LOCAL_NODE := $(LOCAL_NODE_BIN)/node
LOCAL_NPM := $(LOCAL_NODE_BIN)/npm

.PHONY: proto build test tidy lint generate build-sdk docker-build docker-build-e2e-client docker-build-etcd-tools docker-clean ensure-minio start-minio stop-containers release-broker-ports test-produce-consume test-produce-consume-debug test-consumer-group test-ops-api test-mcp test-multi-segment-durability test-full test-operator test-acl demo demo-platform demo-platform-bootstrap iceberg-demo kafsql-demo platform-demo help clean-kind-all ensure-local-node check vet race fmt fmt-check test-fuzz code-ql code-ql-summary code-ql-gate commit-check
.PHONY: proto build test test-nested-modules tidy lint generate build-sdk docker-build docker-build-e2e-client docker-build-etcd-tools docker-clean ensure-minio start-minio stop-containers release-broker-ports test-produce-consume test-produce-consume-debug test-consumer-group test-ops-api test-mcp test-multi-segment-durability test-full test-operator test-acl demo demo-platform demo-platform-bootstrap iceberg-demo kafsql-demo platform-demo help clean-kind-all ensure-local-node check vet race fmt fmt-check test-fuzz code-ql code-ql-summary code-ql-gate commit-check

REGISTRY ?= ghcr.io/kafscale
STAMP_DIR ?= .build
Expand Down Expand Up @@ -164,6 +164,14 @@ test: ## Run unit tests + vet + race
@echo "race passed."
@echo "test passed."

test-nested-modules: ## Run go test across nested Go modules under addons/processors
@set -e; \
for dir in addons/processors/skeleton addons/processors/sql-processor addons/processors/iceberg-processor; do \
echo "==> $$dir: go test ./..."; \
( cd $$dir && go test ./... ); \
done; \
echo "nested module tests passed."

vet: ## Run go vet
@echo "==> go vet"
@go vet ./...
Expand Down Expand Up @@ -212,7 +220,7 @@ code-ql-gate: code-ql ## Fail if CodeQL reports any error findings
fi; \
echo "CodeQL gate passed: no error findings found."

commit-check: ensure-local-node check fmt test test-fuzz code-ql-gate ## Run pre-commit quality gates
commit-check: ensure-local-node check fmt test test-nested-modules test-fuzz code-ql-gate ## Run pre-commit quality gates
@echo "commit-check passed."

test-acl: ## Run ACL e2e test (requires KAFSCALE_E2E=1)
Expand Down
4 changes: 2 additions & 2 deletions addons/processors/iceberg-processor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.25-alpine AS build
FROM golang:1.25-alpine@sha256:8e02eb337d9e0ea459e041f1ee5eece41cbb61f1d83e7d883a3e2fb4862063fa AS build
RUN apk add --no-cache git
ARG REPO_ROOT=.
ARG MODULE_DIR=.
Expand All @@ -36,7 +36,7 @@ RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build ${GO_BUILD_FLAGS} -o /out/iceberg-processor ./cmd/processor

FROM alpine:3.19
FROM alpine:3.19@sha256:6baf43584bcb78f2e5847d1de515f23499913ac9f12bdf834811a3145eb11ca1
WORKDIR /app
COPY --from=build /out/iceberg-processor /app/iceberg-processor
USER 65532:65532
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ config:
uri: https://iceberg-catalog.example.com
token: ""
username: ""
password: ""
password: null
warehouse: s3://iceberg-warehouse/production
offsets:
backend: etcd
Expand All @@ -68,7 +68,7 @@ config:
endpoints:
- http://etcd.kafscale.svc.cluster.local:2379
username: ""
password: ""
password: null
schema:
mode: "off"
registry:
Expand Down
2 changes: 1 addition & 1 deletion addons/processors/iceberg-processor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ require (
google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251213004720-97cd9d5aeac2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251213004720-97cd9d5aeac2 // indirect
google.golang.org/grpc v1.79.1 // indirect
google.golang.org/grpc v1.79.3 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)

Expand Down
2 changes: 2 additions & 0 deletions addons/processors/iceberg-processor/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20251213004720-97cd9d5aeac2 h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20251213004720-97cd9d5aeac2/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY=
google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE=
google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
Expand Down
19 changes: 15 additions & 4 deletions addons/processors/iceberg-processor/internal/sink/iceberg.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,27 +451,38 @@ type loggingTransport struct {
base http.RoundTripper
}

func sanitizeLogValue(value string) string {
value = strings.ReplaceAll(value, "\n", "\\n")
value = strings.ReplaceAll(value, "\r", "\\r")
return value
}

func (t *loggingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
base := t.base
if base == nil {
base = http.DefaultTransport
}
method := sanitizeLogValue(req.Method)
urlText := ""
if req.URL != nil {
urlText = sanitizeLogValue(req.URL.Redacted())
}
resp, err := base.RoundTrip(req)
if err != nil {
log.Printf("iceberg-rest http %s %s failed: %v", req.Method, req.URL, err)
log.Printf("iceberg-rest http %s %s failed: %v", method, urlText, err)
return resp, err
}
if resp.StatusCode >= 400 {
body, readErr := io.ReadAll(io.LimitReader(resp.Body, 8192))
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(body))
if readErr != nil {
log.Printf("iceberg-rest http %s %s -> %d (read error: %v)", req.Method, req.URL, resp.StatusCode, readErr)
log.Printf("iceberg-rest http %s %s -> %d (read error: %v)", method, urlText, resp.StatusCode, readErr)
} else {
log.Printf("iceberg-rest http %s %s -> %d body=%s", req.Method, req.URL, resp.StatusCode, strings.TrimSpace(string(body)))
log.Printf("iceberg-rest http %s %s -> %d body=%s", method, urlText, resp.StatusCode, sanitizeLogValue(strings.TrimSpace(string(body))))
}
} else {
log.Printf("iceberg-rest http %s %s -> %d", req.Method, req.URL, resp.StatusCode)
log.Printf("iceberg-rest http %s %s -> %d", method, urlText, resp.StatusCode)
}
return resp, nil
}
Expand Down
3 changes: 1 addition & 2 deletions addons/processors/skeleton/internal/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,8 @@ func (p *Processor) Run(ctx context.Context) error {
continue
}

if dropped := len(records); dropped > 0 {
if len(records) > 0 {
records = filterRecords(records, state.Offset)
dropped -= len(records)
}
if len(records) == 0 {
continue
Expand Down
15 changes: 7 additions & 8 deletions addons/processors/sql-processor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,21 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/otel/sdk v1.40.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
go.opentelemetry.io/otel/trace v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/crypto v0.46.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.31.0 // indirect
golang.org/x/text v0.32.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
google.golang.org/grpc v1.79.3 // indirect
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
Expand Down
Loading
Loading