feat: added class CommandsRobot, func command_to_robot, fuctional for send and get answer robot

This commit is contained in:
Arduinum628
2025-06-02 12:24:30 +03:00
parent d567835bcc
commit 3d7a7a64db
2 changed files with 62 additions and 20 deletions

View File

@@ -1,8 +1,8 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Класс для данных конфига"""
class ModelConfig(BaseSettings):
"""Модель конфига"""
model_config = SettingsConfigDict(
env_file = '.env',
@@ -10,7 +10,27 @@ class Settings(BaseSettings):
extra='ignore'
)
class CommandsRobot(ModelConfig):
"""Класс с командами для робота"""
forward: str
backward: str
left: str
right: str
def get_list_commands(self):
"""Метод вернёт список всех команд"""
return list(self.model_dump().values())
class Settings(ModelConfig):
"""Класс для данных конфига"""
stream_url: str
websocket_url_robot: str
commands_robot: CommandsRobot = CommandsRobot()
settings = Settings()

View File

@@ -1,10 +1,10 @@
from fastapi import APIRouter, WebSocket
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, WebSocketException
from fastapi.requests import Request
from fastapi.responses import HTMLResponse, Response
from fastapi.templating import Jinja2Templates
from starlette.websockets import WebSocketDisconnect
import httpx
from websockets import exceptions, connect
import asyncio
import socket
from web_robot_control.settings import settings
@@ -35,27 +35,49 @@ async def get_config() -> dict:
return {'stream_url': settings.stream_url}
async def command_to_robot(command: str) -> str:
"""Асинхронная функция для отправки команды роботу через websockets"""
try:
async with connect(settings.websocket_url_robot) as robot_ws:
await robot_ws.send(command)
response = await robot_ws.recv()
return response
# Todo: для каждой ошибки написать своё сообщение
except (
exceptions.InvalidURI,
asyncio.TimeoutError,
exceptions.ConnectionClosedError,
exceptions.ConnectionClosedOK,
exceptions.InvalidHandshake,
ConnectionRefusedError,
socket.gaierror,
exceptions.InvalidMessage
) as err:
return f'{err.__class__.__name__}: {err}'
@router.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket) -> None:
# Установка содединения по веб-сокету
await websocket.accept()
try:
async with httpx.AsyncClient() as client:
while True:
# Получение команды от клиента (с веб-сокета)
command = await websocket.receive_text()
print(f'Получена команда: {command}')
while True:
# Получение команды от клиента (с веб-сокета)
command = await websocket.receive_text()
valid_commands = settings.commands_robot.get_list_commands()
# Todo: здесь будет логика валидации команд
# Todo: здесь будет логика обработки команды
if command in valid_commands:
# оптравка команды роботу
robot_answer = await command_to_robot(command=command)
if robot_answer:
# отправка ответа робота на вебсокет фронтенда
await websocket.send_text(f'Получена команда: {command}, ответ робота: {robot_answer}')
print(f'Ответ робота: {robot_answer}')
except WebSocketDisconnect:
print('WebSocket отключен') # Todo: для вывода ошибок будет настроен logger
# Todo: вместо Exception будут добавлена ловля других ошибок
# (после того как функция будет полностью дописана)
except Exception as err:
err_text = f'Ошибка: {str(err)}'
await websocket.send_text(err_text)
print(err_text)
# Todo: для каждой ошибки написать своё сообщение
except (WebSocketException, exceptions.InvalidMessage) as err:
print(f'{err.__class__.__name__}: {err}')