
[SPARK-52780] Add StreamRows and Arrow Record Streaming #152

Open
caldempsey wants to merge 21 commits into apache:master from caldempsey:callum/SPARK-52780

Conversation

@caldempsey

@caldempsey caldempsey commented Jul 13, 2025

What changes were proposed in this pull request?

This PR adds streaming support to the Spark Connect Go client by introducing ToLocalIterator(ctx) on the DataFrame API, matching the functionality available in Scala's implementation. The implementation includes a new types.RowIterator interface that streams Arrow records batch-by-batch, and a ToRecordBatches() method on ExecutePlanClient that returns channels for streaming Arrow records and errors. Context cancellation is handled throughout the streaming pipeline to ensure proper resource cleanup. We treat io.EOF as the end-of-stream signal: it is what a Databricks cluster running Spark emits when it has finished sending results, and localhost Spark behaves the same way.

Why are the changes needed?

The Go client currently only supports Collect(), which loads all rows into memory at once. This approach doesn't work for large OLAP queries that return gigabytes of data, and prevents lightweight services like API gateways or serverless functions from using Spark Connect effectively.

This PR brings the same streaming capability that Scala and Java clients have through toLocalIterator(). Rows are deserialised batch-by-batch and released immediately after consumption. Since ToTable was pulling entire tables of Arrow batches, we needed a new method to stream batches individually. For now, I've reused ReadArrowTableToRows for the batch-to-row conversion, though this could be optimised in a follow-up PR.

The changes are purely additive to maintain backward compatibility. External consumers can pull data at their own pace, making it suitable for streaming over HTTP or gRPC. I believe there are many opportunities to improve performance by reducing allocations and GC pressure, which can also be addressed in a follow-up.

Does this PR introduce any user-facing change?

Yes, it adds a new optional API:

it, err := df.ToLocalIterator(ctx)
if err != nil {
    return err
}
defer it.Close()

for {
    row, err := it.Next()
    if err == io.EOF { 
        break 
    }
    if err != nil {
        return err
    }
    // process row ...
}

Existing code using Collect() or ToArrow() remains unchanged.

How was this patch tested?

  • Unit tests verify that streaming returns rows in the correct order, handles EOF properly, cancels contexts gracefully without goroutine leaks, converts Arrow batches correctly, and propagates errors through the streaming pipeline. All tests pass locally with go test ./...
  • Manual E2E testing with the Databricks Simba Spark ODBC 2.9.1 driver running in the same process surfaced an infrequent panic in the Go application, which suggests a cgo or C library did not follow Go's signal-handling requirements. Since this can only be reproduced with the Simba driver attached, I don't think it needs to block this PR (it's not an issue with the code in this PR), but it's worth noting for passers-by. I wrote up the fix™ here: [SPARK-52780] Add StreamRows and Arrow Record Streaming #152 (comment)
  • Manual E2E Testing run against a 3 node i3.xlarge instance on the Databricks platform: using this streaming HTTP handler to make it easy for cURL requests to cancel context:
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/apache/spark-connect-go/v35/client/sql"
)

var sparkSession *sql.SparkSession

func main() {
	// Get Spark Connect URL from environment or use default
	sparkConnectURL := os.Getenv("SPARK_CONNECT_URL")
	if sparkConnectURL == "" {
		sparkConnectURL = "sc://localhost:15002" // Default Spark Connect URL
	}

	// Initialize Spark Connect session
	var err error
	sparkSession, err = (&sql.SparkSessionBuilder{}).
		Remote(sparkConnectURL).
		Build(context.Background())
	if err != nil {
		log.Fatalf("Failed to create Spark session: %v", err)
	}
	defer sparkSession.Stop()

	log.Printf("Connected to Spark at %s", sparkConnectURL)

	// Set up HTTP routes
	http.HandleFunc("/query", StreamingHandler)
	http.HandleFunc("/health", HealthHandler)
	
	// Start server
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	log.Printf("Starting server on port %s", port)
	log.Printf("Try: curl 'http://localhost:%s/query?sql=SELECT 1 as test_col UNION SELECT 2'", port)
	
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}

// StreamingHandler streams SQL query results as NDJSON
func StreamingHandler(w http.ResponseWriter, r *http.Request) {
	// Get SQL query from URL parameter
	query := r.URL.Query().Get("sql")
	if query == "" {
		http.Error(w, "Missing 'sql' query parameter", http.StatusBadRequest)
		return
	}

	log.Printf("Executing query: %s", query)

	// Create a context with timeout for the request
	ctx, cancel := context.WithTimeout(r.Context(), 30*time.Minute)
	defer cancel()

	// r.Context() is cancelled automatically when the client disconnects,
	// so the timeout context above already covers that case; the deprecated
	// http.CloseNotifier mechanism is not needed.

	// Execute the SQL query
	df, err := sparkSession.Sql(ctx, query)
	if err != nil {
		log.Printf("SQL execution failed: %v", err)
		http.Error(w, fmt.Sprintf("SQL execution failed: %v", err), http.StatusInternalServerError)
		return
	}

	// Create iterator for streaming results
	rowIter, err := df.ToLocalIterator(ctx)
	if err != nil {
		log.Printf("Failed to create iterator: %v", err)
		http.Error(w, fmt.Sprintf("Failed to create iterator: %v", err), http.StatusInternalServerError)
		return
	}
	defer func() {
		if closeErr := rowIter.Close(); closeErr != nil {
			log.Printf("Error closing iterator: %v", closeErr)
		}
	}()

	// Set response headers for NDJSON streaming
	w.Header().Set("Content-Type", "application/x-ndjson")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.WriteHeader(http.StatusOK)

	encoder := json.NewEncoder(w)
	encoder.SetEscapeHTML(false)

	rowCount := 0
	startTime := time.Now()

	// Stream rows as NDJSON
	for {
		select {
		case <-ctx.Done():
			// Context cancelled - exit gracefully
			log.Printf("Query cancelled after %d rows in %v", rowCount, time.Since(startTime))
			return
		default:
		}

		row, err := rowIter.Next()
		if err != nil {
			if errors.Is(err, io.EOF) {
				log.Printf("Query completed: %d rows streamed in %v", rowCount, time.Since(startTime))
				return // Normal completion
			}
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				log.Printf("Query cancelled: %v", err)
				return // Context cancellation - exit gracefully
			}
			// Log error but don't crash
			log.Printf("Error reading row: %v", err)
			return
		}

		// Map column names to values
		names := row.FieldNames()
		obj := make(map[string]interface{}, len(names))
		for i, name := range names {
			obj[name] = row.At(i)
		}

		// Stream the row as JSON
		if err := encoder.Encode(obj); err != nil {
			// Client disconnected or other write error
			log.Printf("Error encoding row %d: %v", rowCount, err)
			return
		}

		// Flush the response to ensure streaming
		if flusher, ok := w.(http.Flusher); ok {
			flusher.Flush()
		}

		rowCount++
		
		// Log progress for large result sets
		if rowCount%1000 == 0 {
			log.Printf("Streamed %d rows so far...", rowCount)
		}
	}
}

// HealthHandler provides a simple health check endpoint
func HealthHandler(w http.ResponseWriter, r *http.Request) {
	// Try a simple query to check Spark connectivity
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	_, err := sparkSession.Sql(ctx, "SELECT 1")
	if err != nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		json.NewEncoder(w).Encode(map[string]interface{}{
			"status": "unhealthy",
			"spark":  "disconnected",
			"error":  err.Error(),
		})
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"status": "healthy",
		"spark":  "connected",
	})
}

Demo:

Showing the effects of a complete stream and context cancellation (against a UC catalog table I get ~15MB/s):

Screen.Recording.2025-07-13.at.16.40.21.mov

From the server:

{"level":"info","component":"server","component":"rest","method":"GET","query":"SELECT id FROM RANGE(20000000)","time":"2025-07-13T16:55:50+01:00","message":"executing query"}
{"level":"debug","component":"server","query":"SELECT id FROM RANGE(20000000)","time":"2025-07-13T16:55:50+01:00","message":"starting FlushQuery"}
DEBUG: Received response type: *generated.ExecutePlanResponse_SqlCommandResult_
DEBUG: Received response type: *generated.ExecutePlanResponse_ExecutionProgress_
DEBUG: Recv error: EOF, is EOF: true // we handle this, check the code.
{"level":"info","component":"server","query":"SELECT id FROM RANGE(20000000)","rows_streamed":20000000,"time":"2025-07-13T16:57:02+01:00","message":"FlushQuery completed successfully"}

Author

@caldempsey caldempsey left a comment

There are a number of places we can improve the performance of the Spark Connect Go Client where it matters, namely by pre-allocating resources and re-using pointers to those allocations to reduce GC pressure. I think this could speed things up substantially.


type ExecuteResponseStream interface {
ToTable() (*types.StructType, arrow.Table, error)
ToRecordBatches(ctx context.Context) (<-chan arrow.Record, <-chan error, *types.StructType)
Author

Added so we can consume Apache Arrow record batches rather than the entire table at once. In truth, ToTable should share private methods with ToRecordBatches, but I've avoided that for now so as not to break clients.

Member

Rather than exposing a channel directly, it might make more sense to use either an array.RecordReader or an iter.Seq2[arrow.Record, error].

Either that, or create something that wraps the two channels and produces one of those, simply to make it easier to consume for users.

Author

Thanks! I'll apply this comment (and all the others) in the next round of commits 🥳

Author

Updated to use Seq2

@caldempsey
Author

There seem to be some issues; will resolve.

@grundprinzip
Contributor

I will take a look at the PR. It would be great to compare it to #103 and see how the native Go iterators differ from what you propose here.

Thanks for your contributions!

@caldempsey
Author

caldempsey commented Jul 13, 2025

@grundprinzip Hey dude! Thanks a lot!

This works really well but is failing one edge case in my E2E test (a real Spark cluster, on Java (not Photon) Databricks, which sends EOF as its 'I'm done' cue).

Happy paths work, and most of the time if you cancel the context mid-stream, everything cancels gracefully, so I believe we're doing everything right on our end. Infrequently I'm observing a panic:

signal 13 received but handler not on signal stack
mp.gsignal stack [0xc000804000 0xc00080c000], mp.g0 stack [0x7fff917fb280 0x7fff91ffae80], sp=0xc0004ade20
fatal error: non-Go code set up signal handler without SA_ONSTACK flag

I've moved this to WIP while I pepper it with debug statements (on my own master branch so I can easily pull my own go.mod) and find the root cause. Something doesn't implement signals right. I intend to find out what. To repro:

  1. Launch an HTTP server with a response writer flushing the output as NDJSON.
  2. Start a query streaming the results out from ToLocalIterator from a fairly large table.
  3. On your 10th or so attempt at cancelling a cURL request to the server which streams the data out, you should be able to reproduce the panic and crash the HTTP server.

I took a look at your PR. Happy to accept iter semantics. For me the value is enabling streaming of records out to client consumers. This doesn't work with Collect because ToTable pulls the whole table at once. This is something that works in Scala's Spark Connect implementation of toLocalIterator, with code that looks like this:

val jsonIt = df.toJSON.toLocalIterator()
Iterator.unfold(1L) { rowNum =>
  if jsonIt.hasNext then
    val json = jsonIt.next()
    val row = DataLakeRow(
      rowNum = rowNum,
      fetchedAt = Instant.now(),
      payload = json
    )
    Some((row, rowNum + 1))
  else None
}

Hoping to put some elbow grease in to get SC for Go off the ground.

@caldempsey caldempsey changed the title [SPARK-52780] Add ToLocalIterator and Arrow Record Streaming [WIP][SPARK-52780] Add ToLocalIterator and Arrow Record Streaming Jul 13, 2025
Comment on lines +566 to +567
case recordChan <- record:
// Successfully sent
Author

All this effort just so we can do this. Again, we should opt in to a DRY implementation with ToTable.

Contributor

Any ideas how to improve this? I'm not crazy familiar with golang and the idiomatic implementation of channels.

Author

Will have a think, and make a proposal, then run that by you in this thread before implementation.

Author

@caldempsey caldempsey Sep 3, 2025

Left this for now; I think we can punt it down the road. It doesn't need to be the focus of this PR, but perhaps of a following one.

…atabricks/Spark signal done processing rows)
@caldempsey caldempsey changed the title [WIP][SPARK-52780] Add ToLocalIterator and Arrow Record Streaming [SPARK-52780] Add ToLocalIterator and Arrow Record Streaming Jul 13, 2025
Author

@caldempsey caldempsey left a comment

@grundprinzip Hey! I worked out the panic. It looks like the error I mentioned before only occurs when running the Databricks ODBC driver adjacent to it. There has to be some kind of signal handling collision between Spark Connect and that instance. Removing the Databricks ODBC driver instance from my main thread (a pretty big testing app) resolved the issue.

signal 13 received but handler not on signal stack
mp.gsignal stack [0xc000584000 0xc00058c000], mp.g0 stack [0x7fffb07f9280 0x7fffb0ff8e80], sp=0xc000815e20
fatal error: non-Go code set up signal handler without SA_ONSTACK flag

The introduction of context cancellation to Spark Connect only came in at about v40, and I was running v34 before, so I think it might be worth internally testing the combination of a cancelled context in Spark Connect v40 with the Databricks ODBC driver. A pretty rare edge case, I know, but you seem to work at Databricks and might want to shout at the folks at Simba to make sure they handle signals gracefully. Tagging @alexott onto this one.

I think we can unblock this PR right now. I'll see if I can get a core dump to prove out the issue being related to Simba Spark ODBC, but the problem cannot be reproduced without it, and the PR otherwise seems feature complete.


type ExecuteResponseStream interface {
ToTable() (*types.StructType, arrow.Table, error)
ToRecordBatches(ctx context.Context) (<-chan arrow.Record, <-chan error, *types.StructType)
Contributor

Would be good to add comments on how this is intended to be used and how it differs from ToTable.

Author

100% I'll add this to the next commit.

Author

Added

Comment on lines +453 to +454
close(recordChan)
close(errorChan)
Contributor

Does close require any error handling, or is this the final state?

Author

Final state here. Once closed, no more sends are allowed (sending on a closed channel panics), but you can still receive any buffered items, and subsequent receives yield the zero value immediately. Then attempting to close a closed channel will panic.

Author

This is outdated now; we use Seq2 for pulls, which simplifies things a bit.

}()

// Explicitly needed when tracking re-attachable execution.
c.done = false
Contributor

Re-attachable execution is optional; we need to make sure it works with both modes.

Author

I'm not exactly sure how this is supposed to be implemented, any similar code or resources you can point out before I run over this again?

Member

wouldn't this be a race condition? Should we be locking around accessing c.done?

Author

Taking another look at this. I've spliced in the approach from ToTable(), originally written by @grundprinzip. Instead of writing to the shared c.done field, ToRecordSequence now tracks completion with a local done variable inside the closure.

After EOF, it checks if c.opts.ReattachExecution && !done and yields an error, the same way ToTable() does. This removes the race condition because nothing shared is being mutated, and it behaves correctly in both reattachable and non-reattachable modes.

I do think we should DRY this up eventually, but judiciously. I've kept the code WET for now intentionally. I want both code paths to remain directly comparable until I fully understand the domain. Having @grundprinzip's original logic in ToTable() sitting side by side against my equivalent in ToRecordSequence makes it much easier to reason about correctness and spot differences.

So rather than DRY it up in this PR, I'd like to take ownership of a fast follow-up where I consolidate ToRecordSequence and ToTable once this gets merged.

That said, happy to take a crack. I'm being cautious since this is an open source project, and I'd rather not introduce a negative diff that risks breaking users when the current approach is correct, comparable, and maintains parity between the two separate critical paths.

Take(ctx context.Context, limit int32) ([]types.Row, error)
// ToArrow returns the Arrow representation of the DataFrame.
ToArrow(ctx context.Context) (*arrow.Table, error)
ToLocalIterator(ctx context.Context) (types.RowIterator, error)
Contributor

The name local iterator is a bit misleading because it has a specific meaning in the context of Spark on the driver. I'm not opposed to naming it like this, but it might lead to confusion. What about just calling it Iter()?

Contributor

The second question I had was about using Go's native iterator types, essentially supporting iter.Seq or iter.Seq2.

One good improvement to the current code (but can be done in a follow-up) would be to replace the All() implementation using this iterator based one so that we avoid buffering the whole table before returning the iterator.

Author

Yeah, I'd prefer to rename this. I don't like the name ToLocalIterator because it doesn't tell us anything about the underlying mechanism. When I tested with Scala, I was surprised to find I can use ToLocalIterator in Scala Spark Connect on streams of Arrow batches. IterArrowStream or StreamRows would be so much better. Let me know what you prefer, I'm glad we're on the same page here (for different reasons)!

Member

I would be in favor of using iter.Seq and iter.Seq2

For reference, Arrow also allows using iter directly: https://pkg.go.dev/github.com/apache/arrow-go/v18/arrow/array#IterFromReader and https://pkg.go.dev/github.com/apache/arrow-go/v18/arrow/array#ReaderFromIter

Author

Updated to use Seq2, seemed to better match the Iterator semantics we wanted to expose

@grundprinzip
Contributor

@caldempsey, thanks a lot for your contribution! I will take another look in early next week, but I don't see a lot of contentious pieces.

@caldempsey
Author

I looked into the panic signal 13 received but handler not on signal stack... fatal error: non-Go code set up signal handler without SA_ONSTACK flag that was happening in my application w/ ODBC and Spark Connect loaded side by side. Turns out it only occurs when running the Databricks ODBC driver alongside Spark Connect in the same process - run either one alone and everything works fine.

I added some socket monitoring with lsof into the Go app (get its own PID and pass it through and print stats on a ticker) to see what was going on and found both libraries maintain their own connections to Databricks through port 443:

ODBC Connections:    172.17.0.2:42562->44.234.192.45:443
                     172.17.0.2:42568->44.234.192.45:443
Spark Connect:       172.17.0.2:60452->44.234.192.46:443
                     172.17.0.2:41742->44.234.192.45:443

I THINK the issue is because POSIX signals get delivered to the entire process, not individual sockets. I'm guessing the ODBC driver (/opt/simba/spark/lib/64/libsparkodbc_sb64.so) installs signal handlers when it loads, but doesn't use the SA_ONSTACK flag that Go requires. So when either library hits a broken pipe, the kernel sends SIGPIPE to the process, the ODBC handler catches it, and Go panics because the handler isn't set up correctly for Go's stack management.

With Spark Connect v40's new streaming and context cancellation support, SIGPIPE happens more often - cancel a context mid-stream and you'll likely trigger it. The ODBC driver's process-wide signal handler intercepts these signals regardless of which library's socket actually broke, while Spark Connect seems to handle the termination gracefully. The 'fix' here is really sweeping the issue under the rug by adding signal.Ignore(syscall.SIGPIPE) at startup e.g.

package main

import (
	"os/signal"
	"syscall"
)

func init() {
	// Process-wide: the kernel stops delivering SIGPIPE entirely.
	signal.Ignore(syscall.SIGPIPE)
}

When you call signal.Ignore(syscall.SIGPIPE), the entire process ignores SIGPIPE signals. The kernel simply doesn't deliver them to the Go process at all. Both libraries already handle EPIPE errors properly through return values, so they don't need the signal. A C++ dev I know (who I can't mention) said this is actually the modern way to handle broken pipes - check return values instead of relying on signals.

So with this, Spark Connect streaming and ODBC can coexist in the same process without issues. Pretty important for services that need both traditional ODBC connectivity (faster than gRPC) and Spark Connect's DataFrame capabilities (easier to use than ODBC).

Contributor

@grundprinzip grundprinzip left a comment

While working on a side project, I think this change could come in handy. I think the code is in a good spot, but it would be good to add an integration test as well, to show how to use the API.

@caldempsey
Author

@grundprinzip Thanks Martin, yes I will get to the feedback as soon as I can; this Saturday is my next best opportunity. I've been vacationing + had a surgery, sorry for the delay!

@caldempsey caldempsey changed the title [SPARK-52780] Add ToLocalIterator and Arrow Record Streaming [SPARK-52780] Add StreamRows and Arrow Record Streaming Sep 3, 2025
@caldempsey
Author

I’ve made the changes we discussed @grundprinzip @zeroshade. Few thoughts:

  • I haven’t run a full end-to-end test yet to confirm that we’re truly fetching data at client pace without leaks. The channel-based implementation already handled this case well by blocking explicitly, so I’ve passed the sequences wrapping the business logic in the same way. My main concern is converting the push iterator in ToRecordIterator into a Pull2 upstream. Hopefully this... just works, and won’t introduce leaks, but I’m still new to iterators in Go (we’ve been without them for 15 years!), so we better test E2E.

  • One thing I don’t love is this detail from the docs: “It is an error to call next or stop from multiple goroutines simultaneously.”. That limitation feels a bit restrictive, but it does simplify the model, and I don't see any reason you need more than one go-routine doing the consumption here anyway.

  • Iterators bring some nice benefits: they're synchronous; they guarantee no data race when the iterator and the function body share state (although modifying a data structure while iterating it is arguably slightly cursed and hard to read); they cost a fraction of channels plus a spun-up goroutine; and they support loop-breaking control flow (return or break inside the loop body), whereas with channels you need to pass in a cancellation channel (or context, same thing) and manually close it when you break out. Here you can just... for-loop over the Sequence type gracefully. That's pretty neat!

  • But again, I'm worried that this could be a bit of a leaky abstraction as we cross the context boundaries of gRPC, client.go, and the Pull2 type (which I've aliased to make sure we know it's a 'Pull' sequence, as they both share the same type, frustratingly), as I've never used Sequences before today.

  • I settled on the name 'Stream' with the Pull2 type: 'Stream' to convey the intent 'We want to stream records from Spark over time' and the type to convey 'We want the client to determine the rate Rows get consumed'. But maybe ToLocalIterator with the Seq2 type (not a Pull2) would make more sense, I'm just not exactly sure if that's the semantics we want to go for (again unfamiliar with the exact implications). Hence noted in dataframe.go:

	// StreamRows exposes a pull-based iterator over Arrow record batches from Spark types.RowPull2.
	// No rows are fetched from Spark over gRPC until the previous one has been consumed.
	// It provides no internal buffering: each Row is produced only when the caller
	// requests it, ensuring client back-pressure is respected.
	// types.RowPull2 is single use (can only be ranged once).
	StreamRows(ctx context.Context) (types.RowPull2, error)

So please don't crucify me if this isn't exactly right!!!!!

@caldempsey
Author

While working on a side project, I think this change could come in handy. I think the code is in a good spot, but would be good to add an integration test as well to show as well how to use the API.

Will take a look at writing the integration test next step, missed this comment!

Comment on lines +75 to +78
// Treat io.EOF as clean termination (don’t forward).
if errors.Is(err, io.EOF) {
return
}
Author

Since we're also pruning downstream, maybe remove this.

tests for channel-based processing, filtering, error handling, empty datasets, multiple columns, and large datasets.
@caldempsey
Author

@grundprinzip @zeroshade Added a Spark test that works fine locally, but it looks like the GitHub Action is failing on wget -q https://dlcdn.apache.org/spark/spark-4.0.0/spark-4.0.0-bin-hadoop3.tgz.

I'd normally address concerns in separate PRs, but I've bumped the version inline to keep things moving. Also added an integration test demonstrating how this works with unary RPCs. All current feedback should be addressed. Let me know if anything else needs attention 🙂

Separately, I'd love to connect more with the community on this project. If there are any channels where folks discuss features, use cases, or development, I'd like to be part of them. I've been productionizing Spark Connect Go for a while now (a vote of confidence), including my own implementation of this feature with Databricks to synchronize the control loop between Spark requests and infrastructure (previously asynchronous Spark submit jobs). Would be good to share and listen to insights and experiences in the wild and smooth problems out!

Thanks ✌️

Member

@zeroshade zeroshade left a comment

Most of my comments are nitpicks, there's a few things that I think are pretty important but overall this looks pretty good to me!

Comment on lines +372 to +380
if err != nil {
fmt.Printf("DEBUG: Recv error: %v, is EOF: %v\n", err, errors.Is(err, io.EOF))
}
if err == nil && resp != nil {
fmt.Printf("DEBUG: Received response type: %T\n", resp.ResponseType)
if _, ok := resp.ResponseType.(*proto.ExecutePlanResponse_ResultComplete_); ok {
fmt.Println("DEBUG: Got ResultComplete!")
}
}
Member


Probably shouldn't keep these debug lines in here. At minimum, use log.Println if we're not gonna switch to something like zerolog yet.

Author


ah, a blunder, thanks

Author


removed the debug lines

Comment on lines +449 to +450
// Return Seq2 iterator that directly yields results as they arrive, upstream callers can convert this as needed
iterator := func(yield func(arrow.Record, error) bool) {
Member


just do return func(yield func(arrow.Record, error) bool) { instead of iterator := .... and return iterator

}()

// Explicitly needed when tracking re-attachable execution.
c.done = false
Member


wouldn't this be a race condition? Should we be locking around accessing c.done?
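
For what it's worth, the usual fix is a mutex around the flag. A minimal sketch with hypothetical names — the real client's fields and methods may differ:

```go
package main

import (
	"fmt"
	"sync"
)

// client simulates the execute-plan client's done flag. The field and
// method names here are illustrative, not the actual implementation.
type client struct {
	mu   sync.Mutex
	done bool
}

// setDone updates the flag under the lock so concurrent readers
// never observe a torn or stale write.
func (c *client) setDone(v bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.done = v
}

// isDone reads the flag under the same lock.
func (c *client) isDone() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.done
}

func main() {
	c := &client{}
	var wg sync.WaitGroup
	// Hammer the flag from several goroutines; `go run -race` stays clean.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				c.setDone(j%2 == 0)
				_ = c.isDone()
			}
		}()
	}
	wg.Wait()
	c.setDone(true)
	fmt.Println("done:", c.isDone())
}
```

An atomic.Bool would also work here if the flag never needs to be updated together with other state.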

Comment on lines +35 to +47
rows, err := func() ([]Row, error) {
defer rec.Release()
return ReadArrowRecordToRows(rec)
}()
if err != nil {
_ = yield(nil, err)
return
}
for _, row := range rows {
if !yield(row, nil) {
return
}
}
Member


It feels like we could simplify this if ReadArrowRecordToRows performed the release inside the iterator etc.

Something like:

for row := range rowIterFromRecord(rec) {
    if !yield(row, nil) {
        return
    }
}

Author


So I've extracted ReadArrowRecordToRows, and that lets us range over it directly as you suggested.

There's still an error check to handle, since ReadArrowRecordToRows can fail, so I made that explicit:

for row, err := range rowIterFromRecord(rec) {
    if err != nil {
        _ = yield(nil, err)
        return
    }
    if !yield(row, nil) {
        return
    }
}

Comment on lines +951 to +953
seq2 := responseClient.ToRecordSequence(ctx)

return types.NewRowPull2(ctx, seq2), nil
Member


Why not just use iter.Pull2?

Author


So, as I mentioned briefly, this is my first time using Go iterators. I was originally inspired to build a Pull abstraction by the Go range-over-function blog post, to give callers on-demand 'pull' semantics over the tuple. I used that as my 'north star', so to speak.

But yeah, I think the iter.Pull2 conversion here is over-engineering: looking at this a few months down the line, it just creates a push→pull→push round trip that's redundant and less concise. The underlying gRPC stream is inherently single-use, so we don't need to guard termination either.

I'll deal with this by taking the useful part, the EOF handler, bubbling it directly into NewRowSequence, and dropping the iter.Pull2 conversion.

Author


Done, let me know if this looks like the right idiom to you. Apologies for the greenness.

@caldempsey caldempsey requested a review from zeroshade March 1, 2026 17:10
@caldempsey
Author

caldempsey commented Mar 1, 2026

Updated the PR addressing feedback. Removed NewRowPull2 entirely and consolidated it into NewRowSequence. Fixed the c.done race condition by having ToRecordSequence splice against ToTable. Kept things WET intentionally for now to avoid a negative diff against ToTable, will DRY up in a follow-up. Other comments addressed. Marking as ready for re-review.

return
}
if rec == nil {
_ = yield(nil, errors.New("expected arrow.Record to contain non-nil Rows, got nil"))
Member


The error here isn't quite correct; it should probably be "expected non-nil arrow.Record" or something to that effect. It's not a case of "non-nil Rows"; rather, the entire arrow.Record object is nil.

Author


Eagle eyes, good catch.

Comment on lines +57 to +63
if err != nil {
_ = yield(nil, err)
return
}
if !yield(row, nil) {
return
}
Member

@zeroshade zeroshade Mar 5, 2026


Maybe something like this?

Suggested change
if err != nil {
_ = yield(nil, err)
return
}
if !yield(row, nil) {
return
}
cont := yield(row, err)
if err != nil || !cont {
return
}

Author


Yeah! I wasn't sure if we wanted the explicit form of this, where you make the err != nil flow painfully clear:

for row, err := range rowIterFromRecord(rec) {
    if !yield(row, err) || err != nil {
        return
    }
}

is another way to hash this. But honestly, I like your version most because it better preserves the semantics from a human-readability standpoint.

for row, err := range rowIterFromRecord(rec) {
    cont := yield(row, err)
    if err != nil || !cont {
        return
    }
}

Will update.

Member

@zeroshade zeroshade left a comment


Just a couple comments, but otherwise looks great to me!
