From 61633f81e84c6333d2141ad3443fc759ac3270c8 Mon Sep 17 00:00:00 2001 From: annhiluc Date: Sat, 16 Aug 2025 00:17:08 +0000 Subject: [PATCH 1/4] feat: Add support for streaming replay and recordings fix: Add redaction for request URLs --- internal/record/recording_https_proxy.go | 1 + internal/replay/replay_http_server.go | 30 ++++++++++++++++++--- internal/store/store.go | 34 ++++++++++++++++++++++-- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/internal/record/recording_https_proxy.go b/internal/record/recording_https_proxy.go index 3f8449e..22a6541 100644 --- a/internal/record/recording_https_proxy.go +++ b/internal/record/recording_https_proxy.go @@ -125,6 +125,7 @@ func (r *RecordingHTTPSProxy) redactRequest(req *http.Request) (*store.RecordedR // Redacts secrets from header values r.redactor.Headers(recordedRequest.Headers) recordedRequest.Request = r.redactor.String(recordedRequest.Request) + recordedRequest.URL = r.redactor.String(recordedRequest.URL) var redactedBodySegments []map[string]any for _, bodySegment := range recordedRequest.BodySegments { redactedBodySegments = append(redactedBodySegments, r.redactor.Map(bodySegment)) diff --git a/internal/replay/replay_http_server.go b/internal/replay/replay_http_server.go index 0beaa89..ebb42dc 100644 --- a/internal/replay/replay_http_server.go +++ b/internal/replay/replay_http_server.go @@ -131,6 +131,7 @@ func (r *ReplayHTTPServer) createRedactedRequest(req *http.Request) (*store.Reco // Redacts secrets from header values r.redactor.Headers(recordedRequest.Headers) recordedRequest.Request = r.redactor.String(recordedRequest.Request) + recordedRequest.URL = r.redactor.String(recordedRequest.URL) var redactedBodySegments []map[string]any for _, bodySegment := range recordedRequest.BodySegments { redactedBodySegments = append(redactedBodySegments, r.redactor.Map(bodySegment)) @@ -179,13 +180,34 @@ func (r *ReplayHTTPServer) writeResponse(w http.ResponseWriter, resp *store.Reco w.WriteHeader(int(resp.StatusCode)) - jsonBytes, err := json.Marshal(resp.BodySegments[0]) - if err != nil { + if len(resp.BodySegments) == 1 { + jsonBytes, err := json.Marshal(resp.BodySegments[0]) + if err != nil { + return err + } + + _, err = w.Write(jsonBytes) return err + } else { + respToWrite := []byte{} + for _, bodySegment := range resp.BodySegments { + jsonBytes, err := json.Marshal(bodySegment) + if err != nil { + return err + } + + line := append([]byte("data: "), jsonBytes...) + line = append(line, []byte("\n\n")...) + + respToWrite = append(respToWrite, line...) + } + + if _, err := w.Write(respToWrite); err != nil { + return err + } } - _, err = w.Write(jsonBytes) - return err + return nil } func extractNumber(i *int, content string) (int, error) { diff --git a/internal/store/store.go b/internal/store/store.go index 1476caa..85a2237 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -17,6 +17,7 @@ limitations under the License. package store import ( + "bufio" "bytes" "compress/gzip" "crypto/sha256" @@ -175,16 +176,45 @@ func NewRecordedResponse(resp *http.Response, body []byte) (*RecordedResponse, e } + var bodySegments []map[string]any var bodySegment map[string]any err := json.Unmarshal(body, &bodySegment) if err != nil { - return nil, err + // Attempt to process streamed response. + prefix := []byte("data: ") + + reader := bytes.NewReader(body) + scanner := bufio.NewScanner(reader) + + for scanner.Scan() { + lineBytes := scanner.Bytes() + if len(lineBytes) == 0 { + continue + } + + if jsonBytes, ok := bytes.CutPrefix(lineBytes, prefix); ok { + var jsonMap map[string]any + if err := json.Unmarshal(jsonBytes, &jsonMap); err != nil { + log.Printf("Error unmarshaling JSON: %v", err) + continue + } + + bodySegments = append(bodySegments, jsonMap) + } + } + + if err := scanner.Err(); err != nil { + log.Fatalf("Error reading input bytes: %v", err) + return nil, err + } + } else { + bodySegments = append(bodySegments, bodySegment) } recordedResponse := &RecordedResponse{ StatusCode: int32(resp.StatusCode), Headers: GetHeadersMap(&resp.Header), - BodySegments: []map[string]any{bodySegment}, + BodySegments: bodySegments, } return recordedResponse, nil } From e6d9aff0ecea5ab32566a4c664cb9ef94f6e8eff Mon Sep 17 00:00:00 2001 From: annhiluc Date: Tue, 19 Aug 2025 01:14:12 +0000 Subject: [PATCH 2/4] fix: Update logic for processing streamed responses and add check for empty body messages in readBody() --- internal/replay/replay_http_server.go | 15 ++++++--------- internal/store/store.go | 3 +++ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/replay/replay_http_server.go b/internal/replay/replay_http_server.go index ebb42dc..746d627 100644 --- a/internal/replay/replay_http_server.go +++ b/internal/replay/replay_http_server.go @@ -109,7 +109,7 @@ func (r *ReplayHTTPServer) handleRequest(w http.ResponseWriter, req *http.Reques return } - err = r.writeResponse(w, resp) + err = r.writeResponse(w, resp, redactedReq) if err != nil { fmt.Printf("Error writing response: %v\n", err) panic(err) @@ -170,7 +170,7 @@ func (r *ReplayHTTPServer) loadResponse(fileName string, shaSum string) (*store. return nil, fmt.Errorf("response with shaSum %s not found in file", shaSum) } -func (r *ReplayHTTPServer) writeResponse(w http.ResponseWriter, resp *store.RecordedResponse) error { +func (r *ReplayHTTPServer) writeResponse(w http.ResponseWriter, resp *store.RecordedResponse, req *store.RecordedRequest) error { for key, value := range resp.Headers { if key == "Content-Length" || key == "Content-Encoding" { continue @@ -180,7 +180,7 @@ func (r *ReplayHTTPServer) writeResponse(w http.ResponseWriter, resp *store.Reco w.WriteHeader(int(resp.StatusCode)) - if len(resp.BodySegments) == 1 { + if !strings.Contains(req.URL, "alt=sse") { jsonBytes, err := json.Marshal(resp.BodySegments[0]) if err != nil { return err @@ -189,7 +189,6 @@ func (r *ReplayHTTPServer) writeResponse(w http.ResponseWriter, resp *store.Reco _, err = w.Write(jsonBytes) return err } else { - respToWrite := []byte{} for _, bodySegment := range resp.BodySegments { jsonBytes, err := json.Marshal(bodySegment) if err != nil { @@ -199,11 +198,9 @@ func (r *ReplayHTTPServer) writeResponse(w http.ResponseWriter, resp *store.Reco line := append([]byte("data: "), jsonBytes...) line = append(line, []byte("\n\n")...) - respToWrite = append(respToWrite, line...) - } - - if _, err := w.Write(respToWrite); err != nil { - return err + if _, err := w.Write(line); err != nil { + return err + } } } diff --git a/internal/store/store.go b/internal/store/store.go index 85a2237..3b61372 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -106,6 +106,9 @@ func readBody(req *http.Request) (map[string]any, error) { return nil, err } var resultMap map[string]any + if string(body) == "" { + return resultMap, nil + } err = json.Unmarshal(body, &resultMap) if err != nil { log.Fatalf("Error unmarshaling JSON: %v", err) From e8e96be12d5186c542594699a370854cd3f85991 Mon Sep 17 00:00:00 2001 From: annhiluc Date: Tue, 19 Aug 2025 01:23:54 +0000 Subject: [PATCH 3/4] Update to fix merge conflict. --- internal/replay/replay_http_server.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/replay/replay_http_server.go b/internal/replay/replay_http_server.go index 13ab41e..746d627 100644 --- a/internal/replay/replay_http_server.go +++ b/internal/replay/replay_http_server.go @@ -200,11 +200,7 @@ func (r *ReplayHTTPServer) writeResponse(w http.ResponseWriter, resp *store.Reco if _, err := w.Write(line); err != nil { return err - } - } - - if _, err := w.Write(respToWrite); err != nil { - return err + } } } From 9c4903af8e08d74afa8cec032d8e780799099ef6 Mon Sep 17 00:00:00 2001 From: annhiluc Date: Tue, 19 Aug 2025 01:23:54 +0000 Subject: [PATCH 4/4] Update to fix merge conflict. --- internal/store/store.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/store/store.go b/internal/store/store.go index 3b61372..107b8ea 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -33,6 +33,7 @@ import ( ) const HeadSHA = "b4d6e60a9b97e7b98c63df9308728c5c88c0b40c398046772c63447b94608b4d" +const ReadBufferSize = 10 * 1024 * 1024 // 10MB // Represents a single interaction, request and response in a replay. type RecordInteraction struct { @@ -189,6 +190,9 @@ func NewRecordedResponse(resp *http.Response, body []byte) (*RecordedResponse, e reader := bytes.NewReader(body) scanner := bufio.NewScanner(reader) + buf := make([]byte, ReadBufferSize) + scanner.Buffer(buf, ReadBufferSize) + for scanner.Scan() { lineBytes := scanner.Bytes() if len(lineBytes) == 0 {