Skip to content

Implement: Bounded LRU AST Cache with Memory Limits #39

@lexasub

Description

@lexasub

Implementation Guide: Bounded LRU AST Cache

🎯 Goal

Implement a bounded LRU cache for AST trees with limits on both entry count and memory usage, preventing memory exhaustion during large project indexing.

πŸ“‹ Current State

What exists:

  • βœ… ParseCache - Simple in-memory cache (no limits)
  • βœ… SQLiteParseCache - Persistent cache with LRU by timestamp
  • ❌ No memory-based eviction
  • ❌ No combined entry+memory limits

What's missing:

  • ❌ BoundedASTCache class with dual limits
  • ❌ Memory monitoring
  • ❌ Configurable eviction policies

πŸ”§ Implementation Plan

Step 1: Create BoundedASTCache Class

File: ast_rag/utils/bounded_ast_cache.py (new)

"""
Bounded LRU AST Cache with memory limits.

Provides O(1) cache operations with automatic eviction based on:
- Maximum entry count
- Maximum memory usage (in MB)
"""

import sys
from collections import OrderedDict
from pathlib import Path
from typing import Any, ItemsView, Optional, Tuple

from tree_sitter import Node

from ..dto.enums import Language


class BoundedASTCache:
    """
    LRU cache for AST trees with bounded entry count and memory usage.

    Keys are file paths; values are ``(tree_node, language)`` tuples.
    Inserting past either limit evicts least-recently-used entries until
    both limits are satisfied again. All operations are O(1) except size
    estimation, which walks the AST once per insert.

    Attributes:
        max_entries: Maximum number of entries (default: 10000)
        max_memory_bytes: Maximum memory usage in bytes (default: 500 MB)
    """

    # Defaults applied when the caller passes None.
    _DEFAULT_MAX_ENTRIES = 10000
    _DEFAULT_MAX_MEMORY_MB = 500

    def __init__(
        self,
        max_entries: Optional[int] = None,
        max_memory_mb: Optional[int] = None,
    ):
        """
        Initialize bounded cache.

        Args:
            max_entries: Max entries before eviction. Default: 10000
            max_memory_mb: Max memory in MB before eviction. Default: 500
        """
        # Cache: file_path -> (tree_node, language).
        # OrderedDict insertion order doubles as recency order (oldest first).
        self._cache: OrderedDict[Path, Tuple[Node, Language]] = OrderedDict()

        # Per-key size estimates, computed once on insert so that eviction
        # and deletion never have to re-walk the AST (estimation is O(tree)).
        self._sizes: dict[Path, int] = {}

        # Limits
        self.max_entries = (
            max_entries if max_entries is not None else self._DEFAULT_MAX_ENTRIES
        )
        self.max_memory_bytes = (
            (max_memory_mb if max_memory_mb is not None else self._DEFAULT_MAX_MEMORY_MB)
            * 1024
            * 1024
        )

        # Running sum of self._sizes values (approximate memory usage, bytes).
        self._memory_usage: int = 0

    def __getitem__(self, key: Path) -> Tuple[Node, Language]:
        """Get item from cache, moving it to end (most recently used).

        Raises:
            KeyError: If *key* is not cached.
        """
        value = self._cache[key]  # Raises KeyError if not found
        self._cache.move_to_end(key)
        return value

    def __setitem__(self, key: Path, value: Tuple[Node, Language]) -> None:
        """Add or replace an item, evicting LRU entries if limits are exceeded."""
        if key in self._cache:
            # Replacing an existing entry: retire its accounted size first.
            self._memory_usage -= self._sizes.pop(key)
            del self._cache[key]

        # Add new entry (becomes most recently used).
        size = self._estimate_size(value)
        self._cache[key] = value
        self._sizes[key] = size
        self._memory_usage += size

        # Enforce limits
        self._enforce_limits()

    def __delitem__(self, key: Path) -> None:
        """Remove item from cache.

        Unlike a plain dict, a missing key is silently ignored (no KeyError),
        matching the original contract callers may rely on.
        """
        if key in self._cache:
            self._memory_usage -= self._sizes.pop(key)
            del self._cache[key]

    def __contains__(self, key: Path) -> bool:
        """Check if key is in cache (does not update recency)."""
        return key in self._cache

    def __len__(self) -> int:
        """Return number of entries in cache."""
        return len(self._cache)

    def __iter__(self):
        """Iterate over cache keys, oldest (least recently used) first."""
        return iter(self._cache)

    def items(self) -> ItemsView[Path, Tuple[Node, Language]]:
        """Return a view of all (path, (tree, language)) items in cache."""
        return self._cache.items()

    def clear(self) -> None:
        """Clear all entries from cache and reset memory accounting."""
        self._cache.clear()
        self._sizes.clear()
        self._memory_usage = 0

    def get_memory_usage_mb(self) -> float:
        """Return current (approximate) memory usage in MB."""
        return self._memory_usage / (1024 * 1024)

    def get_stats(self) -> dict[str, Any]:
        """Return cache statistics.

        Utilization ratios are reported as 0.0 when the corresponding limit
        is 0 (possible after ``resize(0)``) to avoid ZeroDivisionError.
        """
        return {
            "entries": len(self._cache),
            "max_entries": self.max_entries,
            "memory_mb": self.get_memory_usage_mb(),
            "max_memory_mb": self.max_memory_bytes / (1024 * 1024),
            "utilization_entries": (
                len(self._cache) / self.max_entries if self.max_entries else 0.0
            ),
            "utilization_memory": (
                self._memory_usage / self.max_memory_bytes
                if self.max_memory_bytes
                else 0.0
            ),
        }

    def _estimate_size(self, value: Tuple[Node, Language]) -> int:
        """
        Estimate memory size of a cache entry.

        Rough heuristic: ~200 bytes for the root node object, ~50 bytes per
        descendant node, ~50 bytes for the language enum.

        Args:
            value: (tree_node, language) tuple

        Returns:
            Estimated size in bytes
        """
        node, lang = value

        try:
            # Count descendants for better estimation
            child_count = self._count_descendants(node)
            return 200 + (child_count * 50) + 50
        except Exception:
            # Fallback: fixed size per entry when the node cannot be walked.
            return 1024  # 1KB per entry

    def _count_descendants(self, node: Node) -> int:
        """Count all descendant nodes.

        Implemented iteratively: deep ASTs would overflow Python's recursion
        limit with a recursive walk (and the resulting RecursionError would be
        silently swallowed by the fallback in ``_estimate_size``).
        """
        count = 0
        stack = list(node.children)
        while stack:
            current = stack.pop()
            count += 1
            stack.extend(current.children)
        return count

    def _enforce_limits(self) -> None:
        """Evict LRU entries until both entry-count and memory limits hold."""
        # First: enforce entry count limit
        while len(self._cache) > self.max_entries:
            self._evict_oldest()

        # Second: enforce memory limit
        while self._memory_usage > self.max_memory_bytes and len(self._cache) > 0:
            self._evict_oldest()

    def _evict_oldest(self) -> None:
        """Evict the oldest (least recently used) entry."""
        if not self._cache:
            return

        # popitem(last=False) removes the front of the OrderedDict in O(1).
        key, _value = self._cache.popitem(last=False)
        self._memory_usage -= self._sizes.pop(key)

    def resize(
        self,
        max_entries: Optional[int] = None,
        max_memory_mb: Optional[int] = None,
    ) -> None:
        """
        Resize cache limits and evict if necessary.

        Args:
            max_entries: New max entries limit
            max_memory_mb: New max memory limit in MB
        """
        if max_entries is not None:
            self.max_entries = max_entries
        if max_memory_mb is not None:
            self.max_memory_bytes = max_memory_mb * 1024 * 1024

        # Evict if needed
        self._enforce_limits()

Step 2: Integration with ParserManager

File: ast_rag/services/parsing/parser_manager.py

Update ParserManager to use bounded cache:

from ..utils.bounded_ast_cache import BoundedASTCache

class ParserManager:
    def __init__(
        self,
        cache: Optional[Any] = None,  # Can be BoundedASTCache or SQLiteParseCache
        config: Optional[dict] = None,
        project_id: str = "default",
        cache_max_entries: Optional[int] = None,
        cache_max_memory_mb: Optional[int] = None,
    ) -> None:
        """
        Initialize the parser manager.

        Args:
            cache: Pre-built cache instance to use as-is. When None, a
                BoundedASTCache is constructed with the limits below.
            config: Optional configuration mapping (handled by existing init).
            project_id: Project identifier for cache/index scoping.
            cache_max_entries: Entry limit forwarded to BoundedASTCache;
                None lets BoundedASTCache apply its own default (10000).
            cache_max_memory_mb: Memory limit (MB) forwarded to
                BoundedASTCache; None defaults to 500 inside the cache.
        """
        # ... existing init
        
        # Initialize cache
        if cache is not None:
            self._cache = cache
        else:
            # Use bounded cache by default
            self._cache = BoundedASTCache(
                max_entries=cache_max_entries,
                max_memory_mb=cache_max_memory_mb,
            )

Step 3: Update Configuration

File: ast_rag/dto/config.py

Add cache configuration:

class ProjectConfig(BaseModel):
    """Project-level configuration (excerpt: AST-cache limits)."""

    # ... existing fields
    
    # Cache configuration
    # Maximum number of entries held by the in-memory AST cache.
    cache_max_entries: int = 10000
    # Approximate memory ceiling (in MB) for the in-memory AST cache.
    cache_max_memory_mb: int = 500

File: ast_rag_config.json

{
  "neo4j": { ... },
  "qdrant": { ... },
  "cache_max_entries": 10000,
  "cache_max_memory_mb": 500
}

Step 4: CLI Options

File: ast_rag/cli.py

Add cache configuration options:

@app.command("index")
def index_project(
    # ... existing options
    cache_max_entries: Optional[int] = typer.Option(
        None, "--cache-entries", help="Max cache entries (default: from config)"
    ),
    cache_max_memory_mb: Optional[int] = typer.Option(
        None, "--cache-memory-mb", help="Max cache memory in MB (default: from config)"
    ),
) -> None:
    """Index a codebase.

    CLI flags take precedence over the config file. The options default to
    None rather than concrete numbers: with a truthy default (e.g. 10000),
    `cache_max_entries or cfg.cache_max_entries` would NEVER fall back to
    the config value, and an explicit `--cache-entries 0` would silently be
    replaced by the config value. Explicit `is not None` checks avoid both.
    """
    cfg = _load_config()
    
    pm = ParserManager(
        project_id=cfg.neo4j.project_id,
        cache_max_entries=(
            cache_max_entries
            if cache_max_entries is not None
            else cfg.cache_max_entries
        ),
        cache_max_memory_mb=(
            cache_max_memory_mb
            if cache_max_memory_mb is not None
            else cfg.cache_max_memory_mb
        ),
    )
    
    # ... indexing

Step 5: Cache Statistics Command

File: ast_rag/cli.py

Add cache stats command:

@app.command("cache-stats")
def cache_stats(
    config: Optional[str] = typer.Option(None, "--config", "-c"),
) -> None:
    """Show cache statistics (for debugging)."""
    cfg = _load_config(config)
    
    # Create parser manager with current config
    # NOTE(review): a freshly constructed in-memory BoundedASTCache is always
    # empty, so its stats here are trivially zero — this command is only
    # meaningful for a persistent (e.g. SQLite-backed) cache. Confirm intent.
    pm = ParserManager(
        project_id=cfg.neo4j.project_id,
        cache_max_entries=cfg.cache_max_entries,
        cache_max_memory_mb=cfg.cache_max_memory_mb,
    )
    
    # Duck-typed: only BoundedASTCache-style caches expose get_stats().
    # NOTE(review): reaches into the private `pm._cache`; a public accessor
    # on ParserManager would be cleaner.
    if hasattr(pm._cache, "get_stats"):
        stats = pm._cache.get_stats()
        console.print("\n[bold]Cache Statistics:[/bold]")
        console.print(f"  Entries: {stats['entries']}/{stats['max_entries']} ({stats['utilization_entries']:.1%})")
        console.print(f"  Memory: {stats['memory_mb']:.2f} MB / {stats['max_memory_mb']:.2f} MB ({stats['utilization_memory']:.1%})")
    else:
        console.print("[yellow]Cache type does not support statistics[/yellow]")

Step 6: Monitoring & Logging

File: ast_rag/services/parsing/parser_manager.py

Add cache hit/miss logging:

import logging

logger = logging.getLogger(__name__)

class ParserManager:
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # NOTE: the original draft wrote `def __init__(self, ...)`, which is
        # a SyntaxError (`...` is not a valid parameter). Use the real
        # signature from Step 2 when integrating.
        # ... existing init
        # Hit/miss counters for cache-effectiveness monitoring.
        self._cache_hits = 0
        self._cache_misses = 0
    
    def parse_file(self, file_path: str, source: Optional[bytes] = None) -> Tree:
        """Parse a file, serving repeated requests from the AST cache.

        Args:
            file_path: Path to the source file.
            source: Optional pre-read file contents; passed through to the
                underlying parser.

        Returns:
            The parsed tree (from cache on a hit, freshly parsed otherwise).
        """
        path = Path(file_path)
        
        # Check cache
        if path in self._cache:
            self._cache_hits += 1
            logger.debug("Cache hit: %s", file_path)
            return self._cache[path][0]  # cached value is (tree, language)
        
        # Parse
        self._cache_misses += 1
        tree = self._parse_file(file_path, source)
        
        # Cache entries store the language alongside the tree, so resolve it
        # before inserting. (The original draft used an undefined `lang`,
        # which would raise NameError on every cache miss.)
        lang = self._detect_language(path)  # TODO confirm helper name/source
        self._cache[path] = (tree, lang)
        
        return tree
    
    def get_cache_stats(self) -> dict[str, Any]:
        """Get cache statistics including hit rate.

        Returns:
            Dict with "hits", "misses", "hit_rate" (0.0 when no lookups have
            happened yet), merged with the cache's own stats when available.
        """
        total = self._cache_hits + self._cache_misses
        hit_rate = self._cache_hits / total if total > 0 else 0.0
        
        stats = {
            "hits": self._cache_hits,
            "misses": self._cache_misses,
            "hit_rate": hit_rate,
        }
        
        # Duck-typed: only BoundedASTCache-style caches expose get_stats().
        if hasattr(self._cache, "get_stats"):
            stats.update(self._cache.get_stats())
        
        return stats

πŸ§ͺ Testing

def test_bounded_cache():
    """Inserting past the entry limit must evict LRU entries and keep the
    reported memory usage within the configured ceiling.

    NOTE(review): `mock_node()` is a placeholder — it must provide a
    `.children` attribute for BoundedASTCache._estimate_size to walk.
    """
    cache = BoundedASTCache(max_entries=100, max_memory_mb=10)
    
    # Add entries
    for i in range(150):
        key = Path(f"/test/file_{i}.py")
        value = (mock_node(), Language.PYTHON)
        cache[key] = value
    
    # Should have evicted oldest entries
    assert len(cache) <= 100
    
    # Memory should be within limit
    assert cache.get_memory_usage_mb() <= 10

def test_cache_resize():
    """Shrinking the entry limit via resize() must evict down to the new cap.

    NOTE(review): `mock_node()` is a placeholder — it must provide a
    `.children` attribute for BoundedASTCache._estimate_size to walk.
    """
    cache = BoundedASTCache(max_entries=100)
    
    # Add entries
    for i in range(100):
        cache[Path(f"/test/file_{i}.py")] = (mock_node(), Language.PYTHON)
    
    # Resize to smaller
    cache.resize(max_entries=50)
    
    # Should have evicted entries
    assert len(cache) <= 50

πŸ“ Files to Create/Modify

Create:

  1. ast_rag/utils/bounded_ast_cache.py - New cache class

Modify:

  1. ast_rag/services/parsing/parser_manager.py - Use bounded cache
  2. ast_rag/dto/config.py - Add cache config fields
  3. ast_rag/cli.py - Add cache options and stats command
  4. ast_rag_config.json - Add cache configuration

⏱️ Estimated Time

  • 3-4 hours for implementation
  • 1-2 hours for testing
  • 30 minutes for documentation

Labels: enhancement, performance, memory-management
Priority: Medium
Implementation Time: 4-6 hours

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    Status

    Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions