mirror of https://github.com/telemt/telemt.git
API from main
This commit is contained in:
parent
508eea0131
commit
e1f3efb619
|
|
@ -11,7 +11,7 @@ use hyper::server::conn::http1;
|
|||
use hyper::service::service_fn;
|
||||
use hyper::{Method, Request, Response, StatusCode};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::{Mutex, watch};
|
||||
use tokio::sync::{Mutex, RwLock, watch};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::config::ProxyConfig;
|
||||
|
|
@ -67,7 +67,7 @@ pub(super) struct ApiRuntimeState {
|
|||
pub(super) struct ApiShared {
|
||||
pub(super) stats: Arc<Stats>,
|
||||
pub(super) ip_tracker: Arc<UserIpTracker>,
|
||||
pub(super) me_pool: Option<Arc<MePool>>,
|
||||
pub(super) me_pool: Arc<RwLock<Option<Arc<MePool>>>>,
|
||||
pub(super) upstream_manager: Arc<UpstreamManager>,
|
||||
pub(super) config_path: PathBuf,
|
||||
pub(super) startup_detected_ip_v4: Option<IpAddr>,
|
||||
|
|
@ -91,7 +91,7 @@ pub async fn serve(
|
|||
listen: SocketAddr,
|
||||
stats: Arc<Stats>,
|
||||
ip_tracker: Arc<UserIpTracker>,
|
||||
me_pool: Option<Arc<MePool>>,
|
||||
me_pool: Arc<RwLock<Option<Arc<MePool>>>>,
|
||||
upstream_manager: Arc<UpstreamManager>,
|
||||
config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
||||
admission_rx: watch::Receiver<bool>,
|
||||
|
|
@ -248,7 +248,7 @@ async fn handle(
|
|||
}
|
||||
("GET", "/v1/runtime/gates") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref());
|
||||
let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref()).await;
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/limits/effective") => {
|
||||
|
|
|
|||
|
|
@ -260,7 +260,7 @@ pub(super) fn build_security_whitelist_data(cfg: &ProxyConfig) -> SecurityWhitel
|
|||
|
||||
pub(super) async fn build_runtime_me_pool_state_data(shared: &ApiShared) -> RuntimeMePoolStateData {
|
||||
let now_epoch_secs = now_epoch_secs();
|
||||
let Some(pool) = &shared.me_pool else {
|
||||
let Some(pool) = shared.me_pool.read().await.clone() else {
|
||||
return RuntimeMePoolStateData {
|
||||
enabled: false,
|
||||
reason: Some(SOURCE_UNAVAILABLE_REASON),
|
||||
|
|
@ -350,7 +350,7 @@ pub(super) async fn build_runtime_me_pool_state_data(shared: &ApiShared) -> Runt
|
|||
|
||||
pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> RuntimeMeQualityData {
|
||||
let now_epoch_secs = now_epoch_secs();
|
||||
let Some(pool) = &shared.me_pool else {
|
||||
let Some(pool) = shared.me_pool.read().await.clone() else {
|
||||
return RuntimeMeQualityData {
|
||||
enabled: false,
|
||||
reason: Some(SOURCE_UNAVAILABLE_REASON),
|
||||
|
|
@ -486,7 +486,7 @@ pub(super) async fn build_runtime_upstream_quality_data(
|
|||
|
||||
pub(super) async fn build_runtime_nat_stun_data(shared: &ApiShared) -> RuntimeNatStunData {
|
||||
let now_epoch_secs = now_epoch_secs();
|
||||
let Some(pool) = &shared.me_pool else {
|
||||
let Some(pool) = shared.me_pool.read().await.clone() else {
|
||||
return RuntimeNatStunData {
|
||||
enabled: false,
|
||||
reason: Some(SOURCE_UNAVAILABLE_REASON),
|
||||
|
|
|
|||
|
|
@ -297,7 +297,7 @@ async fn get_minimal_payload_cached(
|
|||
}
|
||||
}
|
||||
|
||||
let pool = shared.me_pool.as_ref()?;
|
||||
let pool = shared.me_pool.read().await.clone()?;
|
||||
let status = pool.api_status_snapshot().await;
|
||||
let runtime = pool.api_runtime_snapshot().await;
|
||||
let generated_at_epoch_secs = status.generated_at_epoch_secs;
|
||||
|
|
|
|||
|
|
@ -142,12 +142,17 @@ pub(super) fn build_system_info_data(
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn build_runtime_gates_data(shared: &ApiShared, cfg: &ProxyConfig) -> RuntimeGatesData {
|
||||
pub(super) async fn build_runtime_gates_data(
|
||||
shared: &ApiShared,
|
||||
cfg: &ProxyConfig,
|
||||
) -> RuntimeGatesData {
|
||||
let me_runtime_ready = if !cfg.general.use_middle_proxy {
|
||||
true
|
||||
} else {
|
||||
shared
|
||||
.me_pool
|
||||
.read()
|
||||
.await
|
||||
.as_ref()
|
||||
.map(|pool| pool.is_runtime_ready())
|
||||
.unwrap_or(false)
|
||||
|
|
|
|||
140
src/main.rs
140
src/main.rs
|
|
@ -8,7 +8,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
|||
use rand::Rng;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::signal;
|
||||
use tokio::sync::{Semaphore, mpsc, watch};
|
||||
use tokio::sync::{RwLock, Semaphore, mpsc, watch};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};
|
||||
#[cfg(unix)]
|
||||
|
|
@ -473,6 +473,70 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||
config.general.upstream_connect_failfast_hard_errors,
|
||||
stats.clone(),
|
||||
));
|
||||
let ip_tracker = Arc::new(UserIpTracker::new());
|
||||
ip_tracker.load_limits(&config.access.user_max_unique_ips).await;
|
||||
ip_tracker
|
||||
.set_limit_policy(
|
||||
config.access.user_max_unique_ips_mode,
|
||||
config.access.user_max_unique_ips_window_secs,
|
||||
)
|
||||
.await;
|
||||
if !config.access.user_max_unique_ips.is_empty() {
|
||||
info!(
|
||||
"IP limits configured for {} users",
|
||||
config.access.user_max_unique_ips.len()
|
||||
);
|
||||
}
|
||||
if !config.network.dns_overrides.is_empty() {
|
||||
info!(
|
||||
"Runtime DNS overrides configured: {} entries",
|
||||
config.network.dns_overrides.len()
|
||||
);
|
||||
}
|
||||
|
||||
let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone()));
|
||||
let initial_admission_open = !config.general.use_middle_proxy;
|
||||
let (admission_tx, admission_rx) = watch::channel(initial_admission_open);
|
||||
let api_me_pool = Arc::new(RwLock::new(None::<Arc<MePool>>));
|
||||
|
||||
if config.server.api.enabled {
|
||||
let listen = match config.server.api.listen.parse::<SocketAddr>() {
|
||||
Ok(listen) => listen,
|
||||
Err(error) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
listen = %config.server.api.listen,
|
||||
"Invalid server.api.listen; API is disabled"
|
||||
);
|
||||
SocketAddr::from(([127, 0, 0, 1], 0))
|
||||
}
|
||||
};
|
||||
if listen.port() != 0 {
|
||||
let stats_api = stats.clone();
|
||||
let ip_tracker_api = ip_tracker.clone();
|
||||
let me_pool_api = api_me_pool.clone();
|
||||
let upstream_manager_api = upstream_manager.clone();
|
||||
let config_rx_api = api_config_rx.clone();
|
||||
let admission_rx_api = admission_rx.clone();
|
||||
let config_path_api = std::path::PathBuf::from(&config_path);
|
||||
tokio::spawn(async move {
|
||||
api::serve(
|
||||
listen,
|
||||
stats_api,
|
||||
ip_tracker_api,
|
||||
me_pool_api,
|
||||
upstream_manager_api,
|
||||
config_rx_api,
|
||||
admission_rx_api,
|
||||
config_path_api,
|
||||
None,
|
||||
None,
|
||||
process_started_at_epoch_secs,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
|
||||
tls_domains.push(config.censorship.tls_domain.clone());
|
||||
|
|
@ -620,26 +684,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||
let beobachten = Arc::new(BeobachtenStore::new());
|
||||
let rng = Arc::new(SecureRandom::new());
|
||||
|
||||
// IP Tracker initialization
|
||||
let ip_tracker = Arc::new(UserIpTracker::new());
|
||||
ip_tracker.load_limits(&config.access.user_max_unique_ips).await;
|
||||
ip_tracker
|
||||
.set_limit_policy(
|
||||
config.access.user_max_unique_ips_mode,
|
||||
config.access.user_max_unique_ips_window_secs,
|
||||
)
|
||||
.await;
|
||||
|
||||
if !config.access.user_max_unique_ips.is_empty() {
|
||||
info!("IP limits configured for {} users", config.access.user_max_unique_ips.len());
|
||||
}
|
||||
if !config.network.dns_overrides.is_empty() {
|
||||
info!(
|
||||
"Runtime DNS overrides configured: {} entries",
|
||||
config.network.dns_overrides.len()
|
||||
);
|
||||
}
|
||||
|
||||
// Connection concurrency limit
|
||||
let max_connections = Arc::new(Semaphore::new(10_000));
|
||||
|
||||
|
|
@ -1074,6 +1118,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||
if let Some(ref pool) = me_pool {
|
||||
pool.set_runtime_ready(true);
|
||||
}
|
||||
*api_me_pool.write().await = me_pool.clone();
|
||||
|
||||
// Background tasks
|
||||
let um_clone = upstream_manager.clone();
|
||||
|
|
@ -1114,6 +1159,17 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||
detected_ip_v4,
|
||||
detected_ip_v6,
|
||||
);
|
||||
let mut config_rx_api_bridge = config_rx.clone();
|
||||
let api_config_tx_bridge = api_config_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if config_rx_api_bridge.changed().await.is_err() {
|
||||
break;
|
||||
}
|
||||
let cfg = config_rx_api_bridge.borrow_and_update().clone();
|
||||
api_config_tx_bridge.send_replace(cfg);
|
||||
}
|
||||
});
|
||||
|
||||
let stats_policy = stats.clone();
|
||||
let mut config_rx_policy = config_rx.clone();
|
||||
|
|
@ -1345,7 +1401,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||
print_proxy_links(&host, port, &config);
|
||||
}
|
||||
|
||||
let (admission_tx, admission_rx) = watch::channel(true);
|
||||
if config.general.use_middle_proxy {
|
||||
if let Some(pool) = me_pool.as_ref() {
|
||||
let initial_open = pool.admission_ready_conditional_cast().await;
|
||||
|
|
@ -1546,47 +1601,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||
});
|
||||
}
|
||||
|
||||
if config.server.api.enabled {
|
||||
let listen = match config.server.api.listen.parse::<SocketAddr>() {
|
||||
Ok(listen) => listen,
|
||||
Err(error) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
listen = %config.server.api.listen,
|
||||
"Invalid server.api.listen; API is disabled"
|
||||
);
|
||||
SocketAddr::from(([127, 0, 0, 1], 0))
|
||||
}
|
||||
};
|
||||
if listen.port() != 0 {
|
||||
let stats = stats.clone();
|
||||
let ip_tracker_api = ip_tracker.clone();
|
||||
let me_pool_api = me_pool.clone();
|
||||
let upstream_manager_api = upstream_manager.clone();
|
||||
let config_rx_api = config_rx.clone();
|
||||
let admission_rx_api = admission_rx.clone();
|
||||
let config_path_api = std::path::PathBuf::from(&config_path);
|
||||
let startup_detected_ip_v4 = detected_ip_v4;
|
||||
let startup_detected_ip_v6 = detected_ip_v6;
|
||||
tokio::spawn(async move {
|
||||
api::serve(
|
||||
listen,
|
||||
stats,
|
||||
ip_tracker_api,
|
||||
me_pool_api,
|
||||
upstream_manager_api,
|
||||
config_rx_api,
|
||||
admission_rx_api,
|
||||
config_path_api,
|
||||
startup_detected_ip_v4,
|
||||
startup_detected_ip_v6,
|
||||
process_started_at_epoch_secs,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
for (listener, listener_proxy_protocol) in listeners {
|
||||
let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
|
||||
let mut admission_rx_tcp = admission_rx.clone();
|
||||
|
|
|
|||
Loading…
Reference in New Issue