mirror of https://github.com/telemt/telemt.git
ME Draining Writers threshold + Inherited per-user unique IP limit: merge pull request #426 from telemt/flow
ME Draining Writers threshold + Inherited per-user unique IP limit
This commit is contained in:
commit
4abc0e5134
|
|
@ -90,6 +90,7 @@ pub(super) struct EffectiveMiddleProxyLimits {
|
|||
|
||||
#[derive(Serialize)]
|
||||
pub(super) struct EffectiveUserIpPolicyLimits {
|
||||
pub(super) global_each: usize,
|
||||
pub(super) mode: &'static str,
|
||||
pub(super) window_secs: u64,
|
||||
}
|
||||
|
|
@ -262,6 +263,7 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
|
|||
me2dc_fallback: cfg.general.me2dc_fallback,
|
||||
},
|
||||
user_ip_policy: EffectiveUserIpPolicyLimits {
|
||||
global_each: cfg.access.user_max_unique_ips_global_each,
|
||||
mode: user_max_unique_ips_mode_label(cfg.access.user_max_unique_ips_mode),
|
||||
window_secs: cfg.access.user_max_unique_ips_window_secs,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -386,7 +386,16 @@ pub(super) async fn users_from_config(
|
|||
.get(&username)
|
||||
.map(chrono::DateTime::<chrono::Utc>::to_rfc3339),
|
||||
data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(),
|
||||
max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(),
|
||||
max_unique_ips: cfg
|
||||
.access
|
||||
.user_max_unique_ips
|
||||
.get(&username)
|
||||
.copied()
|
||||
.filter(|limit| *limit > 0)
|
||||
.or(
|
||||
(cfg.access.user_max_unique_ips_global_each > 0)
|
||||
.then_some(cfg.access.user_max_unique_ips_global_each),
|
||||
),
|
||||
current_connections: stats.get_user_curr_connects(&username),
|
||||
active_unique_ips: active_ip_list.len(),
|
||||
active_unique_ips_list: active_ip_list,
|
||||
|
|
|
|||
|
|
@ -584,6 +584,10 @@ pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
|
|||
90
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_pool_drain_threshold() -> u64 {
|
||||
128
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_bind_stale_ttl_secs() -> u64 {
|
||||
default_me_pool_drain_ttl_secs()
|
||||
}
|
||||
|
|
@ -635,6 +639,10 @@ pub(crate) fn default_user_max_unique_ips_window_secs() -> u64 {
|
|||
DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS
|
||||
}
|
||||
|
||||
pub(crate) fn default_user_max_unique_ips_global_each() -> usize {
|
||||
0
|
||||
}
|
||||
|
||||
// Custom deserializer helpers
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ pub struct HotFields {
|
|||
pub me_reinit_coalesce_window_ms: u64,
|
||||
pub hardswap: bool,
|
||||
pub me_pool_drain_ttl_secs: u64,
|
||||
pub me_pool_drain_threshold: u64,
|
||||
pub me_pool_min_fresh_ratio: f32,
|
||||
pub me_reinit_drain_timeout_secs: u64,
|
||||
pub me_hardswap_warmup_delay_min_ms: u64,
|
||||
|
|
@ -118,6 +119,7 @@ pub struct HotFields {
|
|||
pub user_expirations: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
|
||||
pub user_data_quota: std::collections::HashMap<String, u64>,
|
||||
pub user_max_unique_ips: std::collections::HashMap<String, usize>,
|
||||
pub user_max_unique_ips_global_each: usize,
|
||||
pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode,
|
||||
pub user_max_unique_ips_window_secs: u64,
|
||||
}
|
||||
|
|
@ -135,6 +137,7 @@ impl HotFields {
|
|||
me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms,
|
||||
hardswap: cfg.general.hardswap,
|
||||
me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs,
|
||||
me_pool_drain_threshold: cfg.general.me_pool_drain_threshold,
|
||||
me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio,
|
||||
me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs,
|
||||
me_hardswap_warmup_delay_min_ms: cfg.general.me_hardswap_warmup_delay_min_ms,
|
||||
|
|
@ -232,6 +235,7 @@ impl HotFields {
|
|||
user_expirations: cfg.access.user_expirations.clone(),
|
||||
user_data_quota: cfg.access.user_data_quota.clone(),
|
||||
user_max_unique_ips: cfg.access.user_max_unique_ips.clone(),
|
||||
user_max_unique_ips_global_each: cfg.access.user_max_unique_ips_global_each,
|
||||
user_max_unique_ips_mode: cfg.access.user_max_unique_ips_mode,
|
||||
user_max_unique_ips_window_secs: cfg.access.user_max_unique_ips_window_secs,
|
||||
}
|
||||
|
|
@ -450,6 +454,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
|||
cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms;
|
||||
cfg.general.hardswap = new.general.hardswap;
|
||||
cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs;
|
||||
cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold;
|
||||
cfg.general.me_pool_min_fresh_ratio = new.general.me_pool_min_fresh_ratio;
|
||||
cfg.general.me_reinit_drain_timeout_secs = new.general.me_reinit_drain_timeout_secs;
|
||||
cfg.general.me_hardswap_warmup_delay_min_ms = new.general.me_hardswap_warmup_delay_min_ms;
|
||||
|
|
@ -532,6 +537,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
|||
cfg.access.user_expirations = new.access.user_expirations.clone();
|
||||
cfg.access.user_data_quota = new.access.user_data_quota.clone();
|
||||
cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone();
|
||||
cfg.access.user_max_unique_ips_global_each = new.access.user_max_unique_ips_global_each;
|
||||
cfg.access.user_max_unique_ips_mode = new.access.user_max_unique_ips_mode;
|
||||
cfg.access.user_max_unique_ips_window_secs = new.access.user_max_unique_ips_window_secs;
|
||||
|
||||
|
|
@ -823,6 +829,13 @@ fn log_changes(
|
|||
);
|
||||
}
|
||||
|
||||
if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold {
|
||||
info!(
|
||||
"config reload: me_pool_drain_threshold: {} → {}",
|
||||
old_hot.me_pool_drain_threshold, new_hot.me_pool_drain_threshold,
|
||||
);
|
||||
}
|
||||
|
||||
if (old_hot.me_pool_min_fresh_ratio - new_hot.me_pool_min_fresh_ratio).abs() > f32::EPSILON {
|
||||
info!(
|
||||
"config reload: me_pool_min_fresh_ratio: {:.3} → {:.3}",
|
||||
|
|
@ -1099,12 +1112,14 @@ fn log_changes(
|
|||
new_hot.user_max_unique_ips.len()
|
||||
);
|
||||
}
|
||||
if old_hot.user_max_unique_ips_mode != new_hot.user_max_unique_ips_mode
|
||||
if old_hot.user_max_unique_ips_global_each != new_hot.user_max_unique_ips_global_each
|
||||
|| old_hot.user_max_unique_ips_mode != new_hot.user_max_unique_ips_mode
|
||||
|| old_hot.user_max_unique_ips_window_secs
|
||||
!= new_hot.user_max_unique_ips_window_secs
|
||||
{
|
||||
info!(
|
||||
"config reload: user_max_unique_ips policy mode={:?} window={}s",
|
||||
"config reload: user_max_unique_ips policy global_each={} mode={:?} window={}s",
|
||||
new_hot.user_max_unique_ips_global_each,
|
||||
new_hot.user_max_unique_ips_mode,
|
||||
new_hot.user_max_unique_ips_window_secs
|
||||
);
|
||||
|
|
|
|||
|
|
@ -794,6 +794,11 @@ pub struct GeneralConfig {
|
|||
#[serde(default = "default_me_pool_drain_ttl_secs")]
|
||||
pub me_pool_drain_ttl_secs: u64,
|
||||
|
||||
/// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches.
|
||||
/// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior.
|
||||
#[serde(default = "default_me_pool_drain_threshold")]
|
||||
pub me_pool_drain_threshold: u64,
|
||||
|
||||
/// Policy for new binds on stale draining writers.
|
||||
#[serde(default)]
|
||||
pub me_bind_stale_mode: MeBindStaleMode,
|
||||
|
|
@ -973,6 +978,7 @@ impl Default for GeneralConfig {
|
|||
me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(),
|
||||
proxy_secret_len_max: default_proxy_secret_len_max(),
|
||||
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
|
||||
me_pool_drain_threshold: default_me_pool_drain_threshold(),
|
||||
me_bind_stale_mode: MeBindStaleMode::default(),
|
||||
me_bind_stale_ttl_secs: default_me_bind_stale_ttl_secs(),
|
||||
me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(),
|
||||
|
|
@ -1317,6 +1323,11 @@ pub struct AccessConfig {
|
|||
#[serde(default)]
|
||||
pub user_max_unique_ips: HashMap<String, usize>,
|
||||
|
||||
/// Global per-user unique IP limit applied when a user has no individual override.
|
||||
/// `0` disables the inherited limit.
|
||||
#[serde(default = "default_user_max_unique_ips_global_each")]
|
||||
pub user_max_unique_ips_global_each: usize,
|
||||
|
||||
#[serde(default)]
|
||||
pub user_max_unique_ips_mode: UserMaxUniqueIpsMode,
|
||||
|
||||
|
|
@ -1342,6 +1353,7 @@ impl Default for AccessConfig {
|
|||
user_expirations: HashMap::new(),
|
||||
user_data_quota: HashMap::new(),
|
||||
user_max_unique_ips: HashMap::new(),
|
||||
user_max_unique_ips_global_each: default_user_max_unique_ips_global_each(),
|
||||
user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(),
|
||||
user_max_unique_ips_window_secs: default_user_max_unique_ips_window_secs(),
|
||||
replay_check_len: default_replay_check_len(),
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ pub struct UserIpTracker {
|
|||
active_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, usize>>>>,
|
||||
recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
|
||||
max_ips: Arc<RwLock<HashMap<String, usize>>>,
|
||||
default_max_ips: Arc<RwLock<usize>>,
|
||||
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
|
||||
limit_window: Arc<RwLock<Duration>>,
|
||||
last_compact_epoch_secs: Arc<AtomicU64>,
|
||||
|
|
@ -28,6 +29,7 @@ impl UserIpTracker {
|
|||
active_ips: Arc::new(RwLock::new(HashMap::new())),
|
||||
recent_ips: Arc::new(RwLock::new(HashMap::new())),
|
||||
max_ips: Arc::new(RwLock::new(HashMap::new())),
|
||||
default_max_ips: Arc::new(RwLock::new(0)),
|
||||
limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
|
||||
limit_window: Arc::new(RwLock::new(Duration::from_secs(30))),
|
||||
last_compact_epoch_secs: Arc::new(AtomicU64::new(0)),
|
||||
|
|
@ -100,7 +102,10 @@ impl UserIpTracker {
|
|||
limits.remove(username);
|
||||
}
|
||||
|
||||
pub async fn load_limits(&self, limits: &HashMap<String, usize>) {
|
||||
pub async fn load_limits(&self, default_limit: usize, limits: &HashMap<String, usize>) {
|
||||
let mut default_max_ips = self.default_max_ips.write().await;
|
||||
*default_max_ips = default_limit;
|
||||
drop(default_max_ips);
|
||||
let mut max_ips = self.max_ips.write().await;
|
||||
max_ips.clone_from(limits);
|
||||
}
|
||||
|
|
@ -114,9 +119,14 @@ impl UserIpTracker {
|
|||
|
||||
pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> {
|
||||
self.maybe_compact_empty_users().await;
|
||||
let default_max_ips = *self.default_max_ips.read().await;
|
||||
let limit = {
|
||||
let max_ips = self.max_ips.read().await;
|
||||
max_ips.get(username).copied()
|
||||
max_ips
|
||||
.get(username)
|
||||
.copied()
|
||||
.filter(|limit| *limit > 0)
|
||||
.or((default_max_ips > 0).then_some(default_max_ips))
|
||||
};
|
||||
let mode = *self.limit_mode.read().await;
|
||||
let window = *self.limit_window.read().await;
|
||||
|
|
@ -255,10 +265,16 @@ impl UserIpTracker {
|
|||
pub async fn get_stats(&self) -> Vec<(String, usize, usize)> {
|
||||
let active_ips = self.active_ips.read().await;
|
||||
let max_ips = self.max_ips.read().await;
|
||||
let default_max_ips = *self.default_max_ips.read().await;
|
||||
|
||||
let mut stats = Vec::new();
|
||||
for (username, user_ips) in active_ips.iter() {
|
||||
let limit = max_ips.get(username).copied().unwrap_or(0);
|
||||
let limit = max_ips
|
||||
.get(username)
|
||||
.copied()
|
||||
.filter(|limit| *limit > 0)
|
||||
.or((default_max_ips > 0).then_some(default_max_ips))
|
||||
.unwrap_or(0);
|
||||
stats.push((username.clone(), user_ips.len(), limit));
|
||||
}
|
||||
|
||||
|
|
@ -293,8 +309,13 @@ impl UserIpTracker {
|
|||
}
|
||||
|
||||
pub async fn get_user_limit(&self, username: &str) -> Option<usize> {
|
||||
let default_max_ips = *self.default_max_ips.read().await;
|
||||
let max_ips = self.max_ips.read().await;
|
||||
max_ips.get(username).copied()
|
||||
max_ips
|
||||
.get(username)
|
||||
.copied()
|
||||
.filter(|limit| *limit > 0)
|
||||
.or((default_max_ips > 0).then_some(default_max_ips))
|
||||
}
|
||||
|
||||
pub async fn format_stats(&self) -> String {
|
||||
|
|
@ -546,7 +567,7 @@ mod tests {
|
|||
config_limits.insert("user1".to_string(), 5);
|
||||
config_limits.insert("user2".to_string(), 3);
|
||||
|
||||
tracker.load_limits(&config_limits).await;
|
||||
tracker.load_limits(0, &config_limits).await;
|
||||
|
||||
assert_eq!(tracker.get_user_limit("user1").await, Some(5));
|
||||
assert_eq!(tracker.get_user_limit("user2").await, Some(3));
|
||||
|
|
@ -560,16 +581,46 @@ mod tests {
|
|||
let mut first = HashMap::new();
|
||||
first.insert("user1".to_string(), 2);
|
||||
first.insert("user2".to_string(), 3);
|
||||
tracker.load_limits(&first).await;
|
||||
tracker.load_limits(0, &first).await;
|
||||
|
||||
let mut second = HashMap::new();
|
||||
second.insert("user2".to_string(), 5);
|
||||
tracker.load_limits(&second).await;
|
||||
tracker.load_limits(0, &second).await;
|
||||
|
||||
assert_eq!(tracker.get_user_limit("user1").await, None);
|
||||
assert_eq!(tracker.get_user_limit("user2").await, Some(5));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_global_each_limit_applies_without_user_override() {
|
||||
let tracker = UserIpTracker::new();
|
||||
tracker.load_limits(2, &HashMap::new()).await;
|
||||
|
||||
let ip1 = test_ipv4(172, 16, 0, 1);
|
||||
let ip2 = test_ipv4(172, 16, 0, 2);
|
||||
let ip3 = test_ipv4(172, 16, 0, 3);
|
||||
|
||||
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
|
||||
assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
|
||||
assert!(tracker.check_and_add("test_user", ip3).await.is_err());
|
||||
assert_eq!(tracker.get_user_limit("test_user").await, Some(2));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_user_override_wins_over_global_each_limit() {
|
||||
let tracker = UserIpTracker::new();
|
||||
let mut limits = HashMap::new();
|
||||
limits.insert("test_user".to_string(), 1);
|
||||
tracker.load_limits(3, &limits).await;
|
||||
|
||||
let ip1 = test_ipv4(172, 17, 0, 1);
|
||||
let ip2 = test_ipv4(172, 17, 0, 2);
|
||||
|
||||
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
|
||||
assert!(tracker.check_and_add("test_user", ip2).await.is_err());
|
||||
assert_eq!(tracker.get_user_limit("test_user").await, Some(1));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_time_window_mode_blocks_recent_ip_churn() {
|
||||
let tracker = UserIpTracker::new();
|
||||
|
|
|
|||
|
|
@ -237,6 +237,7 @@ pub(crate) async fn initialize_me_pool(
|
|||
config.general.me_adaptive_floor_max_warm_writers_global,
|
||||
config.general.hardswap,
|
||||
config.general.me_pool_drain_ttl_secs,
|
||||
config.general.me_pool_drain_threshold,
|
||||
config.general.effective_me_pool_force_close_secs(),
|
||||
config.general.me_pool_min_fresh_ratio,
|
||||
config.general.me_hardswap_warmup_delay_min_ms,
|
||||
|
|
|
|||
|
|
@ -168,17 +168,24 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||
stats.clone(),
|
||||
));
|
||||
let ip_tracker = Arc::new(UserIpTracker::new());
|
||||
ip_tracker.load_limits(&config.access.user_max_unique_ips).await;
|
||||
ip_tracker
|
||||
.load_limits(
|
||||
config.access.user_max_unique_ips_global_each,
|
||||
&config.access.user_max_unique_ips,
|
||||
)
|
||||
.await;
|
||||
ip_tracker
|
||||
.set_limit_policy(
|
||||
config.access.user_max_unique_ips_mode,
|
||||
config.access.user_max_unique_ips_window_secs,
|
||||
)
|
||||
.await;
|
||||
if !config.access.user_max_unique_ips.is_empty() {
|
||||
if config.access.user_max_unique_ips_global_each > 0 || !config.access.user_max_unique_ips.is_empty()
|
||||
{
|
||||
info!(
|
||||
"IP limits configured for {} users",
|
||||
config.access.user_max_unique_ips.len()
|
||||
global_each_limit = config.access.user_max_unique_ips_global_each,
|
||||
explicit_user_limits = config.access.user_max_unique_ips.len(),
|
||||
"User unique IP limits configured"
|
||||
);
|
||||
}
|
||||
if !config.network.dns_overrides.is_empty() {
|
||||
|
|
|
|||
|
|
@ -131,6 +131,10 @@ pub(crate) async fn spawn_runtime_tasks(
|
|||
let mut config_rx_ip_limits = config_rx.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut prev_limits = config_rx_ip_limits.borrow().access.user_max_unique_ips.clone();
|
||||
let mut prev_global_each = config_rx_ip_limits
|
||||
.borrow()
|
||||
.access
|
||||
.user_max_unique_ips_global_each;
|
||||
let mut prev_mode = config_rx_ip_limits.borrow().access.user_max_unique_ips_mode;
|
||||
let mut prev_window = config_rx_ip_limits
|
||||
.borrow()
|
||||
|
|
@ -143,9 +147,17 @@ pub(crate) async fn spawn_runtime_tasks(
|
|||
}
|
||||
let cfg = config_rx_ip_limits.borrow_and_update().clone();
|
||||
|
||||
if prev_limits != cfg.access.user_max_unique_ips {
|
||||
ip_tracker_policy.load_limits(&cfg.access.user_max_unique_ips).await;
|
||||
if prev_limits != cfg.access.user_max_unique_ips
|
||||
|| prev_global_each != cfg.access.user_max_unique_ips_global_each
|
||||
{
|
||||
ip_tracker_policy
|
||||
.load_limits(
|
||||
cfg.access.user_max_unique_ips_global_each,
|
||||
&cfg.access.user_max_unique_ips,
|
||||
)
|
||||
.await;
|
||||
prev_limits = cfg.access.user_max_unique_ips.clone();
|
||||
prev_global_each = cfg.access.user_max_unique_ips_global_each;
|
||||
}
|
||||
|
||||
if prev_mode != cfg.access.user_max_unique_ips_mode
|
||||
|
|
|
|||
|
|
@ -1774,14 +1774,24 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||
"# HELP telemt_user_unique_ips_recent_window Per-user unique IPs seen in configured observation window"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_recent_window gauge");
|
||||
let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)");
|
||||
let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Effective per-user unique IP limit (0 means unlimited)");
|
||||
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge");
|
||||
let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)");
|
||||
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge");
|
||||
|
||||
for user in unique_users {
|
||||
let current = ip_counts.get(&user).copied().unwrap_or(0);
|
||||
let limit = config.access.user_max_unique_ips.get(&user).copied().unwrap_or(0);
|
||||
let limit = config
|
||||
.access
|
||||
.user_max_unique_ips
|
||||
.get(&user)
|
||||
.copied()
|
||||
.filter(|limit| *limit > 0)
|
||||
.or(
|
||||
(config.access.user_max_unique_ips_global_each > 0)
|
||||
.then_some(config.access.user_max_unique_ips_global_each),
|
||||
)
|
||||
.unwrap_or(0);
|
||||
let utilization = if limit > 0 {
|
||||
current as f64 / limit as f64
|
||||
} else {
|
||||
|
|
@ -1904,6 +1914,25 @@ mod tests {
|
|||
assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_render_uses_global_each_unique_ip_limit() {
|
||||
let stats = Stats::new();
|
||||
stats.increment_user_connects("alice");
|
||||
stats.increment_user_curr_connects("alice");
|
||||
let tracker = UserIpTracker::new();
|
||||
tracker
|
||||
.check_and_add("alice", "203.0.113.10".parse().unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut config = ProxyConfig::default();
|
||||
config.access.user_max_unique_ips_global_each = 2;
|
||||
|
||||
let output = render_metrics(&stats, &config, &tracker).await;
|
||||
|
||||
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 2"));
|
||||
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.500000"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_render_has_type_annotations() {
|
||||
let stats = Stats::new();
|
||||
|
|
|
|||
|
|
@ -298,6 +298,7 @@ async fn run_update_cycle(
|
|||
pool.update_runtime_reinit_policy(
|
||||
cfg.general.hardswap,
|
||||
cfg.general.me_pool_drain_ttl_secs,
|
||||
cfg.general.me_pool_drain_threshold,
|
||||
cfg.general.effective_me_pool_force_close_secs(),
|
||||
cfg.general.me_pool_min_fresh_ratio,
|
||||
cfg.general.me_hardswap_warmup_delay_min_ms,
|
||||
|
|
@ -524,6 +525,7 @@ pub async fn me_config_updater(
|
|||
pool.update_runtime_reinit_policy(
|
||||
cfg.general.hardswap,
|
||||
cfg.general.me_pool_drain_ttl_secs,
|
||||
cfg.general.me_pool_drain_threshold,
|
||||
cfg.general.effective_me_pool_force_close_secs(),
|
||||
cfg.general.me_pool_min_fresh_ratio,
|
||||
cfg.general.me_hardswap_warmup_delay_min_ms,
|
||||
|
|
|
|||
|
|
@ -118,7 +118,11 @@ async fn reap_draining_writers(
|
|||
let now_epoch_secs = MePool::now_epoch_secs();
|
||||
let now = Instant::now();
|
||||
let drain_ttl_secs = pool.me_pool_drain_ttl_secs.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let drain_threshold = pool
|
||||
.me_pool_drain_threshold
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let writers = pool.writers.read().await.clone();
|
||||
let mut draining_writers = Vec::new();
|
||||
for writer in writers {
|
||||
if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
continue;
|
||||
|
|
@ -128,6 +132,36 @@ async fn reap_draining_writers(
|
|||
pool.remove_writer_and_close_clients(writer.id).await;
|
||||
continue;
|
||||
}
|
||||
draining_writers.push(writer);
|
||||
}
|
||||
|
||||
if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize {
|
||||
draining_writers.sort_by(|left, right| {
|
||||
let left_started = left
|
||||
.draining_started_at_epoch_secs
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let right_started = right
|
||||
.draining_started_at_epoch_secs
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
left_started
|
||||
.cmp(&right_started)
|
||||
.then_with(|| left.created_at.cmp(&right.created_at))
|
||||
.then_with(|| left.id.cmp(&right.id))
|
||||
});
|
||||
let overflow = draining_writers.len().saturating_sub(drain_threshold as usize);
|
||||
warn!(
|
||||
draining_writers = draining_writers.len(),
|
||||
me_pool_drain_threshold = drain_threshold,
|
||||
removing_writers = overflow,
|
||||
"ME draining writer threshold exceeded, force-closing oldest draining writers"
|
||||
);
|
||||
for writer in draining_writers.drain(..overflow) {
|
||||
pool.stats.increment_pool_force_close_total();
|
||||
pool.remove_writer_and_close_clients(writer.id).await;
|
||||
}
|
||||
}
|
||||
|
||||
for writer in draining_writers {
|
||||
let drain_started_at_epoch_secs = writer
|
||||
.draining_started_at_epoch_secs
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
|
|
@ -1270,3 +1304,190 @@ async fn maybe_rotate_single_endpoint_shadow(
|
|||
"Single-endpoint shadow writer rotated"
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use super::reap_draining_writers;
|
||||
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::network::probe::NetworkDecision;
|
||||
use crate::stats::Stats;
|
||||
use crate::transport::middle_proxy::codec::WriterCommand;
|
||||
use crate::transport::middle_proxy::pool::{MePool, MeWriter, WriterContour};
|
||||
use crate::transport::middle_proxy::registry::ConnMeta;
|
||||
|
||||
async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
|
||||
let general = GeneralConfig {
|
||||
me_pool_drain_threshold,
|
||||
..GeneralConfig::default()
|
||||
};
|
||||
MePool::new(
|
||||
None,
|
||||
vec![1u8; 32],
|
||||
None,
|
||||
false,
|
||||
None,
|
||||
Vec::new(),
|
||||
1,
|
||||
None,
|
||||
12,
|
||||
1200,
|
||||
HashMap::new(),
|
||||
HashMap::new(),
|
||||
None,
|
||||
NetworkDecision::default(),
|
||||
None,
|
||||
Arc::new(SecureRandom::new()),
|
||||
Arc::new(Stats::default()),
|
||||
general.me_keepalive_enabled,
|
||||
general.me_keepalive_interval_secs,
|
||||
general.me_keepalive_jitter_secs,
|
||||
general.me_keepalive_payload_random,
|
||||
general.rpc_proxy_req_every,
|
||||
general.me_warmup_stagger_enabled,
|
||||
general.me_warmup_step_delay_ms,
|
||||
general.me_warmup_step_jitter_ms,
|
||||
general.me_reconnect_max_concurrent_per_dc,
|
||||
general.me_reconnect_backoff_base_ms,
|
||||
general.me_reconnect_backoff_cap_ms,
|
||||
general.me_reconnect_fast_retry_count,
|
||||
general.me_single_endpoint_shadow_writers,
|
||||
general.me_single_endpoint_outage_mode_enabled,
|
||||
general.me_single_endpoint_outage_disable_quarantine,
|
||||
general.me_single_endpoint_outage_backoff_min_ms,
|
||||
general.me_single_endpoint_outage_backoff_max_ms,
|
||||
general.me_single_endpoint_shadow_rotate_every_secs,
|
||||
general.me_floor_mode,
|
||||
general.me_adaptive_floor_idle_secs,
|
||||
general.me_adaptive_floor_min_writers_single_endpoint,
|
||||
general.me_adaptive_floor_min_writers_multi_endpoint,
|
||||
general.me_adaptive_floor_recover_grace_secs,
|
||||
general.me_adaptive_floor_writers_per_core_total,
|
||||
general.me_adaptive_floor_cpu_cores_override,
|
||||
general.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||
general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||
general.me_adaptive_floor_max_active_writers_per_core,
|
||||
general.me_adaptive_floor_max_warm_writers_per_core,
|
||||
general.me_adaptive_floor_max_active_writers_global,
|
||||
general.me_adaptive_floor_max_warm_writers_global,
|
||||
general.hardswap,
|
||||
general.me_pool_drain_ttl_secs,
|
||||
general.me_pool_drain_threshold,
|
||||
general.effective_me_pool_force_close_secs(),
|
||||
general.me_pool_min_fresh_ratio,
|
||||
general.me_hardswap_warmup_delay_min_ms,
|
||||
general.me_hardswap_warmup_delay_max_ms,
|
||||
general.me_hardswap_warmup_extra_passes,
|
||||
general.me_hardswap_warmup_pass_backoff_base_ms,
|
||||
general.me_bind_stale_mode,
|
||||
general.me_bind_stale_ttl_secs,
|
||||
general.me_secret_atomic_snapshot,
|
||||
general.me_deterministic_writer_sort,
|
||||
MeWriterPickMode::default(),
|
||||
general.me_writer_pick_sample_size,
|
||||
MeSocksKdfPolicy::default(),
|
||||
general.me_writer_cmd_channel_capacity,
|
||||
general.me_route_channel_capacity,
|
||||
general.me_route_backpressure_base_timeout_ms,
|
||||
general.me_route_backpressure_high_timeout_ms,
|
||||
general.me_route_backpressure_high_watermark_pct,
|
||||
general.me_reader_route_data_wait_ms,
|
||||
general.me_health_interval_ms_unhealthy,
|
||||
general.me_health_interval_ms_healthy,
|
||||
general.me_warn_rate_limit_ms,
|
||||
MeRouteNoWriterMode::default(),
|
||||
general.me_route_no_writer_wait_ms,
|
||||
general.me_route_inline_recovery_attempts,
|
||||
general.me_route_inline_recovery_wait_ms,
|
||||
)
|
||||
}
|
||||
|
||||
async fn insert_draining_writer(
|
||||
pool: &Arc<MePool>,
|
||||
writer_id: u64,
|
||||
drain_started_at_epoch_secs: u64,
|
||||
) -> u64 {
|
||||
let (conn_id, _rx) = pool.registry.register().await;
|
||||
let (tx, _writer_rx) = mpsc::channel::<WriterCommand>(8);
|
||||
let writer = MeWriter {
|
||||
id: writer_id,
|
||||
addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000 + writer_id as u16),
|
||||
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
|
||||
writer_dc: 2,
|
||||
generation: 1,
|
||||
contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
|
||||
created_at: Instant::now() - Duration::from_secs(writer_id),
|
||||
tx: tx.clone(),
|
||||
cancel: CancellationToken::new(),
|
||||
degraded: Arc::new(AtomicBool::new(false)),
|
||||
rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
|
||||
draining: Arc::new(AtomicBool::new(true)),
|
||||
draining_started_at_epoch_secs: Arc::new(AtomicU64::new(drain_started_at_epoch_secs)),
|
||||
drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
|
||||
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
|
||||
};
|
||||
pool.writers.write().await.push(writer);
|
||||
pool.registry.register_writer(writer_id, tx).await;
|
||||
pool.conn_count.fetch_add(1, Ordering::Relaxed);
|
||||
assert!(
|
||||
pool.registry
|
||||
.bind_writer(
|
||||
conn_id,
|
||||
writer_id,
|
||||
ConnMeta {
|
||||
target_dc: 2,
|
||||
client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6000),
|
||||
our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
|
||||
conn_id
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reap_draining_writers_force_closes_oldest_over_threshold() {
|
||||
let pool = make_pool(2).await;
|
||||
let now_epoch_secs = MePool::now_epoch_secs();
|
||||
let conn_a = insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(30)).await;
|
||||
let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await;
|
||||
let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await;
|
||||
let mut warn_next_allowed = HashMap::new();
|
||||
|
||||
reap_draining_writers(&pool, &mut warn_next_allowed).await;
|
||||
|
||||
let writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
|
||||
assert_eq!(writer_ids, vec![20, 30]);
|
||||
assert!(pool.registry.get_writer(conn_a).await.is_none());
|
||||
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
|
||||
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reap_draining_writers_keeps_timeout_only_behavior_when_threshold_disabled() {
|
||||
let pool = make_pool(0).await;
|
||||
let now_epoch_secs = MePool::now_epoch_secs();
|
||||
let conn_a = insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(30)).await;
|
||||
let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await;
|
||||
let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await;
|
||||
let mut warn_next_allowed = HashMap::new();
|
||||
|
||||
reap_draining_writers(&pool, &mut warn_next_allowed).await;
|
||||
|
||||
let writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
|
||||
assert_eq!(writer_ids, vec![10, 20, 30]);
|
||||
assert_eq!(pool.registry.get_writer(conn_a).await.unwrap().writer_id, 10);
|
||||
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
|
||||
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -171,6 +171,7 @@ pub struct MePool {
|
|||
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
|
||||
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
|
||||
pub(super) me_pool_drain_ttl_secs: AtomicU64,
|
||||
pub(super) me_pool_drain_threshold: AtomicU64,
|
||||
pub(super) me_pool_force_close_secs: AtomicU64,
|
||||
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
|
||||
pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64,
|
||||
|
|
@ -271,6 +272,7 @@ impl MePool {
|
|||
me_adaptive_floor_max_warm_writers_global: u32,
|
||||
hardswap: bool,
|
||||
me_pool_drain_ttl_secs: u64,
|
||||
me_pool_drain_threshold: u64,
|
||||
me_pool_force_close_secs: u64,
|
||||
me_pool_min_fresh_ratio: f32,
|
||||
me_hardswap_warmup_delay_min_ms: u64,
|
||||
|
|
@ -446,6 +448,7 @@ impl MePool {
|
|||
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
|
||||
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
|
||||
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
|
||||
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
|
||||
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
|
||||
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
|
||||
me_pool_min_fresh_ratio,
|
||||
|
|
@ -492,6 +495,7 @@ impl MePool {
|
|||
&self,
|
||||
hardswap: bool,
|
||||
drain_ttl_secs: u64,
|
||||
pool_drain_threshold: u64,
|
||||
force_close_secs: u64,
|
||||
min_fresh_ratio: f32,
|
||||
hardswap_warmup_delay_min_ms: u64,
|
||||
|
|
@ -530,6 +534,8 @@ impl MePool {
|
|||
self.hardswap.store(hardswap, Ordering::Relaxed);
|
||||
self.me_pool_drain_ttl_secs
|
||||
.store(drain_ttl_secs, Ordering::Relaxed);
|
||||
self.me_pool_drain_threshold
|
||||
.store(pool_drain_threshold, Ordering::Relaxed);
|
||||
self.me_pool_force_close_secs
|
||||
.store(force_close_secs, Ordering::Relaxed);
|
||||
self.me_pool_min_fresh_ratio_permille
|
||||
|
|
|
|||
Loading…
Reference in New Issue