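# sock.py
# Each node listens on a TCP port, asks its neighbours for weights with
# REQUEST_WEIGHT messages and answers such requests with WEIGHTS messages.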
import socket
import time
import sys
import json
import select
import os
import logging
# Logging configuration: records at INFO level and above are written to
# stdout, each prefixed with a timestamp and the level name.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
)

# Module-wide logger used by every function in this file.
logger = logging.getLogger(__name__)
def send_Message(sock, src, dest, msg, port=None):
    """Connect ``sock`` to ``dest`` and send ``msg`` tagged with its sender.

    The payload is the JSON encoding of the pair ``(msg, src)``; a receiver
    decoding it with ``json.loads`` gets the two-element list ``[msg, src]``.

    Args:
        sock: An unconnected socket. The caller keeps ownership and is
            responsible for closing it.
        src: Identifier of the sender, embedded in the payload.
        dest: Host name or address to connect to.
        msg: Message body; must be JSON-serialisable.
        port: Destination port. When omitted, the module-level ``base_port``
            is used, which is what this function always did before the
            parameter existed.

    Raises:
        OSError: Whatever ``connect``/``sendall`` raise, e.g.
            ``ConnectionRefusedError`` when nothing listens on the target.
        NameError: If ``port`` is omitted and no module-level ``base_port``
            has been defined.
    """
    if port is None:
        # Backward-compatible default: fall back to the module global.
        port = base_port
    sock.connect((dest, port))
    payload = json.dumps((msg, src)).encode()
    # send() may transmit only part of the buffer; sendall() loops until the
    # whole payload has been handed to the kernel or an error is raised.
    sock.sendall(payload)
def mainProg(neighbors, rank, base_port, ip_address):
    """Run this node's request/answer loop; never returns normally.

    The node binds a listening TCP socket, then repeats forever:

    1. send a ``REQUEST_WEIGHT`` message to every neighbour;
    2. wait up to one second for an incoming connection and, if there is
       one, read a single ``[message, sender]`` JSON payload from it;
    3. answer ``REQUEST_WEIGHT`` with a ``WEIGHTS`` message to the sender,
       or record a received ``WEIGHTS`` in ``received_dict``.

    NOTE(review): the indentation of this function was lost in the copy under
    review. The nesting below (one poll per ``while`` pass, after the
    neighbour loop) is a reconstruction -- confirm against the original.

    Args:
        neighbors: Iterable of neighbour identifiers; each is contacted at
            host name ``node<identifier>``.
        rank: Identifier of this node, sent as the sender of every message.
        base_port: Port the listening socket is bound to.
        ip_address: Local address the listening socket is bound to.

    Raises:
        OSError: If the listening socket cannot be set up, or on socket
            errors other than ``ConnectionRefusedError`` while sending.
    """
    iteration = 0
    # Tracks which neighbours have delivered WEIGHTS; only written here.
    received_dict = {neighbor: False for neighbor in neighbors}

    # The context manager closes the listening socket if the loop is left
    # through an exception.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.bind((ip_address, base_port))
        server_socket.listen(1000)
        # Presumably gives the other nodes time to start listening -- confirm.
        time.sleep(10)
        logger.info(f"Server {rank} listening on {ip_address} and port {base_port}")

        while True:
            for neighbor in neighbors:
                neighbor_ip = f"node{neighbor}"
                try:
                    # 'with' closes the socket even when connecting fails,
                    # so a refused connection no longer leaks a descriptor.
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
                        send_Message(client_socket, rank, neighbor_ip, "REQUEST_WEIGHT")
                        logger.info(f"Rank {rank} sent REQUEST_WEIGHT to {neighbor} at iteration {iteration}")
                        # Keep the connection open briefly before closing.
                        time.sleep(3)
                except ConnectionRefusedError as e:
                    logger.error(f"Exception {e}\nConnection refused when {rank} tried to connect to {neighbor}'s service {neighbor_ip}.")

            logger.info(f"I am at -- iteration {iteration}")
            # Wait at most one second for a peer to connect.
            readable, _, _ = select.select([server_socket], [], [], 1)
            if readable:
                conn, _ = server_socket.accept()
                with conn:
                    raw = conn.recv(1024)
                try:
                    message_rcvd, sender = json.loads(raw.decode())
                except (ValueError, TypeError):
                    # Empty, non-JSON or wrongly shaped payload: drop it
                    # instead of letting it terminate the node.
                    logger.warning(f"Rank {rank} ignored a malformed message: {raw!r}")
                    continue

                if message_rcvd == "REQUEST_WEIGHT":
                    sender_ip = f"node{sender}"
                    try:
                        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as cl_socket:
                            send_Message(cl_socket, rank, sender_ip, "WEIGHTS")
                            time.sleep(3)
                    except ConnectionRefusedError as e:
                        logger.error(f"Exception {e}\nConnection refused when {rank} tried to send WEIGHTS to {sender}'s service {sender_ip}.")
                        # As before, a failed reply restarts the outer loop
                        # without advancing the iteration counter.
                        continue
                if message_rcvd == "WEIGHTS":
                    received_dict[sender] = True
                logger.info(f"I am here -- iteration {iteration} and message received {message_rcvd} from {sender}")
            iteration += 1
if __name__ == "__main__":
    # Script entry point: read the node configuration from the environment,
    # load the neighbour list and start the main loop.

    # A missing variable becomes '' so that int() raises ValueError, which is
    # handled below; int(None) would raise TypeError before the check ran.
    try:
        rank = int(os.getenv('RANK', ''))
        # base_port must stay a module-level name: send_Message reads it.
        base_port = int(os.getenv('BASE_PORT', ''))
    except ValueError:
        rank = None
        base_port = None
    ip_address = os.getenv('IP_ADDRESS')

    if rank is None or rank < 0 or not base_port or not ip_address:
        logger.error("Environment variables RANK, BASE_PORT, and IP_ADDRESS must be set.")
        sys.exit(1)

    neighbors_file = '/app/neighbors.txt'
    with open(neighbors_file, "r") as file:
        # One neighbour identifier per line; blank lines (for example a
        # trailing newline at end of file) are skipped instead of crashing.
        neighbors = [int(line) for line in file if line.strip()]

    logger.info(f"I am rank {rank} running on port {base_port} and neighbors are {neighbors}")
    mainProg(neighbors, rank, base_port, ip_address)