Merge pull request #595 from xaosproxy/fix/apply-tg-connect-timeout-upstream

Apply [timeouts] tg_connect to upstream DC TCP connect attempts
This commit is contained in:
Alexey 2026-03-28 21:14:51 +03:00 committed by GitHub
commit 07d774a82a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 106 additions and 10 deletions

View File

@ -91,6 +91,7 @@ This document lists all configuration keys accepted by `config.toml`.
| upstream_connect_retry_attempts | `u32` | `2` | Must be `> 0`. | Connect attempts for selected upstream before error/fallback. | | upstream_connect_retry_attempts | `u32` | `2` | Must be `> 0`. | Connect attempts for selected upstream before error/fallback. |
| upstream_connect_retry_backoff_ms | `u64` | `100` | — | Delay between upstream connect attempts (ms). | | upstream_connect_retry_backoff_ms | `u64` | `100` | — | Delay between upstream connect attempts (ms). |
| upstream_connect_budget_ms | `u64` | `3000` | Must be `> 0`. | Total wall-clock budget for one upstream connect request (ms). | | upstream_connect_budget_ms | `u64` | `3000` | Must be `> 0`. | Total wall-clock budget for one upstream connect request (ms). |
| tg_connect | `u64` | `10` | Must be `> 0`. | Per-attempt upstream TCP connect timeout to Telegram DC (seconds). |
| upstream_unhealthy_fail_threshold | `u32` | `5` | Must be `> 0`. | Consecutive failed requests before upstream is marked unhealthy. | | upstream_unhealthy_fail_threshold | `u32` | `5` | Must be `> 0`. | Consecutive failed requests before upstream is marked unhealthy. |
| upstream_connect_failfast_hard_errors | `bool` | `false` | — | Skips additional retries for hard non-transient connect errors. | | upstream_connect_failfast_hard_errors | `bool` | `false` | — | Skips additional retries for hard non-transient connect errors. |
| stun_iface_mismatch_ignore | `bool` | `false` | none | Reserved compatibility flag in current runtime revision. | | stun_iface_mismatch_ignore | `bool` | `false` | none | Reserved compatibility flag in current runtime revision. |
@ -249,7 +250,6 @@ Note: When `server.proxy_protocol` is enabled, incoming PROXY protocol headers a
| relay_client_idle_soft_secs | `u64` | `120` | Must be `> 0`; must be `<= relay_client_idle_hard_secs`. | Soft idle threshold for middle-relay client uplink inactivity (seconds). | | relay_client_idle_soft_secs | `u64` | `120` | Must be `> 0`; must be `<= relay_client_idle_hard_secs`. | Soft idle threshold for middle-relay client uplink inactivity (seconds). |
| relay_client_idle_hard_secs | `u64` | `360` | Must be `> 0`; must be `>= relay_client_idle_soft_secs`. | Hard idle threshold for middle-relay client uplink inactivity (seconds). | | relay_client_idle_hard_secs | `u64` | `360` | Must be `> 0`; must be `>= relay_client_idle_soft_secs`. | Hard idle threshold for middle-relay client uplink inactivity (seconds). |
| relay_idle_grace_after_downstream_activity_secs | `u64` | `30` | Must be `<= relay_client_idle_hard_secs`. | Extra hard-idle grace after recent downstream activity (seconds). | | relay_idle_grace_after_downstream_activity_secs | `u64` | `30` | Must be `<= relay_client_idle_hard_secs`. | Extra hard-idle grace after recent downstream activity (seconds). |
| tg_connect | `u64` | `10` | — | Upstream Telegram connect timeout. |
| client_keepalive | `u64` | `15` | — | Client keepalive timeout. | | client_keepalive | `u64` | `15` | — | Client keepalive timeout. |
| client_ack | `u64` | `90` | — | Client ACK timeout. | | client_ack | `u64` | `90` | — | Client ACK timeout. |
| me_one_retry | `u8` | `12` | none | Fast reconnect attempts budget for single-endpoint DC scenarios. | | me_one_retry | `u8` | `12` | none | Fast reconnect attempts budget for single-endpoint DC scenarios. |

View File

@ -228,7 +228,7 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
me_pool_force_close_secs: cfg.general.effective_me_pool_force_close_secs(), me_pool_force_close_secs: cfg.general.effective_me_pool_force_close_secs(),
timeouts: EffectiveTimeoutLimits { timeouts: EffectiveTimeoutLimits {
client_handshake_secs: cfg.timeouts.client_handshake, client_handshake_secs: cfg.timeouts.client_handshake,
tg_connect_secs: cfg.timeouts.tg_connect, tg_connect_secs: cfg.general.tg_connect,
client_keepalive_secs: cfg.timeouts.client_keepalive, client_keepalive_secs: cfg.timeouts.client_keepalive,
client_ack_secs: cfg.timeouts.client_ack, client_ack_secs: cfg.timeouts.client_ack,
me_one_retry: cfg.timeouts.me_one_retry, me_one_retry: cfg.timeouts.me_one_retry,

View File

@ -584,6 +584,7 @@ me_pool_drain_soft_evict_cooldown_ms = 1000
me_bind_stale_mode = "never" me_bind_stale_mode = "never"
me_pool_min_fresh_ratio = 0.8 me_pool_min_fresh_ratio = 0.8
me_reinit_drain_timeout_secs = 90 me_reinit_drain_timeout_secs = 90
tg_connect = 10
[network] [network]
ipv4 = true ipv4 = true
@ -610,7 +611,6 @@ ip = "::"
[timeouts] [timeouts]
client_handshake = 15 client_handshake = 15
tg_connect = 10
client_keepalive = 60 client_keepalive = 60
client_ack = 300 client_ack = 300

View File

@ -696,6 +696,7 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
if old.general.upstream_connect_retry_attempts != new.general.upstream_connect_retry_attempts if old.general.upstream_connect_retry_attempts != new.general.upstream_connect_retry_attempts
|| old.general.upstream_connect_retry_backoff_ms || old.general.upstream_connect_retry_backoff_ms
!= new.general.upstream_connect_retry_backoff_ms != new.general.upstream_connect_retry_backoff_ms
|| old.general.tg_connect != new.general.tg_connect
|| old.general.upstream_unhealthy_fail_threshold || old.general.upstream_unhealthy_fail_threshold
!= new.general.upstream_unhealthy_fail_threshold != new.general.upstream_unhealthy_fail_threshold
|| old.general.upstream_connect_failfast_hard_errors || old.general.upstream_connect_failfast_hard_errors

View File

@ -346,6 +346,12 @@ impl ProxyConfig {
)); ));
} }
if config.general.tg_connect == 0 {
return Err(ProxyError::Config(
"general.tg_connect must be > 0".to_string(),
));
}
if config.general.upstream_unhealthy_fail_threshold == 0 { if config.general.upstream_unhealthy_fail_threshold == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.upstream_unhealthy_fail_threshold must be > 0".to_string(), "general.upstream_unhealthy_fail_threshold must be > 0".to_string(),
@ -1907,6 +1913,26 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[test]
fn tg_connect_zero_is_rejected() {
let toml = r#"
[general]
tg_connect = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_tg_connect_zero_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.tg_connect must be > 0"));
let _ = std::fs::remove_file(path);
}
#[test] #[test]
fn rpc_proxy_req_every_out_of_range_is_rejected() { fn rpc_proxy_req_every_out_of_range_is_rejected() {
let toml = r#" let toml = r#"

View File

@ -663,6 +663,10 @@ pub struct GeneralConfig {
#[serde(default = "default_upstream_connect_budget_ms")] #[serde(default = "default_upstream_connect_budget_ms")]
pub upstream_connect_budget_ms: u64, pub upstream_connect_budget_ms: u64,
/// Per-attempt TCP connect timeout to Telegram DC (seconds).
#[serde(default = "default_connect_timeout")]
pub tg_connect: u64,
/// Consecutive failed requests before upstream is marked unhealthy. /// Consecutive failed requests before upstream is marked unhealthy.
#[serde(default = "default_upstream_unhealthy_fail_threshold")] #[serde(default = "default_upstream_unhealthy_fail_threshold")]
pub upstream_unhealthy_fail_threshold: u32, pub upstream_unhealthy_fail_threshold: u32,
@ -1007,6 +1011,7 @@ impl Default for GeneralConfig {
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(), upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(), upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
upstream_connect_budget_ms: default_upstream_connect_budget_ms(), upstream_connect_budget_ms: default_upstream_connect_budget_ms(),
tg_connect: default_connect_timeout(),
upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(), upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(),
upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(), upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(),
stun_iface_mismatch_ignore: false, stun_iface_mismatch_ignore: false,
@ -1335,9 +1340,6 @@ pub struct TimeoutsConfig {
#[serde(default = "default_relay_idle_grace_after_downstream_activity_secs")] #[serde(default = "default_relay_idle_grace_after_downstream_activity_secs")]
pub relay_idle_grace_after_downstream_activity_secs: u64, pub relay_idle_grace_after_downstream_activity_secs: u64,
#[serde(default = "default_connect_timeout")]
pub tg_connect: u64,
#[serde(default = "default_keepalive")] #[serde(default = "default_keepalive")]
pub client_keepalive: u64, pub client_keepalive: u64,
@ -1362,7 +1364,6 @@ impl Default for TimeoutsConfig {
relay_client_idle_hard_secs: default_relay_client_idle_hard_secs(), relay_client_idle_hard_secs: default_relay_client_idle_hard_secs(),
relay_idle_grace_after_downstream_activity_secs: relay_idle_grace_after_downstream_activity_secs:
default_relay_idle_grace_after_downstream_activity_secs(), default_relay_idle_grace_after_downstream_activity_secs(),
tg_connect: default_connect_timeout(),
client_keepalive: default_keepalive(), client_keepalive: default_keepalive(),
client_ack: default_ack_timeout(), client_ack: default_ack_timeout(),
me_one_retry: default_me_one_retry(), me_one_retry: default_me_one_retry(),

View File

@ -302,6 +302,7 @@ async fn run_inner(
config.general.upstream_connect_retry_attempts, config.general.upstream_connect_retry_attempts,
config.general.upstream_connect_retry_backoff_ms, config.general.upstream_connect_retry_backoff_ms,
config.general.upstream_connect_budget_ms, config.general.upstream_connect_budget_ms,
config.general.tg_connect,
config.general.upstream_unhealthy_fail_threshold, config.general.upstream_unhealthy_fail_threshold,
config.general.upstream_connect_failfast_hard_errors, config.general.upstream_connect_failfast_hard_errors,
stats.clone(), stats.clone(),

View File

@ -94,6 +94,7 @@ async fn adversarial_tls_handshake_timeout_during_masking_delay() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -141,6 +142,7 @@ async fn blackhat_proxy_protocol_slowloris_timeout() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -193,6 +195,7 @@ async fn negative_proxy_protocol_enabled_but_client_sends_tls_hello() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -239,6 +242,7 @@ async fn edge_client_stream_exactly_4_bytes_eof() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -282,6 +286,7 @@ async fn edge_client_stream_tls_header_valid_but_body_1_byte_short_eof() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -328,6 +333,7 @@ async fn integration_non_tls_modes_disabled_immediately_masks() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),

View File

@ -47,6 +47,7 @@ async fn invariant_tls_clienthello_truncation_exact_boundary_triggers_masking()
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -177,6 +178,7 @@ async fn invariant_direct_mode_partial_header_eof_is_error_not_bad_connect() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),

View File

@ -40,6 +40,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -36,6 +36,7 @@ fn build_harness(config: ProxyConfig) -> PipelineHarness {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),

View File

@ -20,6 +20,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -20,6 +20,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -34,6 +34,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -20,6 +20,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -20,6 +20,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -47,6 +47,7 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> PipelineHarness {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),

View File

@ -25,6 +25,7 @@ fn make_test_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -48,6 +48,7 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> RedTeamHarness {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -237,6 +238,7 @@ async fn redteam_03_masking_duration_must_be_less_than_1ms_when_backend_down() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
Arc::new(Stats::new()), Arc::new(Stats::new()),
@ -477,6 +479,7 @@ async fn measure_invalid_probe_duration_ms(delay_ms: u64, tls_len: u16, body_sen
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
Arc::new(Stats::new()), Arc::new(Stats::new()),
@ -550,6 +553,7 @@ async fn capture_forwarded_probe_len(tls_len: u16, body_sent: usize) -> usize {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
Arc::new(Stats::new()), Arc::new(Stats::new()),

View File

@ -22,6 +22,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -20,6 +20,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -20,6 +20,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -20,6 +20,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -20,6 +20,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -34,6 +34,7 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -100,6 +100,7 @@ async fn blackhat_proxy_protocol_massive_garbage_rejected_quickly() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -146,6 +147,7 @@ async fn edge_tls_body_immediate_eof_triggers_masking_and_bad_connect() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -195,6 +197,7 @@ async fn security_classic_mode_disabled_masks_valid_length_payload() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),

View File

@ -339,6 +339,7 @@ async fn relay_task_abort_releases_user_gate_and_ip_reservation() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -452,6 +453,7 @@ async fn relay_cutover_releases_user_gate_and_ip_reservation() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -575,6 +577,7 @@ async fn integration_route_cutover_and_quota_overlap_fails_closed_and_releases_s
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -744,6 +747,7 @@ async fn proxy_protocol_header_is_rejected_when_trust_list_is_empty() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -820,6 +824,7 @@ async fn proxy_protocol_header_from_untrusted_peer_range_is_rejected_under_load(
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -979,6 +984,7 @@ async fn short_tls_probe_is_masked_through_client_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1066,6 +1072,7 @@ async fn tls12_record_probe_is_masked_through_client_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1151,6 +1158,7 @@ async fn handle_client_stream_increments_connects_all_exactly_once() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1243,6 +1251,7 @@ async fn running_client_handler_increments_connects_all_exactly_once() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1332,6 +1341,7 @@ async fn partial_tls_header_stall_triggers_handshake_timeout() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1514,6 +1524,7 @@ async fn valid_tls_path_does_not_fall_back_to_mask_backend() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1622,6 +1633,7 @@ async fn valid_tls_with_invalid_mtproto_falls_back_to_mask_backend() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1728,6 +1740,7 @@ async fn client_handler_tls_bad_mtproto_is_forwarded_to_mask_backend() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1849,6 +1862,7 @@ async fn alpn_mismatch_tls_probe_is_masked_through_client_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1941,6 +1955,7 @@ async fn invalid_hmac_tls_probe_is_masked_through_client_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -2039,6 +2054,7 @@ async fn burst_invalid_tls_probes_are_masked_verbatim() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -2876,6 +2892,7 @@ async fn relay_connect_error_releases_user_and_ip_before_return() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -3436,6 +3453,7 @@ async fn untrusted_proxy_header_source_is_rejected() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -3505,6 +3523,7 @@ async fn empty_proxy_trusted_cidrs_rejects_proxy_header_by_default() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -3601,6 +3620,7 @@ async fn oversized_tls_record_is_masked_in_generic_stream_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -3703,6 +3723,7 @@ async fn oversized_tls_record_is_masked_in_client_handler_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -3819,6 +3840,7 @@ async fn tls_record_len_min_minus_1_is_rejected_in_generic_stream_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -3921,6 +3943,7 @@ async fn tls_record_len_min_minus_1_is_rejected_in_client_handler_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -4026,6 +4049,7 @@ async fn tls_record_len_16384_is_accepted_in_generic_stream_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -4126,6 +4150,7 @@ async fn tls_record_len_16384_is_accepted_in_client_handler_pipeline() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),

View File

@ -33,6 +33,7 @@ fn make_test_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -35,6 +35,7 @@ fn make_test_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -36,6 +36,7 @@ fn make_test_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats, stats,

View File

@ -50,6 +50,7 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> PipelineHarness {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),

View File

@ -1302,6 +1302,7 @@ async fn direct_relay_abort_midflight_releases_route_gauge() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1408,6 +1409,7 @@ async fn direct_relay_cutover_midflight_releases_route_gauge() {
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1529,6 +1531,7 @@ async fn direct_relay_cutover_storm_multi_session_keeps_generic_errors_and_relea
1, 1,
1, 1,
1, 1,
10,
1, 1,
false, false,
stats.clone(), stats.clone(),
@ -1761,6 +1764,7 @@ async fn negative_direct_relay_dc_connection_refused_fails_fast() {
1, 1,
100, 100,
5000, 5000,
10,
3, 3,
false, false,
stats.clone(), stats.clone(),
@ -1851,6 +1855,7 @@ async fn adversarial_direct_relay_cutover_integrity() {
1, 1,
100, 100,
5000, 5000,
10,
3, 3,
false, false,
stats.clone(), stats.clone(),

View File

@ -34,8 +34,6 @@ const NUM_DCS: usize = 5;
/// Timeout for individual DC ping attempt /// Timeout for individual DC ping attempt
const DC_PING_TIMEOUT_SECS: u64 = 5; const DC_PING_TIMEOUT_SECS: u64 = 5;
/// Timeout for direct TG DC TCP connect readiness.
const DIRECT_CONNECT_TIMEOUT_SECS: u64 = 10;
/// Interval between upstream health-check cycles. /// Interval between upstream health-check cycles.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 30; const HEALTH_CHECK_INTERVAL_SECS: u64 = 30;
/// Timeout for a single health-check connect attempt. /// Timeout for a single health-check connect attempt.
@ -319,6 +317,8 @@ pub struct UpstreamManager {
connect_retry_attempts: u32, connect_retry_attempts: u32,
connect_retry_backoff: Duration, connect_retry_backoff: Duration,
connect_budget: Duration, connect_budget: Duration,
/// Per-attempt TCP connect timeout to Telegram DC (`[general] tg_connect`, seconds).
tg_connect_timeout_secs: u64,
unhealthy_fail_threshold: u32, unhealthy_fail_threshold: u32,
connect_failfast_hard_errors: bool, connect_failfast_hard_errors: bool,
no_upstreams_warn_epoch_ms: Arc<AtomicU64>, no_upstreams_warn_epoch_ms: Arc<AtomicU64>,
@ -332,6 +332,7 @@ impl UpstreamManager {
connect_retry_attempts: u32, connect_retry_attempts: u32,
connect_retry_backoff_ms: u64, connect_retry_backoff_ms: u64,
connect_budget_ms: u64, connect_budget_ms: u64,
tg_connect_timeout_secs: u64,
unhealthy_fail_threshold: u32, unhealthy_fail_threshold: u32,
connect_failfast_hard_errors: bool, connect_failfast_hard_errors: bool,
stats: Arc<Stats>, stats: Arc<Stats>,
@ -347,6 +348,7 @@ impl UpstreamManager {
connect_retry_attempts: connect_retry_attempts.max(1), connect_retry_attempts: connect_retry_attempts.max(1),
connect_retry_backoff: Duration::from_millis(connect_retry_backoff_ms), connect_retry_backoff: Duration::from_millis(connect_retry_backoff_ms),
connect_budget: Duration::from_millis(connect_budget_ms.max(1)), connect_budget: Duration::from_millis(connect_budget_ms.max(1)),
tg_connect_timeout_secs: tg_connect_timeout_secs.max(1),
unhealthy_fail_threshold: unhealthy_fail_threshold.max(1), unhealthy_fail_threshold: unhealthy_fail_threshold.max(1),
connect_failfast_hard_errors, connect_failfast_hard_errors,
no_upstreams_warn_epoch_ms: Arc::new(AtomicU64::new(0)), no_upstreams_warn_epoch_ms: Arc::new(AtomicU64::new(0)),
@ -798,7 +800,7 @@ impl UpstreamManager {
} }
let remaining_budget = self.connect_budget.saturating_sub(elapsed); let remaining_budget = self.connect_budget.saturating_sub(elapsed);
let attempt_timeout = let attempt_timeout =
Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS).min(remaining_budget); Duration::from_secs(self.tg_connect_timeout_secs).min(remaining_budget);
if attempt_timeout.is_zero() { if attempt_timeout.is_zero() {
last_error = Some(ProxyError::ConnectionTimeout { last_error = Some(ProxyError::ConnectionTimeout {
addr: target.to_string(), addr: target.to_string(),
@ -1901,6 +1903,7 @@ mod tests {
1, 1,
100, 100,
1000, 1000,
10,
1, 1,
false, false,
Arc::new(Stats::new()), Arc::new(Stats::new()),