Reader – high-level consumer
class nsq.Reader(topic, channel, message_handler=None, name=None, nsqd_tcp_addresses=None, lookupd_http_addresses=None, max_tries=5, max_in_flight=1, lookupd_poll_interval=60, low_rdy_idle_timeout=10, max_backoff_duration=128, lookupd_poll_jitter=0.3, lookupd_connect_timeout=1, lookupd_request_timeout=2, **kwargs)

Reader provides high-level functionality for building robust NSQ consumers in Python on top of the async module.
Reader receives messages over the specified topic/channel and calls message_handler for each message (up to max_tries).

Multiple readers can be instantiated in a single process (to consume from multiple topics/channels at once).

Reader supports various hooks to modify behavior when heartbeats are received, to temporarily disable the reader, and to pre-process/validate messages.

When supplied a list of nsqlookupd addresses, it will periodically poll those addresses to discover new producers of the specified topic.

It maintains a sufficient RDY count based on the number of producers and your configured max_in_flight.

Handlers should be defined as shown in the examples below. The handler receives a nsq.Message object that has instance methods nsq.Message.finish(), nsq.Message.requeue(), and nsq.Message.touch() to respond to nsqd.

When messages are not responded to explicitly, the Reader is responsible for sending FIN or REQ commands based on the return value of message_handler. When re-queueing, it will back off from processing additional messages for an increasing delay (calculated exponentially based on consecutive failures, up to max_backoff_duration).

Synchronous example:

    import nsq

    def handler(message):
        print(message)
        # returning True tells the Reader to send FIN for this message
        return True

    r = nsq.Reader(message_handler=handler,
                   lookupd_http_addresses=['http://127.0.0.1:4161'],
                   topic='nsq_reader', channel='asdf',
                   lookupd_poll_interval=15)
    nsq.run()

Asynchronous example:

    import nsq

    buf = []

    def process_message(message):
        global buf
        # take over responsibility for responding to this message
        message.enable_async()
        # cache the message for later processing
        buf.append(message)
        if len(buf) >= 3:
            for msg in buf:
                print(msg)
                msg.finish()
            buf = []
        else:
            print('deferring processing')

    r = nsq.Reader(message_handler=process_message,
                   lookupd_http_addresses=['http://127.0.0.1:4161'],
                   topic='nsq_reader', channel='async',
                   max_in_flight=9)
    nsq.run()

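To make the backoff rule described above concrete, here is a minimal sketch of an exponentially growing delay capped at max_backoff_duration. The function name backoff_delay, the base parameter, and the doubling rule are assumptions made for illustration; the exact formula Reader uses internally may differ.

```python
def backoff_delay(consecutive_failures, base=1.0, max_backoff_duration=128):
    """Return a delay in seconds that doubles with each consecutive failure.

    Illustrative only: `base` and the doubling rule are assumptions,
    not a description of pynsq internals.
    """
    if consecutive_failures <= 0:
        # no failures yet, so no backoff
        return 0.0
    delay = base * (2 ** (consecutive_failures - 1))
    # never wait longer than the configured ceiling
    return float(min(delay, max_backoff_duration))
```

Under these assumptions the delay grows as 1, 2, 4, 8, ... seconds and stops growing once it reaches max_backoff_duration.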
Parameters:
- message_handler – the callable that will be executed for each message received
- topic – specifies the desired NSQ topic
- channel – specifies the desired NSQ channel
- name – a string that is used for logging messages (defaults to ‘topic:channel’)
- nsqd_tcp_addresses – a sequence of string addresses of the nsqd instances this reader should connect to
- lookupd_http_addresses – a sequence of string addresses of the nsqlookupd instances this reader should query for producers of the specified topic
- max_tries – the maximum number of attempts the reader will make to process a message, after which the message will be automatically discarded
- max_in_flight – the maximum number of messages this reader will pipeline for processing. This value will be divided evenly amongst the configured/discovered nsqd producers
- lookupd_poll_interval – the amount of time in seconds between querying all of the supplied nsqlookupd instances. A random amount of time based on this value will be initially introduced in order to add jitter when multiple readers are running
- lookupd_poll_jitter – the maximum fractional amount of jitter to add to the lookupd poll loop. This helps evenly distribute requests even if multiple consumers restart at the same time
- lookupd_connect_timeout – the amount of time in seconds to wait for a connection to nsqlookupd to be established
- lookupd_request_timeout – the amount of time in seconds to wait for a request to nsqlookupd to complete
- low_rdy_idle_timeout – the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (i.e. max_in_flight < num_producers)
- max_backoff_duration – the maximum time in seconds that a backoff state is allowed to last
- **kwargs – passed to nsq.AsyncConn initialization
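The interaction between max_in_flight, the number of producers, and the poll jitter can be sketched with two small helpers. Both function names are hypothetical, and the arithmetic is an assumption derived from the parameter descriptions above rather than a copy of the library's internals.

```python
import random


def per_connection_share(max_in_flight, num_producers):
    """Upper bound on in-flight messages per producer connection.

    Assumption: an even integer split, with at least 1 per connection
    whenever the reader is enabled at all.
    """
    if max_in_flight <= 0 or num_producers <= 0:
        return 0
    return max(1, max_in_flight // num_producers)


def initial_poll_delay(lookupd_poll_interval, lookupd_poll_jitter, rng=random):
    """Random start-up delay in [0, interval * jitter) seconds.

    Assumption: the jitter is a uniform fraction of the poll interval.
    """
    return lookupd_poll_interval * lookupd_poll_jitter * rng.random()
```

With the defaults (lookupd_poll_interval=60, lookupd_poll_jitter=0.3) this sketch would delay the first poll by up to 18 seconds. When max_in_flight is smaller than the number of producers, a per-connection share of 1 cannot be granted to every connection at once, which is the situation low_rdy_idle_timeout addresses.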
close()
Closes all connections and stops all periodic callbacks.
connect_to_nsqd(host, port)
Adds a connection to nsqd at the specified address.

Parameters:
- host – the address to connect to
- port – the port to connect to
classmethod disabled()
Called as part of RDY handling to identify whether this Reader has been disabled.

This is useful to subclass and override to examine a file on disk or a key in cache to identify if this reader should pause execution (during a deploy, etc.).

Note: deprecated. Use set_max_in_flight(0) instead.
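Since disabled() is deprecated in favour of set_max_in_flight(0), the "examine a file on disk" pattern can be sketched as a small helper. The helper name apply_pause_flag, the flag-file convention, and the normal_max_in_flight argument are all hypothetical; only set_max_in_flight() comes from the API documented here.

```python
import os


def apply_pause_flag(reader, flag_path, normal_max_in_flight):
    """Pause `reader` while `flag_path` exists, otherwise restore it.

    Returns True if the reader was paused. `reader` only needs a
    set_max_in_flight(n) method, as nsq.Reader provides.
    """
    paused = os.path.exists(flag_path)
    # 0 disables the reader; any positive value re-enables it
    reader.set_max_in_flight(0 if paused else normal_max_in_flight)
    return paused
```

One possible way to use it is to call the helper on a timer before nsq.run(), for example from a tornado PeriodicCallback, so that creating the flag file during a deploy pauses consumption and removing it resumes.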
giving_up(message)
Called when a message has been received where msg.attempts > max_tries.

This is useful to subclass and override to perform a task (such as writing to disk, etc.).

Parameters:
- message – the nsq.Message received
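As an illustration of the "writing to disk" use case, the helper below appends a JSON line for each abandoned message. The helper name record_given_up and the file layout are hypothetical; it assumes the message exposes id, body, and attempts attributes, and that id and body may be bytes.

```python
import json


def _as_text(value):
    """Decode bytes for JSON output; pass other values through unchanged."""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return value


def record_given_up(message, path):
    """Append one JSON line describing a message that exceeded max_tries."""
    entry = {
        "id": _as_text(message.id),
        "attempts": message.attempts,
        "body": _as_text(message.body),
    }
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
    return entry


# Hypothetical wiring inside a Reader subclass:
#
# class ArchivingReader(nsq.Reader):
#     def giving_up(self, message):
#         record_given_up(message, "given_up.jsonl")
```

Keeping the file-writing logic in a plain function makes it testable without a running nsqd; the subclass only forwards the message.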
heartbeat(conn)
Called whenever a heartbeat has been received.

This is useful to subclass and override to perform an action based on liveness (for monitoring, etc.).

Parameters:
- conn – the nsq.AsyncConn over which the heartbeat was received
is_starved()
Used to identify when buffered messages should be processed and responded to.

When max_in_flight > 1 and you’re batching messages together to perform work, it isn’t possible to just compare the len of your list of buffered messages against your configured max_in_flight (because max_in_flight may not be evenly divisible by the number of producers you’re connected to, i.e. you might never get that many messages; it’s a max).

Example:

    import functools
    import nsq

    def message_handler(nsq_msg, reader):
        # buffer messages
        if reader.is_starved():
            # perform work
            pass

    reader = nsq.Reader(...)
    reader.set_message_handler(functools.partial(message_handler, reader=reader))
    nsq.run()

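A fuller version of the batching pattern might look like the sketch below. The Batcher class, its batch_size argument, and the work callback are hypothetical names introduced for illustration; it relies only on is_starved(), enable_async(), and finish() as described in this reference.

```python
class Batcher:
    """Buffer messages and flush when the batch is full or the reader is starved."""

    def __init__(self, reader, batch_size, work):
        self.reader = reader          # object providing is_starved()
        self.batch_size = batch_size  # flush threshold (hypothetical setting)
        self.work = work              # callable that receives a list of messages
        self.buf = []

    def __call__(self, message):
        # respond later, once the whole batch has been processed
        message.enable_async()
        self.buf.append(message)
        if len(self.buf) >= self.batch_size or self.reader.is_starved():
            batch, self.buf = self.buf, []
            self.work(batch)
            for msg in batch:
                msg.finish()
```

An instance can be passed to set_message_handler() in place of the functools.partial wrapper, since it is a callable that takes a single message argument. Checking is_starved() in addition to the batch size avoids waiting forever for messages that the producers will not send until earlier ones are finished.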
process_message(message)
Called when a message is received in order to execute the configured message_handler.

This is useful to subclass and override if you want to change how your message handlers are called.

Parameters:
- message – the nsq.Message received
query_lookupd()
Trigger a query of the configured lookupd_http_addresses.
set_max_in_flight(max_in_flight)
Dynamically adjusts the reader’s max_in_flight count. Set it to 0 to immediately disable a Reader.
set_message_handler(message_handler)
Assigns the callback method to be executed for each message received.

Parameters:
- message_handler – a callable that takes a single argument
nsq.run()
Starts any instantiated nsq.Reader or nsq.Writer.