"""gRPC notifier server.

An ``Open`` call registers an ``id_code`` and blocks until an ``Answer`` call
with the same ``id_code`` arrives; both calls then return the same freshly
generated ``comm_id``.  ``ReceiveMessage`` streams every message that
``SendMessage`` relays to the pipe registered under a ``comm_id``.
"""

import sys
import grpc

# The generated netcode modules are resolved relative to the parent directory,
# so the search path has to be extended before they are imported.
sys.path.insert(0, '..')
sys.path.insert(0, '../shared/netcode')

from queue import SimpleQueue
from threading import Condition, Lock
from concurrent import futures

from shared.netcode.netcode_pb2 import *
from shared.netcode.netcode_pb2_grpc import *

PREFIX = "Server>"

# How often blocked RPC handlers wake up to check whether their RPC is still
# active.  Without this, a handler whose caller went away would occupy a worker
# thread of the pool forever.
POLL_INTERVAL_SECONDS = 1.0

# Last comm_id handed out; guarded by _comm_id_lock.
start = 0
_comm_id_lock = Lock()


def generate_unique_comm_id():
    """Return the next value of the process-wide comm_id counter.

    The counter is incremented under a lock because RPC handlers run on a
    thread pool and ``start = start + 1`` is not atomic.
    """
    global start
    with _comm_id_lock:
        start = start + 1
        return start


class NotifierService(NotifierCommunicationServicer):
    """Servicer that matches Open/Answer requests and relays messages."""

    class InternalOpenRequest:
        """A pending ``Open`` call waiting to be answered."""

        def __init__(self, id_code) -> None:
            self.id_code = id_code
            self.cv = Condition()
            # Stays None until the request has been fulfilled.
            self.comm_id = None

        def fulfill(self):
            """Assign a comm_id (once) and wake every waiter.

            Calling this again after the request was fulfilled keeps the
            comm_id that was already handed out instead of replacing it.
            """
            with self.cv:
                if self.comm_id is None:
                    self.comm_id = generate_unique_comm_id()
                self.cv.notify_all()

        def wait_until_fulfilled(self, timeout=None) -> bool:
            """Block until ``fulfill`` ran or *timeout* seconds elapsed.

            Returns True if the request has a comm_id, False on timeout.
            The predicate makes the wait immune to spurious wakeups and to a
            ``fulfill`` that happened before the wait started.
            """
            with self.cv:
                return self.cv.wait_for(
                    lambda: self.comm_id is not None, timeout)

    class CommPipe:
        """Blocking hand-off of messages from senders to a receiver."""

        def __init__(self) -> None:
            self.cv = Condition()
            self.queue = SimpleQueue()

        def relay(self, message):
            """Enqueue *message* and wake any waiting receiver."""
            with self.cv:
                self.queue.put(message)
                self.cv.notify_all()

        def wait_for_message(self, timeout=None):
            """Return the next queued message, blocking while none is queued.

            With *timeout* set (seconds), returns None if no message arrived
            in time.  The wait is predicate-based, so a message relayed before
            this method was entered is returned immediately instead of being
            missed until the next ``relay``.
            """
            with self.cv:
                has_message = self.cv.wait_for(
                    lambda: not self.queue.empty(), timeout)
                if not has_message:
                    return None
                return self.queue.get()

    def __init__(self) -> None:
        super().__init__()
        # Pending InternalOpenRequest objects; guarded by open_ids_lock.
        self.open_ids = []
        self.open_ids_lock = Lock()
        # comm_id -> CommPipe; guarded by comm_pipes_lock.
        self.comm_pipes = {}
        self.comm_pipes_lock = Lock()

    def create_pipe(self, comm_id):
        """Create a pipe and register it under *comm_id*.

        NOTE(review): an existing pipe for the same comm_id is replaced, so
        only the most recent receiver gets relayed messages -- confirm this is
        intended.
        """
        pipe = self.CommPipe()
        with self.comm_pipes_lock:
            self.comm_pipes[comm_id] = pipe
        return pipe

    def get_pipe(self, comm_id):
        """Return the pipe registered under *comm_id*, or None."""
        with self.comm_pipes_lock:
            return self.comm_pipes.get(comm_id)

    def remove_pipe(self, comm_id, pipe):
        """Unregister *pipe* if it is still the one stored under *comm_id*."""
        with self.comm_pipes_lock:
            # Identity check: do not drop a pipe a newer receiver registered.
            if self.comm_pipes.get(comm_id) is pipe:
                del self.comm_pipes[comm_id]

    ###########################################################################
    # RPCs
    ###########################################################################

    def Open(self, request, context):
        """Register ``request.id_code`` and block until it is answered.

        Returns a ``CommResponse`` carrying the assigned comm_id, or an empty
        ``CommResponse`` if the RPC stopped being active before an answer
        arrived.
        """
        # append id_code to list
        comm_request = self.InternalOpenRequest(request.id_code)
        with self.open_ids_lock:
            self.open_ids.append(comm_request)
            print(PREFIX, "OpenRequest created.")

        # wait for id_code answered; the registry lock is never held while
        # waiting, so Answer cannot deadlock against this handler
        try:
            while context.is_active():
                if comm_request.wait_until_fulfilled(POLL_INTERVAL_SECONDS):
                    break
        finally:
            # remove request (Answer already removed it when it matched; this
            # covers the case where the RPC ended unanswered)
            with self.open_ids_lock:
                if comm_request in self.open_ids:
                    self.open_ids.remove(comm_request)

        if comm_request.comm_id is None:
            # RPC is no longer active; nothing to report.
            return CommResponse()

        # return comm_id
        print(PREFIX, "OpenResponse sent.")
        return CommResponse(comm_id=comm_request.comm_id)

    def Answer(self, request, context):
        """Fulfil the first pending Open request with a matching id_code.

        Returns a ``CommResponse`` with the comm_id, or sets ``NOT_FOUND`` on
        the context and returns an empty ``CommResponse`` if no pending
        request matches.
        """
        print(PREFIX, "AnswerRequest received.")
        code = request.id_code

        # check if id_code can be answered; claim the request by removing it
        # under the lock so it cannot be answered twice
        with self.open_ids_lock:
            matched = next(
                (req for req in self.open_ids if req.id_code == code), None)
            if matched is not None:
                self.open_ids.remove(matched)

        if matched is None:
            # error
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details("Error: Open request not found.")
            return CommResponse()

        # Fulfil outside the registry lock to keep lock scopes disjoint.
        matched.fulfill()
        print(PREFIX, "OpenRequest answered.")

        # return comm_id
        print(PREFIX, "AnswerResponse sent.")
        return CommResponse(comm_id=matched.comm_id)

    def ReceiveMessage(self, request, context):
        """Stream every message relayed to ``request.comm_id``.

        The stream ends once the RPC is no longer active; the pipe is then
        unregistered so later ``SendMessage`` calls report the missing
        receiver instead of queueing messages nobody will read.
        """
        comm_id = request.comm_id

        # create pipe
        comm_pipe = self.create_pipe(comm_id)

        try:
            while context.is_active():
                # wait for message (bounded, so the activity check is re-run)
                message = comm_pipe.wait_for_message(
                    timeout=POLL_INTERVAL_SECONDS)
                if message is None:
                    continue

                # echo message
                yield Message(comm_id=comm_id, content=message)
        finally:
            self.remove_pipe(comm_id, comm_pipe)

    def SendMessage(self, request, context):
        """Relay ``request.content`` to the pipe of ``request.comm_id``.

        Sets ``INTERNAL`` on the context if no pipe is registered for the
        comm_id.  Always returns ``Nothing``.
        """
        comm_id = request.comm_id
        print("CommID:", request.comm_id, "| Content:", request.content)

        # get pipe
        comm_pipe = self.get_pipe(comm_id)
        if comm_pipe is None:
            # error
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details("Error: comm_id does not exist.")
            return Nothing()

        # relay message
        comm_pipe.relay(request.content)
        return Nothing()


if __name__ == "__main__":
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_NotifierCommunicationServicer_to_server(
        NotifierService(), server)
    server.add_insecure_port('[::]:8080')
    server.start()
    print("Server started...")
    server.wait_for_termination()