import Robot import asyncio; import json import websockets; import sys; import Container from PWMDriver import PWMDriver robot = Robot.Robot() #robot = Robot.Robot() #t = robot.Translation #r = robot.Rotation #sd = t.sd #v = t.droit #d = t.trv class commandHandler: def ping(self): return "Pong" """def set_vector(self,vector_type:str,vector_args): print(f"{vector_type=}, {vector_args=}") for i in range(len(vector_args)): vector_args[i]= float(vector_args[i]) if vector_type =='trans': if abs(vector_args[0])>=0.1 and abs(vector_args[1])<=0.1: v((vector_args[0]*10)) if abs(vector_args[1])>=0.1 and abs(vector_args[0])<=0.1: sd((vector_args[1]*10)) if abs(vector_args[1])>=0.1 and abs(vector_args[0])>=0.1: if vector_args[0]>0: d((vector_args[0]+vector_args[1]//2)*10,True) elif vector_args[0]<0: d((vector_args[0]+vector_args[1]//2)*10,False) if vector_type == 'rot': r(vector_args[1]*10) return f"{vector_type=},{vector_args=}" """ speed=0 def set_direction(self,arg): try: return robot.translation(arg,self.speed) except Exception as e: return 'error' def stop(self): robot.stop() return 'stopped' def init(): robot.init() return 'initialised' def set_rotation(self, arg): try: robot.rotation(arg,self.speed) except Exception as e: raise e def set_speed(self,v): if v>=50: v = 50 else: v=v self.speed=v return 'speed set to '+str(v) def set_range(self, name:str, mini:int, maxi:int): robot.set_range(name, mini, maxi) cmd_handler = commandHandler() def commands(data): print('commands:') print(data) if not data: data = { } else: data = json.loads(data) if 'STOP' in data: if data['STOP']: sys.exit() try: if(data['cmd']=='set_direction'): res= cmd_handler.set_direction(data["args"]['direction']) elif(data['cmd']=='set_rotation'): res= cmd_handler.set_rotation(data["args"]['rotation']) elif(data['cmd']=='stop_robot'): res=cmd_handler.stop() elif(data['cmd']=='set_range'): res=cmd_handler.set_range(data['args']['name'],data['args']['min'],data['args']['max']) elif(data['cmd']=='init'): res=cmd_handler.init() elif(data['cmd']=='set_speed'): res=cmd_handler.set_speed(data["args"]["speed"]) else: return 'NOPE!!!' except Exception as e: return e return res async def handler(websocket, path): print('new handler created at path'+path) while True: try : data = await websocket.recv() except websockets.exceptions.ConnectionClosedOK: break try: res= commands(data) except Exception as e: await websocket.send(json.dumps({"type":"error","message":e})) print(e) await websocket.send(json.dumps({"type":"success","message":res})) print('server running at 0.0.0.0 with port 1567') start_server = websockets.serve(handler, "0.0.0.0", 1567) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()