Producers

nsq.run()

Starts any instantiated nsq.Reader or nsq.Writer

Writer – high-level producer

class nsq.Writer(nsqd_tcp_addresses, heartbeat_interval=30)

A high-level producer class built on top of the Tornado IOLoop supporting async publishing (PUB & MPUB) of messages to nsqd over the TCP protocol.

Example publishing a message repeatedly using a Tornado IOLoop periodic callback:

import nsq
import tornado.ioloop
import time

def pub_message():
    writer.pub('test', time.strftime('%H:%M:%S'), finish_pub)

def finish_pub(conn, data):
    print(data)

writer = nsq.Writer(["127.0.0.1:4150"])
tornado.ioloop.PeriodicCallback(pub_message, 1000).start()
nsq.run()

Example publishing a message from a Tornado HTTP request handler:

import functools
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from nsq import Writer, Error
from tornado.options import define, options

class MainHandler(tornado.web.RequestHandler):
    @property
    def nsq(self):
        return self.application.nsq
    
    def get(self):
        topic = "log"
        msg = "Hello world"
        msg_cn = "Hello 世界"
        
        self.nsq.pub(topic, msg) # pub
        self.nsq.mpub(topic, [msg, msg_cn]) # mpub
        
        # customize callback
        callback = functools.partial(self.finish_pub, topic=topic, msg=msg)
        self.nsq.pub(topic, msg, callback=callback)
        
        self.write(msg)
    
    def finish_pub(self, conn, data, topic, msg):
        if isinstance(data, Error):
            # try to re-pub message again if pub failed
            self.nsq.pub(topic, msg)

class Application(tornado.web.Application):
    def __init__(self, handlers, **settings):
        self.nsq = Writer(["127.0.0.1:4150"])
        super(Application, self).__init__(handlers, **settings)
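
The finish_pub callback in the handler above re-publishes once without a callback, so a second failure would go unnoticed. The following is a minimal sketch of a bounded retry built on the same functools.partial pattern. It assumes only the callback(conn, data) convention shown above; Error and FakeWriter are stand-ins defined locally so the sketch runs without pynsq, and pub_with_retry, retries_left and on_give_up are hypothetical names that are not part of the library.

```python
import functools


class Error(Exception):
    """Stand-in for nsq.Error so the sketch runs without pynsq installed."""


class FakeWriter(object):
    """Hypothetical stub with a pub(topic, msg, callback) method.

    It fails a fixed number of times, then succeeds. Unlike a real
    writer, it invokes the callback synchronously.
    """

    def __init__(self, failures):
        self.failures = failures
        self.attempts = 0

    def pub(self, topic, msg, callback=None):
        self.attempts += 1
        if self.failures > 0:
            self.failures -= 1
            data = Error("publish failed")
        else:
            data = b"OK"
        if callback is not None:
            callback(None, data)  # conn is unused in this sketch


def pub_with_retry(writer, topic, msg, retries_left=3, on_give_up=None):
    """Publish msg, re-publishing on failure until retries_left runs out."""
    callback = functools.partial(
        _finish_pub, writer=writer, topic=topic, msg=msg,
        retries_left=retries_left, on_give_up=on_give_up)
    writer.pub(topic, msg, callback=callback)


def _finish_pub(conn, data, writer, topic, msg, retries_left, on_give_up):
    if not isinstance(data, Error):
        return  # published successfully
    if retries_left > 0:
        # Retry with the same callback chain and one fewer attempt left.
        pub_with_retry(writer, topic, msg, retries_left - 1, on_give_up)
    elif on_give_up is not None:
        # Out of retries: hand the message back to the caller.
        on_give_up(topic, msg, data)
```

In a handler, the stub would be replaced by the application's Writer (self.nsq in the example above). Because a real writer completes publishes asynchronously, it may also be worth delaying each retry rather than re-publishing immediately.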

Parameters:
  • nsqd_tcp_addresses – a sequence of 'address:port' strings identifying the nsqd instances this writer should publish to
  • heartbeat_interval – the heartbeat interval, in seconds, to configure with nsqd
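
The examples above pass each nsqd address as a single 'host:port' string. As a rough sketch, a configuration list could be checked before it is handed to nsq.Writer with a small helper like the one below. split_address is a hypothetical helper written for illustration, not part of pynsq, and the second hostname is a made-up placeholder.

```python
def split_address(address):
    """Split a 'host:port' string into (host, port), validating the port."""
    host, sep, port = address.rpartition(":")
    if not sep or not host:
        raise ValueError("expected 'host:port', got %r" % (address,))
    port = int(port)  # raises ValueError for a non-numeric port
    if not 0 < port < 65536:
        raise ValueError("port out of range: %d" % port)
    return host, port


# Hypothetical address list; each entry must parse or a ValueError is raised.
nsqd_tcp_addresses = ["127.0.0.1:4150", "nsqd-2.example.com:4150"]
parsed = [split_address(a) for a in nsqd_tcp_addresses]
```

Validating up front lets a malformed entry fail at startup with a clear message rather than surfacing later as a connection error.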