modapi

🚀 Direct Modbus RTU Communication API - Bezpośrednia komunikacja z urządzeniami Modbus przez port szeregowy.

✨ Kluczowe cechy

🆚 Dlaczego nowa wersja?

Aspekt Stara wersja (PyModbus) Nowa wersja (RTU)
Komunikacja z sprzętem ❌ Nie działała Działa niezawodnie
Auto-detekcja ❌ Zwracała błędy Znajduje urządzenia
Odczyt/zapis cewek ❌ Błędy komunikacji 100% sprawne
Obsługa CRC ❌ Tylko standardowa Zaawansowana dla Waveshare
Odporność na błędy ❌ Niska Wysoka z auto-korektą
Logowanie ❌ Niejasne błędy Szczegółowe logi
Testy ❌ Zawodne Wszystkie przechodzą
Dokumentacja ❌ Nieaktualna Kompletna + przykłady

🔧 Szybki start

Wymagania

Instalacja

# Sklonuj repozytorium
git clone https://github.com/yourusername/modapi.git
cd modapi

# Utwórz środowisko wirtualne
python -m venv venv
source venv/bin/activate  # Linux/Mac
# lub: venv\Scripts\activate  # Windows

# Zainstaluj zależności
pip install -r requirements.txt
# lub użyj Poetry:
poetry install && poetry shell

⚡ Natychmiastowe uruchomienie

1. Test komunikacji RTU:

python -c "from api.rtu import ModbusRTU; client = ModbusRTU(); print('Config:', client.auto_detect())"

2. Uruchom serwer web:

python run_rtu_output.py
# Otwórz http://localhost:5005 w przeglądarce

3. Przykłady użycia:

python examples/rtu_usage.py

🧪 Development i testowanie

Uruchom testy

# Wszystkie testy RTU
python -m pytest tests/test_rtu.py -v

# Z pokryciem kodu
python -m pytest tests/test_rtu.py --cov=api.rtu

# Test z rzeczywistym sprzętem (opcjonalny)
python -c "from tests.test_rtu import TestIntegration; TestIntegration().test_real_hardware_connection()"

Debugowanie komunikacji

# Szczegółowe logi komunikacji
python -c "
import logging
logging.basicConfig(level=logging.DEBUG)
from api.rtu import ModbusRTU
client = ModbusRTU()
config = client.auto_detect()
print('Debug config:', config)
"

Budowanie i publikacja

# Budowa pakietu
poetry build

# Publikacja do PyPI
poetry publish --build

🔍 Troubleshooting

Problem: Nie można znaleźć urządzenia

# Sprawdź dostępne porty szeregowe
ls -la /dev/tty{ACM,USB}*

# Sprawdź uprawnienia (dodaj użytkownika do grupy dialout)
sudo usermod -a -G dialout $USER
# Wyloguj się i zaloguj ponownie

# Test ręczny z różnymi prędkościami
python -c "
from api.rtu import ModbusRTU
for baud in [9600, 19200, 38400]:
    client = ModbusRTU('/dev/ttyACM0', baud)
    if client.connect():
        success, result = client.test_connection(1)
        print(f'{baud} baud: {success} - {result}')
        client.disconnect()
"

Problem: Błędy komunikacji i CRC

# Sprawdź parametry szeregowe urządzenia w dokumentacji
# Typowe ustawienia: 8N1 (8 bitów danych, bez parzystości, 1 bit stopu)
# Może wymagać innych ustawień: 8E1, 8O1, itp.

# Włącz szczegółowe logowanie dla debugowania CRC
python -c "
import logging
logging.basicConfig(level=logging.DEBUG)
from api.rtu import ModbusRTU
client = ModbusRTU('/dev/ttyACM0')
client.connect()
# Dla urządzeń Waveshare - moduł automatycznie obsługuje alternatywne CRC
result = client.read_coils(1, 0, 8)
print(f'Odczyt cewek z obsługą alternatywnego CRC: {result}')
client.disconnect()
"

Problem: Urządzenia Waveshare zwracają błędy funkcji

# Moduł RTU zawiera specjalną obsługę dla urządzeń Waveshare
# Automatycznie obsługuje:
# - Alternatywne obliczenia CRC
# - Niezgodności ID jednostki (broadcast, exception responses)
# - Mapowanie kodów funkcji
# - Szczegółowe komunikaty błędów dla wyjątków Modbus

# Test z włączonym debugowaniem
python -c "
import logging
logging.basicConfig(level=logging.DEBUG)
from api.rtu import ModbusRTU
client = ModbusRTU('/dev/ttyACM0')
client.connect()
# Próba odczytu rejestrów wejściowych (może zwrócić wyjątek na niektórych urządzeniach)
result = client.read_input_registers(1, 0, 4)
print(f'Wynik z obsługą wyjątków Waveshare: {result}')
client.disconnect()
"

The simulator will start with these test values:

  1. Configure your .env file to use the virtual port:
    MODBUS_PORT=/tmp/ttyp0
    MODBUS_BAUDRATE=9600
    MODBUS_TIMEOUT=0.1
    
  2. You can now run the API server or CLI commands to interact with the simulator.

Usage

Command Line Interface

The modapi CLI supports multiple subcommands:

# Direct command execution
modapi cmd wc 0 1       # Write value 1 to coil at address 0
modapi cmd rc 0 8       # Read 8 coils starting at address 0
modapi cmd rh 0 5       # Read 5 holding registers starting at address 0
modapi cmd wh 0 42      # Write value 42 to holding register at address 0

# Interactive shell
modapi shell

# REST API server
modapi rest --host 0.0.0.0 --port 5005

# MQTT client
modapi mqtt --broker localhost --port 1883

# Scan for Modbus devices
modapi scan

# With options
modapi cmd --verbose rc 0 8    # Verbose mode
modapi cmd --modbus-port /dev/ttyACM0 wc 0 1  # Specify port

For backward compatibility, you can also use the direct command format:

# These are automatically converted to the new format
./run_cli.py wc 0 1       # Equivalent to: modapi cmd wc 0 1
./run_cli.py rc 0 8       # Equivalent to: modapi cmd rc 0 8

REST API

from modapi.api.rest import create_rest_app

# Create and run Flask app
app = create_rest_app(port='/dev/ttyACM0', api_port=5005)

🌐 REST API Server

# Uruchom serwer RTU
python run_rtu_output.py

# API endpoints:
# GET  /status              - status połączenia RTU
# GET  /coil/<address>      - odczyt cewki
# POST /coil/<address>      - zapis cewki (JSON: {"state": true})
# GET  /coils               - odczyt wszystkich cewek 0-15
# GET  /registers/<address> - odczyt rejestru

📁 Przykłady curl

# Sprawdź status
curl http://localhost:5005/status

# Odczytaj cewkę 0
curl http://localhost:5005/coil/0

# Ustaw cewkę 0 na TRUE
curl -X POST http://localhost:5005/coil/0 \
     -H "Content-Type: application/json" \
     -d '{"state": true}'

# Odczytaj wszystkie cewki
curl http://localhost:5005/coils

🔧 Zaawansowane użycie

from api.rtu import ModbusRTU
import time

# Niestandardowa konfiguracja
client = ModbusRTU(
    port='/dev/ttyACM0',
    baudrate=19200,
    timeout=2.0,
    parity='E',  # Even parity
    stopbits=1
)

if client.connect():
    # Monitorowanie zmian cewek
    previous_states = None
    
    for _ in range(10):  # Monitoruj przez 10 iteracji
        current_states = client.read_coils(1, 0, 4)
        
        if current_states and current_states != previous_states:
            print(f"{time.strftime('%H:%M:%S')} - Zmiana: {current_states}")
            previous_states = current_states
            
        time.sleep(1)
    
    client.disconnect()

MQTT API

from modapi.api.mqtt import start_mqtt_broker

# Start MQTT client
start_mqtt_broker(
    port='/dev/ttyACM0',
    broker='localhost',
    mqtt_port=1883,
    topic_prefix='modbus'
)

MQTT Topics

Direct API Usage

from modapi.api.cmd import execute_command
from modapi.api.shell import interactive_mode

# Execute a command directly
success, response = execute_command('wc', ['0', '1'], port='/dev/ttyACM0')
print(response)

# Start interactive mode
interactive_mode(port='/dev/ttyACM0', verbose=True)

Project Structure

modapi/
├── api/
│   ├── __init__.py    # Exports main API functions
│   ├── cmd.py         # Direct command execution
│   ├── mqtt.py        # MQTT broker client
│   ├── rest.py        # REST API Flask app
│   └── shell.py       # Interactive shell
├── client.py          # Modbus client implementation
├── __main__.py        # CLI entry point
└── ...

Output Moduł [output.py]

Moduł output odpowiada za wizualizację i przetwarzanie stanów wyjść cyfrowych (cewek) w systemie Modbus. Zapewnia funkcje do parsowania i wyświetlania stanów wyjść w formie interaktywnego widżetu SVG.

parse_coil_status(text: str) -> Tuple[Optional[int], Optional[bool]]

Opis:
Parsuje wiadomość o stanie cewki i zwraca jej adres oraz status.

Parametry:

Zwraca:

Przykład użycia: ```python address, status = parse_coil_status(“Coil 3 set to ON”)

address = 3, status = True

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details. python -m modapi scan –ports /dev/ttyACM0 –baudrates 9600,19200 –unit-ids 0,1,2,247 –debug modapi scan –ports /dev/ttyACM0 –baudrates 9600,19200 –unit-ids 0,1,247 –debug