Research: Binary Export Format (Protobuf/MessagePack) #36

Research: Binary Export Format for AST Graph

🎯 Goal

Evaluate and implement a binary export format for the AST graph to enable:

  • Efficient data exchange between tools
  • Offline analysis without database
  • Backup and restore functionality
  • Integration with external analysis tools

📋 Current State

  • ✅ Graph stored in Neo4j (Bolt protocol)
  • ✅ Vector embeddings in Qdrant (HTTP API)
  • ✅ Configuration in JSON format
  • ✅ Parse cache in SQLite
  • ❌ No graph export functionality
  • ❌ No binary serialization
  • ❌ No backup/restore mechanism
  • ❌ No integration with external tools

🔍 Research Questions

1. Format Selection

Candidates:

| Format | Pros | Cons | Use Case |
|--------|------|------|----------|
| Protobuf | Strong typing, schema evolution, fast | Requires `.proto` files, compilation | Inter-tool communication |
| MessagePack | Simple, no schema needed, compact | No built-in validation | Quick serialization |
| Pickle | Python-native, easy | Python-only, security risks | Internal caching |
| Apache Arrow | Columnar, analytics-friendly | Complex, overkill for graphs | Data analysis |
| GraphML | Standard for graphs, XML-based | Verbose, text-based | Interchange format |
| GEXF | Graph exchange format | Limited tooling | Visualization tools |
| Custom binary | Optimized for our schema | Maintenance burden | Performance-critical |

Recommended: Protobuf + MessagePack fallback

Rationale:

  • Protobuf for structured data exchange (schema validation)
  • MessagePack for quick internal serialization
  • Both support streaming (large graphs)

2. Schema Design (Protobuf)

Proposed .proto file:

syntax = "proto3";

package ast_rag;

// Node types matching our AST
enum NodeKind {
  NODE_KIND_UNSPECIFIED = 0;
  NODE_KIND_PROJECT = 1;
  NODE_KIND_PACKAGE = 2;
  NODE_KIND_MODULE = 3;
  NODE_KIND_FILE = 4;
  NODE_KIND_CLASS = 5;
  NODE_KIND_INTERFACE = 6;
  NODE_KIND_FUNCTION = 7;
  NODE_KIND_METHOD = 8;
  // ... etc
}

enum EdgeKind {
  EDGE_KIND_UNSPECIFIED = 0;
  EDGE_KIND_CONTAINS_FILE = 1;
  EDGE_KIND_CALLS = 2;
  EDGE_KIND_IMPORTS = 3;
  EDGE_KIND_INHERITS = 4;
  // ... etc
}

message ASTNode {
  string id = 1;
  NodeKind kind = 2;
  string name = 3;
  string qualified_name = 4;
  string lang = 5;
  string file_path = 6;
  int32 start_line = 7;
  int32 end_line = 8;
  optional string signature = 9;
  optional string source_text = 10;
  string project_id = 11;
  string valid_from = 12;
  optional string valid_to = 13;
  map<string, string> metadata = 14;
}

message ASTEdge {
  string id = 1;
  EdgeKind kind = 2;
  string from_id = 3;
  string to_id = 4;
  optional string label = 5;
  float confidence = 6;
  string resolution_method = 7;
  string valid_from = 8;
  optional string valid_to = 9;
  map<string, string> metadata = 10;
}

message ASTGraph {
  string project_id = 1;
  string export_timestamp = 2;
  string git_commit = 3;
  repeated ASTNode nodes = 4;
  repeated ASTEdge edges = 5;
  map<string, bytes> embeddings = 6;  // node_id -> vector bytes
  ExportMetadata metadata = 7;
}

message ExportMetadata {
  string ast_rag_version = 1;
  int32 node_count = 2;
  int32 edge_count = 3;
  repeated string languages = 4;
  int64 total_size_bytes = 5;
}

3. Implementation Approaches

Option A: Full Graph Export

import msgpack  # third-party: pip install msgpack

def export_graph(driver: Driver, output_path: str, format: str = "protobuf") -> None:
    nodes = fetch_all_nodes(driver)
    edges = fetch_all_edges(driver)
    embeddings = fetch_all_embeddings(driver)

    if format == "protobuf":
        graph = ASTGraph(nodes=nodes, edges=edges, embeddings=embeddings)
        with open(output_path, "wb") as f:
            f.write(graph.SerializeToString())
    elif format == "msgpack":
        # Protobuf messages are not msgpack-serializable; pack plain dicts instead
        payload = {"nodes": nodes, "edges": edges, "embeddings": embeddings}
        with open(output_path, "wb") as f:
            msgpack.pack(payload, f)
    else:
        raise ValueError(f"Unknown format: {format}")

Option B: Streaming Export (for large graphs)

def export_graph_streaming(driver: Driver, output_path: str) -> None:
    with open(output_path, "wb") as f:
        # Write header
        f.write(MAGIC_HEADER)
        
        # Stream nodes
        for node in fetch_nodes_streaming(driver):
            write_delimited(f, node)
        
        # Stream edges
        for edge in fetch_edges_streaming(driver):
            write_delimited(f, edge)
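The `write_delimited` helper above is left undefined; a minimal stdlib-only sketch of length-delimited framing (the protobuf-style base-128 varint prefix) might look like the following. Here `write_delimited` takes already-serialized bytes, so in the streaming export the caller would pass `node.SerializeToString()`:

```python
import io

def _write_varint(f, value: int) -> None:
    # Base-128 varint as used by protobuf: 7 payload bits per byte,
    # high bit set on every byte except the last.
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            f.write(bytes([byte | 0x80]))
        else:
            f.write(bytes([byte]))
            return

def write_delimited(f, payload: bytes) -> None:
    """Write one length-prefixed record (varint length, then payload)."""
    _write_varint(f, len(payload))
    f.write(payload)

def read_delimited(f):
    """Read one length-prefixed record; return None at clean EOF."""
    shift = 0
    length = 0
    while True:
        b = f.read(1)
        if not b:
            return None
        length |= (b[0] & 0x7F) << shift
        if not b[0] & 0x80:
            break
        shift += 7
    return f.read(length)
```

Reading a stream back is symmetric: loop on `read_delimited` until it returns `None`.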

Option C: Incremental Export

def export_incremental(
    driver: Driver, 
    output_dir: str,
    from_commit: str,
    to_commit: str,
) -> None:
    # Export only changed nodes/edges
    diff = compute_diff(driver, from_commit, to_commit)
    
    # Write delta file
    delta = GraphDelta(
        added_nodes=diff.added,
        deleted_nodes=diff.deleted,
        updated_nodes=diff.updated,
        added_edges=diff.added_edges,
        deleted_edges=diff.deleted_edges,
    )
    
    with open(f"{output_dir}/delta_{to_commit}.pb", "wb") as f:
        f.write(delta.SerializeToString())

4. Import/Restore

Requirements:

  • Validate schema on import
  • Handle version mismatches
  • Support partial imports (merge with existing)
  • Conflict resolution (same node ID, different data)
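For the conflict-resolution requirement (same node ID, different data), one simple merge policy is last-writer-wins on `valid_from`. A sketch over plain dicts, assuming the schema's ISO-8601 timestamp strings (the helper name is hypothetical):

```python
def resolve_node_conflict(existing: dict, incoming: dict) -> dict:
    """Last-writer-wins: keep whichever node has the newer valid_from.

    valid_from is an ISO-8601 timestamp string, so lexicographic
    comparison matches chronological order.
    """
    if incoming.get("valid_from", "") >= existing.get("valid_from", ""):
        return incoming
    return existing
```

Ties go to the incoming record, so re-importing an identical export is idempotent.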

API:

def import_graph(
    driver: Driver,
    input_path: str,
    mode: str = "replace",  # or "merge"
    project_id: Optional[str] = None,
) -> ImportResult:
    # Load from protobuf/msgpack
    graph = load_graph(input_path)
    
    # Validate
    errors = validate_graph(graph)
    if errors:
        raise ValidationError(errors)
    
    # Import
    with driver.session() as session:
        if mode == "replace":
            # DETACH DELETE also removes attached relationships; plain DELETE fails on connected nodes
            session.run("MATCH (n) WHERE n.project_id = $pid DETACH DELETE n", pid=project_id)
        
        batch_upsert_nodes(session, graph.nodes)
        batch_upsert_edges(session, graph.edges)
    
    return ImportResult(
        nodes_imported=len(graph.nodes),
        edges_imported=len(graph.edges),
    )

5. Compression & Size Optimization

Techniques:

| Technique | Savings | Complexity |
|-----------|---------|------------|
| Gzip compression | 60-80% | Low |
| String interning | 20-30% | Medium |
| Delta encoding (IDs) | 10-20% | Medium |
| Remove redundant fields | 5-10% | Low |
| Quantize embeddings | 50-75% | High (lossy) |

Recommended:

  • Default: Gzip + Protobuf
  • Optional: String interning for qualified names
  • Embeddings: Store separately (already in Qdrant)
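If embeddings are exported at all, the "quantize embeddings" row can be sketched as symmetric int8 quantization: roughly 4x smaller than float32, at the cost of precision. This is a stdlib-only illustration, not part of the proposed API:

```python
import struct

def quantize_embedding(vector: list[float]) -> tuple[bytes, float]:
    """Symmetric int8 quantization: map the largest-magnitude value to 127."""
    scale = max((abs(v) for v in vector), default=0.0) / 127.0 or 1.0
    ints = [max(-127, min(127, round(v / scale))) for v in vector]
    return struct.pack(f"{len(ints)}b", *ints), scale

def dequantize_embedding(data: bytes, scale: float) -> list[float]:
    """Recover approximate floats; error is bounded by the scale factor."""
    return [q * scale for q in struct.unpack(f"{len(data)}b", data)]
```

The scale factor must be stored alongside each quantized vector (e.g. in the node metadata map) for dequantization.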

6. Security Considerations

Risks:

  • Pickle deserialization attacks
  • Malicious graph data (cycles, huge payloads)
  • Source code exposure (if source_text included)

Mitigations:

  • Never use pickle for untrusted data
  • Validate graph size limits
  • Option to exclude source_text
  • Signature verification for exports
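The "validate graph size limits" mitigation can be a cheap pre-import guard that rejects oversized payloads before touching the database. The limits below are illustrative defaults, not decided values:

```python
# Illustrative limits; tune per deployment
MAX_NODES = 5_000_000
MAX_EDGES = 20_000_000
MAX_FILE_BYTES = 2 * 1024 ** 3  # 2 GiB

def validate_import_limits(node_count: int, edge_count: int, file_size: int) -> list[str]:
    """Return limit violations; an empty list means the payload is acceptable."""
    errors = []
    if file_size > MAX_FILE_BYTES:
        errors.append(f"file too large: {file_size} bytes")
    if node_count > MAX_NODES:
        errors.append(f"too many nodes: {node_count}")
    if edge_count > MAX_EDGES:
        errors.append(f"too many edges: {edge_count}")
    return errors
```

Checking the file size before decompression also guards against gzip decompression bombs.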

📐 Proposed Implementation

Phase 1: Protobuf Schema & Generation

Files:

  • proto/ast_rag.proto - Schema definition
  • ast_rag/proto/ast_rag_pb2.py - Generated code
  • requirements.txt - Add protobuf>=4.0

Commands:

pip install protobuf
protoc --python_out=ast_rag proto/ast_rag.proto

Phase 2: Export Service

File: ast_rag/services/export_service.py (new)

class GraphExporter:
    def __init__(self, driver: Driver, qdrant_config: QdrantConfig):
        self._driver = driver
        self._qdrant_config = qdrant_config
    
    def export_protobuf(
        self, 
        output_path: str, 
        project_id: str,
        include_source: bool = False,
        include_embeddings: bool = True,
        compress: bool = True,
    ) -> ExportResult:
        # Fetch data
        nodes = self._fetch_nodes(project_id, include_source)
        edges = self._fetch_edges(project_id)
        embeddings = self._fetch_embeddings(project_id) if include_embeddings else {}
        
        # Build protobuf message
        graph = ast_rag_pb2.ASTGraph(
            project_id=project_id,
            export_timestamp=datetime.now(timezone.utc).isoformat(),
            nodes=[node_to_proto(n) for n in nodes],
            edges=[edge_to_proto(e) for e in edges],
            embeddings={k: v.tobytes() for k, v in embeddings.items()},
        )
        
        # Write
        data = graph.SerializeToString()
        if compress:
            data = gzip.compress(data)
        
        with open(output_path, "wb") as f:
            f.write(data)
        
        return ExportResult(
            node_count=len(nodes),
            edge_count=len(edges),
            size_bytes=len(data),
        )

Phase 3: Import Service

File: ast_rag/services/import_service.py (new)

class GraphImporter:
    def import_protobuf(
        self,
        input_path: str,
        project_id: Optional[str] = None,
        mode: str = "merge",
    ) -> ImportResult:
        # Read
        with open(input_path, "rb") as f:
            data = f.read()
        
        # Decompress only if the gzip magic bytes are present
        if data[:2] == b"\x1f\x8b":
            data = gzip.decompress(data)
        
        # Parse
        graph = ast_rag_pb2.ASTGraph()
        graph.ParseFromString(data)
        
        # Validate
        self._validate(graph)
        
        # Import
        with self._driver.session() as session:
            if mode == "replace" and project_id:
                session.run("MATCH (n) WHERE n.project_id = $pid DETACH DELETE n", pid=project_id)
            
            nodes = [proto_to_node(n, project_id) for n in graph.nodes]
            edges = [proto_to_edge(e) for e in graph.edges]
            
            batch_upsert_nodes(session, group_by_label(nodes))
            batch_upsert_edges(session, edges)
        
        return ImportResult(...)

Phase 4: CLI Integration

File: ast_rag/cli.py

@app.command("export")
def export_graph(
    output: str = typer.Argument(..., help="Output file path"),
    project_id: Optional[str] = typer.Option(None, "--project", "-p"),
    format: str = typer.Option("protobuf", "--format", "-f"),
    compress: bool = typer.Option(True, "--compress/--no-compress"),
    include_source: bool = typer.Option(False, "--include-source"),
) -> None:
    """Export graph to binary format."""
    cfg = _load_config()
    driver = create_driver(cfg.neo4j)
    exporter = GraphExporter(driver, cfg.qdrant)
    
    with console.status("Exporting graph..."):
        result = exporter.export_protobuf(
            output,
            project_id=project_id or cfg.neo4j.project_id,
            compress=compress,
            include_source=include_source,
        )
    
    console.print(f"[green]✓ Exported {result.node_count} nodes, {result.edge_count} edges")
    console.print(f"  Size: {result.size_bytes / 1024 / 1024:.2f} MB")

@app.command("import")
def import_graph(
    input_path: str = typer.Argument(..., help="Input file path"),
    project_id: Optional[str] = typer.Option(None, "--project", "-p"),
    mode: str = typer.Option("merge", "--mode", "-m"),
) -> None:
    """Import graph from binary format."""
    cfg = _load_config()
    driver = create_driver(cfg.neo4j)
    importer = GraphImporter(driver)
    
    with console.status("Importing graph..."):
        result = importer.import_protobuf(
            input_path,
            project_id=project_id,
            mode=mode,
        )
    
    console.print(f"[green]✓ Imported {result.nodes_imported} nodes, {result.edges_imported} edges")

⚠️ Risks & Mitigations

| Risk | Impact | Mitigation |
|------|--------|------------|
| Large export files | Medium | Streaming export, compression |
| Version incompatibility | Medium | Schema versioning, backward compat |
| Memory issues (large graphs) | High | Streaming, batch processing |
| Security (malicious imports) | High | Validation, size limits, no pickle |

📊 Success Metrics

  • Export 100k nodes in <30 seconds
  • Import 100k nodes in <60 seconds
  • Compression ratio >60%
  • CLI commands export and import work
  • Round-trip (export→import) preserves all data
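The compression-ratio metric can be smoke-tested without the full pipeline. A stdlib harness over a synthetic payload (a stand-in for serialized `ASTGraph` bytes, chosen because qualified names repeat heavily, which is exactly why AST exports compress well):

```python
import gzip
import json

def compression_ratio(raw: bytes) -> float:
    """Fraction of bytes saved by gzip; 0.6 means 60% smaller."""
    return 1.0 - len(gzip.compress(raw)) / len(raw)

# Synthetic stand-in for a serialized graph export
sample = json.dumps([
    {"id": f"n{i}",
     "qualified_name": f"ast_rag.services.export_service.GraphExporter.method_{i % 10}"}
    for i in range(1000)
]).encode()
```

The same harness extends naturally to the round-trip metric once real exports exist: export, re-import, and compare node/edge sets.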

🎯 Deliverables

  1. Research document (this file)
  2. proto/ast_rag.proto schema
  3. GraphExporter and GraphImporter classes
  4. CLI commands: export, import
  5. Documentation and examples

Labels: research, enhancement, export, protobuf
Priority: Medium
Estimated Research Time: 1-2 days
