This commit is contained in:
Alexey
2026-04-05 17:23:40 +03:00
parent 1f54e4a203
commit 5f5582865e
34 changed files with 657 additions and 199 deletions

View File

@@ -1,10 +1,10 @@
use crate::proxy::client::handle_client_stream_with_shared;
use crate::proxy::handshake::{
auth_probe_fail_streak_for_testing_in_shared, auth_probe_is_throttled_for_testing_in_shared,
auth_probe_record_failure_for_testing, clear_auth_probe_state_for_testing_in_shared,
clear_unknown_sni_warn_state_for_testing_in_shared, clear_warned_secrets_for_testing_in_shared,
should_emit_unknown_sni_warn_for_testing_in_shared, warned_secrets_for_testing_in_shared,
};
use crate::proxy::client::handle_client_stream_with_shared;
use crate::proxy::middle_relay::{
clear_desync_dedup_for_testing_in_shared, clear_relay_idle_candidate_for_testing,
clear_relay_idle_pressure_state_for_testing_in_shared, mark_relay_idle_candidate_for_testing,
@@ -81,7 +81,10 @@ fn new_client_harness() -> ClientHarness {
}
}
async fn drive_invalid_mtproto_handshake(shared: Arc<ProxySharedState>, peer: std::net::SocketAddr) {
async fn drive_invalid_mtproto_handshake(
shared: Arc<ProxySharedState>,
peer: std::net::SocketAddr,
) {
let harness = new_client_harness();
let (server_side, mut client_side) = duplex(4096);
let invalid = [0u8; 64];
@@ -108,7 +111,10 @@ async fn drive_invalid_mtproto_handshake(shared: Arc<ProxySharedState>, peer: st
.write_all(&invalid)
.await
.expect("failed to write invalid handshake");
client_side.shutdown().await.expect("failed to shutdown client");
client_side
.shutdown()
.await
.expect("failed to shutdown client");
let _ = tokio::time::timeout(Duration::from_secs(3), task)
.await
.expect("client task timed out")
@@ -128,7 +134,10 @@ fn proxy_shared_state_two_instances_do_not_share_auth_probe_state() {
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip),
Some(1)
);
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), None);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip),
None
);
}
#[test]
@@ -139,8 +148,18 @@ fn proxy_shared_state_two_instances_do_not_share_desync_dedup() {
let now = Instant::now();
let key = 0xA5A5_u64;
assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now));
assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now));
assert!(should_emit_full_desync_for_testing(
a.as_ref(),
key,
false,
now
));
assert!(should_emit_full_desync_for_testing(
b.as_ref(),
key,
false,
now
));
}
#[test]
@@ -150,7 +169,10 @@ fn proxy_shared_state_two_instances_do_not_share_idle_registry() {
clear_relay_idle_pressure_state_for_testing_in_shared(a.as_ref());
assert!(mark_relay_idle_candidate_for_testing(a.as_ref(), 111));
assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), Some(111));
assert_eq!(
oldest_relay_idle_candidate_for_testing(a.as_ref()),
Some(111)
);
assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), None);
}
@@ -168,7 +190,10 @@ fn proxy_shared_state_reset_in_one_instance_does_not_affect_another() {
auth_probe_record_failure_for_testing(b.as_ref(), ip_b, now);
clear_auth_probe_state_for_testing_in_shared(a.as_ref());
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), None);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a),
None
);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b),
Some(1)
@@ -191,8 +216,14 @@ fn proxy_shared_state_parallel_auth_probe_updates_stay_per_instance() {
auth_probe_record_failure_for_testing(b.as_ref(), ip, now + Duration::from_millis(1));
}
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), Some(5));
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), Some(3));
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip),
Some(5)
);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip),
Some(3)
);
}
#[tokio::test]
@@ -317,8 +348,14 @@ fn proxy_shared_state_auth_saturation_does_not_bleed_across_instances() {
auth_probe_record_failure_for_testing(a.as_ref(), ip, future_now);
}
assert!(auth_probe_is_throttled_for_testing_in_shared(a.as_ref(), ip));
assert!(!auth_probe_is_throttled_for_testing_in_shared(b.as_ref(), ip));
assert!(auth_probe_is_throttled_for_testing_in_shared(
a.as_ref(),
ip
));
assert!(!auth_probe_is_throttled_for_testing_in_shared(
b.as_ref(),
ip
));
}
#[test]
@@ -348,7 +385,10 @@ fn proxy_shared_state_poison_clear_in_one_instance_does_not_affect_other_instanc
clear_auth_probe_state_for_testing_in_shared(a.as_ref());
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), None);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a),
None
);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b),
Some(1),
@@ -463,7 +503,10 @@ fn proxy_shared_state_warned_secret_clear_in_one_instance_does_not_clear_other()
clear_warned_secrets_for_testing_in_shared(a.as_ref());
clear_warned_secrets_for_testing_in_shared(b.as_ref());
let key = ("clear-isolation-user".to_string(), "invalid_length".to_string());
let key = (
"clear-isolation-user".to_string(),
"invalid_length".to_string(),
);
{
let warned_a = warned_secrets_for_testing_in_shared(a.as_ref());
let mut guard_a = warned_a
@@ -508,14 +551,24 @@ fn proxy_shared_state_desync_duplicate_suppression_is_instance_scoped() {
let now = Instant::now();
let key = 0xBEEF_0000_0000_0001u64;
assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now));
assert!(should_emit_full_desync_for_testing(
a.as_ref(),
key,
false,
now
));
assert!(!should_emit_full_desync_for_testing(
a.as_ref(),
key,
false,
now + Duration::from_millis(1)
));
assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now));
assert!(should_emit_full_desync_for_testing(
b.as_ref(),
key,
false,
now
));
}
#[test]
@@ -527,8 +580,18 @@ fn proxy_shared_state_desync_clear_in_one_instance_does_not_clear_other() {
let now = Instant::now();
let key = 0xCAFE_0000_0000_0001u64;
assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now));
assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now));
assert!(should_emit_full_desync_for_testing(
a.as_ref(),
key,
false,
now
));
assert!(should_emit_full_desync_for_testing(
b.as_ref(),
key,
false,
now
));
clear_desync_dedup_for_testing_in_shared(a.as_ref());
@@ -558,7 +621,10 @@ fn proxy_shared_state_idle_candidate_clear_in_one_instance_does_not_affect_other
clear_relay_idle_candidate_for_testing(a.as_ref(), 1001);
assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), None);
assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), Some(2002));
assert_eq!(
oldest_relay_idle_candidate_for_testing(b.as_ref()),
Some(2002)
);
}
#[test]