Skip to content

fix: add SSE event type line for Anthropic SDK streaming compatibility#332

Open
johnyang007 wants to merge 2 commits intoLaisky:mainfrom
johnyang007:fix/anthropic-sse-event-type
Open

fix: add SSE event type line for Anthropic SDK streaming compatibility#332
johnyang007 wants to merge 2 commits intoLaisky:mainfrom
johnyang007:fix/anthropic-sse-event-type

Conversation

@johnyang007
Copy link
Copy Markdown

@johnyang007 johnyang007 commented Apr 1, 2026

Summary

  • Add event: {type} line before each data: line in ClaudeNativeStreamHandler — the Anthropic Python SDK requires this to dispatch streaming events correctly
  • Skip upstream data: [DONE] marker to avoid JSON parse error logs
  • Forward unparseable SSE events as-is instead of silently dropping them

Root Cause

The Anthropic SDK's __stream__ method checks sse.event value (e.g. message_start, content_block_delta) to dispatch events. Without the event: line, sse.event is None and all events are silently skipped, resulting in zero streaming output.

Test plan

  • Docker local build with SQLite, Anthropic-type channel proxying to upstream
  • 10/10 test cases passed: basic, multi-turn, system prompt, streaming (text_stream + events + raw), tool use, tool result round-trip, streaming tool use, metrics
  • No error unmarshalling stream response logs for [DONE] marker

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added a comprehensive Grafana dashboard template with 54 monitoring panels covering system overview, relay status, channel health, user quotas, and billing metrics.
    • Metrics endpoint is now publicly accessible without requiring admin authentication.
  • Bug Fixes

    • Improved streaming event handling for Anthropic API responses.
  • Documentation

    • Added Grafana dashboard design specification.

johnyang007 and others added 2 commits April 1, 2026 16:06
- Add complete Grafana dashboard JSON (7 rows, 54+ panels) covering
  relay requests, channel health, user quota, billing, DB/Redis, and
  Go runtime metrics
- Add dashboard design spec and implementation plan docs
- Remove admin auth from /metrics endpoint for Prometheus scraping
- Fix TokenAuth to populate username in context, resolving "unknown"
  user labels in Prometheus metrics
- Update dashboard and spec to use username instead of user_id for
  display and filtering

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…patibility

The Anthropic Python SDK requires `event: <type>` lines in SSE streams
to dispatch events correctly. Without them, `sse.event` is None and
all events are silently skipped, resulting in zero streaming output.

- Add `event: {type}` line before each `data:` line using the parsed
  JSON `type` field (message_start, content_block_delta, etc.)
- Skip upstream `data: [DONE]` marker to avoid JSON parse errors
- Forward unparseable events as-is instead of silently dropping them

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 1, 2026

📝 Walkthrough

Walkthrough

New Grafana dashboard specification and JSON configuration files introduce comprehensive monitoring infrastructure with 54 panels across 7 rows, templating variables for dynamic filtering, and multi-currency billing visualization. Concurrently, the /metrics endpoint access control is relaxed, authentication context is enhanced with username metadata, and Claude streaming event handling is refined to better manage SSE event types and DONE markers.

Changes

Cohort / File(s) Summary
Grafana Dashboard Documentation
docs/superpowers/plans/2026-04-01-grafana-dashboard.md, docs/superpowers/specs/2026-04-01-grafana-dashboard-design.md
New dashboard design specification (204 lines) and importable Grafana 10.x+ dashboard JSON (1635 lines) with 54 panels across Overview, Relay, Channel Health, Users & Quota, Billing, Infrastructure, and Security/Runtime rows; includes 7 templating variables for filtering and currency conversion with PromQL-based metrics queries.
Metrics Endpoint Access Control
main.go
Removed middleware.AdminAuth() wrapper from /metrics route, making Prometheus metrics endpoint publicly accessible without authentication.
Authentication Context Enhancement
middleware/auth.go
Added ctxkey.Username context value population in TokenAuth() middleware using model.GetUsernameById(token.UserId) post-token validation.
Claude Streaming Event Handler
relay/adaptor/anthropic/main.go
Modified ClaudeNativeStreamHandler to filter upstream [DONE] markers, emit SSE event: lines when claudeResponse.Type is present, forward unparseable payloads as basic SSE messages, and flush after each event; final stream still closes with [DONE].

Poem

🐰 A dashboard blooms with metrics bright,
Forty-four panels in candlelit light,
Prometheus whispers through currencies fine,
Metrics now public—no auth needed—divine!
Claude's streams flow smoother, events align,
Context enriched with usernames to shine. ✨

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title accurately describes the main change: adding SSE event type lines to fix Anthropic SDK streaming compatibility. This aligns with the primary objective in the relay/adaptor/anthropic/main.go changes.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive Grafana dashboard for monitoring One API, including detailed implementation plans and design specifications for 54 panels across 7 categories. Key code changes include updating the Anthropic stream handler for better SDK compatibility and adding username context to token authentication. However, several issues were identified: removing authentication from the /metrics endpoint poses a critical security risk, and performing a database lookup for usernames in the authentication middleware introduces a significant performance bottleneck. Additionally, there is an inconsistency between the design specification and the implementation plan regarding the use of user IDs versus usernames for metrics filtering.

// Add Prometheus metrics endpoint if enabled
if config.EnablePrometheusMetrics {
server.GET("/metrics", middleware.AdminAuth(), gin.WrapH(promhttp.Handler()))
server.GET("/metrics", gin.WrapH(promhttp.Handler()))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-critical critical

Removing authentication from the /metrics endpoint is a significant security risk. Metrics can expose sensitive internal data, including user activity patterns and channel configurations. This change also contradicts the design specification in docs/superpowers/specs/2026-04-01-grafana-dashboard-design.md (line 7), which states that authentication is required. If the intention is to facilitate Prometheus scraping, please use a more secure method such as a dedicated API token or IP allowlisting.

Suggested change
server.GET("/metrics", gin.WrapH(promhttp.Handler()))
server.GET("/metrics", middleware.AdminAuth(), gin.WrapH(promhttp.Handler()))


// Set token-related context for downstream handlers
c.Set(ctxkey.Id, token.UserId)
c.Set(ctxkey.Username, model.GetUsernameById(token.UserId))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Performing a database query (model.GetUsernameById) for every API request in the TokenAuth middleware will cause a major performance bottleneck. This is especially problematic for high-concurrency environments. Since the implementation plan for the Grafana dashboard (docs/superpowers/plans/2026-04-01-grafana-dashboard.md) uses user_id for filtering, this lookup appears unnecessary. If the username is required for metrics, it should be retrieved from a cache or the metrics should be updated to use the ID instead.

| `$channel_id` | Query | `label_values(one_api_relay_requests_total, channel_id)` | All | 是 |
| `$channel_type` | Query | `label_values(one_api_relay_requests_total, channel_type)` | All | 是 |
| `$model` | Query | `label_values(one_api_relay_requests_total, model)` | All | 是 |
| `$username` | Query | `label_values(one_api_user_requests_total, username)` | All | 是 |
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a discrepancy between this design specification and the implementation plan in docs/superpowers/plans/2026-04-01-grafana-dashboard.md. The spec defines a $username variable and uses it in PromQL filters (e.g., line 63), whereas the plan defines and uses user_id (lines 153-164 and 502). Standardizing on user_id is recommended as it is already available in the token context and avoids the overhead of username lookups.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/superpowers/plans/2026-04-01-grafana-dashboard.md`:
- Around line 152-164: Replace the template variable named "user_id" with
"username" in the JSON block and update its PromQL query from
label_values(one_api_user_requests_total, user_id) to
label_values(one_api_user_requests_total, username); ensure the variable
"username" (not "user_id") is used by any downstream PromQL snippets that
reference this template and by the metric one_api_user_requests_total so the
dashboard matches the new labeling convention.
- Around line 1-9: The plan document "One API Grafana Dashboard 实现计划" in
docs/superpowers/plans/2026-04-01-grafana-dashboard.md is written largely in
Chinese; translate the entire file into clear English while preserving structure
(title, Goal, Architecture, Tech Stack, task checklist lines using "- [ ]", and
references to files like docs/grafana-dashboard.json), ensuring all headings,
descriptive text and implementation steps are converted to English and that
terminology like "templating", "Transformations + Math expression", "Prometheus
PromQL", and Grafana versioning remain unchanged and accurate.

In `@docs/superpowers/specs/2026-04-01-grafana-dashboard-design.md`:
- Around line 1-8: Translate the entire Grafana dashboard spec into English and
update the stale authentication note: replace the Chinese content with English
wording that describes the single Grafana dashboard with seven collapsible row
sections, mapping of all one_api_* Prometheus metrics to panels, and that
"reserved" panels are preconfigured PromQL awaiting data; also update the data
source/auth line to state that Prometheus scrapes GET /metrics and that /metrics
is exposed without AdminAuth (per main.go) so no AdminAuth is required;
reference the one_api_* metric naming, the /metrics endpoint, the Grafana
Dashboard (Row sections) and main.go when making the edits.

In `@main.go`:
- Line 217: The /metrics endpoint was exposed without authentication by
registering server.GET("/metrics", gin.WrapH(promhttp.Handler())), which
combined with ctxkey.Username population in middleware/auth.go and
username-based dashboard queries leaks per-user metrics; revert to protecting
/metrics by either applying the existing auth middleware (attach the same
middleware used elsewhere to the /metrics route), or move the metrics handler to
an internal-only listener or implement an allowlist check before calling
promhttp.Handler(); ensure any change references the same route registration
(server.GET("/metrics", ...)) and the ctxkey.Username behavior so anonymous
callers cannot scrape user-labelled series.

In `@middleware/auth.go`:
- Line 239: The new synchronous DB lookup c.Set(ctxkey.Username,
model.GetUsernameById(token.UserId)) on the token-auth hot path adds a blocking
round-trip and silently returns empty on failure; instead propagate the username
from ValidateUserToken() (or fetch from your cache) into the context so no fresh
DB read is performed here. Modify ValidateUserToken() to return the username (or
an optional cached lookup result) alongside the validated token, then replace
the direct call to model.GetUsernameById in the middleware with
c.Set(ctxkey.Username, usernameFromValidateUserToken) (or read from the cache)
and remove the direct DB call to model.GetUsernameById(token.UserId). Ensure
ctxkey.Username is populated only from the value returned by ValidateUserToken
or cache and handle nil/empty values explicitly.

In `@relay/adaptor/anthropic/main.go`:
- Around line 927-941: The fallback that writes unparseable events uses only the
trimmed data payload and therefore loses any preceding SSE metadata (e.g.,
event: lines); modify the stream parsing so you retain the full SSE frame or at
least the last-seen event name and include it on error. Concretely, in the loop
that handles incoming SSE chunks (the code surrounding the data variable and
json.Unmarshal into StreamResponse), add a buffer (e.g., rawFrame string) that
accumulates the original lines for the current SSE event or track a currentEvent
string when you parse an "event:" line, and then change the error path that
currently does c.Writer.Write([]byte("data: "+data+"\n\n")) to write the
buffered rawFrame (or prepend "event: "+currentEvent+"\n" before the data) so
the original SSE metadata is preserved when StreamResponse unmarshalling fails.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6bf54a14-b83c-4167-825f-eb1f4ed63b12

📥 Commits

Reviewing files that changed from the base of the PR and between 593a423 and 77ca64e.

📒 Files selected for processing (6)
  • docs/grafana-dashboard.json
  • docs/superpowers/plans/2026-04-01-grafana-dashboard.md
  • docs/superpowers/specs/2026-04-01-grafana-dashboard-design.md
  • main.go
  • middleware/auth.go
  • relay/adaptor/anthropic/main.go

Comment on lines +1 to +9
# One API Grafana Dashboard 实现计划

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** 生成完整的 Grafana Dashboard JSON 配置文件,包含 7 个 Row、54 个面板、顶部变量和货币换算逻辑。

**Architecture:** 单个 `docs/grafana-dashboard.json` 文件,Grafana 10.x+ Dashboard JSON Model 格式。按 Row 逐步构建,每个 Task 负责一个 Row 的面板定义。使用 Grafana 的 `templating` 实现变量筛选,使用 Transformations + Math expression 实现货币换算。

**Tech Stack:** Grafana 10.x+ Dashboard JSON Model, Prometheus PromQL
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Please keep the implementation plan in English.

Most of this file is Chinese. The repo rule applies to planning docs too, so this needs translation before it becomes the canonical implementation guide. As per coding guidelines, "Only use English as output for codes, comments, chat, documents and everything else".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/superpowers/plans/2026-04-01-grafana-dashboard.md` around lines 1 - 9,
The plan document "One API Grafana Dashboard 实现计划" in
docs/superpowers/plans/2026-04-01-grafana-dashboard.md is written largely in
Chinese; translate the entire file into clear English while preserving structure
(title, Goal, Architecture, Tech Stack, task checklist lines using "- [ ]", and
references to files like docs/grafana-dashboard.json), ensuring all headings,
descriptive text and implementation steps are converted to English and that
terminology like "templating", "Transformations + Math expression", "Prometheus
PromQL", and Grafana versioning remain unchanged and accurate.

Comment on lines +152 to +164
{
"name": "user_id",
"label": "用户 ID",
"type": "query",
"query": "label_values(one_api_user_requests_total, user_id)",
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"refresh": 2,
"includeAll": true,
"multi": true,
"allValue": ".*",
"current": { "selected": true, "text": "All", "value": "$__all" },
"sort": 3
},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use username, not user_id, as the user filter dimension here.

Lines 152-164 switch the template variable back to user_id, and the later PromQL snippets inherit that choice. That diverges from the design spec (docs/superpowers/specs/2026-04-01-grafana-dashboard-design.md Line 18) and the backend change in middleware/auth.go Line 239, both of which move the metrics dimension to username. If this plan is followed as written, the generated dashboard JSON won't align with the new labeling strategy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/superpowers/plans/2026-04-01-grafana-dashboard.md` around lines 152 -
164, Replace the template variable named "user_id" with "username" in the JSON
block and update its PromQL query from label_values(one_api_user_requests_total,
user_id) to label_values(one_api_user_requests_total, username); ensure the
variable "username" (not "user_id") is used by any downstream PromQL snippets
that reference this template and by the metric one_api_user_requests_total so
the dashboard matches the new labeling convention.

Comment on lines +1 to +8
# One API Grafana Dashboard 设计文档

## 概述

单个 Grafana Dashboard,包含 7 个可折叠 Row 分区,同时覆盖运维/SRE 和平台管理两种视角。所有 `one_api_*` Prometheus 指标均已映射到面板。标记为"预留"的面板当前无生产数据,但已预配置 PromQL,数据出现后自动展示。

**数据源**:Prometheus,采集 `GET /metrics`(需要 AdminAuth 会话认证)。

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Translate this spec to English and fix the stale /metrics auth note.

Most of this document is Chinese, which violates the repo documentation rule. Line 7 is also outdated now that main.go exposes /metrics without AdminAuth, so operators following the spec will assume the endpoint is protected when it is not. As per coding guidelines, "Only use English as output for codes, comments, chat, documents and everything else".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/superpowers/specs/2026-04-01-grafana-dashboard-design.md` around lines 1
- 8, Translate the entire Grafana dashboard spec into English and update the
stale authentication note: replace the Chinese content with English wording that
describes the single Grafana dashboard with seven collapsible row sections,
mapping of all one_api_* Prometheus metrics to panels, and that "reserved"
panels are preconfigured PromQL awaiting data; also update the data source/auth
line to state that Prometheus scrapes GET /metrics and that /metrics is exposed
without AdminAuth (per main.go) so no AdminAuth is required; reference the
one_api_* metric naming, the /metrics endpoint, the Grafana Dashboard (Row
sections) and main.go when making the edits.

// Add Prometheus metrics endpoint if enabled
if config.EnablePrometheusMetrics {
server.GET("/metrics", middleware.AdminAuth(), gin.WrapH(promhttp.Handler()))
server.GET("/metrics", gin.WrapH(promhttp.Handler()))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Keep /metrics protected unless you're also stripping user-labelled series.

Line 217 removes the only auth gate from /metrics. Combined with the new ctxkey.Username population in middleware/auth.go Line 239 and the dashboard's username-based queries, any caller that can reach this service can now scrape per-user usage and other operational metadata. If Prometheus needs unauthenticated scraping, enforce that with an explicit internal-only listener or allowlist instead of making anonymous access the code default.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@main.go` at line 217, The /metrics endpoint was exposed without
authentication by registering server.GET("/metrics",
gin.WrapH(promhttp.Handler())), which combined with ctxkey.Username population
in middleware/auth.go and username-based dashboard queries leaks per-user
metrics; revert to protecting /metrics by either applying the existing auth
middleware (attach the same middleware used elsewhere to the /metrics route), or
move the metrics handler to an internal-only listener or implement an allowlist
check before calling promhttp.Handler(); ensure any change references the same
route registration (server.GET("/metrics", ...)) and the ctxkey.Username
behavior so anonymous callers cannot scrape user-labelled series.


// Set token-related context for downstream handlers
c.Set(ctxkey.Id, token.UserId)
c.Set(ctxkey.Username, model.GetUsernameById(token.UserId))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't add a live username lookup to the token-auth hot path.

Line 239 adds a synchronous model.GetUsernameById() call to every token-authenticated request. The helper in model/user.go Line 554 does a direct DB read with no cache or error return, so this change adds one more round-trip per request and silently falls back to an empty username on lookup failures. Please surface the username from ValidateUserToken() or a cache instead of doing a fresh lookup here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/auth.go` at line 239, The new synchronous DB lookup
c.Set(ctxkey.Username, model.GetUsernameById(token.UserId)) on the token-auth
hot path adds a blocking round-trip and silently returns empty on failure;
instead propagate the username from ValidateUserToken() (or fetch from your
cache) into the context so no fresh DB read is performed here. Modify
ValidateUserToken() to return the username (or an optional cached lookup result)
alongside the validated token, then replace the direct call to
model.GetUsernameById in the middleware with c.Set(ctxkey.Username,
usernameFromValidateUserToken) (or read from the cache) and remove the direct DB
call to model.GetUsernameById(token.UserId). Ensure ctxkey.Username is populated
only from the value returned by ValidateUserToken or cache and handle nil/empty
values explicitly.

Comment on lines +927 to +941
// Skip [DONE] marker (OpenAI convention, not part of Anthropic protocol)
if data == "[DONE]" {
continue
}

// For Claude native streaming, we pass through the events directly
c.Writer.Write([]byte("data: " + data + "\n\n"))
c.Writer.(http.Flusher).Flush()
logger.Debug("stream received", zap.String("data", data))

// Parse the response to extract usage and model info
// Parse the response to extract event type and usage info
var claudeResponse StreamResponse
err := json.Unmarshal([]byte(data), &claudeResponse)
if err != nil {
logger.Error("error unmarshalling stream response", zap.Error(err))
// Still forward unparseable events as-is
c.Writer.Write([]byte("data: " + data + "\n\n"))
c.Writer.(http.Flusher).Flush()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The raw passthrough path still drops original SSE metadata.

By Line 925 the handler has already trimmed the payload and skipped every non-data: line, so the fallback at Lines 939-941 cannot actually forward an unparseable event unchanged. Any upstream frame whose event name only existed in an event: line will still lose that metadata here. If raw passthrough is a requirement, buffer full SSE frames or at least carry the preceding event: line into this error path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@relay/adaptor/anthropic/main.go` around lines 927 - 941, The fallback that
writes unparseable events uses only the trimmed data payload and therefore loses
any preceding SSE metadata (e.g., event: lines); modify the stream parsing so
you retain the full SSE frame or at least the last-seen event name and include
it on error. Concretely, in the loop that handles incoming SSE chunks (the code
surrounding the data variable and json.Unmarshal into StreamResponse), add a
buffer (e.g., rawFrame string) that accumulates the original lines for the
current SSE event or track a currentEvent string when you parse an "event:"
line, and then change the error path that currently does
c.Writer.Write([]byte("data: "+data+"\n\n")) to write the buffered rawFrame (or
prepend "event: "+currentEvent+"\n" before the data) so the original SSE
metadata is preserved when StreamResponse unmarshalling fails.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant