Skip to content
Snippets Groups Projects
Select Git revision
  • 99feeae89e59998b57dc7c579108093b00e8805f
  • master default protected
2 results

sock.py

Blame
  • sock.py 3.15 KiB
    import socket
    import time
    import sys
    import json
    import select
    import os
    import logging
    
    # Logging configuration: INFO and above, each record prefixed with a
    # timestamp and its level name, written to standard output.
    logging.basicConfig(
        format='%(asctime)s %(levelname)s %(message)s',
        handlers=[logging.StreamHandler(sys.stdout)],
        level=logging.INFO,
    )
    # Logger named after this module; used by every function in the file.
    logger = logging.getLogger(__name__)
    
    
    def send_Message(sock, src, dest, msg, port=None):
        """Connect ``sock`` to ``dest`` and send ``msg`` tagged with its sender.

        The payload is the JSON encoding of the pair ``(msg, src)``, which
        serialises as the two-element array ``[msg, src]``.

        Args:
            sock: An unconnected socket-like object providing ``connect`` and
                ``sendall``. The caller remains responsible for closing it.
            src: Identifier of the sender, embedded in the payload.
            dest: Host name or address to connect to.
            msg: JSON-serialisable message body.
            port: Port to connect to. When omitted, the module-level
                ``base_port`` is used, which is what this function always did
                before the parameter existed.

        Raises:
            NameError: If ``port`` is omitted and no module-level ``base_port``
                has been defined.
            OSError: Propagated unchanged from ``connect`` / ``sendall``
                (``ConnectionRefusedError`` is a subclass).
        """
        if port is None:
            port = base_port
        sock.connect((dest, port))
        payload = json.dumps((msg, src)).encode()
        # sendall() keeps writing until the whole payload has been handed to the
        # kernel; a bare send() is allowed to stop after a partial write.
        sock.sendall(payload)
    
    def mainProg(neighbors, rank, base_port, ip_address):
        """Run this node's request/reply loop; never returns normally.

        Every pass of the loop:

        1. opens a fresh TCP connection to each neighbor (host name
           ``node<neighbor>``) and sends it ``"REQUEST_WEIGHT"``;
        2. waits up to one second for one inbound connection on the listening
           socket and reads a single JSON ``[message, sender]`` pair from it;
        3. replies to ``"REQUEST_WEIGHT"`` with ``"WEIGHTS"``, or marks the
           sender in ``received_dict`` when the message is ``"WEIGHTS"``.

        Args:
            neighbors: Iterable of neighbor identifiers; each is turned into
                the host name ``node<id>``.
            rank: This node's identifier, sent as the sender of every message.
            base_port: Port the listening socket binds to. Outbound connections
                use whatever port ``send_Message`` selects.
            ip_address: Local address the listening socket binds to.

        Raises:
            OSError: Any socket error other than ``ConnectionRefusedError`` on
                an outbound connection propagates and ends the loop.
        """
        iteration = 0
        received_dict = {neighbor: False for neighbor in neighbors}

        # The context manager guarantees the listening socket is closed if the
        # loop is ever left through an exception.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.bind((ip_address, base_port))
            server_socket.listen(1000)

            # NOTE(review): presumably gives the other nodes time to start
            # listening before the first request round - confirm.
            time.sleep(10)
            logger.info(f"Server {rank} listening on {ip_address} and port {base_port}")

            while True:
                # --- Outbound: ask every neighbor for its weights. ---
                for neighbor in neighbors:
                    neighbor_ip = f"node{neighbor}"
                    try:
                        # "with" closes the socket even when connect() raises,
                        # so refused connections no longer leak descriptors.
                        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
                            send_Message(client_socket, rank, neighbor_ip, "REQUEST_WEIGHT")
                            logger.info(f"Rank {rank} sent REQUEST_WEIGHT to {neighbor} at iteration {iteration}")
                            time.sleep(3)
                    except ConnectionRefusedError as e:
                        logger.error(f"Exception {e}\nConnection refused when {rank} tried to connect to {neighbor}'s service {neighbor_ip}.")

                logger.info(f"I am at -- iteration {iteration}")

                # --- Inbound: handle at most one pending connection. ---
                readable, _, _ = select.select([server_socket], [], [], 1)  # 1 second timeout

                if readable:
                    conn, _addr = server_socket.accept()
                    with conn:
                        raw = conn.recv(1024)

                    try:
                        message_rcvd, sender = json.loads(raw.decode())
                    except (ValueError, TypeError) as e:
                        # Empty read, invalid UTF-8/JSON, or a payload that is
                        # not a two-element array: drop it instead of crashing.
                        logger.warning(f"Rank {rank} ignored malformed message {raw!r}: {e}")
                        message_rcvd = sender = None

                    if message_rcvd == "REQUEST_WEIGHT":
                        sender_ip = f"node{sender}"
                        try:
                            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as cl_socket:
                                send_Message(cl_socket, rank, sender_ip, "WEIGHTS")
                                time.sleep(3)
                        except ConnectionRefusedError as e:
                            # Fall through (no "continue") so the iteration
                            # counter below still advances after a failed reply.
                            logger.error(f"Exception {e}\nConnection refused when {rank} tried to send WEIGHTS to {sender}'s service {sender_ip}.")
                    elif message_rcvd == "WEIGHTS":
                        received_dict[sender] = True
                        logger.info(f"I am here -- iteration {iteration} and message received {message_rcvd} from {sender}")

                iteration += 1
    
    if __name__ == "__main__":
        # Script entry point: read the node configuration from the environment,
        # load the neighbor list, and start the main loop.

        # Fetch the raw strings first so that a missing variable reaches the
        # check below instead of raising TypeError inside int(None).
        rank_env = os.getenv('RANK')
        port_env = os.getenv('BASE_PORT')
        ip_address = os.getenv('IP_ADDRESS')

        if not rank_env or not port_env or not ip_address:
            logger.error("Environment variables RANK, BASE_PORT, and IP_ADDRESS must be set.")
            sys.exit(1)

        try:
            rank = int(rank_env)
            # Deliberately a module-level name: send_Message reads ``base_port``
            # as a global when connecting.
            base_port = int(port_env)
        except ValueError:
            logger.error(f"RANK and BASE_PORT must be integers (got RANK={rank_env!r}, BASE_PORT={port_env!r}).")
            sys.exit(1)

        # Same value checks as before: negative rank or port 0 are rejected.
        if rank < 0 or not base_port:
            logger.error("RANK must be non-negative and BASE_PORT must be non-zero.")
            sys.exit(1)

        neighbors_file = '/app/neighbors.txt'

        with open(neighbors_file, "r") as file:
            # One integer per line; blank lines (e.g. a trailing newline) are
            # skipped rather than crashing in int('').
            neighbors = [int(line) for line in file if line.strip()]

        logger.info(f"I am rank {rank} running on port {base_port} and neighbors are {neighbors}")

        mainProg(neighbors, rank, base_port, ip_address)