diff options
| author | Loic Guegan <manzerbredes@mailbox.org> | 2023-07-16 17:19:44 +0200 |
|---|---|---|
| committer | Loic Guegan <manzerbredes@mailbox.org> | 2023-07-16 17:19:44 +0200 |
| commit | dd12a4b42f9f4619519ecc2095aa05fb37b27648 (patch) | |
| tree | 409bc73c6ce9f8b838923d11f9dae17450628045 /src/publisher.c | |
| parent | 33c54437949aaefacc3326a1c2d577db072cbacd (diff) | |
Minor changes
Diffstat (limited to 'src/publisher.c')
| -rw-r--r-- | src/publisher.c | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/src/publisher.c b/src/publisher.c index 3e41d86..8c8c2f3 100644 --- a/src/publisher.c +++ b/src/publisher.c @@ -1,7 +1,5 @@ #include <zmq.h> #include <assert.h> -#include <time.h> -#include <dirent.h> #include <signal.h> #include "utils.h" @@ -59,33 +57,38 @@ int main (int argc, char *argv []) //----- Start publisher struct dirent *de; // Pointer for directory entry while(!__stop){ - int interval=INTERVAL(__loginterval); - int interval_next=INTERVAL_NEXT(__loginterval); - DIR *dr = opendir(__logdir); + int interval=INTERVAL(__loginterval); // Current interval + int interval_next=INTERVAL_NEXT(__loginterval); // The next one + DIR *dr = opendir(__logdir); // Open the log directory if(dr !=NULL){ while ((de = readdir(dr)) != NULL){ + // Iterate over each loggers directories if(strcmp(de->d_name,".") && strcmp(de->d_name,"..")){ - char *client=de->d_name; + char *client=de->d_name; // Directory name is the client name + // Build the path for files we are looking for: char logfile[STATIC_LEN]; char logfile_next[STATIC_LEN]; sprintf(logfile,"%s/%s/%ld",__logdir,client,interval); sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next); // As long as next logfile is not available, we should wait - // for sending the current one + // before sending the current one (since loggers are working on it) printf("Waiting for %s logger measurements...\n",client); while(!FILE_EXISTS(logfile_next) && (!__stop)){ sleep(1); } - // Send current one + // Send current one: if(FILE_EXISTS(logfile)){ + printf("Publishing %s measurements\n",client); publish(publisher,logfile,client,interval); - remove(logfile); + remove(logfile); // Prevent log accumulation } } } closedir(dr); } else { + // If open fails, directory probably do not exists + // yet so wait: sleep(1); } } @@ -97,27 +100,31 @@ int main (int argc, char *argv []) } void publish(void *publisher, char *filepath, char* client, long int interval){ - printf("Publishing %s measurements\n",client); + // Build message header: char buffer[ZMQ_MSG_SIZE]; sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval); int msglen=strlen(buffer); - FILE *fptr; char * line = NULL; size_t len = 0; ssize_t read; + FILE *fptr; fptr=fopen(filepath,"r"); + // Put every lines in the buffer and send it while ((read = getline(&line, &len, fptr)) != -1) { if((read+msglen) <ZMQ_MSG_SIZE){ strcat(buffer,line); msglen+=read; } else { + // It buffer is full, we send the message and create another one zmq_send (publisher, buffer, msglen, 0); + // Build new message header: sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval); strcat(buffer,line); msglen=strlen(buffer); } } fclose(fptr); + // Finally send the last message (or the only one) zmq_send (publisher, buffer, msglen, 0); }
\ No newline at end of file |
