mitallast / scala-nsq

NSQ client library on scala, based on netty


Maven Central

Scala NSQ client

Scala NSQ client, based on netty, typesafe config, slf4j and json4s.

Dependency management

Client requires scala 2.11.

For SBT users:

libraryDependencies += "com.github.mitallast" %% "scala-nsq" % "1.10"

For Maven users:



Configuration based on typesafe config library. See reference for available configuration options

Create client instance

By default, client connect to nsqlookupd node at, you can override default settings using standard typesafehub/config ways.

For consumer instance, client send GET /lookup?topic=... for concrete topic, and send GET /nodes for provider instance to retrieve list of addresses nsqd nodes and connect to them.

and request GET /lookup?topic=... for concrete topic, o

import org.mitallast.nsq._

val client = NSQClient()

Also, you can provide it programmatically:

import org.mitallast.nsq._
import com.typesafe.config.Config

val config: Config = ...
val client = NSQClient(config)

By default, client use NSQLookupDefault with configurable addresses to nsqlookupd instances. By default, address using. Set config property nsq.lookup-address and lookup-period to override.

Also, you can implement trait NSQLookup and provide it programmatically:

import org.mitallast.nsq._

val lookup = new NSQLookup {
    def nodes(): List[SocketAddress] = ...
    def lookup(topic: String): List[SocketAddress] = ...

val client = NSQClient(lookup)

// or with config:
val config: Config = ...
val client = NSQClient(lookup, config)

Producer API

val producer = client.producer()

val listener = match {
  case Success(_:OK) =>"message pub successfully")
  case Failure(error) =>"message pub failed", error)

// publish one message"test", data=Array[Byte](1,0,1,1)).onComplete(listener)
producer.pubStr(topic="test", data="hello").onComplete(listener)

// publish multiple messages
producer.mpub(topic="test", data=Seq(Array[Byte](1,0,1,1), Array[Byte](1,0,1,1))).onComplete(listener)
producer.mpubStr(topic="test", data=Seq("hello", "world")).onComplete(listener)

Consumer API

Consumer automatically send RDY <number>\n command. By default, 1 message. Set config property nsq.max-ready-count to override.

val consumer = client.consumer(topic="test", channel="default") { message =>"received: {}", msg)
    // send `TOUCH msgid` message request 
    // send `REQ msdid 100` message request
    // send `FIN msgid` message request