diff --git a/Dockerfile b/Dockerfile
index d138ce9..0955031 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -77,6 +77,8 @@ COPY config.toml /app/config.toml
 
 EXPOSE 443 9090 9091
 
+HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 CMD ["/app/telemt", "healthcheck", "/app/config.toml", "--mode", "liveness"]
+
 ENTRYPOINT ["/app/telemt"]
 CMD ["config.toml"]
 
@@ -94,5 +96,7 @@ USER nonroot:nonroot
 
 EXPOSE 443 9090 9091
 
+HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 CMD ["/app/telemt", "healthcheck", "/app/config.toml", "--mode", "liveness"]
+
 ENTRYPOINT ["/app/telemt"]
 CMD ["config.toml"]
diff --git a/docker-compose.yml b/docker-compose.yml
index e1cd678..9fd0641 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -16,6 +16,12 @@ services:
       - /etc/telemt:rw,mode=1777,size=4m
     environment:
       - RUST_LOG=info
+    healthcheck:
+      test: [ "CMD", "/app/telemt", "healthcheck", "/etc/telemt/config.toml", "--mode", "liveness" ]
+      interval: 30s
+      timeout: 5s
+      retries: 3
+      start_period: 20s
     # Uncomment this line if you want to use host network for IPv6, but bridge is default and usually better
     # network_mode: host
     cap_drop:
diff --git a/src/api/mod.rs b/src/api/mod.rs
index 850fb0e..8ee46f7 100644
--- a/src/api/mod.rs
+++ b/src/api/mod.rs
@@ -41,7 +41,7 @@ use config_store::{current_revision, load_config_from_disk, parse_if_match};
 use events::ApiEventStore;
 use http_utils::{error_response, read_json, read_optional_json, success_response};
 use model::{
-    ApiFailure, CreateUserRequest, DeleteUserResponse, HealthData, PatchUserRequest,
+    ApiFailure, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData, PatchUserRequest,
     RotateSecretRequest, SummaryData, UserActiveIps,
 };
 use runtime_edge::{
@@ -275,6 +275,34 @@ async fn handle(
                 };
                 Ok(success_response(StatusCode::OK, data, revision))
             }
+            ("GET", "/v1/health/ready") => {
+                let revision = current_revision(&shared.config_path).await?;
+                let admission_open = shared.runtime_state.admission_open.load(Ordering::Relaxed);
+                let upstream_health = shared.upstream_manager.api_health_summary().await;
+                // Ready means the admission gate is open and at least one upstream is healthy.
+                let ready = admission_open && upstream_health.healthy_total > 0;
+                let reason = if ready {
+                    None
+                } else if !admission_open {
+                    Some("admission_closed")
+                } else {
+                    Some("no_healthy_upstreams")
+                };
+                let data = HealthReadyData {
+                    ready,
+                    status: if ready { "ready" } else { "not_ready" },
+                    reason,
+                    admission_open,
+                    healthy_upstreams: upstream_health.healthy_total,
+                    total_upstreams: upstream_health.configured_total,
+                };
+                let status_code = if ready {
+                    StatusCode::OK
+                } else {
+                    StatusCode::SERVICE_UNAVAILABLE
+                };
+                Ok(success_response(status_code, data, revision))
+            }
             ("GET", "/v1/system/info") => {
                 let revision = current_revision(&shared.config_path).await?;
                 let data = build_system_info_data(shared.as_ref(), cfg.as_ref(), &revision);
diff --git a/src/api/model.rs b/src/api/model.rs
index 66de644..c6e24ea 100644
--- a/src/api/model.rs
+++ b/src/api/model.rs
@@ -60,6 +60,24 @@ pub(super) struct HealthData {
     pub(super) read_only: bool,
 }
 
+/// Payload of `GET /v1/health/ready`.
+#[derive(Serialize)]
+pub(super) struct HealthReadyData {
+    /// `true` when admission is open and at least one upstream is healthy.
+    pub(super) ready: bool,
+    /// `"ready"` or `"not_ready"`, mirroring `ready`.
+    pub(super) status: &'static str,
+    /// Why the service is not ready; omitted from the JSON when ready.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub(super) reason: Option<&'static str>,
+    /// Current value of the runtime admission flag.
+    pub(super) admission_open: bool,
+    /// Number of upstreams currently marked healthy.
+    pub(super) healthy_upstreams: usize,
+    /// Number of configured upstreams.
+    pub(super) total_upstreams: usize,
+}
+
 #[derive(Serialize)]
 pub(super) struct SummaryData {
     pub(super) uptime_seconds: f64,
diff --git a/src/cli.rs b/src/cli.rs
index 47a10d5..2e24017 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -6,12 +6,15 @@
 //! - `reload [--pid-file PATH]` - Reload configuration (SIGHUP)
 //! - `status [--pid-file PATH]` - Check daemon status
 //! - `run [OPTIONS] [config.toml]` - Run in foreground (default behavior)
+//! - `healthcheck [OPTIONS] [config.toml]` - Run control-plane health probe
 
 use rand::RngExt;
 use std::fs;
 use std::path::{Path, PathBuf};
 use std::process::Command;
 
+use crate::healthcheck::{self, HealthcheckMode};
+
 #[cfg(unix)]
 use crate::daemon::{self, DEFAULT_PID_FILE, DaemonOptions};
 
@@ -28,6 +31,8 @@ pub enum Subcommand {
     Reload,
     /// Check daemon status (`status` subcommand).
     Status,
+    /// Run health probe and exit with status code.
+    Healthcheck,
     /// Fire-and-forget setup (`--init`).
     Init,
 }
@@ -38,6 +43,10 @@ pub struct ParsedCommand {
     pub subcommand: Subcommand,
     pub pid_file: PathBuf,
     pub config_path: String,
+    /// Probe mode for the `healthcheck` subcommand (defaults to liveness).
+    pub healthcheck_mode: HealthcheckMode,
+    /// Raw rejected `--mode` value; an empty string means the value was missing.
+    pub healthcheck_mode_invalid: Option<String>,
     #[cfg(unix)]
     pub daemon_opts: DaemonOptions,
     pub init_opts: Option<InitOptions>,
@@ -52,6 +61,8 @@ impl Default for ParsedCommand {
             #[cfg(not(unix))]
             pid_file: PathBuf::from("/var/run/telemt.pid"),
             config_path: "config.toml".to_string(),
+            healthcheck_mode: HealthcheckMode::Liveness,
+            healthcheck_mode_invalid: None,
             #[cfg(unix)]
             daemon_opts: DaemonOptions::default(),
             init_opts: None,
@@ -91,6 +102,9 @@ pub fn parse_command(args: &[String]) -> ParsedCommand {
             "status" => {
                 cmd.subcommand = Subcommand::Status;
             }
+            "healthcheck" => {
+                cmd.subcommand = Subcommand::Healthcheck;
+            }
             "run" => {
                 cmd.subcommand = Subcommand::Run;
                 #[cfg(unix)]
@@ -113,7 +127,37 @@ pub fn parse_command(args: &[String]) -> ParsedCommand {
     while i < args.len() {
         match args[i].as_str() {
             // Skip subcommand names
-            "start" | "stop" | "reload" | "status" | "run" => {}
+            "start" | "stop" | "reload" | "status" | "run" | "healthcheck" => {}
+            // Healthcheck probe mode: `--mode VALUE` or `--mode=VALUE`
+            "--mode" => {
+                i += 1;
+                if i < args.len() {
+                    match HealthcheckMode::from_cli_arg(&args[i]) {
+                        Some(mode) => {
+                            cmd.healthcheck_mode = mode;
+                            cmd.healthcheck_mode_invalid = None;
+                        }
+                        None => {
+                            cmd.healthcheck_mode_invalid = Some(args[i].clone());
+                        }
+                    }
+                } else {
+                    cmd.healthcheck_mode_invalid = Some(String::new());
+                }
+            }
+            s if s.starts_with("--mode=") => {
+                // Strip the prefix exactly once; `trim_start_matches` would also eat a repeated one.
+                let raw = s.strip_prefix("--mode=").unwrap_or(s);
+                match HealthcheckMode::from_cli_arg(raw) {
+                    Some(mode) => {
+                        cmd.healthcheck_mode = mode;
+                        cmd.healthcheck_mode_invalid = None;
+                    }
+                    None => {
+                        cmd.healthcheck_mode_invalid = Some(raw.to_string());
+                    }
+                }
+            }
             // PID file option (for stop/reload/status)
             "--pid-file" => {
                 i += 1;
@@ -152,6 +196,20 @@ pub fn execute_subcommand(cmd: &ParsedCommand) -> Option<i32> {
             Subcommand::Stop => Some(cmd_stop(&cmd.pid_file)),
             Subcommand::Reload => Some(cmd_reload(&cmd.pid_file)),
             Subcommand::Status => Some(cmd_status(&cmd.pid_file)),
+            Subcommand::Healthcheck => {
+                if let Some(invalid_mode) = cmd.healthcheck_mode_invalid.as_ref() {
+                    if invalid_mode.is_empty() {
+                        eprintln!("[telemt] Missing value for --mode (supported: liveness, ready)");
+                    } else {
+                        eprintln!(
+                            "[telemt] Invalid --mode value '{invalid_mode}' (supported: liveness, ready)"
+                        );
+                    }
+                    Some(2)
+                } else {
+                    Some(healthcheck::run(&cmd.config_path, cmd.healthcheck_mode))
+                }
+            }
             Subcommand::Init => {
                 if let Some(opts) = cmd.init_opts.clone() {
                     match run_init(opts) {
@@ -177,6 +235,20 @@ pub fn execute_subcommand(cmd: &ParsedCommand) -> Option<i32> {
                 eprintln!("[telemt] Subcommand not supported on this platform");
                 Some(1)
             }
+            Subcommand::Healthcheck => {
+                if let Some(invalid_mode) = cmd.healthcheck_mode_invalid.as_ref() {
+                    if invalid_mode.is_empty() {
+                        eprintln!("[telemt] Missing value for --mode (supported: liveness, ready)");
+                    } else {
+                        eprintln!(
+                            "[telemt] Invalid --mode value '{invalid_mode}' (supported: liveness, ready)"
+                        );
+                    }
+                    Some(2)
+                } else {
+                    Some(healthcheck::run(&cmd.config_path, cmd.healthcheck_mode))
+                }
+            }
             Subcommand::Init => {
                 if let Some(opts) = cmd.init_opts.clone() {
                     match run_init(opts) {
diff --git a/src/healthcheck.rs b/src/healthcheck.rs
new file mode 100644
index 0000000..be9a381
--- /dev/null
+++ b/src/healthcheck.rs
@@ -0,0 +1,253 @@
+//! One-shot health probe against the local control-plane API.
+//!
+//! Backs the `healthcheck` CLI subcommand: loads the config, sends a single
+//! HTTP GET to the API listener and maps the outcome to a process exit code.
+
+use std::io::{Read, Write};
+use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream};
+use std::time::Duration;
+
+use serde_json::Value;
+
+use crate::config::ProxyConfig;
+
+/// Timeout applied separately to the connect and to each socket read/write.
+const PROBE_IO_TIMEOUT: Duration = Duration::from_secs(2);
+
+/// Which API health endpoint the probe queries.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum HealthcheckMode {
+    /// Queries `/v1/health` and expects `data.status == "ok"`.
+    Liveness,
+    /// Queries `/v1/health/ready` and expects `data.ready == true`.
+    Ready,
+}
+
+impl HealthcheckMode {
+    /// Parses a `--mode` value; returns `None` for unsupported values.
+    pub(crate) fn from_cli_arg(value: &str) -> Option<Self> {
+        match value {
+            "liveness" => Some(Self::Liveness),
+            "ready" => Some(Self::Ready),
+            _ => None,
+        }
+    }
+
+    /// API path queried for this mode.
+    fn request_path(self) -> &'static str {
+        match self {
+            Self::Liveness => "/v1/health",
+            Self::Ready => "/v1/health/ready",
+        }
+    }
+}
+
+/// Runs the probe and returns the process exit code: `0` when healthy, `1`
+/// otherwise (the failure reason is printed to stderr).
+pub(crate) fn run(config_path: &str, mode: HealthcheckMode) -> i32 {
+    match run_inner(config_path, mode) {
+        Ok(()) => 0,
+        Err(error) => {
+            eprintln!("[telemt] healthcheck failed: {error}");
+            1
+        }
+    }
+}
+
+/// Loads the config, performs one blocking HTTP request against the API
+/// listener and validates the status code and JSON payload.
+fn run_inner(config_path: &str, mode: HealthcheckMode) -> Result<(), String> {
+    let config =
+        ProxyConfig::load(config_path).map_err(|error| format!("config load failed: {error}"))?;
+    let api_cfg = &config.server.api;
+    // With the API disabled there is nothing to probe, so a loadable config
+    // counts as healthy in both modes.
+    if !api_cfg.enabled {
+        return Ok(());
+    }
+
+    let listen: SocketAddr = api_cfg
+        .listen
+        .parse()
+        .map_err(|_| format!("invalid API listen address: {}", api_cfg.listen))?;
+    if listen.port() == 0 {
+        return Err("API listen port is 0".to_string());
+    }
+    let target = probe_target(listen);
+
+    let mut stream = TcpStream::connect_timeout(&target, PROBE_IO_TIMEOUT)
+        .map_err(|error| format!("connect {target} failed: {error}"))?;
+    stream
+        .set_read_timeout(Some(PROBE_IO_TIMEOUT))
+        .map_err(|error| format!("set read timeout failed: {error}"))?;
+    stream
+        .set_write_timeout(Some(PROBE_IO_TIMEOUT))
+        .map_err(|error| format!("set write timeout failed: {error}"))?;
+
+    let request = build_request(target, mode.request_path(), &api_cfg.auth_header);
+    stream
+        .write_all(request.as_bytes())
+        .map_err(|error| format!("request write failed: {error}"))?;
+    stream
+        .flush()
+        .map_err(|error| format!("request flush failed: {error}"))?;
+
+    // The request sets `Connection: close`, so the response is read until EOF.
+    let mut raw_response = Vec::new();
+    stream
+        .read_to_end(&mut raw_response)
+        .map_err(|error| format!("response read failed: {error}"))?;
+    let response =
+        String::from_utf8(raw_response).map_err(|_| "response is not valid UTF-8".to_string())?;
+
+    let (status_code, body) = split_response(&response)?;
+    if status_code != 200 {
+        return Err(format!("HTTP status {status_code}"));
+    }
+
+    validate_payload(mode, body)?;
+    Ok(())
+}
+
+/// Picks the address to dial: a wildcard listen address (`0.0.0.0` / `::`) is
+/// replaced by the loopback address of the same family, anything else is kept.
+fn probe_target(listen: SocketAddr) -> SocketAddr {
+    match listen {
+        SocketAddr::V4(addr) => {
+            let ip = if addr.ip().is_unspecified() {
+                Ipv4Addr::LOCALHOST
+            } else {
+                *addr.ip()
+            };
+            SocketAddr::from((ip, addr.port()))
+        }
+        SocketAddr::V6(addr) => {
+            let ip = if addr.ip().is_unspecified() {
+                Ipv6Addr::LOCALHOST
+            } else {
+                *addr.ip()
+            };
+            SocketAddr::from((ip, addr.port()))
+        }
+    }
+}
+
+/// Builds the raw HTTP/1.1 GET request; the `Authorization` header is only
+/// included when `auth_header` is non-empty.
+fn build_request(target: SocketAddr, path: &str, auth_header: &str) -> String {
+    let mut request = format!("GET {path} HTTP/1.1\r\nHost: {target}\r\nConnection: close\r\n");
+    if !auth_header.is_empty() {
+        request.push_str("Authorization: ");
+        request.push_str(auth_header);
+        request.push_str("\r\n");
+    }
+    request.push_str("\r\n");
+    request
+}
+
+/// Splits a raw HTTP response at the first blank line into status code and body.
+fn split_response(response: &str) -> Result<(u16, &str), String> {
+    let header_end = response
+        .find("\r\n\r\n")
+        .ok_or_else(|| "invalid HTTP response headers".to_string())?;
+    let header = &response[..header_end];
+    let body = &response[header_end + 4..];
+    let status_line = header
+        .lines()
+        .next()
+        .ok_or_else(|| "missing HTTP status line".to_string())?;
+    let status_code = parse_status_code(status_line)?;
+    Ok((status_code, body))
+}
+
+/// Extracts the numeric status code from a status line such as `HTTP/1.1 200 OK`.
+fn parse_status_code(status_line: &str) -> Result<u16, String> {
+    let mut parts = status_line.split_whitespace();
+    let version = parts
+        .next()
+        .ok_or_else(|| "missing HTTP version".to_string())?;
+    if !version.starts_with("HTTP/") {
+        return Err(format!("invalid HTTP status line: {status_line}"));
+    }
+    let code = parts
+        .next()
+        .ok_or_else(|| "missing HTTP status code".to_string())?;
+    code.parse::<u16>()
+        .map_err(|_| format!("invalid HTTP status code: {code}"))
+}
+
+/// Checks the JSON envelope (`ok == true`, `data` present) and the mode-specific
+/// field inside `data`.
+fn validate_payload(mode: HealthcheckMode, body: &str) -> Result<(), String> {
+    let payload: Value =
+        serde_json::from_str(body).map_err(|_| "response body is not valid JSON".to_string())?;
+    if payload.get("ok").and_then(Value::as_bool) != Some(true) {
+        return Err("response JSON has ok=false".to_string());
+    }
+
+    let data = payload
+        .get("data")
+        .ok_or_else(|| "response JSON has no data field".to_string())?;
+    match mode {
+        HealthcheckMode::Liveness => {
+            if data.get("status").and_then(Value::as_str) != Some("ok") {
+                return Err("liveness status is not ok".to_string());
+            }
+        }
+        HealthcheckMode::Ready => {
+            if data.get("ready").and_then(Value::as_bool) != Some(true) {
+                return Err("readiness flag is false".to_string());
+            }
+        }
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{
+        HealthcheckMode, build_request, parse_status_code, probe_target, split_response,
+        validate_payload,
+    };
+
+    #[test]
+    fn parse_status_code_reads_http_200() {
+        let status = parse_status_code("HTTP/1.1 200 OK").expect("must parse status");
+        assert_eq!(status, 200);
+    }
+
+    #[test]
+    fn split_response_extracts_status_and_body() {
+        let response = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{\"ok\":true}";
+        let (status, body) = split_response(response).expect("must split response");
+        assert_eq!(status, 200);
+        assert_eq!(body, "{\"ok\":true}");
+    }
+
+    #[test]
+    fn validate_payload_accepts_liveness_contract() {
+        let body = "{\"ok\":true,\"data\":{\"status\":\"ok\"}}";
+        validate_payload(HealthcheckMode::Liveness, body).expect("liveness payload must pass");
+    }
+
+    #[test]
+    fn validate_payload_rejects_not_ready() {
+        let body = "{\"ok\":true,\"data\":{\"ready\":false}}";
+        let result = validate_payload(HealthcheckMode::Ready, body);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn probe_target_maps_wildcard_to_loopback() {
+        let target = probe_target("0.0.0.0:9091".parse().expect("valid socket address"));
+        assert_eq!(target.to_string(), "127.0.0.1:9091");
+    }
+
+    #[test]
+    fn build_request_omits_empty_authorization() {
+        let target = "127.0.0.1:9091".parse().expect("valid socket address");
+        let request = build_request(target, "/v1/health", "");
+        assert!(!request.contains("Authorization"));
+        assert!(request.ends_with("\r\n\r\n"));
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 5c134b8..ce6b943 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,6 +8,7 @@ mod crypto;
 #[cfg(unix)]
 mod daemon;
 mod error;
+mod healthcheck;
 mod ip_tracker;
 #[cfg(test)]
 #[path = "tests/ip_tracker_encapsulation_adversarial_tests.rs"]
diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs
index 791fc00..229dfbf 100644
--- a/src/transport/upstream.rs
+++ b/src/transport/upstream.rs
@@ -279,6 +279,15 @@ pub struct UpstreamApiSummarySnapshot {
     pub shadowsocks_total: usize,
 }
 
+/// Upstream counters reported by the readiness endpoint.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct UpstreamApiHealthSummary {
+    /// Number of upstreams held by the manager.
+    pub configured_total: usize,
+    /// Number of those upstreams whose `healthy` flag is set.
+    pub healthy_total: usize,
+}
+
 #[derive(Debug, Clone)]
 pub struct UpstreamApiSnapshot {
     pub summary: UpstreamApiSummarySnapshot,
@@ -444,6 +453,15 @@ impl UpstreamManager {
         Some(UpstreamApiSnapshot { summary, upstreams })
     }
 
+    /// Counts configured and currently healthy upstreams under one read lock.
+    pub async fn api_health_summary(&self) -> UpstreamApiHealthSummary {
+        let guard = self.upstreams.read().await;
+        UpstreamApiHealthSummary {
+            configured_total: guard.len(),
+            healthy_total: guard.iter().filter(|upstream| upstream.healthy).count(),
+        }
+    }
+
     fn describe_upstream(upstream_type: &UpstreamType) -> (UpstreamRouteKind, String) {
         match upstream_type {
             UpstreamType::Direct { .. } => (UpstreamRouteKind::Direct, "direct".to_string()),