Reader – high-level consumer
Reader(topic, channel, message_handler=None, name=None, nsqd_tcp_addresses=None, lookupd_http_addresses=None, max_tries=5, max_in_flight=1, lookupd_poll_interval=60, low_rdy_idle_timeout=10, max_backoff_duration=128, lookupd_poll_jitter=0.3, lookupd_connect_timeout=1, lookupd_request_timeout=2, **kwargs)
Reader provides high-level functionality for building robust NSQ consumers in Python on top of the async module.
Reader receives messages over the specified topic/channel and calls message_handler for each message (up to max_tries).
Multiple readers can be instantiated in a single process (to consume from multiple topics/channels at once).
Supports various hooks to modify behavior when heartbeats are received, to temporarily disable the reader, and pre-process/validate messages.
When supplied a list of nsqlookupd addresses, it will periodically poll those addresses to discover new producers of the specified topic.
It maintains a sufficient RDY count based on the number of producers and your configured max_in_flight.
Handlers should be defined as shown in the examples below. The handler receives a nsq.Message object that has instance methods nsq.Message.finish(), nsq.Message.requeue(), and nsq.Message.touch() to respond to nsqd.
When messages are not responded to explicitly, the Reader is responsible for sending FIN or REQ commands based on the return value of message_handler. When re-queueing, it will back off from processing additional messages for an increasing delay (calculated exponentially based on consecutive failures up to max_backoff_duration).
Synchronous example:

    import nsq

    def handler(message):
        print(message)
        return True

    r = nsq.Reader(message_handler=handler,
                   lookupd_http_addresses=['http://127.0.0.1:4161'],
                   topic='nsq_reader', channel='asdf',
                   lookupd_poll_interval=15)
    nsq.run()

Asynchronous example:

    import nsq

    buf = []

    def process_message(message):
        global buf
        message.enable_async()
        # cache the message for later processing
        buf.append(message)
        if len(buf) >= 3:
            for msg in buf:
                print(msg)
                msg.finish()
            buf = []
        else:
            print('deferring processing')

    r = nsq.Reader(message_handler=process_message,
                   lookupd_http_addresses=['http://127.0.0.1:4161'],
                   topic='nsq_reader', channel='async',
                   max_in_flight=9)
    nsq.run()
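A handler can also respond explicitly instead of relying on its return value. The following is a minimal sketch, assuming that message.body holds the raw payload and that the Reader skips its automatic response once the handler has already responded. The JSON payload format and the handle_payload helper are hypothetical and only serve the illustration.

```python
import json


def handle_payload(payload):
    # Hypothetical business logic: accept only JSON objects carrying an 'id' key.
    return isinstance(payload, dict) and 'id' in payload


def handler(message):
    """Respond to nsqd explicitly via finish() or requeue()."""
    try:
        payload = json.loads(message.body)
    except ValueError:
        # Malformed JSON will not parse on a retry either, so finish it.
        message.finish()
        return
    if handle_payload(payload):
        message.finish()
    else:
        # Ask nsqd to redeliver the message later.
        message.requeue()
```

Such a handler would be passed to the Reader as message_handler in the same way as in the examples above.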
- message_handler – the callable that will be executed for each message received
- topic – specifies the desired NSQ topic
- channel – specifies the desired NSQ channel
- name – a string that is used for logging messages (defaults to ‘topic:channel’)
- nsqd_tcp_addresses – a sequence of string addresses of the nsqd instances this reader should connect to
- lookupd_http_addresses – a sequence of string addresses of the nsqlookupd instances this reader should query for producers of the specified topic
- max_tries – the maximum number of attempts the reader will make to process a message after which messages will be automatically discarded
- max_in_flight – the maximum number of messages this reader will pipeline for processing. This value will be divided evenly amongst the configured/discovered nsqd producers
- lookupd_poll_interval – the amount of time in seconds between querying all of the supplied nsqlookupd instances. A random amount of time based on this value will be initially introduced in order to add jitter when multiple readers are running
- lookupd_poll_jitter – the maximum fractional amount of jitter to add to the lookupd poll loop. This helps evenly distribute requests even if multiple consumers restart at the same time.
- lookupd_connect_timeout – the amount of time in seconds to wait for a connection to nsqlookupd to be established
- lookupd_request_timeout – the amount of time in seconds to wait for a request to nsqlookupd to complete
- low_rdy_idle_timeout – the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (i.e. max_in_flight < num_producers)
- max_backoff_duration – the maximum time we will allow a backoff state to last in seconds
- **kwargs – passed to nsq.AsyncConn initialization
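The arithmetic behind two of the parameters above can be sketched as follows. This is an illustrative model of an even split of max_in_flight and of a jittered start-up delay, not a copy of the library's internals. The function names are hypothetical, and the jitter formula (interval times jitter times a uniform random factor) is an assumption.

```python
import random


def even_share(max_in_flight, num_producers):
    """Integer share of max_in_flight per producer under an even split."""
    # A result of 0 means there are more producers than max_in_flight,
    # the situation that low_rdy_idle_timeout is described as covering.
    return max_in_flight // max(1, num_producers)


def initial_poll_delay(lookupd_poll_interval=60, lookupd_poll_jitter=0.3,
                       rng=random.random):
    """Assumed model: a random delay of at most interval * jitter seconds."""
    return lookupd_poll_interval * lookupd_poll_jitter * rng()
```

With the defaults, this model spreads the first poll over a window of at most 18 seconds (60 * 0.3), so readers that start together would not all query nsqlookupd at the same instant.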
close()
Closes all connections and stops all periodic callbacks.
connect_to_nsqd(host, port)
Adds a connection to nsqd at the specified address.
- host – the address to connect to
- port – the port to connect to
disabled()
Called as part of RDY handling to identify whether this Reader has been disabled.
This is useful to subclass and override to examine a file on disk or a key in cache to identify if this reader should pause execution (during a deploy, etc.).
Note: deprecated. Use set_max_in_flight(0)
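An override that examines a file on disk could look like the sketch below. The mixin name and the flag-file path are hypothetical. The sketch assumes only that the Reader calls disabled() with no arguments and treats a true return value as "paused".

```python
import os


class PauseFileMixin(object):
    """Hypothetical mixin: report the reader as disabled while a flag file exists."""

    # Assumed location of the flag file; use any path your deploy tooling can write.
    pause_file = '/tmp/nsq_reader.pause'

    def disabled(self):
        # True asks the reader to pause; False lets it keep consuming.
        return os.path.exists(self.pause_file)
```

With pynsq installed, a class declared as class PausableReader(PauseFileMixin, nsq.Reader) would pick up this override, since the mixin comes first in the method resolution order.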
giving_up(message)
Called when a message has been received where msg.attempts > max_tries.
This is useful to subclass and override to perform a task (such as writing to disk, etc.)
Parameters: message – the nsq.Message received
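Writing abandoned messages to disk, as suggested above, might be sketched like this. The mixin name and the output path are hypothetical, and the sketch assumes that message.body is the raw payload as a byte string.

```python
class DeadLetterMixin(object):
    """Hypothetical mixin: keep a copy of every message the reader gives up on."""

    # Assumed output file; one payload is appended per line.
    dead_letter_path = 'dead_letters.log'

    def giving_up(self, message):
        # Open in binary append mode so earlier entries are preserved.
        with open(self.dead_letter_path, 'ab') as fh:
            fh.write(message.body + b'\n')
```

The newline-delimited format only works for payloads that contain no newlines; a length-prefixed or JSON-lines format would be a safer choice for arbitrary bodies.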
heartbeat(conn)
Called whenever a heartbeat has been received.
This is useful to subclass and override to perform an action based on liveness (for monitoring, etc.)
Parameters: conn – the nsq.AsyncConn over which the heartbeat was received
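For liveness monitoring, a heartbeat override could record when each connection was last heard from. The sketch below is one possible approach. The mixin and its seconds_since_heartbeat helper are hypothetical, and it assumes that str(conn) yields a stable identifier for a connection.

```python
import time


class HeartbeatTrackerMixin(object):
    """Hypothetical mixin: remember when each connection last sent a heartbeat."""

    def heartbeat(self, conn):
        # Create the dict lazily so the mixin needs no __init__ of its own.
        seen = self.__dict__.setdefault('last_heartbeat', {})
        # Assumption: str(conn) identifies the connection consistently.
        seen[str(conn)] = time.time()

    def seconds_since_heartbeat(self, conn):
        """Return the age of the last heartbeat, or None if none was seen."""
        seen = self.__dict__.get('last_heartbeat', {})
        if str(conn) not in seen:
            return None
        return time.time() - seen[str(conn)]
```

A monitoring check could then alert when seconds_since_heartbeat exceeds some multiple of the expected heartbeat interval.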
is_starved()
Used to identify when buffered messages should be processed and responded to.
When max_in_flight > 1 and you’re batching messages together to perform work, it isn’t possible to just compare the len of your list of buffered messages against your configured max_in_flight (because max_in_flight may not be evenly divisible by the number of producers you’re connected to, i.e. you might never get that many messages; it’s a max).

    import functools
    import nsq

    def message_handler(nsq_msg, reader):
        # buffer messages
        if reader.is_starved():
            # perform work
            pass

    reader = nsq.Reader(...)
    reader.set_message_handler(functools.partial(message_handler, reader=reader))
    nsq.run()
process_message(message)
Called when a message is received in order to execute the configured message_handler.
This is useful to subclass and override if you want to change how your message handlers are called.
Parameters: message – the nsq.Message received
query_lookupd()
Trigger a query of the configured lookupd_http_addresses.
set_max_in_flight(max_in_flight)
Dynamically adjusts the reader's max_in_flight count. Set to 0 to immediately disable a Reader.
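Pausing and resuming through max_in_flight can be wrapped in a small helper, sketched below. The apply_pause name and its normal_max_in_flight argument are hypothetical; the only Reader call it relies on is set_max_in_flight.

```python
def apply_pause(reader, paused, normal_max_in_flight=1):
    """Pause or resume a reader by adjusting its max_in_flight."""
    if paused:
        # Per the description above, 0 immediately disables the reader.
        reader.set_max_in_flight(0)
    else:
        # Restore the configured value to resume consuming.
        reader.set_max_in_flight(normal_max_in_flight)
```

A helper like this could be invoked from whatever mechanism signals a deploy or maintenance window, such as a periodic check of a flag file.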
set_message_handler(message_handler)
Assigns the callback method to be executed for each message received.
Parameters: message_handler – a callable that takes a single argument