A lightweight, actor-like event processing framework for Scala 3, providing queue-based message passing, multi-threaded dispatch, pub/sub subscriptions, and scheduled event delivery.
For a comprehensive usage guide, see the AI API Guide: detailed documentation with patterns, best practices, and pitfalls.
- Lightweight Event Processors: Actor-like message processing without the overhead of full actor systems
- Configurable Queue Sizes: Customize queue capacity per processor (default: 100,000 messages)
- Multi-Threaded Dispatch: Configurable thread pools with isolated dispatcher support
- Pub/Sub Subscriptions: Channel-based broadcasting and subscription management
- Scheduled Events: Time-delayed event delivery with cancellation support
- Handler Stacking: Dynamic behavior changes using the `become`/`unbecome` pattern
- Fluent Builder API: Clean, composable processor creation
- Parent-Child Hierarchies: Organize processors in supervision trees
Add to your `build.sbt`:

```scala
libraryDependencies += "com.mcsherrylabs" %% "sss-events" % "0.0.8"
```

Requires Scala 3.6+ and Java 17+.
```scala
import sss.events._

// Create the event processing engine
implicit val engine = EventProcessingEngine()
engine.start()

// Create a simple event processor
val processor = new EventProcessorSupport {
  override def onEvent(self: EventProcessor, event: Any): Unit = event match {
    case "ping" => println("Received ping!")
    case count: Int => println(s"Count: $count")
  }
}

// Send messages
val ep = engine.newEventProcessor(processor)
ep ! "ping"
ep ! 42
```

The fluent builder API creates processors with IDs, custom handlers, and dynamic behavior switching:

```scala
import sss.events._

implicit val engine = EventProcessingEngine()
engine.start()

val processor = engine.builder()
  .withId("my-processor")
  .withCreateHandler { ep =>
    case "start" =>
      println("Starting...")
      ep.become {
        case "stop" => println("Stopping...")
      }
  }
  .build()

processor ! "start"
processor ! "stop"
```

Channel-based pub/sub lets processors subscribe to named channels and receive broadcasts:

```scala
import sss.events._

implicit val engine = EventProcessingEngine()
engine.start()

// Subscribe to a channel
val subscriber = engine.builder()
  .withCreateHandler { ep =>
    case msg: String => println(s"Received: $msg")
  }
  .withSubscriptions("news-channel")
  .build()

// Broadcast to all subscribers
engine.subscriptions.broadcast("news-channel", "Breaking news!")
```

Events can be scheduled for time-delayed delivery and cancelled before they fire:

```scala
import sss.events._
import scala.concurrent.duration._

implicit val engine = EventProcessingEngine()
engine.start()

val processor = engine.builder()
  .withCreateHandler { ep =>
    case "delayed-message" => println("This message was delayed!")
  }
  .build()

// Schedule a message to be delivered after 5 seconds
val cancellable = engine.schedule(5.seconds, processor, "delayed-message")

// Cancel the scheduled message if needed
cancellable.cancel()
```

Workloads can be isolated on separate thread pools by assigning processors to different dispatchers:

```scala
import sss.events._

implicit val engine = EventProcessingEngine()
engine.start()

// Create processors on different thread pools
val fastProcessor = engine.builder()
  .withDispatcher("fast")
  .withCreateHandler { ep => /* handler */ }
  .build()

val slowProcessor = engine.builder()
  .withDispatcher("slow")
  .withCreateHandler { ep => /* handler */ }
  .build()
```

Customize processor queue capacity for memory-constrained environments or burst traffic scenarios:

```scala
import sss.events._

implicit val engine = EventProcessingEngine()
engine.start()

// Small queue for low-latency, memory-constrained scenarios
val lowLatencyProcessor = engine.builder()
  .withQueueSize(1000)
  .withCreateHandler { ep =>
    case msg => // handle message
  }
  .build()

// Large queue for high-throughput burst traffic
val burstProcessor = engine.builder()
  .withQueueSize(500000)
  .withCreateHandler { ep =>
    case msg => // handle message
  }
  .build()

// Default queue size (100,000) when not specified
val defaultProcessor = engine.builder()
  .withCreateHandler { ep =>
    case msg => // handle message
  }
  .build()
```

Queue Sizing Guidelines:
- 1K-10K: Low-latency scenarios, limited memory
- 10K-50K: Balanced for typical workloads
- 100K (default): High-throughput, burst traffic
- 500K+: Extreme burst scenarios (monitor memory usage)
Tradeoffs:
- Larger queues = higher throughput + more memory + higher latency
- Smaller queues = lower latency + less memory + risk of backpressure/message loss
An EventProcessor is a lightweight, actor-like entity that processes events asynchronously. Each processor has:
- A message queue (default capacity: 100,000 messages)
- An event handler (partial function)
- Optional unique ID for lookups
- Optional parent processor
The EventProcessingEngine is the central dispatch engine that:
- Manages thread pools for event processing
- Routes messages to processors
- Provides registration for processor lookup by ID
- Manages subscriptions and scheduled events, as sketched below
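A minimal sketch tying these responsibilities together, using only calls that appear elsewhere in this README:

```scala
import sss.events._
import scala.concurrent.duration._

implicit val engine = EventProcessingEngine()
engine.start()

// Registered under an ID and subscribed to a channel at creation
val logger = engine.builder()
  .withId("logger")
  .withCreateHandler { ep =>
    case msg => println(s"logger: $msg")
  }
  .withSubscriptions("audit")
  .build()

logger ! "direct message"                         // routed by the engine
engine.subscriptions.broadcast("audit", "hello")  // pub/sub delivery
engine.schedule(1.second, logger, "tick")         // delayed delivery
```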
Processors can dynamically change behavior using `become` and `unbecome`:
```scala
val processor = new EventProcessorSupport {
  override def onEvent(self: EventProcessor, event: Any): Unit = event match {
    case "login" =>
      self.become {
        case "logout" => self.unbecome()
        case msg => println(s"Logged in, received: $msg")
      }
    case other => println("Not logged in")
  }
}
```

Important: `become()` and `unbecome()` are protected methods that can only be called from within event handlers. For thread-safe behavior changes from external threads, use `requestBecome()` and `requestUnbecome()`:
```scala
// From external thread - post a message to change handler
processor.requestBecome({
  case "new-message" => println("New handler!")
}, stackPreviousHandler = false)

// From within handler - direct call
override def onEvent(self: EventProcessor, event: Any): Unit = event match {
  case "switch" =>
    self.become {
      case msg => println(s"Switched: $msg")
    }
}
```

The subscriptions system provides pub/sub broadcasting of messages to multiple processors:
```scala
// Subscribe to channels
processor.subscriptions.subscribe("channel1", "channel2")

// Broadcast to channel
engine.subscriptions.broadcast("channel1", "Hello subscribers!")

// Unsubscribe
processor.subscriptions.unsubscribe("channel1")
```

- Message Throughput: Queue-based with 100K default capacity per processor
- Threading: Lock-based dispatcher queues with configurable thread-to-dispatcher pinning
- Scaling Efficiency: 83.4% at 16 threads with 1:1 thread-to-dispatcher mapping
- Lock Contention: Exponential backoff strategy (10μs to 10ms) reduces CPU waste
- Thread Coordination: `LockSupport.park`/`unpark` for efficient sleeping and clean shutdown
- Overhead: Minimal - no complex actor supervision or remote messaging
- Scheduling: Built-in ScheduledExecutorService for time-based events
The engine employs an exponential backoff strategy when lock contention occurs:
- Base delay: 10 microseconds (configurable)
- Growth rate: 1.5x multiplier per failed attempt
- Maximum delay: 10 milliseconds cap
- Impact: Benchmarks show < 2% variance between strategies - focus optimization on thread-to-dispatcher ratio instead
The fixed 100μs park when queues are empty maintains responsive polling without exponential delays.
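A small sketch of the delay sequence implied by this policy; the function name is illustrative, not the engine's internal code:

```scala
// Illustrative only: delay sequence for a 10μs base, 1.5x growth, 10ms cap.
def backoffMicros(attempt: Int): Double =
  math.min(10 * math.pow(1.5, attempt), 10000)

// attempt 0 -> 10μs, 1 -> 15μs, 2 -> 22.5μs ... capped at 10ms from attempt 18
(0 to 19).foreach(a => println(f"attempt $a%2d: ${backoffMicros(a)}%.1fμs"))
```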
The engine uses `LockSupport.parkNanos()` and `LockSupport.unpark()` for efficient thread coordination:
- Empty queue polling: Threads park for 100μs when no work is available
- Clean shutdown: `unpark()` wakes threads, the `keepGoing` flag triggers graceful exit
- No exceptions: Unlike interrupt-based approaches, `unpark` doesn't throw exceptions
- Validated: All 25 core tests pass with this mechanism
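A simplified sketch of this coordination pattern; `Worker` and `pollWork` are illustrative names, not the engine's actual classes:

```scala
import java.util.concurrent.locks.LockSupport

// Simplified illustration of park/unpark coordination; not the engine's code.
class Worker(pollWork: () => Option[Runnable]) extends Thread {
  @volatile var keepGoing = true
  override def run(): Unit =
    while (keepGoing)
      pollWork() match {
        case Some(task) => task.run()
        case None       => LockSupport.parkNanos(100_000L) // 100μs nap when idle
      }
}

val worker = new Worker(() => None)
worker.start()

// Clean shutdown: flip the flag, then unpark so the parked thread sees it
worker.keepGoing = false
LockSupport.unpark(worker)
worker.join()
```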
The library includes comprehensive benchmarks and stress tests to measure and verify performance:
```bash
# Run all benchmarks
sbt "benchmarks/Jmh/run"

# Run thread safety stress tests
sbt "benchmarks/test"

# Quick smoke test
sbt "benchmarks/Jmh/run -wi 1 -i 1"
```

For detailed information on benchmarking, stress tests, and interpreting results, see `benchmarks/README.md`.
Typical Performance (modern hardware):
- Single processor throughput: 30K-100K messages/second
- Concurrent scaling: Linear up to thread pool size
- Handler switching overhead: 50-200 microseconds
Use sss-events when:
- You need lightweight in-process message passing
- You want minimal dependencies and overhead
- You don't need distributed actors or clustering
- You want simple pub/sub without actor selection complexity
Use Akka when:
- You need distributed actors across multiple JVMs
- You require sophisticated supervision strategies
- You need features like cluster sharding or persistence
- You're building large-scale reactive systems
```
EventProcessingEngine
├── Thread-to-Dispatcher Pinning
│   ├── Configurable thread assignments
│   ├── Lock-based dispatcher queues
│   ├── Type-safe DispatcherName
│   └── Exponential backoff on contention
├── Dedicated Dispatchers
│   ├── "subscriptions" - Dedicated subscription thread
│   ├── "" (default) - General purpose
│   └── Custom dispatchers (user-defined)
├── Configuration (Typesafe Config)
│   ├── Centralized ConfigFactory
│   ├── Thread-dispatcher assignments
│   └── Backoff policy tuning
├── Registrar (ID-based lookup)
├── Subscriptions (Pub/Sub)
└── Scheduler (Delayed events)

EventProcessor
├── Message Queue (LinkedBlockingQueue)
├── Handler Stack (become/unbecome)
├── Parent Reference (optional)
├── Dispatcher Assignment (type-safe)
└── Subscriptions (channels)
```
The library uses a centralized configuration pattern following best practices:
- Single ConfigFactory Instance: System-level configuration loaded once via `AppConfig.config`
- Type-Safe Configuration: All engine settings validated at startup
- Flexible Thread Assignment: Configure thread-to-dispatcher mappings via `application.conf`
```hocon
sss-events.engine {
  scheduler-pool-size = 2

  # Thread-to-dispatcher assignment
  # First thread is dedicated to "subscriptions" dispatcher
  thread-dispatcher-assignment = [
    ["subscriptions"], # Thread 0: Subscriptions (required)
    [""],              # Thread 1: Default dispatcher
    ["api"],           # Thread 2: API workload
    ["background"]     # Thread 3: Background tasks
  ]

  # Exponential backoff on lock contention
  backoff {
    base-delay-micros = 10
    multiplier = 1.5
    max-delay-micros = 10000
  }
}
```

Dispatcher names are type-safe using the `DispatcherName` case class:
```scala
import sss.events.DispatcherName

// Pre-defined dispatchers
DispatcherName.Default       // "" (default dispatcher)
DispatcherName.Subscriptions // "subscriptions" (dedicated)

// Custom dispatchers
val apiDispatcher = DispatcherName("api")

// Use in builder
val processor = engine.builder()
  .withDispatcher(apiDispatcher)
  .withCreateHandler { ep => /* handler */ }
  .build()
```

The engine uses lock-based dispatcher queues with configurable thread assignments:
- 1:1 mapping achieves 83.4% scaling efficiency (validated via benchmarks)
- Exponential backoff reduces CPU waste during lock contention
- LockSupport.unpark for clean thread coordination and shutdown
For detailed configuration guidance, see docs/best-practices/thread-dispatcher-configuration.md.
- Graceful Shutdown Semantics - Comprehensive guide to stopping processors safely, queue draining, timeouts, and best practices
- Thread-Dispatcher Configuration - Performance tuning, configuration patterns, and optimization strategies
- Architecture: Dispatcher Queue Contention - Implementation details and design decisions
- Benchmark Comparison - Performance benchmarks and scaling analysis
- Testing and Validation - Test coverage and validation results
- Message queues are thread-safe (`LinkedBlockingQueue`)
- Multiple threads can send to the same processor safely, as sketched below
- Handler execution is single-threaded per processor
- Subscription operations are synchronized
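A brief sketch of those guarantees in action, assuming the implicit `engine` from the earlier examples; mutable handler state is safe here because the handler never runs concurrently with itself:

```scala
// Many threads post to one processor; the queue serializes delivery.
val counter = engine.builder()
  .withCreateHandler { ep =>
    var n = 0 // safe: only the processor's handler thread touches it
    { case "inc" => n += 1; if (n % 1000 == 0) println(s"count: $n") }
  }
  .build()

// Four sender threads, all targeting the same processor
(1 to 4).foreach { _ =>
  new Thread(() => (1 to 1000).foreach(_ => counter ! "inc")).start()
}
```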
This project is licensed under GPL-3.0; see LICENSE for details.
Contributions are welcome! Please feel free to submit a Pull Request.
- Alan McSherry - mcsherrylabs