DC-Indexes +/- Fixes: merge pull request #341 from telemt/flow-dc-index

DC-Indexes +/- Fixes
This commit is contained in:
Alexey 2026-03-06 20:07:24 +03:00 committed by GitHub
commit fcdd8a9796
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 190 additions and 255 deletions

View File

@ -16,6 +16,10 @@ API runtime is configured in `[server.api]`.
| `request_body_limit_bytes` | `usize` | `65536` | Maximum request body size. Must be `> 0`. | | `request_body_limit_bytes` | `usize` | `65536` | Maximum request body size. Must be `> 0`. |
| `minimal_runtime_enabled` | `bool` | `false` | Enables runtime snapshot endpoints requiring ME pool read-lock aggregation. | | `minimal_runtime_enabled` | `bool` | `false` | Enables runtime snapshot endpoints requiring ME pool read-lock aggregation. |
| `minimal_runtime_cache_ttl_ms` | `u64` | `1000` | Cache TTL for minimal snapshots. `0` disables cache; valid range is `[0, 60000]`. | | `minimal_runtime_cache_ttl_ms` | `u64` | `1000` | Cache TTL for minimal snapshots. `0` disables cache; valid range is `[0, 60000]`. |
| `runtime_edge_enabled` | `bool` | `false` | Enables runtime edge endpoints with cached aggregation payloads. |
| `runtime_edge_cache_ttl_ms` | `u64` | `1000` | Cache TTL for runtime edge summary payloads. `0` disables cache; valid range is `[0, 60000]`. |
| `runtime_edge_top_n` | `usize` | `10` | Top-N rows for runtime edge leaderboard payloads; valid range is `[1, 1000]`. |
| `runtime_edge_events_capacity` | `usize` | `256` | Ring-buffer size for `/v1/runtime/events/recent`; valid range is `[16, 4096]`. |
| `read_only` | `bool` | `false` | Disables mutating endpoints. | | `read_only` | `bool` | `false` | Disables mutating endpoints. |
`server.admin_api` is accepted as an alias for backward compatibility. `server.admin_api` is accepted as an alias for backward compatibility.
@ -24,6 +28,9 @@ Runtime validation for API config:
- `server.api.listen` must be a valid `IP:PORT`. - `server.api.listen` must be a valid `IP:PORT`.
- `server.api.request_body_limit_bytes` must be `> 0`. - `server.api.request_body_limit_bytes` must be `> 0`.
- `server.api.minimal_runtime_cache_ttl_ms` must be within `[0, 60000]`. - `server.api.minimal_runtime_cache_ttl_ms` must be within `[0, 60000]`.
- `server.api.runtime_edge_cache_ttl_ms` must be within `[0, 60000]`.
- `server.api.runtime_edge_top_n` must be within `[1, 1000]`.
- `server.api.runtime_edge_events_capacity` must be within `[16, 4096]`.
## Protocol Contract ## Protocol Contract
@ -80,12 +87,19 @@ Notes:
| `GET` | `/v1/runtime/gates` | none | `200` | `RuntimeGatesData` | | `GET` | `/v1/runtime/gates` | none | `200` | `RuntimeGatesData` |
| `GET` | `/v1/limits/effective` | none | `200` | `EffectiveLimitsData` | | `GET` | `/v1/limits/effective` | none | `200` | `EffectiveLimitsData` |
| `GET` | `/v1/security/posture` | none | `200` | `SecurityPostureData` | | `GET` | `/v1/security/posture` | none | `200` | `SecurityPostureData` |
| `GET` | `/v1/security/whitelist` | none | `200` | `SecurityWhitelistData` |
| `GET` | `/v1/stats/summary` | none | `200` | `SummaryData` | | `GET` | `/v1/stats/summary` | none | `200` | `SummaryData` |
| `GET` | `/v1/stats/zero/all` | none | `200` | `ZeroAllData` | | `GET` | `/v1/stats/zero/all` | none | `200` | `ZeroAllData` |
| `GET` | `/v1/stats/upstreams` | none | `200` | `UpstreamsData` | | `GET` | `/v1/stats/upstreams` | none | `200` | `UpstreamsData` |
| `GET` | `/v1/stats/minimal/all` | none | `200` | `MinimalAllData` | | `GET` | `/v1/stats/minimal/all` | none | `200` | `MinimalAllData` |
| `GET` | `/v1/stats/me-writers` | none | `200` | `MeWritersData` | | `GET` | `/v1/stats/me-writers` | none | `200` | `MeWritersData` |
| `GET` | `/v1/stats/dcs` | none | `200` | `DcStatusData` | | `GET` | `/v1/stats/dcs` | none | `200` | `DcStatusData` |
| `GET` | `/v1/runtime/me_pool_state` | none | `200` | `RuntimeMePoolStateData` |
| `GET` | `/v1/runtime/me_quality` | none | `200` | `RuntimeMeQualityData` |
| `GET` | `/v1/runtime/upstream_quality` | none | `200` | `RuntimeUpstreamQualityData` |
| `GET` | `/v1/runtime/nat_stun` | none | `200` | `RuntimeNatStunData` |
| `GET` | `/v1/runtime/connections/summary` | none | `200` | `RuntimeEdgeConnectionsSummaryData` |
| `GET` | `/v1/runtime/events/recent` | none | `200` | `RuntimeEdgeEventsData` |
| `GET` | `/v1/stats/users` | none | `200` | `UserInfo[]` | | `GET` | `/v1/stats/users` | none | `200` | `UserInfo[]` |
| `GET` | `/v1/users` | none | `200` | `UserInfo[]` | | `GET` | `/v1/users` | none | `200` | `UserInfo[]` |
| `POST` | `/v1/users` | `CreateUserRequest` | `201` | `CreateUserResponse` | | `POST` | `/v1/users` | `CreateUserRequest` | `201` | `CreateUserResponse` |
@ -268,6 +282,25 @@ Note: the request contract is defined, but the corresponding route currently ret
| `telemetry_user_enabled` | `bool` | Per-user telemetry toggle. | | `telemetry_user_enabled` | `bool` | Per-user telemetry toggle. |
| `telemetry_me_level` | `string` | ME telemetry level (`silent`, `normal`, `debug`). | | `telemetry_me_level` | `string` | ME telemetry level (`silent`, `normal`, `debug`). |
### `SecurityWhitelistData`
| Field | Type | Description |
| --- | --- | --- |
| `generated_at_epoch_secs` | `u64` | Snapshot generation timestamp. |
| `enabled` | `bool` | `true` when whitelist has at least one CIDR entry. |
| `entries_total` | `usize` | Number of whitelist CIDR entries. |
| `entries` | `string[]` | Whitelist CIDR entries as strings. |
### Runtime Min Endpoints
- `/v1/runtime/me_pool_state`: generations, hardswap state, writer contour/health counts, refill inflight snapshot.
- `/v1/runtime/me_quality`: ME error/drift/reconnect counters and per-DC RTT coverage snapshot.
- `/v1/runtime/upstream_quality`: upstream runtime policy, connect counters, health summary and per-upstream DC latency/IP preference.
- `/v1/runtime/nat_stun`: NAT/STUN runtime flags, server lists, reflection cache state and backoff remaining.
### Runtime Edge Endpoints
- `/v1/runtime/connections/summary`: cached connection totals (`total/me/direct`), active users and top-N users by connections/traffic.
- `/v1/runtime/events/recent?limit=N`: bounded control-plane ring-buffer events (`limit` clamped to `[1, 1000]`).
- If `server.api.runtime_edge_enabled=false`, runtime edge endpoints return `enabled=false` with `reason=feature_disabled`.
### `ZeroAllData` ### `ZeroAllData`
| Field | Type | Description | | Field | Type | Description |
| --- | --- | --- | | --- | --- | --- |

View File

@ -942,22 +942,21 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let mut grouped: BTreeMap<i32, Vec<MePingSample>> = BTreeMap::new(); let mut grouped: BTreeMap<i32, Vec<MePingSample>> = BTreeMap::new();
for report in me_results { for report in me_results {
for s in report.samples { for s in report.samples {
let key = s.dc.abs(); grouped.entry(s.dc).or_default().push(s);
grouped.entry(key).or_default().push(s);
} }
} }
let family_order = if prefer_ipv6 { let family_order = if prefer_ipv6 {
vec![(MePingFamily::V6, true), (MePingFamily::V6, false), (MePingFamily::V4, true), (MePingFamily::V4, false)] vec![MePingFamily::V6, MePingFamily::V4]
} else { } else {
vec![(MePingFamily::V4, true), (MePingFamily::V4, false), (MePingFamily::V6, true), (MePingFamily::V6, false)] vec![MePingFamily::V4, MePingFamily::V6]
}; };
for (dc_abs, samples) in grouped { for (dc, samples) in grouped {
for (family, is_pos) in &family_order { for family in &family_order {
let fam_samples: Vec<&MePingSample> = samples let fam_samples: Vec<&MePingSample> = samples
.iter() .iter()
.filter(|s| matches!(s.family, f if &f == family) && (s.dc >= 0) == *is_pos) .filter(|s| matches!(s.family, f if &f == family))
.collect(); .collect();
if fam_samples.is_empty() { if fam_samples.is_empty() {
continue; continue;
@ -967,7 +966,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
MePingFamily::V4 => "IPv4", MePingFamily::V4 => "IPv4",
MePingFamily::V6 => "IPv6", MePingFamily::V6 => "IPv6",
}; };
info!(" DC{} [{}]", dc_abs, fam_label); info!(" DC{} [{}]", dc, fam_label);
for sample in fam_samples { for sample in fam_samples {
let line = format_sample_line(sample); let line = format_sample_line(sample);
info!("{}", line); info!("{}", line);

View File

@ -84,38 +84,7 @@ impl MePool {
} }
async fn resolve_dc_idx_for_endpoint(&self, addr: SocketAddr) -> Option<i16> { async fn resolve_dc_idx_for_endpoint(&self, addr: SocketAddr) -> Option<i16> {
if addr.is_ipv4() { i16::try_from(self.resolve_dc_for_endpoint(addr).await).ok()
let map = self.proxy_map_v4.read().await;
for (dc, addrs) in map.iter() {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
let abs_dc = dc.abs();
if abs_dc > 0
&& let Ok(dc_idx) = i16::try_from(abs_dc)
{
return Some(dc_idx);
}
}
}
} else {
let map = self.proxy_map_v6.read().await;
for (dc, addrs) in map.iter() {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
let abs_dc = dc.abs();
if abs_dc > 0
&& let Ok(dc_idx) = i16::try_from(abs_dc)
{
return Some(dc_idx);
}
}
}
}
None
} }
fn direct_bind_ip_for_stun( fn direct_bind_ip_for_stun(

View File

@ -102,7 +102,7 @@ async fn check_family(
let mut dc_endpoints = HashMap::<i32, Vec<SocketAddr>>::new(); let mut dc_endpoints = HashMap::<i32, Vec<SocketAddr>>::new();
for (dc, addrs) in map { for (dc, addrs) in map {
let entry = dc_endpoints.entry(dc.abs()).or_default(); let entry = dc_endpoints.entry(dc).or_default();
for (ip, port) in addrs { for (ip, port) in addrs {
entry.push(SocketAddr::new(ip, port)); entry.push(SocketAddr::new(ip, port));
} }

View File

@ -320,7 +320,7 @@ impl MePool {
pool_size: 2, pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
default_dc: AtomicI32::new(default_dc.unwrap_or(0)), default_dc: AtomicI32::new(default_dc.unwrap_or(2)),
next_writer_id: AtomicU64::new(1), next_writer_id: AtomicU64::new(1),
ping_tracker: Arc::new(Mutex::new(HashMap::new())), ping_tracker: Arc::new(Mutex::new(HashMap::new())),
rtt_stats: Arc::new(Mutex::new(HashMap::new())), rtt_stats: Arc::new(Mutex::new(HashMap::new())),
@ -625,6 +625,58 @@ impl MePool {
order order
} }
/// Returns the DC to route to when no specific target DC is known.
///
/// Reads the stored `default_dc`; a stored value of `0` is treated as
/// "not configured" and replaced with DC `2`.
pub(super) fn default_dc_for_routing(&self) -> i32 {
    match self.default_dc.load(Ordering::Relaxed) {
        0 => 2,
        configured => configured,
    }
}
/// Builds the ordered list of DC keys to try for `target_dc`.
///
/// The list always holds exactly one key: `target_dc` itself when it is
/// non-zero, otherwise the routing default. The sign of `target_dc` is
/// passed through unchanged.
pub(super) fn dc_lookup_chain_for_target(&self, target_dc: i32) -> Vec<i32> {
    let lookup_dc = if target_dc == 0 {
        // Use default DC only when target DC is unknown and pinning is not established.
        self.default_dc_for_routing()
    } else {
        target_dc
    };
    vec![lookup_dc]
}
pub(super) async fn resolve_dc_for_endpoint(&self, addr: SocketAddr) -> i32 {
let map_guard = if addr.is_ipv4() {
self.proxy_map_v4.read().await
} else {
self.proxy_map_v6.read().await
};
let mut matched_dc: Option<i32> = None;
let mut ambiguous = false;
for (dc, addrs) in map_guard.iter() {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
match matched_dc {
None => matched_dc = Some(*dc),
Some(prev_dc) if prev_dc == *dc => {}
Some(_) => {
ambiguous = true;
break;
}
}
}
}
drop(map_guard);
if !ambiguous
&& let Some(dc) = matched_dc
{
return dc;
}
self.default_dc_for_routing()
}
pub(super) async fn proxy_map_for_family( pub(super) async fn proxy_map_for_family(
&self, &self,
family: IpFamily, family: IpFamily,

View File

@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet}; use std::collections::HashSet;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
@ -27,20 +27,14 @@ impl MePool {
for family in family_order { for family in family_order {
let map = self.proxy_map_for_family(family).await; let map = self.proxy_map_for_family(family).await;
let mut grouped_dc_addrs: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new(); let mut dc_addrs: Vec<(i32, Vec<(IpAddr, u16)>)> = map
for (dc, addrs) in map {
if addrs.is_empty() {
continue;
}
grouped_dc_addrs.entry(dc.abs()).or_default().extend(addrs);
}
let mut dc_addrs: Vec<(i32, Vec<(IpAddr, u16)>)> = grouped_dc_addrs
.into_iter() .into_iter()
.map(|(dc, mut addrs)| { .map(|(dc, mut addrs)| {
addrs.sort_unstable(); addrs.sort_unstable();
addrs.dedup(); addrs.dedup();
(dc, addrs) (dc, addrs)
}) })
.filter(|(_, addrs)| !addrs.is_empty())
.collect(); .collect();
dc_addrs.sort_unstable_by_key(|(dc, _)| *dc); dc_addrs.sort_unstable_by_key(|(dc, _)| *dc);
dc_addrs.sort_by_key(|(_, addrs)| (addrs.len() != 1, addrs.len())); dc_addrs.sort_by_key(|(_, addrs)| (addrs.len() != 1, addrs.len()));

View File

@ -108,19 +108,10 @@ impl MePool {
} else { } else {
IpFamily::V6 IpFamily::V6
}; };
let map = self.proxy_map_for_family(family).await; Some(RefillDcKey {
for (dc, endpoints) in map { dc: self.resolve_dc_for_endpoint(addr).await,
if endpoints
.into_iter()
.any(|(ip, port)| SocketAddr::new(ip, port) == addr)
{
return Some(RefillDcKey {
dc: dc.abs(),
family, family,
}); })
}
}
None
} }
async fn resolve_refill_dc_keys_for_endpoints( async fn resolve_refill_dc_keys_for_endpoints(
@ -177,50 +168,26 @@ impl MePool {
} }
async fn endpoints_for_same_dc(&self, addr: SocketAddr) -> Vec<SocketAddr> { async fn endpoints_for_same_dc(&self, addr: SocketAddr) -> Vec<SocketAddr> {
let mut target_dc = HashSet::<i32>::new();
let mut endpoints = HashSet::<SocketAddr>::new(); let mut endpoints = HashSet::<SocketAddr>::new();
let target_dc = self.resolve_dc_for_endpoint(addr).await;
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone(); let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in &map { if let Some(addrs) = map.get(&target_dc) {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
target_dc.insert(dc.abs());
}
}
for dc in &target_dc {
for key in [*dc, -*dc] {
if let Some(addrs) = map.get(&key) {
for (ip, port) in addrs { for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port)); endpoints.insert(SocketAddr::new(*ip, *port));
} }
} }
} }
}
}
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone(); let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in &map { if let Some(addrs) = map.get(&target_dc) {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
target_dc.insert(dc.abs());
}
}
for dc in &target_dc {
for key in [*dc, -*dc] {
if let Some(addrs) = map.get(&key) {
for (ip, port) in addrs { for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port)); endpoints.insert(SocketAddr::new(*ip, *port));
} }
} }
} }
}
}
let mut sorted: Vec<SocketAddr> = endpoints.into_iter().collect(); let mut sorted: Vec<SocketAddr> = endpoints.into_iter().collect();
sorted.sort_unstable(); sorted.sort_unstable();

View File

@ -128,7 +128,7 @@ impl MePool {
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map_v4 = self.proxy_map_v4.read().await.clone(); let map_v4 = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map_v4 { for (dc, addrs) in map_v4 {
let entry = out.entry(dc.abs()).or_default(); let entry = out.entry(dc).or_default();
for (ip, port) in addrs { for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port)); entry.insert(SocketAddr::new(ip, port));
} }
@ -138,7 +138,7 @@ impl MePool {
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map_v6 = self.proxy_map_v6.read().await.clone(); let map_v6 = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map_v6 { for (dc, addrs) in map_v6 {
let entry = out.entry(dc.abs()).or_default(); let entry = out.entry(dc).or_default();
for (ip, port) in addrs { for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port)); entry.insert(SocketAddr::new(ip, port));
} }

View File

@ -1,5 +1,5 @@
use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::net::SocketAddr; use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::Instant; use std::time::Instant;
@ -104,35 +104,11 @@ impl MePool {
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new(); let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone(); let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone(); let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if endpoints_by_dc.is_empty() { if endpoints_by_dc.is_empty() {
@ -166,35 +142,11 @@ impl MePool {
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new(); let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone(); let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone(); let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if endpoints_by_dc.is_empty() { if endpoints_by_dc.is_empty() {
@ -234,41 +186,17 @@ impl MePool {
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new(); let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone(); let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone(); let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
let mut endpoint_to_dc = HashMap::<SocketAddr, i16>::new(); let mut endpoint_to_dc = HashMap::<SocketAddr, BTreeSet<i16>>::new();
for (dc, endpoints) in &endpoints_by_dc { for (dc, endpoints) in &endpoints_by_dc {
for endpoint in endpoints { for endpoint in endpoints {
endpoint_to_dc.entry(*endpoint).or_insert(*dc); endpoint_to_dc.entry(*endpoint).or_default().insert(*dc);
} }
} }
@ -292,7 +220,13 @@ impl MePool {
for writer in writers { for writer in writers {
let endpoint = writer.addr; let endpoint = writer.addr;
let dc = endpoint_to_dc.get(&endpoint).copied(); let dc = endpoint_to_dc.get(&endpoint).and_then(|dcs| {
if dcs.len() == 1 {
dcs.iter().next().copied()
} else {
None
}
});
let draining = writer.draining.load(Ordering::Relaxed); let draining = writer.draining.load(Ordering::Relaxed);
let degraded = writer.degraded.load(Ordering::Relaxed); let degraded = writer.degraded.load(Ordering::Relaxed);
let bound_clients = activity let bound_clients = activity
@ -499,6 +433,24 @@ fn ratio_pct(part: usize, total: usize) -> f64 {
pct.clamp(0.0, 100.0) pct.clamp(0.0, 100.0)
} }
/// Merges one proxy map into `endpoints_by_dc`, keeping the sign of each DC key.
///
/// Entries whose DC is `0` or does not fit into `i16` are skipped. Every
/// remaining DC gets an entry in `endpoints_by_dc` (even when its address
/// list is empty), and each `(ip, port)` pair is added to that DC's set as a
/// `SocketAddr`.
fn extend_signed_endpoints(
    endpoints_by_dc: &mut BTreeMap<i16, BTreeSet<SocketAddr>>,
    map: HashMap<i32, Vec<(IpAddr, u16)>>,
) {
    let representable = map
        .into_iter()
        .filter_map(|(dc, addrs)| match i16::try_from(dc) {
            Ok(dc_idx) if dc_idx != 0 => Some((dc_idx, addrs)),
            _ => None,
        });

    for (dc_idx, addrs) in representable {
        endpoints_by_dc
            .entry(dc_idx)
            .or_default()
            .extend(addrs.into_iter().map(|(ip, port)| SocketAddr::new(ip, port)));
    }
}
fn floor_mode_label(mode: MeFloorMode) -> &'static str { fn floor_mode_label(mode: MeFloorMode) -> &'static str {
match mode { match mode {
MeFloorMode::Static => "static", MeFloorMode::Static => "static",

View File

@ -273,13 +273,12 @@ impl ConnRegistry {
bound_clients_by_writer.insert(*writer_id, conn_ids.len()); bound_clients_by_writer.insert(*writer_id, conn_ids.len());
} }
for conn_meta in inner.meta.values() { for conn_meta in inner.meta.values() {
let dc_u16 = conn_meta.target_dc.unsigned_abs(); if conn_meta.target_dc == 0 {
if dc_u16 == 0 {
continue; continue;
} }
if let Ok(dc) = i16::try_from(dc_u16) { *active_sessions_by_target_dc
*active_sessions_by_target_dc.entry(dc).or_insert(0) += 1; .entry(conn_meta.target_dc)
} .or_insert(0) += 1;
} }
WriterActivitySnapshot { WriterActivitySnapshot {
@ -402,7 +401,8 @@ mod tests {
let snapshot = registry.writer_activity_snapshot().await; let snapshot = registry.writer_activity_snapshot().await;
assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2)); assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2));
assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1)); assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1));
assert_eq!(snapshot.active_sessions_by_target_dc.get(&2), Some(&2)); assert_eq!(snapshot.active_sessions_by_target_dc.get(&2), Some(&1));
assert_eq!(snapshot.active_sessions_by_target_dc.get(&-2), Some(&1));
assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1)); assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1));
} }
} }

View File

@ -195,17 +195,9 @@ impl MePool {
return Err(ProxyError::Proxy("No ME writers available for target DC".into())); return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
} }
emergency_attempts += 1; emergency_attempts += 1;
for family in self.family_order() { let mut endpoints = self.endpoint_candidates_for_target_dc(target_dc).await;
let map_guard = match family { endpoints.shuffle(&mut rand::rng());
IpFamily::V4 => self.proxy_map_v4.read().await, for addr in endpoints {
IpFamily::V6 => self.proxy_map_v6.read().await,
};
if let Some(addrs) = map_guard.get(&(target_dc as i32)) {
let mut shuffled = addrs.clone();
shuffled.shuffle(&mut rand::rng());
drop(map_guard);
for (ip, port) in shuffled {
let addr = SocketAddr::new(ip, port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
break; break;
} }
@ -222,11 +214,6 @@ impl MePool {
.candidate_indices_for_dc(&writers_snapshot, target_dc, true) .candidate_indices_for_dc(&writers_snapshot, target_dc, true)
.await; .await;
} }
if !candidate_indices.is_empty() {
break;
}
}
}
if candidate_indices.is_empty() { if candidate_indices.is_empty() {
return Err(ProxyError::Proxy("No ME writers available for target DC".into())); return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
} }
@ -458,27 +445,29 @@ impl MePool {
let key = target_dc as i32; let key = target_dc as i32;
let mut preferred = Vec::<SocketAddr>::new(); let mut preferred = Vec::<SocketAddr>::new();
let mut seen = HashSet::<SocketAddr>::new(); let mut seen = HashSet::<SocketAddr>::new();
let lookup_keys = self.dc_lookup_chain_for_target(key);
for family in self.family_order() { for family in self.family_order() {
let map = match family { let map = match family {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(), IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
IpFamily::V6 => self.proxy_map_v6.read().await.clone(), IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
}; };
let mut lookup_keys = vec![key, key.abs(), -key.abs()]; let mut family_selected = Vec::<SocketAddr>::new();
let def = self.default_dc.load(Ordering::Relaxed); for lookup in lookup_keys.iter().copied() {
if def != 0 {
lookup_keys.push(def);
}
for lookup in lookup_keys {
if let Some(addrs) = map.get(&lookup) { if let Some(addrs) = map.get(&lookup) {
for (ip, port) in addrs { for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port); family_selected.push(SocketAddr::new(*ip, *port));
}
}
if !family_selected.is_empty() {
break;
}
}
for addr in family_selected {
if seen.insert(addr) { if seen.insert(addr) {
preferred.push(addr); preferred.push(addr);
} }
} }
}
}
if !preferred.is_empty() && !self.decision.effective_multipath { if !preferred.is_empty() && !self.decision.effective_multipath {
break; break;
} }
@ -569,36 +558,23 @@ impl MePool {
) -> Vec<usize> { ) -> Vec<usize> {
let key = target_dc as i32; let key = target_dc as i32;
let mut preferred = Vec::<SocketAddr>::new(); let mut preferred = Vec::<SocketAddr>::new();
let lookup_keys = self.dc_lookup_chain_for_target(key);
for family in self.family_order() { for family in self.family_order() {
let map_guard = match family { let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await, IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await, IpFamily::V6 => self.proxy_map_v6.read().await,
}; };
let mut family_selected = Vec::<SocketAddr>::new();
if let Some(v) = map_guard.get(&key) { for lookup in lookup_keys.iter().copied() {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); if let Some(v) = map_guard.get(&lookup) {
family_selected.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
} }
if preferred.is_empty() { if !family_selected.is_empty() {
let abs = key.abs(); break;
if let Some(v) = map_guard.get(&abs) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
}
if preferred.is_empty() {
let abs = key.abs();
if let Some(v) = map_guard.get(&-abs) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
}
if preferred.is_empty() {
let def = self.default_dc.load(Ordering::Relaxed);
if def != 0
&& let Some(v) = map_guard.get(&def)
{
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
} }
} }
preferred.extend(family_selected);
drop(map_guard); drop(map_guard);
@ -608,9 +584,7 @@ impl MePool {
} }
if preferred.is_empty() { if preferred.is_empty() {
return (0..writers.len()) return Vec::new();
.filter(|i| self.writer_eligible_for_selection(&writers[*i], include_warm))
.collect();
} }
let mut out = Vec::new(); let mut out = Vec::new();
@ -622,11 +596,6 @@ impl MePool {
out.push(idx); out.push(idx);
} }
} }
if out.is_empty() {
return (0..writers.len())
.filter(|i| self.writer_eligible_for_selection(&writers[*i], include_warm))
.collect();
}
out out
} }