🚀 Direct Modbus RTU Communication API - Bezpośrednia komunikacja z urządzeniami Modbus przez port szeregowy.
/dev/ttyACM0
Aspekt | Stara wersja (PyModbus) | Nowa wersja (RTU) |
---|---|---|
Komunikacja z sprzętem | ❌ Nie działała | ✅ Działa niezawodnie |
Auto-detekcja | ❌ Zwracała błędy | ✅ Znajduje urządzenia |
Odczyt/zapis cewek | ❌ Błędy komunikacji | ✅ 100% sprawne |
Obsługa CRC | ❌ Tylko standardowa | ✅ Zaawansowana dla Waveshare |
Odporność na błędy | ❌ Niska | ✅ Wysoka z auto-korektą |
Logowanie | ❌ Niejasne błędy | ✅ Szczegółowe logi |
Testy | ❌ Zawodne | ✅ Wszystkie przechodzą |
Dokumentacja | ❌ Nieaktualna | ✅ Kompletna + przykłady |
/dev/ttyACM0
lub /dev/ttyUSB0
dialout
)# Sklonuj repozytorium
git clone https://github.com/yourusername/modapi.git
cd modapi
# Utwórz środowisko wirtualne
python -m venv venv
source venv/bin/activate # Linux/Mac
# lub: venv\Scripts\activate # Windows
# Zainstaluj zależności
pip install -r requirements.txt
# lub użyj Poetry:
poetry install && poetry shell
1. Test komunikacji RTU:
python -c "from api.rtu import ModbusRTU; client = ModbusRTU(); print('Config:', client.auto_detect())"
2. Uruchom serwer web:
python run_rtu_output.py
# Otwórz http://localhost:5005 w przeglądarce
3. Przykłady użycia:
python examples/rtu_usage.py
# Wszystkie testy RTU
python -m pytest tests/test_rtu.py -v
# Z pokryciem kodu
python -m pytest tests/test_rtu.py --cov=api.rtu
# Test z rzeczywistym sprzętem (opcjonalny)
python -c "from tests.test_rtu import TestIntegration; TestIntegration().test_real_hardware_connection()"
# Szczegółowe logi komunikacji
python -c "
import logging
logging.basicConfig(level=logging.DEBUG)
from api.rtu import ModbusRTU
client = ModbusRTU()
config = client.auto_detect()
print('Debug config:', config)
"
# Budowa pakietu
poetry build
# Publikacja do PyPI
poetry publish --build
# Sprawdź dostępne porty szeregowe
ls -la /dev/tty{ACM,USB}*
# Sprawdź uprawnienia (dodaj użytkownika do grupy dialout)
sudo usermod -a -G dialout $USER
# Wyloguj się i zaloguj ponownie
# Test ręczny z różnymi prędkościami
python -c "
from api.rtu import ModbusRTU
for baud in [9600, 19200, 38400]:
client = ModbusRTU('/dev/ttyACM0', baud)
if client.connect():
success, result = client.test_connection(1)
print(f'{baud} baud: {success} - {result}')
client.disconnect()
"
# Sprawdź parametry szeregowe urządzenia w dokumentacji
# Typowe ustawienia: 8N1 (8 bitów danych, bez parzystości, 1 bit stopu)
# Może wymagać innych ustawień: 8E1, 8O1, itp.
# Włącz szczegółowe logowanie dla debugowania CRC
python -c "
import logging
logging.basicConfig(level=logging.DEBUG)
from api.rtu import ModbusRTU
client = ModbusRTU('/dev/ttyACM0')
client.connect()
# Dla urządzeń Waveshare - moduł automatycznie obsługuje alternatywne CRC
result = client.read_coils(1, 0, 8)
print(f'Odczyt cewek z obsługą alternatywnego CRC: {result}')
client.disconnect()
"
# Moduł RTU zawiera specjalną obsługę dla urządzeń Waveshare
# Automatycznie obsługuje:
# - Alternatywne obliczenia CRC
# - Niezgodności ID jednostki (broadcast, exception responses)
# - Mapowanie kodów funkcji
# - Szczegółowe komunikaty błędów dla wyjątków Modbus
# Test z włączonym debugowaniem
python -c "
import logging
logging.basicConfig(level=logging.DEBUG)
from api.rtu import ModbusRTU
client = ModbusRTU('/dev/ttyACM0')
client.connect()
# Próba odczytu rejestrów wejściowych (może zwrócić wyjątek na niektórych urządzeniach)
result = client.read_input_registers(1, 0, 4)
print(f'Wynik z obsługą wyjątków Waveshare: {result}')
client.disconnect()
"
The simulator will start with these test values:
[1, 0, 1, 0]
[1234, 5678, 9012]
.env
file to use the virtual port:
MODBUS_PORT=/tmp/ttyp0
MODBUS_BAUDRATE=9600
MODBUS_TIMEOUT=0.1
The modapi CLI supports multiple subcommands:
# Direct command execution
modapi cmd wc 0 1 # Write value 1 to coil at address 0
modapi cmd rc 0 8 # Read 8 coils starting at address 0
modapi cmd rh 0 5 # Read 5 holding registers starting at address 0
modapi cmd wh 0 42 # Write value 42 to holding register at address 0
# Interactive shell
modapi shell
# REST API server
modapi rest --host 0.0.0.0 --port 5005
# MQTT client
modapi mqtt --broker localhost --port 1883
# Scan for Modbus devices
modapi scan
# With options
modapi cmd --verbose rc 0 8 # Verbose mode
modapi cmd --modbus-port /dev/ttyACM0 wc 0 1 # Specify port
For backward compatibility, you can also use the direct command format:
# These are automatically converted to the new format
./run_cli.py wc 0 1 # Equivalent to: modapi cmd wc 0 1
./run_cli.py rc 0 8 # Equivalent to: modapi cmd rc 0 8
from modapi.api.rest import create_rest_app
# Create and run Flask app
app = create_rest_app(port='/dev/ttyACM0', api_port=5005)
# Uruchom serwer RTU
python run_rtu_output.py
# API endpoints:
# GET /status - status połączenia RTU
# GET /coil/<address> - odczyt cewki
# POST /coil/<address> - zapis cewki (JSON: {"state": true})
# GET /coils - odczyt wszystkich cewek 0-15
# GET /registers/<address> - odczyt rejestru
# Sprawdź status
curl http://localhost:5005/status
# Odczytaj cewkę 0
curl http://localhost:5005/coil/0
# Ustaw cewkę 0 na TRUE
curl -X POST http://localhost:5005/coil/0 \
-H "Content-Type: application/json" \
-d '{"state": true}'
# Odczytaj wszystkie cewki
curl http://localhost:5005/coils
from api.rtu import ModbusRTU
import time
# Niestandardowa konfiguracja
client = ModbusRTU(
port='/dev/ttyACM0',
baudrate=19200,
timeout=2.0,
parity='E', # Even parity
stopbits=1
)
if client.connect():
# Monitorowanie zmian cewek
previous_states = None
for _ in range(10): # Monitoruj przez 10 iteracji
current_states = client.read_coils(1, 0, 4)
if current_states and current_states != previous_states:
print(f"{time.strftime('%H:%M:%S')} - Zmiana: {current_states}")
previous_states = current_states
time.sleep(1)
client.disconnect()
from modapi.api.mqtt import start_mqtt_broker
# Start MQTT client
start_mqtt_broker(
port='/dev/ttyACM0',
broker='localhost',
mqtt_port=1883,
topic_prefix='modbus'
)
modbus/command/#
to send commandsmodbus/request/#
to send requestsmodbus/command/write_coil
with payload {"address": 0, "value": true}
to write to a coilmodbus/request/read_coils
with payload {"address": 0, "count": 8}
to read coilsmodbus/result/<command>
and modbus/response/<request>
from modapi.api.cmd import execute_command
from modapi.api.shell import interactive_mode
# Execute a command directly
success, response = execute_command('wc', ['0', '1'], port='/dev/ttyACM0')
print(response)
# Start interactive mode
interactive_mode(port='/dev/ttyACM0', verbose=True)
modapi/
├── api/
│ ├── __init__.py # Exports main API functions
│ ├── cmd.py # Direct command execution
│ ├── mqtt.py # MQTT broker client
│ ├── rest.py # REST API Flask app
│ └── shell.py # Interactive shell
├── client.py # Modbus client implementation
├── __main__.py # CLI entry point
└── ...
Moduł output odpowiada za wizualizację i przetwarzanie stanów wyjść cyfrowych (cewek) w systemie Modbus. Zapewnia funkcje do parsowania i wyświetlania stanów wyjść w formie interaktywnego widżetu SVG.
Opis:
Parsuje wiadomość o stanie cewki i zwraca jej adres oraz status.
Parametry:
text
- Tekst wiadomości (np. ‘Coil 0 set to ON’ lub ‘Coil 5 set to OFF’)Zwraca:
address
(int) - Adres cewkiPrzykład użycia: ```python address, status = parse_coil_status(“Coil 3 set to ON”)
This project is licensed under the Apache 2.0 License - see the LICENSE file for details. python -m modapi scan –ports /dev/ttyACM0 –baudrates 9600,19200 –unit-ids 0,1,2,247 –debug modapi scan –ports /dev/ttyACM0 –baudrates 9600,19200 –unit-ids 0,1,247 –debug