From d30a84a829ff87f7e94faf8a261603a05c20ec0e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C3=ABl=20Minelli?= <michael@minelli.me>
Date: Tue, 30 May 2023 17:12:54 +0200
Subject: [PATCH] Add multiple process management

---
 ExpressAPI/src/process/ClusterManager.ts  | 71 +++++++++++++++++++++++
 ExpressAPI/src/process/ClusterStrategy.ts |  7 +++
 ExpressAPI/src/process/WorkerPool.ts      | 16 +++++
 ExpressAPI/src/process/WorkerRole.ts      |  7 +++
 ExpressAPI/src/process/WorkerTask.ts      |  6 ++
 5 files changed, 107 insertions(+)
 create mode 100644 ExpressAPI/src/process/ClusterManager.ts
 create mode 100644 ExpressAPI/src/process/ClusterStrategy.ts
 create mode 100644 ExpressAPI/src/process/WorkerPool.ts
 create mode 100644 ExpressAPI/src/process/WorkerRole.ts
 create mode 100644 ExpressAPI/src/process/WorkerTask.ts

diff --git a/ExpressAPI/src/process/ClusterManager.ts b/ExpressAPI/src/process/ClusterManager.ts
new file mode 100644
index 0000000..d49f884
--- /dev/null
+++ b/ExpressAPI/src/process/ClusterManager.ts
@@ -0,0 +1,71 @@
+import cluster, { Worker } from 'node:cluster';
+import WorkerRole          from './WorkerRole';
+import os                  from 'os';
+import ClusterStrategy     from './ClusterStrategy';
+import WorkerPool          from './WorkerPool';
+import logger              from '../shared/logging/WinstonLogger';
+
+
+/*
+ This class create a cluster of workers by following the strategy (array of WorkerPool) given in the constructor.
+ */
+class ClusterManager {
+    public static readonly CORES = os.cpus().length;
+
+    private workers: { [pid: number]: WorkerRole; } = [];
+
+    constructor(private strategy: ClusterStrategy) {}
+
+    private getWorkerPool(role: WorkerRole): WorkerPool | undefined {
+        return this.strategy.find(elem => elem.role === role);
+    }
+
+    private runPrimary() {
+        logger.info(`Number of cores: ${ ClusterManager.CORES }`);
+        logger.info(`Primary process is running`);
+
+        this.strategy.forEach(workerPool => {
+            for ( let i = 0 ; i < workerPool.quantity ; i += 1 ) {
+                const worker = cluster.fork({ role: workerPool.role });
+                this.workers[worker.process.pid] = workerPool.role;
+            }
+        });
+
+        // Listen for dying workers and restart them
+        cluster.on('exit', (worker: Worker, code: number) => {
+            logger.info(`Worker ${ worker.process.pid } exited with code ${ code }`);
+
+            const workerRole = this.workers[worker.process.pid];
+
+            const workerPool = this.getWorkerPool(workerRole);
+            if ( workerPool && workerPool.restartOnFail ) {
+                const newWorker = cluster.fork({ role: workerRole });
+                this.workers[newWorker.process.pid] = workerPool.role;
+            }
+
+            delete this.workers[worker.process.pid];
+        });
+    }
+
+    private runWorker() {
+        const workerRole = Number(process.env['role']);
+
+        const workerPool = this.getWorkerPool(workerRole);
+        if ( workerPool ) {
+            workerPool.loadTask().run();
+        } else {
+            logger.warn(`Process task not found for role ${ workerRole }`);
+        }
+    }
+
+    run() {
+        if ( cluster.isPrimary ) {
+            this.runPrimary();
+        } else {
+            this.runWorker();
+        }
+    }
+}
+
+
+export default ClusterManager;
diff --git a/ExpressAPI/src/process/ClusterStrategy.ts b/ExpressAPI/src/process/ClusterStrategy.ts
new file mode 100644
index 0000000..4628b6c
--- /dev/null
+++ b/ExpressAPI/src/process/ClusterStrategy.ts
@@ -0,0 +1,7 @@
+import WorkerPool from './WorkerPool';
+
+
+type ClusterStrategy = Array<WorkerPool>
+
+export default ClusterStrategy;
+ 
\ No newline at end of file
diff --git a/ExpressAPI/src/process/WorkerPool.ts b/ExpressAPI/src/process/WorkerPool.ts
new file mode 100644
index 0000000..79c469e
--- /dev/null
+++ b/ExpressAPI/src/process/WorkerPool.ts
@@ -0,0 +1,16 @@
+import WorkerRole from './WorkerRole';
+import WorkerTask from './WorkerTask';
+
+
+/*
+ This interface describe a pool of workers.
+ */
+interface WorkerPool {
+    role: WorkerRole,
+    quantity: number,
+    restartOnFail: boolean,
+    loadTask: () => WorkerTask, //This is a function for lazy load the task (only loaded on function call)
+}
+
+
+export default WorkerPool; 
diff --git a/ExpressAPI/src/process/WorkerRole.ts b/ExpressAPI/src/process/WorkerRole.ts
new file mode 100644
index 0000000..ae9ee44
--- /dev/null
+++ b/ExpressAPI/src/process/WorkerRole.ts
@@ -0,0 +1,7 @@
+enum WorkerRole {
+    None = 0,
+    API  = 1
+}
+
+
+export default WorkerRole;
diff --git a/ExpressAPI/src/process/WorkerTask.ts b/ExpressAPI/src/process/WorkerTask.ts
new file mode 100644
index 0000000..d661eb3
--- /dev/null
+++ b/ExpressAPI/src/process/WorkerTask.ts
@@ -0,0 +1,6 @@
+interface WorkerTask {
+    run(): void;
+}
+
+
+export default WorkerTask;
-- 
GitLab