From c0de628407491354778921c4dbc270346d21671a Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Mon, 23 Mar 2026 13:52:52 -0700 Subject: [PATCH] Add vectorized JSON reader --- compiler/semantic/op.go | 2 +- go.mod | 6 ++ go.sum | 18 ++++ runtime/exec/environment.go | 3 + sio/anyio/lookup.go | 7 ++ sio/fjsonio/jsonvec/array.go | 40 ++++++++ sio/fjsonio/jsonvec/bool.go | 30 ++++++ sio/fjsonio/jsonvec/builder.go | 92 ++++++++++++++++++ sio/fjsonio/jsonvec/float.go | 34 +++++++ sio/fjsonio/jsonvec/int.go | 34 +++++++ sio/fjsonio/jsonvec/materialize.go | 98 +++++++++++++++++++ sio/fjsonio/jsonvec/null.go | 27 ++++++ sio/fjsonio/jsonvec/record.go | 107 ++++++++++++++++++++ sio/fjsonio/jsonvec/string.go | 31 ++++++ sio/fjsonio/jsonvec/union.go | 150 +++++++++++++++++++++++++++++ sio/fjsonio/jsonvec/value.go | 39 ++++++++ sio/fjsonio/stream.go | 93 ++++++++++++++++++ sio/fjsonio/valreader.go | 61 ++++++++++++ sio/fjsonio/vectorreader.go | 78 +++++++++++++++ sio/fjsonio/vectorreader_test.go | 22 +++++ sio/fjsonio/ztests/array.yaml | 42 ++++++++ sio/fjsonio/ztests/record.yaml | 90 +++++++++++++++++ sio/fjsonio/ztests/union.yaml | 21 ++++ vector/bitvec/bits.go | 9 ++ 24 files changed, 1133 insertions(+), 1 deletion(-) create mode 100644 sio/fjsonio/jsonvec/array.go create mode 100644 sio/fjsonio/jsonvec/bool.go create mode 100644 sio/fjsonio/jsonvec/builder.go create mode 100644 sio/fjsonio/jsonvec/float.go create mode 100644 sio/fjsonio/jsonvec/int.go create mode 100644 sio/fjsonio/jsonvec/materialize.go create mode 100644 sio/fjsonio/jsonvec/null.go create mode 100644 sio/fjsonio/jsonvec/record.go create mode 100644 sio/fjsonio/jsonvec/string.go create mode 100644 sio/fjsonio/jsonvec/union.go create mode 100644 sio/fjsonio/jsonvec/value.go create mode 100644 sio/fjsonio/stream.go create mode 100644 sio/fjsonio/valreader.go create mode 100644 sio/fjsonio/vectorreader.go create mode 100644 sio/fjsonio/vectorreader_test.go create mode 100644 sio/fjsonio/ztests/array.yaml create mode 100644 sio/fjsonio/ztests/record.yaml create mode 100644 sio/fjsonio/ztests/union.yaml diff --git a/compiler/semantic/op.go b/compiler/semantic/op.go index 01f940a64f..05bea26a8c 100644 --- a/compiler/semantic/op.go +++ b/compiler/semantic/op.go @@ -284,7 +284,7 @@ func (t *translator) file(n ast.Node, name string, args []ast.OpArg) sem.Op { if format == "" { format = sio.FormatFromPath(name) } - if format == "csup" || format == "parquet" { + if format == "csup" || format == "parquet" || format == "fjson" { t.hasVectorizedInput = true } typ, err := t.fileType(name, format) diff --git a/go.mod b/go.mod index 5e004ce7cc..c58765e228 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de github.com/aws/aws-sdk-go v1.36.17 github.com/axiomhq/hyperloglog v0.2.5 + github.com/bytedance/sonic v1.15.0 github.com/go-redis/redis/v8 v8.11.5 github.com/goccy/go-yaml v1.19.0 github.com/golang-jwt/jwt/v4 v4.4.3 @@ -49,7 +50,10 @@ require ( github.com/apache/thrift v0.22.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.12.0 // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic/loader v0.5.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect @@ -75,9 +79,11 @@ require ( github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.8.0 // indirect + golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect golang.org/x/mod v0.32.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2 // indirect diff --git a/go.sum b/go.sum index 4a41869058..dc20e8a8d0 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,12 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= +github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= +github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE= +github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= +github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE= +github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -75,6 +81,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -326,16 +334,24 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/teamortix/golang-wasm/wasm v0.0.0-20230719150929-5d000994c833 h1:PE/ebx5HZAsK42Bs/syRaSWBInfZpj9RifI/sEhGHvo= github.com/teamortix/golang-wasm/wasm v0.0.0-20230719150929-5d000994c833/go.mod h1:nskvTyoGIaAsC+664SkRitVI1ft6dm1xerCr50YZsnY= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= @@ -378,6 +394,8 @@ go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/runtime/exec/environment.go b/runtime/exec/environment.go index eaf2266648..de564a1cf3 100644 --- a/runtime/exec/environment.go +++ b/runtime/exec/environment.go @@ -16,6 +16,7 @@ import ( "github.com/brimdata/super/sbuf" "github.com/brimdata/super/sio/anyio" "github.com/brimdata/super/sio/csupio" + "github.com/brimdata/super/sio/fjsonio" "github.com/brimdata/super/sio/parquetio" "github.com/brimdata/super/vector" "github.com/brimdata/super/vector/vio" @@ -188,6 +189,8 @@ func (e *Environment) VectorOpen(ctx context.Context, sctx *super.Context, path, puller, err = csupio.NewVectorReader(ctx, sctx, reader, p, concurrentReaders) case "parquet": puller, err = parquetio.NewVectorReader(ctx, sctx, reader, p, concurrentReaders) + case "fjson": + puller = fjsonio.NewVectorReader(ctx, sctx, reader, p, concurrentReaders) default: var sbufPuller sbuf.Puller sbufPuller, err = e.Open(ctx, sctx, path, format, p) diff --git a/sio/anyio/lookup.go b/sio/anyio/lookup.go index c7c8d178e9..faa955699d 100644 --- a/sio/anyio/lookup.go +++ b/sio/anyio/lookup.go @@ -1,15 +1,18 @@ package anyio import ( + "context" "fmt" "io" "github.com/brimdata/super" + "github.com/brimdata/super/sbuf" "github.com/brimdata/super/sio" "github.com/brimdata/super/sio/arrowio" "github.com/brimdata/super/sio/bsupio" "github.com/brimdata/super/sio/csupio" "github.com/brimdata/super/sio/csvio" + "github.com/brimdata/super/sio/fjsonio" "github.com/brimdata/super/sio/jsonio" "github.com/brimdata/super/sio/jsupio" "github.com/brimdata/super/sio/lineio" @@ -34,6 +37,10 @@ func lookupReader(sctx *super.Context, r io.Reader, opts ReaderOpts) (sio.ReadCl return sio.NopReadCloser(csvio.NewReader(sctx, r, opts.CSV)), nil case "line": return sio.NopReadCloser(lineio.NewReader(r)), nil + case "fjson": + v := fjsonio.NewVectorReader(context.Background(), sctx, r, nil, 1) + puller := sbuf.NewMaterializer(v) + return sio.NopReadCloser(sbuf.PullerReader(puller)), nil case "json": return sio.NopReadCloser(jsonio.NewReader(sctx, r)), nil case "parquet": diff --git a/sio/fjsonio/jsonvec/array.go b/sio/fjsonio/jsonvec/array.go new file mode 100644 index 0000000000..0d2d96979a --- /dev/null +++ b/sio/fjsonio/jsonvec/array.go @@ -0,0 +1,40 @@ +package jsonvec + +import "github.com/brimdata/super/vector" + +var _ Value = (*Array)(nil) + +type Array struct { + Offsets []uint32 + Inner Value +} + +func NewArray() *Array { + return &Array{ + Offsets: []uint32{0}, + Inner: Unknown{}, + } +} + +func (a *Array) BeginArray() Value { return a } +func (a *Array) EnterArray() Value { return a.Inner } + +func (a *Array) EndArray(inner Value) { + a.Inner = inner + n := a.Inner.Len() + if n == 0 { + a.Inner = new(Null) + } + a.Offsets = append(a.Offsets, n) +} + +func (a *Array) OnNull() Value { return ToUnion(a).OnNull() } +func (a *Array) OnBool(v bool) Value { return ToUnion(a).OnBool(v) } +func (a *Array) OnInt(v int64) Value { return ToUnion(a).OnInt(v) } +func (a *Array) OnFloat(v float64) Value { return ToUnion(a).OnFloat(v) } +func (a *Array) OnString(v string) Value { return ToUnion(a).OnString(v) } +func (a *Array) BeginRecord() Value { return ToUnion(a).BeginRecord() } +func (a *Array) Field(v string) Value { panic("system error") } +func (a *Array) EndRecord() { panic("system error") } +func (a *Array) Kind() vector.Kind { return vector.KindNull } +func (a *Array) Len() uint32 { return uint32(len(a.Offsets)) - 1 } diff --git a/sio/fjsonio/jsonvec/bool.go b/sio/fjsonio/jsonvec/bool.go new file mode 100644 index 0000000000..7e8ed0fd11 --- /dev/null +++ b/sio/fjsonio/jsonvec/bool.go @@ -0,0 +1,30 @@ +package jsonvec + +import ( + "github.com/brimdata/super/vector" + "github.com/brimdata/super/vector/bitvec" +) + +type Bool struct { + Value bitvec.Bits +} + +var _ Value = (*Bool)(nil) + +func (b *Bool) OnBool(v bool) Value { + b.Value.Append(v) + return b +} + +func (b *Bool) OnNull() Value { return ToUnion(b).OnNull() } +func (b *Bool) OnString(v string) Value { return ToUnion(b).OnString(v) } +func (b *Bool) OnInt(v int64) Value { return ToUnion(b).OnInt(v) } +func (b *Bool) OnFloat(v float64) Value { return ToUnion(b).OnFloat(v) } +func (b *Bool) BeginRecord() Value { return ToUnion(b).BeginRecord() } +func (b *Bool) Field(k string) Value { panic("system error") } +func (b *Bool) EndRecord() { panic("system error") } +func (b *Bool) BeginArray() Value { return ToUnion(b).BeginArray() } +func (b *Bool) EnterArray() Value { panic("system error") } +func (b *Bool) EndArray(Value) { panic("system error") } +func (b *Bool) Kind() vector.Kind { return vector.KindBool } +func (b *Bool) Len() uint32 { return b.Value.Len() } diff --git a/sio/fjsonio/jsonvec/builder.go b/sio/fjsonio/jsonvec/builder.go new file mode 100644 index 0000000000..524d9c35c8 --- /dev/null +++ b/sio/fjsonio/jsonvec/builder.go @@ -0,0 +1,92 @@ +package jsonvec + +import ( + "encoding/json" + + "github.com/bytedance/sonic/ast" +) + +var _ ast.Visitor = (*Builder)(nil) + +type Builder struct { + stack []Value +} + +func NewBuilder() *Builder { + return &Builder{ + stack: []Value{Unknown{}}, + } +} + +func (b *Builder) Reset() { + b.stack = b.stack[:1] +} + +func (b *Builder) OnNull() error { + b.stack[len(b.stack)-1] = b.tos().OnNull() + return nil +} + +func (b *Builder) OnBool(v bool) error { + b.stack[len(b.stack)-1] = b.tos().OnBool(v) + return nil +} + +func (b *Builder) OnInt64(v int64, _ json.Number) error { + b.stack[len(b.stack)-1] = b.tos().OnInt(v) + return nil +} + +func (b *Builder) OnFloat64(v float64, _ json.Number) error { + b.stack[len(b.stack)-1] = b.tos().OnFloat(v) + return nil +} + +func (b *Builder) OnString(v string) error { + b.stack[len(b.stack)-1] = b.tos().OnString(v) + return nil +} + +func (b *Builder) OnObjectBegin(capacity int) error { + b.stack[len(b.stack)-1] = b.tos().BeginRecord() + b.push(nil) + return nil +} + +func (b *Builder) OnObjectKey(name string) error { + b.pop() + b.push(b.tos().Field(name)) + return nil +} + +func (b *Builder) OnObjectEnd() error { + b.pop() + b.tos().EndRecord() + return nil +} + +func (b *Builder) OnArrayBegin(capacity int) error { + b.stack[len(b.stack)-1] = b.tos().BeginArray() + b.push(b.tos().EnterArray()) + return nil +} + +func (b *Builder) OnArrayEnd() error { + inner := b.pop() + b.tos().EndArray(inner) + return nil +} + +func (b *Builder) tos() Value { + return b.stack[len(b.stack)-1] +} + +func (b *Builder) push(val Value) { + b.stack = append(b.stack, val) +} + +func (b *Builder) pop() Value { + v := b.tos() + b.stack = b.stack[:len(b.stack)-1] + return v +} diff --git a/sio/fjsonio/jsonvec/float.go b/sio/fjsonio/jsonvec/float.go new file mode 100644 index 0000000000..446f0ec923 --- /dev/null +++ b/sio/fjsonio/jsonvec/float.go @@ -0,0 +1,34 @@ +package jsonvec + +import ( + "github.com/brimdata/super" + "github.com/brimdata/super/vector" +) + +var _ Value = (*Float)(nil) + +type Float struct { + Value *vector.Float +} + +func NewFloat() *Float { + return &Float{Value: vector.NewFloatEmpty(super.TypeFloat64, 0)} +} + +func (f *Float) OnFloat(v float64) Value { + f.Value.Append(v) + return f +} + +func (f *Float) OnNull() Value { return ToUnion(f).OnNull() } +func (f *Float) OnBool(v bool) Value { return ToUnion(f).OnBool(v) } +func (f *Float) OnInt(v int64) Value { return ToUnion(f).OnInt(v) } +func (f *Float) OnString(v string) Value { return ToUnion(f).OnString(v) } +func (f *Float) BeginRecord() Value { return ToUnion(f).BeginRecord() } +func (f *Float) Field(key string) Value { panic("system error") } +func (f *Float) EndRecord() { panic("system error") } +func (f *Float) BeginArray() Value { return ToUnion(f).BeginArray() } +func (f *Float) EnterArray() Value { panic("system error") } +func (f *Float) EndArray(Value) { panic("system error") } +func (f *Float) Kind() vector.Kind { return vector.KindFloat } +func (f *Float) Len() uint32 { return f.Value.Len() } diff --git a/sio/fjsonio/jsonvec/int.go b/sio/fjsonio/jsonvec/int.go new file mode 100644 index 0000000000..f1d1812d75 --- /dev/null +++ b/sio/fjsonio/jsonvec/int.go @@ -0,0 +1,34 @@ +package jsonvec + +import ( + "github.com/brimdata/super" + "github.com/brimdata/super/vector" +) + +var _ Value = (*Int)(nil) + +type Int struct { + Value *vector.Int +} + +func NewInt() *Int { + return &Int{Value: vector.NewIntEmpty(super.TypeInt64, 0)} +} + +func (i *Int) OnInt(v int64) Value { + i.Value.Append(v) + return i +} + +func (i *Int) OnNull() Value { return ToUnion(i).OnNull() } +func (i *Int) OnBool(v bool) Value { return ToUnion(i).OnBool(v) } +func (i *Int) OnFloat(v float64) Value { return ToUnion(i).OnFloat(v) } +func (i *Int) OnString(v string) Value { return ToUnion(i).OnString(v) } +func (i *Int) BeginRecord() Value { return ToUnion(i).BeginRecord() } +func (i *Int) Field(key string) Value { panic("system error") } +func (i *Int) EndRecord() { panic("system error") } +func (i *Int) BeginArray() Value { return ToUnion(i).BeginArray() } +func (i *Int) EnterArray() Value { panic("system error") } +func (i *Int) EndArray(Value) { panic("system error") } +func (i *Int) Kind() vector.Kind { return vector.KindInt } +func (i *Int) Len() uint32 { return i.Value.Len() } diff --git a/sio/fjsonio/jsonvec/materialize.go b/sio/fjsonio/jsonvec/materialize.go new file mode 100644 index 0000000000..1a4fa88d3a --- /dev/null +++ b/sio/fjsonio/jsonvec/materialize.go @@ -0,0 +1,98 @@ +package jsonvec + +import ( + "encoding/binary" + "strings" + + "github.com/brimdata/super" + "github.com/brimdata/super/vector" +) + +func Materialize(sctx *super.Context, b *Builder) vector.Any { + return materialize(sctx, b.stack[0]) +} + +func materialize(sctx *super.Context, v Value) vector.Any { + switch v := v.(type) { + case *Null: + return vector.NewNull(v.len) + case *Bool: + return vector.NewBool(v.Value) + case *Int: + return v.Value + case *Float: + return v.Value + case *String: + return v.Value + case *Union: + return materializeUnion(sctx, v) + case *Array: + inner := materialize(sctx, v.Inner) + typ := sctx.LookupTypeArray(inner.Type()) + return vector.NewArray(typ, v.Offsets, inner) + case *Record: + return materializeRecord(sctx, v) + default: + panic(v) + } +} + +func materializeUnion(sctx *super.Context, u *Union) vector.Any { + var types []super.Type + var vecs []vector.Any + for _, v := range u.Values() { + vec := materialize(sctx, v) + types = append(types, vec.Type()) + vecs = append(vecs, vec) + } + subTypes := make([]super.Type, len(u.Tags)) + for i, tag := range u.Tags { + subTypes[i] = types[tag] + } + utyp := sctx.LookupTypeUnion(types) + vec := vector.NewUnion(utyp, u.Tags, vecs) + ftyp := sctx.LookupTypeFusion(utyp) + return vector.NewFusion(sctx, ftyp, vec, subTypes) +} + +func materializeRecord(sctx *super.Context, r *Record) vector.Any { + fieldNames := make([]string, len(r.LUT)) + for name, id := range r.LUT { + fieldNames[id] = name + } + n := r.Len() + var vecs []vector.Any + var allFields []super.Field + for i, field := range r.Fields { + rle := r.RLEs[i].End(n) + vec := materialize(sctx, field.Value) + field := super.NewFieldWithOpt(fieldNames[i], vec.Type(), len(rle) > 0) + vecs = append(vecs, vector.NewFieldFromRLE(sctx, vec, n, rle)) + allFields = append(allFields, field) + } + rtyp := sctx.MustLookupTypeRecord(allFields) + record := vector.NewRecord(rtyp, vecs, n) + if len(r.typeToTag) == 1 { + return record + } + subTypeMap := make([]super.Type, len(r.typeToTag)) + for desc, tag := range r.typeToTag { + r := strings.NewReader(desc) + var subFields []super.Field + for { + fieldId, err := binary.ReadUvarint(r) + if err != nil { + break + } + f := allFields[fieldId] + subFields = append(subFields, super.NewField(f.Name, f.Type)) + } + subTypeMap[tag] = sctx.MustLookupTypeRecord(subFields) + } + subTypes := make([]super.Type, len(r.tags)) + for i, tag := range r.tags { + subTypes[i] = subTypeMap[tag] + } + fusedType := sctx.LookupTypeFusion(rtyp) + return vector.NewFusion(sctx, fusedType, record, subTypes) +} diff --git a/sio/fjsonio/jsonvec/null.go b/sio/fjsonio/jsonvec/null.go new file mode 100644 index 0000000000..660b2380d3 --- /dev/null +++ b/sio/fjsonio/jsonvec/null.go @@ -0,0 +1,27 @@ +package jsonvec + +import "github.com/brimdata/super/vector" + +var _ Value = (*Null)(nil) + +type Null struct { + len uint32 +} + +func (n *Null) OnNull() Value { + n.len++ + return n +} + +func (n *Null) OnBool(v bool) Value { return ToUnion(n).OnBool(v) } +func (n *Null) OnString(v string) Value { return ToUnion(n).OnString(v) } +func (n *Null) OnInt(v int64) Value { return ToUnion(n).OnInt(v) } +func (n *Null) OnFloat(v float64) Value { return ToUnion(n).OnFloat(v) } +func (n *Null) BeginRecord() Value { return ToUnion(n).BeginRecord() } +func (n *Null) Field(v string) Value { panic("system error") } +func (n *Null) EndRecord() { panic("system error") } +func (n *Null) BeginArray() Value { return ToUnion(n).BeginArray() } +func (n *Null) EnterArray() Value { panic("system error") } +func (n *Null) EndArray(Value) { panic("system error") } +func (n *Null) Kind() vector.Kind { return vector.KindNull } +func (n *Null) Len() uint32 { return n.len } diff --git a/sio/fjsonio/jsonvec/record.go b/sio/fjsonio/jsonvec/record.go new file mode 100644 index 0000000000..08ebcab4cb --- /dev/null +++ b/sio/fjsonio/jsonvec/record.go @@ -0,0 +1,107 @@ +package jsonvec + +import ( + "encoding/binary" + + "github.com/brimdata/super/vector" +) + +var _ Value = (*Record)(nil) + +type Record struct { + Fields []*Element + RLEs []vector.RLE + LUT map[string]int + + tags []uint32 + // typeToTag map[super.Type]uint32 + typeToTag map[string]uint32 + scratch []byte +} + +func NewRecord() *Record { + return &Record{ + LUT: make(map[string]int), + typeToTag: make(map[string]uint32), + } +} + +func (r *Record) BeginRecord() Value { + r.scratch = r.scratch[:0] + return r +} + +func (r *Record) Field(name string) Value { + idx, ok := r.LUT[name] + if !ok { + idx = len(r.Fields) + r.LUT[name] = idx + r.Fields = append(r.Fields, &Element{Value: Unknown{}}) + r.RLEs = append(r.RLEs, vector.RLE{}) + } + r.scratch = binary.AppendUvarint(r.scratch, uint64(idx)) + f := r.Fields[idx] + r.RLEs[idx].Touch(uint32(len(r.tags))) + return f +} + +func (r *Record) EndRecord() { + tag, ok := r.typeToTag[string(r.scratch)] + if !ok { + tag = uint32(len(r.typeToTag)) + r.typeToTag[string(r.scratch)] = tag + } + r.tags = append(r.tags, tag) +} + +func (r *Record) OnNull() Value { return ToUnion(r).OnNull() } +func (r *Record) OnBool(v bool) Value { return ToUnion(r).OnBool(v) } +func (r *Record) OnInt(v int64) Value { return ToUnion(r).OnInt(v) } +func (r *Record) OnFloat(v float64) Value { return ToUnion(r).OnFloat(v) } +func (r *Record) OnString(v string) Value { return ToUnion(r).OnString(v) } +func (r *Record) BeginArray() Value { return ToUnion(r).BeginArray() } +func (r *Record) EnterArray() Value { panic("system error") } +func (r *Record) EndArray(Value) {} +func (r *Record) Kind() vector.Kind { return vector.KindRecord } +func (r *Record) Len() uint32 { return uint32(len(r.tags)) } + +var _ Value = (*Element)(nil) + +type Element struct { + Value +} + +func (f *Element) OnNull() Value { + f.Value = f.Value.OnNull() + return f +} + +func (f *Element) OnBool(v bool) Value { + f.Value = f.Value.OnBool(v) + return f +} + +func (f *Element) OnString(v string) Value { + f.Value = f.Value.OnString(v) + return f +} + +func (f *Element) OnInt(v int64) Value { + f.Value = f.Value.OnInt(v) + return f +} + +func (f *Element) OnFloat(v float64) Value { + f.Value = f.Value.OnFloat(v) + return f +} + +func (f *Element) BeginRecord() Value { + f.Value = f.Value.BeginRecord() + return f +} + +func (f *Element) BeginArray() Value { + f.Value = f.Value.BeginArray() + return f +} diff --git a/sio/fjsonio/jsonvec/string.go b/sio/fjsonio/jsonvec/string.go new file mode 100644 index 0000000000..d338a334f3 --- /dev/null +++ b/sio/fjsonio/jsonvec/string.go @@ -0,0 +1,31 @@ +package jsonvec + +import "github.com/brimdata/super/vector" + +var _ Value = (*String)(nil) + +type String struct { + Value *vector.String +} + +func NewString() *String { + return &String{vector.NewStringEmpty(0)} +} + +func (s *String) OnString(v string) Value { + s.Value.Append(v) + return s +} + +func (s *String) OnNull() Value { return ToUnion(s).OnNull() } +func (s *String) OnBool(v bool) Value { return ToUnion(s).OnBool(v) } +func (s *String) OnInt(v int64) Value { return ToUnion(s).OnInt(v) } +func (s *String) OnFloat(v float64) Value { return ToUnion(s).OnFloat(v) } +func (s *String) BeginRecord() Value { return ToUnion(s).BeginRecord() } +func (s *String) Field(string) Value { panic("system error") } +func (s *String) EndRecord() { panic("system error") } +func (s *String) BeginArray() Value { return ToUnion(s).BeginArray() } +func (s *String) EnterArray() Value { panic("system error") } +func (s *String) EndArray(Value) { panic("system error") } +func (s *String) Kind() vector.Kind { return vector.KindString } +func (s *String) Len() uint32 { return s.Value.Len() } diff --git a/sio/fjsonio/jsonvec/union.go b/sio/fjsonio/jsonvec/union.go new file mode 100644 index 0000000000..64a01d5e90 --- /dev/null +++ b/sio/fjsonio/jsonvec/union.go @@ -0,0 +1,150 @@ +package jsonvec + +import "github.com/brimdata/super/vector" + +var _ Value = (*Union)(nil) + +type Union struct { + Tags []uint32 + lut map[vector.Kind]uint32 + Null *Null + Bool *Bool + Int *Int + Float *Float + String *String + Record *Record + Array *Array +} + +func NewUnion() *Union { + return &Union{ + lut: make(map[vector.Kind]uint32), + Null: new(Null), + Bool: new(Bool), + Int: NewInt(), + Float: NewFloat(), + String: NewString(), + Record: NewRecord(), + Array: NewArray(), + } +} + +func ToUnion(val Value) *Union { + u := NewUnion() + u.lut[val.Kind()] = 0 + u.Tags = make([]uint32, val.Len()) + switch val := val.(type) { + case *Null: + u.Null = val + case *Bool: + u.Bool = val + case *Int: + u.Int = val + case *Float: + u.Float = val + case *String: + u.String = val + case *Record: + u.Record = val + case *Array: + u.Array = val + default: + panic(val) + } + return u +} + +func (u *Union) OnNull() Value { + u.Null.OnNull() + u.touch(vector.KindNull) + return u +} + +func (u *Union) OnBool(v bool) Value { + u.Bool.OnBool(v) + u.touch(vector.KindBool) + return u +} + +func (u *Union) OnInt(v int64) Value { + u.Int.OnInt(v) + u.touch(vector.KindInt) + return u +} + +func (u *Union) OnFloat(v float64) Value { + u.Float.OnFloat(v) + u.touch(vector.KindFloat) + return u +} + +func (u *Union) OnString(v string) Value { + u.String.OnString(v) + u.touch(vector.KindString) + return u +} + +func (u *Union) BeginRecord() Value { + u.Record.BeginRecord() + u.touch(vector.KindRecord) + return u +} + +func (u *Union) Field(name string) Value { + return u.Record.Field(name) +} + +func (u *Union) EndRecord() { + u.Record.EndRecord() +} + +func (u *Union) BeginArray() Value { + u.Array.BeginArray() + u.touch(vector.KindArray) + return u +} + +func (u *Union) EnterArray() Value { + return u.Array.EnterArray() +} + +func (u *Union) EndArray(v Value) { + u.Array.EndArray(v) +} + +func (u *Union) touch(kind vector.Kind) { + idx, ok := u.lut[kind] + if !ok { + idx = uint32(len(u.lut)) + u.lut[kind] = idx + } + u.Tags = append(u.Tags, idx) +} + +func (u *Union) Values() []Value { + values := make([]Value, len(u.lut)) + for kind, idx := range u.lut { + switch kind { + case vector.KindNull: + values[idx] = u.Null + case vector.KindBool: + values[idx] = u.Bool + case vector.KindInt: + values[idx] = u.Int + case vector.KindFloat: + values[idx] = u.Float + case vector.KindString: + values[idx] = u.String + case vector.KindRecord: + values[idx] = u.Record + case vector.KindArray: + values[idx] = u.Array + default: + panic(kind) + } + } + return values +} + +func (u *Union) Kind() vector.Kind { return vector.KindUnion } +func (u *Union) Len() uint32 { return uint32(len(u.Tags)) } diff --git a/sio/fjsonio/jsonvec/value.go b/sio/fjsonio/jsonvec/value.go new file mode 100644 index 0000000000..e4bbc27bcf --- /dev/null +++ b/sio/fjsonio/jsonvec/value.go @@ -0,0 +1,39 @@ +package jsonvec + +import ( + "github.com/brimdata/super/vector" +) + +type Value interface { + OnNull() Value + OnBool(bool) Value + OnInt(int64) Value + OnFloat(float64) Value + OnString(string) Value + BeginRecord() Value + Field(string) Value + EndRecord() + BeginArray() Value + EnterArray() Value + EndArray(Value) + Kind() vector.Kind + Len() uint32 +} + +var _ Value = Unknown{} + +type Unknown struct{} + +func (Unknown) OnNull() Value { return new(Null).OnNull() } +func (Unknown) OnBool(v bool) Value { return new(Bool).OnBool(v) } +func (Unknown) OnInt(v int64) Value { return NewInt().OnInt(v) } +func (Unknown) OnFloat(v float64) Value { return NewFloat().OnFloat(v) } +func (Unknown) OnString(v string) Value { return NewString().OnString(v) } +func (Unknown) BeginRecord() Value { return NewRecord().BeginRecord() } +func (Unknown) Field(string) Value { panic("system error") } +func (Unknown) EndRecord() { panic("system error") } +func (Unknown) BeginArray() Value { return NewArray() } +func (Unknown) EnterArray() Value { panic("system error") } +func (Unknown) EndArray(Value) { panic("system error") } +func (Unknown) Kind() vector.Kind { panic("system error") } +func (Unknown) Len() uint32 { return 0 } diff --git a/sio/fjsonio/stream.go b/sio/fjsonio/stream.go new file mode 100644 index 0000000000..cc6bd4f39b --- /dev/null +++ b/sio/fjsonio/stream.go @@ -0,0 +1,93 @@ +package fjsonio + +import ( + "context" + "errors" + "io" + "runtime" + "sync" + + "github.com/brimdata/super/vector" +) + +type stream struct { + r io.Reader + ch chan result + once sync.Once + ctx context.Context + cancel context.CancelFunc +} + +func newStream(ctx context.Context, r io.Reader, n int) *stream { + ctx, cancel := context.WithCancel(ctx) + return &stream{ + r: r, + ch: make(chan result, n), + ctx: ctx, + cancel: cancel, + } +} + +type result struct { + bytes *vector.BytesTable + err error +} + +func (s *stream) next() (*vector.BytesTable, error) { + s.once.Do(func() { + s.ch = make(chan result, runtime.GOMAXPROCS(0)) + go s.run() + }) + select { + case r, ok := <-s.ch: + if errors.Is(r.err, io.EOF) { + r.err = nil + } + if !ok || r.err != nil { + return nil, r.err + } + return r.bytes, nil + case <-s.ctx.Done(): + return nil, s.ctx.Err() + } +} + +func (s *stream) run() { + r := newValReader(s.r) + for { + batch, err := readBatch(r) + select { + case s.ch <- result{&batch, err}: + case <-s.ctx.Done(): + return + } + if err != nil { + close(s.ch) + break + } + } +} + +func (s *stream) close() error { + s.cancel() + // drain channel + for range s.ch { + } + if closer, ok := s.r.(io.Closer); ok { + return closer.Close() + } + return nil +} + +func readBatch(r *valReader) (vector.BytesTable, error) { + // XXX Should we pool these? + t := vector.NewBytesTableEmpty(VecBatchSize) + for range VecBatchSize { + b, err := r.Next() + if err != nil { + return t, err + } + t.Append(b) + } + return t, nil +} diff --git a/sio/fjsonio/valreader.go b/sio/fjsonio/valreader.go new file mode 100644 index 0000000000..9779032622 --- /dev/null +++ b/sio/fjsonio/valreader.go @@ -0,0 +1,61 @@ +package fjsonio + +import ( + "errors" + "io" + + "github.com/bytedance/sonic/decoder" +) + +type valReader struct { + r io.Reader + buf []byte + cursor []byte + EOF bool +} + +func newValReader(r io.Reader) *valReader { + return &valReader{r: r, buf: make([]byte, 512*1024)} +} + +func (r *valReader) Next() ([]byte, error) { + start, end := decoder.Skip(r.cursor) + if start < 0 { + // XXX There's an issue here if we encounter a value that is larger than + // the default buffer size. We should probably include functionality to + // increase the buffer size to an arbitrary amount and return a detailed + // error if a value is larger than MaxBufSize. + if err := r.fill(); err != nil { + return nil, err + } + start, end = decoder.Skip(r.cursor) + if start < 0 { + return nil, errors.New("invalid input") + } + } + b := r.cursor[start:end] + r.cursor = r.cursor[end:] + return b, nil +} + +func (r *valReader) fill() error { + if r.EOF { + return io.EOF + } + // copy rest of cursor to buf + cc := copy(r.buf, r.cursor) + n, err := r.r.Read(r.buf[cc:]) + if errors.Is(err, io.EOF) { + r.EOF = true + if n == 0 { + return err + } + err = nil + } + if err != nil { + return err + } + r.cursor = r.buf + r.cursor = r.cursor[:cc+n] + return nil +} diff --git a/sio/fjsonio/vectorreader.go b/sio/fjsonio/vectorreader.go new file mode 100644 index 0000000000..af84394ba4 --- /dev/null +++ b/sio/fjsonio/vectorreader.go @@ -0,0 +1,78 @@ +package fjsonio + +import ( + "context" + "fmt" + "io" + "sync/atomic" + + "github.com/bytedance/sonic/ast" + + "github.com/brimdata/super" + "github.com/brimdata/super/pkg/byteconv" + "github.com/brimdata/super/sbuf" + "github.com/brimdata/super/sio/fjsonio/jsonvec" + "github.com/brimdata/super/vector" +) + +var VecBatchSize uint32 = 1024 + +type VectorReader struct { + sctx *super.Context + ctx context.Context + stream *stream + pushdown sbuf.Pushdown + + hasClosed atomic.Bool +} + +func NewVectorReader(ctx context.Context, sctx *super.Context, r io.Reader, p sbuf.Pushdown, concurrentReaders int) *VectorReader { + return &VectorReader{ + sctx: sctx, + ctx: ctx, + stream: newStream(ctx, r, concurrentReaders), + pushdown: p, + } +} + +func (v *VectorReader) Pull(done bool) (vector.Any, error) { + return v.ConcurrentPull(done, 0) +} + +func (v *VectorReader) ConcurrentPull(done bool, _ int) (vector.Any, error) { + if done { + return nil, v.close() + } + table, err := v.stream.next() + if table == nil || err != nil { + // XXX what do we do when we get in an error state. + v.close() + return nil, err + } + // XXX support projections + builder := jsonvec.NewBuilder() + for i := range table.Len() { + if err := ast.Preorder(byteconv.UnsafeString(table.Bytes(i)), builder, nil); err != nil { + // XXX what do we do when we get in an error state. + v.close() + return nil, err + } + } + defer func() { + if r := recover(); r != nil { + fmt.Println("lee problem", string(table.RawBytes())) + fmt.Println("r", r) + panic(r) + } + }() + vec := jsonvec.Materialize(v.sctx, builder) + // fmt.Println("vec", vector.Debug(vec)) + return vec, nil +} + +func (v *VectorReader) close() error { + if v.hasClosed.CompareAndSwap(false, true) { + return nil + } + return v.stream.close() +} diff --git a/sio/fjsonio/vectorreader_test.go b/sio/fjsonio/vectorreader_test.go new file mode 100644 index 0000000000..57555fa60d --- /dev/null +++ b/sio/fjsonio/vectorreader_test.go @@ -0,0 +1,22 @@ +package fjsonio + +import ( + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValueReader(t *testing.T) { + const s = `{"x":1} {"x":2}` + r := newValReader(strings.NewReader(s)) + b, err := r.Next() + require.NoError(t, err) + fmt.Println("first", string(b)) + b, err = r.Next() + require.NoError(t, err) + fmt.Println("next", string(b)) + b, err = r.Next() + fmt.Println("err", err) +} diff --git a/sio/fjsonio/ztests/array.yaml b/sio/fjsonio/ztests/array.yaml new file mode 100644 index 0000000000..e2a5888fde --- /dev/null +++ b/sio/fjsonio/ztests/array.yaml @@ -0,0 +1,42 @@ +# Test simple array +spq: pass + +input-flags: '-i fjson' +output-flags: '-f json' + +input: &input | + {"a":[1,2,3]} + {"a":[4,5]} + {"a":[6]} + +output: *input + +--- + +# Test array union +spq: pass + +input-flags: '-i fjson' + +input: | + {"a":[1,2]} + {"a":[{"x":1},{"x":2}]} + +output: | + {a:[fusion(1::(int64|{x:int64}),),fusion(2::(int64|{x:int64}),)]} + {a:[fusion({x:1}::(int64|{x:int64}),<{x:int64}>),fusion({x:2}::(int64|{x:int64}),<{x:int64}>)]} + +--- + +# Test empty array +spq: pass + +input-flags: '-i fjson' + +input: | + [] + [] + +output: | + [] + [] diff --git a/sio/fjsonio/ztests/record.yaml b/sio/fjsonio/ztests/record.yaml new file mode 100644 index 0000000000..fe3b3aa7b3 --- /dev/null +++ b/sio/fjsonio/ztests/record.yaml @@ -0,0 +1,90 @@ +spq: pass + +input-flags: '-i fjson' + +output-flags: '-f json' + +input: &input | + {"a":1,"b":2} + {"a":3,"b":4} + {"a":5,"b":6} + +output: *input + +--- + +# Test fusion on record leaves. +spq: pass + +input-flags: '-i fjson' + +input: | + {"x":1} + {"x":"foo"} + +output: | + {x:fusion(1::(int64|string),)} + {x:fusion("foo"::(int64|string),)} + +--- + +spq: pass + +input-flags: '-i fjson' + +input: | + {"a":1} + {"a":2,"b":"foo"} + {"a":3} + +output: | + fusion({a:1,b?:_::string},<{a:int64}>) + fusion({a:2,b?:"foo"},<{a:int64,b:string}>) + fusion({a:3,b?:_::string},<{a:int64}>) + +--- + +# Test order is preserved in record with same column names but different order. +spq: pass + +input-flags: '-i fjson' + +input: | + {"a":1,"b":2} + {"b":1,"a":2} + +output: | + fusion({a:1,b:2},<{a:int64,b:int64}>) + fusion({a:2,b:1},<{b:int64,a:int64}>) + +--- + +# Test nested record +spq: pass + +input-flags: '-i fjson' + +input: | + {"a":{"b":"foo"}} + {"a":{"b":2.5}} + {"a":{"b":"bar"}} + +output: | + {a:{b:fusion("foo"::(float64|string),)}} + {a:{b:fusion(2.5::(float64|string),)}} + {a:{b:fusion("bar"::(float64|string),)}} + +--- + +# Test empty record +spq: pass + +input-flags: '-i fjson' + +input: | + {} + {} + +output: | + {} + {} diff --git a/sio/fjsonio/ztests/union.yaml b/sio/fjsonio/ztests/union.yaml new file mode 100644 index 0000000000..6275540d85 --- /dev/null +++ b/sio/fjsonio/ztests/union.yaml @@ -0,0 +1,21 @@ +spq: pass + +input-flags: '-i fjson' + +input: | + 1 + 2 + {"x":1} + {"x":2} + true + false + null + +output: | + fusion(1::(int64|bool|null|{x:int64}),) + fusion(2::(int64|bool|null|{x:int64}),) + fusion({x:1}::(int64|bool|null|{x:int64}),<{x:int64}>) + fusion({x:2}::(int64|bool|null|{x:int64}),<{x:int64}>) + fusion(false::(int64|bool|null|{x:int64}),) + fusion(true::(int64|bool|null|{x:int64}),) + fusion(null::(int64|bool|null|{x:int64}),) diff --git a/vector/bitvec/bits.go b/vector/bitvec/bits.go index e0da2a6950..cb0a74c1e7 100644 --- a/vector/bitvec/bits.go +++ b/vector/bitvec/bits.go @@ -62,6 +62,15 @@ func (b Bits) Set(slot uint32) { b.bits[slot>>6] |= (1 << (slot & 0x3f)) } +func (b *Bits) Append(v bool) { + b.length++ + n := int(b.length>>6) + 1 + b.bits = slices.Grow(b.bits, n)[:n] + if v { + b.Set(b.length - 1) + } +} + // Shorten may be called to shorten the length of an allocated vector. // This is useful when you know that a vector has a limit but you're not // sure how large it might be. Create the vector with max length, write