summaryrefslogtreecommitdiff
path: root/src/publisher.c
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2023-07-16 17:19:44 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2023-07-16 17:19:44 +0200
commitdd12a4b42f9f4619519ecc2095aa05fb37b27648 (patch)
tree409bc73c6ce9f8b838923d11f9dae17450628045 /src/publisher.c
parent33c54437949aaefacc3326a1c2d577db072cbacd (diff)
Minor changes
Diffstat (limited to 'src/publisher.c')
-rw-r--r--src/publisher.c29
1 files changed, 18 insertions, 11 deletions
diff --git a/src/publisher.c b/src/publisher.c
index 3e41d86..8c8c2f3 100644
--- a/src/publisher.c
+++ b/src/publisher.c
@@ -1,7 +1,5 @@
#include <zmq.h>
#include <assert.h>
-#include <time.h>
-#include <dirent.h>
#include <signal.h>
#include "utils.h"
@@ -59,33 +57,38 @@ int main (int argc, char *argv [])
//----- Start publisher
struct dirent *de; // Pointer for directory entry
while(!__stop){
- int interval=INTERVAL(__loginterval);
- int interval_next=INTERVAL_NEXT(__loginterval);
- DIR *dr = opendir(__logdir);
+ int interval=INTERVAL(__loginterval); // Current interval
+ int interval_next=INTERVAL_NEXT(__loginterval); // The next one
+ DIR *dr = opendir(__logdir); // Open the log directory
if(dr !=NULL){
while ((de = readdir(dr)) != NULL){
+ // Iterate over each loggers directories
if(strcmp(de->d_name,".") && strcmp(de->d_name,"..")){
- char *client=de->d_name;
+ char *client=de->d_name; // Directory name is the client name
+ // Build the path for files we are looking for:
char logfile[STATIC_LEN];
char logfile_next[STATIC_LEN];
sprintf(logfile,"%s/%s/%ld",__logdir,client,interval);
sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next);
// As long as next logfile is not available, we should wait
- // for sending the current one
+ // before sending the current one (since loggers are working on it)
printf("Waiting for %s logger measurements...\n",client);
while(!FILE_EXISTS(logfile_next) && (!__stop)){
sleep(1);
}
- // Send current one
+ // Send current one:
if(FILE_EXISTS(logfile)){
+ printf("Publishing %s measurements\n",client);
publish(publisher,logfile,client,interval);
- remove(logfile);
+ remove(logfile); // Prevent log accumulation
}
}
}
closedir(dr);
}
else {
+ // If open fails, directory probably do not exists
+ // yet so wait:
sleep(1);
}
}
@@ -97,27 +100,31 @@ int main (int argc, char *argv [])
}
void publish(void *publisher, char *filepath, char* client, long int interval){
- printf("Publishing %s measurements\n",client);
+ // Build message header:
char buffer[ZMQ_MSG_SIZE];
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
int msglen=strlen(buffer);
- FILE *fptr;
char * line = NULL;
size_t len = 0;
ssize_t read;
+ FILE *fptr;
fptr=fopen(filepath,"r");
+ // Put every lines in the buffer and send it
while ((read = getline(&line, &len, fptr)) != -1) {
if((read+msglen) <ZMQ_MSG_SIZE){
strcat(buffer,line);
msglen+=read;
} else {
+ // It buffer is full, we send the message and create another one
zmq_send (publisher, buffer, msglen, 0);
+ // Build new message header:
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
strcat(buffer,line);
msglen=strlen(buffer);
}
}
fclose(fptr);
+ // Finally send the last message (or the only one)
zmq_send (publisher, buffer, msglen, 0);
} \ No newline at end of file