From eec554164c93b76b75638effb8955965b37382cd Mon Sep 17 00:00:00 2001
From: Martin Wistauder <mwistauder@student.tugraz.at>
Date: Sun, 29 May 2022 12:56:03 +0200
Subject: [PATCH] implemented python server and client

---
 README.md               |   4 +-
 client/python/client.py |  61 ++++++++--------
 server/server.py        | 152 ++++++++++++++++++++++++++++++++++++----
 shared/netcode.proto    |   2 +-
 4 files changed, 173 insertions(+), 46 deletions(-)

diff --git a/README.md b/README.md
index 6a2f359..409ade6 100644
--- a/README.md
+++ b/README.md
@@ -17,8 +17,8 @@ A and B in distinct networks.
     - [x] Exchange messages server<->client using protobuf
 - [x] Android-Client
     - [x] Grab notifications
-    - [ ] Communicate with grpc service
-- [ ] Create protocol
+    - [x] Communicate with grpc service
+- [x] Create protocol
 
 # Prototype 2:
 
diff --git a/client/python/client.py b/client/python/client.py
index 109822c..1ba7551 100644
--- a/client/python/client.py
+++ b/client/python/client.py
@@ -2,7 +2,6 @@ import sys
 sys.path.insert(0, '../..')
 sys.path.insert(0, '../../shared/netcode')
 
-import queue
 import grpc
 from shared.netcode.netcode_pb2 import *
 from shared.netcode.netcode_pb2_grpc import NotifierCommunicationStub
@@ -10,9 +9,30 @@ from shared.netcode.netcode_pb2_grpc import NotifierCommunicationStub
 usage = "Usage:\n- Sender:\n  ./client.py 1\n- Receiver:\n  ./client.py 0"
 
 
-def sendMessage(stub, msg):
-    request = Message(content=msg)
-    response = stub.SendMessage(request)
+def sender_code(id_code):
+    comm_response = stub.Open(OpenRequest(id_code=id_code))
+    comm_id = comm_response.comm_id
+    print("CommID:", comm_id)
+
+    stub.SendMessage(Message(comm_id=comm_id, content="init test"))
+
+    print("Starting Prompt:")
+    while True:
+        text = input()
+        if text == "":
+            print("Stopping...")
+            break
+        print("Sending:", text)
+        stub.SendMessage(Message(comm_id=comm_id, content=text))
+
+
+def receiver_code(id_code):
+    comm_response = stub.Answer(AnswerRequest(id_code=id_code))
+    comm_id = comm_response.comm_id
+    print("CommID:", comm_id)
+
+    for message in stub.ReceiveMessage(ReceiveRequest(comm_id=comm_id)):
+        print("Received:", message.content)
 
 
 if __name__ == "__main__":
@@ -25,7 +45,7 @@ if __name__ == "__main__":
     ########################################################################
     # SETTINGS
     ########################################################################
-    is_sender = sys.argv[1]
+    is_sender = sys.argv[1] == "1"
     ########################################################################
 
     # print info
@@ -40,26 +60,11 @@ if __name__ == "__main__":
 
         stub = NotifierCommunicationStub(channel)
 
-        if is_sender:
-            response = stub.Open(OpenRequest(id_code=1))
-            print(response)
-
-            sendQueue = queue.SimpleQueue()
-            response_future = stub.SendMessage.future(iter(sendQueue.get, None))
-            sendQueue.put_nowait(Message(content="init test"))
-
-            print("Starting Prompt:")
-            while True:
-                text = input()
-                if text == "":
-                    print("Stopping...")
-                    break
-                print("Sending:", text)
-                sendQueue.put_nowait(Message(content=text))
-
-        else:
-            response = stub.Answer(AnswerRequest(challenge=1))
-            print(response)
-
-            for message in stub.ReceiveMessage(Nothing()):
-                print(message.content)
+        try:
+            common_id = 1
+            sender_code(common_id) if is_sender else receiver_code(common_id)
+        except grpc.RpcError as e:
+            print("Error: A grpc error occurred as", "SENDER" if is_sender else "RECEIVER")
+            print("ErrorCode:", e.code().name)
+            print("ErrorMessage:", e.details())
+            exit(1)
diff --git a/server/server.py b/server/server.py
index f95e0fd..344fafb 100644
--- a/server/server.py
+++ b/server/server.py
@@ -1,33 +1,154 @@
 import sys
+
+import grpc
+
 sys.path.insert(0, '..')
 sys.path.insert(0, '../shared/netcode')
 
+from queue import SimpleQueue
+from threading import Condition, Lock
 from concurrent import futures
 from shared.netcode.netcode_pb2 import *
 from shared.netcode.netcode_pb2_grpc import *
 
 
+PREFIX = "Server>"
+
+start = 0
+def generate_unique_comm_id():
+    global start
+    start = start + 1
+    return start
+
+
 class NotifierService(NotifierCommunicationServicer):
+    class InternalOpenRequest:
+        def __init__(self, id_code) -> None:
+            self.id_code = id_code
+            self.cv = Condition()
+            self.comm_id = None
+
+        def fulfill(self):
+            self.cv.acquire()
+            self.comm_id = generate_unique_comm_id()
+            self.cv.notify_all()
+            self.cv.release()
+
+    class CommPipe:
+        def __init__(self) -> None:
+            self.cv = Condition()
+            self.queue = SimpleQueue()
+
+        def relay(self, message):
+            self.cv.acquire()
+            self.queue.put(message)
+            self.cv.notify_all()
+            self.cv.release()
+            pass
+
+        def wait_for_message(self):
+            self.cv.acquire()
+            self.cv.wait()
+            message = self.queue.get()
+            self.cv.release()
+            return message
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.open_ids = []
+        self.open_ids_lock = Lock()
+        self.comm_pipes = {}
+        self.comm_pipes_lock = Lock()
+
+    def create_pipe(self, comm_id):
+        pipe = self.CommPipe()
+        self.comm_pipes_lock.acquire()
+        self.comm_pipes[comm_id] = pipe
+        self.comm_pipes_lock.release()
+        return pipe
+
+    def get_pipe(self, comm_id):
+        self.comm_pipes_lock.acquire()
+        if comm_id in self.comm_pipes:
+            pipe = self.comm_pipes[comm_id]
+            self.comm_pipes_lock.release()
+            return pipe
+        else:
+            self.comm_pipes_lock.release()
+            return None
+
+    ###########################################################################
+    # RPCs
+    ###########################################################################
+
     def Open(self, request, context):
-        # TODO: append id_code to list
-        # TODO: wait for id_code answered
-        # TODO: return comm_id
-        return CommResponse()
+        # append id_code to list
+        comm_request = self.InternalOpenRequest(request.id_code)
+        comm_request.cv.acquire()
+        self.open_ids_lock.acquire()
+        self.open_ids.append(comm_request)
+        print(PREFIX, "OpenRequest created.")
+        self.open_ids_lock.release()
+        # wait for id_code answered
+        comm_request.cv.wait()
+        # remove request
+        self.open_ids_lock.acquire()
+        self.open_ids.remove(comm_request)
+        self.open_ids_lock.release()
+        comm_request.cv.release()
+        # return comm_id
+        print(PREFIX, "OpenResponse sent.")
+        return CommResponse(comm_id=comm_request.comm_id)
 
     def Answer(self, request, context):
-        # TODO: check if if_code can be answered
-        # TODO: return comm_id
-        return CommResponse()
+        print(PREFIX, "AnswerRequest received.")
+        code = request.id_code
+        comm_id = None
+        # check if if_code can be answered
+        self.open_ids_lock.acquire()
+        for open_req in self.open_ids:
+            if code == open_req.id_code:
+                # found
+                open_req.fulfill()
+                comm_id = open_req.comm_id
+                print(PREFIX, "OpenRequest answered.")
+                break
+        self.open_ids_lock.release()
+        # return comm_id
+        if comm_id is None:
+            # error
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+            context.set_details("Error: Open request not found.")
+            return CommResponse()
+        else:
+            print(PREFIX, "AnswerResponse sent.")
+            return CommResponse(comm_id=comm_id)
 
     def ReceiveMessage(self, request, context):
-        # TODO: read comm_id and wait for messages
-        yield Message(comm_id=-1, content="test")
-
-    def SendMessage(self, request_iterator, context):
-        for msg in request_iterator:
-            # TODO: read comm_id and route message
-            print(msg.comm_id)
-            print(msg.content)
+        comm_id = request.comm_id
+        # create pipe
+        comm_pipe = self.create_pipe(comm_id)
+        while True:
+            # TODO: add stop condition
+            # wait for message
+            message = comm_pipe.wait_for_message()
+            # echo message
+            yield Message(comm_id=comm_id, content=message)
+
+    def SendMessage(self, request, context):
+        comm_id = request.comm_id
+        print("CommID:", request.comm_id, "| Content:", request.content)
+
+        # get pipe
+        comm_pipe = self.get_pipe(comm_id)
+        if comm_pipe is None:
+            # error
+            context.set_code(grpc.StatusCode.INTERNAL)
+            context.set_details("Error: comm_id does not exist.")
+            return Nothing()
+
+        # relay message
+        comm_pipe.relay(request.content)
         return Nothing()
 
 
@@ -37,4 +158,5 @@ if __name__ == "__main__":
         NotifierService(), server)
     server.add_insecure_port('[::]:8080')
     server.start()
+    print("Server started...")
     server.wait_for_termination()
diff --git a/shared/netcode.proto b/shared/netcode.proto
index bae788f..3386e73 100644
--- a/shared/netcode.proto
+++ b/shared/netcode.proto
@@ -4,7 +4,7 @@ package netcode;
 service NotifierCommunication {
   rpc Open (OpenRequest) returns (CommResponse);
   rpc Answer (AnswerRequest) returns (CommResponse);
-  rpc SendMessage (stream Message) returns (Nothing);
+  rpc SendMessage (Message) returns (Nothing);
   rpc ReceiveMessage (ReceiveRequest) returns (stream Message);
 }
 
-- 
GitLab