Writer – high-level producer

class nsq.Writer(nsqd_tcp_addresses, reconnect_interval=15.0, name=None, **kwargs)

A high-level producer class built on top of the Tornado IOLoop supporting async publishing (PUB & MPUB & DPUB) of messages to nsqd over the TCP protocol.

Example publishing a message repeatedly using a Tornado IOLoop periodic callback:

import nsq
import tornado.ioloop
import time

def pub_message():
    writer.pub('test', time.strftime('%H:%M:%S'), finish_pub)

def finish_pub(conn, data):
    print(data)

writer = nsq.Writer(['127.0.0.1:4150'])
tornado.ioloop.PeriodicCallback(pub_message, 1000).start()
nsq.run()

Example publshing a message from a Tornado HTTP request handler:

import functools
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from nsq import Writer, Error
from tornado.options import define, options

class MainHandler(tornado.web.RequestHandler):
    @property
    def nsq(self):
        return self.application.nsq

    def get(self):
        topic = 'log'
        msg = 'Hello world'
        msg_cn = 'Hello 世界'

        self.nsq.pub(topic, msg) # pub
        self.nsq.mpub(topic, [msg, msg_cn]) # mpub
        self.nsq.dpub(topic, 60, msg) # dpub

        # customize callback
        callback = functools.partial(self.finish_pub, topic=topic, msg=msg)
        self.nsq.pub(topic, msg, callback=callback)

        self.write(msg)

    def finish_pub(self, conn, data, topic, msg):
        if isinstance(data, Error):
            # try to re-pub message again if pub failed
            self.nsq.pub(topic, msg)

class Application(tornado.web.Application):
    def __init__(self, handlers, **settings):
        self.nsq = Writer(['127.0.0.1:4150'])
        super(Application, self).__init__(handlers, **settings)
Parameters:
  • nsqd_tcp_addresses – a sequence with elements of the form ‘address:port’ corresponding to the nsqd instances this writer should publish to
  • name – a string that is used for logging messages (defaults to first nsqd address)
  • **kwargs – passed to nsq.AsyncConn initialization
heartbeat(conn)

Called whenever a heartbeat has been received

This is useful to subclass and override to perform an action based on liveness (for monitoring, etc.)

Parameters:conn – the nsq.AsyncConn over which the heartbeat was received
nsq.run()

Starts any instantiated nsq.Reader or nsq.Writer