Skip to content
Snippets Groups Projects
Commit eec55416 authored by Wistauder, Martin's avatar Wistauder, Martin
Browse files

implemented python server and client

parent 11caad9e
No related branches found
No related tags found
No related merge requests found
......@@ -17,8 +17,8 @@ A and B in distinct networks.
- [x] Exchange messages server<->client using protobuf
- [x] Android-Client
- [x] Grab notifications
- [ ] Communicate with grpc service
- [ ] Create protocol
- [x] Communicate with grpc service
- [x] Create protocol
# Prototype 2:
......
......@@ -2,7 +2,6 @@ import sys
sys.path.insert(0, '../..')
sys.path.insert(0, '../../shared/netcode')
import queue
import grpc
from shared.netcode.netcode_pb2 import *
from shared.netcode.netcode_pb2_grpc import NotifierCommunicationStub
......@@ -10,9 +9,30 @@ from shared.netcode.netcode_pb2_grpc import NotifierCommunicationStub
usage = "Usage:\n- Sender:\n ./client.py 1\n- Receiver:\n ./client.py 0"
def sendMessage(stub, msg):
request = Message(content=msg)
response = stub.SendMessage(request)
def sender_code(id_code):
comm_response = stub.Open(OpenRequest(id_code=id_code))
comm_id = comm_response.comm_id
print("CommID:", comm_id)
stub.SendMessage(Message(comm_id=comm_id, content="init test"))
print("Starting Prompt:")
while True:
text = input()
if text == "":
print("Stopping...")
break
print("Sending:", text)
stub.SendMessage(Message(comm_id=comm_id, content=text))
def receiver_code(id_code):
comm_response = stub.Answer(AnswerRequest(id_code=id_code))
comm_id = comm_response.comm_id
print("CommID:", comm_id)
for message in stub.ReceiveMessage(ReceiveRequest(comm_id=comm_id)):
print("Received:", message.content)
if __name__ == "__main__":
......@@ -25,7 +45,7 @@ if __name__ == "__main__":
########################################################################
# SETTINGS
########################################################################
is_sender = sys.argv[1]
is_sender = sys.argv[1] == "1"
########################################################################
# print info
......@@ -40,26 +60,11 @@ if __name__ == "__main__":
stub = NotifierCommunicationStub(channel)
if is_sender:
response = stub.Open(OpenRequest(id_code=1))
print(response)
sendQueue = queue.SimpleQueue()
response_future = stub.SendMessage.future(iter(sendQueue.get, None))
sendQueue.put_nowait(Message(content="init test"))
print("Starting Prompt:")
while True:
text = input()
if text == "":
print("Stopping...")
break
print("Sending:", text)
sendQueue.put_nowait(Message(content=text))
else:
response = stub.Answer(AnswerRequest(challenge=1))
print(response)
for message in stub.ReceiveMessage(Nothing()):
print(message.content)
try:
common_id = 1
sender_code(common_id) if is_sender else receiver_code(common_id)
except grpc.RpcError as e:
print("Error: A grpc error occurred as", "SENDER" if is_sender else "RECEIVER")
print("ErrorCode:", e.code().name)
print("ErrorMessage:", e.details())
exit(1)
import sys
import grpc
sys.path.insert(0, '..')
sys.path.insert(0, '../shared/netcode')
from queue import SimpleQueue
from threading import Condition, Lock
from concurrent import futures
from shared.netcode.netcode_pb2 import *
from shared.netcode.netcode_pb2_grpc import *
PREFIX = "Server>"
start = 0
def generate_unique_comm_id():
global start
start = start + 1
return start
class NotifierService(NotifierCommunicationServicer):
class InternalOpenRequest:
def __init__(self, id_code) -> None:
self.id_code = id_code
self.cv = Condition()
self.comm_id = None
def fulfill(self):
self.cv.acquire()
self.comm_id = generate_unique_comm_id()
self.cv.notify_all()
self.cv.release()
class CommPipe:
def __init__(self) -> None:
self.cv = Condition()
self.queue = SimpleQueue()
def relay(self, message):
self.cv.acquire()
self.queue.put(message)
self.cv.notify_all()
self.cv.release()
pass
def wait_for_message(self):
self.cv.acquire()
self.cv.wait()
message = self.queue.get()
self.cv.release()
return message
def __init__(self) -> None:
super().__init__()
self.open_ids = []
self.open_ids_lock = Lock()
self.comm_pipes = {}
self.comm_pipes_lock = Lock()
def create_pipe(self, comm_id):
pipe = self.CommPipe()
self.comm_pipes_lock.acquire()
self.comm_pipes[comm_id] = pipe
self.comm_pipes_lock.release()
return pipe
def get_pipe(self, comm_id):
self.comm_pipes_lock.acquire()
if comm_id in self.comm_pipes:
pipe = self.comm_pipes[comm_id]
self.comm_pipes_lock.release()
return pipe
else:
self.comm_pipes_lock.release()
return None
###########################################################################
# RPCs
###########################################################################
def Open(self, request, context):
# TODO: append id_code to list
# TODO: wait for id_code answered
# TODO: return comm_id
return CommResponse()
# append id_code to list
comm_request = self.InternalOpenRequest(request.id_code)
comm_request.cv.acquire()
self.open_ids_lock.acquire()
self.open_ids.append(comm_request)
print(PREFIX, "OpenRequest created.")
self.open_ids_lock.release()
# wait for id_code answered
comm_request.cv.wait()
# remove request
self.open_ids_lock.acquire()
self.open_ids.remove(comm_request)
self.open_ids_lock.release()
comm_request.cv.release()
# return comm_id
print(PREFIX, "OpenResponse sent.")
return CommResponse(comm_id=comm_request.comm_id)
def Answer(self, request, context):
# TODO: check if if_code can be answered
# TODO: return comm_id
return CommResponse()
print(PREFIX, "AnswerRequest received.")
code = request.id_code
comm_id = None
# check if if_code can be answered
self.open_ids_lock.acquire()
for open_req in self.open_ids:
if code == open_req.id_code:
# found
open_req.fulfill()
comm_id = open_req.comm_id
print(PREFIX, "OpenRequest answered.")
break
self.open_ids_lock.release()
# return comm_id
if comm_id is None:
# error
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details("Error: Open request not found.")
return CommResponse()
else:
print(PREFIX, "AnswerResponse sent.")
return CommResponse(comm_id=comm_id)
def ReceiveMessage(self, request, context):
# TODO: read comm_id and wait for messages
yield Message(comm_id=-1, content="test")
def SendMessage(self, request_iterator, context):
for msg in request_iterator:
# TODO: read comm_id and route message
print(msg.comm_id)
print(msg.content)
comm_id = request.comm_id
# create pipe
comm_pipe = self.create_pipe(comm_id)
while True:
# TODO: add stop condition
# wait for message
message = comm_pipe.wait_for_message()
# echo message
yield Message(comm_id=comm_id, content=message)
def SendMessage(self, request, context):
comm_id = request.comm_id
print("CommID:", request.comm_id, "| Content:", request.content)
# get pipe
comm_pipe = self.get_pipe(comm_id)
if comm_pipe is None:
# error
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details("Error: comm_id does not exist.")
return Nothing()
# relay message
comm_pipe.relay(request.content)
return Nothing()
......@@ -37,4 +158,5 @@ if __name__ == "__main__":
NotifierService(), server)
server.add_insecure_port('[::]:8080')
server.start()
print("Server started...")
server.wait_for_termination()
......@@ -4,7 +4,7 @@ package netcode;
service NotifierCommunication {
rpc Open (OpenRequest) returns (CommResponse);
rpc Answer (AnswerRequest) returns (CommResponse);
rpc SendMessage (stream Message) returns (Nothing);
rpc SendMessage (Message) returns (Nothing);
rpc ReceiveMessage (ReceiveRequest) returns (stream Message);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment