diff options
| author | Loic Guegan <manzerbredes@mailbox.org> | 2022-06-23 14:06:26 +0200 |
|---|---|---|
| committer | Loic Guegan <manzerbredes@mailbox.org> | 2022-06-23 14:06:26 +0200 |
| commit | 6bf4be8b8f3863c65f47d37c0be182971f9360bc (patch) | |
| tree | 734107b238d8504f47fc4a3a7d152d310061ae79 /esds/node.py | |
| parent | b81ea45bd316869884cdee68b5ae07cc9e13c0c2 (diff) | |
Improve simulator robustness
Diffstat (limited to 'esds/node.py')
| -rw-r--r-- | esds/node.py | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/esds/node.py b/esds/node.py new file mode 100644 index 0000000..1195ad1 --- /dev/null +++ b/esds/node.py @@ -0,0 +1,205 @@ +import threading,importlib,queue + +class Node: + available_node_id=0 + def __init__(self,src,interfaces): + """ + + """ + self.node_id=Node.available_node_id + Node.available_node_id+=1 # Refresh node id + self.src=src # Store the node source code + self.args=None # Store the node arguments (passed through Simulator.create_node() + self.rargs=None # Store the requests arguments + self.plugins=list() # Contains all registered node plugins + self.rqueue=queue.Queue() # Receive simulator acknowledgments + self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()} + for interface in interfaces: + self.chest["interfaces"][interface]=queue.Queue() + self.chest["interfaces_queue_size"][interface]=0 + self.chest_lock=threading.Lock() # To access/modify self.chest + + def plugin_register(self,plugin): + self.plugins.append(plugin) + + def plugin_notify(self,reason,args): + """ + This function strives to avoid using Python specific features + """ + for p in self.plugins: + if reason == "receive_return" or reason == "receivet_return": + p.on_receive_return(args[0],args[1],args[2],args[3]) + if reason == "send_call": + p.on_send_call(args[0],args[1],args[2],args[3]) + if reason == "send_return": + p.on_send_return(args[0],args[1],args[2],args[3],args[4]) + if reason == "terminated": + p.on_terminated() + + def __getitem__(self,key): + self.chest_lock.acquire() + value=self.chest[key] + self.chest_lock.release() + return value + + def __setitem__(self,key,value): + self.chest_lock.acquire() + value=self.chest[key]=value + self.chest_lock.release() + + def abort(self,reason): + self.rargs=reason + self["request"]="abort" + self["state"]="call" + while True: continue + + def log(self,msg): + if type(msg) != str: + self.abort("log() called with a non-string argument") + self.rargs=msg + self["request"]="log" + self["state"]="call" + self.wait_ack(["log"]) + + def read(self, register): + self["request"]="read" + self.rargs=register + self["state"]="call" + ack=self.wait_ack(["read"]) + return ack[1] + + def wait(self,duration): + if type(duration) != int and type(duration) != float: + self.abort("wait() called with a non-number duration") + self.rargs=duration + self["request"]="timeout_add" + self["state"]="call" + self.wait_ack(["timeout_add"]) + self["state"]="pending" + self.wait_ack(["timeout"]) + + def wait_end(self): + self["request"]="wait_end" + self["state"]="request" + self.wait_ack(["wait_end"]) + self.wait_ack(["sim_end"]) + + def turn_off(self): + self["turned_on"]=False + self["request"]="turn_off" + self["state"]="call" + self.wait_ack(["turn_off"]) + + def turn_on(self): + self["turned_on"]=True + self["request"]="turn_on" + self["state"]="call" + self.wait_ack(["turn_on"]) + + def send(self, interface, data, datasize, dst): + if interface not in self["interfaces"]: + self.abort("send() called with an unknown interface \""+interface+"\"") + elif type(datasize) != int and type(datasize) != float: + self.abort("send() called with a non-number datasize") + elif type(dst) != int and type(dst) != float and dst != None: + self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)") + self.plugin_notify("send_call",(interface,data,datasize,dst)) + self.rargs=(interface, data, datasize, dst) + self["request"]="send" + self["state"]="request" + ack=self.wait_ack(["send","send_cancel"]) + self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1])) + return ack[1] + + def sendt(self, interface, data, datasize, dst, timeout): + if interface not in self["interfaces"]: + self.abort("sendt() called with an unknown interface \""+interface+"\"") + elif type(datasize) != int and type(datasize) != float: + self.abort("sendt() called with a non-number datasize") + elif type(timeout) != int and type(timeout) != float: + self.abort("sendt() called with a non-number timeout") + elif type(dst) != int and type(dst) != float and dst != None: + self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)") + self.rargs=timeout + self["request"]="timeout_add" + self["state"]="call" + self.wait_ack(["timeout_add"]) + self.rargs=(interface, data, datasize, dst) + self["request"]="send" + self["state"]="request" + ack=self.wait_ack(["send","timeout","send_cancel"]) + if ack[0] == "timeout": + self["request"]="send_cancel" + self["state"]="call" + self.wait_ack(["send_cancel"]) + return -1 + self["request"]="timeout_remove" + self["state"]="call" + self.wait_ack(["timeout_remove"]) + return ack[1] + + def receive(self,interface): + if interface not in self["interfaces"]: + self.abort("receive() called with an unknown interface \""+interface+"\"") + self["request"]="receive" + self.rargs=interface + self["state"]="request" + self.wait_ack(["receive"]) + data,start_at,end_at=self["interfaces"][interface].get() + self.plugin_notify("receive_return",(interface,data,start_at,end_at)) + return (0,data) + + def receivet(self,interface, timeout): + if interface not in self["interfaces"]: + self.abort("receivet() called with an unknown interface \""+interface+"\"") + elif type(timeout) != int and type(timeout) != float: + self.abort("receivet() called with a non-number timeout") + self.rargs=timeout + self["request"]="timeout_add" + self["state"]="call" + self.wait_ack(["timeout_add"]) + self["request"]="receive" + self.rargs=interface + self["state"]="request" + ack=self.wait_ack(["receive","timeout"]) + if ack[0] == "timeout": + return (-1,None) + self["request"]="timeout_remove" + self["state"]="call" + self.wait_ack(["timeout_remove"]) + data,start_at,end_at=self["interfaces"][interface].get() + self.plugin_notify("receivet_return",(interface,data,start_at,end_at)) + return (0,data) + + def wait_ack(self, ack_types): + """ + Wait for specific acks from the request queue (rqueue) + """ + ack_buffer=list() # To filter ack + ack=None + while True: + ack=self.rqueue.get() # Wait for simulator acknowledgments + if ack[0] not in ack_types: + ack_buffer.append(ack) + else: + break + # Push back the filtered ack + for cur_ack in ack_buffer: + self.rqueue.put(cur_ack) + return(ack) + + def sync(self): + """ + Wait until node stop running + """ + while self["state"] == "running": + pass + + def run(self,args): + """ + Load and run the user program + """ + self.node=importlib.import_module(self.src) + self.args=args # Allow access to arguments + self.node.execute(self) + self["state"]="terminated" |
