summaryrefslogtreecommitdiff
path: root/esds
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2022-06-14 17:13:46 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2022-06-14 17:13:46 +0200
commit7f13c95e16a802d0706e9f5a6d5d845d7fd67631 (patch)
tree65cb3c0faec397cb1b06894645d060514f6b242d /esds
parentb6877cb81e56c3991d0dbcf9fa579f627a4c2a29 (diff)
Major refactoring:
- Create pip package - Reorganized source code
Diffstat (limited to 'esds')
-rw-r--r--esds/__init__.py3
-rw-r--r--esds/esds.py594
-rw-r--r--esds/plugins/__init__.py1
-rw-r--r--esds/plugins/node_plugin.py29
-rw-r--r--esds/plugins/operating_states.py66
-rw-r--r--esds/plugins/power_states.py164
-rw-r--r--esds/plugins/wireless_area.py72
7 files changed, 929 insertions, 0 deletions
diff --git a/esds/__init__.py b/esds/__init__.py
new file mode 100644
index 0000000..bee183e
--- /dev/null
+++ b/esds/__init__.py
@@ -0,0 +1,3 @@
+__all__ = ["simulator", "plugins"]
+
+from esds.esds import Simulator
diff --git a/esds/esds.py b/esds/esds.py
new file mode 100644
index 0000000..148c205
--- /dev/null
+++ b/esds/esds.py
@@ -0,0 +1,594 @@
+#!/usr/bin/env python
+
+import numpy as np
+import threading,importlib,queue,sys,time
+
+class Node:
+ available_node_id=0
+ def __init__(self,src,interfaces):
+ """
+
+ """
+ self.node_id=Node.available_node_id
+ Node.available_node_id+=1 # Refresh node id
+ self.src=src # Store the node source code
+ self.args=None # Store the node arguments (passed through Simulator.create_node()
+ self.rargs=None # Store the requests arguments
+ self.plugins=list() # Contains all registered node plugins
+ self.rqueue=queue.Queue() # Receive simulator acknowledgments
+ self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()}
+ for interface in interfaces:
+ self.chest["interfaces"][interface]=queue.Queue()
+ self.chest["interfaces_queue_size"][interface]=0
+ self.chest_lock=threading.Lock() # To access/modify self.chest
+
+ def plugin_register(self,plugin):
+ self.plugins.append(plugin)
+
+ def plugin_notify(self,reason,args):
+ """
+ This function strives to avoid using Python specific features
+ """
+ for p in self.plugins:
+ if reason == "receive_return" or reason == "receivet_return":
+ p.on_receive_return(args[0],args[1],args[2],args[3])
+ if reason == "send_call":
+ p.on_send_call(args[0],args[1],args[2],args[3])
+ if reason == "send_return":
+ p.on_send_return(args[0],args[1],args[2],args[3],args[4])
+ if reason == "terminated":
+ p.on_terminated()
+
+ def __getitem__(self,key):
+ self.chest_lock.acquire()
+ value=self.chest[key]
+ self.chest_lock.release()
+ return value
+
+ def __setitem__(self,key,value):
+ self.chest_lock.acquire()
+ value=self.chest[key]=value
+ self.chest_lock.release()
+
+ def log(self,msg):
+ self.rargs=msg
+ self["request"]="log"
+ self["state"]="call"
+ self.wait_ack(["log"])
+
+ def read(self, register):
+ self["request"]="read"
+ self.rargs=register
+ self["state"]="call"
+ ack=self.wait_ack(["read"])
+ return ack[1]
+
+ def wait(self,duration):
+ self.rargs=duration
+ self["request"]="timeout_add"
+ self["state"]="call"
+ self.wait_ack(["timeout_add"])
+ self["state"]="pending"
+ self.wait_ack(["timeout"])
+
+ def wait_end(self):
+ self["request"]="wait_end"
+ self["state"]="request"
+ self.wait_ack(["wait_end"])
+ self.wait_ack(["sim_end"])
+
+ def turn_off(self):
+ self["turned_on"]=False
+ self["request"]="turn_off"
+ self["state"]="call"
+ self.wait_ack(["turn_off"])
+
+ def turn_on(self):
+ self["turned_on"]=True
+ self["request"]="turn_on"
+ self["state"]="call"
+ self.wait_ack(["turn_on"])
+
+ def send(self, interface, data, datasize, dst):
+ self.plugin_notify("send_call",(interface,data,datasize,dst))
+ self.rargs=(interface, data, datasize, dst)
+ self["request"]="send"
+ self["state"]="request"
+ ack=self.wait_ack(["send","send_cancel"])
+ self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1]))
+ return ack[1]
+
+ def receive(self,interface):
+ self["request"]="receive"
+ self.rargs=interface
+ self["state"]="request"
+ self.wait_ack(["receive"])
+ data,start_at,end_at=self["interfaces"][interface].get()
+ self.plugin_notify("receive_return",(interface,data,start_at,end_at))
+ return (0,data)
+
+ def sendt(self, interface, data, datasize, dst, timeout):
+ self.rargs=timeout
+ self["request"]="timeout_add"
+ self["state"]="call"
+ self.wait_ack(["timeout_add"])
+ self.rargs=(interface, data, datasize, dst)
+ self["request"]="send"
+ self["state"]="request"
+ ack=self.wait_ack(["send","timeout","send_cancel"])
+ if ack[0] == "timeout":
+ self["request"]="send_cancel"
+ self["state"]="call"
+ self.wait_ack(["send_cancel"])
+ return -1
+ self["request"]="timeout_remove"
+ self["state"]="call"
+ self.wait_ack(["timeout_remove"])
+ return ack[1]
+
+ def receivet(self,interface, timeout):
+ self.rargs=timeout
+ self["request"]="timeout_add"
+ self["state"]="call"
+ self.wait_ack(["timeout_add"])
+ self["request"]="receive"
+ self.rargs=interface
+ self["state"]="request"
+ ack=self.wait_ack(["receive","timeout"])
+ if ack[0] == "timeout":
+ return (-1,None)
+ self["request"]="timeout_remove"
+ self["state"]="call"
+ self.wait_ack(["timeout_remove"])
+ data,start_at,end_at=self["interfaces"][interface].get()
+ self.plugin_notify("receivet_return",(interface,data,start_at,end_at))
+ return (0,data)
+
+ def wait_ack(self, ack_types):
+ """
+ Wait for specific acks from the request queue (rqueue)
+ """
+ ack_buffer=list() # To filter ack
+ ack=None
+ while True:
+ ack=self.rqueue.get() # Wait for simulator acknowledgments
+ if ack[0] not in ack_types:
+ ack_buffer.append(ack)
+ else:
+ break
+ # Push back the filtered ack
+ for cur_ack in ack_buffer:
+ self.rqueue.put(cur_ack)
+ return(ack)
+
+ def sync(self):
+ """
+ Wait until node stop running
+ """
+ while self["state"] == "running":
+ pass
+
+ def run(self,args):
+ """
+ Load and run the user program
+ """
+ self.node=importlib.import_module(self.src)
+ self.args=args # Allow access to arguments
+ self.node.execute(self)
+ self["state"]="terminated"
+
+class Simulator:
+ """
+ Flow-Level Discrete Event Simulator for Cyber-Physical Systems
+ The general format for an event is (type,timestamp,event,priority)
+ Event types:
+ - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining), 1)
+ - 1 timeout (1,timestamp,node_id,4)
+ - 2 breakpoint_manual (3,timestamp,0,0)
+ - 3 breakpoint_auto (4,timestamp,0,0)
+
+ Very important: when the simulator wakes up a node (changing is state to running)
+ data that should be received by that node on the current simulated time SHOULD be in the queue!
+ Thus, the send event must be handle before the other event (priority equals to 1). Otherwise plugings such as the power states
+ one may not gives accurate results because of missing entries in the nodes received queues.
+ """
+
+ def __init__(self,netmat):
+ """
+ Format of netmat: { "interface": {"bandwidth": numpy_matrix, "latency": numpy_matrix, "is_wired":bool}}
+ For wireless interfaces the diagonals of the bandwidth and latency matrices are very important.
+ They determine the duration of the tranmission for THE SENDER. It allows to have a different tx duration per node and per interface.
+ Thus, at each wireless communication, an addionnal event is created for the sender that corresponds to a send to himself (diagonals of the matrices) used
+ to unlock him from the api.send() call. Consequently, the duration of the transmission (by the sender) can be
+ different from the time at which the receivers actually receive the data (non-diagonal entries of the matrices).
+ """
+ self.netmat=netmat
+ self.nodes=list()
+ self.sharing=dict()
+ for interface in netmat.keys():
+ if netmat[interface]["is_wired"]:
+ self.sharing[interface]=np.zeros(len(netmat[interface]["bandwidth"]))
+ self.events=np.empty((0,4),dtype=object)
+ self.events_dirty=True # For optimization reasons
+ self.startat=-1
+ self.time=0
+ self.debug_file_path="./esds.debug"
+ self.precision=".3f"
+ self.interferences=True
+ self.wait_end_nodes=list() # Keep track of nodes that wait for the end of the simulation
+ self.time_truncated=format(self.time,self.precision) # Truncated version is used in log print
+
+ def update_network(self,netmat):
+ for event in self.events:
+ if int(event[0]) == 0:
+ cur_event=event[2]
+ ts=float(event[1])
+ src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at=cur_event
+ new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
+ old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
+ new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)]
+ old_lat=self.netmat[interface]["latency"][int(src_id),int(dst_id)]
+ if new_bw != old_bw or new_lat != old_lat:
+ new_datasize_remaining=float(datasize_remaining)*((ts-self.time)/float(duration))
+ if new_datasize_remaining > 0:
+ latency_factor=new_datasize_remaining/float(datasize)
+ if self.netmat[interface]["is_wired"]:
+ new_duration=new_datasize_remaining*8/(new_bw/self.sharing[interface][int(dst_id)])+new_lat*latency_factor
+ else:
+ new_duration=new_datasize_remaining*8/new_bw+new_lat*latency_factor
+ event[1]=self.time+new_duration
+ event[2][6]=new_datasize_remaining
+ event[2][5]=new_duration
+ self.netmat=netmat
+
+ def debug(self):
+ """
+ Log all the informations for debugging
+ """
+ stdout_save = sys.stdout
+ with open(self.debug_file_path, "a") as debug_file:
+ sys.stdout = debug_file
+ print("-----------------------------------------------")
+ print("Started since {}s".format(round(time.time()-self.startat,2)))
+ print("Simulated time {}s (or more precisely {}s)".format(self.time_truncated,self.time))
+ states=dict()
+ timeout_mode=list()
+ sharing=dict()
+ for node in self.nodes:
+ s=node["state"]
+ states[s]=states[s]+1 if s in states else 1
+ node_key="n"+str(node.node_id)
+ for interface in self.sharing.keys():
+ if self.sharing[interface][node.node_id] > 0:
+ if node_key not in sharing:
+ sharing[node_key] = ""
+ sharing[node_key]+=str(int(self.sharing[interface][node.node_id]))
+ print("Node number per state: ",end="")
+ for key in states:
+ print(key+"="+str(states[key]), end=" ")
+ print("\nNode sharing: ",end="")
+ for node_id in sharing:
+ print(node_id+"="+sharing[node_id], end=" ")
+ print("\nIds of node in timeout mode: ", end="")
+ for n in timeout_mode:
+ print(n,end=" ")
+ print("\nSorted events list:")
+ print(self.events)
+ sys.stdout = stdout_save
+
+ def create_node(self, src, args=None):
+ """
+ Create a node thread and run it
+ """
+ node=Node(src, self.netmat.keys())
+ self.nodes.append(node)
+ thread=threading.Thread(target=node.run, daemon=False,args=[args])
+ thread.start()
+
+ def log(self,msg,node=None):
+ src = "esds" if node is None else "n"+str(node)
+ print("[t="+str(self.time_truncated)+",src="+src+"] "+msg)
+
+ def sort_events(self):
+ """
+ Sort the events by timestamp and priorities
+ """
+ sorted_indexes=np.lexsort((self.events[:,3],self.events[:,1]))
+ self.events=self.events[sorted_indexes]
+
+ def sync_node(self,node):
+ """
+ Process all call request and wait for Node.sync() to return
+ """
+ node.sync()
+ while node["state"] == "call":
+ if node["request"] == "log":
+ self.log(node.rargs,node=node.node_id)
+ node["state"]="running"
+ node.rqueue.put(("log",0))
+ elif node["request"] == "timeout_add":
+ self.add_event(1,self.time+node.rargs,node.node_id,priority=3)
+ node["state"]="running"
+ node.rqueue.put(("timeout_add",0))
+ elif node["request"] == "timeout_remove":
+ selector=list()
+ for event in self.events:
+ if event[0] == 1 and event[2]==node.node_id:
+ selector.append(True)
+ else:
+ selector.append(False)
+ self.events=self.events[~np.array(selector)]
+ node["state"]="running"
+ node.rqueue.put(("timeout_remove",0))
+ elif node["request"] == "read":
+ node["state"]="running"
+ if node.rargs == "clock":
+ node.rqueue.put(("read",self.time))
+ elif node.rargs[0:5] == "ncom_": # ncom_<interface> register
+ interface=node.rargs[5:]
+ count=0
+ # Count number of communication on interface
+ for event in self.events:
+ if event[0] == 0 and event[2][1] == node.node_id and event[2][2] == interface:
+ count+=1
+ node.rqueue.put(("read",count))
+ else:
+ node.rqueue.put(("read",0)) # Always return 0 if register is unknown
+ elif node["request"] == "turn_on":
+ node["state"]="running"
+ node.rqueue.put(("turn_on",0))
+ self.log("Turned on",node=node.node_id)
+ elif node["request"] == "turn_off":
+ # Create communications selectors (True/False arrays)
+ selector_wireless=list() # Select all wireless events where node is involved
+ selector_wired=list() # Select all wired events where node is involved
+ for event in self.events:
+ if event[0]==0 and int(event[2][1])==node.node_id:
+ if self.netmat[event[2][2]]["is_wired"]:
+ selector_wireless.append(False)
+ selector_wired.append(True)
+ else:
+ selector_wireless.append(True)
+ selector_wired.append(False)
+ else:
+ selector_wireless.append(False)
+ selector_wired.append(False)
+ # Informed senders of wired events to cancel send
+ for event in self.events[selector_wired]:
+ sender=self.nodes[int(event[2][0])]
+ sender["state"]="running"
+ sender.rqueue.put(("send_cancel",2))
+ # Remove communications from the event list
+ if(len(self.events) != 0):
+ self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))]
+ # Refresh wired sharing
+ for interface in self.sharing.keys():
+ self.sharing[interface][node.node_id]=0 # Sharing goes back to zero
+ # Update node state after turning off
+ node["state"]="running"
+ node.rqueue.put(("turn_off",0))
+ self.log("Turned off",node=node.node_id)
+ elif node["request"] == "send_cancel":
+ selector=list()
+ for event in self.events:
+ if event[0]==0 and int(event[2][0]) == node.node_id:
+ selector.append(True)
+ if self.netmat[event[2][2]]["is_wired"]:
+ self.update_sharing(int(event[2][1]),-1,event[2][2])
+ else:
+ selector.append(False)
+ self.events=self.events[~np.array(selector)]
+ node["state"]="running"
+ node.rqueue.put(("send_cancel",0))
+ node.sync()
+
+ def update_sharing(self, dst, amount,interface):
+ """
+ Manage bandwidth sharing on wired interfaces
+ """
+ sharing=self.sharing[interface][dst]
+ new_sharing=sharing+amount
+ for event in self.events:
+ if event[0] == 0 and self.netmat[event[2][2]]["is_wired"] and int(event[2][1]) == dst:
+ remaining=event[1]-self.time
+ if remaining > 0:
+ remaining=remaining/sharing if sharing>1 else remaining # First restore sharing
+ remaining=remaining*new_sharing if new_sharing > 1 else remaining # Then apply new sharing
+ event[2][5]=remaining # Update duration
+ event[1]=self.time+remaining # Update timestamp
+ self.sharing[interface][dst]=new_sharing
+ self.sort_events()
+
+ def handle_interferences(self,sender,receiver, interface):
+ """
+ Interferences are detected by looking for conflicts between
+ new events and existing events.
+ """
+ status=False
+ selector=list()
+ notify=set()
+ for event in self.events:
+ event_type=event[0]
+ com=event[2]
+ if event_type==0 and com[2] == interface:
+ com_sender=int(com[0])
+ com_receiver=int(com[1])
+ select=False
+ if receiver==com_sender:
+ status=True
+ notify.add(receiver)
+ elif receiver==com_receiver:
+ status=True
+ select=True
+ notify.add(receiver)
+ if sender==com_receiver and com_sender != com_receiver:
+ select=True
+ notify.add(sender)
+ selector.append(select)
+ else:
+ selector.append(False)
+ if len(selector) != 0:
+ self.events=self.events[~np.array(selector)]
+ for node in notify:
+ self.log("Interferences on "+interface,node=node)
+ return status
+
+ def sync_event(self, node):
+ """
+ Collect events from the nodes
+ """
+ if node["state"] == "request":
+ if node["request"] == "send":
+ node["state"]="pending"
+ interface, data, datasize, dst=node.rargs
+ self.communicate(interface, node.node_id, dst, data, datasize)
+ elif node["request"] == "receive":
+ interface=node.rargs
+ if node["interfaces_queue_size"][interface] > 0:
+ node["interfaces_queue_size"][interface]-=1
+ node.rqueue.put(("receive",0))
+ node["state"]="running"
+ # Do not forget to collect the next event. This is the only request which is processed here
+ self.sync_node(node)
+ self.sync_event(node)
+ elif node["request"] == "wait_end":
+ node["state"]="pending"
+ node.rqueue.put(("wait_end",0))
+ self.wait_end_nodes.append(node.node_id)
+
+ def communicate(self, interface, src, dst, data, datasize):
+ """
+ Create communication event between src and dst
+ """
+ nsrc=self.nodes[src]
+ if self.netmat[interface]["is_wired"]:
+ if self.nodes[dst]["turned_on"]:
+ self.log("Send "+str(datasize)+" bytes to n"+str(dst)+" on "+interface,node=src)
+ self.update_sharing(dst,1,interface) # Update sharing first
+ # Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing):
+ duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst]
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time))
+ else:
+ nsrc["state"]="request" # Try later when node is on
+ else:
+ self.log("Send "+str(datasize)+" bytes on "+interface,node=src)
+ for dst in self.list_receivers(nsrc,interface):
+ if self.nodes[dst]["turned_on"]:
+ duration=datasize*8/self.netmat[interface]["bandwidth"][src,dst]+self.netmat[interface]["latency"][src,dst]
+ if src == dst:
+ # This event (where src == dst) is used to notify the sender when data is received!
+ # Correspond to the diagonal of the network matrices (bandwidth and latency)
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time))
+ elif not self.interferences:
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time))
+ elif not self.handle_interferences(src,dst, interface):
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time))
+
+ def list_receivers(self,node,interface):
+ """
+ Deduce reachable receivers from the bandwidth matrix
+ """
+ selector = self.netmat[interface]["bandwidth"][node.node_id,] > 0
+ return np.arange(0,selector.shape[0])[selector]
+
+
+ def add_event(self,event_type,event_ts,event,priority=1):
+ """
+ Call this function with sort=True the least amount of time possible
+ """
+ self.events=np.concatenate([self.events,[np.array([event_type,event_ts,np.array(event,dtype=object),priority],dtype=object)]]) # Add new events
+ self.sort_events()
+
+ def run(self, breakpoints=[],breakpoint_callback=lambda s:None,breakpoints_every=None,debug=False,interferences=True):
+ """
+ Run the simulation with the created nodes
+ """
+ ##### Setup simulation
+ self.startat=time.time()
+ self.interferences=interferences
+ for bp in breakpoints:
+ self.add_event(2,bp,0,0)
+ if breakpoints_every != None:
+ self.add_event(3,breakpoints_every,0,0)
+ if debug:
+ with open(self.debug_file_path, "w") as f:
+ f.write("Python version {}\n".format(sys.version))
+ f.write("Simulation started at {}\n".format(self.startat))
+ f.write("Number of nodes is "+str(len(self.nodes))+"\n")
+ f.write("Manual breakpoints list: "+str(breakpoints)+"\n")
+ f.write("Breakpoints every "+str(breakpoints_every)+"s\n")
+ ##### Simulation loop
+ while True:
+ # Synchronize every nodes
+ for node in self.nodes:
+ self.sync_node(node)
+ # Manage events
+ for node in self.nodes:
+ self.sync_event(node)
+ # Generate debug logs
+ if debug:
+ self.debug()
+ # Simulation end
+ if len(self.events) <= 0 or len(self.events) == 1 and self.events[0,0] == 3:
+ # Notify nodes that wait for the end of the simulation
+ # Note that we do not allow them to create new events (even if they try, they will not be processed)
+ for node_id in self.wait_end_nodes:
+ self.nodes[node_id].rqueue.put(("sim_end",0))
+ self.nodes[node_id]["state"]="running"
+ self.sync_node(self.nodes[node_id]) # Allow them for make call requests (printing logs for example)
+ break # End the event processing loop
+
+ # Update simulation time
+ self.time=self.events[0,1]
+ self.time_truncated=format(self.time,self.precision) # refresh truncated time
+
+ # Process events
+ while len(self.events) > 0 and self.events[0,1] == self.time:
+ event_type=int(self.events[0,0])
+ ts=self.events[0,1]
+ event=self.events[0,2]
+ self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between)
+ if event_type == 0:
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at=event
+ src=self.nodes[int(src_id)]
+ dst=self.nodes[int(dst_id)]
+ if self.netmat[interface]["is_wired"]:
+ dst["interfaces"][interface].put((data,start_at,self.time))
+ dst["interfaces_queue_size"][interface]+=1
+ self.update_sharing(dst.node_id,-1,interface)
+ self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
+ # If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
+ if dst["state"] == "request" and dst["request"] == "receive":
+ dst["interfaces_queue_size"][interface]-=1
+ dst.rqueue.put(("receive",0))
+ dst["state"]="running"
+ self.sync_node(dst)
+ src["state"]="running"
+ src.rqueue.put(("send",0))
+ else:
+ if src.node_id != dst.node_id:
+ dst["interfaces"][interface].put((data,start_at,self.time))
+ dst["interfaces_queue_size"][interface]+=1
+ self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
+ # If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
+ if dst["state"] == "request" and dst["request"] == "receive":
+ dst["interfaces_queue_size"][interface]-=1
+ dst.rqueue.put(("receive",0))
+ dst["state"]="running"
+ self.sync_node(dst)
+ else:
+ src["state"]="running"
+ src.rqueue.put(("send",0))
+ elif event_type == 1:
+ node=self.nodes[int(event)]
+ node["state"]="running"
+ node.rqueue.put(("timeout",0))
+ self.sync_node(node)
+ elif event_type == 2 or event_type == 3:
+ breakpoint_callback(self)
+ if event_type == 3:
+ self.add_event(3,self.time+breakpoints_every,0,0)
+
+ ##### Simulation ends
+ self.log("Simulation ends")
+
diff --git a/esds/plugins/__init__.py b/esds/plugins/__init__.py
new file mode 100644
index 0000000..abb734a
--- /dev/null
+++ b/esds/plugins/__init__.py
@@ -0,0 +1 @@
+__all__ = [ "node_plugin", "power_states" ]
diff --git a/esds/plugins/node_plugin.py b/esds/plugins/node_plugin.py
new file mode 100644
index 0000000..325ff8a
--- /dev/null
+++ b/esds/plugins/node_plugin.py
@@ -0,0 +1,29 @@
+class NodePlugin:
+ """
+ Node plugins get register to the node API get notified when events occurs.
+ The call and return suffixes are used for methods that are called at the beginning
+ and the end, respectively, of API calls triggered by the node source code.
+
+ Changing this API could brake most of the node plugins.
+ """
+
+ def __init__(self,plugin_name,api):
+ self.api=api
+ self.plugin_name=plugin_name
+ api.plugin_register(self)
+
+ def on_send_call(self,interface,data,datasize,dst):
+ pass
+
+ def on_send_return(self,interface,data,datasize,dst,code):
+ pass
+
+ def on_receive_return(self,interface,data,start_at,end_at):
+ pass
+
+ def on_terminated(self):
+ pass
+
+ def log(self,msg):
+ self.api.log(self.plugin_name+"(NP) "+msg)
+
diff --git a/esds/plugins/operating_states.py b/esds/plugins/operating_states.py
new file mode 100644
index 0000000..400aa1b
--- /dev/null
+++ b/esds/plugins/operating_states.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+
+from plugins.node_plugin import *
+
+######################
+# _ ____ ___ #
+# / \ | _ \_ _| #
+# / _ \ | |_) | | #
+# / ___ \| __/| | #
+# /_/ \_\_| |___| #
+# #
+######################
+
+# import plugins.operating_states as op
+# # Load the directional transition graph from graph.txt starting at the "vertex1" state
+# opstate=op.OperatingStates(api,"graph.txt","vertex1")
+# Format of the graph.txt file consists in one edge per line
+# that consists on the source vertex and destination vertex sperated by a space
+# As an example:
+# vertex1 vertex2
+# vertex1 vertex3
+# vertex3 vertex2
+# vertex2 vertex1
+#
+# opstate.register_callback(boom)
+# # On each state transition boom will be called as boom(src_state,dst_state)
+# # This way the boom callback can contains power_state transitions for examples
+# opstate.goto("vertex2") # works
+# opstate.goto("vertex3") # wont work
+# opstate.goto("vertex1") # work since we are on vertex2
+
+class OperatingStates(NodePlugin):
+ """
+ OperatingStates plugin
+ """
+ def __init__(self,api, state_file, initial_state):
+ self.transitions=list()
+ self.callbacks=list()
+ self.state=initial_state
+ with open(state_file) as fp:
+ for i, line in enumerate(fp):
+ self.transitions.append(line)
+ super().__init__("OperatingStates",api)
+
+ def goto(self,state):
+ if (self.state+" "+state) in self.transitions:
+ old_state=self.state
+ self.state=state
+ for c in self.callbacks:
+ c(old_state,state)
+ else:
+ self.log("Invalid transition "+self.state+" => "+state)
+
+ def get_state(self):
+ return(self.state)
+
+ def register_callback(self,callback):
+ """
+ The callback will be called on each state transition
+ Callback takes two arguments which are:
+ - The source state
+ - The destination state
+ """
+ self.callbacks.append(callback)
+
+
diff --git a/esds/plugins/power_states.py b/esds/plugins/power_states.py
new file mode 100644
index 0000000..be3d085
--- /dev/null
+++ b/esds/plugins/power_states.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+
+from plugins.node_plugin import *
+
+# PowerStates allows you to measure the energy consumption of a
+# node that go through several power states during the simulation
+# Two version of Powerstates is provided by mean of two classes:
+# - Powerstates: Allow you to set power to any user's defined values
+# - PowerstatesFromFile: Allow you to set power from states defined in a file
+
+######################
+# _ ____ ___ #
+# / \ | _ \_ _| #
+# / _ \ | |_) | | #
+# / ___ \| __/| | #
+# /_/ \_\_| |___| #
+# #
+######################
+
+# #Regarding PowerStates:
+# import Powerstates as ps
+# pstates=ps.PowerStates(<node>,<power_init>)
+# pstates.set_power(<power>) # Switch the power consumption to <power>
+# pstates.report_energy() # Display the current node energy consumption up to the current simulated time
+# pstates.report_power_changes() # Display all the power changes up to the current simulated time
+
+# #Regarding PowerStatesFromFile:
+# #Format of <file> is one <entry> per line that follow this format <state-0>:<state-1>:...:<state-n>
+# #Each line can corresponds to one node
+# import Powerstates as ps
+# pstates=ps.PowerStatesFromFile(<node>,<file>,<entry-line>) # Create a power states on node <node> using line <entry-line> of file <file>
+# pstates.set_state(<id>) # Switch to the <id> power states
+# pstates.report_energy() # Display the current node energy consumption up to the current simulated time
+# pstates.report_power_changes() # Display all the power changes up to the current simulated time
+# pstates.report_state_changes() # Display all the states changes up to the current simulated time
+
+
+class PowerStates(NodePlugin):
+ """
+ PowerStates model the energy consumed by the various changes of power consumption of a node over time.
+ """
+ def __init__(self,node,power_init):
+ self.node=node
+ self.clock=self.node.read("clock")
+ self.energy=0
+ self.power=power_init
+ self.power_changes=dict()
+ self.set_power(power_init)
+ super().__init__("Powerstates",api)
+
+
+ def set_power(self,power_watt):
+ cur_clock=self.node.read("clock")
+ self.energy+=self.power*(cur_clock-self.clock)
+ self.clock=cur_clock
+ if self.power != power_watt:
+ self.power_changes[cur_clock]=power_watt
+ self.power=power_watt
+ return cur_clock
+
+ def report_energy(self):
+ self.set_power(self.power)
+ self.node.log("[PowerStates Plugin] Consumed "+str(self.energy) +"J")
+
+ def report_power_changes(self):
+ self.set_power(self.power)
+ for key in self.power_changes.keys():
+ self.node.log("[PowerStates Plugin] At t="+str(key)+" power is "+str(self.power_changes[key])+"W")
+
+
+
+class PowerStatesFromFile(PowerStates):
+ """
+ A version of Powerstates that load the power values from a file.
+ """
+ def __init__(self,node,state_file,entry_line=1):
+ self.node=node
+ self.state_changes=dict()
+ self.states=[]
+ self.state=0
+ with open(state_file) as fp:
+ for i, line in enumerate(fp):
+ if i+1 == entry_line:
+ self.states=line.split(":")
+ self.states=[float(i) for i in self.states]
+ assert len(self.states) > 0
+ super().__init__(node,self.states[0])
+ self.set_state(0)
+
+ def set_state(self,state_id):
+ assert state_id < len(self.states)
+ clock=super().set_power(self.states[state_id])
+ if self.state != state_id:
+ self.state_changes[clock]=state_id
+ self.state=state_id
+
+
+ def report_state_changes(self):
+ self.set_state(self.state)
+ for key in self.state_changes.keys():
+ self.node.log("[PowerStates Plugin] At t="+str(key)+" state is "+str(self.state_changes[key]))
+
+
+class PowerStatesComms(NodePlugin):
+ """
+ Monitor the energy consumed by the network interfaces by mean of power states.
+ Note that for finer grained predictions, bytes and packet power consumption must be accounted.
+ Which is not the case with these power states.
+ """
+
+ def __init__(self,api):
+ super().__init__("PowerStatesComms",api)
+ self.energy_dynamic=0.0 # Store the dynamic part of the energy consumption
+ self.power=dict() # Store the power states
+ self.tx_clock=0 # Dynamic clock (store the time at which a the last tx starts
+ self.idle_clock=api.read("clock") # Store the start time (to compute the idle part of the energy consumption)
+
+ def on_receive_return(self,interface,data,start_at,end_at):
+ duration=float(end_at)-float(start_at)
+ self.energy_dynamic+=self.power[interface]["rx"]*duration
+
+ def on_send_call(self,interface,data,datasize,dst):
+ self.tx_clock=self.api.read("clock")
+
+ def on_send_return(self,interface,data,datasize,dst,code):
+ clock=self.api.read("clock")
+ duration=(clock-float(self.tx_clock))
+ self.energy_dynamic+=self.power[interface]["tx"]*duration
+ self.tx_clock=clock # Any value could be use here
+
+ def set_power(self,interface,idle,tx,rx):
+ self.power[interface]=dict()
+ self.power[interface]["idle"]=idle
+ self.power[interface]["rx"]=rx
+ self.power[interface]["tx"]=tx
+
+ def get_idle(self):
+ clock=self.api.read("clock")
+ idle=0
+ for interface in self.power.keys():
+ idle+=(clock-self.idle_clock)*self.power[interface]["idle"]
+ return idle
+
+ def get_receive_queue_energy(self,interface):
+ """
+ Not that call to on_receive_return may not have happened yet (or never).
+ Thus we should manually compute the energy consumption stored in each queues of the node.
+ """
+ energy=0
+ # For each interface we should check if there is received data that has not been consumed
+ for data in list(self.api["interfaces"][interface].queue):
+ start_at=float(data[1])
+ end_at=float(data[2])
+ energy+=(end_at-start_at)*self.power[interface]["rx"]
+ return energy
+
+ def get_energy(self):
+ queue_energy=0
+ for interface in self.power.keys():
+ queue_energy+=self.get_receive_queue_energy(interface)
+ return self.get_idle()+self.energy_dynamic+queue_energy
+
+ def report_energy(self):
+ self.log("Communications consumed "+str(round(self.get_energy(),2))+"J")
diff --git a/esds/plugins/wireless_area.py b/esds/plugins/wireless_area.py
new file mode 100644
index 0000000..958d4ba
--- /dev/null
+++ b/esds/plugins/wireless_area.py
@@ -0,0 +1,72 @@
+import math
+import numpy as np
+
+# This plugin is outdated
+class WirelessArea:
+
+ def __init__(self):
+ self.nodes=list()
+
+ def dump_nodes(self):
+ i=0
+ for node in self.nodes:
+ x,y,z,com_range=node
+ print("Node {} at ({},{},{}) with a communication range of {}m".format(i,x,y,z,com_range))
+ i+=1
+
+ def dump_infos(self):
+ print("Number of nodes {}".format(len(self.nodes)))
+ adjacency=self.generate_adjacency_matrix(fill_diagonal=False)
+ print("Nodes average degree is {}".format(np.mean(np.sum(adjacency,axis=0))))
+ x = [node[0] for node in self.nodes]
+ y = [node[1] for node in self.nodes]
+ z = [node[2] for node in self.nodes]
+ com_range = [node[3] for node in self.nodes]
+ print("Nodes locations ranges: x in [{},{}] y in [{},{}] z in [{},{}]".format(min(x),max(x),min(y),max(y),min(z),max(z)))
+ print("Node communication ranges in [{},{}]".format(min(com_range),max(com_range)))
+
+ def add_node(self,x,y,z,com_range):
+ self.nodes.append((x,y,z,com_range))
+
+ def get_neighbours(self,node_id):
+ node=self.nodes[node_id]
+ neighbours=list()
+ for i in range(0,len(self.nodes)):
+ if i != node_id:
+ neighbour=self.nodes[i]
+ if math.dist(node[0:3],neighbour[0:3]) <= node[3]:
+ neighbours.append(i)
+ return neighbours
+
+ def generate_dot(self,filepath):
+ is_strict=False
+ com_range=self.nodes[0][3]
+ for node in self.nodes:
+ if node[3] != com_range:
+ is_strict=True
+ break
+
+ with open(filepath, "w") as f:
+ if is_strict:
+ f.write("digraph G {\n")
+ else:
+ f.write("strict graph G {\n")
+ for i in range(0,len(self.nodes)):
+ neighbours=self.get_neighbours(i)
+ for n in neighbours:
+ if is_strict:
+ f.write("{}->{}\n".format(i,n))
+ else:
+ f.write("{}--{}\n".format(i,n))
+ f.write("}")
+
+ def generate_adjacency_matrix(self,fill_diagonal=True):
+ matrix=np.full((len(self.nodes),len(self.nodes)),0)
+ if fill_diagonal:
+ np.fill_diagonal(matrix,1) # Required by ESDS
+ for i in range(0,len(self.nodes)):
+ neighbours=self.get_neighbours(i)
+ for n in neighbours:
+ matrix[i,n]=1
+ return matrix
+