StreamSQL

StreamSQL is a lightweight, SQL-based stream processing engine for IoT edge, enabling efficient data processing and analysis on unbounded streams.

📖 Documentation | Similar to: Apache Flink

Features

  • Lightweight
    • Pure in-memory operations
    • No dependencies
  • Data processing with SQL syntax
    • Nested field access: Support dot notation syntax (device.info.name) and array indexing (sensors[0].value) for accessing nested structured data
  • Data analysis
    • Multiple built-in window types: sliding, tumbling, and counting windows
    • Built-in aggregate functions: MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
    • Support for group-by aggregation
    • Support for filtering conditions
  • High extensibility
    • Custom functions can be added through a flexible extension mechanism
    • Input and output sources can be extended with RuleGo components
  • Integration with RuleGo
    • Use RuleGo's input, output, and processing components to connect data sources and integrate with third-party systems
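
To make the nested field access listed above more concrete, the following is a minimal sketch of how a path such as device.info.name or sensors[0].value can be resolved against nested maps and slices. It uses only the Go standard library. The lookup and splitIndex helpers are hypothetical names introduced for illustration; they are not part of the StreamSQL API, and the engine's own resolution logic may differ.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// splitIndex splits "sensors[0]" into ("sensors", 0, true).
// A part without a valid index suffix is returned unchanged.
func splitIndex(part string) (string, int, bool) {
	open := strings.Index(part, "[")
	if open < 0 || !strings.HasSuffix(part, "]") {
		return part, 0, false
	}
	idx, err := strconv.Atoi(part[open+1 : len(part)-1])
	if err != nil {
		return part, 0, false
	}
	return part[:open], idx, true
}

// lookup walks a dotted path through nested maps and slices.
// Hypothetical helper for illustration only, not StreamSQL code.
func lookup(data map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = data
	for _, part := range strings.Split(path, ".") {
		name, idx, hasIdx := splitIndex(part)
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		if cur, ok = m[name]; !ok {
			return nil, false
		}
		if hasIdx {
			list, ok := cur.([]interface{})
			if !ok || idx < 0 || idx >= len(list) {
				return nil, false
			}
			cur = list[idx]
		}
	}
	return cur, true
}

func main() {
	record := map[string]interface{}{
		"device": map[string]interface{}{
			"info": map[string]interface{}{"name": "sensor001"},
		},
		"sensors": []interface{}{
			map[string]interface{}{"value": 25.5},
		},
	}
	name, _ := lookup(record, "device.info.name")
	value, _ := lookup(record, "sensors[0].value")
	fmt.Println(name, value) // prints "sensor001 25.5"
}
```

A missing key or an out-of-range index makes lookup return false rather than panic, which is one reasonable choice for sparse IoT payloads.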

Installation

go get github.com/rulego/streamsql

Usage

StreamSQL supports two main processing modes for different business scenarios:

Non-Aggregation Mode - Real-time Data Transformation and Filtering

Suitable for scenarios requiring real-time response and low latency, where each data record is processed and output immediately.

Typical Use Cases:

  • Data Cleaning: Clean and standardize dirty data from IoT devices
  • Real-time Alerting: Monitor key metrics and alert immediately when thresholds are exceeded
  • Data Enrichment: Add calculated fields and business labels to raw data
  • Format Conversion: Convert data to formats required by downstream systems
  • Data Routing: Route data to different processing channels based on content

package main

import (
	"fmt"
	"time"
	"github.com/rulego/streamsql"
)

func main() {
	// Create StreamSQL instance
	ssql := streamsql.New()
	defer ssql.Stop()

	// Non-aggregation SQL: real-time data transformation and filtering.
	// Each record is processed as soon as it arrives; there is no window to wait for.
	rsql := `SELECT deviceId, 
	                UPPER(deviceType) as device_type,
	                temperature * 1.8 + 32 as temp_fahrenheit,
	                CASE WHEN temperature > 30 THEN 'hot'
	                     WHEN temperature < 15 THEN 'cold'
	                     ELSE 'normal' END as temp_category,
	                CONCAT(location, '-', deviceId) as full_identifier,
	                NOW() as processed_time
	         FROM stream 
	         WHERE temperature > 0 AND STARTSWITH(deviceId, 'sensor')`

	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}

	// Handle real-time transformation results
	ssql.AddSink(func(results []map[string]interface{}) {
		fmt.Printf("Real-time result: %+v\n", results)
	})

	// Simulate sensor data input
	sensorData := []map[string]interface{}{
		{
			"deviceId":     "sensor001",
			"deviceType":   "temperature", 
			"temperature":  25.0,
			"location":     "warehouse-A",
		},
		{
			"deviceId":     "sensor002",
			"deviceType":   "humidity",
			"temperature":  32.5,
			"location":     "warehouse-B", 
		},
		{
			"deviceId":     "pump001",  // Will be filtered out
			"deviceType":   "actuator",
			"temperature":  20.0,
			"location":     "factory",
		},
	}

	// Emit records one at a time; each produces a result immediately
	for _, data := range sensorData {
		ssql.Emit(data)
		// To obtain the processed result synchronously instead:
		// changedData, err := ssql.EmitSync(data)
		time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
	}

	time.Sleep(500 * time.Millisecond) // Wait for processing completion
}
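
As a cross-check on the query above, the sketch below evaluates the same expressions by hand in plain Go for the three sample records. It is not StreamSQL code: the reading and output types and the transform function are hypothetical names, STARTSWITH is assumed to be a case-sensitive prefix match, and the NOW() column is omitted because its value depends on when the query runs.

```go
package main

import (
	"fmt"
	"strings"
)

// reading mirrors the input fields used by the example query.
type reading struct {
	DeviceID, DeviceType, Location string
	Temperature                    float64
}

// output mirrors the selected columns, except processed_time.
type output struct {
	DeviceID, DeviceType, TempCategory, FullIdentifier string
	TempFahrenheit                                     float64
}

// transform applies the WHERE filter and the SELECT expressions to one record.
// The second return value is false when the record is filtered out.
func transform(r reading) (output, bool) {
	// WHERE temperature > 0 AND STARTSWITH(deviceId, 'sensor')
	// Assumption: STARTSWITH is a case-sensitive prefix match.
	if !(r.Temperature > 0 && strings.HasPrefix(r.DeviceID, "sensor")) {
		return output{}, false
	}
	// CASE WHEN temperature > 30 ... WHEN temperature < 15 ... ELSE 'normal'
	category := "normal"
	switch {
	case r.Temperature > 30:
		category = "hot"
	case r.Temperature < 15:
		category = "cold"
	}
	return output{
		DeviceID:       r.DeviceID,
		DeviceType:     strings.ToUpper(r.DeviceType),     // UPPER(deviceType)
		TempFahrenheit: r.Temperature*1.8 + 32,            // temperature * 1.8 + 32
		TempCategory:   category,
		FullIdentifier: r.Location + "-" + r.DeviceID,     // CONCAT(location, '-', deviceId)
	}, true
}

func main() {
	samples := []reading{
		{"sensor001", "temperature", "warehouse-A", 25.0},
		{"sensor002", "humidity", "warehouse-B", 32.5},
		{"pump001", "actuator", "factory", 20.0},
	}
	for _, s := range samples {
		out, ok := transform(s)
		if !ok {
			fmt.Printf("%s filtered out\n", s.DeviceID)
			continue
		}
		fmt.Printf("%s -> %s %.1fF %s %s\n", out.DeviceID, out.DeviceType,
			out.TempFahrenheit, out.TempCategory, out.FullIdentifier)
	}
}
```

Under these assumptions, sensor001 maps to 77.0F and "normal", sensor002 maps to 90.5F and "hot", and pump001 is dropped by the prefix filter.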

Aggregation Mode - Windowed Statistical Analysis

Suitable for scenarios requiring statistical analysis and batch processing, collecting data over a period of time for aggregated computation.

Typical Use Cases:

  • Monitoring Dashboard: Display real-time statistical charts of device operational status
  • Performance Analysis: Analyze key metrics like QPS, latency, etc.
  • Anomaly Detection: Detect data anomalies based on statistical models
  • Report Generation: Generate various business reports periodically
  • Trend Analysis: Analyze data trends and patterns

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/rulego/streamsql"
)

// StreamSQL Usage Example
// This example demonstrates the complete workflow of StreamSQL: from instance creation to data processing and result handling
func main() {
	// Step 1: Create StreamSQL Instance
	// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
	ssql := streamsql.New()
	defer ssql.Stop()
	// Step 2: Define Stream SQL Query Statement
	// This SQL statement showcases StreamSQL's core capabilities:
	// - SELECT: Choose output fields and aggregation functions
	// - FROM stream: Specify the data source as stream data
	// - WHERE: Filter condition, excluding device3 data
	// - GROUP BY: Group by deviceId, combined with tumbling window for aggregation
	// - TumblingWindow('5s'): 5-second tumbling window, triggers computation every 5 seconds
	// - avg(), min(): Aggregation functions for calculating average and minimum values
	// - window_start(), window_end(): Window functions to get window start and end times
	rsql := "SELECT deviceId, avg(temperature) as avg_temp, min(humidity) as min_humidity, " +
		"window_start() as start, window_end() as end FROM stream where deviceId!='device3' group by deviceId, TumblingWindow('5s')"
	
	// Step 3: Execute SQL Statement and Start Stream Analysis Task
	// The Execute method parses SQL, builds execution plan, initializes window manager and aggregators
	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}
	
	// Step 4: Setup Test Environment and Concurrency Control
	var wg sync.WaitGroup
	wg.Add(1)
	// Set 30-second test timeout to prevent infinite running
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	
	// Step 5: Start Data Producer Goroutine
	// Simulate real-time data stream, continuously feeding data into StreamSQL
	go func() {
		defer wg.Done()
		// Create ticker to trigger data generation every second
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Generate 10 random test data points per second, simulating high-frequency data stream
				// This data density tests StreamSQL's real-time processing capability
				for i := 0; i < 10; i++ {
					// Construct device data containing deviceId, temperature, and humidity
					randomData := map[string]interface{}{
						"deviceId":    fmt.Sprintf("device%d", rand.Intn(2)+1), // Randomly select device1 or device2
						"temperature": 20.0 + rand.Float64()*10,                // Temperature range: 20-30 degrees
						"humidity":    50.0 + rand.Float64()*20,                // Humidity range: 50-70%
					}
					// Add data to stream, triggering StreamSQL's real-time processing
					// Emit distributes data to corresponding windows and aggregators
					ssql.Emit(randomData)
				}

			case <-ctx.Done():
				// Timeout or cancellation signal, stop data generation
				return
			}
		}
	}()

	// Step 6: Setup Result Processing Pipeline
	resultChan := make(chan interface{})
	// Add computation result callback function (Sink)
	// When a window triggers computation, results are output through this callback
	ssql.AddSink(func(results []map[string]interface{}) {
		resultChan <- results
	})
	
	// Step 7: Start Result Consumer Goroutine
	// Count received results for verification; the mutex guards resultCount,
	// which is written by the consumer goroutine and read by main
	var mu sync.Mutex
	resultCount := 0
	go func() {
		for result := range resultChan {
			// Print each window result as it arrives (one batch every 5 seconds)
			fmt.Printf("Window Result [%s]: %v\n", time.Now().Format("15:04:05.000"), result)
			mu.Lock()
			resultCount++
			mu.Unlock()
		}
	}()
	
	// Step 8: Wait for Processing Completion
	// Wait for the data producer goroutine to finish (when the 30-second timeout expires)
	wg.Wait()
	
	// Step 9: Display Final Statistics
	// Show total number of window results received during the test period
	mu.Lock()
	total := resultCount
	mu.Unlock()
	fmt.Printf("\nTotal window results received: %d\n", total)
	fmt.Println("StreamSQL processing completed successfully!")
}

Nested Field Access

StreamSQL supports querying nested structured data using dot notation (.) syntax to access nested fields:

// Nested field access example
package main

import (
	"fmt"