ri/gateway/mod.rs
1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of Ri.
4//! The Ri project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Gateway Module
21//!
22//! This module provides a comprehensive API gateway functionality for Ri, offering routing, middleware support,
23//! load balancing, rate limiting, and circuit breaking capabilities.
24//!
25//! ## Key Components
26//!
27//! - **RiGateway**: Main gateway struct implementing the RiModule trait
28//! - **RiGatewayConfig**: Configuration for gateway behavior
29//! - **RiGatewayRequest**: Request structure for gateway operations
30//! - **RiGatewayResponse**: Response structure for gateway operations
31//! - **RiRoute**: Route definition for API endpoints
32//! - **RiRouter**: Router for handling request routing
33//! - **RiMiddleware**: Middleware interface for request processing
34//! - **RiMiddlewareChain**: Chain of middleware for sequential execution
35//! - **RiLoadBalancer**: Load balancing for distributing requests across multiple services
36//! - **RiLoadBalancerStrategy**: Load balancing strategies (RoundRobin, LeastConnections, etc.)
37//! - **RiRateLimiter**: Rate limiting for controlling request rates
38//! - **RiRateLimitConfig**: Configuration for rate limiting
39//! - **RiCircuitBreaker**: Circuit breaker for preventing cascading failures
40//! - **RiCircuitBreakerConfig**: Configuration for circuit breakers
41//!
42//! ## Design Principles
43//!
44//! 1. **Modular Design**: Separate components for routing, middleware, load balancing, rate limiting, and circuit breaking
45//! 2. **Async-First**: All gateway operations are asynchronous
46//! 3. **Configurable**: Highly configurable gateway behavior through RiGatewayConfig
47//! 4. **Middleware Support**: Extensible middleware system for request processing
48//! 5. **Resilience**: Built-in circuit breaker and rate limiting for service resilience
49//! 6. **Load Balancing**: Support for distributing requests across multiple service instances
50//! 7. **CORS Support**: Built-in CORS configuration for cross-origin requests
51//! 8. **Logging**: Comprehensive logging support
52//! 9. **Service Integration**: Implements RiModule trait for seamless integration into Ri
53//! 10. **Thread-safe**: Uses Arc and RwLock for safe concurrent access
54//!
55//! ## Usage
56//!
57//! ```rust
58//! use ri::prelude::*;
59//! use ri::gateway::{RiGateway, RiGatewayConfig, RiRoute};
60//! use std::collections::HashMap;
61//!
62//! async fn example() -> RiResult<()> {
63//! // Create gateway configuration
64//! let gateway_config = RiGatewayConfig {
65//! listen_address: "0.0.0.0".to_string(),
66//! listen_port: 8080,
67//! max_connections: 10000,
68//! request_timeout_seconds: 30,
69//! enable_rate_limiting: true,
70//! enable_circuit_breaker: true,
71//! enable_load_balancing: true,
72//! cors_enabled: true,
73//! cors_origins: vec!["*".to_string()],
74//! cors_methods: vec!["GET".to_string(), "POST".to_string()],
75//! cors_headers: vec!["Content-Type".to_string(), "Authorization".to_string()],
76//! enable_logging: true,
77//! log_level: "info".to_string(),
78//! };
79//!
80//! // Create gateway instance
81//! let gateway = RiGateway::new();
82//!
83//! // Get router and add routes
84//! let router = gateway.router();
85//!
86//! // Add a simple GET route
87//! router.add_route(RiRoute {
88//! path: "/api/v1/health".to_string(),
89//! method: "GET".to_string(),
90//! handler: Arc::new(|req| Box::pin(async move {
91//! Ok(RiGatewayResponse::json(200, &serde_json::json!({ "status": "ok" }), req.id.clone())?)
92//! })),
93//! ..Default::default()
94//! }).await?;
95//!
96//! // Add middleware
97//! let middleware_chain = gateway.middleware_chain();
98//! middleware_chain.add_middleware(Arc::new(|req, next| Box::pin(async move {
99//! // Log request
100//! println!("Request: {} {}", req.method, req.path);
101//! next(req).await
102//! }))).await;
103//!
104//! // Handle a sample request
105//! let sample_request = RiGatewayRequest::new(
106//! "GET".to_string(),
107//! "/api/v1/health".to_string(),
108//! HashMap::new(),
109//! HashMap::new(),
110//! None,
111//! "127.0.0.1:12345".to_string(),
112//! );
113//!
114//! let response = gateway.handle_request(sample_request).await;
115//! println!("Response: {} {}", response.status_code, String::from_utf8_lossy(&response.body));
116//!
117//! Ok(())
118//! }
119//! ```
120
121use crate::core::{RiModule, RiServiceContext};
122use log;
123use serde::{Deserialize, Serialize};
124use std::collections::HashMap;
125use std::sync::Arc;
126use tokio::sync::RwLock;
127
128pub mod middleware;
129pub mod routing;
130pub mod radix_tree;
131pub mod circuit_breaker;
132pub mod load_balancer;
133pub mod rate_limiter;
134pub mod server;
135
136pub use routing::{RiRoute, RiRouter};
137pub use radix_tree::{RiRadixTree, RadixNode, PathSegment, SegmentType, RouteMatch};
138pub use middleware::{RiMiddleware, RiMiddlewareChain};
139pub use load_balancer::{RiLoadBalancer, RiLoadBalancerStrategy};
140pub use rate_limiter::{RiRateLimiter, RiRateLimitConfig};
141pub use circuit_breaker::{RiCircuitBreaker, RiCircuitBreakerConfig};
142
143#[cfg(feature = "gateway")]
144pub use server::{RiGatewayServer, load_tls_config};
145
146/// Configuration for the Ri Gateway.
147///
148/// This struct defines the configuration options for the API gateway, including network settings,
149/// feature toggles, and CORS configuration.
150#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass(get_all, set_all))]
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct RiGatewayConfig {
153 /// Address to listen on
154 pub listen_address: String,
155 /// Port to listen on
156 pub listen_port: u16,
157 /// Maximum number of concurrent connections
158 pub max_connections: usize,
159 /// Request timeout in seconds
160 pub request_timeout_seconds: u64,
161 /// Whether to enable rate limiting
162 pub enable_rate_limiting: bool,
163 /// Whether to enable circuit breaker
164 pub enable_circuit_breaker: bool,
165 /// Whether to enable load balancing
166 pub enable_load_balancing: bool,
167 /// Whether to enable CORS
168 pub cors_enabled: bool,
169 /// Allowed CORS origins
170 pub cors_origins: Vec<String>,
171 /// Allowed CORS methods
172 pub cors_methods: Vec<String>,
173 /// Allowed CORS headers
174 pub cors_headers: Vec<String>,
175 /// Whether to enable logging
176 pub enable_logging: bool,
177 /// Log level for gateway operations
178 pub log_level: String,
179}
180
181#[cfg(feature = "pyo3")]
182/// Python bindings for RiGatewayConfig
183#[pyo3::prelude::pymethods]
184impl RiGatewayConfig {
185 #[new]
186 fn py_new() -> Self {
187 Self::default()
188 }
189
190 #[staticmethod]
191 fn py_new_with_address(listen_address: String, listen_port: u16) -> Self {
192 Self {
193 listen_address,
194 listen_port,
195 ..Self::default()
196 }
197 }
198}
199
200impl RiGatewayConfig {
201 /// Creates a new RiGatewayConfig with default values.
202 pub fn new() -> Self {
203 Self::default()
204 }
205}
206
207impl Default for RiGatewayConfig {
208 /// Returns the default configuration for the gateway.
209 ///
210 /// Default values:
211 /// - listen_address: "0.0.0.0"
212 /// - listen_port: 8080
213 /// - max_connections: 10000
214 /// - request_timeout_seconds: 30
215 /// - enable_rate_limiting: true
216 /// - enable_circuit_breaker: true
217 /// - enable_load_balancing: true
218 /// - cors_enabled: true
219 /// - cors_origins: ["*"]
220 /// - cors_methods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
221 /// - cors_headers: ["Content-Type", "Authorization", "X-Requested-With"]
222 /// - enable_logging: true
223 /// - log_level: "info"
224 fn default() -> Self {
225 Self {
226 listen_address: "0.0.0.0".to_string(),
227 listen_port: 8080,
228 max_connections: 10000,
229 request_timeout_seconds: 30,
230 enable_rate_limiting: true,
231 enable_circuit_breaker: true,
232 enable_load_balancing: true,
233 cors_enabled: true,
234 cors_origins: vec!["*".to_string()],
235 cors_methods: vec!["GET".to_string(), "POST".to_string(), "PUT".to_string(), "DELETE".to_string(), "OPTIONS".to_string()],
236 cors_headers: vec!["Content-Type".to_string(), "Authorization".to_string(), "X-Requested-With".to_string()],
237 enable_logging: true,
238 log_level: "info".to_string(),
239 }
240 }
241}
242
243/// Request structure for gateway operations.
244///
245/// This struct represents an HTTP request received by the gateway, including method, path, headers,
246/// query parameters, body, and remote address.
247#[derive(Debug, Clone)]
248#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
249pub struct RiGatewayRequest {
250 /// Unique request ID
251 pub id: String,
252 /// HTTP method (GET, POST, etc.)
253 pub method: String,
254 /// Request path
255 pub path: String,
256 /// HTTP headers
257 pub headers: HashMap<String, String>,
258 /// Query parameters
259 pub query_params: HashMap<String, String>,
260 /// Request body (if any)
261 pub body: Option<Vec<u8>>,
262 /// Remote address of the client
263 pub remote_addr: String,
264 /// Timestamp when the request was created
265 pub timestamp: std::time::Instant,
266}
267
268impl RiGatewayRequest {
269 /// Creates a new gateway request.
270 ///
271 /// # Parameters
272 ///
273 /// - `method`: HTTP method
274 /// - `path`: Request path
275 /// - `headers`: HTTP headers
276 /// - `query_params`: Query parameters
277 /// - `body`: Request body (optional)
278 /// - `remote_addr`: Remote address of the client
279 ///
280 /// # Returns
281 ///
282 /// A new `RiGatewayRequest` instance
283 pub fn new(
284 method: String,
285 path: String,
286 headers: HashMap<String, String>,
287 query_params: HashMap<String, String>,
288 body: Option<Vec<u8>>,
289 remote_addr: String,
290 ) -> Self {
291 Self {
292 id: uuid::Uuid::new_v4().to_string(),
293 method,
294 path,
295 headers,
296 query_params,
297 body,
298 remote_addr,
299 timestamp: std::time::Instant::now(),
300 }
301 }
302}
303
304/// Response structure for gateway operations.
305///
306/// This struct represents an HTTP response returned by the gateway, including status code,
307/// headers, body, and request ID.
308#[derive(Debug, Clone)]
309#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
310pub struct RiGatewayResponse {
311 /// HTTP status code
312 pub status_code: u16,
313 /// HTTP headers
314 pub headers: HashMap<String, String>,
315 /// Response body
316 pub body: Vec<u8>,
317 /// Request ID associated with this response
318 pub request_id: String,
319}
320
321impl RiGatewayResponse {
322 /// Creates a new gateway response.
323 ///
324 /// # Parameters
325 ///
326 /// - `status_code`: HTTP status code
327 /// - `body`: Response body
328 /// - `request_id`: Request ID associated with this response
329 ///
330 /// # Returns
331 ///
332 /// A new `RiGatewayResponse` instance
333 pub fn new(status_code: u16, body: Vec<u8>, request_id: String) -> Self {
334 let mut headers = HashMap::new();
335 headers.insert("Content-Type".to_string(), "application/json".to_string());
336 headers.insert("X-Request-ID".to_string(), request_id.clone());
337
338 Self {
339 status_code,
340 headers,
341 body,
342 request_id,
343 }
344 }
345
346 /// Adds a header to the response.
347 ///
348 /// # Parameters
349 ///
350 /// - `key`: Header name
351 /// - `value`: Header value
352 ///
353 /// # Returns
354 ///
355 /// The updated `RiGatewayResponse` instance
356 pub fn with_header(mut self, key: String, value: String) -> Self {
357 self.headers.insert(key, value);
358 self
359 }
360
361 /// Creates a JSON response.
362 ///
363 /// # Parameters
364 ///
365 /// - `status_code`: HTTP status code
366 /// - `data`: Data to serialize as JSON
367 /// - `request_id`: Request ID associated with this response
368 ///
369 /// # Returns
370 ///
371 /// A `RiResult<Self>` containing the JSON response
372 pub fn json<T: serde::Serialize>(status_code: u16, data: &T, request_id: String) -> crate::core::RiResult<Self> {
373 let body = serde_json::to_vec(data)?;
374 Ok(Self::new(status_code, body, request_id))
375 }
376
377 /// Creates an error response.
378 ///
379 /// # Parameters
380 ///
381 /// - `status_code`: HTTP status code
382 /// - `message`: Error message
383 /// - `request_id`: Request ID associated with this response
384 ///
385 /// # Returns
386 ///
387 /// A new `RiGatewayResponse` instance with error information
388 pub fn error(status_code: u16, message: String, request_id: String) -> Self {
389 let error_body = serde_json::json!({
390 "error": message,
391 "request_id": request_id
392 });
393
394 let body = serde_json::to_vec(&error_body).unwrap_or_else(|_| b"{}".to_vec());
395 Self::new(status_code, body, request_id)
396 }
397}
398
399/// Main gateway struct implementing the RiModule trait.
400///
401/// This struct provides the core gateway functionality, including request handling,
402/// routing, middleware execution, rate limiting, and circuit breaking.
403#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
404pub struct RiGateway {
405 /// Gateway configuration, protected by a RwLock for thread-safe access
406 config: RwLock<RiGatewayConfig>,
407 /// Router for handling request routing
408 router: Arc<RiRouter>,
409 /// Middleware chain for request processing
410 middleware_chain: Arc<RiMiddlewareChain>,
411 /// Rate limiter for controlling request rates
412 rate_limiter: Option<Arc<RiRateLimiter>>,
413 /// Circuit breaker for preventing cascading failures
414 circuit_breaker: Option<Arc<RiCircuitBreaker>>,
415}
416
417impl Default for RiGateway {
418 fn default() -> Self {
419 Self::new()
420 }
421}
422
423impl RiGateway {
424 /// Creates a new gateway instance with default configuration.
425 ///
426 /// # Returns
427 ///
428 /// A new `RiGateway` instance
429 pub fn new() -> Self {
430 let config = RiGatewayConfig::default();
431 let router = Arc::new(RiRouter::new());
432 let middleware_chain = Arc::new(RiMiddlewareChain::new());
433
434 let rate_limiter = if config.enable_rate_limiting {
435 Some(Arc::new(RiRateLimiter::new(RiRateLimitConfig::default())))
436 } else {
437 None
438 };
439
440 let circuit_breaker = if config.enable_circuit_breaker {
441 Some(Arc::new(RiCircuitBreaker::new(RiCircuitBreakerConfig::default())))
442 } else {
443 None
444 };
445
446 Self {
447 config: RwLock::new(config),
448 router,
449 middleware_chain,
450 rate_limiter,
451 circuit_breaker,
452 }
453 }
454
455 /// Returns a reference to the router.
456 ///
457 /// # Returns
458 ///
459 /// An Arc<RiRouter> providing thread-safe access to the router
460 pub fn router(&self) -> Arc<RiRouter> {
461 self.router.clone()
462 }
463
464 /// Returns a reference to the middleware chain.
465 ///
466 /// # Returns
467 ///
468 /// An Arc<RiMiddlewareChain> providing thread-safe access to the middleware chain
469 pub fn middleware_chain(&self) -> Arc<RiMiddlewareChain> {
470 self.middleware_chain.clone()
471 }
472
473 /// Handles a gateway request.
474 ///
475 /// This method processes a request through the gateway pipeline, including:
476 /// 1. Rate limiting
477 /// 2. Circuit breaker check
478 /// 3. Middleware chain execution
479 /// 4. Request routing
480 /// 5. Route handler execution
481 ///
482 /// # Parameters
483 ///
484 /// - `request`: The request to handle
485 ///
486 /// # Returns
487 ///
488 /// A `RiGatewayResponse` containing the response to the request
489 pub async fn handle_request(&self, request: RiGatewayRequest) -> RiGatewayResponse {
490 let request_id = request.id.clone();
491
492 // Apply rate limiting
493 if let Some(rate_limiter) = &self.rate_limiter {
494 if !rate_limiter.check_request(&request).await {
495 return RiGatewayResponse::new(429, "Rate limit exceeded".to_string().into_bytes(), request_id);
496 }
497 }
498
499 // Apply circuit breaker
500 if let Some(circuit_breaker) = &self.circuit_breaker {
501 if !circuit_breaker.allow_request() {
502 return RiGatewayResponse::new(503, "Service temporarily unavailable".to_string().into_bytes(), request_id);
503 }
504 }
505
506 // Apply middleware chain
507 let mut request = request;
508 match self.middleware_chain.execute(&mut request).await {
509 Ok(()) => {
510 // Route the request
511 match self.router.route(&request).await {
512 Ok(route_handler) => {
513 // Execute the route handler
514 match route_handler(request).await {
515 Ok(response) => response,
516 Err(e) => {
517 RiGatewayResponse::new(500, format!("Internal server error: {e}").into_bytes(), request_id)
518 }
519 }
520 },
521 Err(e) => {
522 RiGatewayResponse::new(404, format!("Route not found: {e}").into_bytes(), request_id)
523 }
524 }
525 },
526 Err(e) => {
527 RiGatewayResponse::new(403, format!("Middleware error: {e}").into_bytes(), request_id)
528 }
529 }
530 }
531}
532
533#[async_trait::async_trait]
534impl RiModule for RiGateway {
535 /// Returns the name of the gateway module.
536 ///
537 /// # Returns
538 ///
539 /// The module name as a string
540 fn name(&self) -> &str {
541 "Ri.Gateway"
542 }
543
544 /// Initializes the gateway module.
545 ///
546 /// # Parameters
547 ///
548 /// - `ctx`: Service context containing configuration and other services
549 ///
550 /// # Returns
551 ///
552 /// A `RiResult<()>` indicating success or failure
553 async fn init(&mut self, ctx: &mut RiServiceContext) -> crate::core::RiResult<()> {
554 let logger = ctx.logger();
555 logger.info("Ri.Gateway", "Initializing API gateway module")?;
556
557 let config = self.config.read().await;
558 logger.info(
559 "Ri.Gateway",
560 format!("Gateway will listen on {}:{}", config.listen_address, config.listen_port)
561 )?;
562
563 logger.info("Ri.Gateway", "API gateway module initialized successfully")?;
564 Ok(())
565 }
566
567 /// Performs cleanup after the gateway has shut down.
568 ///
569 /// This method ensures proper resource cleanup during gateway shutdown:
570 /// - Clears all rate limiter buckets
571 /// - Resets the circuit breaker to closed state
572 /// - Clears all registered routes
573 ///
574 /// # Parameters
575 ///
576 /// - `_ctx`: Service context (not used in this implementation)
577 ///
578 /// # Returns
579 ///
580 /// A `RiResult<()>` indicating success or failure
581 ///
582 /// # Logs
583 ///
584 /// Logs cleanup progress at INFO level for debugging purposes
585 async fn after_shutdown(&mut self, _ctx: &mut RiServiceContext) -> crate::core::RiResult<()> {
586 log::info!("Cleaning up Ri Gateway Module");
587
588 if let Some(rate_limiter) = &self.rate_limiter {
589 rate_limiter.clear_all_buckets();
590 log::info!("Rate limiter cleanup completed");
591 }
592
593 if let Some(circuit_breaker) = &self.circuit_breaker {
594 circuit_breaker.reset();
595 log::info!("Circuit breaker reset completed");
596 }
597
598 self.router.clear_routes();
599 log::info!("Router cleanup completed");
600
601 log::info!("Ri Gateway Module cleanup completed");
602 Ok(())
603 }
604}
605
606#[cfg(feature = "pyo3")]
607/// Python bindings for RiGateway
608#[pyo3::prelude::pymethods]
609impl RiGateway {
610 #[new]
611 fn py_new() -> Self {
612 Self::new()
613 }
614}