From bf9df7ddb3ad2bd56b0f5881ff32a4911380d265 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 9 Feb 2026 14:57:07 +0100 Subject: [PATCH 01/12] pkg/fileutils: use CopyFileRange if possible Signed-off-by: Giuseppe Scrivano --- storage/pkg/fileutils/reflink_linux.go | 51 +++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/storage/pkg/fileutils/reflink_linux.go b/storage/pkg/fileutils/reflink_linux.go index 9f5c6c90bb..d9625a0f0d 100644 --- a/storage/pkg/fileutils/reflink_linux.go +++ b/storage/pkg/fileutils/reflink_linux.go @@ -8,13 +8,62 @@ import ( ) // ReflinkOrCopy attempts to reflink the source to the destination fd. -// If reflinking fails or is unsupported, it falls back to io.Copy(). +// If reflinking fails, it tries copy_file_range for kernel-level copying. +// If that also fails, it falls back to io.Copy(). func ReflinkOrCopy(src, dst *os.File) error { err := unix.IoctlFileClone(int(dst.Fd()), int(src.Fd())) if err == nil { return nil } + srcInfo, statErr := src.Stat() + if statErr != nil { + _, err = io.Copy(dst, src) + return err + } + + if err := doCopyFileRange(src, dst, srcInfo.Size()); err == nil { + return nil + } + + // copy_file_range may have partially written data before failing, + // so reset both file offsets and truncate dst before falling back. + if _, err := src.Seek(0, io.SeekStart); err != nil { + return err + } + if _, err := dst.Seek(0, io.SeekStart); err != nil { + return err + } + if err := dst.Truncate(0); err != nil { + return err + } + _, err = io.Copy(dst, src) return err } + +// doCopyFileRange uses the copy_file_range syscall for kernel-level copying. 
+func doCopyFileRange(src, dst *os.File, size int64) error { + const maxChunk = 1 << 30 // 1GiB + remaining := size + srcFd := int(src.Fd()) + dstFd := int(dst.Fd()) + for remaining > 0 { + chunkSize := remaining + if chunkSize > maxChunk { + chunkSize = maxChunk + } + n, err := unix.CopyFileRange(srcFd, nil, dstFd, nil, int(chunkSize), 0) + if err != nil { + return err + } + if n == 0 { + if remaining > 0 { + return io.ErrUnexpectedEOF + } + break + } + remaining -= int64(n) + } + return nil +} From d5428aaa564403e29134394fbf8a27c18a0c4a27 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 19 Feb 2026 15:45:38 +0100 Subject: [PATCH 02/12] storage, chunked: new function GenerateDumpFromTarHeaders Signed-off-by: Giuseppe Scrivano --- storage/pkg/chunked/dump/dump.go | 61 ++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/storage/pkg/chunked/dump/dump.go b/storage/pkg/chunked/dump/dump.go index 1d004023b8..fc1986850a 100644 --- a/storage/pkg/chunked/dump/dump.go +++ b/storage/pkg/chunked/dump/dump.go @@ -3,6 +3,7 @@ package dump import ( + "archive/tar" "bufio" "encoding/base64" "fmt" @@ -13,6 +14,7 @@ import ( "time" "github.com/opencontainers/go-digest" + "go.podman.io/storage/pkg/archive" "go.podman.io/storage/pkg/chunked/internal/minimal" storagePath "go.podman.io/storage/pkg/chunked/internal/path" "golang.org/x/sys/unix" @@ -270,3 +272,62 @@ func GenerateDump(tocI any, verityDigests map[string]string) (io.Reader, error) }() return pipeR, nil } + +// GenerateDumpFromTarHeaders generates a composefs dump from stdlib tar headers, +// content digests, and verity digests. It converts the tar headers to +// minimal.FileMetadata entries internally and delegates to GenerateDump. 
+func GenerateDumpFromTarHeaders(headers []*tar.Header, contentDigests, verityDigests map[string]string) (io.Reader, error) { + var entries []minimal.FileMetadata + for _, hdr := range headers { + entry, err := fileMetadataFromTarHeader(hdr) + if err != nil { + return nil, err + } + if d, ok := contentDigests[hdr.Name]; ok { + entry.Digest = d + } + entries = append(entries, entry) + } + toc := &minimal.TOC{Version: 1, Entries: entries} + return GenerateDump(toc, verityDigests) +} + +// fileMetadataFromTarHeader creates a minimal.FileMetadata from a stdlib +// tar.Header. This mirrors minimal.NewFileMetadata (which uses the +// tar-split tar package) including AccessTime and ChangeTime. +func fileMetadataFromTarHeader(hdr *tar.Header) (minimal.FileMetadata, error) { + typ, err := minimal.GetType(hdr.Typeflag) + if err != nil { + return minimal.FileMetadata{}, err + } + xattrs := make(map[string]string) + for k, v := range hdr.PAXRecords { + xattrKey, ok := strings.CutPrefix(k, archive.PaxSchilyXattr) + if !ok { + continue + } + xattrs[xattrKey] = base64.StdEncoding.EncodeToString([]byte(v)) + } + return minimal.FileMetadata{ + Type: typ, + Name: hdr.Name, + Linkname: hdr.Linkname, + Mode: hdr.Mode, + Size: hdr.Size, + UID: hdr.Uid, + GID: hdr.Gid, + ModTime: timeIfNotZero(&hdr.ModTime), + AccessTime: timeIfNotZero(&hdr.AccessTime), + ChangeTime: timeIfNotZero(&hdr.ChangeTime), + Devmajor: hdr.Devmajor, + Devminor: hdr.Devminor, + Xattrs: xattrs, + }, nil +} + +func timeIfNotZero(t *time.Time) *time.Time { + if t == nil || t.IsZero() { + return nil + } + return t +} From 8141b8901986fc7313afc9b520f0ec97cdd2716e Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 11 Feb 2026 10:36:30 +0100 Subject: [PATCH 03/12] storage/vendor: add github.com/bootc-dev/jsonrpc-fdpass-go Signed-off-by: Giuseppe Scrivano --- go.work.sum | 7 + storage/go.mod | 2 + storage/go.sum | 4 + .../.bootc-dev-infra-commit.txt | 1 + .../bootc-dev/jsonrpc-fdpass-go/.gitignore | 23 ++ 
.../bootc-dev/jsonrpc-fdpass-go/AGENTS.md | 41 ++ .../bootc-dev/jsonrpc-fdpass-go/Justfile | 49 +++ .../bootc-dev/jsonrpc-fdpass-go/LICENSE | 21 + .../bootc-dev/jsonrpc-fdpass-go/README.md | 68 ++++ .../bootc-dev/jsonrpc-fdpass-go/REVIEW.md | 173 ++++++++ .../bootc-dev/jsonrpc-fdpass-go/message.go | 216 ++++++++++ .../bootc-dev/jsonrpc-fdpass-go/renovate.json | 6 + .../bootc-dev/jsonrpc-fdpass-go/transport.go | 381 ++++++++++++++++++ vendor/modules.txt | 3 + 14 files changed, 995 insertions(+) create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/.bootc-dev-infra-commit.txt create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/.gitignore create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/AGENTS.md create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/Justfile create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/LICENSE create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/README.md create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/REVIEW.md create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/message.go create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/renovate.json create mode 100644 vendor/github.com/bootc-dev/jsonrpc-fdpass-go/transport.go diff --git a/go.work.sum b/go.work.sum index 882de9b5e1..ed45298637 100644 --- a/go.work.sum +++ b/go.work.sum @@ -76,6 +76,8 @@ github.com/aws/smithy-go v1.23.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4p github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/bootc-dev/jsonrpc-fdpass-go v0.0.0-20260407114003-45e4e8bb1fde h1:b/WbDyjcMI41JEv2NsjTp8w6Uj1D443TR/U6JQBhbVA= +github.com/bootc-dev/jsonrpc-fdpass-go v0.0.0-20260407114003-45e4e8bb1fde/go.mod 
h1:JVI3DW7BAg91VjSjYwAirSLq6K0AdlET7S8AjanZC0Q= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -98,6 +100,7 @@ github.com/containerd/typeurl/v2 v2.2.0/go.mod h1:8XOOxnyatxSWuG8OfsZXVnAF4iZfed github.com/containers/storage v1.59.1/go.mod h1:KoAYHnAjP3/cTsRS+mmWZGkufSY2GACiKQ4V3ZLQnR0= github.com/coreos/go-iptables v0.8.0/go.mod h1:Qe8Bv2Xik5FyTXwgIbLAnv2sWSBmvWdFETJConOQ//Q= github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/creack/pty v1.1.24/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE= github.com/danieljoos/wincred v1.2.3/go.mod h1:6qqX0WNrS4RzPZ1tnroDzq9kY3fu1KwE7MRLQK4X0bs= @@ -193,12 +196,14 @@ github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdB github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/letsencrypt/boulder v0.20250721.0/go.mod h1:pF04h1wm4jvF17m5vGRzOYBr+NHgZQ+9pLOxkAOoz6w= github.com/letsencrypt/boulder v0.20251110.0/go.mod h1:ogKCJQwll82m7OVHWyTuf8eeFCjuzdRQlgnZcCl0V+8= github.com/letsencrypt/boulder v0.20260223.0/go.mod h1:r3aTSA7UZ7dbDfiGK+HLHJz0bWNbHk6YSPiXgzl23sA= github.com/magefile/mage v1.14.0/go.mod 
h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mdlayher/packet v1.1.2/go.mod h1:GEu1+n9sG5VtiRE4SydOmX5GTwyyYlteZiFU+x0kew4= github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ= @@ -228,6 +233,7 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= @@ -238,6 +244,7 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7q github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russross/blackfriday v1.6.0/go.mod 
h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= diff --git a/storage/go.mod b/storage/go.mod index 9be263342f..9f6e54fba2 100644 --- a/storage/go.mod +++ b/storage/go.mod @@ -6,6 +6,7 @@ module go.podman.io/storage require ( github.com/BurntSushi/toml v1.6.0 + github.com/bootc-dev/jsonrpc-fdpass-go v0.0.0-20260407114003-45e4e8bb1fde github.com/containerd/stargz-snapshotter/estargz v0.18.2 github.com/cyphar/filepath-securejoin v0.6.1 github.com/docker/go-units v0.5.0 @@ -19,6 +20,7 @@ require ( github.com/moby/sys/mountinfo v0.7.2 github.com/moby/sys/user v0.4.0 github.com/opencontainers/go-digest v1.0.0 + github.com/opencontainers/image-spec v1.1.1 github.com/opencontainers/runtime-spec v1.3.0 github.com/opencontainers/selinux v1.13.1 github.com/sirupsen/logrus v1.9.4 diff --git a/storage/go.sum b/storage/go.sum index e8f1f284ce..b83a2ecf78 100644 --- a/storage/go.sum +++ b/storage/go.sum @@ -2,6 +2,8 @@ cyphar.com/go-pathrs v0.2.4 h1:iD/mge36swa1UFKdINkr1Frkpp6wZsy3YYEildj9cLY= cyphar.com/go-pathrs v0.2.4/go.mod h1:y8f1EMG7r+hCuFf/rXsKqMJrJAUoADZGNh5/vZPKcGc= github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk= github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/bootc-dev/jsonrpc-fdpass-go v0.0.0-20260407114003-45e4e8bb1fde h1:b/WbDyjcMI41JEv2NsjTp8w6Uj1D443TR/U6JQBhbVA= +github.com/bootc-dev/jsonrpc-fdpass-go v0.0.0-20260407114003-45e4e8bb1fde/go.mod h1:JVI3DW7BAg91VjSjYwAirSLq6K0AdlET7S8AjanZC0Q= github.com/containerd/stargz-snapshotter/estargz v0.18.2 h1:yXkZFYIzz3eoLwlTUZKz2iQ4MrckBxJjkmD16ynUTrw= github.com/containerd/stargz-snapshotter/estargz v0.18.2/go.mod h1:XyVU5tcJ3PRpkA9XS2T5us6Eg35yM0214Y+wvrZTBrY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -52,6 +54,8 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd 
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= +github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= github.com/opencontainers/runtime-spec v1.3.0 h1:YZupQUdctfhpZy3TM39nN9Ika5CBWT5diQ8ibYCRkxg= github.com/opencontainers/runtime-spec v1.3.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.13.1 h1:A8nNeceYngH9Ow++M+VVEwJVpdFmrlxsN22F+ISDCJE= diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/.bootc-dev-infra-commit.txt b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/.bootc-dev-infra-commit.txt new file mode 100644 index 0000000000..439a61cec2 --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/.bootc-dev-infra-commit.txt @@ -0,0 +1 @@ +56e4f615d38cc4a923f6a7e2a174a0c05a962451 diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/.gitignore b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/.gitignore new file mode 100644 index 0000000000..1ee0e6b891 --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/.gitignore @@ -0,0 +1,23 @@ +# Binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib +/echo + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool +*.out + +# Dependency directories +vendor/ + +# Build output +/target/ + +# Integration test Rust build artifacts +tests-integration/target/ +tests-integration/Cargo.lock diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/AGENTS.md b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/AGENTS.md new file mode 100644 index 0000000000..98ff247807 --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/AGENTS.md @@ -0,0 
+1,41 @@ + + +# Instructions for AI agents + +## CRITICAL instructions for generating commits + +### Signed-off-by + +Human review is required for all code that is generated +or assisted by a large language model. If you +are a LLM, you MUST NOT include a `Signed-off-by` +on any automatically generated git commits. Only explicit +human action or request should include a Signed-off-by. +If for example you automatically create a pull request +and the DCO check fails, tell the human to review +the code and give them instructions on how to add +a signoff. + +### Attribution + +When generating substantial amounts of code, you SHOULD +include an `Assisted-by: TOOLNAME (MODELNAME)`. For example, +`Assisted-by: Goose (Sonnet 4.5)`. + +## Code guidelines + +The [REVIEW.md](REVIEW.md) file describes expectations around +testing, code quality, commit organization, etc. If you're +creating a change, it is strongly encouraged after each +commit and especially when you think a task is complete +to spawn a subagent to perform a review using guidelines (alongside +looking for any other issues). + +If you are performing a review of other's code, the same +principles apply. + +## Follow other guidelines + +Look at the project README.md and look for guidelines +related to contribution, such as a CONTRIBUTING.md +and follow those. diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/Justfile b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/Justfile new file mode 100644 index 0000000000..b76683a7ed --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/Justfile @@ -0,0 +1,49 @@ +# Format and lint Go code +check: + go fmt ./... + go vet ./... + +# Run unit tests +unit: + go test -v ./... + +# Run unit tests with race detector +test-race: + go test -race -v ./... + +# Build all packages +build: + go build ./... 
+ +# Build the example +build-example: + go build -o target/echo ./examples/echo + +# Run all tests +test-all: unit + +# Clean build artifacts +clean: + rm -rf target/ + rm -rf tests-integration/target/ + go clean ./... + +# Full CI check (format, lint, test) +ci: check unit + +# Run the integration tests against the Rust implementation +# Requires: cargo, go +test-integration: build-integration-server + go test -v ./tests-integration/... + +# Build the Rust integration test server +build-integration-server: + cargo build --manifest-path tests-integration/Cargo.toml + +# Run the echo server example +run-server socket="/tmp/echo.sock": + go run ./examples/echo server {{socket}} + +# Run the echo client example +run-client socket="/tmp/echo.sock": + go run ./examples/echo client {{socket}} diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/LICENSE b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/LICENSE new file mode 100644 index 0000000000..4e800f548b --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Colin Walters + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/README.md b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/README.md new file mode 100644 index 0000000000..5ad0e38f1d --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/README.md @@ -0,0 +1,68 @@ +# jsonrpc-fdpass-go + +A Go implementation of JSON-RPC 2.0 with file descriptor passing over Unix domain sockets. + +This library implements the protocol specified in [jsonrpc-fdpass](https://github.com/bootc-dev/jsonrpc-fdpass). + +## Protocol Overview + +- **Transport**: Unix domain sockets (SOCK_STREAM) +- **Framing**: Self-delimiting JSON (streaming parser) +- **FD Passing**: Via sendmsg/recvmsg with SCM_RIGHTS ancillary data +- **FD Count**: Top-level `fds` field indicates the number of file descriptors attached + +When file descriptors are attached to a message, the `fds` field is automatically +set to the count of FDs. File descriptors are passed positionally—the application +layer defines the semantic mapping between FD positions and parameters. 
+ +## Installation + +```bash +go get github.com/bootc-dev/jsonrpc-fdpass-go +``` + +## Usage + +```go +package main + +import ( + "net" + "os" + + fdpass "github.com/bootc-dev/jsonrpc-fdpass-go" +) + +func main() { + // Connect to a Unix socket + conn, _ := net.DialUnix("unix", nil, &net.UnixAddr{Name: "/tmp/socket.sock", Net: "unix"}) + + // Create sender and receiver + sender := fdpass.NewSender(conn) + receiver := fdpass.NewReceiver(conn) + + // Send a request with a file descriptor + file, _ := os.Open("example.txt") + defer file.Close() + + req := fdpass.NewRequest("readFile", map[string]interface{}{ + "path": "example.txt", + }, 1) + + msg := &fdpass.MessageWithFds{ + Message: req, + FileDescriptors: []*os.File{file}, + } + + // The sender automatically sets the "fds" field to 1 + sender.Send(msg) + + // Receive response + resp, _ := receiver.Receive() + // Handle resp.Message and resp.FileDescriptors +} +``` + +## License + +MIT diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/REVIEW.md b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/REVIEW.md new file mode 100644 index 0000000000..80e6f4d8b0 --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/REVIEW.md @@ -0,0 +1,173 @@ +# Code Review Guidelines + +These guidelines are derived from analysis of code reviews across the bootc-dev +organization (October–December 2024). They represent the collective expectations +and standards that have emerged from real review feedback. + +## Testing + +Tests are expected for all non-trivial changes - unit and integration by default. + +If there's something that's difficult to write a test for at the current time, +please do at least state if it was tested manually. + +### Choosing the Right Test Type + +Unit tests are appropriate for parsing logic, data transformations, and +self-contained functions. Use integration tests for anything that involves +running containers or VMs. 
+ +Default to table-driven tests instead of having a separate unit test per +case. Especially LLMs like to generate the latter, but it can become +too verbose. Context windows matter to both humans and LLMs reading the +code later (this applies outside of unit tests too of course, but it's +easy to generate a *lot* of code for unit tests unnecessarily). + +### Separating Parsing from I/O + +A recurring theme is structuring code for testability. Split parsers from data +reading: have the parser accept a `&str`, then have a separate function that +reads from disk and calls the parser. This makes unit testing straightforward +without filesystem dependencies. + +### Test Assertions + +Make assertions strict and specific. Don't just verify that code "didn't crash"— +check that outputs match expected values. When adding new commands or output +formats, tests should verify the actual content, not just that something was +produced. + +## Code Quality + +### Parsing Structured Data + +Never parse structured data formats (JSON, YAML, XML) with text tools like `grep` +or `sed`. + +### Shell Scripts + +Try to avoid having shell script longer than 50 lines. This commonly occurs +in build system and tests. For the build system, usually there's higher +level ways to structure things (Justfile e.g.) and several of our projects +use the `cargo xtask` pattern to put arbitrary "glue" code in Rust using +the `xshell` crate to keep it easy to run external commands. + +### Constants and Magic Values + +Extract magic numbers into named constants. Any literal number that isn't +immediately obvious—buffer sizes, queue lengths, retry counts, timeouts—should +be a constant with a descriptive name. The same applies to magic strings: +deduplicate repeated paths, configuration keys, and other string literals. + +When values aren't self-explanatory, add a comment explaining why that specific +value was chosen. + +### Don't ignore (swallow) errors + +Avoid the `if let Ok(v) = ... 
{ }` in Rust, or `foo 2>/dev/null || true` +pattern in shell script by default. Most errors should be propagated by +default. If not, it's usually appropriate to at least log error messages +at a `tracing::debug!` or equivalent level. + +Handle edge cases explicitly: missing data, malformed input, offline systems. +Error messages should provide clear context for diagnosis. + +### Code Organization + +Separate concerns: I/O operations, parsing logic, and business logic belong in +different functions. Structure code so core logic can be unit tested without +external dependencies. + +It can be OK to duplicate a bit of code in a slightly different form twice, +but having it happen in 3 places asks for deduplication. + +## Commits and Pull Requests + +### Commit Organization + +Break changes into logical, atomic commits. Reviewers appreciate being able to +follow your reasoning: "Especially grateful for breaking it up into individual +commits so I can more easily follow your train of thought." + +Preparatory refactoring should be separate from behavioral changes. Each commit +should tell a clear story and be reviewable independently. Commit messages should +explain the "why" not just the "what," and use imperative mood ("Add feature" +not "Added feature"). + +### PR Descriptions + +PRs should link to the issues they address using `Closes:` or `Fixes:` with +full URLs. One reviewer noted: "I edited this issue just now to have +`Closes: ` but let's try to be sure we're doing that kind of thing in +general in the future." + +Document known limitations and caveats explicitly. When approaches have tradeoffs +or don't fully solve a problem, say so. For complex investigations, use collapsible +`
` sections to include debugging notes without cluttering the main +description. + +Think about broader implications: "But we'll have this problem across all repos +right?" Consider how your change affects the wider ecosystem. + +### Keeping PRs Current + +Keep PRs rebased on main. When CI failures are fixed in other PRs, rebase to +pick up the fixes. Reference the fixing PR when noting that a rebase is needed. + +### Before Merge + +Self-review your diff before requesting review. Catch obvious issues yourself +rather than burning reviewer cycles. + +Do not add `Signed-off-by` lines automatically—these require explicit human +action after review. If code was AI-assisted, include an `Assisted-by:` trailer +indicating the tool and model used. + + +## Architecture and Design + +### Workarounds vs Proper Fixes + +When implementing a workaround, document where the proper fix belongs and link +to relevant upstream issues. Invest time investigating proper fixes before +settling on workarounds. + +### Cross-Project Considerations + +Prefer pushing fixes upstream when the root cause is in a dependency. Reduce +scope where possible; don't reimplement functionality that belongs elsewhere. + +When multiple systems interact (like Renovate and custom sync tooling), be +explicit about which system owns what and how they coordinate. + +### Avoiding Regressions + +Verify that new code paths handle all cases the old code handled. When rewriting +functionality, ensure equivalent coverage exists. + +### Review Requirements + +When multiple contributors co-author a PR, bring in an independent reviewer. + +## Rust-Specific Guidance + +Prefer rustix over `libc`. All `unsafe` code must be very carefully +justified. + +### Dependencies + +New dependencies should be justified. Glance at existing reverse dependencies +on crates.io to see if a crate is widely used. Consider alternatives: "I'm +curious if you did any comparative analysis at all with alternatives?" 
+ +Prefer well-maintained crates with active communities. Consider `cargo deny` +policies when adding dependencies. + +### API Design + +When adding new commands or options, think about machine-readable output early. +JSON is generally preferred for that. + +Keep helper functions in appropriate modules. Move command output formatting +close to the CLI layer, keeping core logic functions focused on their primary +purpose. diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/message.go b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/message.go new file mode 100644 index 0000000000..b4dcde7e83 --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/message.go @@ -0,0 +1,216 @@ +// Package fdpass implements JSON-RPC 2.0 with file descriptor passing over Unix domain sockets. +package fdpass + +import ( + "encoding/json" + "os" +) + +// JSONRPCVersion is the JSON-RPC protocol version. +const JSONRPCVersion = "2.0" + +// FDsKey is the JSON key for the file descriptor count field. +const FDsKey = "fds" + +// FileDescriptorErrorCode is the error code for FD-related protocol errors. +const FileDescriptorErrorCode = -32050 + +// Request represents a JSON-RPC 2.0 request. +type Request struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params,omitempty"` + ID interface{} `json:"id"` + // Fds is the number of file descriptors attached to this message. + Fds *int `json:"fds,omitempty"` +} + +// Response represents a JSON-RPC 2.0 response. +type Response struct { + JsonRpc string `json:"jsonrpc"` + Result interface{} `json:"result,omitempty"` + Error *Error `json:"error,omitempty"` + ID interface{} `json:"id"` + // Fds is the number of file descriptors attached to this message. + Fds *int `json:"fds,omitempty"` +} + +// Notification represents a JSON-RPC 2.0 notification (a request without an ID). 
+type Notification struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params,omitempty"` + // Fds is the number of file descriptors attached to this message. + Fds *int `json:"fds,omitempty"` +} + +// Error represents a JSON-RPC 2.0 error object. +type Error struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +func (e *Error) Error() string { + return e.Message +} + +// MessageWithFds wraps a JSON-RPC message with associated file descriptors. +type MessageWithFds struct { + // Message is the JSON-RPC message (Request, Response, or Notification). + Message interface{} + // FileDescriptors are the file descriptors to pass with this message. + // The order corresponds to indices 0..N-1 matching the message's fds count. + FileDescriptors []*os.File +} + +// NewRequest creates a new JSON-RPC request. +func NewRequest(method string, params interface{}, id interface{}) *Request { + return &Request{ + JsonRpc: JSONRPCVersion, + Method: method, + Params: params, + ID: id, + } +} + +// NewResponse creates a new successful JSON-RPC response. +func NewResponse(result interface{}, id interface{}) *Response { + return &Response{ + JsonRpc: JSONRPCVersion, + Result: result, + ID: id, + } +} + +// NewErrorResponse creates a new error JSON-RPC response. +func NewErrorResponse(err *Error, id interface{}) *Response { + return &Response{ + JsonRpc: JSONRPCVersion, + Error: err, + ID: id, + } +} + +// NewNotification creates a new JSON-RPC notification. +func NewNotification(method string, params interface{}) *Notification { + return &Notification{ + JsonRpc: JSONRPCVersion, + Method: method, + Params: params, + } +} + +// GetFDCount reads the file descriptor count from a JSON value. +// Returns 0 if the `fds` field is absent or not a valid number. 
+func GetFDCount(value map[string]interface{}) int { + if fds, ok := value[FDsKey]; ok { + switch v := fds.(type) { + case float64: + return int(v) + case int: + return v + } + } + return 0 +} + +// FileDescriptorError creates a standard FD error for protocol violations. +func FileDescriptorError() *Error { + return &Error{ + Code: FileDescriptorErrorCode, + Message: "File Descriptor Error", + } +} + +// SetFDs sets the fds count on a Request. +func (r *Request) SetFDs(count int) { + if count > 0 { + r.Fds = &count + } else { + r.Fds = nil + } +} + +// GetFDs returns the fds count from a Request. +func (r *Request) GetFDs() int { + if r.Fds != nil { + return *r.Fds + } + return 0 +} + +// SetFDs sets the fds count on a Response. +func (r *Response) SetFDs(count int) { + if count > 0 { + r.Fds = &count + } else { + r.Fds = nil + } +} + +// GetFDs returns the fds count from a Response. +func (r *Response) GetFDs() int { + if r.Fds != nil { + return *r.Fds + } + return 0 +} + +// SetFDs sets the fds count on a Notification. +func (n *Notification) SetFDs(count int) { + if count > 0 { + n.Fds = &count + } else { + n.Fds = nil + } +} + +// GetFDs returns the fds count from a Notification. +func (n *Notification) GetFDs() int { + if n.Fds != nil { + return *n.Fds + } + return 0 +} + +// ParseMessage parses a raw JSON message into the appropriate type. +// It returns one of *Request, *Response, or *Notification. 
+func ParseMessage(data []byte) (interface{}, error) { + // First parse as a generic map to determine type + var raw map[string]json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + return nil, err + } + + // Determine message type based on fields present + _, hasMethod := raw["method"] + _, hasID := raw["id"] + _, hasResult := raw["result"] + _, hasError := raw["error"] + + if hasMethod && hasID { + // Request + var req Request + if err := json.Unmarshal(data, &req); err != nil { + return nil, err + } + return &req, nil + } else if hasResult || hasError { + // Response + var resp Response + if err := json.Unmarshal(data, &resp); err != nil { + return nil, err + } + return &resp, nil + } else if hasMethod { + // Notification + var notif Notification + if err := json.Unmarshal(data, ¬if); err != nil { + return nil, err + } + return ¬if, nil + } + + return nil, &Error{Code: -32600, Message: "Invalid JSON-RPC message"} +} diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/renovate.json b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/renovate.json new file mode 100644 index 0000000000..3e4b91a3a5 --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/renovate.json @@ -0,0 +1,6 @@ +{ + "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "extends": [ + "local>bootc-dev/infra:renovate-shared-config.json" + ] +} diff --git a/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/transport.go b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/transport.go new file mode 100644 index 0000000000..c2239a9173 --- /dev/null +++ b/vendor/github.com/bootc-dev/jsonrpc-fdpass-go/transport.go @@ -0,0 +1,381 @@ +package fdpass + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "sync" + + "golang.org/x/sys/unix" +) + +const ( + // DefaultMaxFDsPerSendmsg is the default maximum number of file descriptors + // per sendmsg() call. Platform limits for SCM_RIGHTS vary (e.g., ~253 on + // Linux, ~512 on macOS). 
We start with an optimistic value; if sendmsg() + // fails with EINVAL, the batch size is automatically reduced and retried. + DefaultMaxFDsPerSendmsg = 500 + + // MaxFDsPerRecvmsg is the maximum number of FDs to expect in a single + // recvmsg() call. Must be at least as large as the largest platform limit. + MaxFDsPerRecvmsg = 512 + + // ReadBufferSize is the size of the read buffer. + ReadBufferSize = 4096 +) + +var ( + // ErrConnectionClosed is returned when the connection is closed. + ErrConnectionClosed = errors.New("connection closed") + // ErrFramingError is returned when JSON parsing fails. + ErrFramingError = errors.New("framing error: invalid JSON") + // ErrMismatchedCount is returned when the number of FDs doesn't match the fds field. + ErrMismatchedCount = errors.New("mismatched file descriptor count") +) + +// Sender sends JSON-RPC messages with file descriptors over a Unix socket. +type Sender struct { + conn *net.UnixConn + mu sync.Mutex + maxFDsPerSendmsg int +} + +// NewSender creates a new Sender for the given Unix connection. +func NewSender(conn *net.UnixConn) *Sender { + return &Sender{ + conn: conn, + maxFDsPerSendmsg: DefaultMaxFDsPerSendmsg, + } +} + +// SetMaxFDsPerSendmsg sets the maximum number of file descriptors to send per +// sendmsg() call. This is primarily useful for testing FD batching behavior. +// The value must be at least 1. +func (s *Sender) SetMaxFDsPerSendmsg(max int) { + if max < 1 { + max = 1 + } + s.maxFDsPerSendmsg = max +} + +// Send sends a message with optional file descriptors. 
+func (s *Sender) Send(msg *MessageWithFds) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Set the fds field on the message based on the number of file descriptors + fdCount := len(msg.FileDescriptors) + switch m := msg.Message.(type) { + case *Request: + m.SetFDs(fdCount) + case *Response: + m.SetFDs(fdCount) + case *Notification: + m.SetFDs(fdCount) + } + + // Serialize the message with the fds field set + msgData, err := json.Marshal(msg.Message) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + // Get the raw file descriptor for the socket + rawConn, err := s.conn.SyscallConn() + if err != nil { + return fmt.Errorf("failed to get syscall conn: %w", err) + } + + var sendErr error + err = rawConn.Control(func(fd uintptr) { + sendErr = s.sendWithFDs(int(fd), msgData, msg.FileDescriptors) + }) + if err != nil { + return err + } + return sendErr +} + +func (s *Sender) sendWithFDs(sockfd int, data []byte, files []*os.File) error { + // Extract raw FD ints + allFDs := make([]int, len(files)) + for i, f := range files { + allFDs[i] = int(f.Fd()) + } + + bytesSent := 0 + fdsSent := 0 + currentMaxFDs := s.maxFDsPerSendmsg + + // Send data and FDs in batches. Each sendmsg can only handle a limited + // number of FDs. After all data bytes are sent, remaining FDs are sent + // with whitespace padding bytes per the protocol spec (Section 4.1). + for bytesSent < len(data) || fdsSent < len(allFDs) { + remainingData := data[bytesSent:] + remainingFDs := allFDs[fdsSent:] + + // Determine how many FDs to send in this batch + fdBatchSize := len(remainingFDs) + if fdBatchSize > currentMaxFDs { + fdBatchSize = currentMaxFDs + } + fdBatch := remainingFDs[:fdBatchSize] + + var n int + var err error + + if len(fdBatch) > 0 { + // Send with FDs using sendmsg with ancillary data + rights := unix.UnixRights(fdBatch...) 
+ + var payload []byte + if len(remainingData) > 0 { + payload = remainingData + } else { + // All data bytes already sent; send a whitespace padding byte. + // The receiver's JSON parser ignores inter-message whitespace + // per RFC 8259. This is required because some systems need + // non-empty data for ancillary data delivery. + payload = []byte{' '} + } + + n, err = unix.SendmsgN(sockfd, payload, rights, nil, 0) + if err != nil { + // EINVAL with multiple FDs likely means we exceeded the + // kernel's SCM_MAX_FD limit. Halve the batch size and retry. + if errors.Is(err, unix.EINVAL) && fdBatchSize > 1 { + currentMaxFDs = fdBatchSize / 2 + continue + } + return fmt.Errorf("sendmsg failed: %w", err) + } + fdsSent += fdBatchSize + + // Only count actual data bytes, not the padding byte + if len(remainingData) > 0 { + bytesSent += n + } + } else if len(remainingData) > 0 { + // No FDs left, just send remaining data bytes + n, err = unix.Write(sockfd, remainingData) + if err != nil { + return fmt.Errorf("write failed: %w", err) + } + bytesSent += n + } + } + + // If we discovered a lower limit, remember it for future sends + if currentMaxFDs < s.maxFDsPerSendmsg { + s.maxFDsPerSendmsg = currentMaxFDs + } + + return nil +} + +// Receiver receives JSON-RPC messages with file descriptors from a Unix socket. +type Receiver struct { + conn *net.UnixConn + buffer []byte + fdQueue []*os.File + mu sync.Mutex +} + +// NewReceiver creates a new Receiver for the given Unix connection. +func NewReceiver(conn *net.UnixConn) *Receiver { + return &Receiver{ + conn: conn, + buffer: make([]byte, 0), + fdQueue: make([]*os.File, 0), + } +} + +// Receive receives the next message with its file descriptors. 
+func (r *Receiver) Receive() (*MessageWithFds, error) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	for {
+		// Try to parse a complete message from the buffer
+		result, err := r.tryParseMessage()
+		if err != nil {
+			return nil, err
+		}
+		if result.msg != nil {
+			return result.msg, nil
+		}
+
+		// Need more data — either incomplete JSON or waiting for
+		// batched FDs from continuation sendmsg() calls.
+		if err := r.readMoreData(); err != nil {
+			// If we had a parsed message waiting for FDs and the
+			// connection closed, that's a mismatched count error.
+			if result.needFDs && errors.Is(err, ErrConnectionClosed) {
+				return nil, fmt.Errorf("%w: connection closed while waiting for batched FDs", ErrMismatchedCount)
+			}
+			return nil, err
+		}
+	}
+}
+
+// tryParseResult is used internally to communicate between tryParseMessage
+// and Receive about whether more FDs are needed from batched sendmsg calls.
+type tryParseResult struct {
+	msg     *MessageWithFds
+	needFDs bool // true when message is parsed but FD queue is short
+}
+
+func (r *Receiver) tryParseMessage() (*tryParseResult, error) {
+	if len(r.buffer) == 0 {
+		return &tryParseResult{}, nil
+	}
+
+	// Use streaming JSON decoder to find message boundaries
+	decoder := json.NewDecoder(bytes.NewReader(r.buffer))
+	var value map[string]interface{}
+
+	err := decoder.Decode(&value)
+	if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
+		// Incomplete JSON - need more data
+		return &tryParseResult{}, nil
+	}
+	if err != nil {
+		// Actual parse error - framing error
+		return nil, fmt.Errorf("%w: %v", ErrFramingError, err)
+	}
+
+	// Successfully parsed a complete JSON value
+	// Use InputOffset to find consumed bytes (Go 1.14+)
+	bytesConsumed := decoder.InputOffset()
+
+	// Extract the consumed bytes for re-parsing
+	consumedData := make([]byte, bytesConsumed)
+	copy(consumedData, r.buffer[:bytesConsumed])
+
+	// Read the fds count from the message
+	fdCount := GetFDCount(value)
+
+	// Check we have enough FDs. 
When FD batching is in use, the sender + // sends continuation sendmsg() calls with whitespace padding and more + // FDs. We need to read more data to collect them. + if fdCount > len(r.fdQueue) { + return &tryParseResult{needFDs: true}, nil + } + + // Remove consumed bytes from buffer + r.buffer = r.buffer[bytesConsumed:] + + // Dequeue FDs + fds := make([]*os.File, fdCount) + copy(fds, r.fdQueue[:fdCount]) + r.fdQueue = r.fdQueue[fdCount:] + + // Parse the message into the appropriate type + msg, err := ParseMessage(consumedData) + if err != nil { + return nil, err + } + + return &tryParseResult{ + msg: &MessageWithFds{ + Message: msg, + FileDescriptors: fds, + }, + }, nil +} + +func (r *Receiver) readMoreData() error { + rawConn, err := r.conn.SyscallConn() + if err != nil { + return fmt.Errorf("failed to get syscall conn: %w", err) + } + + var readErr error + var bytesRead int + var receivedFDs []*os.File + + err = rawConn.Read(func(fd uintptr) bool { + bytesRead, receivedFDs, readErr = r.recvWithFDs(int(fd)) + // Return true to indicate we're done with this read operation + // Return false only if we get EAGAIN/EWOULDBLOCK + if readErr != nil { + if errors.Is(readErr, unix.EAGAIN) || errors.Is(readErr, unix.EWOULDBLOCK) { + readErr = nil + return false // Tell runtime to wait and retry + } + } + return true + }) + + if err != nil { + return err + } + if readErr != nil { + return readErr + } + + if bytesRead == 0 && len(receivedFDs) == 0 { + return ErrConnectionClosed + } + + // Append received FDs to queue + r.fdQueue = append(r.fdQueue, receivedFDs...) 
+ + return nil +} + +func (r *Receiver) recvWithFDs(sockfd int) (int, []*os.File, error) { + buf := make([]byte, ReadBufferSize) + // Allocate space for control message (for up to MaxFDsPerRecvmsg FDs) + // Each FD is 4 bytes (int32), use CmsgSpace to get properly aligned size + oob := make([]byte, unix.CmsgSpace(MaxFDsPerRecvmsg*4)) + + n, oobn, _, _, err := unix.Recvmsg(sockfd, buf, oob, unix.MSG_CMSG_CLOEXEC) + if err != nil { + return 0, nil, err + } + + // Append data to buffer + if n > 0 { + r.buffer = append(r.buffer, buf[:n]...) + } + + // Parse control messages for FDs + var files []*os.File + if oobn > 0 { + scms, err := unix.ParseSocketControlMessage(oob[:oobn]) + if err != nil { + return n, nil, fmt.Errorf("failed to parse control message: %w", err) + } + + for _, scm := range scms { + fds, err := unix.ParseUnixRights(&scm) + if err != nil { + continue + } + for _, fd := range fds { + files = append(files, os.NewFile(uintptr(fd), "")) + } + } + } + + return n, files, nil +} + +// Close closes the receiver and any pending file descriptors in the queue. 
+func (r *Receiver) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + // Close any FDs remaining in the queue to prevent leaks + for _, f := range r.fdQueue { + f.Close() + } + r.fdQueue = nil + + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d35efa67ea..1c170fedee 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -30,6 +30,9 @@ github.com/VividCortex/ewma # github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d ## explicit github.com/acarl005/stripansi +# github.com/bootc-dev/jsonrpc-fdpass-go v0.0.0-20260407114003-45e4e8bb1fde +## explicit; go 1.21 +github.com/bootc-dev/jsonrpc-fdpass-go # github.com/cespare/xxhash/v2 v2.3.0 ## explicit; go 1.11 github.com/cespare/xxhash/v2 From d50db34b71329645dff7fd1eaf8196d69bc823eb Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 11 Feb 2026 10:51:18 +0100 Subject: [PATCH 04/12] storage/pkg/splitfdstream: new package Signed-off-by: Giuseppe Scrivano --- storage/pkg/splitfdstream/server_linux.go | 550 ++++++++++++++++++ .../pkg/splitfdstream/server_unsupported.go | 64 ++ storage/pkg/splitfdstream/types.go | 180 ++++++ storage/pkg/splitfdstream/types_test.go | 361 ++++++++++++ 4 files changed, 1155 insertions(+) create mode 100644 storage/pkg/splitfdstream/server_linux.go create mode 100644 storage/pkg/splitfdstream/server_unsupported.go create mode 100644 storage/pkg/splitfdstream/types.go create mode 100644 storage/pkg/splitfdstream/types_test.go diff --git a/storage/pkg/splitfdstream/server_linux.go b/storage/pkg/splitfdstream/server_linux.go new file mode 100644 index 0000000000..55c15c1e72 --- /dev/null +++ b/storage/pkg/splitfdstream/server_linux.go @@ -0,0 +1,550 @@ +//go:build linux + +package splitfdstream + +import ( + "errors" + "fmt" + "io" + "net" + "os" + "runtime" + "sync" + + fdpass "github.com/bootc-dev/jsonrpc-fdpass-go" + "golang.org/x/sys/unix" +) + +// JSON-RPC 2.0 standard error codes as documented here: https://www.jsonrpc.org/specification 
+const ( + jsonrpcInvalidRequest = -32600 + jsonrpcMethodNotFound = -32601 + jsonrpcInvalidParams = -32602 + jsonrpcServerError = -32000 +) + +// sendRetry retries sender.Send on EAGAIN (non-blocking socket buffer full). +func sendRetry(sender *fdpass.Sender, msg *fdpass.MessageWithFds) error { + for { + err := sender.Send(msg) + if err == nil { + return nil + } + if errors.Is(err, unix.EAGAIN) || errors.Is(err, unix.EWOULDBLOCK) { + runtime.Gosched() + continue + } + return err + } +} + +// JSONRPCServer manages a JSON-RPC server using the external library. +type JSONRPCServer struct { + driverFunc DriverFunc + store Store + listener net.Listener + running bool + mu sync.RWMutex + shutdown chan struct{} + connections sync.WaitGroup +} + +// NewJSONRPCServer creates a new JSON-RPC server. +func NewJSONRPCServer(driverFunc DriverFunc, store Store) *JSONRPCServer { + return &JSONRPCServer{ + driverFunc: driverFunc, + store: store, + shutdown: make(chan struct{}), + } +} + +// Start starts the JSON-RPC server listening on the given Unix socket. +func (s *JSONRPCServer) Start(socketPath string) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.running { + return fmt.Errorf("server already running") + } + + os.Remove(socketPath) + + listener, err := net.Listen("unix", socketPath) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", socketPath, err) + } + + s.listener = listener + s.running = true + + go s.acceptConnections() + + return nil +} + +// Stop stops the JSON-RPC server. 
+func (s *JSONRPCServer) Stop() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.running { + return nil + } + + close(s.shutdown) + if s.listener != nil { + s.listener.Close() + } + s.connections.Wait() + s.running = false + + return nil +} + +func (s *JSONRPCServer) acceptConnections() { + for { + conn, err := s.listener.Accept() + if err != nil { + select { + case <-s.shutdown: + return + default: + continue + } + } + + unixConn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + continue + } + + s.TrackConnection() + go s.HandleConnection(unixConn) + } +} + +// TrackConnection increments the connection counter. Call this before +// spawning a goroutine that calls HandleConnection so that Stop()'s +// Wait() cannot return before the handler has started. +func (s *JSONRPCServer) TrackConnection() { + s.connections.Add(1) +} + +// HandleConnection handles a single client connection. +// The caller must call TrackConnection before spawning HandleConnection +// in a goroutine. +func (s *JSONRPCServer) HandleConnection(conn *net.UnixConn) { + defer s.connections.Done() + defer conn.Close() + + receiver := fdpass.NewReceiver(conn) + sender := fdpass.NewSender(conn) + defer receiver.Close() + + for { + select { + case <-s.shutdown: + return + default: + } + + msgWithFds, err := receiver.Receive() + if err != nil { + return + } + + req, ok := msgWithFds.Message.(*fdpass.Request) + if !ok { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidRequest, Message: "Invalid Request"}, + nil, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + return + } + continue + } + + s.handleRequest(sender, req, msgWithFds.FileDescriptors) + } +} + +func (s *JSONRPCServer) handleRequest(sender *fdpass.Sender, req *fdpass.Request, fds []*os.File) { + defer func() { + for _, f := range fds { + f.Close() + } + }() + + switch req.Method { + case "GetSplitFDStream": + s.handleGetSplitFDStream(sender, req) + case "GetImage": + 
s.handleGetImage(sender, req) + default: + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcMethodNotFound, Message: fmt.Sprintf("method %q not found", req.Method)}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending method-not-found response: %v\n", err) + } + } +} + +func (s *JSONRPCServer) handleGetSplitFDStream(sender *fdpass.Sender, req *fdpass.Request) { + params, ok := req.Params.(map[string]interface{}) + if !ok { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidParams, Message: "params must be an object"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + layerID, _ := params["layerId"].(string) + if layerID == "" { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidParams, Message: "layerId is required"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + parentID, _ := params["parentId"].(string) + + driver, release, err := s.driverFunc() + if err != nil { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: fmt.Sprintf("failed to acquire driver: %v", err)}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + stream, fileFDs, err := driver.GetSplitFDStream(layerID, parentID, &GetSplitFDStreamOpts{}) + release() + if err != nil { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: err.Error()}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) 
+ } + return + } + + // The stream must be backed by an *os.File (e.g. a memfd) so it can + // be sent over the socket as a file descriptor. + streamFile, ok := stream.(*os.File) + if !ok { + stream.Close() + for _, f := range fileFDs { + f.Close() + } + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: "stream is not backed by a file descriptor"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + info, err := streamFile.Stat() + if err != nil { + streamFile.Close() + for _, f := range fileFDs { + f.Close() + } + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: fmt.Sprintf("failed to stat stream: %v", err)}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + streamSize := info.Size() + + // allFDs[0] = stream memfd, allFDs[1:] = content file descriptors. + // The library automatically batches FDs across multiple sendmsg() + // calls with whitespace padding per the jsonrpc-fdpass protocol. + allFDs := make([]*os.File, 0, 1+len(fileFDs)) + allFDs = append(allFDs, streamFile) + allFDs = append(allFDs, fileFDs...) + + result := map[string]interface{}{ + "streamSize": streamSize, + } + + resp := fdpass.NewResponse(result, req.ID) + if err := sendRetry(sender, &fdpass.MessageWithFds{ + Message: resp, + FileDescriptors: allFDs, + }); err != nil { + fmt.Fprintf(os.Stderr, "error sending response: %v\n", err) + } + // sendmsg duplicates FDs into the kernel — close our copies + // whether the send succeeded or failed. 
+ for _, f := range allFDs { + f.Close() + } +} + +func (s *JSONRPCServer) handleGetImage(sender *fdpass.Sender, req *fdpass.Request) { + params, ok := req.Params.(map[string]interface{}) + if !ok { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidParams, Message: "params must be an object"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + imageID, _ := params["imageId"].(string) + if imageID == "" { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidParams, Message: "imageId is required"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + if s.store == nil { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: "store not available for image operations"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + metadata, err := GetImageMetadata(s.store, imageID) + if err != nil { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: fmt.Sprintf("failed to get image metadata: %v", err)}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + result := map[string]interface{}{ + "manifest": string(metadata.ManifestJSON), + "config": string(metadata.ConfigJSON), + "layerDigests": metadata.LayerDigests, + } + + resp := fdpass.NewResponse(result, req.ID) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending image response: %v\n", err) + return + } +} + +// JSONRPCClient 
implements a JSON-RPC client. +type JSONRPCClient struct { + conn *net.UnixConn + sender *fdpass.Sender + receiver *fdpass.Receiver + mu sync.Mutex + nextID int64 +} + +// NewJSONRPCClient connects to a JSON-RPC server on the given Unix socket. +func NewJSONRPCClient(socketPath string) (*JSONRPCClient, error) { + conn, err := net.Dial("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("failed to connect to socket: %w", err) + } + + unixConn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + return nil, fmt.Errorf("connection is not a unix socket") + } + + return &JSONRPCClient{ + conn: unixConn, + sender: fdpass.NewSender(unixConn), + receiver: fdpass.NewReceiver(unixConn), + nextID: 1, + }, nil +} + +// Close closes the client connection. +func (c *JSONRPCClient) Close() error { + if c.receiver != nil { + c.receiver.Close() + } + if c.conn != nil { + return c.conn.Close() + } + return nil +} + +// GetSplitFDStream sends a GetSplitFDStream request and returns the response. +func (c *JSONRPCClient) GetSplitFDStream(layerID, parentID string) ([]byte, []*os.File, error) { + c.mu.Lock() + id := c.nextID + c.nextID++ + c.mu.Unlock() + + req := fdpass.NewRequest("GetSplitFDStream", map[string]interface{}{ + "layerId": layerID, + "parentId": parentID, + }, id) + + if err := sendRetry(c.sender, &fdpass.MessageWithFds{Message: req}); err != nil { + return nil, nil, fmt.Errorf("failed to send request: %w", err) + } + + // The library handles FD batching transparently — all FDs arrive + // with the single response via batched sendmsg()/recvmsg() calls. 
+ respMsg, err := c.receiver.Receive() + if err != nil { + return nil, nil, fmt.Errorf("failed to receive response: %w", err) + } + + resp, ok := respMsg.Message.(*fdpass.Response) + if !ok { + return nil, nil, fmt.Errorf("unexpected response type: %T", respMsg.Message) + } + + if resp.Error != nil { + return nil, nil, fmt.Errorf("server error: %s", resp.Error.Message) + } + + allFDs := respMsg.FileDescriptors + if len(allFDs) == 0 { + return nil, nil, fmt.Errorf("no file descriptors received") + } + + // allFDs[0] is a memfd containing the stream data, the rest are content FDs + streamFile := allFDs[0] + contentFDs := allFDs[1:] + + streamData, err := io.ReadAll(streamFile) + streamFile.Close() + if err != nil { + for _, f := range contentFDs { + f.Close() + } + return nil, nil, fmt.Errorf("failed to read stream data from fd: %w", err) + } + + return streamData, contentFDs, nil +} + +// GetImage sends a GetImage request and returns image metadata. +func (c *JSONRPCClient) GetImage(imageID string) (*ImageMetadata, error) { + c.mu.Lock() + id := c.nextID + c.nextID++ + c.mu.Unlock() + + req := fdpass.NewRequest("GetImage", map[string]interface{}{ + "imageId": imageID, + }, id) + + if err := sendRetry(c.sender, &fdpass.MessageWithFds{Message: req}); err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + + respMsg, err := c.receiver.Receive() + if err != nil { + return nil, fmt.Errorf("failed to receive response: %w", err) + } + + resp, ok := respMsg.Message.(*fdpass.Response) + if !ok { + return nil, fmt.Errorf("unexpected response type: %T", respMsg.Message) + } + + if resp.Error != nil { + return nil, fmt.Errorf("server error: %s", resp.Error.Message) + } + + result, ok := resp.Result.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("unexpected result type: %T", resp.Result) + } + + manifestJSON, _ := result["manifest"].(string) + configJSON, _ := result["config"].(string) + + layerDigestsInterface, _ := 
result["layerDigests"].([]interface{}) + layerDigests := make([]string, len(layerDigestsInterface)) + for i, v := range layerDigestsInterface { + layerDigests[i], _ = v.(string) + } + + return &ImageMetadata{ + ManifestJSON: []byte(manifestJSON), + ConfigJSON: []byte(configJSON), + LayerDigests: layerDigests, + }, nil +} + +// CreateSocketPair creates a pair of connected UNIX sockets. +func CreateSocketPair() (*net.UnixConn, *net.UnixConn, error) { + fds, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM, 0) + if err != nil { + return nil, nil, fmt.Errorf("failed to create socket pair: %w", err) + } + + clientFile := os.NewFile(uintptr(fds[0]), "client") + defer clientFile.Close() + serverFile := os.NewFile(uintptr(fds[1]), "server") + defer serverFile.Close() + + clientConn, err := net.FileConn(clientFile) + if err != nil { + return nil, nil, fmt.Errorf("failed to create client connection: %w", err) + } + + serverConn, err := net.FileConn(serverFile) + if err != nil { + clientConn.Close() + return nil, nil, fmt.Errorf("failed to create server connection: %w", err) + } + + clientUnix, ok := clientConn.(*net.UnixConn) + if !ok { + clientConn.Close() + serverConn.Close() + return nil, nil, fmt.Errorf("failed to cast client to UnixConn") + } + + serverUnix, ok := serverConn.(*net.UnixConn) + if !ok { + clientConn.Close() + serverConn.Close() + return nil, nil, fmt.Errorf("failed to cast server to UnixConn") + } + + return clientUnix, serverUnix, nil +} diff --git a/storage/pkg/splitfdstream/server_unsupported.go b/storage/pkg/splitfdstream/server_unsupported.go new file mode 100644 index 0000000000..36eb13576c --- /dev/null +++ b/storage/pkg/splitfdstream/server_unsupported.go @@ -0,0 +1,64 @@ +//go:build !linux + +package splitfdstream + +import ( + "fmt" + "net" + "os" +) + +// JSONRPCServer is not supported on this platform. +type JSONRPCServer struct{} + +// NewJSONRPCServer creates a new JSON-RPC server stub for unsupported platforms. 
+func NewJSONRPCServer(driverFunc DriverFunc, store Store) *JSONRPCServer { + return &JSONRPCServer{} +} + +// TrackConnection is not supported on this platform. +func (s *JSONRPCServer) TrackConnection() { +} + +// HandleConnection is not supported on this platform. +func (s *JSONRPCServer) HandleConnection(conn *net.UnixConn) { + panic("JSONRPCServer is not supported on this platform") +} + +// Start is not supported on this platform. +func (s *JSONRPCServer) Start(socketPath string) error { + return fmt.Errorf("JSONRPCServer is not supported on this platform") +} + +// Stop is not supported on this platform. +func (s *JSONRPCServer) Stop() error { + return fmt.Errorf("JSONRPCServer is not supported on this platform") +} + +// JSONRPCClient is not supported on this platform. +type JSONRPCClient struct{} + +// NewJSONRPCClient creates a new JSON-RPC client stub for unsupported platforms. +func NewJSONRPCClient(socketPath string) (*JSONRPCClient, error) { + return nil, fmt.Errorf("JSONRPCClient is not supported on this platform") +} + +// Close is not supported on this platform. +func (c *JSONRPCClient) Close() error { + return fmt.Errorf("JSONRPCClient is not supported on this platform") +} + +// GetSplitFDStream is not supported on this platform. +func (c *JSONRPCClient) GetSplitFDStream(layerID, parentID string) ([]byte, []*os.File, error) { + return nil, nil, fmt.Errorf("GetSplitFDStream is not supported on this platform") +} + +// GetImage is not supported on this platform. +func (c *JSONRPCClient) GetImage(imageID string) (*ImageMetadata, error) { + return nil, fmt.Errorf("GetImage is not supported on this platform") +} + +// CreateSocketPair is not supported on this platform. 
+func CreateSocketPair() (*net.UnixConn, *net.UnixConn, error) { + return nil, nil, fmt.Errorf("CreateSocketPair is not supported on this platform") +} diff --git a/storage/pkg/splitfdstream/types.go b/storage/pkg/splitfdstream/types.go new file mode 100644 index 0000000000..9e9cb092e4 --- /dev/null +++ b/storage/pkg/splitfdstream/types.go @@ -0,0 +1,180 @@ +package splitfdstream + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io" + "os" + + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "go.podman.io/storage/pkg/idtools" +) + +const ( + // manifestBigDataKey is the BigData key under which the image manifest + // is stored. Matches storage.ImageDigestBigDataKey. + manifestBigDataKey = "manifest" +) + +// Store represents the minimal interface needed for image metadata access. +type Store interface { + ImageBigData(id, key string) ([]byte, error) + ResolveImageID(id string) (actualID string, topLayerID string, err error) + LayerParent(id string) (parentID string, err error) +} + +// SplitFDStreamDriver defines the interface that storage drivers must implement +// to support splitfdstream operations. +type SplitFDStreamDriver interface { + // GetSplitFDStream generates a splitfdstream for a layer. + GetSplitFDStream(id, parent string, options *GetSplitFDStreamOpts) (io.ReadCloser, []*os.File, error) +} + +// DriverFunc acquires a SplitFDStreamDriver (and any associated lock) +// and returns a release function that the caller must invoke when done. +// This allows per-request locking so that the graph driver is held only +// for the duration of each operation. +type DriverFunc func() (SplitFDStreamDriver, func(), error) + +// ImageMetadata holds manifest and config data for an OCI image. 
+type ImageMetadata struct { + ManifestJSON []byte `json:"manifest"` + ConfigJSON []byte `json:"config"` + LayerDigests []string `json:"layerDigests"` +} + +// findManifest retrieves the image manifest via store.ImageBigData +// using the well-known "manifest" key. We use this thin wrapper +// (rather than importing containers/image) to avoid pulling a heavy +// dependency into the storage layer. The image in the store is +// already platform-resolved, so no key scanning is needed. +func findManifest(store Store, imageID string) ([]byte, error) { + data, err := store.ImageBigData(imageID, manifestBigDataKey) + if err != nil { + return nil, fmt.Errorf("no manifest found for image %s: %w", imageID, err) + } + return data, nil +} + +// findConfig retrieves the image config via store.ImageBigData using +// the config digest from the manifest (e.g. "sha256:abc..."). +func findConfig(store Store, imageID string, manifest *v1.Manifest) ([]byte, error) { + configDigest := manifest.Config.Digest.String() + data, err := store.ImageBigData(imageID, configDigest) + if err != nil { + return nil, fmt.Errorf("config %s not found for image %s: %w", configDigest, imageID, err) + } + return data, nil +} + +// GetImageMetadata retrieves manifest, config, and layer information for an image. 
+func GetImageMetadata(store Store, imageID string) (*ImageMetadata, error) { + actualID, topLayerID, err := store.ResolveImageID(imageID) + if err != nil { + return nil, fmt.Errorf("failed to resolve image %s: %w", imageID, err) + } + + manifestJSON, err := findManifest(store, actualID) + if err != nil { + return nil, fmt.Errorf("failed to get manifest for %s (resolved to %s): %w", imageID, actualID, err) + } + + var manifest v1.Manifest + if err := json.Unmarshal(manifestJSON, &manifest); err != nil { + return nil, fmt.Errorf("failed to parse manifest: %w", err) + } + + configJSON, err := findConfig(store, actualID, &manifest) + if err != nil { + return nil, fmt.Errorf("failed to get config for %s (resolved to %s): %w", imageID, actualID, err) + } + + // Walk the layer chain using store.LayerParent. + // Cap at a generous limit to prevent infinite loops from corrupted storage. + const maxLayerDepth = 4096 + var layerIDs []string + layerID := topLayerID + for layerID != "" { + if len(layerIDs) >= maxLayerDepth { + return nil, fmt.Errorf("layer chain exceeds maximum depth of %d", maxLayerDepth) + } + layerIDs = append(layerIDs, layerID) + parentID, err := store.LayerParent(layerID) + if err != nil { + return nil, fmt.Errorf("failed to get parent of layer %s: %w", layerID, err) + } + layerID = parentID + } + + // Fall back to manifest layer digests if layer chain traversal failed + if len(layerIDs) == 0 { + layerIDs = make([]string, len(manifest.Layers)) + for i, layer := range manifest.Layers { + layerIDs[i] = layer.Digest.String() + } + } + + return &ImageMetadata{ + ManifestJSON: manifestJSON, + ConfigJSON: configJSON, + LayerDigests: layerIDs, + }, nil +} + +// GetSplitFDStreamOpts provides options for GetSplitFDStream operations. +type GetSplitFDStreamOpts struct { + MountLabel string + IDMappings *idtools.IDMappings +} + +// SplitFDStreamWriter writes data in the composefs-rs splitfdstream format. 
+// The format uses signed 64-bit little-endian prefixes: +// - Negative prefix: abs(prefix) bytes of inline data follow +// - Non-negative prefix: reference to external file descriptor at index prefix +type SplitFDStreamWriter struct { + writer io.Writer +} + +// NewWriter creates a new SplitFDStreamWriter. +func NewWriter(w io.Writer) *SplitFDStreamWriter { + return &SplitFDStreamWriter{writer: w} +} + +// WriteInline writes inline data with a negative prefix indicating the data length. +func (w *SplitFDStreamWriter) WriteInline(data []byte) error { + if len(data) == 0 { + return nil + } + prefix := int64(-len(data)) + if err := binary.Write(w.writer, binary.LittleEndian, prefix); err != nil { + return fmt.Errorf("failed to write inline prefix: %w", err) + } + if _, err := w.writer.Write(data); err != nil { + return fmt.Errorf("failed to write inline data: %w", err) + } + return nil +} + +// InlineWriter writes a negative prefix for size bytes and returns an +// io.Writer that passes data straight through to the underlying stream. +// This lets callers use io.Copy instead of manual chunked loops. +func (w *SplitFDStreamWriter) InlineWriter(size int64) (io.Writer, error) { + if size <= 0 { + return io.Discard, nil + } + prefix := -size + if err := binary.Write(w.writer, binary.LittleEndian, prefix); err != nil { + return nil, fmt.Errorf("failed to write inline prefix: %w", err) + } + return w.writer, nil +} + +// WriteExternal writes a reference to an external file descriptor. 
+func (w *SplitFDStreamWriter) WriteExternal(fdIndex int) error { + prefix := int64(fdIndex) + if err := binary.Write(w.writer, binary.LittleEndian, prefix); err != nil { + return fmt.Errorf("failed to write external fd reference: %w", err) + } + return nil +} diff --git a/storage/pkg/splitfdstream/types_test.go b/storage/pkg/splitfdstream/types_test.go new file mode 100644 index 0000000000..6bf256fa66 --- /dev/null +++ b/storage/pkg/splitfdstream/types_test.go @@ -0,0 +1,361 @@ +package splitfdstream + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "io" + "strings" + "testing" + + digest "github.com/opencontainers/go-digest" + v1 "github.com/opencontainers/image-spec/specs-go/v1" +) + +func TestWriterInline(t *testing.T) { + var buf bytes.Buffer + w := NewWriter(&buf) + + data := []byte("hello world") + if err := w.WriteInline(data); err != nil { + t.Fatalf("WriteInline: %v", err) + } + + // Expect: int64 prefix (-11) + "hello world" + b := buf.Bytes() + if len(b) != 8+len(data) { + t.Fatalf("expected %d bytes, got %d", 8+len(data), len(b)) + } + + prefix := int64(binary.LittleEndian.Uint64(b[:8])) + if prefix != -int64(len(data)) { + t.Fatalf("expected prefix %d, got %d", -int64(len(data)), prefix) + } + if !bytes.Equal(b[8:], data) { + t.Fatalf("expected data %q, got %q", data, b[8:]) + } +} + +func TestWriterInlineEmpty(t *testing.T) { + var buf bytes.Buffer + w := NewWriter(&buf) + + if err := w.WriteInline(nil); err != nil { + t.Fatalf("WriteInline(nil): %v", err) + } + if err := w.WriteInline([]byte{}); err != nil { + t.Fatalf("WriteInline(empty): %v", err) + } + + if buf.Len() != 0 { + t.Fatalf("expected no output for empty inline, got %d bytes", buf.Len()) + } +} + +func TestWriterExternal(t *testing.T) { + var buf bytes.Buffer + w := NewWriter(&buf) + + if err := w.WriteExternal(42); err != nil { + t.Fatalf("WriteExternal: %v", err) + } + + b := buf.Bytes() + if len(b) != 8 { + t.Fatalf("expected 8 bytes, got %d", len(b)) + } + + 
prefix := int64(binary.LittleEndian.Uint64(b[:8])) + if prefix != 42 { + t.Fatalf("expected prefix 42, got %d", prefix) + } +} + +func TestWriterMixed(t *testing.T) { + var buf bytes.Buffer + w := NewWriter(&buf) + + // inline "abc" + if err := w.WriteInline([]byte("abc")); err != nil { + t.Fatal(err) + } + // external fd 0 + if err := w.WriteExternal(0); err != nil { + t.Fatal(err) + } + // inline "de" + if err := w.WriteInline([]byte("de")); err != nil { + t.Fatal(err) + } + + b := buf.Bytes() + off := 0 + + // First chunk: prefix -3 + "abc" + p1 := int64(binary.LittleEndian.Uint64(b[off : off+8])) + off += 8 + if p1 != -3 { + t.Fatalf("chunk1 prefix: expected -3, got %d", p1) + } + if string(b[off:off+3]) != "abc" { + t.Fatalf("chunk1 data: expected abc, got %q", b[off:off+3]) + } + off += 3 + + // Second chunk: external fd 0 + p2 := int64(binary.LittleEndian.Uint64(b[off : off+8])) + off += 8 + if p2 != 0 { + t.Fatalf("chunk2 prefix: expected 0, got %d", p2) + } + + // Third chunk: prefix -2 + "de" + p3 := int64(binary.LittleEndian.Uint64(b[off : off+8])) + off += 8 + if p3 != -2 { + t.Fatalf("chunk3 prefix: expected -2, got %d", p3) + } + if string(b[off:off+2]) != "de" { + t.Fatalf("chunk3 data: expected de, got %q", b[off:off+2]) + } + off += 2 + + if off != len(b) { + t.Fatalf("expected %d total bytes, got %d", off, len(b)) + } +} + +func TestInlineWriter(t *testing.T) { + var buf bytes.Buffer + w := NewWriter(&buf) + + data := []byte("test data for inline writer") + iw, err := w.InlineWriter(int64(len(data))) + if err != nil { + t.Fatalf("InlineWriter: %v", err) + } + + n, err := io.Copy(iw, bytes.NewReader(data)) + if err != nil { + t.Fatalf("io.Copy: %v", err) + } + if n != int64(len(data)) { + t.Fatalf("copied %d bytes, expected %d", n, len(data)) + } + + // Verify same binary format as WriteInline + var expected bytes.Buffer + ew := NewWriter(&expected) + if err := ew.WriteInline(data); err != nil { + t.Fatal(err) + } + + if !bytes.Equal(buf.Bytes(), 
expected.Bytes()) { + t.Fatal("InlineWriter output differs from WriteInline") + } +} + +func TestInlineWriterZeroSize(t *testing.T) { + var buf bytes.Buffer + w := NewWriter(&buf) + + iw, err := w.InlineWriter(0) + if err != nil { + t.Fatalf("InlineWriter(0): %v", err) + } + if iw != io.Discard { + t.Fatal("expected io.Discard for zero size") + } + + iw, err = w.InlineWriter(-5) + if err != nil { + t.Fatalf("InlineWriter(-5): %v", err) + } + if iw != io.Discard { + t.Fatal("expected io.Discard for negative size") + } + + if buf.Len() != 0 { + t.Fatalf("expected no output, got %d bytes", buf.Len()) + } +} + +// mockStore implements the Store interface for testing. +type mockStore struct { + bigData map[string]map[string][]byte // imageID -> key -> data + resolveID map[string][2]string // input -> [actualID, topLayerID] + layerParents map[string]string // layerID -> parentID + parentErr map[string]error // layerID -> error +} + +func (m *mockStore) ImageBigData(id, key string) ([]byte, error) { + if img, ok := m.bigData[id]; ok { + if data, ok := img[key]; ok { + return data, nil + } + } + return nil, fmt.Errorf("big data %s/%s not found", id, key) +} + +func (m *mockStore) ResolveImageID(id string) (string, string, error) { + if r, ok := m.resolveID[id]; ok { + return r[0], r[1], nil + } + return "", "", fmt.Errorf("image %s not found", id) +} + +func (m *mockStore) LayerParent(id string) (string, error) { + if m.parentErr != nil { + if err, ok := m.parentErr[id]; ok { + return "", err + } + } + parent := m.layerParents[id] + return parent, nil +} + +func makeManifest(configDigest string, layerDigests []string) []byte { + layers := make([]v1.Descriptor, len(layerDigests)) + for i, d := range layerDigests { + layers[i] = v1.Descriptor{Digest: digest.Digest(d)} + } + m := v1.Manifest{ + Config: v1.Descriptor{Digest: digest.Digest(configDigest)}, + Layers: layers, + } + b, _ := json.Marshal(m) + return b +} + +func TestGetImageMetadata(t *testing.T) { + manifestJSON := 
makeManifest("sha256:configabc", []string{"sha256:layer1", "sha256:layer2", "sha256:layer3"}) + configJSON := []byte(`{"architecture":"amd64"}`) + + store := &mockStore{ + resolveID: map[string][2]string{ + "img1": {"actual1", "layer-c"}, + }, + bigData: map[string]map[string][]byte{ + "actual1": { + "manifest": manifestJSON, + "sha256:configabc": configJSON, + }, + }, + layerParents: map[string]string{ + "layer-c": "layer-b", + "layer-b": "layer-a", + "layer-a": "", + }, + } + + meta, err := GetImageMetadata(store, "img1") + if err != nil { + t.Fatalf("GetImageMetadata: %v", err) + } + + if !bytes.Equal(meta.ManifestJSON, manifestJSON) { + t.Fatal("manifest mismatch") + } + if !bytes.Equal(meta.ConfigJSON, configJSON) { + t.Fatal("config mismatch") + } + + // Layer chain: leaf-first (layer-c, layer-b, layer-a) + if len(meta.LayerDigests) != 3 { + t.Fatalf("expected 3 layers, got %d", len(meta.LayerDigests)) + } + if meta.LayerDigests[0] != "layer-c" || meta.LayerDigests[1] != "layer-b" || meta.LayerDigests[2] != "layer-a" { + t.Fatalf("unexpected layer order: %v", meta.LayerDigests) + } +} + +func TestGetImageMetadataDepthCap(t *testing.T) { + manifestJSON := makeManifest("sha256:cfg", []string{"sha256:l1"}) + + // Build a circular chain + parents := make(map[string]string) + for i := 0; i < 5000; i++ { + parents[fmt.Sprintf("layer-%d", i)] = fmt.Sprintf("layer-%d", i+1) + } + + store := &mockStore{ + resolveID: map[string][2]string{ + "img": {"img", "layer-0"}, + }, + bigData: map[string]map[string][]byte{ + "img": { + "manifest": manifestJSON, + "sha256:cfg": []byte(`{}`), + }, + }, + layerParents: parents, + } + + _, err := GetImageMetadata(store, "img") + if err == nil { + t.Fatal("expected depth cap error") + } + if !strings.Contains(err.Error(), "maximum depth") { + t.Fatalf("expected depth error, got: %v", err) + } +} + +func TestGetImageMetadataLayerParentError(t *testing.T) { + manifestJSON := makeManifest("sha256:cfg", []string{"sha256:l1"}) + + store := 
&mockStore{ + resolveID: map[string][2]string{ + "img": {"img", "layer-top"}, + }, + bigData: map[string]map[string][]byte{ + "img": { + "manifest": manifestJSON, + "sha256:cfg": []byte(`{}`), + }, + }, + layerParents: map[string]string{ + "layer-top": "layer-mid", + }, + parentErr: map[string]error{ + "layer-mid": fmt.Errorf("storage I/O error"), + }, + } + + _, err := GetImageMetadata(store, "img") + if err == nil { + t.Fatal("expected error from LayerParent") + } + if !strings.Contains(err.Error(), "storage I/O error") { + t.Fatalf("expected wrapped I/O error, got: %v", err) + } +} + +func TestGetImageMetadataFallbackToManifest(t *testing.T) { + manifestJSON := makeManifest("sha256:cfg", []string{"sha256:digest-a", "sha256:digest-b"}) + + store := &mockStore{ + resolveID: map[string][2]string{ + "img": {"img", ""}, // empty top layer → triggers fallback + }, + bigData: map[string]map[string][]byte{ + "img": { + "manifest": manifestJSON, + "sha256:cfg": []byte(`{}`), + }, + }, + layerParents: map[string]string{}, + } + + meta, err := GetImageMetadata(store, "img") + if err != nil { + t.Fatalf("GetImageMetadata: %v", err) + } + + if len(meta.LayerDigests) != 2 { + t.Fatalf("expected 2 layers from manifest fallback, got %d", len(meta.LayerDigests)) + } + if meta.LayerDigests[0] != "sha256:digest-a" || meta.LayerDigests[1] != "sha256:digest-b" { + t.Fatalf("unexpected fallback layers: %v", meta.LayerDigests) + } +} From da928ef01ea9559f2c63123a77ddabd26a5ba9f9 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 16 Feb 2026 18:28:15 +0100 Subject: [PATCH 05/12] storage/store: add new APIs to resolve image ID Signed-off-by: Giuseppe Scrivano --- storage/store.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/storage/store.go b/storage/store.go index 4b66388fd2..c365580ec2 100644 --- a/storage/store.go +++ b/storage/store.go @@ -509,6 +509,12 @@ type Store interface { // Image returns a specific image. 
Image(id string) (*Image, error) + // ResolveImageID resolves an image reference to its actual ID and top layer ID. + ResolveImageID(id string) (string, string, error) + + // LayerParent returns the parent layer ID for the given layer. + LayerParent(id string) (string, error) + // ImagesByTopLayer returns a list of images which reference the specified // layer as their top layer. They will have different IDs and names // and may have different metadata, big data items, and flags. @@ -3593,6 +3599,22 @@ func (s *store) Image(id string) (*Image, error) { return nil, fmt.Errorf("locating image with ID %q: %w", id, ErrImageUnknown) } +func (s *store) ResolveImageID(id string) (string, string, error) { + img, err := s.Image(id) + if err != nil { + return "", "", err + } + return img.ID, img.TopLayer, nil +} + +func (s *store) LayerParent(id string) (string, error) { + l, err := s.Layer(id) + if err != nil { + return "", err + } + return l.Parent, nil +} + func (s *store) ImagesByTopLayer(id string) ([]*Image, error) { layer, err := s.Layer(id) if err != nil { From 873cd113474a78510963b879278c872a6882ac7f Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 5 Feb 2026 13:15:03 +0000 Subject: [PATCH 06/12] storage: add SplitFDStreamStore interface Extend the store with splitfdstream capabilities exposed via a UNIX socket for JSON-RPC communication. 
Signed-off-by: Giuseppe Scrivano --- storage/store.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/storage/store.go b/storage/store.go index c365580ec2..ae2eb90f0c 100644 --- a/storage/store.go +++ b/storage/store.go @@ -32,6 +32,7 @@ import ( "go.podman.io/storage/pkg/idtools" "go.podman.io/storage/pkg/ioutils" "go.podman.io/storage/pkg/lockfile" + "go.podman.io/storage/pkg/splitfdstream" "go.podman.io/storage/pkg/stringutils" "go.podman.io/storage/pkg/system" "go.podman.io/storage/types" @@ -618,6 +619,15 @@ type Store interface { Dedup(DedupArgs) (drivers.DedupResult, error) } +// SplitFDStreamStore extends the Store interface with splitfdstream capabilities. +// This API is experimental and can be changed without bumping the major version number. +type SplitFDStreamStore interface { + Store + + // SplitFDStreamSocket returns a socket for splitfdstream operations. + SplitFDStreamSocket() (*os.File, error) +} + // AdditionalLayer represents a layer that is contained in the additional layer store // This API is experimental and can be changed without bumping the major version number. type AdditionalLayer interface { @@ -789,9 +799,14 @@ type store struct { layerStoreUseGetters rwLayerStore // Almost all users should use the provided accessors instead of accessing this field directly. roLayerStoresUseGetters []roLayerStore // Almost all users should use the provided accessors instead of accessing this field directly. - // FIXME: The following fields need locking, and don’t have it. + // FIXME: The following fields need locking, and don't have it. additionalUIDs *idSet // Set by getAvailableIDs() additionalGIDs *idSet // Set by getAvailableIDs() + + // jsonRPCServer manages the JSON-RPC server for storage operations. + // This API is experimental and can be changed without bumping the major version number. + // Protected by graphLock (via startUsingGraphDriver). 
+ jsonRPCServer *splitfdstream.JSONRPCServer } // GetStore attempts to find an already-created Store object matching the @@ -4099,3 +4114,58 @@ func (s *store) Dedup(req DedupArgs) (drivers.DedupResult, error) { return rlstore.dedup(r) }) } + +// SplitFDStreamSocket returns a UNIX socket file descriptor for split FD stream operations. +// JSON-RPC requests for split FD stream operations are sent over this socket. +// The caller is responsible for closing the returned file when done. +// This API is experimental and can be changed without bumping the major version number. +func (s *store) SplitFDStreamSocket() (*os.File, error) { + if err := s.startUsingGraphDriver(); err != nil { + return nil, err + } + defer s.stopUsingGraphDriver() + + // Check if driver supports splitfdstream operations + if _, ok := s.graphDriver.(splitfdstream.SplitFDStreamDriver); !ok { + return nil, fmt.Errorf("driver %s does not support split FD stream operations: %w", s.graphDriver.String(), drivers.ErrNotSupported) + } + + // Create socket pair - one end for the caller, one end for the server + clientConn, serverConn, err := splitfdstream.CreateSocketPair() + if err != nil { + return nil, fmt.Errorf("failed to create socket pair: %w", err) + } + + // Get file descriptor from client connection before starting + // the server goroutine, so cleanup is straightforward on error. 
+ clientFile, err := clientConn.File() + if err != nil { + clientConn.Close() + serverConn.Close() + return nil, fmt.Errorf("failed to get file from connection: %w", err) + } + clientConn.Close() + + // Initialize server if not already created + if s.jsonRPCServer == nil { + s.jsonRPCServer = splitfdstream.NewJSONRPCServer(func() (splitfdstream.SplitFDStreamDriver, func(), error) { + if err := s.startUsingGraphDriver(); err != nil { + return nil, nil, err + } + driver, ok := s.graphDriver.(splitfdstream.SplitFDStreamDriver) + if !ok { + s.stopUsingGraphDriver() + return nil, nil, fmt.Errorf("driver does not support splitfdstream") + } + return driver, s.stopUsingGraphDriver, nil + }, s) + } + + // Start handling the server connection in a goroutine. + // Add to the WaitGroup before spawning the goroutine to avoid + // a race with Stop() -> connections.Wait(). + s.jsonRPCServer.TrackConnection() + go s.jsonRPCServer.HandleConnection(serverConn) + + return clientFile, nil +} From a1b3bf83a8d276cd947e938619eb6843533e9aae Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 9 Feb 2026 14:27:12 +0100 Subject: [PATCH 07/12] storage/archive,chrootarchive: add support for splitfdstream Signed-off-by: Giuseppe Scrivano --- storage/pkg/archive/archive.go | 81 +++++++++++++++++++++++++---- storage/pkg/archive/archive_test.go | 3 +- storage/pkg/archive/diff.go | 6 ++- 3 files changed, 75 insertions(+), 15 deletions(-) diff --git a/storage/pkg/archive/archive.go b/storage/pkg/archive/archive.go index bce24e5af5..804c1a20dc 100644 --- a/storage/pkg/archive/archive.go +++ b/storage/pkg/archive/archive.go @@ -79,6 +79,11 @@ const ( tarExt = "tar" windows = "windows" darwin = "darwin" + + // CopyBufferSize is the buffer size used for reading file content + // during archive extraction and splitfdstream operations. 1 MiB + // balances syscall overhead against memory usage. 
+ CopyBufferSize = 1 << 20 ) var xattrsToIgnore = map[string]any{ @@ -702,7 +707,7 @@ func (ta *tarWriter) addFile(headers *addFileData) error { return nil } -func extractTarFileEntry(path, extractDir string, hdr *tar.Header, reader io.Reader, Lchown bool, chownOpts *idtools.IDPair, inUserns, ignoreChownErrors bool, forceMask *os.FileMode, buffer []byte) error { +func extractTarFileEntry(path, extractDir string, hdr *tar.Header, writeContent func(*os.File) error, Lchown bool, chownOpts *idtools.IDPair, inUserns, ignoreChownErrors bool, forceMask *os.FileMode) error { // hdr.Mode is in linux format, which we can use for sycalls, // but for os.Foo() calls we need the mode converted to os.FileMode, // so use hdrInfo.Mode() (they differ for e.g. setuid bits) @@ -739,9 +744,11 @@ func extractTarFileEntry(path, extractDir string, hdr *tar.Header, reader io.Rea if err != nil { return err } - if _, err := io.CopyBuffer(file, reader, buffer); err != nil { - file.Close() - return err + if writeContent != nil { + if err := writeContent(file); err != nil { + file.Close() + return err + } } if err := file.Close(); err != nil { return err @@ -1085,17 +1092,67 @@ func TarWithOptions(srcPath string, options *TarOptions) (io.ReadCloser, error) return pipeReader, nil } -// Unpack unpacks the decompressedArchive to dest with options. -func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error { +// TarEntryIterator abstracts iteration over tar entries. +// Standard implementation wraps tar.Reader; splitfdstream provides +// entries from its chunk-based format with reflink support. +type TarEntryIterator interface { + // Next advances to the next entry and returns its header. + Next() (*tar.Header, error) + // WriteContentTo writes the current entry's file content to dst. + // Called for TypeReg entries (including zero-size files). + WriteContentTo(dst *os.File) error +} + +// tarReaderIterator implements TarEntryIterator for a standard tar.Reader. 
+type tarReaderIterator struct { + tr *tar.Reader + trBuf *bufio.Reader + buffer []byte +} + +func newTarReaderIterator(decompressedArchive io.Reader) *tarReaderIterator { tr := tar.NewReader(decompressedArchive) trBuf := pools.BufioReader32KPool.Get(nil) - defer pools.BufioReader32KPool.Put(trBuf) + return &tarReaderIterator{ + tr: tr, + trBuf: trBuf, + buffer: make([]byte, CopyBufferSize), + } +} + +func (i *tarReaderIterator) Next() (*tar.Header, error) { + hdr, err := i.tr.Next() + if err != nil { + return nil, err + } + i.trBuf.Reset(i.tr) + return hdr, nil +} + +func (i *tarReaderIterator) WriteContentTo(dst *os.File) error { + _, err := io.CopyBuffer(dst, i.trBuf, i.buffer) + return err +} +func (i *tarReaderIterator) close() { + pools.BufioReader32KPool.Put(i.trBuf) +} + +// Unpack unpacks the decompressedArchive to dest with options. +func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error { + iter := newTarReaderIterator(decompressedArchive) + defer iter.close() + return UnpackFromIterator(iter, dest, options) +} + +// UnpackFromIterator unpacks tar entries from the given iterator to dest with options. +// This allows plugging in alternative sources of tar entries (e.g., splitfdstream) +// while reusing the full extraction logic including xattrs, whiteouts, device nodes, etc. +func UnpackFromIterator(iter TarEntryIterator, dest string, options *TarOptions) error { var dirs []*tar.Header idMappings := idtools.NewIDMappingsFromMaps(options.UIDMaps, options.GIDMaps) rootIDs := idMappings.RootPair() whiteoutConverter := GetWhiteoutConverter(options.WhiteoutFormat, options.WhiteoutData) - buffer := make([]byte, 1<<20) doChown := !options.NoLchown if options.ForceMask != nil { @@ -1107,7 +1164,7 @@ func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) err // Iterate through the files in the archive. 
loop: for { - hdr, err := tr.Next() + hdr, err := iter.Next() if err == io.EOF { // end of tar archive break @@ -1181,7 +1238,6 @@ loop: } } } - trBuf.Reset(tr) chownOpts := options.ChownOpts if err := remapIDs(nil, idMappings, chownOpts, hdr); err != nil { @@ -1202,7 +1258,10 @@ loop: chownOpts = &idtools.IDPair{UID: hdr.Uid, GID: hdr.Gid} } - if err = extractTarFileEntry(path, dest, hdr, trBuf, doChown, chownOpts, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil { + writeContent := func(dst *os.File) error { + return iter.WriteContentTo(dst) + } + if err = extractTarFileEntry(path, dest, hdr, writeContent, doChown, chownOpts, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil { return err } diff --git a/storage/pkg/archive/archive_test.go b/storage/pkg/archive/archive_test.go index d17dd64dee..90ea76fc7e 100644 --- a/storage/pkg/archive/archive_test.go +++ b/storage/pkg/archive/archive_test.go @@ -751,8 +751,7 @@ func TestTarWithOptions(t *testing.T) { func TestTypeXGlobalHeaderDoesNotFail(t *testing.T) { hdr := tar.Header{Typeflag: tar.TypeXGlobalHeader} tmpDir := t.TempDir() - buffer := make([]byte, 1<<20) - err := extractTarFileEntry(filepath.Join(tmpDir, "pax_global_header"), tmpDir, &hdr, nil, true, nil, false, false, nil, buffer) + err := extractTarFileEntry(filepath.Join(tmpDir, "pax_global_header"), tmpDir, &hdr, nil, true, nil, false, false, nil) if err != nil { t.Fatal(err) } diff --git a/storage/pkg/archive/diff.go b/storage/pkg/archive/diff.go index 355d65f212..64be4f8fdd 100644 --- a/storage/pkg/archive/diff.go +++ b/storage/pkg/archive/diff.go @@ -104,7 +104,8 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64, } defer os.RemoveAll(aufsTempdir) } - if err := extractTarFileEntry(filepath.Join(aufsTempdir, basename), dest, hdr, tr, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil { + writeContent := func(dst 
*os.File) error { _, err := io.CopyBuffer(dst, tr, buffer); return err } + if err := extractTarFileEntry(filepath.Join(aufsTempdir, basename), dest, hdr, writeContent, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil { return 0, err } } @@ -209,7 +210,8 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64, return 0, err } - if err := extractTarFileEntry(path, dest, srcHdr, srcData, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil { + writeContent := func(dst *os.File) error { _, err := io.CopyBuffer(dst, srcData, buffer); return err } + if err := extractTarFileEntry(path, dest, srcHdr, writeContent, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil { return 0, err } From 02cedfcb1a3747faff6b61813f9c33b5470fe0af Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 19 Feb 2026 11:48:33 +0100 Subject: [PATCH 08/12] storage, overlay: use openat2 instead of using procfs Signed-off-by: Giuseppe Scrivano --- storage/drivers/overlay/overlay.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/storage/drivers/overlay/overlay.go b/storage/drivers/overlay/overlay.go index 5e38e9ab49..8a7bdc7735 100644 --- a/storage/drivers/overlay/overlay.go +++ b/storage/drivers/overlay/overlay.go @@ -2162,17 +2162,24 @@ func (g *overlayFileGetter) Get(path string) (io.ReadCloser, error) { buf := make([]byte, unix.PathMax) for _, d := range g.diffDirs { if f, found := g.composefsMounts[d]; found { - // there is no *at equivalent for getxattr, but it can be emulated by opening the file under /proc/self/fd/$FD/$PATH - len, err := unix.Getxattr(fmt.Sprintf("/proc/self/fd/%d/%s", int(f.Fd()), path), "trusted.overlay.redirect", buf) + cfd, err := unix.Openat2(int(f.Fd()), path, &unix.OpenHow{ + Flags: unix.O_RDONLY | unix.O_CLOEXEC | unix.O_PATH, + Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH, + 
}) + if err != nil { + continue + } + n, err := unix.Fgetxattr(cfd, "trusted.overlay.redirect", buf) + unix.Close(cfd) if err != nil { if errors.Is(err, unix.ENODATA) { continue } - return nil, &fs.PathError{Op: "getxattr", Path: path, Err: err} + return nil, &fs.PathError{Op: "fgetxattr", Path: path, Err: err} } // the xattr value is the path to the file in the composefs layer diff directory - return os.Open(filepath.Join(d, string(buf[:len]))) + return os.Open(filepath.Join(d, string(buf[:n]))) } f, err := os.Open(filepath.Join(d, path)) From 0abddc99b3415c3420c2fd7fc8c2590709d9d5a3 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Tue, 7 Apr 2026 11:16:45 +0200 Subject: [PATCH 09/12] composefs: do not leak open file Signed-off-by: Giuseppe Scrivano --- storage/drivers/overlay/composefs.go | 1 + 1 file changed, 1 insertion(+) diff --git a/storage/drivers/overlay/composefs.go b/storage/drivers/overlay/composefs.go index 713aeed3cb..17e10af69c 100644 --- a/storage/drivers/overlay/composefs.go +++ b/storage/drivers/overlay/composefs.go @@ -68,6 +68,7 @@ func generateComposeFsBlob(verityDigests map[string]string, toc any, composefsDi outFile.Close() return fmt.Errorf("failed to reopen %s as read-only: %w", destFile, err) } + defer roFile.Close() err = func() error { // a scope to close outFile before setting fsverity on the read-only fd. From 41771222bd7d12e7fc49f614dd7af40a7259e156 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 9 Feb 2026 09:20:59 +0100 Subject: [PATCH 10/12] storage/overlay: implement SplitFDStreamDriver Implement the SplitFDStreamDriver interface for the overlay driver, enabling efficient layer operations with reflink support. 
Signed-off-by: Giuseppe Scrivano --- storage/drivers/overlay/overlay.go | 5 +- .../drivers/overlay/overlay_splitfdstream.go | 271 ++++++++++++++++++ storage/drivers/overlay/splitfdstream_test.go | 39 +++ 3 files changed, 314 insertions(+), 1 deletion(-) create mode 100644 storage/drivers/overlay/overlay_splitfdstream.go create mode 100644 storage/drivers/overlay/splitfdstream_test.go diff --git a/storage/drivers/overlay/overlay.go b/storage/drivers/overlay/overlay.go index 8a7bdc7735..261a1d3f64 100644 --- a/storage/drivers/overlay/overlay.go +++ b/storage/drivers/overlay/overlay.go @@ -2167,7 +2167,10 @@ func (g *overlayFileGetter) Get(path string) (io.ReadCloser, error) { Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH, }) if err != nil { - continue + if errors.Is(err, unix.ENOENT) || errors.Is(err, unix.ELOOP) { + continue + } + return nil, &fs.PathError{Op: "openat2", Path: path, Err: err} } n, err := unix.Fgetxattr(cfd, "trusted.overlay.redirect", buf) unix.Close(cfd) diff --git a/storage/drivers/overlay/overlay_splitfdstream.go b/storage/drivers/overlay/overlay_splitfdstream.go new file mode 100644 index 0000000000..b1431615ce --- /dev/null +++ b/storage/drivers/overlay/overlay_splitfdstream.go @@ -0,0 +1,271 @@ +//go:build linux + +package overlay + +import ( + "archive/tar" + "bytes" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/sirupsen/logrus" + "go.podman.io/storage/pkg/archive" + "go.podman.io/storage/pkg/fileutils" + "go.podman.io/storage/pkg/idtools" + "go.podman.io/storage/pkg/splitfdstream" + "golang.org/x/sys/unix" +) + +// GetSplitFDStream generates a split FD stream from the layer differences. +// The returned ReadCloser contains the splitfdstream-formatted data, and the +// []*os.File slice contains the external file descriptors referenced by the stream. +// Regular files are passed as external file descriptors for reflink-based copying. 
+// The caller is responsible for closing both the ReadCloser and all file descriptors. +func (d *Driver) GetSplitFDStream(id, parent string, options *splitfdstream.GetSplitFDStreamOpts) (io.ReadCloser, []*os.File, error) { + if options == nil { + return nil, nil, fmt.Errorf("options cannot be nil") + } + + dir := d.dir(id) + if err := fileutils.Exists(dir); err != nil { + return nil, nil, fmt.Errorf("layer %s does not exist: %w", id, err) + } + + // Check if this is a composefs layer and mount the EROFS blob if so. + // The mount FD is used to resolve file paths to their flat storage paths + // via the trusted.overlay.redirect xattr. + composefsData := d.getComposefsData(id) + composefsMountFd := -1 + if err := fileutils.Exists(composefsData); err == nil { + fd, err := openComposefsMount(composefsData) + if err != nil { + return nil, nil, fmt.Errorf("failed to mount composefs for layer %s: %w", id, err) + } + composefsMountFd = fd + defer unix.Close(composefsMountFd) + } else if !errors.Is(err, unix.ENOENT) { + return nil, nil, err + } + + logrus.Debugf("overlay: GetSplitFDStream for layer %s with parent %s", id, parent) + + idMappings := options.IDMappings + if idMappings == nil { + idMappings = &idtools.IDMappings{} + } + + // Get the diff path for file access (used for FD references) + diffPath, err := d.getDiffPath(id) + if err != nil { + return nil, nil, fmt.Errorf("failed to get diff path for layer %s: %w", id, err) + } + + // Diff() handles naiveDiff and all the edge cases correctly. 
+ tarStream, err := d.Diff(id, idMappings, parent, nil, options.MountLabel) + if err != nil { + return nil, nil, fmt.Errorf("failed to generate diff for layer %s: %w", id, err) + } + defer tarStream.Close() + + streamFd, err := unix.MemfdCreate("splitfdstream", unix.MFD_CLOEXEC) + if err != nil { + return nil, nil, fmt.Errorf("memfd_create: %w", err) + } + streamFile := os.NewFile(uintptr(streamFd), "splitfdstream") + + var fds []*os.File + writer := splitfdstream.NewWriter(streamFile) + + // Convert tar stream to splitfdstream + err = convertTarToSplitFDStream(tarStream, writer, diffPath, composefsMountFd, &fds) + if err != nil { + streamFile.Close() + for _, f := range fds { + f.Close() + } + return nil, nil, fmt.Errorf("failed to convert tar to splitfdstream: %w", err) + } + + if _, err := streamFile.Seek(0, io.SeekStart); err != nil { + streamFile.Close() + for _, f := range fds { + f.Close() + } + return nil, nil, fmt.Errorf("failed to seek stream: %w", err) + } + + logrus.Debugf("overlay: GetSplitFDStream complete for layer %s: numFDs=%d", id, len(fds)) + return streamFile, fds, nil +} + +// convertTarToSplitFDStream converts a tar stream to a splitfdstream by parsing +// tar headers and replacing file content with file descriptor references. 
+func convertTarToSplitFDStream(tarStream io.ReadCloser, writer *splitfdstream.SplitFDStreamWriter, diffPath string, composefsMountFd int, fds *[]*os.File) error { + tr := tar.NewReader(tarStream) + + // Open diff directory for safe file access + diffDirFd, err := unix.Open(diffPath, unix.O_RDONLY|unix.O_DIRECTORY|unix.O_CLOEXEC, 0) + if err != nil { + return fmt.Errorf("failed to open diff directory %s: %w", diffPath, err) + } + defer unix.Close(diffDirFd) + + // Reusable buffer for inline content, lazily allocated + var buf []byte + + for { + header, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read tar header: %w", err) + } + + // Write the tar header as inline data + var headerBuf bytes.Buffer + tw := tar.NewWriter(&headerBuf) + if err := tw.WriteHeader(header); err != nil { + return fmt.Errorf("failed to serialize tar header for %s: %w", header.Name, err) + } + if err := writer.WriteInline(headerBuf.Bytes()); err != nil { + return fmt.Errorf("failed to write tar header for %s: %w", header.Name, err) + } + + // Handle file content + if header.Typeflag == tar.TypeReg && header.Size > 0 { + // Try to open file and write FD reference + ok, err := tryWriteFileAsFDReference(writer, diffDirFd, composefsMountFd, header, fds) + if err != nil { + return fmt.Errorf("failed to write FD reference for %s: %w", header.Name, err) + } + if ok { + // Skip the content in the tar stream since we're using FD reference + if _, err := io.CopyN(io.Discard, tr, header.Size); err != nil { + return fmt.Errorf("failed to skip content for %s: %w", header.Name, err) + } + } else { + if buf == nil { + buf = make([]byte, archive.CopyBufferSize) + } + // naiveDiff generates the tar stream from a temporary + // mount, so individual files may not exist at the diff + // directory path. Fall back to streaming the content + // inline from the tar stream. 
+ iw, err := writer.InlineWriter(header.Size) + if err != nil { + return fmt.Errorf("failed to write inline prefix for %s: %w", header.Name, err) + } + if _, err := io.CopyBuffer(iw, io.LimitReader(tr, header.Size), buf); err != nil { + return fmt.Errorf("failed to write inline content for %s: %w", header.Name, err) + } + } + } + } + + return nil +} + +// tryWriteFileAsFDReference tries to open a file and write an FD reference to the splitfdstream. +// Returns (true, nil) if the file was successfully written as FD reference. +// Returns (false, nil) if the file doesn't exist in the diff directory (caller should write inline). +// Returns (_, error) on other errors. +// +// When composefsMountFd >= 0, the diff directory uses a flat layout (files stored by digest). +// The file path is resolved by reading the trusted.overlay.redirect xattr from the composefs mount. +func tryWriteFileAsFDReference(writer *splitfdstream.SplitFDStreamWriter, diffDirFd int, composefsMountFd int, header *tar.Header, fds *[]*os.File) (bool, error) { + // Clean the file name to prevent path traversal + cleanName := filepath.Clean(header.Name) + if strings.Contains(cleanName, "..") { + return false, fmt.Errorf("invalid file path: %s", header.Name) + } + + var fd int + var openErr error + + if composefsMountFd >= 0 { + // Composefs: open the file in the composefs mount to read the redirect xattr, + // which gives the flat storage path in the diff directory. 
+ cfd, err := unix.Openat2(composefsMountFd, cleanName, &unix.OpenHow{ + Flags: unix.O_RDONLY | unix.O_CLOEXEC | unix.O_PATH, + Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH, + }) + if err != nil { + if errors.Is(err, unix.ENOENT) || errors.Is(err, unix.ELOOP) { + return false, nil + } + return false, fmt.Errorf("failed to open %s in composefs mount: %w", cleanName, err) + } + buf := make([]byte, unix.PathMax) + n, err := unix.Fgetxattr(cfd, "trusted.overlay.redirect", buf) + unix.Close(cfd) + if err != nil { + if errors.Is(err, unix.ENODATA) { + return false, nil + } + return false, fmt.Errorf("failed to get redirect xattr for %s: %w", cleanName, err) + } + + flatPath := string(buf[:n]) + if strings.Contains(flatPath, "..") || filepath.IsAbs(flatPath) { + return false, fmt.Errorf("invalid redirect xattr value for %s: %s", cleanName, flatPath) + } + + fd, openErr = unix.Openat2(diffDirFd, flatPath, &unix.OpenHow{ + Flags: unix.O_RDONLY | unix.O_CLOEXEC, + Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH, + }) + } else { + // Non-composefs: open directly by name under the diff directory + fd, openErr = unix.Openat2(diffDirFd, cleanName, &unix.OpenHow{ + Flags: unix.O_RDONLY | unix.O_CLOEXEC, + Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH, + }) + } + + if openErr != nil { + if errors.Is(openErr, unix.ENOENT) || errors.Is(openErr, unix.ELOOP) { + return false, nil + } + return false, fmt.Errorf("failed to open %s: %w", cleanName, openErr) + } + + // Verify it's still a regular file + var fdStat unix.Stat_t + if err := unix.Fstat(fd, &fdStat); err != nil { + unix.Close(fd) + return false, fmt.Errorf("failed to fstat opened file %s: %w", cleanName, err) + } + if fdStat.Mode&unix.S_IFMT != unix.S_IFREG { + unix.Close(fd) + return false, fmt.Errorf("file %s is not a regular file", cleanName) + } + if fdStat.Size != header.Size { + unix.Close(fd) + return false, nil + } + + // Create os.File from fd + f := os.NewFile(uintptr(fd), cleanName) + 
if f == nil { + unix.Close(fd) + return false, fmt.Errorf("failed to create File from fd for %s", cleanName) + } + + fdIndex := len(*fds) + + // Write FD reference before appending to the slice so that on + // error the caller's cleanup loop does not see a stale entry. + if err := writer.WriteExternal(fdIndex); err != nil { + f.Close() + return false, fmt.Errorf("failed to write external FD reference: %w", err) + } + + *fds = append(*fds, f) + + return true, nil +} diff --git a/storage/drivers/overlay/splitfdstream_test.go b/storage/drivers/overlay/splitfdstream_test.go new file mode 100644 index 0000000000..c47ca1f3d7 --- /dev/null +++ b/storage/drivers/overlay/splitfdstream_test.go @@ -0,0 +1,39 @@ +//go:build linux + +package overlay + +import ( + "testing" + + "go.podman.io/storage/pkg/splitfdstream" +) + +func TestGetSplitFDStreamStub(t *testing.T) { + driver := &Driver{ + home: t.TempDir(), + } + + // Test with nil options + _, _, err := driver.GetSplitFDStream("test-layer", "parent-layer", nil) + if err == nil { + t.Error("Expected error with nil options") + } + + // Test with valid options but non-existent layer + opts := &splitfdstream.GetSplitFDStreamOpts{} + _, _, err = driver.GetSplitFDStream("non-existent-layer", "parent-layer", opts) + if err == nil { + t.Error("Expected error for non-existent layer") + } +} + +// TestOverlayImplementsSplitFDStreamDriver verifies that the overlay driver +// implements the SplitFDStreamDriver interface via type assertion. 
+func TestOverlayImplementsSplitFDStreamDriver(t *testing.T) { + driver := &Driver{} + + // Verify the driver implements SplitFDStreamDriver + if _, ok := interface{}(driver).(splitfdstream.SplitFDStreamDriver); !ok { + t.Error("Expected overlay driver to implement SplitFDStreamDriver interface") + } +} From 68e58d6f499275b17b3f3407d6eb3e2ccbd398ad Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Fri, 20 Mar 2026 16:48:02 +0100 Subject: [PATCH 11/12] image, storage: expose the underlying SplitFDStreamStore Signed-off-by: Giuseppe Scrivano --- image/storage/storage_src.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/image/storage/storage_src.go b/image/storage/storage_src.go index e76b9ceb03..a2383c8512 100644 --- a/image/storage/storage_src.go +++ b/image/storage/storage_src.go @@ -509,3 +509,13 @@ func (s *storageImageSource) getSize() (int64, error) { func (s *storageImageSource) Size() (int64, error) { return s.getSize() } + +// SplitFDStreamSocket returns a socket for splitfdstream operations, +// if the underlying store supports it. +func (s *storageImageSource) SplitFDStreamSocket() (*os.File, error) { + sfds, ok := s.imageRef.transport.store.(storage.SplitFDStreamStore) + if !ok { + return nil, fmt.Errorf("store does not support splitfdstream") + } + return sfds.SplitFDStreamSocket() +} From 604910eb7afda3b6d6c1f503677e9737c8b91b1d Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Fri, 20 Mar 2026 13:51:55 +0000 Subject: [PATCH 12/12] json-proxy: add OpenJSONRPCFdPass to broker splitfdstream sockets Add a new json-proxy method that returns a Unix socket file descriptor to the client. The client then speaks the jsonrpc-fdpass protocol directly over that socket for splitfdstream operations, bypassing the json-proxy for bulk data transfer. Bump protocolVersion to "0.2.9". 
Co-Authored-By: Claude Opus 4.6 Signed-off-by: Giuseppe Scrivano --- common/cmd/json-proxy-test-server/main.go | 88 +++++++++++++-- common/pkg/json-proxy/handler.go | 40 ++++++- common/pkg/json-proxy/proxy.go | 9 ++ common/pkg/json-proxy/proxy_test.go | 127 ++++++++++++++++++++++ common/pkg/json-proxy/types.go | 2 +- 5 files changed, 255 insertions(+), 11 deletions(-) diff --git a/common/cmd/json-proxy-test-server/main.go b/common/cmd/json-proxy-test-server/main.go index fd6d84132b..9577935feb 100644 --- a/common/cmd/json-proxy-test-server/main.go +++ b/common/cmd/json-proxy-test-server/main.go @@ -4,16 +4,26 @@ package main import ( "context" + "flag" "fmt" "os" - "strconv" - "go.podman.io/common/pkg/json-proxy" + imgcopy "go.podman.io/image/v5/copy" "go.podman.io/image/v5/signature" + istorage "go.podman.io/image/v5/storage" + "go.podman.io/image/v5/transports/alltransports" "go.podman.io/image/v5/types" + "go.podman.io/storage" + "go.podman.io/storage/pkg/reexec" + storagetypes "go.podman.io/storage/types" + + jsonproxy "go.podman.io/common/pkg/json-proxy" ) func main() { + if reexec.Init() { + return + } if err := run(); err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) os.Exit(1) @@ -21,12 +31,32 @@ func main() { } func run() error { - if len(os.Args) < 3 || os.Args[1] != "--sockfd" { - return fmt.Errorf("usage: %s --sockfd ", os.Args[0]) + var ( + sockfd int + graphRoot string + runRoot string + seedImage string + ) + flag.IntVar(&sockfd, "sockfd", -1, "socket file descriptor") + flag.StringVar(&graphRoot, "graph-root", "", "storage graph root") + flag.StringVar(&runRoot, "run-root", "", "storage run root") + flag.StringVar(&seedImage, "seed-image", "", "image to copy into local store") + flag.Parse() + + if sockfd < 0 { + return fmt.Errorf("usage: %s --sockfd [--graph-root --run-root --seed-image ]", os.Args[0]) } - sockfd, err := strconv.Atoi(os.Args[2]) - if err != nil { - return fmt.Errorf("invalid sockfd: %v", err) + + if graphRoot != "" { + ref, 
store, err := setupStore(graphRoot, runRoot, seedImage) + if err != nil { + return fmt.Errorf("setting up store: %w", err) + } + defer func() { + _, _ = store.Shutdown(true) + }() + // Print the containers-storage:// reference for the test to read. + fmt.Fprintln(os.Stdout, ref) } manager, err := jsonproxy.NewManager( @@ -47,3 +77,47 @@ func run() error { defer manager.Close() return manager.Serve(context.Background(), sockfd) } + +func setupStore(graphRoot, runRoot, seedImage string) (string, storage.Store, error) { + store, err := storage.GetStore(storagetypes.StoreOptions{ + GraphRoot: graphRoot, + RunRoot: runRoot, + GraphDriverName: "overlay", + }) + if err != nil { + return "", nil, fmt.Errorf("creating store: %w", err) + } + + ctx := context.Background() + + srcRef, err := alltransports.ParseImageName(seedImage) + if err != nil { + return "", nil, fmt.Errorf("parsing seed image %q: %w", seedImage, err) + } + + destRef, err := istorage.Transport.ParseStoreReference(store, "testimage:latest") + if err != nil { + return "", nil, fmt.Errorf("creating store reference: %w", err) + } + + policy, err := signature.DefaultPolicy(nil) + if err != nil { + return "", nil, fmt.Errorf("getting default policy: %w", err) + } + pc, err := signature.NewPolicyContext(policy) + if err != nil { + return "", nil, fmt.Errorf("creating policy context: %w", err) + } + defer func() { + if err := pc.Destroy(); err != nil { + fmt.Fprintf(os.Stderr, "warning: destroying policy context: %v\n", err) + } + }() + + _, err = imgcopy.Image(ctx, pc, destRef, srcRef, nil) + if err != nil { + return "", nil, fmt.Errorf("copying seed image: %w", err) + } + + return "containers-storage:" + destRef.StringWithinTransport(), store, nil +} diff --git a/common/pkg/json-proxy/handler.go b/common/pkg/json-proxy/handler.go index 63c947fb77..e8b9e9f846 100644 --- a/common/pkg/json-proxy/handler.go +++ b/common/pkg/json-proxy/handler.go @@ -28,9 +28,10 @@ type handler struct { lock sync.Mutex // Dependency 
injection functions. - getSystemContext func() (*types.SystemContext, error) - getPolicyContext func() (*signature.PolicyContext, error) - logger logrus.FieldLogger + getSystemContext func() (*types.SystemContext, error) + getPolicyContext func() (*signature.PolicyContext, error) + splitFDStreamStore splitFDStreamStore + logger logrus.FieldLogger // Internal state. sysctx *types.SystemContext @@ -127,6 +128,12 @@ func (h *handler) openImageImpl(ctx context.Context, args []any, allowNotFound b return ret, err } + if h.splitFDStreamStore == nil { + if sfds, ok := imgsrc.(splitFDStreamStore); ok { + h.splitFDStreamStore = sfds + } + } + policyContext, err := h.getPolicyContext() if err != nil { return ret, err @@ -707,6 +714,31 @@ func (h *handler) FinishPipe(ctx context.Context, args []any) (replyBuf, error) return ret, err } +// OpenJSONRPCFdPass returns a socket FD over which the client can +// speak the jsonrpc-fdpass protocol for splitfdstream operations. +// The json-proxy does not interpret the protocol; it just brokers the socket. +func (h *handler) OpenJSONRPCFdPass(ctx context.Context, args []any) (replyBuf, error) { + h.lock.Lock() + defer h.lock.Unlock() + + var ret replyBuf + + if h.splitFDStreamStore == nil { + return ret, errors.New("splitfdstream store not configured") + } + if len(args) != 0 { + return ret, fmt.Errorf("found %d args, expecting none", len(args)) + } + + sockFile, err := h.splitFDStreamStore.SplitFDStreamSocket() + if err != nil { + return ret, err + } + + ret.fd = sockFile + return ret, nil +} + // processRequest dispatches a remote request. // replyBuf is the result of the invocation. // terminate should be true if processing of requests should halt. 
@@ -746,6 +778,8 @@ func (h *handler) processRequest(ctx context.Context, readBytes []byte) (rb repl rb, err = h.GetLayerInfoPiped(ctx, req.Args) case "FinishPipe": rb, err = h.FinishPipe(ctx, req.Args) + case "OpenJSONRPCFdPass": + rb, err = h.OpenJSONRPCFdPass(ctx, req.Args) case "Shutdown": terminate = true // NOTE: If you add a method here, you should very likely be bumping the diff --git a/common/pkg/json-proxy/proxy.go b/common/pkg/json-proxy/proxy.go index 8c44831467..54c42af50a 100644 --- a/common/pkg/json-proxy/proxy.go +++ b/common/pkg/json-proxy/proxy.go @@ -4,11 +4,20 @@ package jsonproxy import ( + "os" + "github.com/sirupsen/logrus" "go.podman.io/image/v5/signature" "go.podman.io/image/v5/types" ) +// splitFDStreamStore is the subset of storage.SplitFDStreamStore needed +// by the json-proxy. Keeping a local interface avoids a hard dependency +// on go.podman.io/storage for consumers that do not use splitfdstream. +type splitFDStreamStore interface { + SplitFDStreamSocket() (*os.File, error) +} + // options holds the internal configuration for a Manager. type options struct { getSystemContext func() (*types.SystemContext, error) diff --git a/common/pkg/json-proxy/proxy_test.go b/common/pkg/json-proxy/proxy_test.go index 0337c06ecb..097945e83f 100644 --- a/common/pkg/json-proxy/proxy_test.go +++ b/common/pkg/json-proxy/proxy_test.go @@ -3,12 +3,14 @@ package jsonproxy_test import ( + "bufio" "encoding/json" "fmt" "io" "net" "os" "os/exec" + "path/filepath" "strings" "sync" "syscall" @@ -508,3 +510,128 @@ func TestProxyGetBlob(t *testing.T) { } assert.NoError(t, err) } + +// newProxyWithStore spawns the test binary with a local containers-storage +// store seeded with the given image. It returns the proxy and the +// containers-storage:// reference string for the seeded image. 
+func newProxyWithStore(t *testing.T, seedImage string) (*proxy, string) { + t.Helper() + + proxyBinary := os.Getenv("JSON_PROXY_TEST_BINARY") + if proxyBinary == "" { + t.Skip("JSON_PROXY_TEST_BINARY is not set; skipping integration test") + } + + wd := t.TempDir() + graphRoot := filepath.Join(wd, "root") + runRoot := filepath.Join(wd, "run") + + fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_SEQPACKET, 0) + require.NoError(t, err) + myfd := os.NewFile(uintptr(fds[0]), "myfd") + defer myfd.Close() + theirfd := os.NewFile(uintptr(fds[1]), "theirfd") + defer theirfd.Close() + + mysock, err := net.FileConn(myfd) + require.NoError(t, err) + unixConn, ok := mysock.(*net.UnixConn) + require.True(t, ok, "expected *net.UnixConn, got %T", mysock) + + proc := exec.Command(proxyBinary, //nolint:gosec + "--sockfd", "3", + "--graph-root", graphRoot, + "--run-root", runRoot, + "--seed-image", seedImage, + ) + proc.Stderr = os.Stderr + proc.ExtraFiles = append(proc.ExtraFiles, theirfd) + + stdoutPipe, err := proc.StdoutPipe() + require.NoError(t, err) + + err = proc.Start() + require.NoError(t, err) + + // Read the containers-storage reference from stdout. + scanner := bufio.NewScanner(stdoutPipe) + require.True(t, scanner.Scan(), "expected storage reference on stdout") + storageRef := strings.TrimSpace(scanner.Text()) + require.True(t, strings.HasPrefix(storageRef, "containers-storage:"), "unexpected ref: %s", storageRef) + + p := &proxy{ + c: unixConn, + proc: proc, + } + t.Cleanup(p.close) + + v, err := p.callNoFd("Initialize", nil) + require.NoError(t, err) + semver, ok := v.(string) + require.True(t, ok, "proxy Initialize: Unexpected value %T", v) + require.True(t, strings.HasPrefix(semver, expectedProxySemverMajor), "Unexpected semver %s", semver) + + return p, storageRef +} + +func TestOpenJSONRPCFdPass(t *testing.T) { + p, storageRef := newProxyWithStore(t, knownListImage) + + // Open the containers-storage image to trigger auto-discovery. 
+ imgidVal, err := p.callNoFd("OpenImage", []any{storageRef}) + require.NoError(t, err) + imgid, ok := imgidVal.(float64) + require.True(t, ok) + require.NotZero(t, imgid) + + // OpenJSONRPCFdPass should return a valid FD. + _, fd, err := p.call("OpenJSONRPCFdPass", nil) + require.NoError(t, err) + require.NotNil(t, fd, "expected an FD from OpenJSONRPCFdPass") + + // Verify the received FD is a unix socket. + var stat syscall.Stat_t + err = syscall.Fstat(int(fd.datafd.Fd()), &stat) + require.NoError(t, err) + require.True(t, stat.Mode&syscall.S_IFMT == syscall.S_IFSOCK, "expected socket, got mode %o", stat.Mode) + + // Validate the socket speaks the splitfdstream jsonrpc-fdpass protocol. + // Send a JSON-RPC request for a bogus method and expect a method-not-found error. + conn, err := net.FileConn(fd.datafd) + fd.datafd.Close() + require.NoError(t, err) + unixSock, ok := conn.(*net.UnixConn) + require.True(t, ok) + defer unixSock.Close() + + rpcReq := []byte("{\"jsonrpc\":\"2.0\",\"method\":\"NoSuchMethod\",\"id\":1}\n") + _, err = unixSock.Write(rpcReq) + require.NoError(t, err) + + respBuf := make([]byte, 4096) + n, err := unixSock.Read(respBuf) + require.NoError(t, err) + var rpcResp map[string]any + err = json.Unmarshal(respBuf[:n], &rpcResp) + require.NoError(t, err) + // A valid JSON-RPC server returns an error object for unknown methods. + rpcErr, ok := rpcResp["error"].(map[string]any) + require.True(t, ok, "expected JSON-RPC error object, got %v", rpcResp) + require.Contains(t, rpcErr["message"], "not found") + + _, err = p.callNoFd("CloseImage", []any{imgid}) + require.NoError(t, err) +} + +func TestOpenJSONRPCFdPassNotAvailable(t *testing.T) { + p := newProxy(t) + + // Open a docker:// image (no splitfdstream support). + _, err := p.callNoFd("OpenImage", []any{knownNotManifestListedImageX8664}) + require.NoError(t, err) + + // OpenJSONRPCFdPass should fail since no containers-storage source was opened. 
+ _, _, err = p.call("OpenJSONRPCFdPass", nil) + require.Error(t, err) + require.Contains(t, err.Error(), "splitfdstream store not configured") +} diff --git a/common/pkg/json-proxy/types.go b/common/pkg/json-proxy/types.go index b316549965..3a3f98f10b 100644 --- a/common/pkg/json-proxy/types.go +++ b/common/pkg/json-proxy/types.go @@ -17,7 +17,7 @@ import ( // departure from the original code which used HTTP. // // When bumping this, please also update the man page. -const protocolVersion = "0.2.8" +const protocolVersion = "0.2.9" // maxMsgSize is the current limit on a packet size. // Note that all non-metadata (i.e. payload data) is sent over a pipe.