Compare commits

...

12 Commits

Author SHA1 Message Date
Alexey 7b570be5b3
DC -> Client Runtime in Metrics and API 2026-03-22 15:28:55 +03:00
Alexey 0461bc65c6
DC -> Client Optimizations 2026-03-22 15:00:15 +03:00
Alexey cf82b637d2
Merge branch 'main' into flow 2026-03-22 12:38:37 +03:00
Alexey 2e8bfa1101
Update codeql-config.yml 2026-03-22 12:38:15 +03:00
Alexey d091b0b251
Update CODE_OF_CONDUCT.md 2026-03-22 11:48:06 +03:00
Alexey 56fc6c4896
Update Dockerfile 2026-03-22 11:16:09 +03:00
Alexey 042d4fd612
Merge branch 'main' into flow 2026-03-22 11:06:03 +03:00
Alexey bbc69f945e
Update release.yml 2026-03-22 11:04:09 +03:00
Alexey 03c9a2588f
Merge branch 'main' into flow 2026-03-22 10:37:13 +03:00
Alexey 9de8b2f0bf
Update release.yml 2026-03-22 10:36:54 +03:00
Alexey 76eb8634a4
Merge branch 'main' into flow 2026-03-22 10:29:01 +03:00
Alexey 4e5b67bae8
Update release.yml 2026-03-22 10:28:06 +03:00
15 changed files with 1587 additions and 158 deletions

View File

@ -7,7 +7,16 @@ queries:
- uses: security-and-quality
- uses: ./.github/codeql/queries
paths-ignore:
- "**/tests/**"
- "**/test/**"
- "**/*_test.rs"
- "**/*/tests.rs"
query-filters:
- exclude:
tags:
- test
- exclude:
id:
- rust/unwrap-on-option

View File

@ -19,104 +19,200 @@ env:
BINARY_NAME: telemt
jobs:
build:
name: Build ${{ matrix.target }}
# ==========================
# GNU / glibc
# ==========================
build-gnu:
name: GNU ${{ matrix.target }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
# ===== GNU / glibc =====
- target: x86_64-unknown-linux-gnu
asset_name: telemt-x86_64-linux-gnu
asset: telemt-x86_64-linux-gnu
- target: aarch64-unknown-linux-gnu
asset_name: telemt-aarch64-linux-gnu
# ===== MUSL =====
- target: x86_64-unknown-linux-musl
asset_name: telemt-x86_64-linux-musl
asset: telemt-aarch64-linux-gnu
steps:
- uses: actions/checkout@v4
# ---------- Toolchain ----------
- uses: dtolnay/rust-toolchain@v1
with:
toolchain: stable
targets: |
x86_64-unknown-linux-gnu
aarch64-unknown-linux-gnu
x86_64-unknown-linux-musl
# ---------- System deps (bookworm) ----------
- name: Install build deps
- name: Install deps
run: |
sudo apt-get update
sudo apt-get install -y --no-install-recommends \
sudo apt-get install -y \
build-essential \
clang \
lld \
pkg-config \
musl-tools \
gcc-aarch64-linux-gnu \
g++-aarch64-linux-gnu \
ca-certificates
g++-aarch64-linux-gnu
# ---------- Cache ----------
- uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }}
key: gnu-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }}
# ---------- Build ----------
- name: Build
env:
CC_x86_64_unknown_linux_gnu: clang
CXX_x86_64_unknown_linux_gnu: clang++
CC_aarch64_unknown_linux_gnu: aarch64-linux-gnu-gcc
CXX_aarch64_unknown_linux_gnu: aarch64-linux-gnu-g++
CC_x86_64_unknown_linux_musl: musl-gcc
RUSTFLAGS: "-C linker=clang -C link-arg=-fuse-ld=lld"
run: |
case "${{ matrix.target }}" in
x86_64-unknown-linux-musl)
export RUSTFLAGS="-C target-feature=+crt-static"
;;
esac
if [ "${{ matrix.target }}" = "aarch64-unknown-linux-gnu" ]; then
export CC=aarch64-linux-gnu-gcc
export CXX=aarch64-linux-gnu-g++
export CC_aarch64_unknown_linux_gnu=aarch64-linux-gnu-gcc
export CXX_aarch64_unknown_linux_gnu=aarch64-linux-gnu-g++
export RUSTFLAGS="-C linker=aarch64-linux-gnu-gcc"
else
export CC=clang
export CXX=clang++
export CC_x86_64_unknown_linux_gnu=clang
export CXX_x86_64_unknown_linux_gnu=clang++
export RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=lld"
fi
cargo build --release --target ${{ matrix.target }}
# ---------- Package ----------
- name: Package
run: |
mkdir -p dist
BIN=target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}
cp "$BIN" dist/${{ env.BINARY_NAME }}-${{ matrix.target }}
cd dist
tar -czf ${{ matrix.asset_name }}.tar.gz ${{ env.BINARY_NAME }}-${{ matrix.target }}
sha256sum ${{ matrix.asset_name }}.tar.gz > ${{ matrix.asset_name }}.sha256
tar -czf ${{ matrix.asset }}.tar.gz ${{ env.BINARY_NAME }}-${{ matrix.target }}
sha256sum ${{ matrix.asset }}.tar.gz > ${{ matrix.asset }}.sha256
- uses: actions/upload-artifact@v4
with:
name: ${{ matrix.asset_name }}
name: ${{ matrix.asset }}
path: |
dist/${{ matrix.asset_name }}.tar.gz
dist/${{ matrix.asset_name }}.sha256
dist/${{ matrix.asset }}.tar.gz
dist/${{ matrix.asset }}.sha256
# ==========================
# MUSL
# ==========================
build-musl:
name: MUSL ${{ matrix.target }}
runs-on: ubuntu-latest
container:
image: rust:slim-bookworm
strategy:
fail-fast: false
matrix:
include:
- target: x86_64-unknown-linux-musl
asset: telemt-x86_64-linux-musl
- target: aarch64-unknown-linux-musl
asset: telemt-aarch64-linux-musl
steps:
- uses: actions/checkout@v4
- name: Install deps
run: |
apt-get update
apt-get install -y \
musl-tools \
pkg-config \
curl
- uses: actions/cache@v4
if: matrix.target == 'aarch64-unknown-linux-musl'
with:
path: ~/.musl-aarch64
key: musl-toolchain-aarch64-v1
- name: Install aarch64 musl toolchain
if: matrix.target == 'aarch64-unknown-linux-musl'
run: |
set -e
TOOLCHAIN_DIR="$HOME/.musl-aarch64"
ARCHIVE="aarch64-linux-musl-cross.tgz"
URL="https://github.com/telemt/telemt/releases/download/toolchains/$ARCHIVE"
if [ -x "$TOOLCHAIN_DIR/bin/aarch64-linux-musl-gcc" ]; then
echo "✅ MUSL toolchain already installed"
else
echo "⬇️ Downloading musl toolchain from Telemt GitHub Releases..."
curl -fL \
--retry 5 \
--retry-delay 3 \
--connect-timeout 10 \
--max-time 120 \
-o "$ARCHIVE" "$URL"
mkdir -p "$TOOLCHAIN_DIR"
tar -xzf "$ARCHIVE" --strip-components=1 -C "$TOOLCHAIN_DIR"
fi
echo "$TOOLCHAIN_DIR/bin" >> $GITHUB_PATH
- name: Add rust target
run: rustup target add ${{ matrix.target }}
- uses: actions/cache@v4
with:
path: |
/usr/local/cargo/registry
/usr/local/cargo/git
target
key: musl-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }}
- name: Build
run: |
if [ "${{ matrix.target }}" = "aarch64-unknown-linux-musl" ]; then
export CC=aarch64-linux-musl-gcc
export CC_aarch64_unknown_linux_musl=aarch64-linux-musl-gcc
export RUSTFLAGS="-C target-feature=+crt-static -C linker=aarch64-linux-musl-gcc"
else
export CC=musl-gcc
export CC_x86_64_unknown_linux_musl=musl-gcc
export RUSTFLAGS="-C target-feature=+crt-static"
fi
cargo build --release --target ${{ matrix.target }}
- name: Package
run: |
mkdir -p dist
BIN=target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}
cp "$BIN" dist/${{ env.BINARY_NAME }}-${{ matrix.target }}
cd dist
tar -czf ${{ matrix.asset }}.tar.gz ${{ env.BINARY_NAME }}-${{ matrix.target }}
sha256sum ${{ matrix.asset }}.tar.gz > ${{ matrix.asset }}.sha256
- uses: actions/upload-artifact@v4
with:
name: ${{ matrix.asset }}
path: |
dist/${{ matrix.asset }}.tar.gz
dist/${{ matrix.asset }}.sha256
# ==========================
# Docker
# ==========================
docker:
name: Docker
runs-on: ubuntu-latest
needs: build
needs: [build-gnu, build-musl]
continue-on-error: true
steps:
@ -147,11 +243,10 @@ jobs:
id: vars
run: echo "VERSION=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT
- name: Build & Push prod
- name: Build & Push
uses: docker/build-push-action@v6
with:
context: .
target: prod
push: true
platforms: linux/amd64,linux/arm64
tags: |
@ -160,10 +255,13 @@ jobs:
build-args: |
BINARY=dist/telemt
# ==========================
# Release
# ==========================
release:
name: Release
runs-on: ubuntu-latest
needs: build
needs: [build-gnu, build-musl]
permissions:
contents: write

View File

@ -1,8 +1,8 @@
# Code of Conduct
## 1. Purpose
## Purpose
Telemt exists to solve technical problems.
**Telemt exists to solve technical problems.**
Telemt is open to contributors who want to learn, improve and build meaningful systems together.
@ -18,27 +18,34 @@ Technology has consequences. Responsibility is inherent.
---
## 2. Principles
## Principles
* **Technical over emotional**
Arguments are grounded in data, logs, reproducible cases, or clear reasoning.
* **Clarity over noise**
Communication is structured, concise, and relevant.
* **Openness with standards**
Participation is open. The work remains disciplined.
* **Independence of judgment**
Claims are evaluated on technical merit, not affiliation or posture.
* **Responsibility over capability**
Capability does not justify careless use.
* **Cooperation over friction**
Progress depends on coordination, mutual support, and honest review.
* **Good intent, rigorous method**
Assume good intent, but require rigor.
> **Aussagen gelten nach ihrer Begründung.**
@ -47,7 +54,7 @@ Technology has consequences. Responsibility is inherent.
---
## 3. Expected Behavior
## Expected Behavior
Participants are expected to:
@ -69,7 +76,7 @@ New contributors are welcome. They are expected to grow into these standards. Ex
---
## 4. Unacceptable Behavior
## Unacceptable Behavior
The following is not allowed:
@ -89,7 +96,7 @@ Such discussions may be closed, removed, or redirected.
---
## 5. Security and Misuse
## Security and Misuse
Telemt is intended for responsible use.
@ -109,15 +116,13 @@ Security is both technical and behavioral.
Telemt is open to contributors of different backgrounds, experience levels, and working styles.
Standards are public, legible, and applied to the work itself.
Questions are welcome. Careful disagreement is welcome. Honest correction is welcome.
Gatekeeping by obscurity, status signaling, or hostility is not.
- Standards are public, legible, and applied to the work itself.
- Questions are welcome. Careful disagreement is welcome. Honest correction is welcome.
- Gatekeeping by obscurity, status signaling, or hostility is not.
---
## 7. Scope
## Scope
This Code of Conduct applies to all official spaces:
@ -127,16 +132,19 @@ This Code of Conduct applies to all official spaces:
---
## 8. Maintainer Stewardship
## Maintainer Stewardship
Maintainers are responsible for final decisions in matters of conduct, scope, and direction.
This responsibility is stewardship: preserving continuity, protecting signal, maintaining standards, and keeping Telemt workable for others.
This responsibility is stewardship:
- preserving continuity,
- protecting signal,
- maintaining standards,
- keeping Telemt workable for others.
Judgment should be exercised with restraint, consistency, and institutional responsibility.
Not every decision requires extended debate.
Not every intervention requires public explanation.
- Not every decision requires extended debate.
- Not every intervention requires public explanation.
All decisions are expected to serve the durability, clarity, and integrity of Telemt.
@ -146,7 +154,7 @@ All decisions are expected to serve the durability, clarity, and integrity of Te
---
## 9. Enforcement
## Enforcement
Maintainers may act to preserve the integrity of Telemt, including by:
@ -156,44 +164,40 @@ Maintainers may act to preserve the integrity of Telemt, including by:
* Restricting or banning participants
Actions are taken to maintain function, continuity, and signal quality.
Where possible, correction is preferred to exclusion.
Where necessary, exclusion is preferred to decay.
- Where possible, correction is preferred to exclusion.
- Where necessary, exclusion is preferred to decay.
---
## 10. Final
## Final
Telemt is built on discipline, structure, and shared intent.
- Signal over noise.
- Facts over opinion.
- Systems over rhetoric.
Signal over noise.
Facts over opinion.
Systems over rhetoric.
- Work is collective.
- Outcomes are shared.
- Responsibility is distributed.
Work is collective.
Outcomes are shared.
Responsibility is distributed.
Precision is learned.
Rigor is expected.
Help is part of the work.
- Precision is learned.
- Rigor is expected.
- Help is part of the work.
> **Ordnung ist Voraussetzung der Freiheit.**
If you contribute — contribute with care.
If you speak — speak with substance.
If you engage — engage constructively.
- If you contribute — contribute with care.
- If you speak — speak with substance.
- If you engage — engage constructively.
---
## 11. After All
## After All
Systems outlive intentions.
What is built will be used.
What is released will propagate.
What is maintained will define the future state.
- What is built will be used.
- What is released will propagate.
- What is maintained will define the future state.
There is no neutral infrastructure, only infrastructure shaped well or poorly.
@ -201,8 +205,8 @@ There is no neutral infrastructure, only infrastructure shaped well or poorly.
> Every system carries responsibility.
Stability requires discipline.
Freedom requires structure.
Trust requires honesty.
- Stability requires discipline.
- Freedom requires structure.
- Trust requires honesty.
In the end, the system reflects its contributors.
In the end: the system reflects its contributors.

View File

@ -28,9 +28,23 @@ RUN cargo build --release && strip target/release/telemt
FROM debian:12-slim AS minimal
RUN apt-get update && apt-get install -y --no-install-recommends \
upx \
binutils \
&& rm -rf /var/lib/apt/lists/*
curl \
ca-certificates \
&& rm -rf /var/lib/apt/lists/* \
\
# install UPX from Telemt releases
&& curl -fL \
--retry 5 \
--retry-delay 3 \
--connect-timeout 10 \
--max-time 120 \
-o /tmp/upx.tar.xz \
https://github.com/telemt/telemt/releases/download/toolchains/upx-amd64_linux.tar.xz \
&& tar -xf /tmp/upx.tar.xz -C /tmp \
&& mv /tmp/upx*/upx /usr/local/bin/upx \
&& chmod +x /usr/local/bin/upx \
&& rm -rf /tmp/upx*
COPY --from=builder /build/target/release/telemt /telemt

View File

@ -174,6 +174,24 @@ pub(super) struct ZeroMiddleProxyData {
pub(super) route_drop_queue_full_total: u64,
pub(super) route_drop_queue_full_base_total: u64,
pub(super) route_drop_queue_full_high_total: u64,
pub(super) d2c_batches_total: u64,
pub(super) d2c_batch_frames_total: u64,
pub(super) d2c_batch_bytes_total: u64,
pub(super) d2c_flush_reason_queue_drain_total: u64,
pub(super) d2c_flush_reason_batch_frames_total: u64,
pub(super) d2c_flush_reason_batch_bytes_total: u64,
pub(super) d2c_flush_reason_max_delay_total: u64,
pub(super) d2c_flush_reason_ack_immediate_total: u64,
pub(super) d2c_flush_reason_close_total: u64,
pub(super) d2c_data_frames_total: u64,
pub(super) d2c_ack_frames_total: u64,
pub(super) d2c_payload_bytes_total: u64,
pub(super) d2c_write_mode_coalesced_total: u64,
pub(super) d2c_write_mode_split_total: u64,
pub(super) d2c_quota_reject_pre_write_total: u64,
pub(super) d2c_quota_reject_post_write_total: u64,
pub(super) d2c_frame_buf_shrink_total: u64,
pub(super) d2c_frame_buf_shrink_bytes_total: u64,
pub(super) socks_kdf_strict_reject_total: u64,
pub(super) socks_kdf_compat_fallback_total: u64,
pub(super) endpoint_quarantine_total: u64,

View File

@ -68,6 +68,25 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
route_drop_queue_full_total: stats.get_me_route_drop_queue_full(),
route_drop_queue_full_base_total: stats.get_me_route_drop_queue_full_base(),
route_drop_queue_full_high_total: stats.get_me_route_drop_queue_full_high(),
d2c_batches_total: stats.get_me_d2c_batches_total(),
d2c_batch_frames_total: stats.get_me_d2c_batch_frames_total(),
d2c_batch_bytes_total: stats.get_me_d2c_batch_bytes_total(),
d2c_flush_reason_queue_drain_total: stats.get_me_d2c_flush_reason_queue_drain_total(),
d2c_flush_reason_batch_frames_total: stats.get_me_d2c_flush_reason_batch_frames_total(),
d2c_flush_reason_batch_bytes_total: stats.get_me_d2c_flush_reason_batch_bytes_total(),
d2c_flush_reason_max_delay_total: stats.get_me_d2c_flush_reason_max_delay_total(),
d2c_flush_reason_ack_immediate_total: stats
.get_me_d2c_flush_reason_ack_immediate_total(),
d2c_flush_reason_close_total: stats.get_me_d2c_flush_reason_close_total(),
d2c_data_frames_total: stats.get_me_d2c_data_frames_total(),
d2c_ack_frames_total: stats.get_me_d2c_ack_frames_total(),
d2c_payload_bytes_total: stats.get_me_d2c_payload_bytes_total(),
d2c_write_mode_coalesced_total: stats.get_me_d2c_write_mode_coalesced_total(),
d2c_write_mode_split_total: stats.get_me_d2c_write_mode_split_total(),
d2c_quota_reject_pre_write_total: stats.get_me_d2c_quota_reject_pre_write_total(),
d2c_quota_reject_post_write_total: stats.get_me_d2c_quota_reject_post_write_total(),
d2c_frame_buf_shrink_total: stats.get_me_d2c_frame_buf_shrink_total(),
d2c_frame_buf_shrink_bytes_total: stats.get_me_d2c_frame_buf_shrink_bytes_total(),
socks_kdf_strict_reject_total: stats.get_me_socks_kdf_strict_reject(),
socks_kdf_compat_fallback_total: stats.get_me_socks_kdf_compat_fallback(),
endpoint_quarantine_total: stats.get_me_endpoint_quarantine_total(),

View File

@ -29,6 +29,8 @@ const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32;
// Max bytes coalesced into one DC->Client flush batch (128 KiB).
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024;
// Max delay before a partial DC->Client batch is flushed (500 us).
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500;
// Flush quick-ack frames immediately instead of batching them.
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true;
// Extra bytes allowed above the strict per-user quota in soft mode (64 KiB).
const DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES: u64 = 64 * 1024;
// Capacity above which the reusable ME->Client frame buffer is shrunk (256 KiB).
const DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES: usize = 256 * 1024;
// Direct-relay copy buffer, client->DC direction (64 KiB).
const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024;
// Direct-relay copy buffer, DC->client direction (256 KiB).
const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024;
// Sample size used when picking an ME writer — NOTE(review): semantics
// inferred from the name; confirm against the writer-pick implementation.
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
@ -387,6 +389,14 @@ pub(crate) fn default_me_d2c_ack_flush_immediate() -> bool {
DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE
}
/// Serde default for `general.me_quota_soft_overshoot_bytes`: additional bytes
/// above the strict per-user quota allowed in hot-path soft mode (64 KiB).
pub(crate) fn default_me_quota_soft_overshoot_bytes() -> u64 {
DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES
}
/// Serde default for `general.me_d2c_frame_buf_shrink_threshold_bytes`: shrink
/// threshold for the reusable ME->Client frame assembly buffer (256 KiB).
pub(crate) fn default_me_d2c_frame_buf_shrink_threshold_bytes() -> usize {
DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES
}
/// Serde default for `general.direct_relay_copy_buf_c2s_bytes`: copy buffer
/// size for the client->DC direction in direct relay (64 KiB).
pub(crate) fn default_direct_relay_copy_buf_c2s_bytes() -> usize {
DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES
}

View File

@ -106,6 +106,8 @@ pub struct HotFields {
pub me_d2c_flush_batch_max_bytes: usize,
pub me_d2c_flush_batch_max_delay_us: u64,
pub me_d2c_ack_flush_immediate: bool,
pub me_quota_soft_overshoot_bytes: u64,
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
pub direct_relay_copy_buf_c2s_bytes: usize,
pub direct_relay_copy_buf_s2c_bytes: usize,
pub me_health_interval_ms_unhealthy: u64,
@ -225,6 +227,8 @@ impl HotFields {
me_d2c_flush_batch_max_bytes: cfg.general.me_d2c_flush_batch_max_bytes,
me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us,
me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate,
me_quota_soft_overshoot_bytes: cfg.general.me_quota_soft_overshoot_bytes,
me_d2c_frame_buf_shrink_threshold_bytes: cfg.general.me_d2c_frame_buf_shrink_threshold_bytes,
direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes,
direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes,
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
@ -511,6 +515,9 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes;
cfg.general.me_d2c_flush_batch_max_delay_us = new.general.me_d2c_flush_batch_max_delay_us;
cfg.general.me_d2c_ack_flush_immediate = new.general.me_d2c_ack_flush_immediate;
cfg.general.me_quota_soft_overshoot_bytes = new.general.me_quota_soft_overshoot_bytes;
cfg.general.me_d2c_frame_buf_shrink_threshold_bytes =
new.general.me_d2c_frame_buf_shrink_threshold_bytes;
cfg.general.direct_relay_copy_buf_c2s_bytes = new.general.direct_relay_copy_buf_c2s_bytes;
cfg.general.direct_relay_copy_buf_s2c_bytes = new.general.direct_relay_copy_buf_s2c_bytes;
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
@ -1030,15 +1037,20 @@ fn log_changes(
|| old_hot.me_d2c_flush_batch_max_bytes != new_hot.me_d2c_flush_batch_max_bytes
|| old_hot.me_d2c_flush_batch_max_delay_us != new_hot.me_d2c_flush_batch_max_delay_us
|| old_hot.me_d2c_ack_flush_immediate != new_hot.me_d2c_ack_flush_immediate
|| old_hot.me_quota_soft_overshoot_bytes != new_hot.me_quota_soft_overshoot_bytes
|| old_hot.me_d2c_frame_buf_shrink_threshold_bytes
!= new_hot.me_d2c_frame_buf_shrink_threshold_bytes
|| old_hot.direct_relay_copy_buf_c2s_bytes != new_hot.direct_relay_copy_buf_c2s_bytes
|| old_hot.direct_relay_copy_buf_s2c_bytes != new_hot.direct_relay_copy_buf_s2c_bytes
{
info!(
"config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} direct_buf_c2s={} direct_buf_s2c={}",
"config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} me_quota_soft_overshoot_bytes={} me_d2c_frame_buf_shrink_threshold_bytes={} direct_buf_c2s={} direct_buf_s2c={}",
new_hot.me_d2c_flush_batch_max_frames,
new_hot.me_d2c_flush_batch_max_bytes,
new_hot.me_d2c_flush_batch_max_delay_us,
new_hot.me_d2c_ack_flush_immediate,
new_hot.me_quota_soft_overshoot_bytes,
new_hot.me_d2c_frame_buf_shrink_threshold_bytes,
new_hot.direct_relay_copy_buf_c2s_bytes,
new_hot.direct_relay_copy_buf_s2c_bytes,
);

View File

@ -533,6 +533,19 @@ impl ProxyConfig {
));
}
if config.general.me_quota_soft_overshoot_bytes > 16 * 1024 * 1024 {
return Err(ProxyError::Config(
"general.me_quota_soft_overshoot_bytes must be within [0, 16777216]".to_string(),
));
}
if !(4096..=16 * 1024 * 1024).contains(&config.general.me_d2c_frame_buf_shrink_threshold_bytes) {
return Err(ProxyError::Config(
"general.me_d2c_frame_buf_shrink_threshold_bytes must be within [4096, 16777216]"
.to_string(),
));
}
if !(4096..=1024 * 1024).contains(&config.general.direct_relay_copy_buf_c2s_bytes) {
return Err(ProxyError::Config(
"general.direct_relay_copy_buf_c2s_bytes must be within [4096, 1048576]"

View File

@ -468,7 +468,7 @@ pub struct GeneralConfig {
pub me_c2me_send_timeout_ms: u64,
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
/// `0` keeps legacy no-wait behavior.
/// `0` keeps non-blocking routing; values >0 enable bounded wait for compatibility.
#[serde(default = "default_me_reader_route_data_wait_ms")]
pub me_reader_route_data_wait_ms: u64,
@ -489,6 +489,14 @@ pub struct GeneralConfig {
#[serde(default = "default_me_d2c_ack_flush_immediate")]
pub me_d2c_ack_flush_immediate: bool,
/// Additional bytes above strict per-user quota allowed in hot-path soft mode.
#[serde(default = "default_me_quota_soft_overshoot_bytes")]
pub me_quota_soft_overshoot_bytes: u64,
/// Shrink threshold for reusable ME->Client frame assembly buffer.
#[serde(default = "default_me_d2c_frame_buf_shrink_threshold_bytes")]
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
/// Copy buffer size for client->DC direction in direct relay.
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
pub direct_relay_copy_buf_c2s_bytes: usize,
@ -945,6 +953,8 @@ impl Default for GeneralConfig {
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(),
me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(),
me_quota_soft_overshoot_bytes: default_me_quota_soft_overshoot_bytes(),
me_d2c_frame_buf_shrink_threshold_bytes: default_me_d2c_frame_buf_shrink_threshold_bytes(),
direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(),
direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(),
me_warmup_stagger_enabled: default_true(),

View File

@ -935,6 +935,462 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batches_total Total DC->Client flush batches"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batches_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batches_total {}",
if me_allows_normal {
stats.get_me_d2c_batches_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_frames_total Total DC->Client frames flushed in batches"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_frames_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_total {}",
if me_allows_normal {
stats.get_me_d2c_batch_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_bytes_total Total DC->Client bytes flushed in batches"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_bytes_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_total {}",
if me_allows_normal {
stats.get_me_d2c_batch_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_flush_reason_total DC->Client flush reasons"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_flush_reason_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"queue_drain\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_queue_drain_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"batch_frames\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_batch_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"batch_bytes\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_batch_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"max_delay\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_max_delay_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"ack_immediate\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_ack_immediate_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"close\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_close_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_data_frames_total DC->Client data frames"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_data_frames_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_data_frames_total {}",
if me_allows_normal {
stats.get_me_d2c_data_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_ack_frames_total DC->Client quick-ack frames"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_ack_frames_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_ack_frames_total {}",
if me_allows_normal {
stats.get_me_d2c_ack_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_payload_bytes_total DC->Client payload bytes before transport framing"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_payload_bytes_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_payload_bytes_total {}",
if me_allows_normal {
stats.get_me_d2c_payload_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_write_mode_total DC->Client writer mode selection"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_write_mode_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_write_mode_total{{mode=\"coalesced\"}} {}",
if me_allows_normal {
stats.get_me_d2c_write_mode_coalesced_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_write_mode_total{{mode=\"split\"}} {}",
if me_allows_normal {
stats.get_me_d2c_write_mode_split_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_quota_reject_total DC->Client quota rejects"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_quota_reject_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_quota_reject_total{{stage=\"pre_write\"}} {}",
if me_allows_normal {
stats.get_me_d2c_quota_reject_pre_write_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_quota_reject_total{{stage=\"post_write\"}} {}",
if me_allows_normal {
stats.get_me_d2c_quota_reject_post_write_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_frame_buf_shrink_total DC->Client reusable frame buffer shrink events"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_frame_buf_shrink_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_frame_buf_shrink_total {}",
if me_allows_normal {
stats.get_me_d2c_frame_buf_shrink_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_frame_buf_shrink_bytes_total DC->Client reusable frame buffer bytes released"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_frame_buf_shrink_bytes_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_frame_buf_shrink_bytes_total {}",
if me_allows_normal {
stats.get_me_d2c_frame_buf_shrink_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_frames_bucket_total DC->Client batch frame count buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_frames_bucket_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"1\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_1()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"2_4\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_2_4()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"5_8\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_5_8()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"9_16\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_9_16()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"17_32\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_17_32()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"gt_32\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_gt_32()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_bytes_bucket_total DC->Client batch byte size buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_bytes_bucket_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"0_1k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_0_1k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"1k_4k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_1k_4k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"4k_16k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_4k_16k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"16k_64k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_16k_64k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"64k_128k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_64k_128k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"gt_128k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_gt_128k()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_flush_duration_us_bucket_total DC->Client flush duration buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"0_50\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_0_50()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"51_200\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_51_200()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"201_1000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_201_1000()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"1001_5000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_1001_5000()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"5001_20000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_5001_20000()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"gt_20000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_gt_20000()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_timeout_armed_total DC->Client max-delay timer armed events"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_timeout_armed_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_timeout_armed_total {}",
if me_allows_debug {
stats.get_me_d2c_batch_timeout_armed_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_timeout_fired_total DC->Client max-delay timer fired events"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_timeout_fired_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_timeout_fired_total {}",
if me_allows_debug {
stats.get_me_d2c_batch_timeout_fired_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result"
@ -2145,6 +2601,16 @@ mod tests {
stats.increment_relay_idle_hard_close_total();
stats.increment_relay_pressure_evict_total();
stats.increment_relay_protocol_desync_close_total();
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(3);
stats.add_me_d2c_batch_bytes_total(2048);
stats.increment_me_d2c_flush_reason(crate::stats::MeD2cFlushReason::AckImmediate);
stats.increment_me_d2c_data_frames_total();
stats.increment_me_d2c_ack_frames_total();
stats.add_me_d2c_payload_bytes_total(1800);
stats.increment_me_d2c_write_mode(crate::stats::MeD2cWriteMode::Coalesced);
stats.increment_me_d2c_quota_reject_total(crate::stats::MeD2cQuotaRejectStage::PostWrite);
stats.observe_me_d2c_frame_buf_shrink(4096);
stats.increment_user_connects("alice");
stats.increment_user_curr_connects("alice");
stats.add_user_octets_from("alice", 1024);
@ -2184,6 +2650,17 @@ mod tests {
assert!(output.contains("telemt_relay_idle_hard_close_total 1"));
assert!(output.contains("telemt_relay_pressure_evict_total 1"));
assert!(output.contains("telemt_relay_protocol_desync_close_total 1"));
assert!(output.contains("telemt_me_d2c_batches_total 1"));
assert!(output.contains("telemt_me_d2c_batch_frames_total 3"));
assert!(output.contains("telemt_me_d2c_batch_bytes_total 2048"));
assert!(output.contains("telemt_me_d2c_flush_reason_total{reason=\"ack_immediate\"} 1"));
assert!(output.contains("telemt_me_d2c_data_frames_total 1"));
assert!(output.contains("telemt_me_d2c_ack_frames_total 1"));
assert!(output.contains("telemt_me_d2c_payload_bytes_total 1800"));
assert!(output.contains("telemt_me_d2c_write_mode_total{mode=\"coalesced\"} 1"));
assert!(output.contains("telemt_me_d2c_quota_reject_total{stage=\"post_write\"} 1"));
assert!(output.contains("telemt_me_d2c_frame_buf_shrink_total 1"));
assert!(output.contains("telemt_me_d2c_frame_buf_shrink_bytes_total 4096"));
assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024"));
@ -2245,6 +2722,11 @@ mod tests {
assert!(output.contains("# TYPE telemt_relay_idle_hard_close_total counter"));
assert!(output.contains("# TYPE telemt_relay_pressure_evict_total counter"));
assert!(output.contains("# TYPE telemt_relay_protocol_desync_close_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_batches_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_flush_reason_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_write_mode_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_batch_frames_bucket_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(
output

View File

@ -21,7 +21,7 @@ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
};
use crate::stats::Stats;
use crate::stats::{MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, Stats};
use crate::stream::{BufferPool, CryptoReader, CryptoWriter, PooledBuffer};
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
@ -45,6 +45,8 @@ const C2ME_SEND_TIMEOUT: Duration = Duration::from_millis(50);
const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5);
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
const ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR: usize = 2;
const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024;
#[cfg(test)]
const QUOTA_USER_LOCKS_MAX: usize = 64;
#[cfg(not(test))]
@ -214,6 +216,8 @@ struct MeD2cFlushPolicy {
max_bytes: usize,
max_delay: Duration,
ack_flush_immediate: bool,
quota_soft_overshoot_bytes: u64,
frame_buf_shrink_threshold_bytes: usize,
}
#[derive(Clone, Copy)]
@ -284,6 +288,11 @@ impl MeD2cFlushPolicy {
.max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN),
max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us),
ack_flush_immediate: config.general.me_d2c_ack_flush_immediate,
quota_soft_overshoot_bytes: config.general.me_quota_soft_overshoot_bytes,
frame_buf_shrink_threshold_bytes: config
.general
.me_d2c_frame_buf_shrink_threshold_bytes
.max(4096),
}
}
}
@ -538,6 +547,76 @@ fn quota_would_be_exceeded_for_user(
})
}
/// Effective soft quota cap: the hard `limit` plus the allowed `overshoot`,
/// clamped at `u64::MAX` so the addition can never wrap.
fn quota_soft_cap(limit: u64, overshoot: u64) -> u64 {
    limit.checked_add(overshoot).unwrap_or(u64::MAX)
}
/// Returns `true` when `user` has already consumed at least the soft cap
/// (`quota_limit + overshoot`). A `None` limit means quota enforcement is
/// disabled for this user and the check never trips.
fn quota_exceeded_for_user_soft(
    stats: &Stats,
    user: &str,
    quota_limit: Option<u64>,
    overshoot: u64,
) -> bool {
    let Some(limit) = quota_limit else {
        return false;
    };
    stats.get_user_total_octets(user) >= quota_soft_cap(limit, overshoot)
}
/// Returns `true` when forwarding `bytes` more octets for `user` would push
/// the account past the soft cap (`quota_limit + overshoot`). A `None` limit
/// disables the check entirely.
fn quota_would_be_exceeded_for_user_soft(
    stats: &Stats,
    user: &str,
    quota_limit: Option<u64>,
    bytes: u64,
    overshoot: u64,
) -> bool {
    match quota_limit {
        None => false,
        Some(limit) => {
            let soft_cap = quota_soft_cap(limit, overshoot);
            let consumed = stats.get_user_total_octets(user);
            // Already at/over the cap, or not enough headroom left for this write.
            consumed >= soft_cap || bytes > soft_cap.saturating_sub(consumed)
        }
    }
}
/// Maps the state of a finished DC->Client batch to the reason it flushed.
///
/// Precedence mirrors the writer loop: an ack-driven immediate flush wins,
/// then the frame-count limit, then the byte limit, then the max-delay timer;
/// anything else means the response queue simply drained.
fn classify_me_d2c_flush_reason(
    flush_immediately: bool,
    batch_frames: usize,
    max_frames: usize,
    batch_bytes: usize,
    max_bytes: usize,
    max_delay_fired: bool,
) -> MeD2cFlushReason {
    if flush_immediately {
        MeD2cFlushReason::AckImmediate
    } else if batch_frames >= max_frames {
        MeD2cFlushReason::BatchFrames
    } else if batch_bytes >= max_bytes {
        MeD2cFlushReason::BatchBytes
    } else if max_delay_fired {
        MeD2cFlushReason::MaxDelay
    } else {
        MeD2cFlushReason::QueueDrain
    }
}
/// Records telemetry for one completed DC->Client flush.
///
/// The flush reason is always counted; batch counters and histogram buckets
/// are touched only for non-empty batches, and the duration histogram only
/// when the caller actually timed the flush (`Some`).
fn observe_me_d2c_flush_event(
    stats: &Stats,
    reason: MeD2cFlushReason,
    batch_frames: usize,
    batch_bytes: usize,
    flush_duration_us: Option<u64>,
) {
    stats.increment_me_d2c_flush_reason(reason);

    let frames = batch_frames as u64;
    let bytes = batch_bytes as u64;
    if frames > 0 || bytes > 0 {
        stats.increment_me_d2c_batches_total();
        stats.add_me_d2c_batch_frames_total(frames);
        stats.add_me_d2c_batch_bytes_total(bytes);
        stats.observe_me_d2c_batch_frames(frames);
        stats.observe_me_d2c_batch_bytes(bytes);
    }

    if let Some(duration_us) = flush_duration_us {
        stats.observe_me_d2c_flush_duration_us(duration_us);
    }
}
#[cfg(test)]
fn quota_user_lock_test_guard() -> &'static Mutex<()> {
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
@ -774,6 +853,7 @@ where
let mut batch_frames = 0usize;
let mut batch_bytes = 0usize;
let mut flush_immediately;
let mut max_delay_fired = false;
let first_is_downstream_activity =
matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_));
@ -786,6 +866,7 @@ where
stats_clone.as_ref(),
&user_clone,
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@ -801,7 +882,25 @@ where
flush_immediately = immediate;
}
MeWriterResponseOutcome::Close => {
let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() {
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX)) as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(());
}
}
@ -825,6 +924,7 @@ where
stats_clone.as_ref(),
&user_clone,
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@ -840,7 +940,27 @@ where
flush_immediately |= immediate;
}
MeWriterResponseOutcome::Close => {
let flush_started_at =
if stats_clone.telemetry_policy().me_level.allows_debug() {
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX))
as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(());
}
}
@ -851,6 +971,7 @@ where
&& batch_frames < d2c_flush_policy.max_frames
&& batch_bytes < d2c_flush_policy.max_bytes
{
stats_clone.increment_me_d2c_batch_timeout_armed_total();
match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await {
Ok(Some(next)) => {
let next_is_downstream_activity =
@ -864,6 +985,7 @@ where
stats_clone.as_ref(),
&user_clone,
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@ -879,7 +1001,30 @@ where
flush_immediately |= immediate;
}
MeWriterResponseOutcome::Close => {
let flush_started_at = if stats_clone
.telemetry_policy()
.me_level
.allows_debug()
{
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX))
as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(());
}
}
@ -903,6 +1048,7 @@ where
stats_clone.as_ref(),
&user_clone,
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@ -918,7 +1064,30 @@ where
flush_immediately |= immediate;
}
MeWriterResponseOutcome::Close => {
let flush_started_at = if stats_clone
.telemetry_policy()
.me_level
.allows_debug()
{
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX))
as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(());
}
}
@ -928,11 +1097,50 @@ where
debug!(conn_id, "ME channel closed");
return Err(ProxyError::Proxy("ME connection lost".into()));
}
Err(_) => {}
Err(_) => {
max_delay_fired = true;
stats_clone.increment_me_d2c_batch_timeout_fired_total();
}
}
}
let flush_reason = classify_me_d2c_flush_reason(
flush_immediately,
batch_frames,
d2c_flush_policy.max_frames,
batch_bytes,
d2c_flush_policy.max_bytes,
max_delay_fired,
);
let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() {
Some(Instant::now())
} else {
None
};
writer.flush().await.map_err(ProxyError::Io)?;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX)) as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
flush_reason,
batch_frames,
batch_bytes,
flush_duration_us,
);
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
let shrink_trigger = shrink_threshold
.saturating_mul(ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR);
if frame_buf.capacity() > shrink_trigger {
let cap_before = frame_buf.capacity();
frame_buf.shrink_to(shrink_threshold);
let cap_after = frame_buf.capacity();
let bytes_freed = cap_before.saturating_sub(cap_after) as u64;
stats_clone.observe_me_d2c_frame_buf_shrink(bytes_freed);
}
}
_ = &mut stop_rx => {
debug!(conn_id, "ME writer stop signal");
@ -1482,6 +1690,7 @@ async fn process_me_writer_response<W>(
stats: &Stats,
user: &str,
quota_limit: Option<u64>,
quota_soft_overshoot_bytes: u64,
bytes_me2c: &AtomicU64,
conn_id: u64,
ack_flush_immediate: bool,
@ -1498,31 +1707,39 @@ where
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
}
let data_len = data.len() as u64;
if let Some(limit) = quota_limit {
let quota_lock = quota_user_lock(user);
let _quota_guard = quota_lock.lock().await;
if quota_would_be_exceeded_for_user(stats, user, Some(limit), data_len) {
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
if quota_would_be_exceeded_for_user_soft(
stats,
user,
quota_limit,
data_len,
quota_soft_overshoot_bytes,
) {
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
let write_mode =
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
.await?;
stats.increment_me_d2c_write_mode(write_mode);
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64);
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64);
stats.increment_me_d2c_data_frames_total();
stats.add_me_d2c_payload_bytes_total(data.len() as u64);
if quota_exceeded_for_user(stats, user, Some(limit)) {
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
} else {
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
.await?;
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64);
if quota_exceeded_for_user_soft(
stats,
user,
quota_limit,
quota_soft_overshoot_bytes,
) {
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PostWrite);
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
Ok(MeWriterResponseOutcome::Continue {
@ -1538,6 +1755,7 @@ where
trace!(conn_id, confirm, "ME->C quickack");
}
write_client_ack(client_writer, proto_tag, confirm).await?;
stats.increment_me_d2c_ack_frames_total();
Ok(MeWriterResponseOutcome::Continue {
frames: 1,
@ -1588,13 +1806,13 @@ async fn write_client_payload<W>(
data: &[u8],
rng: &SecureRandom,
frame_buf: &mut Vec<u8>,
) -> Result<()>
) -> Result<MeD2cWriteMode>
where
W: AsyncWrite + Unpin + Send + 'static,
{
let quickack = (flags & RPC_FLAG_QUICKACK) != 0;
match proto_tag {
let write_mode = match proto_tag {
ProtoTag::Abridged => {
if !data.len().is_multiple_of(4) {
return Err(ProxyError::Proxy(format!(
@ -1609,28 +1827,46 @@ where
if quickack {
first |= 0x80;
}
frame_buf.clear();
frame_buf.reserve(1 + data.len());
frame_buf.push(first);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
let wire_len = 1usize.saturating_add(data.len());
if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
frame_buf.clear();
frame_buf.reserve(wire_len);
frame_buf.push(first);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
MeD2cWriteMode::Coalesced
} else {
let header = [first];
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
MeD2cWriteMode::Split
}
} else if len_words < (1 << 24) {
let mut first = 0x7fu8;
if quickack {
first |= 0x80;
}
let lw = (len_words as u32).to_le_bytes();
frame_buf.clear();
frame_buf.reserve(4 + data.len());
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
let wire_len = 4usize.saturating_add(data.len());
if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
frame_buf.clear();
frame_buf.reserve(wire_len);
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
MeD2cWriteMode::Coalesced
} else {
let header = [first, lw[0], lw[1], lw[2]];
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
MeD2cWriteMode::Split
}
} else {
return Err(ProxyError::Proxy(format!(
"Abridged frame too large: {}",
@ -1650,25 +1886,46 @@ where
} else {
0
};
let (len_val, total) =
compute_intermediate_secure_wire_len(data.len(), padding_len, quickack)?;
frame_buf.clear();
frame_buf.reserve(total);
frame_buf.extend_from_slice(&len_val.to_le_bytes());
frame_buf.extend_from_slice(data);
if padding_len > 0 {
let start = frame_buf.len();
frame_buf.resize(start + padding_len, 0);
rng.fill(&mut frame_buf[start..]);
if total <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
frame_buf.clear();
frame_buf.reserve(total);
frame_buf.extend_from_slice(&len_val.to_le_bytes());
frame_buf.extend_from_slice(data);
if padding_len > 0 {
let start = frame_buf.len();
frame_buf.resize(start + padding_len, 0);
rng.fill(&mut frame_buf[start..]);
}
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
MeD2cWriteMode::Coalesced
} else {
let header = len_val.to_le_bytes();
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
if padding_len > 0 {
frame_buf.clear();
if frame_buf.capacity() < padding_len {
frame_buf.reserve(padding_len);
}
frame_buf.resize(padding_len, 0);
rng.fill(frame_buf.as_mut_slice());
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
}
MeD2cWriteMode::Split
}
client_writer
.write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
}
}
};
Ok(())
Ok(write_mode)
}
async fn write_client_ack<W>(

View File

@ -1540,6 +1540,7 @@ async fn process_me_writer_response_ack_obeys_flush_policy() {
&stats,
"user",
None,
0,
&bytes_me2c,
77,
true,
@ -1566,6 +1567,7 @@ async fn process_me_writer_response_ack_obeys_flush_policy() {
&stats,
"user",
None,
0,
&bytes_me2c,
77,
false,
@ -1606,6 +1608,7 @@ async fn process_me_writer_response_data_updates_byte_accounting() {
&stats,
"user",
None,
0,
&bytes_me2c,
88,
false,
@ -1652,6 +1655,7 @@ async fn process_me_writer_response_data_enforces_live_user_quota() {
&stats,
"quota-user",
Some(12),
0,
&bytes_me2c,
89,
false,
@ -1700,6 +1704,7 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo
&stats,
user,
Some(1),
0,
&bytes_me2c,
91,
false,
@ -1717,6 +1722,7 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo
&stats,
user,
Some(1),
0,
&bytes_me2c,
92,
false,
@ -1765,6 +1771,7 @@ async fn process_me_writer_response_data_does_not_forward_partial_payload_when_r
&stats,
"partial-quota-user",
Some(4),
0,
&bytes_me2c,
90,
false,
@ -1970,6 +1977,7 @@ async fn run_quota_race_attempt(
stats,
user,
Some(1),
0,
bytes_me2c,
conn_id,
false,

View File

@ -26,6 +26,28 @@ enum RouteConnectionGauge {
Middle,
}
/// Why a DC->Client write batch was flushed to the client socket.
#[derive(Debug, Clone, Copy)]
pub enum MeD2cFlushReason {
    /// The response queue drained without hitting any other trigger.
    QueueDrain,
    /// The batch reached the configured maximum frame count.
    BatchFrames,
    /// The batch reached the configured maximum byte size.
    BatchBytes,
    /// The max-delay timer fired before the batch filled up.
    MaxDelay,
    /// An ack requested an immediate flush (ack_flush_immediate policy).
    AckImmediate,
    /// The connection is closing; whatever is buffered is flushed.
    Close,
}
/// How a DC->Client payload was written to the client socket.
#[derive(Debug, Clone, Copy)]
pub enum MeD2cWriteMode {
    /// Header and payload copied into one buffer, written with a single call.
    Coalesced,
    /// Frame too large to coalesce; header and payload written separately.
    Split,
}
/// Where in the DC->Client data path a user quota rejection happened.
#[derive(Debug, Clone, Copy)]
pub enum MeD2cQuotaRejectStage {
    /// Rejected before the payload was written to the client.
    PreWrite,
    /// Detected after the payload was written (soft-cap overshoot path).
    PostWrite,
}
#[must_use = "RouteConnectionLease must be kept alive to hold the connection gauge increment"]
pub struct RouteConnectionLease {
stats: Arc<Stats>,
@ -140,6 +162,44 @@ pub struct Stats {
me_route_drop_queue_full: AtomicU64,
me_route_drop_queue_full_base: AtomicU64,
me_route_drop_queue_full_high: AtomicU64,
me_d2c_batches_total: AtomicU64,
me_d2c_batch_frames_total: AtomicU64,
me_d2c_batch_bytes_total: AtomicU64,
me_d2c_flush_reason_queue_drain_total: AtomicU64,
me_d2c_flush_reason_batch_frames_total: AtomicU64,
me_d2c_flush_reason_batch_bytes_total: AtomicU64,
me_d2c_flush_reason_max_delay_total: AtomicU64,
me_d2c_flush_reason_ack_immediate_total: AtomicU64,
me_d2c_flush_reason_close_total: AtomicU64,
me_d2c_data_frames_total: AtomicU64,
me_d2c_ack_frames_total: AtomicU64,
me_d2c_payload_bytes_total: AtomicU64,
me_d2c_write_mode_coalesced_total: AtomicU64,
me_d2c_write_mode_split_total: AtomicU64,
me_d2c_quota_reject_pre_write_total: AtomicU64,
me_d2c_quota_reject_post_write_total: AtomicU64,
me_d2c_frame_buf_shrink_total: AtomicU64,
me_d2c_frame_buf_shrink_bytes_total: AtomicU64,
me_d2c_batch_frames_bucket_1: AtomicU64,
me_d2c_batch_frames_bucket_2_4: AtomicU64,
me_d2c_batch_frames_bucket_5_8: AtomicU64,
me_d2c_batch_frames_bucket_9_16: AtomicU64,
me_d2c_batch_frames_bucket_17_32: AtomicU64,
me_d2c_batch_frames_bucket_gt_32: AtomicU64,
me_d2c_batch_bytes_bucket_0_1k: AtomicU64,
me_d2c_batch_bytes_bucket_1k_4k: AtomicU64,
me_d2c_batch_bytes_bucket_4k_16k: AtomicU64,
me_d2c_batch_bytes_bucket_16k_64k: AtomicU64,
me_d2c_batch_bytes_bucket_64k_128k: AtomicU64,
me_d2c_batch_bytes_bucket_gt_128k: AtomicU64,
me_d2c_flush_duration_us_bucket_0_50: AtomicU64,
me_d2c_flush_duration_us_bucket_51_200: AtomicU64,
me_d2c_flush_duration_us_bucket_201_1000: AtomicU64,
me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64,
me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64,
me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64,
me_d2c_batch_timeout_armed_total: AtomicU64,
me_d2c_batch_timeout_fired_total: AtomicU64,
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
me_writer_pick_sorted_rr_success_fallback_total: AtomicU64,
me_writer_pick_sorted_rr_full_total: AtomicU64,
@ -594,6 +654,215 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed);
}
}
/// Counts one flushed non-empty DC->Client batch.
/// No-op unless ME telemetry is at normal level or above.
pub fn increment_me_d2c_batches_total(&self) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_d2c_batches_total.fetch_add(1, Ordering::Relaxed);
}
/// Adds `frames` to the cumulative DC->Client batched-frame counter.
/// No-op unless ME telemetry is at normal level or above.
pub fn add_me_d2c_batch_frames_total(&self, frames: u64) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_d2c_batch_frames_total.fetch_add(frames, Ordering::Relaxed);
}
/// Adds `bytes` to the cumulative DC->Client batched-byte counter.
/// No-op unless ME telemetry is at normal level or above.
pub fn add_me_d2c_batch_bytes_total(&self, bytes: u64) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_d2c_batch_bytes_total.fetch_add(bytes, Ordering::Relaxed);
}
/// Counts one DC->Client flush, bucketed by the reason it was triggered.
/// No-op unless ME telemetry is at normal level or above.
pub fn increment_me_d2c_flush_reason(&self, reason: MeD2cFlushReason) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    // Select the per-reason counter once, then bump it.
    let counter = match reason {
        MeD2cFlushReason::QueueDrain => &self.me_d2c_flush_reason_queue_drain_total,
        MeD2cFlushReason::BatchFrames => &self.me_d2c_flush_reason_batch_frames_total,
        MeD2cFlushReason::BatchBytes => &self.me_d2c_flush_reason_batch_bytes_total,
        MeD2cFlushReason::MaxDelay => &self.me_d2c_flush_reason_max_delay_total,
        MeD2cFlushReason::AckImmediate => &self.me_d2c_flush_reason_ack_immediate_total,
        MeD2cFlushReason::Close => &self.me_d2c_flush_reason_close_total,
    };
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Counts one data frame delivered DC->Client.
/// No-op unless ME telemetry is at normal level or above.
pub fn increment_me_d2c_data_frames_total(&self) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_d2c_data_frames_total.fetch_add(1, Ordering::Relaxed);
}
/// Counts one ack frame delivered DC->Client.
/// No-op unless ME telemetry is at normal level or above.
pub fn increment_me_d2c_ack_frames_total(&self) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_d2c_ack_frames_total.fetch_add(1, Ordering::Relaxed);
}
/// Adds `bytes` to the cumulative DC->Client payload-byte counter.
/// No-op unless ME telemetry is at normal level or above.
pub fn add_me_d2c_payload_bytes_total(&self, bytes: u64) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_d2c_payload_bytes_total.fetch_add(bytes, Ordering::Relaxed);
}
/// Counts one DC->Client payload write, bucketed by write mode.
/// No-op unless ME telemetry is at normal level or above.
pub fn increment_me_d2c_write_mode(&self, mode: MeD2cWriteMode) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    let counter = match mode {
        MeD2cWriteMode::Coalesced => &self.me_d2c_write_mode_coalesced_total,
        MeD2cWriteMode::Split => &self.me_d2c_write_mode_split_total,
    };
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Counts one DC->Client quota rejection, bucketed by pipeline stage.
/// No-op unless ME telemetry is at normal level or above.
pub fn increment_me_d2c_quota_reject_total(&self, stage: MeD2cQuotaRejectStage) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    let counter = match stage {
        MeD2cQuotaRejectStage::PreWrite => &self.me_d2c_quota_reject_pre_write_total,
        MeD2cQuotaRejectStage::PostWrite => &self.me_d2c_quota_reject_post_write_total,
    };
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Records one frame-buffer shrink event plus the capacity it released.
/// No-op unless ME telemetry is at normal level or above.
pub fn observe_me_d2c_frame_buf_shrink(&self, bytes_freed: u64) {
    if self.telemetry_me_allows_normal() {
        // Both counters are independent relaxed atomics; order is irrelevant.
        self.me_d2c_frame_buf_shrink_bytes_total
            .fetch_add(bytes_freed, Ordering::Relaxed);
        self.me_d2c_frame_buf_shrink_total
            .fetch_add(1, Ordering::Relaxed);
    }
}
/// Places a flushed batch's frame count into its histogram bucket.
/// Zero-frame batches are not recorded. Debug-level ME telemetry only.
pub fn observe_me_d2c_batch_frames(&self, frames: u64) {
    if !self.telemetry_me_allows_debug() {
        return;
    }
    let bucket = match frames {
        0 => return, // empty batch: nothing to observe
        1 => &self.me_d2c_batch_frames_bucket_1,
        2..=4 => &self.me_d2c_batch_frames_bucket_2_4,
        5..=8 => &self.me_d2c_batch_frames_bucket_5_8,
        9..=16 => &self.me_d2c_batch_frames_bucket_9_16,
        17..=32 => &self.me_d2c_batch_frames_bucket_17_32,
        _ => &self.me_d2c_batch_frames_bucket_gt_32,
    };
    bucket.fetch_add(1, Ordering::Relaxed);
}
/// Places a flushed batch's byte size into its histogram bucket.
/// Debug-level ME telemetry only.
pub fn observe_me_d2c_batch_bytes(&self, bytes: u64) {
    if !self.telemetry_me_allows_debug() {
        return;
    }
    let bucket = match bytes {
        0..=1024 => &self.me_d2c_batch_bytes_bucket_0_1k,
        1025..=4096 => &self.me_d2c_batch_bytes_bucket_1k_4k,
        4097..=16_384 => &self.me_d2c_batch_bytes_bucket_4k_16k,
        16_385..=65_536 => &self.me_d2c_batch_bytes_bucket_16k_64k,
        65_537..=131_072 => &self.me_d2c_batch_bytes_bucket_64k_128k,
        _ => &self.me_d2c_batch_bytes_bucket_gt_128k,
    };
    bucket.fetch_add(1, Ordering::Relaxed);
}
/// Places a flush duration (microseconds) into its histogram bucket.
/// Debug-level ME telemetry only.
pub fn observe_me_d2c_flush_duration_us(&self, duration_us: u64) {
    if !self.telemetry_me_allows_debug() {
        return;
    }
    let bucket = match duration_us {
        0..=50 => &self.me_d2c_flush_duration_us_bucket_0_50,
        51..=200 => &self.me_d2c_flush_duration_us_bucket_51_200,
        201..=1000 => &self.me_d2c_flush_duration_us_bucket_201_1000,
        1001..=5000 => &self.me_d2c_flush_duration_us_bucket_1001_5000,
        5001..=20_000 => &self.me_d2c_flush_duration_us_bucket_5001_20000,
        _ => &self.me_d2c_flush_duration_us_bucket_gt_20000,
    };
    bucket.fetch_add(1, Ordering::Relaxed);
}
/// Counts one arming of the DC->Client max-delay batch timer.
/// Debug-level ME telemetry only.
pub fn increment_me_d2c_batch_timeout_armed_total(&self) {
    if !self.telemetry_me_allows_debug() {
        return;
    }
    self.me_d2c_batch_timeout_armed_total.fetch_add(1, Ordering::Relaxed);
}
/// Counts one firing of the DC->Client max-delay batch timer.
/// Debug-level ME telemetry only.
pub fn increment_me_d2c_batch_timeout_fired_total(&self) {
    if !self.telemetry_me_allows_debug() {
        return;
    }
    self.me_d2c_batch_timeout_fired_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
@ -1229,6 +1498,142 @@ impl Stats {
pub fn get_me_route_drop_queue_full_high(&self) -> u64 {
self.me_route_drop_queue_full_high.load(Ordering::Relaxed)
}
// --- DC->Client batching metric snapshots ---
// Each getter is a relaxed load of the matching counter; values may lag
// concurrent writers but are monotone for counters.
pub fn get_me_d2c_batches_total(&self) -> u64 {
    self.me_d2c_batches_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_total(&self) -> u64 {
    self.me_d2c_batch_frames_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_total(&self) -> u64 {
    self.me_d2c_batch_bytes_total.load(Ordering::Relaxed)
}
// Flush-reason counters (one per MeD2cFlushReason variant).
pub fn get_me_d2c_flush_reason_queue_drain_total(&self) -> u64 {
    self.me_d2c_flush_reason_queue_drain_total
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_batch_frames_total(&self) -> u64 {
    self.me_d2c_flush_reason_batch_frames_total
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_batch_bytes_total(&self) -> u64 {
    self.me_d2c_flush_reason_batch_bytes_total
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_max_delay_total(&self) -> u64 {
    self.me_d2c_flush_reason_max_delay_total
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_ack_immediate_total(&self) -> u64 {
    self.me_d2c_flush_reason_ack_immediate_total
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_close_total(&self) -> u64 {
    self.me_d2c_flush_reason_close_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_data_frames_total(&self) -> u64 {
    self.me_d2c_data_frames_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_ack_frames_total(&self) -> u64 {
    self.me_d2c_ack_frames_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_payload_bytes_total(&self) -> u64 {
    self.me_d2c_payload_bytes_total.load(Ordering::Relaxed)
}
// Write-mode counters (one per MeD2cWriteMode variant).
pub fn get_me_d2c_write_mode_coalesced_total(&self) -> u64 {
    self.me_d2c_write_mode_coalesced_total
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_write_mode_split_total(&self) -> u64 {
    self.me_d2c_write_mode_split_total.load(Ordering::Relaxed)
}
// Quota-rejection counters (one per MeD2cQuotaRejectStage variant).
pub fn get_me_d2c_quota_reject_pre_write_total(&self) -> u64 {
    self.me_d2c_quota_reject_pre_write_total
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_quota_reject_post_write_total(&self) -> u64 {
    self.me_d2c_quota_reject_post_write_total
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_frame_buf_shrink_total(&self) -> u64 {
    self.me_d2c_frame_buf_shrink_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_frame_buf_shrink_bytes_total(&self) -> u64 {
    self.me_d2c_frame_buf_shrink_bytes_total
        .load(Ordering::Relaxed)
}
// Batch-frame histogram buckets.
pub fn get_me_d2c_batch_frames_bucket_1(&self) -> u64 {
    self.me_d2c_batch_frames_bucket_1.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_2_4(&self) -> u64 {
    self.me_d2c_batch_frames_bucket_2_4.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_5_8(&self) -> u64 {
    self.me_d2c_batch_frames_bucket_5_8.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_9_16(&self) -> u64 {
    self.me_d2c_batch_frames_bucket_9_16.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_17_32(&self) -> u64 {
    self.me_d2c_batch_frames_bucket_17_32
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_gt_32(&self) -> u64 {
    self.me_d2c_batch_frames_bucket_gt_32
        .load(Ordering::Relaxed)
}
// Batch-byte histogram buckets.
pub fn get_me_d2c_batch_bytes_bucket_0_1k(&self) -> u64 {
    self.me_d2c_batch_bytes_bucket_0_1k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_1k_4k(&self) -> u64 {
    self.me_d2c_batch_bytes_bucket_1k_4k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_4k_16k(&self) -> u64 {
    self.me_d2c_batch_bytes_bucket_4k_16k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_16k_64k(&self) -> u64 {
    self.me_d2c_batch_bytes_bucket_16k_64k
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_64k_128k(&self) -> u64 {
    self.me_d2c_batch_bytes_bucket_64k_128k
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_gt_128k(&self) -> u64 {
    self.me_d2c_batch_bytes_bucket_gt_128k
        .load(Ordering::Relaxed)
}
// Flush-duration histogram buckets (microseconds).
pub fn get_me_d2c_flush_duration_us_bucket_0_50(&self) -> u64 {
    self.me_d2c_flush_duration_us_bucket_0_50
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_51_200(&self) -> u64 {
    self.me_d2c_flush_duration_us_bucket_51_200
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_201_1000(&self) -> u64 {
    self.me_d2c_flush_duration_us_bucket_201_1000
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_1001_5000(&self) -> u64 {
    self.me_d2c_flush_duration_us_bucket_1001_5000
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_5001_20000(&self) -> u64 {
    self.me_d2c_flush_duration_us_bucket_5001_20000
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_gt_20000(&self) -> u64 {
    self.me_d2c_flush_duration_us_bucket_gt_20000
        .load(Ordering::Relaxed)
}
// Max-delay timer counters.
pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 {
    self.me_d2c_batch_timeout_armed_total
        .load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_timeout_fired_total(&self) -> u64 {
    self.me_d2c_batch_timeout_fired_total
        .load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_success_try_total
.load(Ordering::Relaxed)
@ -1898,9 +2303,83 @@ mod tests {
stats.increment_me_crc_mismatch();
stats.increment_me_keepalive_sent();
stats.increment_me_route_drop_queue_full();
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(4);
stats.add_me_d2c_batch_bytes_total(4096);
stats.increment_me_d2c_flush_reason(MeD2cFlushReason::BatchBytes);
stats.increment_me_d2c_write_mode(MeD2cWriteMode::Coalesced);
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
stats.observe_me_d2c_frame_buf_shrink(1024);
stats.observe_me_d2c_batch_frames(4);
stats.observe_me_d2c_batch_bytes(4096);
stats.observe_me_d2c_flush_duration_us(120);
stats.increment_me_d2c_batch_timeout_armed_total();
stats.increment_me_d2c_batch_timeout_fired_total();
assert_eq!(stats.get_me_crc_mismatch(), 0);
assert_eq!(stats.get_me_keepalive_sent(), 0);
assert_eq!(stats.get_me_route_drop_queue_full(), 0);
assert_eq!(stats.get_me_d2c_batches_total(), 0);
assert_eq!(stats.get_me_d2c_flush_reason_batch_bytes_total(), 0);
assert_eq!(stats.get_me_d2c_write_mode_coalesced_total(), 0);
assert_eq!(stats.get_me_d2c_quota_reject_pre_write_total(), 0);
assert_eq!(stats.get_me_d2c_frame_buf_shrink_total(), 0);
assert_eq!(stats.get_me_d2c_batch_frames_bucket_2_4(), 0);
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_1k_4k(), 0);
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_51_200(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 0);
}
#[test]
fn test_telemetry_policy_me_normal_blocks_d2c_debug_metrics() {
    // With me_level = Normal, the aggregate D2C counters keep recording
    // while the per-flush histogram buckets and the batch-timeout counters
    // are expected to stay at zero (debug-only instrumentation).
    let s = Stats::new();
    s.apply_telemetry_policy(TelemetryPolicy {
        core_enabled: true,
        user_enabled: true,
        me_level: MeTelemetryLevel::Normal,
    });

    // Exercise both the aggregate counters and the debug-level observers.
    s.increment_me_d2c_batches_total();
    s.add_me_d2c_batch_frames_total(2);
    s.add_me_d2c_batch_bytes_total(2048);
    s.increment_me_d2c_flush_reason(MeD2cFlushReason::QueueDrain);
    s.observe_me_d2c_batch_frames(2);
    s.observe_me_d2c_batch_bytes(2048);
    s.observe_me_d2c_flush_duration_us(100);
    s.increment_me_d2c_batch_timeout_armed_total();
    s.increment_me_d2c_batch_timeout_fired_total();

    // Aggregates were recorded.
    assert_eq!(s.get_me_d2c_batches_total(), 1);
    assert_eq!(s.get_me_d2c_batch_frames_total(), 2);
    assert_eq!(s.get_me_d2c_batch_bytes_total(), 2048);
    assert_eq!(s.get_me_d2c_flush_reason_queue_drain_total(), 1);
    // Debug-level histograms and timeout counters remained untouched.
    assert_eq!(s.get_me_d2c_batch_frames_bucket_2_4(), 0);
    assert_eq!(s.get_me_d2c_batch_bytes_bucket_1k_4k(), 0);
    assert_eq!(s.get_me_d2c_flush_duration_us_bucket_51_200(), 0);
    assert_eq!(s.get_me_d2c_batch_timeout_armed_total(), 0);
    assert_eq!(s.get_me_d2c_batch_timeout_fired_total(), 0);
}
#[test]
fn test_telemetry_policy_me_debug_enables_d2c_debug_metrics() {
    // With me_level = Debug, observations must land in the matching
    // histogram buckets and the batch-timeout counters must advance.
    let s = Stats::new();
    s.apply_telemetry_policy(TelemetryPolicy {
        core_enabled: true,
        user_enabled: true,
        me_level: MeTelemetryLevel::Debug,
    });

    // One observation per histogram, chosen to hit a specific bucket:
    // 7 frames -> 5..8 bucket, 70_000 bytes -> 64k..128k bucket,
    // 1400 us -> 1001..5000 bucket.
    s.observe_me_d2c_batch_frames(7);
    s.observe_me_d2c_batch_bytes(70_000);
    s.observe_me_d2c_flush_duration_us(1400);
    s.increment_me_d2c_batch_timeout_armed_total();
    s.increment_me_d2c_batch_timeout_fired_total();

    assert_eq!(s.get_me_d2c_batch_frames_bucket_5_8(), 1);
    assert_eq!(s.get_me_d2c_batch_bytes_bucket_64k_128k(), 1);
    assert_eq!(s.get_me_d2c_flush_duration_us_bucket_1001_5000(), 1);
    assert_eq!(s.get_me_d2c_batch_timeout_armed_total(), 1);
    assert_eq!(s.get_me_d2c_batch_timeout_fired_total(), 1);
}
#[test]

View File

@ -126,14 +126,10 @@ pub(crate) async fn reader_loop(
let data = body.slice(12..);
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
let data_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
let routed = if data_wait_ms == 0 {
reg.route_nowait(cid, MeResponse::Data { flags, data })
.await
} else {
reg.route_with_timeout(cid, MeResponse::Data { flags, data }, data_wait_ms)
.await
};
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
let routed = reg
.route_with_timeout(cid, MeResponse::Data { flags, data }, route_wait_ms)
.await;
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),