Modular Code: Don't burden the main loop

In many use cases there is an event loop that does some main task and, often, some subsidiary tasks. The subsidiary tasks tend to grow over time, and it is tempting to put each one directly in the main loop. This should be avoided: it piles a lot of code into the main loop and makes it hard to read. A better approach is to offload each additional piece of functionality to a separate class or closure, and make a single function call from the main loop.
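As a minimal sketch of the closure variant, assume a hypothetical subsidiary task that reports progress every N items. The task's state lives inside the closure, and the main loop makes exactly one call per iteration. The names make_progress_reporter and process are illustrative only, not part of any library.

```python
def make_progress_reporter(every, report):
    """Return a function to call once per processed item.

    Hypothetical helper: on every `every`-th call it invokes report(total).
    """
    total = 0

    def tick():
        nonlocal total  # rebind the counter in the enclosing scope
        total += 1
        if total % every == 0:
            report(total)

    return tick


def process(item):
    # Stand-in for the main task (hypothetical).
    return item * 2


reports = []
tick = make_progress_reporter(every=3, report=reports.append)
for item in range(10):
    process(item)
    tick()  # the only line the subsidiary task adds to the main loop

print(reports)  # [3, 6, 9]
```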

An Example

We had some code that looked like this:

from kafka import KafkaConsumer  # assuming the kafka-python client


class EventProcessor:
    def __init__(self, consumer):
        self.consumer = consumer

    def run(self):
        for message in self.consumer:
            # lots of processing here
            pass


consumer = KafkaConsumer("some_topic")
ep = EventProcessor(consumer)
ep.run()

Each message carries a timestamp recording when it was put on the queue. Using it, we wanted to log, after every 10,000 messages, the average time those 10,000 messages had spent in the queue.
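Concretely, if message i was enqueued at time t_i and picked up at time c_i, the number to report for a batch of N messages is the mean of c_i - t_i. A tiny sketch with made-up timestamps (in seconds) shows the arithmetic; average_wait is a hypothetical helper, and in the code below c_i is simply time.time() at the moment the message is processed.

```python
def average_wait(enqueued, consumed):
    """Mean queue wait for one batch: the average of (consumed - enqueued)."""
    waits = [c - e for e, c in zip(enqueued, consumed)]
    return sum(waits) / len(waits)


# Illustrative numbers only: three messages that waited 2 s, 4 s and 6 s.
print(average_wait([100.0, 101.0, 102.0], [102.0, 105.0, 108.0]))  # 4.0
```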

This was our first attempt:

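import logging
import time

from kafka import KafkaConsumer  # assuming the kafka-python client

logger = logging.getLogger(__name__)

BATCH_SIZE = 10000


class EventProcessor:
    def __init__(self, consumer):
        self.consumer = consumer

    def run(self):
        count, total_wait_time = 0, 0.0
        for message in self.consumer:
            # lots of processing here
            count += 1
            # assumes each message is a dict whose 'timestamp' is its enqueue time in epoch seconds
            total_wait_time += time.time() - message['timestamp']

            if count == BATCH_SIZE:
                logger.info("average wait time: %.3f s", total_wait_time / count)
                count, total_wait_time = 0, 0.0


consumer = KafkaConsumer("some_topic")
ep = EventProcessor(consumer)
ep.run()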

This has contaminated our loop. Not only does the main loop now implement the logic for reporting average wait times, but that logic is scattered across the loop, and part of it (the counter initialization and BATCH_SIZE) lives outside the loop.

We wrote an improved version of the code, moving the wait-time logging into a separate class:

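import logging
import time

from kafka import KafkaConsumer  # assuming the kafka-python client

logger = logging.getLogger(__name__)

BATCH_SIZE = 10000


class WaitTimeLogger:
    """Logs the average queue wait time once every BATCH_SIZE messages."""

    def __init__(self):
        self.init_vars()

    def init_vars(self):
        self.count, self.total_wait_time = 0, 0.0

    def increment(self, timestamp):
        # timestamp: enqueue time of one message, in epoch seconds
        self.count += 1
        self.total_wait_time += time.time() - timestamp
        if self.count == BATCH_SIZE:
            logger.info("average wait time: %.3f s", self.total_wait_time / self.count)
            self.init_vars()


class EventProcessor:
    def __init__(self, consumer):
        self.consumer = consumer
        self.wait_time_logger = WaitTimeLogger()

    def run(self):
        for message in self.consumer:
            # lots of processing here
            # assumes each message is a dict carrying its enqueue time under 'timestamp'
            self.wait_time_logger.increment(message['timestamp'])


consumer = KafkaConsumer("some_topic")
ep = EventProcessor(consumer)
ep.run()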
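The introduction mentions a closure as an alternative to a class. A minimal closure-based equivalent of WaitTimeLogger might look like the sketch below; make_wait_time_recorder and its log and clock parameters are hypothetical names introduced for illustration. Passing the clock in also lets the batching logic be exercised without waiting on real time.

```python
import time


def make_wait_time_recorder(batch_size, log, clock=time.time):
    """Return record(timestamp); after every batch_size calls it logs the mean wait.

    Hypothetical closure equivalent of WaitTimeLogger: the count and the
    running total live in the enclosing scope instead of on an object.
    """
    count = 0
    total_wait = 0.0

    def record(timestamp):
        nonlocal count, total_wait
        count += 1
        total_wait += clock() - timestamp
        if count == batch_size:
            log(total_wait / count)
            count, total_wait = 0, 0.0  # start the next batch

    return record


# Usage with a fixed fake clock so the output is deterministic.
logged = []
record = make_wait_time_recorder(batch_size=2, log=logged.append, clock=lambda: 10.0)
for ts in [9.0, 7.0, 8.0, 6.0]:
    record(ts)
print(logged)  # [2.0, 3.0]
```

The choice is largely a matter of taste: a class gives the state a name and is easier to extend with further methods, while a closure is lighter when there is only one operation.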

This is much better: the main loop now has just one extra line. To make the code testable, we can inject WaitTimeLogger as a dependency:

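import logging
import time

from kafka import KafkaConsumer  # assuming the kafka-python client

logger = logging.getLogger(__name__)

BATCH_SIZE = 10000


class WaitTimeLogger:
    """Logs the average queue wait time once every BATCH_SIZE messages."""

    def __init__(self):
        self.init_vars()

    def init_vars(self):
        self.count, self.total_wait_time = 0, 0.0

    def increment(self, timestamp):
        # timestamp: enqueue time of one message, in epoch seconds
        self.count += 1
        self.total_wait_time += time.time() - timestamp
        if self.count == BATCH_SIZE:
            logger.info("average wait time: %.3f s", self.total_wait_time / self.count)
            self.init_vars()


class EventProcessor:
    def __init__(self, consumer, wait_time_logger):
        # the wait-time logger is passed in, so a test can supply a stub
        self.consumer = consumer
        self.wait_time_logger = wait_time_logger

    def run(self):
        for message in self.consumer:
            # lots of processing here
            # assumes each message is a dict carrying its enqueue time under 'timestamp'
            self.wait_time_logger.increment(message['timestamp'])


consumer = KafkaConsumer("some_topic")
wait_time_logger = WaitTimeLogger()
ep = EventProcessor(consumer, wait_time_logger)
ep.run()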
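What a test could look like is not shown here, so the following is only a minimal sketch, assuming we want to check that EventProcessor forwards every message timestamp to its wait-time logger. Because the logger is injected, the test can pass a stub, and a plain list of dicts can stand in for the Kafka consumer. FakeWaitTimeLogger is a hypothetical stub, and EventProcessor is restated with the processing step elided so the snippet runs on its own; pytest would collect the test_ function automatically.

```python
class EventProcessor:
    # Same shape as the dependency-injected version; processing elided.
    def __init__(self, consumer, wait_time_logger):
        self.consumer = consumer
        self.wait_time_logger = wait_time_logger

    def run(self):
        for message in self.consumer:
            # lots of processing here
            self.wait_time_logger.increment(message['timestamp'])


class FakeWaitTimeLogger:
    """Hypothetical stub that just records the timestamps it receives."""

    def __init__(self):
        self.timestamps = []

    def increment(self, timestamp):
        self.timestamps.append(timestamp)


def test_forwards_every_timestamp():
    # Any iterable of dicts can stand in for the consumer.
    consumer = [{'timestamp': 1.0}, {'timestamp': 2.5}]
    fake_logger = FakeWaitTimeLogger()
    EventProcessor(consumer, fake_logger).run()
    assert fake_logger.timestamps == [1.0, 2.5]
```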

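This, in my opinion, is the best of the three versions: the main loop carries a single extra line, and WaitTimeLogger can be swapped for a stub in tests.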