请用 Python 写一个 EventManager该EventManager将启动独立thread并监听各种事件请添加如AddEventListener与SendEvent的方法。对于你将回答的一个实现请添加预设事件ticking该ticking事件将被使用示例中的一个for i in range100的循环中不断触发并传递一个列表作为参数ticking回调函数为update_listdata;
下面是一个示例实现EventManager的代码:
import threading
import time
class EventManager:
def __init__(self):
self.event_listeners = {}
self.event_lock = threading.Lock()
self.ticking_counter = 0
def add_event_listener(self, event, callback):
with self.event_lock:
if event not in self.event_listeners:
self.event_listeners[event] = []
self.event_listeners[event].append(callback)
def send_event(self, event, data):
with self.event_lock:
if event in self.event_listeners:
for callback in self.event_listeners[event]:
callback(data)
def start(self):
threading.Thread(target=self._tick).start()
def _tick(self):
for i in range(100):
time.sleep(0.1)
self.send_event("ticking", [i])
self.ticking_counter += 1
if self.ticking_counter == 2:
self.send_event("onParking", [])
self.ticking_counter = 0
def update_list(data):
print("Update list:", data)
def parking_ended():
print("Parking ended")
if __name__ == "__main__":
event_manager = EventManager()
event_manager.add_event_listener("ticking", update_list)
event_manager.add_event_listener("onParking", parking_ended)
event_manager.start()
在这个示例实现中,EventManager类有两个主要方法:add_event_listener和send_event。add_event_listener用于添加事件监听器,send_event用于触发事件。EventManager内部使用一个字典event_listeners来存储每个事件对应的回调函数列表。在send_event方法中,会遍历对应事件的回调函数列表,并依次调用回调函数。
在start方法中,会启动一个独立的线程来执行_tick方法。_tick方法中的循环会不断发送"ticking"事件,并在每次发送后检查ticking_counter的值。当ticking_counter等于2时,会发送"onParking"事件,并将ticking_counter重置为0。
在示例的if name == "main":部分,我们创建了一个EventManager实例,并添加了"ticking"事件和"onParking"事件的回调函数。然后调用start方法启动EventManager的线程。这样,在循环中不断触发"ticking"事件,并在满足条件时触发"onParking"事件,从而触发相应的回调函数
原文地址: https://www.cveoy.top/t/topic/iWAI 著作权归作者所有。请勿转载和采集!