summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2023-07-18 13:00:38 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2023-07-18 13:00:38 +0200
commit7f87211b01bcc116eba8b01b73677ea87172e8d7 (patch)
treed92a66698ac50996b1842328cbc5846175dd1da7
parent55d9cd75679050b45f1b350decb95d05aac2a47e (diff)
Minor changes
-rw-r--r--Makefile25
-rw-r--r--config.mk7
-rw-r--r--src/logger.c172
-rw-r--r--src/publisher.c262
-rw-r--r--src/subscriber.c8
5 files changed, 172 insertions, 302 deletions
diff --git a/Makefile b/Makefile
index 46ad715..31dd09c 100644
--- a/Makefile
+++ b/Makefile
@@ -6,30 +6,29 @@ CFLAGS=
MACROS=\
-DZMQ_TOKEN=\"$(ZMQ_TOKEN)\" \
-DZMQ_MSG_SIZE=$(ZMQ_MSG_SIZE) \
--DLOGGERS_DELAY=$(LOGGERS_DELAY)
+-DLOG_DELAY=$(LOG_DELAY)
-all: publisher subscriber logger
+all: publisher subscriber
publisher: src/publisher.c src/utils.c config.mk
- $(CC) -lzmq $(filter-out config.mk,$^) -o $@ $(MACROS)
+ $(CC) -lzmq -lpthread $(filter-out config.mk,$^) -o $@ $(MACROS)
subscriber: src/subscriber.c src/utils.c config.mk
$(CC) -lzmq $(filter-out config.mk,$^) -o $@ $(MACROS)
-logger: src/logger.c src/utils.c config.mk
- $(CC) -lzmq -lpthread $(filter-out config.mk,$^) -o $@ $(MACROS)
-
-publish: publisher logger
- for client in $$(basename -a /sys/kernel/ina260/*); \
+publish: publisher
+ [ -f pid ] && { cat pid|xargs kill -INT; rm pid; } || exit 0
+ for client in $$(ls /sys/kernel/ina260/|xargs basename -a); \
do \
- ./logger $(LOGGERS_DIR) $$client $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) ; \
- done
-# [ -f pid ] && { kill -INT $(shell cat pid); rm pid; }
+ ./$^ $$client $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) > publisher_$${client}.log 2>&1 & \
+ echo $$! >> pid; \
+ done ;\
+ wait
subscribe: subscriber
- ./subscriber $(ZMQ_PORT) $(SUBSCRIBER_DIR)
+ ./$^ $(ZMQ_PORT) $(SUBSCRIBER_DIR)
clean:
- rm -f logger subscriber publisher
+ rm -f subscriber publisher
.PHONY: clean publish subscribe
diff --git a/config.mk b/config.mk
index 92232f7..15dd5d6 100644
--- a/config.mk
+++ b/config.mk
@@ -13,13 +13,10 @@ ZMQ_TOKEN=ina260-zmq-publisher
ZMQ_MSG_SIZE=5242880
##### Loggers/Publisher
-# LOGGERS_DIR will contains all the data generated by the loggers
-# a.k.a the ina260 power measurements
-LOGGERS_DIR=/tmp/ina260_logs/
-# LOGGERS_DELAY defines the delay between 2 consecutive
+# LOG_DELAY defines the delay between 2 consecutive
# ina260 power read performed by the logger
# Unit is milliseconds
-LOGGERS_DELAY=1000
+LOG_DELAY=1000
# SUBSCRIBER_DIR will contain all the measurments
# received from the publishers
SUBSCRIBER_DIR=./data
diff --git a/src/logger.c b/src/logger.c
deleted file mode 100644
index 7655f75..0000000
--- a/src/logger.c
+++ /dev/null
@@ -1,172 +0,0 @@
-#include <stdio.h>
-#include <zmq.h>
-#include <assert.h>
-#include <libgen.h>
-#include <unistd.h>
-#include <sys/stat.h>
-#include <signal.h>
-#include <pthread.h>
-#include "utils.h"
-
-#ifndef LOGGERS_DELAY
-#define LOGGERS_DELAY 0
-#endif
-
-// Global:
-char *__client;
-char *__ip;
-char *__key;
-int __port;
-char __logdir[STATIC_LEN];
-char __regpower[STATIC_LEN];
-int __loginterval;
-unsigned char __stop=0;
-
-void sighandler(int signo){
- if (signo == SIGINT){
- printf("Stopping...\n");
- __stop=1;
- }
-}
-
-void *publisher(void *zmq_publisher);
-
-typedef struct queue {
- int size;
- char issending;
- char msg[ZMQ_MSG_SIZE];
-} queue;
-queue queues[MAX_QUEUES];
-
-int main (int argc, char *argv [])
-{
- if(argc != 7){
- printf("Usage: %s <abslogdir> <client> <loginterval> <ip> <port> <key>",argv[0]);
- exit(1);
- }
-
- //----- Init global variables
- __client=argv[2];
- __loginterval=atoi(argv[3]);
- __ip=argv[4];
- __port=atoi(argv[5]);
- __key=argv[6];
- // __logdir:
- strcat(__logdir,argv[1]);
- strcat(__logdir,"/");
- strcat(__logdir,__client);
- // __regpower:
- strcat(__regpower,INA260_SYSFS);
- strcat(__regpower,"/");
- strcat(__regpower,__client);
- strcat(__regpower,"/");
- strcat(__regpower,INA260_POWER_REGISTER);
-
- //----- Sanity checks
- signal(SIGINT,sighandler);
- mkdirp(__logdir);
- if(__loginterval<MIN_INTERVAL){
- printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL);
- exit(2);
- }
- if(FILE_EXISTS(__regpower)){
- printf("Logger cannot access to %s\n",__regpower);
- exit(3);
- }
-
- //----- Prepare our context and publisher
- void *zmq_context = zmq_ctx_new ();
- void *zmq_publisher = zmq_socket (zmq_context, ZMQ_PUB);
- char bindto[STATIC_LEN];
- sprintf(bindto,"tcp://%s:%d",__ip,__port);
- int rc = zmq_connect (zmq_publisher, bindto);
- if(rc!=0){
- printf("Failed to connect to %s\n",bindto);
- exit(1);
- }
-
-
-
- //----- Start logging
- pthread_t zmq_thread;
- printf("Logger started [client=%s,interval=%ds]\n",__client,__loginterval);
-
- FILE *regptr,*logptr;
- char logfilepath[STATIC_LEN]="";
- regptr=fopen("/home/loic/out.txt","r");
- char buffer[STATIC_LEN];
- int power;
- time_t interval;
- struct timespec power_ts;
- int queue_id=0;
-
- // Init queues
- for(int i=0;i<MAX_QUEUES;i++){
- queues[queue_id].issending=0;
- }
- pthread_create(&zmq_thread, NULL, publisher, zmq_publisher);
-
- while(!__stop){
- interval=INTERVAL(__loginterval);
- // Log current interval
- queue_id=(queue_id+1)>=MAX_QUEUES ? 0 : (queue_id+1);
- // Busy wait:
- while(queues[queue_id].issending){};
- // Write msg header:
- *queues[queue_id].msg='\0';
- sprintf(queues[queue_id].msg,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,interval);
- queues[queue_id].size=strlen(queues[queue_id].msg);
- // Monitor:
- while((TIMESTAMP()-interval)<__loginterval){
- if(__stop)
- break;
- // Read power:
- fgets(buffer,STATIC_LEN,regptr);
- // Get power measurement timestamp:
- clock_gettime(CLOCK_REALTIME,&power_ts);
- // Write measurement into msg buffer:
- char line[MAX_RECORD_LEN];
- if((queues[queue_id].size+MAX_RECORD_LEN)>ZMQ_MSG_SIZE){
- printf("To many measurements to publish. Please increase ZMQ_MSG_SIZE\n");
- } else {
- sprintf(queues[queue_id].msg+queues[queue_id].size,"%ld,%ld,%d\n",power_ts.tv_sec,power_ts.tv_nsec,atoi(buffer));
- queues[queue_id].size+=strlen(queues[queue_id].msg+queues[queue_id].size);
- }
- // Reset power register file:
- fseek(regptr,0,SEEK_SET);
-#if LOGGERS_DELAY > 0
- usleep(LOGGERS_DELAY*1000);
-#endif
- //printf("Tick\n"); fflush(stdout);
- }
- queues[queue_id].issending=1;
- }
-
- fclose(regptr);
- pthread_join(zmq_thread, NULL);
- zmq_close (zmq_publisher);
- zmq_ctx_destroy (zmq_context);
- return 0;
-}
-
-
-void *publisher(void *zmq_publisher){
- int queue_id=0;
- while(!__stop){
- if(queues[queue_id].issending){
- printf("Publishing...");
- zmq_send(zmq_publisher,queues[queue_id].msg,queues[queue_id].size,0);
- queues[queue_id].issending=0;
- printf("done\n");
- } else {
-#if LOGGERS_DELAY > 0
- usleep(LOGGERS_DELAY*1000);
-#endif
- continue;
- }
- queue_id++;
- if(queue_id>=MAX_QUEUES)
- queue_id=0;
- }
- pthread_exit(EXIT_SUCCESS);
-} \ No newline at end of file
diff --git a/src/publisher.c b/src/publisher.c
index 8c8c2f3..03b3a1a 100644
--- a/src/publisher.c
+++ b/src/publisher.c
@@ -1,130 +1,170 @@
-#include <zmq.h>
+#include "utils.h"
#include <assert.h>
+#include <libgen.h>
+#include <pthread.h>
#include <signal.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <zmq.h>
-#include "utils.h"
-
-// Global:
-char *__logdir;
-char *__key;
-char *__interface;
-char *__ip;
-int __loginterval;
-int __port;
-unsigned char __stop=0;
+#ifndef LOG_DELAY
+#define LOG_DELAY 0
+#endif
-void publish(void *publisher, char *filepath, char* client, long int interval);
+/// @brief Set to non-zero to stop the processes
+unsigned char __stop = 0;
-void sighandler(int signo){
- if (signo == SIGINT){
+/**
+ * @brief Stop process properly on SIGINT
+ *
+ * @param signo
+ */
+void sighandler(int signo) {
+ if (signo == SIGINT) {
printf("Stopping...\n");
- __stop=1;
+ __stop = 1;
}
}
-int main (int argc, char *argv [])
-{
- if(argc != 6){
- printf("Usage: %s <abslogdir> <loginterval> <ip> <port> <key>",argv[0]);
- exit(1);
- }
+void *publisher(void *zmq_publisher);
- //----- Init global variables
- __logdir=argv[1];
- __loginterval=atoi(argv[2]);
- __ip=argv[3];
- __port=atoi(argv[4]);
- __key=argv[5];
-
- //----- Sanity checks
- signal(SIGINT,sighandler);
- if(__loginterval<MIN_INTERVAL){
- printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL);
- exit(2);
- }
+typedef struct queue {
+ int size;
+ char issending;
+ char msg[ZMQ_MSG_SIZE];
+} queue;
+queue queues[MAX_QUEUES];
- //----- Prepare our context and publisher
- void *context = zmq_ctx_new ();
- void *publisher = zmq_socket (context, ZMQ_PUB);
- char bindto[STATIC_LEN];
- sprintf(bindto,"tcp://%s:%d",__ip,__port);
- int rc = zmq_connect (publisher, bindto);
- if(rc!=0){
- printf("Failed to connect to %s\n",bindto);
- exit(1);
- }
+int main(int argc, char *argv[]) {
+ if (argc != 6) {
+ printf("Usage: %s <client> <loginterval> <ip> <port> <key>",
+ argv[0]);
+ exit(1);
+ }
+
+ //----- Init arguments
+ char *client = argv[1];
+ int loginterval = atoi(argv[2]);
+ char *ip = argv[3];
+ int port = atoi(argv[4]);
+ char *key = argv[5];
+
+ // __regpower:
+ char regpower[STATIC_LEN]="";
+ strcat(regpower, INA260_SYSFS);
+ strcat(regpower, "/");
+ strcat(regpower, client);
+ strcat(regpower, "/");
+ strcat(regpower, INA260_POWER_REGISTER);
- //----- Start publisher
- struct dirent *de; // Pointer for directory entry
- while(!__stop){
- int interval=INTERVAL(__loginterval); // Current interval
- int interval_next=INTERVAL_NEXT(__loginterval); // The next one
- DIR *dr = opendir(__logdir); // Open the log directory
- if(dr !=NULL){
- while ((de = readdir(dr)) != NULL){
- // Iterate over each loggers directories
- if(strcmp(de->d_name,".") && strcmp(de->d_name,"..")){
- char *client=de->d_name; // Directory name is the client name
- // Build the path for files we are looking for:
- char logfile[STATIC_LEN];
- char logfile_next[STATIC_LEN];
- sprintf(logfile,"%s/%s/%ld",__logdir,client,interval);
- sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next);
- // As long as next logfile is not available, we should wait
- // before sending the current one (since loggers are working on it)
- printf("Waiting for %s logger measurements...\n",client);
- while(!FILE_EXISTS(logfile_next) && (!__stop)){
- sleep(1);
- }
- // Send current one:
- if(FILE_EXISTS(logfile)){
- printf("Publishing %s measurements\n",client);
- publish(publisher,logfile,client,interval);
- remove(logfile); // Prevent log accumulation
- }
- }
- }
- closedir(dr);
- }
- else {
- // If open fails, directory probably do not exists
- // yet so wait:
- sleep(1);
- }
+ //----- Sanity checks
+ signal(SIGINT, sighandler);
+ if (loginterval < MIN_INTERVAL) {
+ printf("Log interval is too small (min=%ds)\n", MIN_INTERVAL);
+ exit(2);
+ }
+ if (!FILE_EXISTS(regpower)) {
+ printf("Logger cannot access to %s\n", regpower);
+ exit(3);
+ }
+
+ //----- Prepare our context and publisher
+ void *zmq_context = zmq_ctx_new();
+ void *zmq_publisher = zmq_socket(zmq_context, ZMQ_PUB);
+ char bindto[STATIC_LEN];
+ sprintf(bindto, "tcp://%s:%d", ip, port);
+ int rc = zmq_connect(zmq_publisher, bindto);
+ if (rc != 0) {
+ printf("Failed to connect to %s\n", bindto);
+ exit(1);
+ }
+
+ //----- Init logging variables
+ pthread_t zmq_thread;
+ FILE *regptr, *logptr;
+ char logfilepath[STATIC_LEN] = "";
+ regptr = fopen(regpower, "r");
+ char buffer[STATIC_LEN];
+ time_t interval;
+ struct timespec power_ts;
+ int queue_id = 0;
+
+ // Init queues
+ for (int i = 0; i < MAX_QUEUES; i++) {
+ queues[queue_id].issending = 0;
+ }
+ pthread_create(&zmq_thread, NULL, publisher, zmq_publisher);
+
+ //----- Start logging
+ printf("Logger started [client=%s,interval=%ds]\n", client, loginterval);
+ while (!__stop) {
+ interval = INTERVAL(loginterval);
+ // Log current interval
+ queue_id = (queue_id + 1) >= MAX_QUEUES ? 0 : (queue_id + 1);
+ // Busy wait:
+ while (queues[queue_id].issending) {
+ };
+ // Write msg header:
+ *queues[queue_id].msg = '\0';
+ sprintf(queues[queue_id].msg, "%s\n%s\n%s\n%ld\n", ZMQ_TOKEN, key, client,
+ interval);
+ queues[queue_id].size = strlen(queues[queue_id].msg);
+ // Monitor:
+ while ((TIMESTAMP() - interval) < loginterval) {
+ // Check if should stop:
+ if (__stop)
+ break;
+ // Read power:
+ fgets(buffer, STATIC_LEN, regptr);
+ // Get power measurement timestamp:
+ clock_gettime(CLOCK_REALTIME, &power_ts);
+ // Write measurement into msg buffer:
+ char line[MAX_RECORD_LEN];
+ if ((queues[queue_id].size + MAX_RECORD_LEN) > ZMQ_MSG_SIZE) {
+ printf(
+ "To many measurements to publish. Please increase ZMQ_MSG_SIZE\n");
+ } else {
+ sprintf(queues[queue_id].msg + queues[queue_id].size, "%ld,%ld,%d\n",
+ power_ts.tv_sec, power_ts.tv_nsec, atoi(buffer));
+ queues[queue_id].size +=
+ strlen(queues[queue_id].msg + queues[queue_id].size);
+ }
+ // Reset power register file:
+ fseek(regptr, 0, SEEK_SET);
+#if LOG_DELAY > 0
+ usleep(LOG_DELAY * 1000);
+#endif
}
+ queues[queue_id].issending = 1;
+ }
- zmq_close (publisher);
- zmq_ctx_destroy (context);
+ //----- Cleaning
+ fclose(regptr);
+ pthread_join(zmq_thread, NULL);
+ zmq_close(zmq_publisher);
+ zmq_ctx_destroy(zmq_context);
- return 0;
+ return 0;
}
-void publish(void *publisher, char *filepath, char* client, long int interval){
- // Build message header:
- char buffer[ZMQ_MSG_SIZE];
- sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
- int msglen=strlen(buffer);
-
- char * line = NULL;
- size_t len = 0;
- ssize_t read;
- FILE *fptr;
- fptr=fopen(filepath,"r");
- // Put every lines in the buffer and send it
- while ((read = getline(&line, &len, fptr)) != -1) {
- if((read+msglen) <ZMQ_MSG_SIZE){
- strcat(buffer,line);
- msglen+=read;
- } else {
- // It buffer is full, we send the message and create another one
- zmq_send (publisher, buffer, msglen, 0);
- // Build new message header:
- sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
- strcat(buffer,line);
- msglen=strlen(buffer);
- }
+void *publisher(void *zmq_publisher) {
+ int queue_id = 0;
+ while (!__stop) {
+ if (queues[queue_id].issending) {
+ printf("Publishing...");
+ zmq_send(zmq_publisher, queues[queue_id].msg, queues[queue_id].size, 0);
+ queues[queue_id].issending = 0;
+ printf("done\n");
+ } else {
+#if LOG_DELAY > 0
+ usleep(LOG_DELAY * 1000);
+#endif
+ continue; // Queues are always filled in order by the logger (from 0 to n)
}
- fclose(fptr);
- // Finally send the last message (or the only one)
- zmq_send (publisher, buffer, msglen, 0);
+ queue_id++;
+ if (queue_id >= MAX_QUEUES)
+ queue_id = 0;
+ }
+ pthread_exit(EXIT_SUCCESS);
} \ No newline at end of file
diff --git a/src/subscriber.c b/src/subscriber.c
index 9902d92..08e5a25 100644
--- a/src/subscriber.c
+++ b/src/subscriber.c
@@ -3,7 +3,14 @@
#include <signal.h>
#include "utils.h"
+/// @brief Set to non-zero to stop the processes
unsigned char __stop=0;
+
+/**
+ * @brief Stop process properly on SIGINT
+ *
+ * @param signo
+ */
void sighandler(int signo){
if (signo == SIGINT){
printf("Stopping...\n");
@@ -83,7 +90,6 @@ int main (int argc, char *argv [])
// Write all the measurements:
if(line>4){
fwrite(token, strlen(token), 1, fptr);
- printf("%s\n",token);
fwrite("\n",1,1,fptr);
}