Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ prettier -w .
5. HTTP client executes request
6. Response formatted based on Content-Type and output to stdout (optionally via pager)

Retryable requests replay bodies by calling `req.GetBody` when available, reopening file-backed bodies directly when possible, and only spooling the original body to a temp file as a final fallback for one-shot streams. This avoids holding large uploads in memory and keeps retries working for closable bodies like `*os.File`.

### Content Type Detection

`internal/fetch/fetch.go:getContentType()` maps MIME types to formatters. Supported types include JSON, XML, YAML, HTML, CSS, CSV, msgpack, protobuf, gRPC, SSE, NDJSON, and images.
Expand Down
12 changes: 10 additions & 2 deletions internal/fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ import (
// formatting a response body or copying it to the clipboard.
const maxBodyBytes = 1 << 20 // 1MiB

func setReplayableBody(req *http.Request, data []byte) {
req.Body = io.NopCloser(bytes.NewReader(data))
req.ContentLength = int64(len(data))
req.GetBody = func() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(data)), nil
}
}

type Request struct {
AWSSigv4 *aws.Config
Basic *core.KeyVal[string]
Expand Down Expand Up @@ -213,13 +221,13 @@ func fetch(ctx context.Context, r *Request) (int, error) {
if err != nil {
return 0, err
}
req.Body = io.NopCloser(converted)
setReplayableBody(req, converted)
}
framed, err := frameGRPCRequest(req.Body)
if err != nil {
return 0, err
}
req.Body = io.NopCloser(framed)
setReplayableBody(req, framed)
}
}

Expand Down
9 changes: 4 additions & 5 deletions internal/fetch/proto.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package fetch

import (
"bytes"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -107,7 +106,7 @@ func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor
}

// convertJSONToProtobuf converts JSON body to protobuf.
func convertJSONToProtobuf(data io.Reader, desc protoreflect.MessageDescriptor) (io.Reader, error) {
func convertJSONToProtobuf(data io.Reader, desc protoreflect.MessageDescriptor) ([]byte, error) {
// Read all the JSON data.
jsonData, err := io.ReadAll(data)
if err != nil {
Expand All @@ -120,12 +119,12 @@ func convertJSONToProtobuf(data io.Reader, desc protoreflect.MessageDescriptor)
return nil, fmt.Errorf("failed to convert JSON to protobuf: %w", err)
}

return bytes.NewReader(protoData), nil
return protoData, nil
}

// frameGRPCRequest wraps data in gRPC framing.
// Handles nil/empty body by sending an empty framed message.
func frameGRPCRequest(data io.Reader) (io.Reader, error) {
func frameGRPCRequest(data io.Reader) ([]byte, error) {
var rawData []byte
if data != nil && data != http.NoBody {
var err error
Expand All @@ -137,7 +136,7 @@ func frameGRPCRequest(data io.Reader) (io.Reader, error) {

// Frame with gRPC format (works for empty data too).
framedData := fetchgrpc.Frame(rawData, false)
return bytes.NewReader(framedData), nil
return framedData, nil
}

// streamGRPCRequest reads JSON objects from data, converts each to protobuf,
Expand Down
120 changes: 103 additions & 17 deletions internal/fetch/retry.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package fetch

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -11,6 +10,7 @@ import (
"net/http"
"net/http/httptrace"
"net/url"
"os"
"strconv"
"time"

Expand All @@ -31,6 +31,9 @@ func retryableRequest(ctx context.Context, r *Request, c *client.Client, req *ht
if err != nil {
return 0, err
}
if replayer != nil {
defer replayer.close()
}
}

var hadRedirects bool
Expand Down Expand Up @@ -246,10 +249,11 @@ func sleepWithContext(ctx context.Context, d time.Duration) error {
}
}

// replayableBody allows a request body to be replayed across retry attempts.
// replayableBody reopens a request body for each retry attempt.
type replayableBody struct {
seeker io.ReadSeeker
data []byte
open func() (io.ReadCloser, error)
cleanup func() error
tempPath string
}

// newReplayableBody creates a replayableBody from the request's current body.
Expand All @@ -259,31 +263,113 @@ func newReplayableBody(req *http.Request) (*replayableBody, error) {
return nil, nil
}

// If the body supports seeking, use it directly.
if rs, ok := req.Body.(io.ReadSeeker); ok {
return &replayableBody{seeker: rs}, nil
if req.GetBody != nil {
if err := req.Body.Close(); err != nil {
return nil, err
}
return &replayableBody{open: req.GetBody}, nil
}

if f, ok := req.Body.(*os.File); ok && f != os.Stdin {
offset, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return nil, err
}
path := f.Name()
if err := f.Close(); err != nil {
return nil, err
}
return &replayableBody{
open: func() (io.ReadCloser, error) {
reopened, err := os.Open(path)
if err != nil {
return nil, err
}
if offset != 0 {
if _, err := reopened.Seek(offset, io.SeekStart); err != nil {
reopened.Close()
return nil, err
}
}
return reopened, nil
},
}, nil
}

if rs, ok := req.Body.(io.ReadSeeker); ok && req.Body != os.Stdin {
var cleanup func() error
if closer, ok := req.Body.(io.Closer); ok {
cleanup = closer.Close
}
return &replayableBody{
open: func() (io.ReadCloser, error) {
if _, err := rs.Seek(0, io.SeekStart); err != nil {
return nil, err
}
return nopReadCloser{Reader: rs}, nil
},
cleanup: cleanup,
}, nil
}

// Otherwise, read the entire body into memory.
data, err := io.ReadAll(req.Body)
tmp, err := os.CreateTemp("", "fetch-retry-body-*")
if err != nil {
return nil, err
}
req.Body.Close()
return &replayableBody{data: data}, nil
tmpPath := tmp.Name()
cleanup := func() error {
return os.Remove(tmpPath)
}

_, copyErr := io.Copy(tmp, req.Body)
closeErr := req.Body.Close()
if copyErr != nil {
tmp.Close()
cleanup()
return nil, copyErr
}
if closeErr != nil {
tmp.Close()
cleanup()
return nil, closeErr
}
if err := tmp.Close(); err != nil {
cleanup()
return nil, err
}

return &replayableBody{
open: func() (io.ReadCloser, error) {
return os.Open(tmpPath)
},
cleanup: cleanup,
tempPath: tmpPath,
}, nil
}

// reset returns a fresh io.ReadCloser for the next attempt.
func (rb *replayableBody) reset() (io.ReadCloser, error) {
if rb.seeker != nil {
if _, err := rb.seeker.Seek(0, io.SeekStart); err != nil {
return nil, err
}
return io.NopCloser(rb.seeker), nil
if rb == nil {
return nil, nil
}
return io.NopCloser(bytes.NewReader(rb.data)), nil
return rb.open()
}

func (rb *replayableBody) close() error {
if rb == nil || rb.cleanup == nil {
return nil
}
err := rb.cleanup()
rb.cleanup = nil
return err
}

type nopReadCloser struct {
io.Reader
}

func (nopReadCloser) Close() error { return nil }

// retryReason returns a human-readable reason for the retry.
func retryReason(resp *http.Response, err error) string {
if err != nil {
Expand Down
Loading
Loading