mirror of
https://github.com/telemt/telemt.git
synced 2026-04-29 08:24:09 +03:00
Src-IP in ME Routing + more strict bind_addresses
This commit is contained in:
@@ -42,20 +42,30 @@ impl MePool {
|
||||
tag_override: Option<&[u8]>,
|
||||
) -> Result<()> {
|
||||
let tag = tag_override.or(self.proxy_tag.as_deref());
|
||||
let payload = build_proxy_req_payload(
|
||||
conn_id,
|
||||
client_addr,
|
||||
our_addr,
|
||||
data,
|
||||
tag,
|
||||
proto_flags,
|
||||
);
|
||||
let meta = ConnMeta {
|
||||
let fallback_meta = ConnMeta {
|
||||
target_dc,
|
||||
client_addr,
|
||||
our_addr,
|
||||
proto_flags,
|
||||
};
|
||||
let build_routed_payload = |effective_our_addr: SocketAddr| {
|
||||
(
|
||||
build_proxy_req_payload(
|
||||
conn_id,
|
||||
client_addr,
|
||||
effective_our_addr,
|
||||
data,
|
||||
tag,
|
||||
proto_flags,
|
||||
),
|
||||
ConnMeta {
|
||||
target_dc,
|
||||
client_addr,
|
||||
our_addr: effective_our_addr,
|
||||
proto_flags,
|
||||
},
|
||||
)
|
||||
};
|
||||
let no_writer_mode =
|
||||
MeRouteNoWriterMode::from_u8(self.me_route_no_writer_mode.load(Ordering::Relaxed));
|
||||
let (routed_dc, unknown_target_dc) = self
|
||||
@@ -70,8 +80,14 @@ impl MePool {
|
||||
let mut hybrid_wait_current = hybrid_wait_step;
|
||||
|
||||
loop {
|
||||
let current_meta = self
|
||||
.registry
|
||||
.get_meta(conn_id)
|
||||
.await
|
||||
.unwrap_or_else(|| fallback_meta.clone());
|
||||
let (current_payload, _) = build_routed_payload(current_meta.our_addr);
|
||||
if let Some(current) = self.registry.get_writer(conn_id).await {
|
||||
match current.tx.try_send(WriterCommand::Data(payload.clone())) {
|
||||
match current.tx.try_send(WriterCommand::Data(current_payload.clone())) {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(TrySendError::Full(cmd)) => {
|
||||
if current.tx.send(cmd).await.is_ok() {
|
||||
@@ -354,11 +370,13 @@ impl MePool {
|
||||
if !self.writer_accepts_new_binding(w) {
|
||||
continue;
|
||||
}
|
||||
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
|
||||
let (payload, meta) = build_routed_payload(effective_our_addr);
|
||||
match w.tx.try_send(WriterCommand::Data(payload.clone())) {
|
||||
Ok(()) => {
|
||||
self.stats.increment_me_writer_pick_success_try_total(pick_mode);
|
||||
self.registry
|
||||
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
|
||||
.bind_writer(conn_id, w.id, w.tx.clone(), meta)
|
||||
.await;
|
||||
if w.generation < self.current_generation() {
|
||||
self.stats.increment_pool_stale_pick_total();
|
||||
@@ -397,12 +415,14 @@ impl MePool {
|
||||
continue;
|
||||
}
|
||||
self.stats.increment_me_writer_pick_blocking_fallback_total();
|
||||
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
|
||||
let (payload, meta) = build_routed_payload(effective_our_addr);
|
||||
match w.tx.send(WriterCommand::Data(payload.clone())).await {
|
||||
Ok(()) => {
|
||||
self.stats
|
||||
.increment_me_writer_pick_success_fallback_total(pick_mode);
|
||||
self.registry
|
||||
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
|
||||
.bind_writer(conn_id, w.id, w.tx.clone(), meta)
|
||||
.await;
|
||||
if w.generation < self.current_generation() {
|
||||
self.stats.increment_pool_stale_pick_total();
|
||||
|
||||
Reference in New Issue
Block a user