Commit 2dcaed33 authored by mohamad.moussa

Add SWARM with prediction

parent 3dcab5de
```diff
@@ -56,7 +56,7 @@ def mainProg(neighbors, rank, base_port, ip_address):
                     sender = f"node{sender}"
                     send_Message(rank,sender,"WEIGHTS")
                 except ConnectionRefusedError as e:
-                    logger.error(f"Exception {e}\nConnection refused when {rank} tried to send WEIGHTS to {sender}'s service {sender_ip}.")
+                    logger.error(f"Exception {e}\nConnection refused when {rank} tried to send WEIGHTS to {sender}'s service {sender}.")
                     continue
             if message_rcvd == "WEIGHTS":
```
# Dockerfile.forecast
FROM python:3.10-slim
WORKDIR /app
# Copy the requirements first so the file exists when pip runs
# (and so the install layer is cached when only the source changes).
COPY requirements.txt /app/
RUN pip install --no-cache-dir -r requirements.txt
COPY forecast.py /app/
COPY Dev0.csv /app/
EXPOSE 5000
ENV NAME=ForecastApp
CMD ["python", "forecast.py"]
# Dockerfile.main
FROM python:3.10-slim
WORKDIR /App
RUN pip install requests
COPY comPred.py /App/
CMD ["python", "comPred.py"]
# Lab: Distributed Continuum Computing for Federated Learning
## Overview
The objective of this project is to run two services on separate EC2 instances. The first service establishes a distributed communication layer between the nodes; the second provides localised forecasts. Both services are developed and built as Docker containers, and Docker Swarm is then used to deploy those containers onto the respective EC2 instances.
## Prerequisites
- Python 3.x
- Docker and Docker Compose
- AWS account
- Dockerhub account
## Initial Setup
1. **Create Virtual Machines**:
- Create *n* EC2 instances on Amazon.
- Save the private IP addresses in the file `instance_privateIPs.txt`
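For a four-node cluster, the file simply lists one private IP address per line (these are the addresses used in this repository's own `instance_privateIPs.txt`):
```text
10.0.0.42
10.0.0.163
10.0.0.253
10.0.0.74
```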
2. **Install Docker and Docker Compose**:
- Use the provided script `install-docker.sh` to install Docker and Docker Compose on each VM. **Note:** This script is for installing Docker and Docker Compose on Ubuntu.
- First, upload the `install-docker.sh` script to each VM.
- Run the following commands to make the script executable and run it:
```bash
chmod +x install-docker.sh
./install-docker.sh
```
3. **Create a Docker Swarm**:
- On one of the VMs, initialize the Docker Swarm and note the join token:
```bash
docker swarm init
```
- Copy the `docker swarm join` command displayed after initialization.
- On all other VMs, join the Docker Swarm using the command copied from the previous step:
```bash
docker swarm join --token <SWARM_JOIN_TOKEN> <VM1_IP>:2377
```
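To confirm that every VM joined the swarm, list the members from the manager node (standard Docker CLI):
```bash
docker node ls   # every VM should appear with STATUS "Ready"
```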
## Building and Pushing Docker Images
To deploy your distributed algorithm, you need to create and build Docker images using the provided `Dockerfile.forecast` and `Dockerfile.main` files and push them to Docker Hub. Follow these steps:
1. **Prepare Dockerfiles**:
- Use the provided `Dockerfile.forecast` and `Dockerfile.main` files to define the specifications of your Docker images. Ensure that the source code is accessible on a machine with the same CPU architecture as your target devices. For simplicity, you can use one of the VMs.
2. **Build Docker Images**:
- Navigate to the directory containing your `Dockerfile.forecast` and `Dockerfile.main` files, then execute the following commands to build the Docker images. Replace `<your_forecast_image_name>` and `<your_main_image_name>` with suitable names for your images.
```bash
docker build -t <your_forecast_image_name> -f Dockerfile.forecast .
docker build -t <your_main_image_name> -f Dockerfile.main .
```
3. **Push to Docker Hub**:
- Log in to your Docker Hub account using the command:
```bash
docker login
```
- Push your Docker images to Docker Hub using the following commands. Replace `<your_forecast_image_name>`, `<your_main_image_name>`, `<your_dockerhub_username>` with the names of the images you built and your Docker Hub username respectively. Also, replace `<forecast_tag>` and `<main_tag>` with appropriate tags for your images.
```bash
docker tag <your_forecast_image_name> <your_dockerhub_username>/<your_forecast_image_name>:<forecast_tag>
docker push <your_dockerhub_username>/<your_forecast_image_name>:<forecast_tag>
docker tag <your_main_image_name> <your_dockerhub_username>/<your_main_image_name>:<main_tag>
docker push <your_dockerhub_username>/<your_main_image_name>:<main_tag>
```
## Deployment
These deployment steps should be performed on the master node of the Docker Swarm:
1. **Create Neighbors Configuration**:
- Create a `neighbors.json` file ([view example](neighbors.json)) that specifies the neighbors for each node in your distributed algorithm network.
2. **Create Docker Config**:
- Use the `docker config create` command to create a Docker config from your `neighbors.json` file:
```bash
docker config create <config_name> neighbors.json
```
3. **Generate Docker Compose File**:
- Use the provided script `generate-docker-compose.py` to generate a Docker Compose configuration for deploying your distributed algorithm.
- Before running the script, replace lines 6 and 7 in the script with the URLs of the Docker images you pushed earlier:
```python
forecast_image = "<your_dockerhub_username>/<your_forecast_image_name>:<forecast_tag>"
comm_image = "<your_dockerhub_username>/<your_main_image_name>:<main_tag>"
```
- Additionally, replace lines 34 and 74 in `generate-docker-compose.py` with the `<config_name>` you created:
```yaml
34|     source: <config_name>
73| configs:
74|   <config_name>:
```
- Run the script:
```bash
python generate-docker-compose.py
```
4. **Deploy the Stack**:
- Once the Docker Compose file is generated, deploy the stack to your Docker Swarm using the following command:
```bash
docker stack deploy -c docker-compose.yml <stack_name>
```
Replace `<stack_name>` with a suitable name for your Docker stack.
5. **Monitor Deployment**:
- Monitor the deployment and check the logs of individual containers
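Standard Docker Swarm commands can be used here; Swarm prefixes service names with the stack name (e.g. `<stack_name>_comm0`):
```bash
docker stack services <stack_name>         # replica status of every service
docker service logs -f <stack_name>_comm0  # follow one comm container's logs
```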
## Troubleshooting
- Review container logs
- **Verify that the required ports are open in the security groups to enable communication between nodes**.
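If containers cannot reach each other, a short standard-library probe can check whether a node's ports accept TCP connections. This is a minimal sketch (the host IP in the commented example is a placeholder); this project relies on port 2377 for Swarm management, 12345 for the comm service, and 5000 for the forecast service:

```python
import socket

def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example usage: probe the ports this project relies on from another VM.
# for port in (2377, 12345, 5000):
#     print(port, is_port_open("10.0.0.42", port))
```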
# comPred.py -- communication service: exchanges weight messages with
# neighbouring nodes and fetches a local forecast from the forecast service.
import socket
import time
import sys
import json
import select
import os
import logging

import requests

NB_ITERATIONS = 20

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s:%(message)s')
logger = logging.getLogger()

def fetch_forecast(date):
    url = f"http://{SERVER_ID}:5000?date={date}"
    response = requests.get(url)
    if response.status_code == 200:
        forecast_data = response.json()
        return forecast_data.get('prediction')
    else:
        logger.error(f"Failed to fetch forecast. Status code: {response.status_code}")
        return None

def send_Message(src, dest, msg):
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.connect((dest, base_port))
    data = json.dumps((msg, src))
    client_socket.send(data.encode())
    time.sleep(3)
    client_socket.close()

def mainProg(neighbors, NODE_ID, forecast_date):
    c = 0
    iteration = 0
    received_dict = {neighbor: False for neighbor in neighbors}
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('0.0.0.0', base_port))
    server_socket.listen(1000)
    time.sleep(60)  # give the other services time to come up
    logger.info(f"Server {NODE_ID} listening on {NODE_ID} and port {base_port}")
    forecast = fetch_forecast(forecast_date)
    if forecast is None:
        logger.error("Failed to fetch forecast. Exiting.")
        return
    logger.info(f"============== Received forecast: =========== {forecast}")
    while True:
        for neighbor in neighbors:
            try:
                send_Message(NODE_ID, neighbor, "REQUEST_WEIGHT")
                #logger.info(f"Rank {NODE_ID} sent {message} to {neighbor_ip} at iteration {iteration}")
            except ConnectionRefusedError as e:
                logger.error(f"Exception {e}\nConnection refused when {NODE_ID} tried to connect to {neighbor}'s port {base_port}.")
                continue
        logger.info(f"I am at -- \t iteration {iteration}")
        readable, _, _ = select.select([server_socket], [], [], 1)  # 1-second timeout
        if readable:
            conn, addr = server_socket.accept()
            data = conn.recv(1024).decode()
            message_rcvd, sender = json.loads(data)
            conn.close()
            if message_rcvd == "REQUEST_WEIGHT":
                try:
                    send_Message(NODE_ID, sender, "WEIGHTS")
                except ConnectionRefusedError as e:
                    logger.error(f"Exception {e}\nConnection refused when {NODE_ID} tried to send WEIGHTS to {sender}'s port {base_port}.")
                    continue
            if message_rcvd == "WEIGHTS":
                logger.info(f"I am here -- iteration {iteration} and message received {message_rcvd} from {sender}")
                received_dict[sender] = True
        iteration += 1
        return

if __name__ == "__main__":
    base_port = 12345
    NODE_ID = os.getenv('NODE_ID')
    SERVER_ID = os.getenv('SERVER_ID')
    NEIGHBORHOOD_CONFIG = "/etc/neighbors.json"
    forecast_date = os.getenv('FORECAST_DATE')
    if forecast_date is None:
        logger.error("Forecast date not provided. Exiting.")
        sys.exit(1)
    with open(NEIGHBORHOOD_CONFIG) as f:
        neighborhood_data = json.load(f)
    NEIGHBORHOOD = neighborhood_data.get(NODE_ID, [])
    logger.info(f"Node {NODE_ID} has neighbors: {NEIGHBORHOOD}")
    mainProg(list(NEIGHBORHOOD), NODE_ID, forecast_date)
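A detail worth noting in `send_Message` and `mainProg` above: the wire format is a JSON-encoded `(message, sender)` tuple, and JSON round-trips the tuple as a list, which the `message_rcvd, sender = json.loads(data)` unpacking handles unchanged. A quick self-contained check:

```python
import json

# Encode exactly as send_Message does: a (message, sender) tuple.
data = json.dumps(("REQUEST_WEIGHT", "comm0")).encode()

# Decode exactly as mainProg does. json returns a list, not a tuple,
# but sequence unpacking works the same way.
message_rcvd, sender = json.loads(data.decode())
print(message_rcvd, sender)  # → REQUEST_WEIGHT comm0
```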
version: '3.8'
services:
  forecast0:
    image: mohamadmoussa/forecast_image:1.0
    networks:
      - localNetwork0_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-10-0-0-42
    environment:
      - SERVER_ID=forecast0
  comm0:
    image: mohamadmoussa/comm_forecast:1.0
    depends_on:
      - forecast0
    configs:
      - source: neighborhood_config
        target: /etc/neighbors.json
    networks:
      - commNetwork_overlay
      - localNetwork0_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-10-0-0-42
    environment:
      - NODE_ID=comm0
      - FORECAST_DATE=2022-06-04T14:15:00
      - SERVER_ID=forecast0
  forecast1:
    image: mohamadmoussa/forecast_image:1.0
    networks:
      - localNetwork1_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-10-0-0-163
    environment:
      - SERVER_ID=forecast1
  comm1:
    image: mohamadmoussa/comm_forecast:1.0
    depends_on:
      - forecast1
    configs:
      - source: neighborhood_config
        target: /etc/neighbors.json
    networks:
      - commNetwork_overlay
      - localNetwork1_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-10-0-0-163
    environment:
      - NODE_ID=comm1
      - FORECAST_DATE=2022-06-04T14:15:00
      - SERVER_ID=forecast1
  forecast2:
    image: mohamadmoussa/forecast_image:1.0
    networks:
      - localNetwork2_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-10-0-0-253
    environment:
      - SERVER_ID=forecast2
  comm2:
    image: mohamadmoussa/comm_forecast:1.0
    depends_on:
      - forecast2
    configs:
      - source: neighborhood_config
        target: /etc/neighbors.json
    networks:
      - commNetwork_overlay
      - localNetwork2_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-10-0-0-253
    environment:
      - NODE_ID=comm2
      - FORECAST_DATE=2022-06-04T14:15:00
      - SERVER_ID=forecast2
  forecast3:
    image: mohamadmoussa/forecast_image:1.0
    networks:
      - localNetwork3_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-10-0-0-74
    environment:
      - SERVER_ID=forecast3
  comm3:
    image: mohamadmoussa/comm_forecast:1.0
    depends_on:
      - forecast3
    configs:
      - source: neighborhood_config
        target: /etc/neighbors.json
    networks:
      - commNetwork_overlay
      - localNetwork3_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-10-0-0-74
    environment:
      - NODE_ID=comm3
      - FORECAST_DATE=2022-06-04T14:15:00
      - SERVER_ID=forecast3

networks:
  localNetwork0_overlay:
    driver: overlay
  localNetwork1_overlay:
    driver: overlay
  localNetwork2_overlay:
    driver: overlay
  localNetwork3_overlay:
    driver: overlay
  commNetwork_overlay:
    driver: overlay

configs:
  neighborhood_config:
    external: true
# forecast.py -- Flask service that returns an LSTM prediction for a given date.
from flask import Flask, jsonify, request
import pandas as pd
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

app = Flask(__name__)

def create_lstm_model(input_shape):
    model = Sequential()
    model.add(LSTM(50, activation='relu', input_shape=input_shape))
    model.add(Dense(1))
    model.compile(optimizer='adam', loss='mse')
    return model

def read_csv_and_predict(date):
    df = pd.read_csv('Dev0.csv', parse_dates=['DATETIME'])
    df['DATETIME'] = pd.to_datetime(df['DATETIME'], utc=True)
    df.set_index('DATETIME', inplace=True)
    date = pd.to_datetime(date, utc=True)
    # Take the 4 values that precede the requested timestamp.
    previous_values = df.loc[:date].iloc[-5:-1].values.flatten()
    if len(previous_values) < 4:
        return None
    previous_values = previous_values.reshape((1, 4, 1))
    # NB: a fresh model is built for every request.
    model = create_lstm_model((4, 1))
    prediction = model.predict(previous_values)
    return prediction[0][0]

@app.route('/', methods=['GET'])
def forecast():
    date = request.args.get('date')
    if not date:
        return jsonify({'error': 'Date parameter is required'}), 400
    prediction = read_csv_and_predict(date)
    if prediction is None:
        return jsonify({'error': 'Not enough data to make a prediction'}), 400
    prediction = float(prediction)
    return jsonify({'prediction': prediction})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
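The window selection in `read_csv_and_predict` is the subtle part: `df.loc[:date]` keeps all rows up to and including the requested timestamp, and `iloc[-5:-1]` then drops the last row (the target) and keeps the four values before it. A pure-Python sketch of that logic on toy data (the timestamps and values below are invented for illustration):

```python
series = [("2022-06-04T13:00", 1.0),
          ("2022-06-04T13:15", 2.0),
          ("2022-06-04T13:30", 3.0),
          ("2022-06-04T13:45", 4.0),
          ("2022-06-04T14:00", 5.0),
          ("2022-06-04T14:15", 6.0)]

def window_before(series, date, size=4):
    """Values of the `size` rows preceding (and excluding) the row at `date`."""
    upto = [v for t, v in series if t <= date]      # like df.loc[:date]
    if len(upto) <= size:                           # not enough history
        return None
    return upto[-(size + 1):-1]                     # like iloc[-5:-1]

print(window_before(series, "2022-06-04T14:15"))  # → [2.0, 3.0, 4.0, 5.0]
```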
# generate-docker-compose.py -- emits a docker-compose.yml with one
# forecast + comm service pair per IP address in instance_privateIPs.txt.
input_file = 'instance_privateIPs.txt'
output_file = 'docker-compose.yml'
forecast_image = 'mohamadmoussa/forecast_image:1.0'
comm_image = 'mohamadmoussa/comm_forecast:1.0'
FORECAST_DATE = '2022-06-04T14:15:00'

with open(input_file, 'r') as file:
    ip_addresses = [line.strip() for line in file if line.strip()]

forecast_service_template = """
  forecast{i}:
    image: {forecast_img}
    networks:
      - localNetwork{i}_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-{ip}
    environment:
      - SERVER_ID=forecast{i}
"""

comm_service_template = """
  comm{i}:
    image: {comm_img}
    depends_on:
      - forecast{i}
    configs:
      - source: neighborhood_config
        target: /etc/neighbors.json
    networks:
      - commNetwork_overlay
      - localNetwork{i}_overlay
    deploy:
      placement:
        constraints:
          - node.hostname == ip-{ip}
    environment:
      - NODE_ID=comm{i}
      - FORECAST_DATE={forecast_date}
      - SERVER_ID=forecast{i}
"""

compose_content = """version: '3.8'
services:"""

for i, ip in enumerate(ip_addresses):
    # Swarm hostnames follow the EC2 pattern ip-<dashed private IP>.
    formatted_ip = ip.replace('.', '-')
    compose_content += forecast_service_template.format(i=i, ip=formatted_ip, forecast_img=forecast_image)
    compose_content += comm_service_template.format(i=i, ip=formatted_ip, comm_img=comm_image, forecast_date=FORECAST_DATE)

compose_content += """
networks:"""
for i in range(len(ip_addresses)):
    compose_content += f"""
  localNetwork{i}_overlay:
    driver: overlay"""
compose_content += """
  commNetwork_overlay:
    driver: overlay

configs:
  neighborhood_config:
    external: true
"""

with open(output_file, 'w') as file:
    file.write(compose_content)

print(f"Docker-compose file generated successfully: {output_file}")
10.0.0.42
10.0.0.163
10.0.0.253
10.0.0.74
{
  "comm0": [
    "comm1",
    "comm3"
  ],
  "comm1": [
    "comm0",
    "comm2"
  ],
  "comm2": [
    "comm1",
    "comm3"
  ],
  "comm3": [
    "comm0",
    "comm2"
  ]
}
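`comPred.py` reads this mapping at startup and looks up its own `NODE_ID` in it; a small self-contained sketch of that lookup against the four-node ring above:

```python
import json

# The neighbor map: each comm service lists the services it exchanges
# weight messages with (a 4-node ring in this example).
neighbors_json = """
{
  "comm0": ["comm1", "comm3"],
  "comm1": ["comm0", "comm2"],
  "comm2": ["comm1", "comm3"],
  "comm3": ["comm0", "comm2"]
}
"""

neighborhood_data = json.loads(neighbors_json)
# Same lookup comPred.py performs with its NODE_ID environment variable;
# .get with a default tolerates an unknown node id.
node_id = "comm0"
neighborhood = neighborhood_data.get(node_id, [])
print(neighborhood)  # → ['comm1', 'comm3']
```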
pandas
flask
tensorflow
```diff
@@ -6,11 +6,18 @@ import select
 import os
 import logging
 
 NB_ITERATIONS = 20
 
 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s:%(message)s')
 logger = logging.getLogger()
 
+def send_Message(src,dest,msg):
+    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    client_socket.connect((dest, base_port))
+    data = json.dumps((msg, src))
+    client_socket.send(data.encode())
+    time.sleep(3)
+    client_socket.close()
+
 def mainProg(neighbors, NODE_ID):
     c = 0
     iteration = 0
@@ -24,22 +31,13 @@ def mainProg(neighbors, NODE_ID):
     logger.info(f"Server {NODE_ID} listening on {NODE_ID} and port {base_port}")
     while iteration < NB_ITERATIONS:
         if iteration < 3:
             while True:
                 for neighbor in neighbors:
                     try:
-                        neighbor_ip = neighbor
-                        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                        logger.info(f"The neighbor_ip is {neighbor_ip}")
-                        client_socket.connect((neighbor_ip, base_port))
-                        message = "REQUEST_WEIGHT"
-                        data = json.dumps((message, NODE_ID))
-                        client_socket.send(data.encode())
-                        logger.info(f"Rank {NODE_ID} sent {message} to {neighbor_ip} at iteration {iteration}")
-                        time.sleep(3)
-                        client_socket.close()
+                        send_Message(NODE_ID,neighbor,"REQUEST_WEIGHT")
+                        #logger.info(f"Rank {NODE_ID} sent {message} to {neighbor_ip} at iteration {iteration}")
                     except ConnectionRefusedError as e:
-                        logger.error(f"Exception {e} \n Connection refused when {NODE_ID} tried to connect to {neighbor_ip}'s port {base_port}.")
+                        logger.error(f"Exception {e} \n Connection refused when {NODE_ID} tried to connect to {neighbor}'s port {base_port}.")
                         continue
                 logger.info(f"I am at -- \t iteration {iteration}")
@@ -54,20 +52,13 @@ def mainProg(neighbors, NODE_ID):
                     if message_rcvd == "REQUEST_WEIGHT":
                         try:
-                            sender_ip = sender
-                            cl_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                            cl_socket.connect((sender_ip, base_port))
-                            message = "WEIGHTS"
-                            data = json.dumps((message, NODE_ID))
-                            cl_socket.send(data.encode())
-                            time.sleep(3)
-                            cl_socket.close()
+                            send_Message(NODE_ID,sender,"WEIGHTS")
                         except ConnectionRefusedError as e:
-                            logger.error(f"Exception {e} \n Connection refused when {NODE_ID} tried to send WEIGHTS to {sender_ip}'s port {base_port}.")
+                            logger.error(f"Exception {e} \n Connection refused when {NODE_ID} tried to send WEIGHTS to {sender}'s port {base_port}.")
                             continue
                     if message_rcvd == "WEIGHTS":
-                        logger.info(f"I am here -- iteration {iteration} and message received {message_rcvd} from {sender_ip}")
+                        logger.info(f"I am here -- iteration {iteration} and message received {message_rcvd} from {sender}")
                         received_dict[sender] = True
         iteration += 1
```