Reader – high-level consumer

class nsq.Reader(topic, channel, message_handler=None, name=None, nsqd_tcp_addresses=None, lookupd_http_addresses=None, max_tries=5, max_in_flight=1, lookupd_poll_interval=60, low_rdy_idle_timeout=10, max_backoff_duration=128, lookupd_poll_jitter=0.3, lookupd_connect_timeout=1, lookupd_request_timeout=2, **kwargs)

Reader provides high-level functionality for building robust NSQ consumers in Python on top of the async module.

Reader receives messages over the specified topic/channel and calls message_handler for each message (up to max_tries).

Multiple readers can be instantiated in a single process (to consume from multiple topics/channels at once).

Supports various hooks to modify behavior when heartbeats are received, to temporarily disable the reader, and to pre-process/validate messages.

When supplied a list of nsqlookupd addresses, it will periodically poll those addresses to discover new producers of the specified topic.

It maintains a sufficient RDY count based on the number of producers and your configured max_in_flight.
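As a rough illustration of how a max_in_flight budget might be split across connections, here is a minimal sketch. The function name per_connection_rdy is hypothetical, and the rounding rule (integer division with a floor of 1) is an assumption made for illustration; it is not taken from the Reader implementation.

```python
def per_connection_rdy(max_in_flight, num_connections):
    """Illustrative split of a max_in_flight budget across connections.

    Assumption: an even integer split, with at least 1 per connection
    while the reader is enabled. The real Reader may behave differently.
    """
    if max_in_flight <= 0 or num_connections <= 0:
        # A budget of 0 means the reader is disabled; no connections
        # means there is nothing to distribute.
        return 0
    return max(1, max_in_flight // num_connections)


print(per_connection_rdy(9, 3))
print(per_connection_rdy(1, 4))
```

Note that with max_in_flight=1 and four connections this sketch yields 1 per connection, which would exceed the budget if applied to all of them at once; that is the situation low_rdy_idle_timeout describes, where RDY counts have to be re-distributed over time.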

Handlers should be defined as shown in the examples below. The handler receives an nsq.Message object that has instance methods nsq.Message.finish(), nsq.Message.requeue(), and nsq.Message.touch() to respond to nsqd.
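A handler that responds explicitly could look roughly like the sketch below. make_handler and process are hypothetical names introduced for illustration; only enable_async(), finish(), and requeue() come from the nsq.Message interface described in this section.

```python
def make_handler(process):
    """Wrap a hypothetical `process(body)` callable in a message handler."""
    def handler(message):
        # Signal that this handler will respond to nsqd on its own.
        message.enable_async()
        try:
            process(message.body)
        except Exception:
            # Processing failed: ask nsqd to deliver the message again.
            message.requeue()
        else:
            # Processing succeeded: mark the message as done.
            message.finish()
    return handler
```

The resulting function can be passed as message_handler when constructing a Reader.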

When a message is not responded to explicitly, Reader is responsible for sending a FIN or REQ command based on the return value of message_handler. When re-queueing, it will back off from processing additional messages for an increasing delay (calculated exponentially from the number of consecutive failures, up to max_backoff_duration).
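To make the shape of an exponential backoff capped at max_backoff_duration concrete, here is a minimal sketch. The doubling schedule, the base of one second, and the function name backoff_delay are assumptions for illustration; the exact formula Reader uses may differ.

```python
def backoff_delay(consecutive_failures, max_backoff_duration=128, base=1.0):
    """Return an illustrative backoff delay in seconds.

    Assumption: the delay doubles with each consecutive failure and is
    capped at max_backoff_duration.
    """
    if consecutive_failures <= 0:
        return 0.0
    delay = base * (2 ** (consecutive_failures - 1))
    return float(min(delay, max_backoff_duration))


for failures in range(0, 10):
    print(failures, backoff_delay(failures))
```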

Synchronous example:

import nsq

def handler(message):
    print(message)
    return True

r = nsq.Reader(message_handler=handler,
        lookupd_http_addresses=['http://127.0.0.1:4161'],
        topic='nsq_reader', channel='asdf', lookupd_poll_interval=15)
nsq.run()

Asynchronous example:

import nsq

buf = []

def process_message(message):
    global buf
    message.enable_async()
    # cache the message for later processing
    buf.append(message)
    if len(buf) >= 3:
        for msg in buf:
            print(msg)
            msg.finish()
        buf = []
    else:
        print('deferring processing')

r = nsq.Reader(message_handler=process_message,
        lookupd_http_addresses=['http://127.0.0.1:4161'],
        topic='nsq_reader', channel='async', max_in_flight=9)
nsq.run()
Parameters:
  • message_handler – the callable that will be executed for each message received
  • topic – specifies the desired NSQ topic
  • channel – specifies the desired NSQ channel
  • name – a string that is used for logging messages (defaults to ‘topic:channel’)
  • nsqd_tcp_addresses – a sequence of string addresses of the nsqd instances this reader should connect to
  • lookupd_http_addresses – a sequence of string addresses of the nsqlookupd instances this reader should query for producers of the specified topic
  • max_tries – the maximum number of attempts the reader will make to process a message after which messages will be automatically discarded
  • max_in_flight – the maximum number of messages this reader will pipeline for processing. This value will be divided evenly amongst the configured/discovered nsqd producers
  • lookupd_poll_interval – the amount of time in seconds between querying all of the supplied nsqlookupd instances. A random amount of time based on this value will be initially introduced in order to add jitter when multiple readers are running
  • lookupd_poll_jitter – the maximum fractional amount of jitter to add to the lookupd poll loop. This helps evenly distribute requests even if multiple consumers restart at the same time
  • lookupd_connect_timeout – the amount of time in seconds to wait for a connection to nsqlookupd to be established
  • lookupd_request_timeout – the amount of time in seconds to wait for a request to nsqlookupd to complete.
  • low_rdy_idle_timeout – the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (i.e. max_in_flight < num_producers)
  • max_backoff_duration – the maximum time we will allow a backoff state to last in seconds
  • **kwargs – passed to nsq.AsyncConn initialization
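The interaction between lookupd_poll_interval and lookupd_poll_jitter can be sketched as follows. This assumes the initial delay is a uniformly random fraction of interval × jitter; the function name initial_poll_delay and the rng parameter are hypothetical and exist only to keep the sketch testable.

```python
import random


def initial_poll_delay(lookupd_poll_interval=60, lookupd_poll_jitter=0.3,
                       rng=random):
    """Illustrative delay (seconds) before the first nsqlookupd poll.

    Assumption: the delay is drawn uniformly from
    [0, lookupd_poll_interval * lookupd_poll_jitter).
    """
    return rng.random() * lookupd_poll_interval * lookupd_poll_jitter


# With the defaults, the first poll would start within roughly 18 seconds.
print(initial_poll_delay())
```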
close()

Closes all connections and stops all periodic callbacks

connect_to_nsqd(host, port)

Adds a connection to nsqd at the specified address.

Parameters:
  • host – the address to connect to
  • port – the port to connect to
classmethod disabled()

Called as part of RDY handling to identify whether this Reader has been disabled

This is useful to subclass and override to examine a file on disk or a key in cache to identify if this reader should pause execution (during a deploy, etc.).

Note: deprecated. Use set_max_in_flight(0)

giving_up(message)

Called when a message has been received where msg.attempts > max_tries

This is useful to subclass and override to perform a task (such as writing to disk, etc.)

Parameters:message – the nsq.Message received
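One way to override giving_up so that discarded messages are written to disk is sketched below. DeadLetterMixin, dead_letter_path, and _as_text are hypothetical names; the sketch assumes the message exposes id, attempts, and body attributes, and that the mixin is combined with nsq.Reader in a subclass.

```python
import json


def _as_text(value):
    """Decode bytes to text so the value can be serialized as JSON."""
    if isinstance(value, bytes):
        return value.decode("utf-8", "replace")
    return value


class DeadLetterMixin(object):
    """Hypothetical mixin: append given-up messages to a JSON-lines file."""

    dead_letter_path = "dead_letters.jsonl"  # assumed location

    def giving_up(self, message):
        record = {
            "id": _as_text(message.id),
            "attempts": message.attempts,
            "body": _as_text(message.body),
        }
        with open(self.dead_letter_path, "a") as f:
            f.write(json.dumps(record) + "\n")


# Intended use (requires pynsq to be installed):
#
#     class ArchivingReader(DeadLetterMixin, nsq.Reader):
#         pass
```

Keeping the override in a mixin means the file-writing logic can be exercised without a running nsqd.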
heartbeat(conn)

Called whenever a heartbeat has been received

This is useful to subclass and override to perform an action based on liveness (for monitoring, etc.)

Parameters:conn – the nsq.AsyncConn over which the heartbeat was received
is_starved()

Used to identify when buffered messages should be processed and responded to.

When max_in_flight > 1 and you’re batching messages together to perform work, it isn’t possible to just compare the len of your list of buffered messages against your configured max_in_flight, because max_in_flight may not be evenly divisible by the number of producers you’re connected to (i.e. you might never get that many messages; it’s a maximum).

Example:

import functools
import nsq

def message_handler(nsq_msg, reader):
    # buffer messages
    if reader.is_starved():
        # perform work
        pass

reader = nsq.Reader(...)
reader.set_message_handler(functools.partial(message_handler, reader=reader))
nsq.run()
process_message(message)

Called when a message is received in order to execute the configured message_handler

This is useful to subclass and override if you want to change how your message handlers are called.

Parameters:message – the nsq.Message received
query_lookupd()

Triggers a query of the configured lookupd_http_addresses.

set_max_in_flight(max_in_flight)

Dynamically adjusts the reader's max_in_flight count. Set to 0 to immediately disable a Reader.

set_message_handler(message_handler)

Assigns the callback method to be executed for each message received

Parameters:message_handler – a callable that takes a single argument
nsq.run()

Starts any instantiated nsq.Reader or nsq.Writer