summaryrefslogtreecommitdiff
path: root/esds
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2022-06-23 14:06:26 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2022-06-23 14:06:26 +0200
commit6bf4be8b8f3863c65f47d37c0be182971f9360bc (patch)
tree734107b238d8504f47fc4a3a7d152d310061ae79 /esds
parentb81ea45bd316869884cdee68b5ae07cc9e13c0c2 (diff)
Improve simulator robustness
Diffstat (limited to 'esds')
-rw-r--r--esds/__init__.py5
-rw-r--r--esds/node.py205
-rw-r--r--esds/simulator.py (renamed from esds/esds.py)202
3 files changed, 215 insertions, 197 deletions
diff --git a/esds/__init__.py b/esds/__init__.py
index efefa84..af3fa98 100644
--- a/esds/__init__.py
+++ b/esds/__init__.py
@@ -1,3 +1,4 @@
-__all__ = ["simulator", "plugins", "helpers"]
+__all__ = ["simulator", "node", "plugins", "helpers"]
+
+from esds.simulator import Simulator
-from esds.esds import Simulator
diff --git a/esds/node.py b/esds/node.py
new file mode 100644
index 0000000..1195ad1
--- /dev/null
+++ b/esds/node.py
@@ -0,0 +1,205 @@
+import threading,importlib,queue
+
+class Node:
+ available_node_id=0
+ def __init__(self,src,interfaces):
+ """
+
+ """
+ self.node_id=Node.available_node_id
+ Node.available_node_id+=1 # Refresh node id
+ self.src=src # Store the node source code
+ self.args=None # Store the node arguments (passed through Simulator.create_node()
+ self.rargs=None # Store the requests arguments
+ self.plugins=list() # Contains all registered node plugins
+ self.rqueue=queue.Queue() # Receive simulator acknowledgments
+ self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()}
+ for interface in interfaces:
+ self.chest["interfaces"][interface]=queue.Queue()
+ self.chest["interfaces_queue_size"][interface]=0
+ self.chest_lock=threading.Lock() # To access/modify self.chest
+
+ def plugin_register(self,plugin):
+ self.plugins.append(plugin)
+
+ def plugin_notify(self,reason,args):
+ """
+ This function strives to avoid using Python specific features
+ """
+ for p in self.plugins:
+ if reason == "receive_return" or reason == "receivet_return":
+ p.on_receive_return(args[0],args[1],args[2],args[3])
+ if reason == "send_call":
+ p.on_send_call(args[0],args[1],args[2],args[3])
+ if reason == "send_return":
+ p.on_send_return(args[0],args[1],args[2],args[3],args[4])
+ if reason == "terminated":
+ p.on_terminated()
+
+ def __getitem__(self,key):
+ self.chest_lock.acquire()
+ value=self.chest[key]
+ self.chest_lock.release()
+ return value
+
+ def __setitem__(self,key,value):
+ self.chest_lock.acquire()
+ value=self.chest[key]=value
+ self.chest_lock.release()
+
+ def abort(self,reason):
+ self.rargs=reason
+ self["request"]="abort"
+ self["state"]="call"
+ while True: continue
+
+ def log(self,msg):
+ if type(msg) != str:
+ self.abort("log() called with a non-string argument")
+ self.rargs=msg
+ self["request"]="log"
+ self["state"]="call"
+ self.wait_ack(["log"])
+
+ def read(self, register):
+ self["request"]="read"
+ self.rargs=register
+ self["state"]="call"
+ ack=self.wait_ack(["read"])
+ return ack[1]
+
+ def wait(self,duration):
+ if type(duration) != int and type(duration) != float:
+ self.abort("wait() called with a non-number duration")
+ self.rargs=duration
+ self["request"]="timeout_add"
+ self["state"]="call"
+ self.wait_ack(["timeout_add"])
+ self["state"]="pending"
+ self.wait_ack(["timeout"])
+
+ def wait_end(self):
+ self["request"]="wait_end"
+ self["state"]="request"
+ self.wait_ack(["wait_end"])
+ self.wait_ack(["sim_end"])
+
+ def turn_off(self):
+ self["turned_on"]=False
+ self["request"]="turn_off"
+ self["state"]="call"
+ self.wait_ack(["turn_off"])
+
+ def turn_on(self):
+ self["turned_on"]=True
+ self["request"]="turn_on"
+ self["state"]="call"
+ self.wait_ack(["turn_on"])
+
+ def send(self, interface, data, datasize, dst):
+ if interface not in self["interfaces"]:
+ self.abort("send() called with an unknown interface \""+interface+"\"")
+ elif type(datasize) != int and type(datasize) != float:
+ self.abort("send() called with a non-number datasize")
+ elif type(dst) != int and type(dst) != float and dst != None:
+ self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)")
+ self.plugin_notify("send_call",(interface,data,datasize,dst))
+ self.rargs=(interface, data, datasize, dst)
+ self["request"]="send"
+ self["state"]="request"
+ ack=self.wait_ack(["send","send_cancel"])
+ self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1]))
+ return ack[1]
+
+ def sendt(self, interface, data, datasize, dst, timeout):
+ if interface not in self["interfaces"]:
+ self.abort("sendt() called with an unknown interface \""+interface+"\"")
+ elif type(datasize) != int and type(datasize) != float:
+ self.abort("sendt() called with a non-number datasize")
+ elif type(timeout) != int and type(timeout) != float:
+ self.abort("sendt() called with a non-number timeout")
+ elif type(dst) != int and type(dst) != float and dst != None:
+ self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)")
+ self.rargs=timeout
+ self["request"]="timeout_add"
+ self["state"]="call"
+ self.wait_ack(["timeout_add"])
+ self.rargs=(interface, data, datasize, dst)
+ self["request"]="send"
+ self["state"]="request"
+ ack=self.wait_ack(["send","timeout","send_cancel"])
+ if ack[0] == "timeout":
+ self["request"]="send_cancel"
+ self["state"]="call"
+ self.wait_ack(["send_cancel"])
+ return -1
+ self["request"]="timeout_remove"
+ self["state"]="call"
+ self.wait_ack(["timeout_remove"])
+ return ack[1]
+
+ def receive(self,interface):
+ if interface not in self["interfaces"]:
+ self.abort("receive() called with an unknown interface \""+interface+"\"")
+ self["request"]="receive"
+ self.rargs=interface
+ self["state"]="request"
+ self.wait_ack(["receive"])
+ data,start_at,end_at=self["interfaces"][interface].get()
+ self.plugin_notify("receive_return",(interface,data,start_at,end_at))
+ return (0,data)
+
+ def receivet(self,interface, timeout):
+ if interface not in self["interfaces"]:
+ self.abort("receivet() called with an unknown interface \""+interface+"\"")
+ elif type(timeout) != int and type(timeout) != float:
+ self.abort("receivet() called with a non-number timeout")
+ self.rargs=timeout
+ self["request"]="timeout_add"
+ self["state"]="call"
+ self.wait_ack(["timeout_add"])
+ self["request"]="receive"
+ self.rargs=interface
+ self["state"]="request"
+ ack=self.wait_ack(["receive","timeout"])
+ if ack[0] == "timeout":
+ return (-1,None)
+ self["request"]="timeout_remove"
+ self["state"]="call"
+ self.wait_ack(["timeout_remove"])
+ data,start_at,end_at=self["interfaces"][interface].get()
+ self.plugin_notify("receivet_return",(interface,data,start_at,end_at))
+ return (0,data)
+
+ def wait_ack(self, ack_types):
+ """
+ Wait for specific acks from the request queue (rqueue)
+ """
+ ack_buffer=list() # To filter ack
+ ack=None
+ while True:
+ ack=self.rqueue.get() # Wait for simulator acknowledgments
+ if ack[0] not in ack_types:
+ ack_buffer.append(ack)
+ else:
+ break
+ # Push back the filtered ack
+ for cur_ack in ack_buffer:
+ self.rqueue.put(cur_ack)
+ return(ack)
+
+ def sync(self):
+ """
+ Wait until node stop running
+ """
+ while self["state"] == "running":
+ pass
+
+ def run(self,args):
+ """
+ Load and run the user program
+ """
+ self.node=importlib.import_module(self.src)
+ self.args=args # Allow access to arguments
+ self.node.execute(self)
+ self["state"]="terminated"
diff --git a/esds/esds.py b/esds/simulator.py
index 0149a82..a9866b8 100644
--- a/esds/esds.py
+++ b/esds/simulator.py
@@ -1,197 +1,6 @@
-#!/usr/bin/env python
-
import numpy as np
-import threading,importlib,queue,sys,time
-
-class Node:
- available_node_id=0
- def __init__(self,src,interfaces):
- """
-
- """
- self.node_id=Node.available_node_id
- Node.available_node_id+=1 # Refresh node id
- self.src=src # Store the node source code
- self.args=None # Store the node arguments (passed through Simulator.create_node()
- self.rargs=None # Store the requests arguments
- self.plugins=list() # Contains all registered node plugins
- self.rqueue=queue.Queue() # Receive simulator acknowledgments
- self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()}
- for interface in interfaces:
- self.chest["interfaces"][interface]=queue.Queue()
- self.chest["interfaces_queue_size"][interface]=0
- self.chest_lock=threading.Lock() # To access/modify self.chest
-
- def plugin_register(self,plugin):
- self.plugins.append(plugin)
-
- def plugin_notify(self,reason,args):
- """
- This function strives to avoid using Python specific features
- """
- for p in self.plugins:
- if reason == "receive_return" or reason == "receivet_return":
- p.on_receive_return(args[0],args[1],args[2],args[3])
- if reason == "send_call":
- p.on_send_call(args[0],args[1],args[2],args[3])
- if reason == "send_return":
- p.on_send_return(args[0],args[1],args[2],args[3],args[4])
- if reason == "terminated":
- p.on_terminated()
-
- def __getitem__(self,key):
- self.chest_lock.acquire()
- value=self.chest[key]
- self.chest_lock.release()
- return value
-
- def __setitem__(self,key,value):
- self.chest_lock.acquire()
- value=self.chest[key]=value
- self.chest_lock.release()
-
- def abort(self,reason):
- self.rargs=reason
- self["request"]="abort"
- self["state"]="call"
- while True: continue
-
- def log(self,msg):
- if type(msg) != str:
- self.abort("log() called with a non-string argument")
- self.rargs=msg
- self["request"]="log"
- self["state"]="call"
- self.wait_ack(["log"])
-
- def read(self, register):
- self["request"]="read"
- self.rargs=register
- self["state"]="call"
- ack=self.wait_ack(["read"])
- return ack[1]
-
- def wait(self,duration):
- self.rargs=duration
- self["request"]="timeout_add"
- self["state"]="call"
- self.wait_ack(["timeout_add"])
- self["state"]="pending"
- self.wait_ack(["timeout"])
-
- def wait_end(self):
- self["request"]="wait_end"
- self["state"]="request"
- self.wait_ack(["wait_end"])
- self.wait_ack(["sim_end"])
-
- def turn_off(self):
- self["turned_on"]=False
- self["request"]="turn_off"
- self["state"]="call"
- self.wait_ack(["turn_off"])
-
- def turn_on(self):
- self["turned_on"]=True
- self["request"]="turn_on"
- self["state"]="call"
- self.wait_ack(["turn_on"])
-
- def send(self, interface, data, datasize, dst):
- if interface not in self["interfaces"]:
- self.abort("send() called with an unknown interface \""+interface+"\"")
- self.plugin_notify("send_call",(interface,data,datasize,dst))
- self.rargs=(interface, data, datasize, dst)
- self["request"]="send"
- self["state"]="request"
- ack=self.wait_ack(["send","send_cancel"])
- self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1]))
- return ack[1]
-
- def sendt(self, interface, data, datasize, dst, timeout):
- if interface not in self["interfaces"]:
- self.abort("sendt() called with an unknown interface \""+interface+"\"")
- self.rargs=timeout
- self["request"]="timeout_add"
- self["state"]="call"
- self.wait_ack(["timeout_add"])
- self.rargs=(interface, data, datasize, dst)
- self["request"]="send"
- self["state"]="request"
- ack=self.wait_ack(["send","timeout","send_cancel"])
- if ack[0] == "timeout":
- self["request"]="send_cancel"
- self["state"]="call"
- self.wait_ack(["send_cancel"])
- return -1
- self["request"]="timeout_remove"
- self["state"]="call"
- self.wait_ack(["timeout_remove"])
- return ack[1]
-
- def receive(self,interface):
- if interface not in self["interfaces"]:
- self.abort("receive() called with an unknown interface \""+interface+"\"")
- self["request"]="receive"
- self.rargs=interface
- self["state"]="request"
- self.wait_ack(["receive"])
- data,start_at,end_at=self["interfaces"][interface].get()
- self.plugin_notify("receive_return",(interface,data,start_at,end_at))
- return (0,data)
-
- def receivet(self,interface, timeout):
- if interface not in self["interfaces"]:
- self.abort("receivet() called with an unknown interface \""+interface+"\"")
- self.rargs=timeout
- self["request"]="timeout_add"
- self["state"]="call"
- self.wait_ack(["timeout_add"])
- self["request"]="receive"
- self.rargs=interface
- self["state"]="request"
- ack=self.wait_ack(["receive","timeout"])
- if ack[0] == "timeout":
- return (-1,None)
- self["request"]="timeout_remove"
- self["state"]="call"
- self.wait_ack(["timeout_remove"])
- data,start_at,end_at=self["interfaces"][interface].get()
- self.plugin_notify("receivet_return",(interface,data,start_at,end_at))
- return (0,data)
-
- def wait_ack(self, ack_types):
- """
- Wait for specific acks from the request queue (rqueue)
- """
- ack_buffer=list() # To filter ack
- ack=None
- while True:
- ack=self.rqueue.get() # Wait for simulator acknowledgments
- if ack[0] not in ack_types:
- ack_buffer.append(ack)
- else:
- break
- # Push back the filtered ack
- for cur_ack in ack_buffer:
- self.rqueue.put(cur_ack)
- return(ack)
-
- def sync(self):
- """
- Wait until node stop running
- """
- while self["state"] == "running":
- pass
-
- def run(self,args):
- """
- Load and run the user program
- """
- self.node=importlib.import_module(self.src)
- self.args=args # Allow access to arguments
- self.node.execute(self)
- self["state"]="terminated"
+import threading,sys,time
+from esds.node import Node
class Simulator:
"""
@@ -344,7 +153,7 @@ class Simulator:
elif node["request"] == "read":
node["state"]="running"
if node.rargs == "clock":
- node.rqueue.put(("read",self.time))
+ node.rqueue.put(("read",float(self.time)))
elif node.rargs[0:5] == "ncom_": # ncom_<interface> register
interface=node.rargs[5:]
count=0
@@ -462,6 +271,10 @@ class Simulator:
if node["request"] == "send":
node["state"]="pending"
interface, data, datasize, dst=node.rargs
+ if dst != None:
+ if not (dst >=0 and dst <=len(self.nodes)):
+ self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id)
+ exit(1)
self.communicate(interface, node.node_id, dst, data, datasize)
elif node["request"] == "receive":
interface=node.rargs
@@ -612,4 +425,3 @@ class Simulator:
##### Simulation ends
self.log("Simulation ends")
-