RainStorm

Overview

RainStorm is a distributed stream processing system built on top of a hybrid distributed file system (HyDFS). Inspired by Apache Spark, Apache Flink, and HDFS/Cassandra design principles, RainStorm provides fault-tolerant, scalable stream processing with exactly-once semantics. The implementation is in Go and is intended to run across multiple VMs. The project combines three coordinated subsystems:

  • A membership and failure-detection subsystem that provides dynamic group membership, protocol switching, and an HTTP admin interface.
  • A HyDFS storage and routing subsystem that stores logical files as manifests + chunked data, uses a consistent-hash ring for replica placement, and performs re-replication and GC when the ring changes.
  • A RainStorm stream processing engine with leader-based orchestration, worker agents for distributed task execution, and support for stateful operators with exactly-once processing guarantees.

Key implementation points:

  • Membership daemon (src/membership/memberd) supports two protocols: gossip and ping/ack. The active protocol is configurable at runtime and piggybacks cluster config updates on protocol messages.
  • Failure detection is handled via a suspicion.Manager that can be enabled/disabled and tuned (TSuspect/TFail/TCleanup). The Ping/Ack loop uses an inflight tracker to observe ping timeouts.
  • An introducer TCP server assists new nodes to join the cluster; nodes can also be configured as introducers.
  • The membership store maintains member entries, incarnation numbers, and state transitions and exposes a simple HTTP API for inspection and admin operations.
  • HyDFS routing and replication are implemented with a token ring (src/hydfs/ring) and a ReplicaManager that computes local push/GC plans on ring changes and executes transfers via an HTTP mover.
  • Storage is chunk/manifest-based (src/hydfs/storage): manifests record ordered append operations (with client IDs, sequences, timestamps and chunk lists) and are used to deterministically merge concurrent appends.
  • RainStorm stream processing (src/rainstorm) implements a leader-worker architecture where the leader manages resource allocation and task scheduling, while workers execute stream operations using pluggable operator binaries.
  • Stream operators (grep, transform, aggregate) are implemented as standalone wrapper binaries in src/rainstorm/binaries/ that can be dynamically invoked by worker agents.
  • Exactly-once processing semantics are provided through source tracking, idempotent state updates, and transaction-like append operations coordinated between the leader and workers.
  • A control client/daemon (src/ctl/rainstormctl_daemon.go) provides a CLI-like interface to issue file operations and submit stream processing jobs against the HyDFS and RainStorm daemons.
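The manifest-based append/merge design above can be sketched in Go. This is a minimal, illustrative model, not the actual types in src/hydfs/storage: replicas deduplicate ops by (client ID, sequence) and then sort by (timestamp, client ID, sequence), so every replica converges to the same manifest regardless of the order in which appends arrived.

```go
package main

import (
	"fmt"
	"sort"
)

// AppendOp is a hypothetical, simplified manifest append record;
// the field names are illustrative, not the real storage types.
type AppendOp struct {
	ClientID string
	Seq      uint64
	TS       int64 // append timestamp
	Chunks   []string
}

// mergeOps deterministically merges two replicas' op lists:
// dedup by (ClientID, Seq), then order by (TS, ClientID, Seq).
func mergeOps(a, b []AppendOp) []AppendOp {
	seen := map[string]AppendOp{}
	for _, op := range append(append([]AppendOp{}, a...), b...) {
		key := fmt.Sprintf("%s/%d", op.ClientID, op.Seq)
		seen[key] = op
	}
	out := make([]AppendOp, 0, len(seen))
	for _, op := range seen {
		out = append(out, op)
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].TS != out[j].TS {
			return out[i].TS < out[j].TS
		}
		if out[i].ClientID != out[j].ClientID {
			return out[i].ClientID < out[j].ClientID
		}
		return out[i].Seq < out[j].Seq
	})
	return out
}

func main() {
	a := []AppendOp{{ClientID: "c1", Seq: 1, TS: 10}}
	b := []AppendOp{{ClientID: "c2", Seq: 1, TS: 5}, {ClientID: "c1", Seq: 1, TS: 10}}
	for _, op := range mergeOps(a, b) {
		fmt.Println(op.ClientID, op.Seq, op.TS)
	}
}
```

Because the final order depends only on the ops themselves, two replicas that have received the same set of appends in different orders produce byte-identical manifests after a merge.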

Features

Membership & Failure Detection

  • Protocols: supports both gossip and ping/ack membership protocols with runtime switching. Both protocols piggyback cluster config updates onto normal messages.
  • Failure detection & suspicion: a configurable suspicion manager (TSuspect/TFail/TCleanup) detects suspect/failed nodes; ping/ack mode uses timeout-based inflight tracking.
  • Introducer-based bootstrapping: nodes can join via an introducer TCP server; the introducer is optional and runs in its own goroutine in the daemon.
  • Membership store & HTTP admin: membership state is stored and exposed via an HTTP admin API (endpoints: /get, /set, /list_mem, /list_self, /display_suspects, /display_protocol, /switch, /leave).

HyDFS Distributed File System

  • Consistent-hash ring & replica management: ring tokens determine replica sets; ReplicaManager computes per-node local plans (pushes + GC) when nodes join/leave and executes them via an HTTP mover.
  • Chunked storage + manifests: logical files are represented by manifests (ordered append ops) and streaming chunk data; manifests provide deterministic merge and versioning semantics for concurrent appends.
  • Re-replication & garbage collection: when the ring changes the node planner computes which token ranges to push to peers and which ranges to GC locally to restore the target replication factor.
  • File operations: rainstormctl (in src/ctl) can connect to the daemon for file operations (create, append, get, liststore, getfromreplica, multicreate/multiappend, merge, etc.).
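Replica placement on the consistent-hash ring works roughly as follows: hash the file name to a token, then walk the ring clockwise and take the first N distinct nodes. This is a self-contained sketch under assumed token derivation (SHA-1 truncated to 64 bits) and replication factor; the real logic lives in src/hydfs/ring and src/hydfs/routing:

```go
package main

import (
	"crypto/sha1"
	"encoding/binary"
	"fmt"
	"sort"
)

// token hashes a name onto the ring; SHA-1/64-bit is an assumption here.
func token(s string) uint64 {
	h := sha1.Sum([]byte(s))
	return binary.BigEndian.Uint64(h[:8])
}

// replicasFor walks the ring clockwise from the file's token and picks
// the first n nodes as the replica set (wrapping past the largest token).
func replicasFor(file string, nodes []string, n int) []string {
	sort.Slice(nodes, func(i, j int) bool { return token(nodes[i]) < token(nodes[j]) })
	t := token(file)
	// first node whose token is >= the file token; len(nodes) means wrap to 0
	start := sort.Search(len(nodes), func(i int) bool { return token(nodes[i]) >= t })
	out := make([]string, 0, n)
	for i := 0; i < n && i < len(nodes); i++ {
		out = append(out, nodes[(start+i)%len(nodes)])
	}
	return out
}

func main() {
	nodes := []string{"vm1:10010", "vm2:10010", "vm3:10010", "vm4:10010"}
	fmt.Println(replicasFor("dataset1.csv", nodes, 3))
}
```

The successor-walk structure is what makes re-replication planning local: when a node joins or leaves, only the token ranges adjacent to it change owners, so the ReplicaManager only needs to push or GC those ranges.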

RainStorm Stream Processing

  • Leader-worker architecture: centralized leader handles resource management, task scheduling, and coordination; distributed workers execute stream processing tasks.
  • Pluggable operators: stream operations (grep, transform, aggregateByKey) are implemented as standalone wrapper binaries that workers invoke dynamically.
  • Exactly-once semantics: source operators track processed records, and workers use idempotent state updates to guarantee each record is processed exactly once even in the presence of failures.
  • Stateful processing: support for aggregation operators with key-based state management and periodic state checkpointing to HyDFS.
  • Fault tolerance: automatic task rescheduling on worker failures; state recovery from HyDFS-backed checkpoints.
  • Data parallelism: tasks can be distributed across multiple workers with automatic load balancing and data partitioning.
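The core of the exactly-once guarantee is idempotence on the worker side: if a record is redelivered after a failure and replay, applying it again must be a no-op. The sketch below uses illustrative names, not the actual src/rainstorm/worker API, and uses a running count per key in the style of aggregateByKey:

```go
package main

import (
	"fmt"
	"sync"
)

// ExactlyOnce is a hypothetical worker-side dedup + state holder.
type ExactlyOnce struct {
	mu    sync.Mutex
	seen  map[string]bool
	state map[string]int // key -> running count, as in aggregateByKey
}

func NewExactlyOnce() *ExactlyOnce {
	return &ExactlyOnce{seen: map[string]bool{}, state: map[string]int{}}
}

// Process applies a record only if its unique ID has not been seen;
// it reports whether state actually changed.
func (e *ExactlyOnce) Process(recordID, key string) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.seen[recordID] {
		return false // duplicate delivery after replay: no-op
	}
	e.seen[recordID] = true
	e.state[key]++
	return true
}

func main() {
	e := NewExactlyOnce()
	fmt.Println(e.Process("r1", "Parking")) // first delivery: applied
	fmt.Println(e.Process("r1", "Parking")) // replayed delivery: ignored
	fmt.Println(e.state["Parking"])
}
```

In the real system the seen-set and key state would be checkpointed to HyDFS together, so a recovering worker restores both and replayed records from the source still land exactly once.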

Configuration & Monitoring

  • Runtime-configurable knobs: many parameters (PingEvery, GossipPeriod, PingFanout/GossipFanout, DropRateRecv, TSuspect/TFail/TCleanup, ports) are exposed through config.json and can be live-tuned via the HTTP API.
  • HTTP-based monitoring: REST endpoints for cluster state inspection, job submission, and administrative operations.
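A config.json fragment covering the knobs listed above might look like the following. The exact field names, value formats, and defaults are assumptions inferred from the knob names; consult the deployed config.json for the authoritative schema:

```json
{
  "PingEvery": "500ms",
  "GossipPeriod": "400ms",
  "PingFanout": 3,
  "GossipFanout": 3,
  "DropRateRecv": 0.0,
  "TSuspect": "2s",
  "TFail": "4s",
  "TCleanup": "6s"
}
```

Any of these can also be changed on a live node through the /set and /switch HTTP endpoints described below.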

Directory Structure

config.json                  # System configuration file (cluster knobs and runtime parameters)
README.md                    # Project documentation
dataset/                     # Test datasets and sample files used for experiments
  business_*.txt             # Sample input files (20 business datasets)
  text_*.txt                 # Text files of various sizes (1K to 1024K) for throughput/latency tests
llm/                         # LLM notes and chat logs
  chat.txt                   # Conversation / notes
pyspark/                     # PySpark application examples
  app1.py                    # PySpark application 1
  app2.py                    # PySpark application 2
rainstorm_dataset/           # Rainstorm-specific datasets
  dataset1.csv               # Dataset 1
  dataset2.csv               # Dataset 2
  hotels.csv                 # Hotels dataset
  transactions.csv           # Transactions dataset
  trial_data.csv             # Trial data
setup/                       # Scripts for VM setup and orchestration
  setup.bash                 # Local single-VM setup/install script
  vm_setup.bash              # Multi-VM orchestration script (setup/start/stop)
  start_server.bash          # Starts servers/daemons across nodes
  kill.bash                  # Kills running test processes / daemons

src/                         # Source code (Go)
  go.mod                     # Go module file (dependencies)
  config/
    config.go                # Configuration structures, parsing, DTO and live-apply logic
  ctl/
    rainstormctl_daemon.go   # Rainstorm control client/daemon: CLI for stream processing ops
  hydfs/
    cmd/
      hydfsd/
        hydfs_daemon.go      # HyDFS daemon entrypoint for storage node (starts HTTP + replica manager)
      server/
        http_server.go       # HTTP API used for client/replica endpoints (upload, download, replica management)
    ring/
      replica_manager.go     # Plans local push/GC on ring changes and executes re-replication via HTTPMover
      ring_manager.go        # Builds/updates the logical token ring from membership information
      ring.go                # Ring utilities: tokens, predecessor/successor, node iteration
    routing/
      router.go              # Maps file tokens to replica set (replica selection logic)
    storage/
      chunk_store.go         # Low-level chunk storage (read/write chunk blobs)
      file_enumerator.go     # Enumerates files & tokens for re-replication and GC
      file_store.go          # High-level file store: manifest+chunks management and stream IO
      fs_paths.go            # Filesystem layout and path helpers for manifests/chunks
      manifest.go            # Manifest structure and deterministic append/merge logic (append ops)
    utils/
      ascii_ring.go          # ASCII helpers for pretty-printing ring/token maps
      ids.go                 # ID/token utilities and hashing helpers
  main/
    main.go                  # Local hydfs/membership binary (convenience runner)
  membership/
    memberd/
      config_diff.go         # Utilities to compute and log config diffs
      membership_daemon.go   # Membership daemon: starts gossip/pingack loops, introducer, HTTP admin
    node/
      nodeid.go              # NodeID, endpoint and timestamp helpers
    protocol/
      common/
        inflight.go          # Inflight tracker used by ping/ack for nonce/timeouts
        peers.go             # Peer selection helpers used by both gossip and ping/ack
      gossip/     
        loop.go              # Gossip protocol loop: periodic gossip + recv and merge
      pingack/     
        loop.go              # Ping/Ack protocol loop: periodic ping, ACK handling, inflight tracking
    store/     
      membership.go          # Membership store: entries, incarnation, merge rules and snapshot
      subscriber.go          # Subscriber interface for components that react to membership changes
    suspicion/     
      manager.go             # Suspicion manager: scanner that marks suspect/fail and issues GC/cleanup
    transport/     
      udp.go                 # UDP wrapper used by both protocols (Read/Write helpers)
    wire/     
      codec.go               # Wire encoding/decoding of membership messages (ping/ack/gossip)
  rainstorm/                 # Rainstorm stream processing engine
    binaries/                # Compiled operator binaries
      aggregate-wrapper      # Aggregation operator wrapper
      echo-wrapper           # Echo operator wrapper
      grep-wrapper           # Grep operator wrapper
      grep-wrapper2          # Grep operator wrapper (variant 2)
      transform-wrapper      # Transform operator wrapper
    binaries_source/         # Source code for operator binaries
      aggregateByKey/
        aggregate-wrapper.go # Aggregation operator source
      grep/
        grep-wrapper.go      # Grep operator source
      grep2/
        grep-wrapper2.go     # Grep operator source (variant 2)
      transform/
        transform-wrapper.go # Transform operator source
    cmd/
      client/
        http_client.go       # HTTP client for Rainstorm operations
      rainstormd/
        rainstorm_daemon.go  # Rainstorm daemon entrypoint
      server/
        http_server.go       # HTTP server for Rainstorm leader/worker communication
    core/
      control.go             # Control logic for stream processing
      types.go               # Core type definitions
    leader/
      leader.go              # Leader election and coordination
      resource_manager.go    # Resource allocation and management
      source_exactly_once.go # Exactly-once source semantics
      source.go              # Source operator implementation
    utils/
      http_utils.go          # HTTP utilities for Rainstorm
      routing_utils.go       # Routing utilities
    worker/
      agent.go               # Worker agent for task execution
      exactly_once.go        # Exactly-once processing semantics
      task_runtime.go        # Task runtime and execution
  utils/
    colors.go                # Terminal color constants used in logging
    utils.go                 # Generic helpers (DNS resolution, string utilities)

Getting Started

To get started with RainStorm, follow these steps:

Clone the Repository

git clone <your-repo-url>
cd rainstorm

Installation & Setup

Prerequisites

  • Go 1.18+
  • Bash (for setup scripts)
  • Access to multiple VMs or containers (recommended for distributed testing)

Install Dependencies

cd src
# Install Go dependencies
go mod tidy

How to Run

1. VM Setup & Run

Use the provided scripts in the setup/ directory to initialize and start the system:

# Setup VMs and environment
bash setup/vm_setup.bash
# Start the server
bash setup/start_server.bash

On each VM, run the setup script to install dependencies and clone the repo:

cd setup
bash setup.bash

Or, to automate setup and startup across all VMs from one machine:

bash vm_setup.bash setup   # Setup all VMs
bash vm_setup.bash start   # Start servers on all VMs

2. Start the RainStorm Daemon

If the automation scripts were run, the RainStorm daemon will already be running on each VM. You can attach to the session (if using tmux):

sudo tmux -S /tmp/tmux-cs-425-mp4.sock attach-session -t cs-425-shared-mp4

Or run directly:

If this node is the introducer:

    sudo tmux -S /tmp/tmux-cs-425-mp4.sock new-session -d -s cs-425-shared-mp4 "cd /home/mp4/rainstorm-g33/src/main && go run main.go -is-introducer -stage-config-map 'stage0 echo-wrapper 100 200 stage1 echo-wrapper x2 stage2 echo-wrapper'"

Otherwise, point the node at the introducer:

    sudo tmux -S /tmp/tmux-cs-425-mp4.sock new-session -d -s cs-425-shared-mp4 "cd /home/mp4/rainstorm-g33/src/main && go run main.go -introducer=fa25-cs425-3301.cs.illinois.edu:6000 -stage-config-map 'stage0 echo-wrapper 100 200 stage1 echo-wrapper x2 stage2 echo-wrapper'"

3. RainStorm CTL Daemon

To interact with the rainstorm daemon, you can use the rainstormctl daemon located in src/ctl/rainstormctl_daemon.go. This tool allows you to send commands and queries to the running rainstorm daemon.

sudo tmux -S /tmp/tmux-cs-425-mp4-ctl.sock attach-session -t cs-425-shared-mp4-ctl

Or run directly:

sudo tmux -S /tmp/tmux-cs-425-mp4-ctl.sock new-session -d -s cs-425-shared-mp4-ctl "cd /home/mp4/rainstorm-g33/src/ctl && go run rainstormctl_daemon.go -is-ctl-client"

4. RainStorm Daemon Features

The RainStorm daemon provides the following features:

  • Stream processing with leader-worker architecture for distributed task execution
  • Exactly-once processing semantics with fault tolerance and automatic recovery
  • HyDFS integration for distributed storage with ring management, data replication, and routing
  • HTTP server for configuration, monitoring, and job submission
  • Membership management with gossip/pingack protocols and failure detection

5. Monitor Membership & Failures

Membership and failure events are logged and can be monitored via the daemon output. Configuration is managed via config.json and command-line flags.

Usage

  • Configuration is managed via config.json.
  • Main entry point for the RainStorm system is in src/main/main.go.
  • RainStorm daemon entrypoint is in src/rainstorm/cmd/rainstormd/.
  • HyDFS daemon entrypoint is in src/hydfs/cmd/hydfsd/.
  • Control client/daemon is in src/ctl/rainstormctl_daemon.go.
  • Membership subsystem is in src/membership/:
    • Membership, gossip, and failure detection logic are in src/membership/protocol/, src/membership/store/, and src/membership/suspicion/.
  • HyDFS subsystem is in src/hydfs/:
    • Storage and routing logic are in src/hydfs/storage/, src/hydfs/ring/, and src/hydfs/routing/.
  • RainStorm stream processing subsystem is in src/rainstorm/:
    • Leader coordination in src/rainstorm/leader/
    • Worker agents in src/rainstorm/worker/
    • Operator binaries in src/rainstorm/binaries/ with source in src/rainstorm/binaries_source/

Ports Information

  • 5000: Membership protocol port (UDP). Each node runs its main membership and gossip/ping-ack protocol on this port.
  • 6000: Introducer port (TCP). Used for new nodes joining the group; the introducer listens here for join requests.
  • 8080: HTTP admin server port (TCP). Used for changing configuration parameters at runtime via HTTP requests and monitoring membership.
  • 10010: HyDFS daemon port (TCP). Used for HyDFS file operations (create, append, get, merge, etc.).
  • 15000: RainStorm daemon port (TCP). Used for RainStorm stream processing operations and leader-worker communication.

RainStorm Stream Processing Commands

The rainstormctl daemon supports the following commands for stream processing operations; use it to issue them against the running RainStorm daemon.

Example Commands

  • Submit a stream processing job:

    rainstorm 2 3 -stage-config-map 'stage0 grep-wrapper Parking 4 stage1 aggregate-wrapper' dataset1.csv out1.txt true false 100 20 80
    
  • List running jobs:

    list_tasks
    
  • Kill a running job:

    kill_task <VMNo> <PID>
    

HyDFS Daemon Commands

The HyDFS daemon supports the following commands for file operations and system management; issue them through the rainstormctl daemon.

Example Commands

  • Create a file:
    create <localfilename> <HyDFSfilename>
    
  • Append to a file:
    append <localfilename> <HyDFSfilename>
    
  • Download a file:
    get <HyDFSfilename> <localfilename>
    
  • Merge file replicas:
    merge <HyDFSfilename>
    
  • List file replicas:
    ls <HyDFSfilename>
    
  • List all stored files:
    liststore
    
  • Download from a specific replica:
    getfromreplica <VMAddress> <HyDFSfilename> <localfilename>
    
  • List ring node IDs and tokens:
    list_mem_ids
    
  • Multiple file operations:
    multiappend <HyDFSfilename> <VMi> <VMj> ... <localfilei> <localfilej> ...
    
  • Create multiple files:
    multicreate n <localfile> ...
    
  • Help command:
    help
    
  • Exit the rainstormctl daemon:
    exit | quit
    

Membership API Endpoints

The HTTP server (port 8080) exposes several endpoints for monitoring and controlling the distributed group membership system. Example usage with curl:

The examples below use http://fa25-cs425-3301.cs.illinois.edu:8080 as the target VM address.

  • Get current configuration:
    curl http://fa25-cs425-3301.cs.illinois.edu:8080/get | python -m json.tool --sort-keys
  • Set configuration (from deployed config.json file):
    curl http://fa25-cs425-3301.cs.illinois.edu:8080/set | python -m json.tool --sort-keys
  • List all members:
    curl http://fa25-cs425-3301.cs.illinois.edu:8080/list_mem | python -m json.tool --sort-keys
  • List self information:
    curl http://fa25-cs425-3301.cs.illinois.edu:8080/list_self | python -m json.tool --sort-keys
  • Display suspected failed nodes:
    curl http://fa25-cs425-3301.cs.illinois.edu:8080/display_suspects | python -m json.tool --sort-keys
  • Display protocol information:
    curl http://fa25-cs425-3301.cs.illinois.edu:8080/display_protocol | python -m json.tool --sort-keys
  • Leave the group:
    curl http://fa25-cs425-3301.cs.illinois.edu:8080/leave | python -m json.tool --sort-keys
  • Switch protocol/suspicion settings: To switch to pingack protocol with suspicion disabled:
    curl -s -X POST http://fa25-cs425-3301.cs.illinois.edu:8080/switch -d '{"protocol":"pingack","suspicion":"disabled"}' -H 'content-type: application/json' | python -m json.tool --sort-keys
    To switch to gossip protocol with suspicion disabled:
    curl -s -X POST http://fa25-cs425-3301.cs.illinois.edu:8080/switch -d '{"protocol":"gossip","suspicion":"disabled"}' -H 'content-type: application/json' | python -m json.tool --sort-keys
    To enable suspicion with a custom suspicion time (e.g., 1s) for pingack protocol:
    curl -s -X POST http://fa25-cs425-3301.cs.illinois.edu:8080/switch -d '{"protocol":"pingack","suspicion":"enabled","suspicion_time":"1s"}' -H 'content-type: application/json' | python -m json.tool --sort-keys
    To enable suspicion with a custom suspicion time (e.g., 1s) for gossip protocol:
    curl -s -X POST http://fa25-cs425-3301.cs.illinois.edu:8080/switch -d '{"protocol":"gossip","suspicion":"enabled","suspicion_time":"1s"}' -H 'content-type: application/json' | python -m json.tool --sort-keys

These endpoints allow you to query and modify the system state, view membership and suspicion lists, and control protocol settings at runtime.

Contributing

Contributions are welcome!

Authors & Acknowledgments

  • Group 33, Distributed Systems (CS 425), Fall 2025
  • Special thanks to course staff and contributors
