aboutsummaryrefslogtreecommitdiff
path: root/src/simulator.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/simulator.cc')
-rw-r--r--src/simulator.cc122
1 files changed, 88 insertions, 34 deletions
diff --git a/src/simulator.cc b/src/simulator.cc
index 9322a2b..76bb0fb 100644
--- a/src/simulator.cc
+++ b/src/simulator.cc
@@ -28,10 +28,28 @@
/// @brief Note that we need to simulate latency our self since we need to send instantaneous messages
#define SEND(instruction) \
{ \
- simgrid::s4u::this_actor::sleep_for(i.latency); \
+ TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(i.latency > uptime ? uptime : i.latency)); \
instruction; \
}
+#define FORWARD_HINT(TRY_FORWARD_DURING) \
+ { \
+ if(hint_forward!=NULL && CLOCK<hint_forward->hint){ \
+ hint_forward->HisForward=true; \
+ hint_forward->DedicatedMailbox="hint_forward"; \
+ try { \
+ XBT_INFO("%s try to forward a hint",CNAME); \
+ TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \
+ simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \
+ MODE_TX(); \
+ SEND(m_ded->put(hint_forward,0,uptime)); \
+ } \
+ catch(...){ \
+ XBT_INFO("%s fail to forward a hint",CNAME); \
+ } \
+ } \
+ }
+
/// @brief Required by SimGrid
XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS");
@@ -44,11 +62,12 @@ typedef unsigned int u32;
*/
class Payload{
public:
- Payload():hint(0),duration(0),HasHint(false){}
- Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox){}
+ Payload():hint(0),duration(0),HasHint(false),HisForward(false){}
+ Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward){}
double hint; // The timestamp that should be used by the receiver
double duration; // The duration that should be used by the receiver
bool HasHint;
+ bool HisForward;
std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver
};
@@ -107,6 +126,7 @@ static void obs_node(std::vector<std::string> args) {
u32 nRcvFail=0;
u32 nSend=0;
double totalUptime=0;
+ Payload *hint_forward=NULL;
while(i.ShouldContinue()){
// Start by sleeping
XBT_INFO("%s is spleeping",CNAME);
@@ -119,6 +139,7 @@ static void obs_node(std::vector<std::string> args) {
double uptime=i.GetDuration();
double upsince=simgrid::s4u::Engine::get_clock();
double upuntil=i.GetTS()+i.GetDuration();
+ bool forward_mode=false;
while(uptime>0)
{
if(i.is_sender){
@@ -135,7 +156,6 @@ static void obs_node(std::vector<std::string> args) {
// This allow first to detect if their is a receiver
// (to not cause deadlock for the extended mode) and second
// to inform the receiver if he should get a hint first
- MODE_TX();
TRACK_UPTIME(m->put(p,0,uptime));
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
// First send hint if it is required
@@ -143,6 +163,7 @@ static void obs_node(std::vector<std::string> args) {
TRACK_UPTIME(SEND(m_ded->put(p,i.hint_size,uptime)));
XBT_INFO("%s sent a hint successfully",CNAME);
}
+ MODE_TX();
// Then try sending the data
if(i.extended){
SEND(m_ded->put(p,i.data_size));
@@ -162,42 +183,75 @@ static void obs_node(std::vector<std::string> args) {
}
}
else if(!isObserver){
- Payload *p; // Received data
- Payload *hint; // To Save the received hint
- bool hintReceived=false; // In case of error during data rx this will be use to check if we could use the *hint Payload object
- try {
- // Get the instantaneous message
- TRACK_UPTIME(p=m->get<Payload>(uptime));
- simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
- // Start receiving data
- MODE_RX();
- if(p->HasHint){
- TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
- XBT_INFO("%s received a hint successfully",CNAME);
- hint=new Payload(*p); // Save hint
- hintReceived=true;
+ // Here we try to forward hint for 1 sec and try to receive data for 5 secs
+ double try_for=forward_mode ? 1 : 1;
+ try_for=try_for>uptime ? uptime : try_for; // Ensure we do not exceed uptime
+ // Forward hint mode
+ if(forward_mode){
+ if(hint_forward!=NULL && CLOCK < hint_forward->hint){
+ FORWARD_HINT(try_for); // Try forward for 5 seconds then switch to received mode
+ }
+ }
+ else { // Receiving mode
+ Payload *p; // Received data
+ Payload *hint; // To Save the received hint
+ bool hintReceived=false; // In case of error during data rx this will be use to check if we could use the *hint Payload object
+ try {
+ // Get the instantaneous message
+ do {
+ TRACK_UPTIME(p=m->get<Payload>(try_for));
+ if(p->HisForward){
+ if(hint_forward==NULL || (hint_forward !=NULL && p->hint!=hint_forward->hint)){
+ simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ MODE_RX();
+ TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
+ MODE_ON();
+ XBT_INFO("%s received a forwarded hint successfully",CNAME);
+ if(CLOCK < p->hint){
+ i.AddEvent(p->hint, p->duration);
+ hint_forward=new Payload(*p);
+ }
+ }
+ }
+ } while(p->HisForward);
+ simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ // Start receiving data
+ MODE_RX();
+ if(p->HasHint){
+ TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
+ XBT_INFO("%s received a hint successfully",CNAME);
+ hint=new Payload(*p); // Save hint
+ hint_forward=new Payload(*p); // Enable hint forwarding
+ hintReceived=true;
+ }
+ if(i.extended)
+ p=m_ded->get<Payload>(); // Fetch data until sended
+ else
+ p=m_ded->get<Payload>(uptime); // Fetch data until sended or uptime expire
+ // If we reach here, data has been received successfully
+ XBT_INFO("%s received data successfully",CNAME);
+ nDataRcv++;
+ isObserver=true;
+ i.is_sender=false;
+
+ }catch(...){
+ XBT_INFO("%s could not receive any data",CNAME);
+ nRcvFail++;
+ if(hintReceived)
+ i.AddEvent(hint->hint, hint->duration); // Add the hint to the event list
}
- if(i.extended)
- p=m_ded->get<Payload>(); // Fetch data until sended
- else
- p=m_ded->get<Payload>(uptime); // Fetch data until sended or uptime expire
- // If we reach here, data has been received successfully
- XBT_INFO("%s received data successfully",CNAME);
- nDataRcv++;
- isObserver=true;
- i.is_sender=false;
-
- }catch(...){
- XBT_INFO("%s could not receive any data",CNAME);
- nRcvFail++;
- if(hintReceived)
- i.AddEvent(hint->hint, hint->duration); // Add the hint to the event list
}
+ forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding)
}
else {
XBT_INFO("%s is observing his environment...",CNAME);
MODE_ON();
- simgrid::s4u::this_actor::sleep_for(uptime);
+ if(hint_forward!=NULL && CLOCK < hint_forward->hint){
+ FORWARD_HINT(uptime);
+ }
+ else{
+ simgrid::s4u::this_actor::sleep_for(uptime);
+ }
}
uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode
}