Event-Driven Orchestration
Gova-backend uses an event-driven architecture to manage real-time content moderation across multiple platforms. Communication between the API, the AI evaluation engine, and the platform-specific execution handlers runs through Apache Kafka, which keeps these components decoupled and supports high message throughput.
Core Architecture Overview
The orchestration pipeline follows a reactive flow:
- Ingestion: Incoming messages from connected platforms (e.g., Discord) are captured.
- Evaluation: The ReviewAgent analyzes the message context, assigns a severity score (0.0 to 1.0), and proposes an action.
- Orchestration: Control signals for starting, stopping, or updating moderators are broadcast via Kafka.
- Execution: Actions are either executed automatically or held for manual approval based on the moderator's configuration.
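The execution step can be sketched as a small decision function. This is a minimal illustration, not the project's implementation: `ModeratorConfig`, its `auto_execute` field, `ProposedAction`, and `resolve_status` are hypothetical names, and treating an automatically executed action as `COMPLETED` is an assumption based on the status values shown in this document.

```python
from dataclasses import dataclass, field
from enum import Enum


class ActionStatus(str, Enum):
    AWAITING_APPROVAL = "AWAITING_APPROVAL"
    COMPLETED = "COMPLETED"


@dataclass
class ModeratorConfig:
    # Hypothetical field: action types the moderator may run without review.
    auto_execute: set = field(default_factory=set)


@dataclass
class ProposedAction:
    action_type: str  # e.g. "TIMEOUT"
    severity: float   # score from the ReviewAgent, 0.0 to 1.0


def resolve_status(action: ProposedAction, config: ModeratorConfig) -> ActionStatus:
    """Decide whether a proposed action runs immediately or waits for a human."""
    if not 0.0 <= action.severity <= 1.0:
        raise ValueError("severity must be between 0.0 and 1.0")
    if action.action_type in config.auto_execute:
        return ActionStatus.COMPLETED
    return ActionStatus.AWAITING_APPROVAL
```

With `auto_execute={"TIMEOUT"}`, a proposed timeout resolves to `COMPLETED`, while a proposed kick is held as `AWAITING_APPROVAL`.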
Moderator Control Events
The lifecycle of a moderator is managed via specific event types sent to the KAFKA_MODERATOR_EVENTS_TOPIC. These events instruct the engine runners to synchronize their local state with the database configuration.
Event Types
| Event | Trigger | Description |
| :--- | :--- | :--- |
| StartModeratorEvent | POST /moderators/{id}/start | Initializes the listener for the specific platform server. |
| StopModeratorEvent | POST /moderators/{id}/stop | Gracefully shuts down the listener and disconnects from the platform. |
| UpdateConfigModeratorEvent | PATCH /moderators/{id} | Signals the engine to reload moderation rules and AI prompts. |
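To make the control flow concrete, the sketch below shows one way an engine runner might decode and dispatch these three events. The wire format (JSON with a `type` discriminator), the `moderator_id` field, and the `EngineRunner` class are assumptions made for illustration; the actual event schema and consumer code are not shown in this document. Kafka I/O is left out so the example stays self-contained.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class StartModeratorEvent:
    moderator_id: str


@dataclass(frozen=True)
class StopModeratorEvent:
    moderator_id: str


@dataclass(frozen=True)
class UpdateConfigModeratorEvent:
    moderator_id: str


EVENT_TYPES = {
    cls.__name__: cls
    for cls in (StartModeratorEvent, StopModeratorEvent, UpdateConfigModeratorEvent)
}


def encode_event(event) -> bytes:
    """Serialize an event to JSON bytes, tagging it with its class name."""
    return json.dumps({"type": type(event).__name__, **asdict(event)}).encode("utf-8")


def decode_event(raw: bytes):
    """Rebuild the event object from the bytes a consumer would receive."""
    payload = json.loads(raw.decode("utf-8"))
    return EVENT_TYPES[payload.pop("type")](**payload)


class EngineRunner:
    """Tracks which moderators are running and which need a config reload."""

    def __init__(self):
        self.running = set()
        self.reloaded = []

    def handle(self, raw: bytes) -> None:
        event = decode_event(raw)
        if isinstance(event, StartModeratorEvent):
            self.running.add(event.moderator_id)
        elif isinstance(event, StopModeratorEvent):
            self.running.discard(event.moderator_id)
        elif isinstance(event, UpdateConfigModeratorEvent):
            self.reloaded.append(event.moderator_id)
```

In a real deployment, `handle` would be called with the value of each message consumed from the moderator events topic.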
Manual Action Approval
When the AI identifies a violation that requires human intervention, it creates an ActionEvent with a status of AWAITING_APPROVAL. Users interact with these events through the /actions router.
Approving an Action
To execute a pending moderation action (such as a Discord timeout or kick), send a request to the approve endpoint:
POST /actions/{action_id}/approve
Authorization: Bearer <JWT_TOKEN>
Response (ActionResponse):
{
  "action_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "status": "COMPLETED",
  "action_type": "TIMEOUT",
  "action_params": {
    "duration": 3600,
    "reason": "Repeated policy violation"
  },
  "executed_at": "2023-10-27T10:00:00Z"
}
Rejecting an Action
If the AI's proposal is deemed incorrect, the action can be rejected, preventing execution and updating the event history.
POST /actions/{action_id}/reject
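A client for the two endpoints might look like the following sketch, which uses only the Python standard library. The base URL is a placeholder, `build_action_request` and `send` are hypothetical helper names, and sending the same bearer token to the reject endpoint is an assumption, since only the approve request is shown with an Authorization header.

```python
import json
import urllib.request

DECISIONS = ("approve", "reject")


def build_action_request(base_url: str, action_id: str, decision: str, token: str):
    """Build the POST request for approving or rejecting a pending action."""
    if decision not in DECISIONS:
        raise ValueError(f"decision must be one of {DECISIONS}")
    url = f"{base_url.rstrip('/')}/actions/{action_id}/{decision}"
    return urllib.request.Request(
        url,
        method="POST",
        headers={"Authorization": f"Bearer {token}"},
    )


def send(request: urllib.request.Request) -> dict:
    """Send the request and parse the JSON body (assumes a JSON response)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

For example, `send(build_action_request("https://api.example.com", action_id, "approve", token))` would submit an approval; the host name here is illustrative only.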
AI Evaluation & Scoring
The orchestration engine employs a ReviewAgent that evaluates messages within the context of the server's specific guidelines.
- Severity Score: A float between 0 and 1.
- Contextual Analysis: The agent considers the ServerSummary, ServerGuidelines, and the ChannelSummary (recent message history) to prevent false positives.
- Behavior Tracking: Continuous evaluations contribute to a user's behaviour_score over time, stored in EvaluationEvents.
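This document does not specify how individual severity scores are combined into a user's behaviour_score. As one plausible illustration only, the sketch below uses an exponential moving average, so recent evaluations weigh more than old ones. The function name and the `alpha` smoothing factor are assumptions, not part of the project.

```python
def update_behaviour_score(previous: float, severity: float, alpha: float = 0.2) -> float:
    """Blend a new severity score into a running behaviour score.

    Assumed aggregation: exponential moving average. A higher alpha makes
    the score react faster to the latest evaluation.
    """
    if not 0.0 <= severity <= 1.0:
        raise ValueError("severity must be between 0.0 and 1.0")
    if not 0.0 < alpha <= 1.0:
        raise ValueError("alpha must be in (0.0, 1.0]")
    return (1.0 - alpha) * previous + alpha * severity
```

Because each update is a weighted average of two values in the range 0 to 1, the running score stays in that range as long as it starts there.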
Webhook Integration
In addition to internal Kafka events, the system handles external events via the Payments router. The StripeEventHandler processes subscription lifecycle events (e.g., checkout.session.completed, invoice.payment_succeeded) and updates the user's pricing_tier and moderator limits as soon as each event arrives.
from fastapi import APIRouter, Header, Request

router = APIRouter()  # assumed setup; the actual router may define a prefix

# Internal role: Handles Stripe's event-driven updates
@router.post("/stripe/webhook")
async def stripe_webhook(request: Request, stripe_signature: str = Header(None)):
    # Verifies signature and routes to the appropriate handler
    # e.g., handle_customer_subscription_deleted(event)
    ...
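The signature check mentioned in the handler can be illustrated with the standard library. Stripe signs the string `{timestamp}.{payload}` with HMAC-SHA256 using the endpoint's signing secret and sends the result in the `Stripe-Signature` header as `t=...,v1=...`. The sketch below is for explanation only: `verify_stripe_signature` is a hypothetical helper, and a production handler would normally rely on the official `stripe` library's `stripe.Webhook.construct_event` instead.

```python
import hashlib
import hmac
import time
from typing import Optional


def verify_stripe_signature(
    payload: bytes,
    header: str,
    secret: str,
    tolerance: int = 300,
    now: Optional[float] = None,
) -> bool:
    """Return True if the header carries a valid, recent v1 signature."""
    timestamp = None
    signatures = []
    for part in header.split(","):
        key, _, value = part.strip().partition("=")
        if key == "t":
            timestamp = value
        elif key == "v1":
            signatures.append(value)
    if timestamp is None or not timestamp.isdigit() or not signatures:
        return False

    # Reject stale events to limit replay attacks.
    current = time.time() if now is None else now
    if abs(current - int(timestamp)) > tolerance:
        return False

    signed_payload = timestamp.encode("utf-8") + b"." + payload
    expected = hmac.new(secret.encode("utf-8"), signed_payload, hashlib.sha256).hexdigest()
    # Constant-time comparison against every v1 signature in the header.
    return any(
        hmac.compare_digest(expected.encode("utf-8"), candidate.encode("utf-8"))
        for candidate in signatures
    )
```

The raw request body must be used as `payload`; re-serializing parsed JSON changes the bytes and invalidates the signature.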
Configuration
To ensure proper orchestration, the following environment variables must be configured for Kafka connectivity:
- KAFKA_BOOTSTRAP_SERVERS: The address of your Kafka brokers.
- KAFKA_MODERATOR_EVENTS_TOPIC: The topic used for control signals.
- KAFKA_CONSUMER_GROUP_ID: The group ID for engine instances.
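A startup check along the following lines can surface a missing variable early rather than at the first Kafka call. This is a sketch under assumptions: `KafkaSettings` and `load_kafka_settings` are hypothetical names, and the project may load its configuration differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED_VARS = (
    "KAFKA_BOOTSTRAP_SERVERS",
    "KAFKA_MODERATOR_EVENTS_TOPIC",
    "KAFKA_CONSUMER_GROUP_ID",
)


@dataclass(frozen=True)
class KafkaSettings:
    bootstrap_servers: str
    moderator_events_topic: str
    consumer_group_id: str


def load_kafka_settings(env: Mapping[str, str] = os.environ) -> KafkaSettings:
    """Read the Kafka variables, failing fast if any is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing Kafka configuration: " + ", ".join(missing))
    return KafkaSettings(
        bootstrap_servers=env["KAFKA_BOOTSTRAP_SERVERS"],
        moderator_events_topic=env["KAFKA_MODERATOR_EVENTS_TOPIC"],
        consumer_group_id=env["KAFKA_CONSUMER_GROUP_ID"],
    )
```

Passing a plain dictionary as `env` makes the loader easy to exercise in tests without touching the process environment.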