Process System to modułowa i rozszerzalna platforma do przetwarzania tekstu z wieloma interfejsami usług, w tym gRPC, REST API i integracją z Model Context Protocol (MCP). System został zaprojektowany z myślą o elastyczności, umożliwiając łatwe rozszerzanie funkcjonalności i integrację z różnymi systemami.
System Process składa się z następujących głównych komponentów:
project/
├── process/ # Silnik Process
│ ├── process.py # Główna implementacja Process
│ ├── process_base.py # Klasa bazowa abstrakcyjna
│ └── plugin_system.py # System wtyczek
├── core/ # Rdzeń frameworka
│ ├── config.py # Podstawowa konfiguracja
│ ├── config_manager.py # Zaawansowane zarządzanie konfiguracją
│ ├── logging.py # Narzędzia logowania
│ ├── utils.py # Wspólne narzędzia
│ └── error_handling.py # Standardowa obsługa błędów
├── grpc/ # Usługa gRPC
│ ├── server.py # Implementacja serwera gRPC
│ ├── client.py # Klient gRPC
│ └── proto/ # Definicje Protocol Buffer
├── rest/ # Usługa REST API
│ ├── server.py # Implementacja serwera REST
│ └── client.py # Klient REST
└── mcp/ # Model Context Protocol
├── mcp_server.py # Serwer MCP
├── tools/ # Narzędzia MCP
└── resources/ # Zasoby MCP
Każdy komponent jest zaprojektowany tak, aby mógł działać niezależnie, umożliwiając elastyczne wdrażanie i skalowanie.
Najłatwiejszym sposobem rozpoczęcia pracy jest użycie skryptu konfiguracji środowiska deweloperskiego:
# Konfiguracja środowiska deweloperskiego
python dev_setup.py
# Uruchomienie usług
docker-compose up -d
Jeśli wolisz ręcznie skonfigurować środowisko:
# Instalacja zależności dla każdego komponentu
cd process && poetry install
cd ../grpc && poetry install
cd ../rest && poetry install
cd ../mcp && poetry install
# Konfiguracja hooków pre-commit
pip install pre-commit
pre-commit install
Każdy komponent ma plik .env.example, który można skopiować do .env i dostosować:
cp process/.env.example process/.env
cp grpc/.env.example grpc/.env
cp rest/.env.example rest/.env
cp mcp/.env.example mcp/.env
Process Engine to centralny komponent odpowiedzialny za przetwarzanie tekstu. Jest zbudowany wokół abstrakcyjnej klasy bazowej ProcessBase, która definiuje standardowy interfejs dla wszystkich implementacji Process.
from process.process import Process
# Inicjalizacja silnika Process
process = Process()
# Przetwarzanie tekstu
result = process.process_text("Tekst do przetworzenia", language="pl-PL")
# Dostęp do wyniku
output_data = result.data
output_format = result.format
Process Engine można skonfigurować za pomocą pliku konfiguracyjnego lub zmiennych środowiskowych:
from core.config_manager import create_config_manager
# Utworzenie menedżera konfiguracji dla komponentu
config = create_config_manager("process")
# Dostęp do wartości konfiguracyjnych
value = config.get("KEY", "default_value")
System Process udostępnia trzy główne interfejsy usług:
Interfejs gRPC zapewnia wydajną komunikację między usługami:
# Klient gRPC
from grpc.client import ProcessClient
client = ProcessClient("localhost:50051")
result = client.process_text("Tekst do przetworzenia", language="pl-PL")
Interfejs REST API zapewnia łatwą integrację z aplikacjami webowymi:
# Klient REST
from rest.client import ProcessClient
client = ProcessClient("http://localhost:5000")
result = client.process_text("Tekst do przetworzenia", language="pl-PL")
Interfejs MCP umożliwia integrację z dużymi modelami językowymi (LLM):
# Uruchomienie serwera MCP
python -m mcp.mcp_server
# Połączenie z serwerem MCP
# (zazwyczaj używane przez LLM poprzez protokół MCP)
System Process można rozszerzyć za pomocą wtyczek, które dodają nowe funkcje przetwarzania.
from process.process_base import ProcessBase
class CustomPlugin(ProcessBase):
"""Niestandardowa wtyczka przetwarzania."""
def __init__(self, config=None):
super().__init__(config)
# Inicjalizacja wtyczki
def process_text(self, text, **options):
# Implementacja przetwarzania tekstu
return processed_text
def get_resources(self):
# Zwraca dostępne zasoby dla tej wtyczki
return resources
from process.plugin_system import PluginRegistry
# Rejestracja wtyczki
registry = PluginRegistry()
registry.register_plugin("custom", CustomPlugin)
# Użycie wtyczki
plugin = registry.get_plugin("custom")
result = plugin.process_text("Tekst do przetworzenia")
Aby utworzyć nową implementację procesu:
ProcessBase:from process.process_base import ProcessBase
class MyProcess(ProcessBase):
"""Moja implementacja procesu."""
def __init__(self, config=None):
super().__init__(config)
# Inicjalizacja
def process_text(self, text, **options):
# Implementacja przetwarzania tekstu
return self.create_result(data, format="text")
def get_resources(self):
# Zwraca dostępne zasoby
return {"resources": [...]}
from process.plugin_system import PluginRegistry
registry = PluginRegistry()
registry.register_plugin("my_process", MyProcess)
Aby dodać nową wtyczkę:
process/plugins/my_plugin.py
from process.process_base import ProcessBase
class MyPlugin(ProcessBase):
"""Moja wtyczka przetwarzania."""
def process_text(self, text, **options):
# Implementacja przetwarzania
return self.create_result(processed_data, format="text")
__init__.py do katalogu wtyczek, który rejestruje wtyczkę:from process.plugin_system import PluginRegistry
from .my_plugin import MyPlugin
def register_plugins():
registry = PluginRegistry()
registry.register_plugin("my_plugin", MyPlugin)
Aby zaimplementować nowy interfejs usługi (np. WebSocket):
websocket/
├── __init__.py
├── server.py
└── client.py
import asyncio
import websockets
import json
import sys
import os
# Dodaj katalog nadrzędny do ścieżki
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from process.process import Process
from core.logging import get_logger
class WebSocketServer:
def __init__(self, host="0.0.0.0", port=6789):
self.host = host
self.port = port
self.logger = get_logger("websocket.server")
self.process = Process()
async def handle_client(self, websocket, path):
async for message in websocket:
try:
data = json.loads(message)
text = data.get("text", "")
options = data.get("options", {})
result = self.process.process_text(text, **options)
await websocket.send(json.dumps({
"result_id": result.result_id,
"format": result.format,
"data": result.data
}))
except Exception as e:
self.logger.error(f"Error: {e}")
await websocket.send(json.dumps({"error": str(e)}))
async def start(self):
self.server = await websockets.serve(
self.handle_client, self.host, self.port
)
self.logger.info(f"WebSocket server running on {self.host}:{self.port}")
await self.server.wait_closed()
def main():
server = WebSocketServer()
asyncio.run(server.start())
if __name__ == "__main__":
main()
import asyncio
import websockets
import json
import sys
import os
# Dodaj katalog nadrzędny do ścieżki
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core.logging import get_logger
class WebSocketClient:
def __init__(self, url="ws://localhost:6789"):
self.url = url
self.logger = get_logger("websocket.client")
async def process_text(self, text, **options):
async with websockets.connect(self.url) as websocket:
request = {
"text": text,
"options": options
}
await websocket.send(json.dumps(request))
response = await websocket.recv()
return json.loads(response)
def process_text_sync(self, text, **options):
return asyncio.run(self.process_text(text, **options))
FROM python:3.8-slim
WORKDIR /app
COPY pyproject.toml poetry.lock ./
RUN pip install poetry && \
poetry config virtualenvs.create false && \
poetry install --no-dev
COPY . .
CMD ["python", "-m", "websocket.server"]
core.config_manager do wszystkich potrzeb konfiguracyjnychRozwiązanie: Sprawdź:
Rozwiązanie: Sprawdź:
Rozwiązanie: Sprawdź:
__init__.py zawierają kod rejestracjiWłącz logowanie na poziomie DEBUG, aby uzyskać więcej informacji:
# Ustaw poziom logowania na DEBUG
export LOG_LEVEL=debug
# Uruchom usługę
python -m rest.server
ProcessBaseAbstrakcyjna klasa bazowa dla wszystkich implementacji Process.
class ProcessBase:
def __init__(self, config=None):
"""Inicjalizuje bazową klasę Process."""
def process_text(self, text, **options):
"""Przetwarza tekst z podanymi opcjami."""
def get_resources(self):
"""Zwraca dostępne zasoby."""
def create_result(self, data, format="text", metadata=None):
"""Tworzy obiekt wyniku."""
ProcessGłówna implementacja silnika Process.
class Process(ProcessBase):
def __init__(self, config=None):
"""Inicjalizuje silnik Process."""
def process_text(self, text, **options):
"""Przetwarza tekst z podanymi opcjami."""
def get_resources(self):
"""Zwraca dostępne zasoby."""
ConfigManagerZarządza konfiguracją dla komponentów.
class ConfigManager:
def __init__(self, component_name, config_path=None):
"""Inicjalizuje menedżera konfiguracji."""
def get(self, key, default=None):
"""Pobiera wartość konfiguracyjną."""
def set(self, key, value):
"""Ustawia wartość konfiguracyjną."""
def validate(self, schema):
"""Waliduje konfigurację względem schematu."""
def as_dict(self):
"""Zwraca całą konfigurację jako słownik."""
ErrorHandlingStandardowa obsługa błędów.
class ProcessError(Exception):
"""Bazowa klasa dla wszystkich błędów Process."""
def __init__(self, message, code=None, details=None):
"""Inicjalizuje błąd Process."""
def create_error_handler(component_name):
"""Tworzy handler błędów dla komponentu."""
PluginRegistryRejestruje i zarządza wtyczkami.
class PluginRegistry:
def register_plugin(self, name, plugin_class):
"""Rejestruje wtyczkę."""
def get_plugin(self, name):
"""Pobiera wtyczkę według nazwy."""
def get_all_plugins(self):
"""Zwraca wszystkie zarejestrowane wtyczki."""
def discover_plugins(self, directory):
"""Odkrywa wtyczki w katalogu."""
class ProcessClient:
def __init__(self, server_address):
"""Inicjalizuje klienta gRPC."""
def process_text(self, text, **options):
"""Przetwarza tekst za pomocą serwera gRPC."""
def get_resources(self):
"""Pobiera dostępne zasoby."""
class ProcessClient:
def __init__(self, base_url):
"""Inicjalizuje klienta REST."""
def process_text(self, text, **options):
"""Przetwarza tekst za pomocą serwera REST."""
def get_resources(self):
"""Pobiera dostępne zasoby."""
class ProcessTool:
def __init__(self, config):
"""Inicjalizuje narzędzie Process dla MCP."""
def get_schema(self):
"""Zwraca schemat narzędzia."""
def execute(self, parameters):
"""Wykonuje narzędzie z podanymi parametrami."""
To jest podstawowa dokumentacja dla deweloperów systemu Process. Dla bardziej szczegółowych informacji na temat konkretnych komponentów, zapoznaj się z dokumentacją w odpowiednich katalogach.