In many use cases, there is an event loop that performs some main task and, often, some subsidiary tasks. The subsidiary tasks tend to grow over time, and it is tempting to put each one directly in the main loop. This should be avoided, because it piles a lot of code into the main loop and makes it hard to read. A better approach is to move the additional functionality into a separate class or closure, and invoke it with a single function call from the main loop.
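As a minimal sketch of the closure option, here is a hypothetical `make_batch_reporter` helper (not from the original code) that accumulates one value per loop iteration and calls a `report` callback with the mean of every full batch. All of its state lives in the closure, so the loop only gains a single call:

```python
def make_batch_reporter(batch_size, report):
    """Return a function that accumulates values and calls `report` with the
    mean of each full batch. The counters live in the closure, not the loop."""
    count = 0
    total = 0.0

    def add(value):
        nonlocal count, total
        count += 1
        total += value
        if count == batch_size:
            report(total / count)
            count, total = 0, 0.0  # start the next batch

    return add


reported = []
record = make_batch_reporter(batch_size=2, report=reported.append)
for value in [1.0, 3.0, 10.0, 20.0, 5.0]:
    # ... main task for this item ...
    record(value)  # the only line the subsidiary task adds to the loop
print(reported)  # [2.0, 15.0]; the trailing 5.0 is still an incomplete batch
```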
We had some code that looked like this:
    from kafka import KafkaConsumer  # kafka-python


    class EventProcessor:
        def __init__(self, consumer):
            self.consumer = consumer

        def run(self):
            for message in self.consumer:
                # lots of processing here
                ...


    consumer = KafkaConsumer("some_topic")
    ep = EventProcessor(consumer)
    ep.run()
Each message contains a timestamp denoting the time at which it was put in the queue. Using it, we would like to log, after every 10,000 messages, the average time those 10,000 messages spent in the queue.
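To make the metric concrete: each message's wait is the time it was processed minus its enqueue timestamp, and the reported value is the mean of those waits over the batch. A small sketch with a hypothetical `average_wait_time` helper and made-up times in epoch seconds:

```python
def average_wait_time(events):
    """Mean queue wait, given (enqueued_at, processed_at) pairs in epoch seconds."""
    waits = [processed - enqueued for enqueued, processed in events]
    return sum(waits) / len(waits)


# Waits of 3, 4 and 2 seconds average to 9 / 3 = 3.0 seconds.
print(average_wait_time([(100.0, 103.0), (101.0, 105.0), (104.0, 106.0)]))  # 3.0
```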
This is how we did it on the first try:
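    import logging
    import time

    from kafka import KafkaConsumer  # kafka-python

    logger = logging.getLogger(__name__)
    BATCH_SIZE = 10000


    class EventProcessor:
        def __init__(self, consumer):
            self.consumer = consumer

        def run(self):
            count, total_wait_time = 0, 0
            for message in self.consumer:
                # lots of processing here
                count += 1
                # assumes each message carries its enqueue time (epoch seconds) under 'timestamp'
                total_wait_time += time.time() - message['timestamp']
                if count == BATCH_SIZE:
                    logger.info("Average wait time: %.3f s", total_wait_time / count)
                    count, total_wait_time = 0, 0


    consumer = KafkaConsumer("some_topic")
    ep = EventProcessor(consumer)
    ep.run()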
This contaminates our loop. Not only does the main loop now implement the logic for reporting average wait times, but that logic is scattered across several places in the loop, and part of it (the counter initialization) lies outside the loop altogether.
We wrote an improved version of the code that moves the wait-time logging into a separate class:
    import logging
    import time

    from kafka import KafkaConsumer  # kafka-python

    logger = logging.getLogger(__name__)
    BATCH_SIZE = 10000


    class WaitTimeLogger:
        def __init__(self):
            self.init_vars()

        def init_vars(self):
            self.count, self.total_wait_time = 0, 0

        def increment(self, timestamp):
            self.count += 1
            self.total_wait_time += time.time() - timestamp
            if self.count == BATCH_SIZE:
                logger.info("Average wait time: %.3f s", self.total_wait_time / self.count)
                self.init_vars()


    class EventProcessor:
        def __init__(self, consumer):
            self.consumer = consumer
            self.wait_time_logger = WaitTimeLogger()

        def run(self):
            for message in self.consumer:
                # lots of processing here
                self.wait_time_logger.increment(message['timestamp'])


    consumer = KafkaConsumer("some_topic")
    ep = EventProcessor(consumer)
    ep.run()
This is much better: the main loop now has just one extra line. To make the code testable, we can inject WaitTimeLogger into EventProcessor as a dependency instead of constructing it inside:
    import logging
    import time

    from kafka import KafkaConsumer  # kafka-python

    logger = logging.getLogger(__name__)
    BATCH_SIZE = 10000


    class WaitTimeLogger:
        def __init__(self):
            self.init_vars()

        def init_vars(self):
            self.count, self.total_wait_time = 0, 0

        def increment(self, timestamp):
            self.count += 1
            self.total_wait_time += time.time() - timestamp
            if self.count == BATCH_SIZE:
                logger.info("Average wait time: %.3f s", self.total_wait_time / self.count)
                self.init_vars()


    class EventProcessor:
        def __init__(self, consumer, wait_time_logger):
            self.consumer = consumer
            self.wait_time_logger = wait_time_logger

        def run(self):
            for message in self.consumer:
                # lots of processing here
                self.wait_time_logger.increment(message['timestamp'])


    consumer = KafkaConsumer("some_topic")
    wait_time_logger = WaitTimeLogger()
    ep = EventProcessor(consumer, wait_time_logger)
    ep.run()
This, in my opinion, is the perfect solution.
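The payoff of injecting the dependency is that EventProcessor can be tested without Kafka or a real logger. A minimal sketch, assuming a hypothetical FakeWaitTimeLogger test double and a plain list of dicts standing in for the consumer (neither is from the original code); EventProcessor is a compact copy of the injected version with the processing elided:

```python
class EventProcessor:
    # Compact copy of the dependency-injected EventProcessor; processing elided.
    def __init__(self, consumer, wait_time_logger):
        self.consumer = consumer
        self.wait_time_logger = wait_time_logger

    def run(self):
        for message in self.consumer:
            # lots of processing here
            self.wait_time_logger.increment(message['timestamp'])


class FakeWaitTimeLogger:
    """Hypothetical test double: records timestamps instead of logging averages."""
    def __init__(self):
        self.timestamps = []

    def increment(self, timestamp):
        self.timestamps.append(timestamp)


def test_run_reports_every_timestamp():
    # run() only iterates over the consumer, so any iterable of messages works.
    fake_consumer = [{'timestamp': 1.0}, {'timestamp': 2.0}, {'timestamp': 3.0}]
    fake_logger = FakeWaitTimeLogger()
    EventProcessor(fake_consumer, fake_logger).run()
    assert fake_logger.timestamps == [1.0, 2.0, 3.0]


test_run_reports_every_timestamp()
```

The same seam works in the other direction: WaitTimeLogger can be tested on its own, independently of the event loop.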