Writer – high-level producer
- class nsq.Writer(nsqd_tcp_addresses, reconnect_interval=15.0, name=None, **kwargs)

A high-level producer class built on top of the Tornado IOLoop supporting async publishing (PUB, MPUB, and DPUB) of messages to nsqd over the TCP protocol.

Example publishing a message repeatedly using a Tornado IOLoop periodic callback:
    import nsq
    import tornado.ioloop
    import time

    def pub_message():
        writer.pub('test', time.strftime('%H:%M:%S'), finish_pub)

    def finish_pub(conn, data):
        print(data)

    writer = nsq.Writer(['127.0.0.1:4150'])
    tornado.ioloop.PeriodicCallback(pub_message, 1000).start()
    nsq.run()
Example publishing a message from a Tornado HTTP request handler:
    import functools
    import tornado.httpserver
    import tornado.ioloop
    import tornado.options
    import tornado.web
    from nsq import Writer, Error
    from tornado.options import define, options

    class MainHandler(tornado.web.RequestHandler):
        @property
        def nsq(self):
            return self.application.nsq

        def get(self):
            topic = 'log'
            msg = 'Hello world'
            msg_cn = 'Hello 世界'

            self.nsq.pub(topic, msg)  # pub
            self.nsq.mpub(topic, [msg, msg_cn])  # mpub
            self.nsq.dpub(topic, 60, msg)  # dpub

            # customize callback
            callback = functools.partial(self.finish_pub, topic=topic, msg=msg)
            self.nsq.pub(topic, msg, callback=callback)

            self.write(msg)

        def finish_pub(self, conn, data, topic, msg):
            if isinstance(data, Error):
                # try to re-pub message again if pub failed
                self.nsq.pub(topic, msg)

    class Application(tornado.web.Application):
        def __init__(self, handlers, **settings):
            self.nsq = Writer(['127.0.0.1:4150'])
            super(Application, self).__init__(handlers, **settings)
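The finish_pub callback in the handler example re-publishes once and passes no callback, so a second failure goes unnoticed. A bounded retry could be sketched as below. This is an illustration only, not part of pynsq: pub_with_retry and its is_error parameter are hypothetical names, and FlakyWriter is a stand-in that invokes the callback synchronously so the sketch runs without an nsqd instance. The only assumptions taken from the examples are that pub accepts a callback keyword and that the callback receives (conn, data).

```python
import functools


def pub_with_retry(writer, topic, msg, is_error, attempts=3):
    """Publish msg, re-publishing on failure, for at most `attempts` tries.

    Hypothetical helper: `writer` is any object with
    pub(topic, msg, callback=...), and `is_error(data)` decides whether
    the data handed to the callback signals a failed publish.
    """
    def on_finish(conn, data, remaining):
        # Retry only while the publish failed and tries are left.
        if is_error(data) and remaining > 0:
            retry = functools.partial(on_finish, remaining=remaining - 1)
            writer.pub(topic, msg, callback=retry)

    first = functools.partial(on_finish, remaining=attempts - 1)
    writer.pub(topic, msg, callback=first)


class FlakyWriter(object):
    """Stand-in writer (not nsq.Writer) that fails its first N publishes."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def pub(self, topic, msg, callback=None):
        self.calls += 1
        failed = self.calls <= self.failures
        data = RuntimeError('publish failed') if failed else 'OK'
        if callback is not None:
            callback(None, data)


flaky = FlakyWriter(failures=2)
pub_with_retry(flaky, 'log', 'Hello world',
               is_error=lambda data: isinstance(data, Exception))
print(flaky.calls)
```

With a real writer, is_error would presumably be `lambda data: isinstance(data, nsq.Error)`, matching the check in finish_pub.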
Parameters:
- nsqd_tcp_addresses – a sequence with elements of the form ‘address:port’ corresponding to the nsqd instances this writer should publish to
- name – a string that is used for logging messages (defaults to first nsqd address)
- **kwargs – passed to nsq.AsyncConn initialization
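The ‘address:port’ form expected by nsqd_tcp_addresses can be checked before a writer is constructed. The following is a minimal sketch under stated assumptions: split_nsqd_address is a hypothetical helper, not part of the nsq package, and it only handles plain host:port strings.

```python
def split_nsqd_address(address):
    """Split an 'address:port' string into a (host, port) tuple.

    Hypothetical helper for illustration; not part of the nsq package.
    """
    # rpartition splits on the last colon, so the port is the final field.
    host, sep, port = address.rpartition(':')
    if not sep or not host:
        raise ValueError("expected 'address:port', got %r" % (address,))
    if not port.isdigit():
        raise ValueError("port must be an integer, got %r" % (port,))
    port_number = int(port)
    if not 0 < port_number < 65536:
        raise ValueError("port out of range: %d" % port_number)
    return host, port_number


addresses = ['127.0.0.1:4150', 'nsqd.example.internal:4150']
parsed = [split_nsqd_address(address) for address in addresses]
print(parsed)
```

Validating up front turns a malformed entry into an immediate ValueError at startup rather than a connection problem later.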
- heartbeat(conn)

Called whenever a heartbeat has been received.

This is useful to subclass and override to perform an action based on liveness (for monitoring, etc.)

Parameters: conn – the nsq.AsyncConn over which the heartbeat was received
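One way to act on liveness is to record when each connection last sent a heartbeat. The sketch below is an assumption-laden illustration rather than pynsq code: HeartbeatTracker and stale_connections are hypothetical names, and the mixin keys its table on str(conn) so that it does not depend on any particular attribute of nsq.AsyncConn.

```python
import time


class HeartbeatTracker(object):
    """Mixin recording when each connection last sent a heartbeat.

    Hypothetical class for illustration; not part of the nsq package.
    """

    def heartbeat(self, conn):
        # Create the table lazily so the mixin needs no __init__ of its own.
        if not hasattr(self, '_last_heartbeat'):
            self._last_heartbeat = {}
        # str(conn) is the key, so any connection-like object works here.
        self._last_heartbeat[str(conn)] = time.time()

    def stale_connections(self, max_age, now=None):
        """Return keys of connections silent for more than max_age seconds."""
        now = time.time() if now is None else now
        table = getattr(self, '_last_heartbeat', {})
        return sorted(key for key, seen in table.items()
                      if now - seen > max_age)


# Intended use, assuming the nsq package is installed:
#
#     class MonitoredWriter(HeartbeatTracker, nsq.Writer):
#         pass
#
#     writer = MonitoredWriter(['127.0.0.1:4150'])

tracker = HeartbeatTracker()
tracker.heartbeat('127.0.0.1:4150')  # a plain string stands in for a conn
print(tracker.stale_connections(max_age=60))
```

Listing the mixin before nsq.Writer in the base classes makes its heartbeat method the one that is called; a periodic callback could then poll stale_connections and report to a monitoring system.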
- nsq.run()

Starts any instantiated nsq.Reader or nsq.Writer