mirror of
https://github.com/telemt/telemt.git
synced 2026-05-26 05:31:44 +03:00
ME: Bound writer queue waits under backpressure
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
use http_body_util::{BodyExt, Full};
|
||||
use hyper::StatusCode;
|
||||
use hyper::body::{Bytes, Incoming};
|
||||
use hyper::header::ALLOW;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
@@ -25,6 +26,8 @@ pub(super) fn success_response<T: Serialize>(
|
||||
}
|
||||
|
||||
pub(super) fn error_response(request_id: u64, failure: ApiFailure) -> hyper::Response<Full<Bytes>> {
|
||||
let status = failure.status;
|
||||
let allow = failure.allow;
|
||||
let payload = ErrorResponse {
|
||||
ok: false,
|
||||
error: ErrorBody {
|
||||
@@ -40,11 +43,13 @@ pub(super) fn error_response(request_id: u64, failure: ApiFailure) -> hyper::Res
|
||||
)
|
||||
.into_bytes()
|
||||
});
|
||||
hyper::Response::builder()
|
||||
.status(failure.status)
|
||||
.header("content-type", "application/json; charset=utf-8")
|
||||
.body(Full::new(Bytes::from(body)))
|
||||
.unwrap()
|
||||
let mut builder = hyper::Response::builder()
|
||||
.status(status)
|
||||
.header("content-type", "application/json; charset=utf-8");
|
||||
if let Some(allow) = allow {
|
||||
builder = builder.header(ALLOW, allow);
|
||||
}
|
||||
builder.body(Full::new(Bytes::from(body))).unwrap()
|
||||
}
|
||||
|
||||
pub(super) async fn read_json<T: DeserializeOwned>(
|
||||
|
||||
@@ -41,7 +41,9 @@ mod runtime_watch;
|
||||
mod runtime_zero;
|
||||
mod users;
|
||||
|
||||
use config_store::{current_revision, load_config_from_disk, parse_if_match};
|
||||
use config_store::{
|
||||
current_revision, ensure_expected_revision, load_config_from_disk, parse_if_match,
|
||||
};
|
||||
use events::ApiEventStore;
|
||||
use http_utils::{error_response, read_json, read_optional_json, success_response};
|
||||
use model::{
|
||||
@@ -75,6 +77,10 @@ use users::{
|
||||
const API_MAX_CONTROL_CONNECTIONS: usize = 1024;
|
||||
const API_HTTP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
const ROUTE_USERNAME_ERROR: &str = "username must match [A-Za-z0-9_.-] and be 1..64 chars";
|
||||
const ALLOW_GET: &str = "GET";
|
||||
const ALLOW_POST: &str = "POST";
|
||||
const ALLOW_GET_POST: &str = "GET, POST";
|
||||
const ALLOW_GET_PATCH_DELETE: &str = "GET, PATCH, DELETE";
|
||||
|
||||
pub(super) struct ApiRuntimeState {
|
||||
pub(super) process_started_at_epoch_secs: u64,
|
||||
@@ -125,6 +131,57 @@ fn parse_route_username(user: &str) -> Result<&str, ApiFailure> {
|
||||
}
|
||||
}
|
||||
|
||||
fn user_action_route_matches(path: &str, suffix: &str) -> bool {
|
||||
path.strip_prefix("/v1/users/")
|
||||
.and_then(|path| path.strip_suffix(suffix))
|
||||
.map(|user| !user.is_empty() && !user.contains('/'))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn allowed_methods_for_path(path: &str) -> Option<&'static str> {
|
||||
match path {
|
||||
"/v1/health"
|
||||
| "/v1/health/ready"
|
||||
| "/v1/system/info"
|
||||
| "/v1/runtime/gates"
|
||||
| "/v1/runtime/initialization"
|
||||
| "/v1/limits/effective"
|
||||
| "/v1/security/posture"
|
||||
| "/v1/security/whitelist"
|
||||
| "/v1/stats/summary"
|
||||
| "/v1/stats/zero/all"
|
||||
| "/v1/stats/upstreams"
|
||||
| "/v1/stats/minimal/all"
|
||||
| "/v1/stats/me-writers"
|
||||
| "/v1/stats/dcs"
|
||||
| "/v1/runtime/me-pool-state"
|
||||
| "/v1/runtime/me_pool_state"
|
||||
| "/v1/runtime/me-quality"
|
||||
| "/v1/runtime/me_quality"
|
||||
| "/v1/runtime/upstream-quality"
|
||||
| "/v1/runtime/upstream_quality"
|
||||
| "/v1/runtime/nat-stun"
|
||||
| "/v1/runtime/nat_stun"
|
||||
| "/v1/runtime/me-selftest"
|
||||
| "/v1/runtime/connections/summary"
|
||||
| "/v1/runtime/events/recent"
|
||||
| "/v1/stats/users/active-ips"
|
||||
| "/v1/stats/users/quota"
|
||||
| "/v1/stats/users" => Some(ALLOW_GET),
|
||||
"/v1/users" => Some(ALLOW_GET_POST),
|
||||
_ if user_action_route_matches(path, "/reset-quota") => Some(ALLOW_POST),
|
||||
_ if user_action_route_matches(path, "/rotate-secret") => Some(ALLOW_POST),
|
||||
_ if path
|
||||
.strip_prefix("/v1/users/")
|
||||
.map(|user| !user.is_empty() && !user.contains('/'))
|
||||
.unwrap_or(false) =>
|
||||
{
|
||||
Some(ALLOW_GET_PATCH_DELETE)
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn serve(
|
||||
listen: SocketAddr,
|
||||
stats: Arc<Stats>,
|
||||
@@ -435,22 +492,22 @@ async fn handle(
|
||||
let data = build_dcs_data(shared.as_ref(), api_cfg).await;
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/runtime/me_pool_state") => {
|
||||
("GET", "/v1/runtime/me-pool-state") | ("GET", "/v1/runtime/me_pool_state") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = build_runtime_me_pool_state_data(shared.as_ref()).await;
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/runtime/me_quality") => {
|
||||
("GET", "/v1/runtime/me-quality") | ("GET", "/v1/runtime/me_quality") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = build_runtime_me_quality_data(shared.as_ref()).await;
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/runtime/upstream_quality") => {
|
||||
("GET", "/v1/runtime/upstream-quality") | ("GET", "/v1/runtime/upstream_quality") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = build_runtime_upstream_quality_data(shared.as_ref()).await;
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/runtime/nat_stun") => {
|
||||
("GET", "/v1/runtime/nat-stun") | ("GET", "/v1/runtime/nat_stun") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = build_runtime_nat_stun_data(shared.as_ref()).await;
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
@@ -506,7 +563,7 @@ async fn handle(
|
||||
.await;
|
||||
Ok(success_response(StatusCode::OK, users, revision))
|
||||
}
|
||||
("GET", "/v1/users/quota") => {
|
||||
("GET", "/v1/stats/users/quota") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let disk_cfg = load_config_from_disk(&shared.config_path).await?;
|
||||
let data = build_user_quota_list(&disk_cfg, shared.stats.as_ref());
|
||||
@@ -567,6 +624,16 @@ async fn handle(
|
||||
),
|
||||
));
|
||||
}
|
||||
let expected_revision = parse_if_match(req.headers());
|
||||
let disk_cfg = load_config_from_disk(&shared.config_path).await?;
|
||||
ensure_expected_revision(&shared.config_path, expected_revision.as_deref())
|
||||
.await?;
|
||||
if !disk_cfg.access.users.contains_key(user) {
|
||||
return Ok(error_response(
|
||||
request_id,
|
||||
ApiFailure::new(StatusCode::NOT_FOUND, "not_found", "User not found"),
|
||||
));
|
||||
}
|
||||
let snapshot = match crate::quota_state::reset_user_quota(
|
||||
&shared.quota_state_path,
|
||||
shared.stats.as_ref(),
|
||||
@@ -761,16 +828,18 @@ async fn handle(
|
||||
if method == Method::POST {
|
||||
return Ok(error_response(
|
||||
request_id,
|
||||
ApiFailure::new(StatusCode::NOT_FOUND, "not_found", "Route not found"),
|
||||
ApiFailure::method_not_allowed(ALLOW_GET_PATCH_DELETE),
|
||||
));
|
||||
}
|
||||
return Ok(error_response(
|
||||
request_id,
|
||||
ApiFailure::new(
|
||||
StatusCode::METHOD_NOT_ALLOWED,
|
||||
"method_not_allowed",
|
||||
"Unsupported HTTP method for this route",
|
||||
),
|
||||
ApiFailure::method_not_allowed(ALLOW_GET_PATCH_DELETE),
|
||||
));
|
||||
}
|
||||
if let Some(allow) = allowed_methods_for_path(normalized_path) {
|
||||
return Ok(error_response(
|
||||
request_id,
|
||||
ApiFailure::method_not_allowed(allow),
|
||||
));
|
||||
}
|
||||
debug!(
|
||||
|
||||
@@ -15,6 +15,7 @@ pub(super) struct ApiFailure {
|
||||
pub(super) status: StatusCode,
|
||||
pub(super) code: &'static str,
|
||||
pub(super) message: String,
|
||||
pub(super) allow: Option<&'static str>,
|
||||
}
|
||||
|
||||
impl ApiFailure {
|
||||
@@ -23,6 +24,7 @@ impl ApiFailure {
|
||||
status,
|
||||
code,
|
||||
message: message.into(),
|
||||
allow: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +35,15 @@ impl ApiFailure {
|
||||
pub(super) fn bad_request(message: impl Into<String>) -> Self {
|
||||
Self::new(StatusCode::BAD_REQUEST, "bad_request", message)
|
||||
}
|
||||
|
||||
pub(super) fn method_not_allowed(allow: &'static str) -> Self {
|
||||
Self {
|
||||
status: StatusCode::METHOD_NOT_ALLOWED,
|
||||
code: "method_not_allowed",
|
||||
message: "Unsupported HTTP method for this route".to_string(),
|
||||
allow: Some(allow),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::collections::BTreeSet;
|
||||
#[cfg(test)]
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
#[cfg(test)]
|
||||
use std::future::Future;
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -44,7 +44,10 @@ pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u
|
||||
dashmap::mapref::entry::Entry::Occupied(_) => false,
|
||||
dashmap::mapref::entry::Entry::Vacant(entry) => {
|
||||
entry.insert(meta);
|
||||
registry.ordered.lock().insert((meta.mark_order_seq, conn_id));
|
||||
registry
|
||||
.ordered
|
||||
.lock()
|
||||
.insert((meta.mark_order_seq, conn_id));
|
||||
true
|
||||
}
|
||||
}
|
||||
@@ -123,7 +126,11 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
|
||||
return false;
|
||||
}
|
||||
|
||||
let Some(candidate_meta) = registry.by_conn_id.get(&conn_id).map(|entry| *entry.value()) else {
|
||||
let Some(candidate_meta) = registry
|
||||
.by_conn_id
|
||||
.get(&conn_id)
|
||||
.map(|entry| *entry.value())
|
||||
else {
|
||||
return false;
|
||||
};
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::time::{Duration, Instant};
|
||||
use bytes::BytesMut;
|
||||
use rand::RngExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
@@ -26,6 +27,7 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
|
||||
const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
|
||||
const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
|
||||
const ME_PING_TRACKER_CLEANUP_EVERY: u32 = 32;
|
||||
const ME_SERVICE_SIGNAL_SEND_TIMEOUT_MS: u64 = 50;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum WriterTeardownMode {
|
||||
@@ -45,6 +47,11 @@ enum WriterLifecycleExit {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
enum ServiceWriterCommandSendError {
|
||||
Closed,
|
||||
TimedOut,
|
||||
}
|
||||
|
||||
async fn writer_command_loop(
|
||||
mut rx: mpsc::Receiver<WriterCommand>,
|
||||
mut rpc_writer: RpcWriter,
|
||||
@@ -75,6 +82,27 @@ async fn writer_command_loop(
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_service_writer_command(
|
||||
tx: &mpsc::Sender<WriterCommand>,
|
||||
cmd: WriterCommand,
|
||||
) -> std::result::Result<(), ServiceWriterCommandSendError> {
|
||||
match tx.try_send(cmd) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(TrySendError::Closed(_)) => Err(ServiceWriterCommandSendError::Closed),
|
||||
Err(TrySendError::Full(cmd)) => {
|
||||
let wait = Duration::from_millis(ME_SERVICE_SIGNAL_SEND_TIMEOUT_MS);
|
||||
match tokio::time::timeout(wait, tx.reserve()).await {
|
||||
Ok(Ok(permit)) => {
|
||||
permit.send(cmd);
|
||||
Ok(())
|
||||
}
|
||||
Ok(Err(_)) => Err(ServiceWriterCommandSendError::Closed),
|
||||
Err(_) => Err(ServiceWriterCommandSendError::TimedOut),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn ping_loop(
|
||||
pool_ping: std::sync::Weak<MePool>,
|
||||
@@ -154,14 +182,24 @@ async fn ping_loop(
|
||||
}
|
||||
ping_id = ping_id.wrapping_add(1);
|
||||
stats_ping.increment_me_keepalive_sent();
|
||||
if tx_ping
|
||||
.send(WriterCommand::ControlAndFlush(payload))
|
||||
.await
|
||||
.is_err()
|
||||
if let Err(error) =
|
||||
send_service_writer_command(&tx_ping, WriterCommand::ControlAndFlush(payload)).await
|
||||
{
|
||||
{
|
||||
let mut tracker = ping_tracker_ping.lock().await;
|
||||
tracker.remove(&sent_id);
|
||||
}
|
||||
stats_ping.increment_me_keepalive_failed();
|
||||
debug!("ME ping failed, removing dead writer");
|
||||
return;
|
||||
match error {
|
||||
ServiceWriterCommandSendError::Closed => {
|
||||
debug!("ME ping failed, removing dead writer");
|
||||
return;
|
||||
}
|
||||
ServiceWriterCommandSendError::TimedOut => {
|
||||
debug!("ME ping skipped: writer command channel is full");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -238,14 +276,15 @@ async fn rpc_proxy_req_signal_loop(
|
||||
meta.proto_flags,
|
||||
);
|
||||
|
||||
if tx_signal
|
||||
.send(WriterCommand::DataAndFlush(payload))
|
||||
.await
|
||||
.is_err()
|
||||
if let Err(error) =
|
||||
send_service_writer_command(&tx_signal, WriterCommand::DataAndFlush(payload)).await
|
||||
{
|
||||
stats_signal.increment_me_rpc_proxy_req_signal_failed_total();
|
||||
let _ = pool.registry.unregister(conn_id).await;
|
||||
return;
|
||||
match error {
|
||||
ServiceWriterCommandSendError::Closed => return,
|
||||
ServiceWriterCommandSendError::TimedOut => continue,
|
||||
}
|
||||
}
|
||||
|
||||
stats_signal.increment_me_rpc_proxy_req_signal_sent_total();
|
||||
@@ -263,14 +302,16 @@ async fn rpc_proxy_req_signal_loop(
|
||||
|
||||
let close_payload = build_control_payload(RPC_CLOSE_EXT_U32, conn_id);
|
||||
|
||||
if tx_signal
|
||||
.send(WriterCommand::ControlAndFlush(close_payload))
|
||||
.await
|
||||
.is_err()
|
||||
if let Err(error) =
|
||||
send_service_writer_command(&tx_signal, WriterCommand::ControlAndFlush(close_payload))
|
||||
.await
|
||||
{
|
||||
stats_signal.increment_me_rpc_proxy_req_signal_failed_total();
|
||||
let _ = pool.registry.unregister(conn_id).await;
|
||||
return;
|
||||
match error {
|
||||
ServiceWriterCommandSendError::Closed => return,
|
||||
ServiceWriterCommandSendError::TimedOut => continue,
|
||||
}
|
||||
}
|
||||
|
||||
stats_signal.increment_me_rpc_proxy_req_signal_close_sent_total();
|
||||
|
||||
@@ -14,7 +14,9 @@ use super::{
|
||||
|
||||
impl ConnRegistry {
|
||||
fn set_writer_bound_count(&self, writer_id: u64, count: usize) {
|
||||
self.binding.bound_clients_by_writer.insert(writer_id, count);
|
||||
self.binding
|
||||
.bound_clients_by_writer
|
||||
.insert(writer_id, count);
|
||||
if count == 0 {
|
||||
self.binding
|
||||
.writer_idle_since_epoch_secs
|
||||
@@ -38,8 +40,11 @@ impl ConnRegistry {
|
||||
return;
|
||||
}
|
||||
|
||||
let remove =
|
||||
if let Some(mut count) = self.binding.active_sessions_by_target_dc.get_mut(&target_dc) {
|
||||
let remove = if let Some(mut count) = self
|
||||
.binding
|
||||
.active_sessions_by_target_dc
|
||||
.get_mut(&target_dc)
|
||||
{
|
||||
let decrement = delta.unsigned_abs();
|
||||
*count = count.saturating_sub(decrement);
|
||||
*count == 0
|
||||
@@ -57,7 +62,10 @@ impl ConnRegistry {
|
||||
.conns_for_writer
|
||||
.entry(writer_id)
|
||||
.or_insert_with(HashSet::new);
|
||||
self.binding.bound_clients_by_writer.entry(writer_id).or_insert(0);
|
||||
self.binding
|
||||
.bound_clients_by_writer
|
||||
.entry(writer_id)
|
||||
.or_insert(0);
|
||||
self.binding
|
||||
.writer_idle_since_epoch_secs
|
||||
.entry(writer_id)
|
||||
@@ -474,10 +482,7 @@ impl ConnRegistry {
|
||||
self.hot_binding.map.remove(&conn_id);
|
||||
}
|
||||
if let Some(m) = meta {
|
||||
out.push(BoundConn {
|
||||
conn_id,
|
||||
meta: m,
|
||||
});
|
||||
out.push(BoundConn { conn_id, meta: m });
|
||||
}
|
||||
}
|
||||
out
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
@@ -33,6 +34,11 @@ mod close;
|
||||
mod recovery;
|
||||
mod selection;
|
||||
|
||||
enum WriterCommandReserveError {
|
||||
Closed,
|
||||
TimedOut,
|
||||
}
|
||||
|
||||
fn proxy_tag_array(tag: Option<&[u8]>) -> Option<[u8; 16]> {
|
||||
tag.and_then(|tag| <[u8; 16]>::try_from(tag).ok())
|
||||
}
|
||||
@@ -44,6 +50,21 @@ fn proxy_req_payload_from_command(cmd: WriterCommand) -> Option<PooledBuffer> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn reserve_writer_command_slot(
|
||||
tx: &mpsc::Sender<WriterCommand>,
|
||||
wait: Option<Duration>,
|
||||
) -> std::result::Result<mpsc::OwnedPermit<WriterCommand>, WriterCommandReserveError> {
|
||||
let reserve = tx.clone().reserve_owned();
|
||||
match wait {
|
||||
Some(wait) => match tokio::time::timeout(wait, reserve).await {
|
||||
Ok(Ok(permit)) => Ok(permit),
|
||||
Ok(Err(_)) => Err(WriterCommandReserveError::Closed),
|
||||
Err(_) => Err(WriterCommandReserveError::TimedOut),
|
||||
},
|
||||
None => reserve.await.map_err(|_| WriterCommandReserveError::Closed),
|
||||
}
|
||||
}
|
||||
|
||||
impl MePool {
|
||||
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
|
||||
pub async fn send_proxy_req(
|
||||
@@ -104,9 +125,25 @@ impl MePool {
|
||||
return Ok(());
|
||||
}
|
||||
Err(TrySendError::Full(cmd)) => {
|
||||
if current.tx.send(cmd).await.is_ok() {
|
||||
self.note_hybrid_route_success();
|
||||
return Ok(());
|
||||
match reserve_writer_command_slot(
|
||||
¤t.tx,
|
||||
self.route_runtime.me_route_blocking_send_timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(permit) => {
|
||||
permit.send(cmd);
|
||||
self.note_hybrid_route_success();
|
||||
return Ok(());
|
||||
}
|
||||
Err(WriterCommandReserveError::TimedOut) => {
|
||||
self.stats
|
||||
.increment_me_writer_pick_full_total(self.writer_pick_mode());
|
||||
return Err(ProxyError::Proxy(
|
||||
"ME writer channel full within blocking send timeout".into(),
|
||||
));
|
||||
}
|
||||
Err(WriterCommandReserveError::Closed) => {}
|
||||
}
|
||||
warn!(writer_id = current.writer_id, "ME writer channel closed");
|
||||
self.remove_writer_and_close_clients(current.writer_id)
|
||||
@@ -156,11 +193,7 @@ impl MePool {
|
||||
for (dc, addrs) in preferred.iter() {
|
||||
for addr in addrs {
|
||||
let _ = self
|
||||
.connect_one_for_dc(
|
||||
*addr,
|
||||
*dc,
|
||||
self.rng.as_ref(),
|
||||
)
|
||||
.connect_one_for_dc(*addr, *dc, self.rng.as_ref())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
@@ -558,33 +591,48 @@ impl MePool {
|
||||
self.note_hybrid_route_success();
|
||||
return Ok(());
|
||||
}
|
||||
Err(TrySendError::Full(cmd)) => match current.tx.send(cmd).await {
|
||||
Ok(()) => {
|
||||
self.note_hybrid_route_success();
|
||||
return Ok(());
|
||||
}
|
||||
Err(send_err) => {
|
||||
let Some(payload) = proxy_req_payload_from_command(send_err.0) else {
|
||||
Err(TrySendError::Full(cmd)) => {
|
||||
match reserve_writer_command_slot(
|
||||
¤t.tx,
|
||||
self.route_runtime.me_route_blocking_send_timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(permit) => {
|
||||
permit.send(cmd);
|
||||
self.note_hybrid_route_success();
|
||||
return Ok(());
|
||||
}
|
||||
Err(WriterCommandReserveError::TimedOut) => {
|
||||
self.stats
|
||||
.increment_me_writer_pick_full_total(self.writer_pick_mode());
|
||||
return Err(ProxyError::Proxy(
|
||||
"ME writer rejected unexpected command type".into(),
|
||||
"ME writer channel full within blocking send timeout".into(),
|
||||
));
|
||||
};
|
||||
warn!(writer_id = current.writer_id, "ME writer channel closed");
|
||||
self.remove_writer_and_close_clients(current.writer_id)
|
||||
.await;
|
||||
return self
|
||||
.send_proxy_req(
|
||||
conn_id,
|
||||
target_dc,
|
||||
client_addr,
|
||||
our_addr,
|
||||
payload.as_ref(),
|
||||
proto_flags,
|
||||
tag.as_ref().map(|tag| tag.as_slice()),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(WriterCommandReserveError::Closed) => {
|
||||
let Some(payload) = proxy_req_payload_from_command(cmd) else {
|
||||
return Err(ProxyError::Proxy(
|
||||
"ME writer rejected unexpected command type".into(),
|
||||
));
|
||||
};
|
||||
warn!(writer_id = current.writer_id, "ME writer channel closed");
|
||||
self.remove_writer_and_close_clients(current.writer_id)
|
||||
.await;
|
||||
return self
|
||||
.send_proxy_req(
|
||||
conn_id,
|
||||
target_dc,
|
||||
client_addr,
|
||||
our_addr,
|
||||
payload.as_ref(),
|
||||
proto_flags,
|
||||
tag.as_ref().map(|tag| tag.as_slice()),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
Err(TrySendError::Closed(cmd)) => {
|
||||
let Some(payload) = proxy_req_payload_from_command(cmd) else {
|
||||
return Err(ProxyError::Proxy(
|
||||
|
||||
@@ -10,18 +10,43 @@ use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
|
||||
|
||||
use super::super::MePool;
|
||||
use super::super::codec::{WriterCommand, build_control_payload};
|
||||
use super::{WriterCommandReserveError, reserve_writer_command_slot};
|
||||
|
||||
const ME_CLOSE_SIGNAL_SEND_TIMEOUT: Duration = Duration::from_millis(50);
|
||||
|
||||
impl MePool {
|
||||
/// Sends an extended close signal for a client-bound ME connection.
|
||||
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||
let payload = build_control_payload(RPC_CLOSE_EXT_U32, conn_id);
|
||||
if w.tx
|
||||
.send(WriterCommand::ControlAndFlush(payload))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
debug!("ME close write failed");
|
||||
self.remove_writer_and_close_clients(w.writer_id).await;
|
||||
match w.tx.try_send(WriterCommand::ControlAndFlush(payload)) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(cmd)) => {
|
||||
match reserve_writer_command_slot(&w.tx, Some(ME_CLOSE_SIGNAL_SEND_TIMEOUT))
|
||||
.await
|
||||
{
|
||||
Ok(permit) => {
|
||||
permit.send(cmd);
|
||||
}
|
||||
Err(WriterCommandReserveError::TimedOut) => {
|
||||
debug!(conn_id, "ME close skipped: writer command channel is full");
|
||||
}
|
||||
Err(WriterCommandReserveError::Closed) => {
|
||||
debug!(
|
||||
conn_id,
|
||||
"ME close skipped: writer command channel is closed"
|
||||
);
|
||||
self.remove_writer_and_close_clients(w.writer_id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
debug!(
|
||||
conn_id,
|
||||
"ME close skipped: writer command channel is closed"
|
||||
);
|
||||
self.remove_writer_and_close_clients(w.writer_id).await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!(conn_id, "ME close skipped (writer missing)");
|
||||
@@ -31,13 +56,16 @@ impl MePool {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends the compact close signal used by ME-side forced connection teardown.
|
||||
pub async fn send_close_conn(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||
let payload = build_control_payload(RPC_CLOSE_CONN_U32, conn_id);
|
||||
match w.tx.try_send(WriterCommand::ControlAndFlush(payload)) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(cmd)) => {
|
||||
let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
|
||||
let _ = reserve_writer_command_slot(&w.tx, Some(ME_CLOSE_SIGNAL_SEND_TIMEOUT))
|
||||
.await
|
||||
.map(|permit| permit.send(cmd));
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
debug!(conn_id, "ME close_conn skipped: writer channel closed");
|
||||
@@ -51,6 +79,7 @@ impl MePool {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends close signals for all currently registered ME-bound connections during shutdown.
|
||||
pub async fn shutdown_send_close_conn_all(self: &Arc<Self>) -> usize {
|
||||
let conn_ids = self.registry.active_conn_ids().await;
|
||||
let total = conn_ids.len();
|
||||
@@ -60,6 +89,7 @@ impl MePool {
|
||||
total
|
||||
}
|
||||
|
||||
/// Returns the current number of active ME writers tracked by the pool.
|
||||
pub fn connection_count(&self) -> usize {
|
||||
self.conn_count.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user