Compare commits

..

1 Commits

Author SHA1 Message Date
Vladimir Krivopalov 50452b22c7
Merge 95685adba7 into 2d3c2807ab 2026-03-21 19:09:47 +00:00
15 changed files with 245 additions and 1699 deletions

View File

@ -7,16 +7,7 @@ queries:
- uses: security-and-quality - uses: security-and-quality
- uses: ./.github/codeql/queries - uses: ./.github/codeql/queries
paths-ignore:
- "**/tests/**"
- "**/test/**"
- "**/*_test.rs"
- "**/*/tests.rs"
query-filters: query-filters:
- exclude:
tags:
- test
- exclude: - exclude:
id: id:
- rust/unwrap-on-option - rust/unwrap-on-option

View File

@ -4,6 +4,7 @@ on:
push: push:
tags: tags:
- '[0-9]+.[0-9]+.[0-9]+' - '[0-9]+.[0-9]+.[0-9]+'
- '[0-9]+.[0-9]+.[0-9]+-*'
workflow_dispatch: workflow_dispatch:
concurrency: concurrency:
@ -12,274 +13,204 @@ concurrency:
permissions: permissions:
contents: read contents: read
packages: write
env: env:
CARGO_TERM_COLOR: always CARGO_TERM_COLOR: always
RUST_BACKTRACE: "1"
BINARY_NAME: telemt BINARY_NAME: telemt
jobs: jobs:
# ========================== prepare:
# GNU / glibc
# ==========================
build-gnu:
name: GNU ${{ matrix.target }}
runs-on: ubuntu-latest runs-on: ubuntu-latest
outputs:
strategy: version: ${{ steps.meta.outputs.version }}
fail-fast: false prerelease: ${{ steps.meta.outputs.prerelease }}
matrix: release_enabled: ${{ steps.meta.outputs.release_enabled }}
include:
- target: x86_64-unknown-linux-gnu
asset: telemt-x86_64-linux-gnu
- target: aarch64-unknown-linux-gnu
asset: telemt-aarch64-linux-gnu
steps: steps:
- uses: actions/checkout@v4 - id: meta
- uses: dtolnay/rust-toolchain@v1
with:
toolchain: stable
targets: |
x86_64-unknown-linux-gnu
aarch64-unknown-linux-gnu
- name: Install deps
run: | run: |
sudo apt-get update set -euo pipefail
sudo apt-get install -y \
build-essential \
clang \
lld \
pkg-config \
gcc-aarch64-linux-gnu \
g++-aarch64-linux-gnu
- uses: actions/cache@v4 if [[ "${GITHUB_REF}" == refs/tags/* ]]; then
with: VERSION="${GITHUB_REF#refs/tags/}"
path: | RELEASE_ENABLED=true
~/.cargo/registry
~/.cargo/git
target
key: gnu-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }}
- name: Build
run: |
if [ "${{ matrix.target }}" = "aarch64-unknown-linux-gnu" ]; then
export CC=aarch64-linux-gnu-gcc
export CXX=aarch64-linux-gnu-g++
export CC_aarch64_unknown_linux_gnu=aarch64-linux-gnu-gcc
export CXX_aarch64_unknown_linux_gnu=aarch64-linux-gnu-g++
export RUSTFLAGS="-C linker=aarch64-linux-gnu-gcc"
else else
export CC=clang VERSION="manual-${GITHUB_SHA::7}"
export CXX=clang++ RELEASE_ENABLED=false
export CC_x86_64_unknown_linux_gnu=clang
export CXX_x86_64_unknown_linux_gnu=clang++
export RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=lld"
fi fi
cargo build --release --target ${{ matrix.target }} if [[ "$VERSION" == *"-alpha"* || "$VERSION" == *"-beta"* || "$VERSION" == *"-rc"* ]]; then
PRERELEASE=true
else
PRERELEASE=false
fi
- name: Package echo "version=$VERSION" >> "$GITHUB_OUTPUT"
run: | echo "prerelease=$PRERELEASE" >> "$GITHUB_OUTPUT"
mkdir -p dist echo "release_enabled=$RELEASE_ENABLED" >> "$GITHUB_OUTPUT"
BIN=target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}
cp "$BIN" dist/${{ env.BINARY_NAME }}-${{ matrix.target }} checks:
cd dist
tar -czf ${{ matrix.asset }}.tar.gz ${{ env.BINARY_NAME }}-${{ matrix.target }}
sha256sum ${{ matrix.asset }}.tar.gz > ${{ matrix.asset }}.sha256
- uses: actions/upload-artifact@v4
with:
name: ${{ matrix.asset }}
path: |
dist/${{ matrix.asset }}.tar.gz
dist/${{ matrix.asset }}.sha256
# ==========================
# MUSL
# ==========================
build-musl:
name: MUSL ${{ matrix.target }}
runs-on: ubuntu-latest runs-on: ubuntu-latest
container: container:
image: rust:slim-bookworm image: debian:trixie
steps:
- run: |
apt-get update
apt-get install -y build-essential clang llvm pkg-config curl git
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt, clippy
- uses: actions/cache@v4
with:
path: |
/github/home/.cargo/registry
/github/home/.cargo/git
target
key: checks-${{ hashFiles('**/Cargo.lock') }}
- run: cargo fetch --locked
- run: cargo fmt --all -- --check
- run: cargo clippy
- run: cargo test
build-binaries:
needs: [prepare, checks]
runs-on: ubuntu-latest
container:
image: debian:trixie
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
include: include:
- target: x86_64-unknown-linux-musl - rust_target: x86_64-unknown-linux-gnu
asset: telemt-x86_64-linux-musl zig_target: x86_64-unknown-linux-gnu.2.28
- target: aarch64-unknown-linux-musl asset_name: telemt-x86_64-linux-gnu
asset: telemt-aarch64-linux-musl - rust_target: aarch64-unknown-linux-gnu
zig_target: aarch64-unknown-linux-gnu.2.28
asset_name: telemt-aarch64-linux-gnu
- rust_target: x86_64-unknown-linux-musl
zig_target: x86_64-unknown-linux-musl
asset_name: telemt-x86_64-linux-musl
- rust_target: aarch64-unknown-linux-musl
zig_target: aarch64-unknown-linux-musl
asset_name: telemt-aarch64-linux-musl
steps: steps:
- uses: actions/checkout@v4 - run: |
- name: Install deps
run: |
apt-get update apt-get update
apt-get install -y \ apt-get install -y clang llvm pkg-config curl git python3 python3-pip file tar xz-utils
musl-tools \
pkg-config \
curl
- uses: actions/cache@v4 - uses: actions/checkout@v4
if: matrix.target == 'aarch64-unknown-linux-musl' - uses: dtolnay/rust-toolchain@stable
with: with:
path: ~/.musl-aarch64 targets: ${{ matrix.rust_target }}
key: musl-toolchain-aarch64-v1
- name: Install aarch64 musl toolchain
if: matrix.target == 'aarch64-unknown-linux-musl'
run: |
set -e
TOOLCHAIN_DIR="$HOME/.musl-aarch64"
ARCHIVE="aarch64-linux-musl-cross.tgz"
URL="https://github.com/telemt/telemt/releases/download/toolchains/$ARCHIVE"
if [ -x "$TOOLCHAIN_DIR/bin/aarch64-linux-musl-gcc" ]; then
echo "✅ MUSL toolchain already installed"
else
echo "⬇️ Downloading musl toolchain from Telemt GitHub Releases..."
curl -fL \
--retry 5 \
--retry-delay 3 \
--connect-timeout 10 \
--max-time 120 \
-o "$ARCHIVE" "$URL"
mkdir -p "$TOOLCHAIN_DIR"
tar -xzf "$ARCHIVE" --strip-components=1 -C "$TOOLCHAIN_DIR"
fi
echo "$TOOLCHAIN_DIR/bin" >> $GITHUB_PATH
- name: Add rust target
run: rustup target add ${{ matrix.target }}
- uses: actions/cache@v4 - uses: actions/cache@v4
with: with:
path: | path: |
/usr/local/cargo/registry /github/home/.cargo/registry
/usr/local/cargo/git /github/home/.cargo/git
target target
key: musl-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }} key: build-${{ matrix.zig_target }}-${{ hashFiles('**/Cargo.lock') }}
- name: Build - run: |
run: | python3 -m pip install --user --break-system-packages cargo-zigbuild
if [ "${{ matrix.target }}" = "aarch64-unknown-linux-musl" ]; then echo "/github/home/.local/bin" >> "$GITHUB_PATH"
export CC=aarch64-linux-musl-gcc
export CC_aarch64_unknown_linux_musl=aarch64-linux-musl-gcc
export RUSTFLAGS="-C target-feature=+crt-static -C linker=aarch64-linux-musl-gcc"
else
export CC=musl-gcc
export CC_x86_64_unknown_linux_musl=musl-gcc
export RUSTFLAGS="-C target-feature=+crt-static"
fi
cargo build --release --target ${{ matrix.target }} - run: cargo fetch --locked
- name: Package - run: |
run: | cargo zigbuild --release --locked --target "${{ matrix.zig_target }}"
mkdir -p dist
BIN=target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}
cp "$BIN" dist/${{ env.BINARY_NAME }}-${{ matrix.target }} - run: |
BIN="target/${{ matrix.rust_target }}/release/${BINARY_NAME}"
llvm-strip "$BIN" || true
cd dist - run: |
tar -czf ${{ matrix.asset }}.tar.gz ${{ env.BINARY_NAME }}-${{ matrix.target }} BIN="target/${{ matrix.rust_target }}/release/${BINARY_NAME}"
sha256sum ${{ matrix.asset }}.tar.gz > ${{ matrix.asset }}.sha256 OUT="$RUNNER_TEMP/${{ matrix.asset_name }}"
mkdir -p "$OUT"
install -m755 "$BIN" "$OUT/${BINARY_NAME}"
tar -C "$RUNNER_TEMP" -czf "${{ matrix.asset_name }}.tar.gz" "${{ matrix.asset_name }}"
sha256sum "${{ matrix.asset_name }}.tar.gz" > "${{ matrix.asset_name }}.sha256"
- uses: actions/upload-artifact@v4 - uses: actions/upload-artifact@v4
with: with:
name: ${{ matrix.asset }} name: ${{ matrix.asset_name }}
path: | path: |
dist/${{ matrix.asset }}.tar.gz ${{ matrix.asset_name }}.tar.gz
dist/${{ matrix.asset }}.sha256 ${{ matrix.asset_name }}.sha256
# ========================== docker-image:
# Docker name: Docker ${{ matrix.platform }}
# ========================== needs: [prepare, build-binaries]
docker:
name: Docker
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: [build-gnu, build-musl]
continue-on-error: true strategy:
matrix:
include:
- platform: linux/amd64
artifact: telemt-x86_64-linux-gnu
- platform: linux/arm64
artifact: telemt-aarch64-linux-gnu
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: actions/download-artifact@v4 - uses: actions/download-artifact@v4
with: with:
path: artifacts name: ${{ matrix.artifact }}
path: dist
- name: Extract binaries - run: |
run: | mkdir docker-build
mkdir dist tar -xzf dist/*.tar.gz -C docker-build --strip-components=1
find artifacts -name "*.tar.gz" -exec tar -xzf {} -C dist \;
cp dist/telemt-x86_64-unknown-linux-musl dist/telemt || true
- uses: docker/setup-qemu-action@v3
- uses: docker/setup-buildx-action@v3 - uses: docker/setup-buildx-action@v3
- name: Login to GHCR - name: Login
if: ${{ needs.prepare.outputs.release_enabled == 'true' }}
uses: docker/login-action@v3 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.actor }} username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract version - uses: docker/build-push-action@v6
id: vars
run: echo "VERSION=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT
- name: Build & Push
uses: docker/build-push-action@v6
with: with:
context: . context: ./docker-build
push: true platforms: ${{ matrix.platform }}
platforms: linux/amd64,linux/arm64 push: ${{ needs.prepare.outputs.release_enabled == 'true' }}
tags: | tags: ghcr.io/${{ github.repository }}:${{ needs.prepare.outputs.version }}
ghcr.io/${{ github.repository }}:${{ steps.vars.outputs.VERSION }} cache-from: type=gha,scope=telemt-${{ matrix.platform }}
ghcr.io/${{ github.repository }}:latest cache-to: type=gha,mode=max,scope=telemt-${{ matrix.platform }}
build-args: | provenance: false
BINARY=dist/telemt sbom: false
# ==========================
# Release
# ==========================
release: release:
name: Release if: ${{ needs.prepare.outputs.release_enabled == 'true' }}
needs: [prepare, build-binaries]
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: [build-gnu, build-musl]
permissions: permissions:
contents: write contents: write
steps: steps:
- uses: actions/download-artifact@v4 - uses: actions/download-artifact@v4
with: with:
path: artifacts path: release-artifacts
pattern: telemt-*
- name: Flatten artifacts - run: |
run: | mkdir upload
mkdir dist find release-artifacts -type f \( -name '*.tar.gz' -o -name '*.sha256' \) -exec cp {} upload/ \;
find artifacts -type f -exec cp {} dist/ \;
- name: Create Release - uses: softprops/action-gh-release@v2
uses: softprops/action-gh-release@v2
with: with:
files: dist/* files: upload/*
generate_release_notes: true generate_release_notes: true
draft: false prerelease: ${{ needs.prepare.outputs.prerelease == 'true' }}
prerelease: ${{ contains(github.ref, '-rc') || contains(github.ref, '-beta') || contains(github.ref, '-alpha') }}

View File

@ -1,8 +1,8 @@
# Code of Conduct # Code of Conduct
## Purpose ## 1. Purpose
**Telemt exists to solve technical problems.** Telemt exists to solve technical problems.
Telemt is open to contributors who want to learn, improve and build meaningful systems together. Telemt is open to contributors who want to learn, improve and build meaningful systems together.
@ -18,34 +18,27 @@ Technology has consequences. Responsibility is inherent.
--- ---
## Principles ## 2. Principles
* **Technical over emotional** * **Technical over emotional**
Arguments are grounded in data, logs, reproducible cases, or clear reasoning. Arguments are grounded in data, logs, reproducible cases, or clear reasoning.
* **Clarity over noise** * **Clarity over noise**
Communication is structured, concise, and relevant. Communication is structured, concise, and relevant.
* **Openness with standards** * **Openness with standards**
Participation is open. The work remains disciplined. Participation is open. The work remains disciplined.
* **Independence of judgment** * **Independence of judgment**
Claims are evaluated on technical merit, not affiliation or posture. Claims are evaluated on technical merit, not affiliation or posture.
* **Responsibility over capability** * **Responsibility over capability**
Capability does not justify careless use. Capability does not justify careless use.
* **Cooperation over friction** * **Cooperation over friction**
Progress depends on coordination, mutual support, and honest review. Progress depends on coordination, mutual support, and honest review.
* **Good intent, rigorous method** * **Good intent, rigorous method**
Assume good intent, but require rigor. Assume good intent, but require rigor.
> **Aussagen gelten nach ihrer Begründung.** > **Aussagen gelten nach ihrer Begründung.**
@ -54,7 +47,7 @@ Technology has consequences. Responsibility is inherent.
--- ---
## Expected Behavior ## 3. Expected Behavior
Participants are expected to: Participants are expected to:
@ -76,7 +69,7 @@ New contributors are welcome. They are expected to grow into these standards. Ex
--- ---
## Unacceptable Behavior ## 4. Unacceptable Behavior
The following is not allowed: The following is not allowed:
@ -96,7 +89,7 @@ Such discussions may be closed, removed, or redirected.
--- ---
## Security and Misuse ## 5. Security and Misuse
Telemt is intended for responsible use. Telemt is intended for responsible use.
@ -116,13 +109,15 @@ Security is both technical and behavioral.
Telemt is open to contributors of different backgrounds, experience levels, and working styles. Telemt is open to contributors of different backgrounds, experience levels, and working styles.
- Standards are public, legible, and applied to the work itself. Standards are public, legible, and applied to the work itself.
- Questions are welcome. Careful disagreement is welcome. Honest correction is welcome.
- Gatekeeping by obscurity, status signaling, or hostility is not. Questions are welcome. Careful disagreement is welcome. Honest correction is welcome.
Gatekeeping by obscurity, status signaling, or hostility is not.
--- ---
## Scope ## 7. Scope
This Code of Conduct applies to all official spaces: This Code of Conduct applies to all official spaces:
@ -132,19 +127,16 @@ This Code of Conduct applies to all official spaces:
--- ---
## Maintainer Stewardship ## 8. Maintainer Stewardship
Maintainers are responsible for final decisions in matters of conduct, scope, and direction. Maintainers are responsible for final decisions in matters of conduct, scope, and direction.
This responsibility is stewardship: This responsibility is stewardship: preserving continuity, protecting signal, maintaining standards, and keeping Telemt workable for others.
- preserving continuity,
- protecting signal,
- maintaining standards,
- keeping Telemt workable for others.
Judgment should be exercised with restraint, consistency, and institutional responsibility. Judgment should be exercised with restraint, consistency, and institutional responsibility.
- Not every decision requires extended debate.
- Not every intervention requires public explanation. Not every decision requires extended debate.
Not every intervention requires public explanation.
All decisions are expected to serve the durability, clarity, and integrity of Telemt. All decisions are expected to serve the durability, clarity, and integrity of Telemt.
@ -154,7 +146,7 @@ All decisions are expected to serve the durability, clarity, and integrity of Te
--- ---
## Enforcement ## 9. Enforcement
Maintainers may act to preserve the integrity of Telemt, including by: Maintainers may act to preserve the integrity of Telemt, including by:
@ -164,40 +156,44 @@ Maintainers may act to preserve the integrity of Telemt, including by:
* Restricting or banning participants * Restricting or banning participants
Actions are taken to maintain function, continuity, and signal quality. Actions are taken to maintain function, continuity, and signal quality.
- Where possible, correction is preferred to exclusion.
- Where necessary, exclusion is preferred to decay. Where possible, correction is preferred to exclusion.
Where necessary, exclusion is preferred to decay.
--- ---
## Final ## 10. Final
Telemt is built on discipline, structure, and shared intent. Telemt is built on discipline, structure, and shared intent.
- Signal over noise.
- Facts over opinion.
- Systems over rhetoric.
- Work is collective. Signal over noise.
- Outcomes are shared. Facts over opinion.
- Responsibility is distributed. Systems over rhetoric.
- Precision is learned. Work is collective.
- Rigor is expected. Outcomes are shared.
- Help is part of the work. Responsibility is distributed.
Precision is learned.
Rigor is expected.
Help is part of the work.
> **Ordnung ist Voraussetzung der Freiheit.** > **Ordnung ist Voraussetzung der Freiheit.**
- If you contribute — contribute with care. If you contribute — contribute with care.
- If you speak — speak with substance. If you speak — speak with substance.
- If you engage — engage constructively. If you engage — engage constructively.
--- ---
## After All ## 11. After All
Systems outlive intentions. Systems outlive intentions.
- What is built will be used.
- What is released will propagate. What is built will be used.
- What is maintained will define the future state. What is released will propagate.
What is maintained will define the future state.
There is no neutral infrastructure, only infrastructure shaped well or poorly. There is no neutral infrastructure, only infrastructure shaped well or poorly.
@ -205,8 +201,8 @@ There is no neutral infrastructure, only infrastructure shaped well or poorly.
> Every system carries responsibility. > Every system carries responsibility.
- Stability requires discipline. Stability requires discipline.
- Freedom requires structure. Freedom requires structure.
- Trust requires honesty. Trust requires honesty.
In the end: the system reflects its contributors. In the end, the system reflects its contributors.

View File

@ -1,5 +1,3 @@
# syntax=docker/dockerfile:1
# ========================== # ==========================
# Stage 1: Build # Stage 1: Build
# ========================== # ==========================
@ -7,101 +5,36 @@ FROM rust:1.88-slim-bookworm AS builder
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
pkg-config \ pkg-config \
ca-certificates \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
WORKDIR /build WORKDIR /build
# Depcache
COPY Cargo.toml Cargo.lock* ./ COPY Cargo.toml Cargo.lock* ./
RUN mkdir src && echo 'fn main() {}' > src/main.rs && \ RUN mkdir src && echo 'fn main() {}' > src/main.rs && \
cargo build --release 2>/dev/null || true && \ cargo build --release 2>/dev/null || true && \
rm -rf src rm -rf src
# Build
COPY . . COPY . .
RUN cargo build --release && strip target/release/telemt RUN cargo build --release && strip target/release/telemt
# ========================== # ==========================
# Stage 2: Compress (strip + UPX) # Stage 2: Runtime
# ========================== # ==========================
FROM debian:12-slim AS minimal FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
binutils \
curl \
ca-certificates \
&& rm -rf /var/lib/apt/lists/* \
\
# install UPX from Telemt releases
&& curl -fL \
--retry 5 \
--retry-delay 3 \
--connect-timeout 10 \
--max-time 120 \
-o /tmp/upx.tar.xz \
https://github.com/telemt/telemt/releases/download/toolchains/upx-amd64_linux.tar.xz \
&& tar -xf /tmp/upx.tar.xz -C /tmp \
&& mv /tmp/upx*/upx /usr/local/bin/upx \
&& chmod +x /usr/local/bin/upx \
&& rm -rf /tmp/upx*
COPY --from=builder /build/target/release/telemt /telemt
RUN strip /telemt || true
RUN upx --best --lzma /telemt || true
# ==========================
# Stage 3: Debug base
# ==========================
FROM debian:12-slim AS debug-base
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \ ca-certificates \
tzdata \
curl \
iproute2 \
busybox \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# ========================== RUN useradd -r -s /usr/sbin/nologin telemt
# Stage 4: Debug image
# ==========================
FROM debug-base AS debug
WORKDIR /app WORKDIR /app
COPY --from=minimal /telemt /app/telemt COPY --from=builder /build/target/release/telemt /app/telemt
COPY config.toml /app/config.toml COPY config.toml /app/config.toml
USER root RUN chown -R telemt:telemt /app
USER telemt
EXPOSE 443
EXPOSE 9090
EXPOSE 9091
ENTRYPOINT ["/app/telemt"]
CMD ["config.toml"]
# ==========================
# Stage 5: Production (distroless)
# ==========================
FROM gcr.io/distroless/base-debian12 AS prod
WORKDIR /app
COPY --from=minimal /telemt /app/telemt
COPY config.toml /app/config.toml
# TLS + timezone + shell
COPY --from=debug-base /etc/ssl/certs /etc/ssl/certs
COPY --from=debug-base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=debug-base /bin/busybox /bin/busybox
RUN ["/bin/busybox", "--install", "-s", "/bin"]
# distroless user
USER nonroot:nonroot
EXPOSE 443 EXPOSE 443
EXPOSE 9090 EXPOSE 9090

View File

@ -174,24 +174,6 @@ pub(super) struct ZeroMiddleProxyData {
pub(super) route_drop_queue_full_total: u64, pub(super) route_drop_queue_full_total: u64,
pub(super) route_drop_queue_full_base_total: u64, pub(super) route_drop_queue_full_base_total: u64,
pub(super) route_drop_queue_full_high_total: u64, pub(super) route_drop_queue_full_high_total: u64,
pub(super) d2c_batches_total: u64,
pub(super) d2c_batch_frames_total: u64,
pub(super) d2c_batch_bytes_total: u64,
pub(super) d2c_flush_reason_queue_drain_total: u64,
pub(super) d2c_flush_reason_batch_frames_total: u64,
pub(super) d2c_flush_reason_batch_bytes_total: u64,
pub(super) d2c_flush_reason_max_delay_total: u64,
pub(super) d2c_flush_reason_ack_immediate_total: u64,
pub(super) d2c_flush_reason_close_total: u64,
pub(super) d2c_data_frames_total: u64,
pub(super) d2c_ack_frames_total: u64,
pub(super) d2c_payload_bytes_total: u64,
pub(super) d2c_write_mode_coalesced_total: u64,
pub(super) d2c_write_mode_split_total: u64,
pub(super) d2c_quota_reject_pre_write_total: u64,
pub(super) d2c_quota_reject_post_write_total: u64,
pub(super) d2c_frame_buf_shrink_total: u64,
pub(super) d2c_frame_buf_shrink_bytes_total: u64,
pub(super) socks_kdf_strict_reject_total: u64, pub(super) socks_kdf_strict_reject_total: u64,
pub(super) socks_kdf_compat_fallback_total: u64, pub(super) socks_kdf_compat_fallback_total: u64,
pub(super) endpoint_quarantine_total: u64, pub(super) endpoint_quarantine_total: u64,

View File

@ -68,25 +68,6 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
route_drop_queue_full_total: stats.get_me_route_drop_queue_full(), route_drop_queue_full_total: stats.get_me_route_drop_queue_full(),
route_drop_queue_full_base_total: stats.get_me_route_drop_queue_full_base(), route_drop_queue_full_base_total: stats.get_me_route_drop_queue_full_base(),
route_drop_queue_full_high_total: stats.get_me_route_drop_queue_full_high(), route_drop_queue_full_high_total: stats.get_me_route_drop_queue_full_high(),
d2c_batches_total: stats.get_me_d2c_batches_total(),
d2c_batch_frames_total: stats.get_me_d2c_batch_frames_total(),
d2c_batch_bytes_total: stats.get_me_d2c_batch_bytes_total(),
d2c_flush_reason_queue_drain_total: stats.get_me_d2c_flush_reason_queue_drain_total(),
d2c_flush_reason_batch_frames_total: stats.get_me_d2c_flush_reason_batch_frames_total(),
d2c_flush_reason_batch_bytes_total: stats.get_me_d2c_flush_reason_batch_bytes_total(),
d2c_flush_reason_max_delay_total: stats.get_me_d2c_flush_reason_max_delay_total(),
d2c_flush_reason_ack_immediate_total: stats
.get_me_d2c_flush_reason_ack_immediate_total(),
d2c_flush_reason_close_total: stats.get_me_d2c_flush_reason_close_total(),
d2c_data_frames_total: stats.get_me_d2c_data_frames_total(),
d2c_ack_frames_total: stats.get_me_d2c_ack_frames_total(),
d2c_payload_bytes_total: stats.get_me_d2c_payload_bytes_total(),
d2c_write_mode_coalesced_total: stats.get_me_d2c_write_mode_coalesced_total(),
d2c_write_mode_split_total: stats.get_me_d2c_write_mode_split_total(),
d2c_quota_reject_pre_write_total: stats.get_me_d2c_quota_reject_pre_write_total(),
d2c_quota_reject_post_write_total: stats.get_me_d2c_quota_reject_post_write_total(),
d2c_frame_buf_shrink_total: stats.get_me_d2c_frame_buf_shrink_total(),
d2c_frame_buf_shrink_bytes_total: stats.get_me_d2c_frame_buf_shrink_bytes_total(),
socks_kdf_strict_reject_total: stats.get_me_socks_kdf_strict_reject(), socks_kdf_strict_reject_total: stats.get_me_socks_kdf_strict_reject(),
socks_kdf_compat_fallback_total: stats.get_me_socks_kdf_compat_fallback(), socks_kdf_compat_fallback_total: stats.get_me_socks_kdf_compat_fallback(),
endpoint_quarantine_total: stats.get_me_endpoint_quarantine_total(), endpoint_quarantine_total: stats.get_me_endpoint_quarantine_total(),

View File

@ -29,8 +29,6 @@ const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024; const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500; const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500;
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true; const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true;
const DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES: u64 = 64 * 1024;
const DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES: usize = 256 * 1024;
const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024; const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024;
const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024; const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024;
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3; const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
@ -389,14 +387,6 @@ pub(crate) fn default_me_d2c_ack_flush_immediate() -> bool {
DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE
} }
pub(crate) fn default_me_quota_soft_overshoot_bytes() -> u64 {
DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES
}
pub(crate) fn default_me_d2c_frame_buf_shrink_threshold_bytes() -> usize {
DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES
}
pub(crate) fn default_direct_relay_copy_buf_c2s_bytes() -> usize { pub(crate) fn default_direct_relay_copy_buf_c2s_bytes() -> usize {
DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES
} }

View File

@ -106,8 +106,6 @@ pub struct HotFields {
pub me_d2c_flush_batch_max_bytes: usize, pub me_d2c_flush_batch_max_bytes: usize,
pub me_d2c_flush_batch_max_delay_us: u64, pub me_d2c_flush_batch_max_delay_us: u64,
pub me_d2c_ack_flush_immediate: bool, pub me_d2c_ack_flush_immediate: bool,
pub me_quota_soft_overshoot_bytes: u64,
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
pub direct_relay_copy_buf_c2s_bytes: usize, pub direct_relay_copy_buf_c2s_bytes: usize,
pub direct_relay_copy_buf_s2c_bytes: usize, pub direct_relay_copy_buf_s2c_bytes: usize,
pub me_health_interval_ms_unhealthy: u64, pub me_health_interval_ms_unhealthy: u64,
@ -227,8 +225,6 @@ impl HotFields {
me_d2c_flush_batch_max_bytes: cfg.general.me_d2c_flush_batch_max_bytes, me_d2c_flush_batch_max_bytes: cfg.general.me_d2c_flush_batch_max_bytes,
me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us, me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us,
me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate, me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate,
me_quota_soft_overshoot_bytes: cfg.general.me_quota_soft_overshoot_bytes,
me_d2c_frame_buf_shrink_threshold_bytes: cfg.general.me_d2c_frame_buf_shrink_threshold_bytes,
direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes, direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes,
direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes, direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes,
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy, me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
@ -515,9 +511,6 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes; cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes;
cfg.general.me_d2c_flush_batch_max_delay_us = new.general.me_d2c_flush_batch_max_delay_us; cfg.general.me_d2c_flush_batch_max_delay_us = new.general.me_d2c_flush_batch_max_delay_us;
cfg.general.me_d2c_ack_flush_immediate = new.general.me_d2c_ack_flush_immediate; cfg.general.me_d2c_ack_flush_immediate = new.general.me_d2c_ack_flush_immediate;
cfg.general.me_quota_soft_overshoot_bytes = new.general.me_quota_soft_overshoot_bytes;
cfg.general.me_d2c_frame_buf_shrink_threshold_bytes =
new.general.me_d2c_frame_buf_shrink_threshold_bytes;
cfg.general.direct_relay_copy_buf_c2s_bytes = new.general.direct_relay_copy_buf_c2s_bytes; cfg.general.direct_relay_copy_buf_c2s_bytes = new.general.direct_relay_copy_buf_c2s_bytes;
cfg.general.direct_relay_copy_buf_s2c_bytes = new.general.direct_relay_copy_buf_s2c_bytes; cfg.general.direct_relay_copy_buf_s2c_bytes = new.general.direct_relay_copy_buf_s2c_bytes;
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy; cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
@ -1037,20 +1030,15 @@ fn log_changes(
|| old_hot.me_d2c_flush_batch_max_bytes != new_hot.me_d2c_flush_batch_max_bytes || old_hot.me_d2c_flush_batch_max_bytes != new_hot.me_d2c_flush_batch_max_bytes
|| old_hot.me_d2c_flush_batch_max_delay_us != new_hot.me_d2c_flush_batch_max_delay_us || old_hot.me_d2c_flush_batch_max_delay_us != new_hot.me_d2c_flush_batch_max_delay_us
|| old_hot.me_d2c_ack_flush_immediate != new_hot.me_d2c_ack_flush_immediate || old_hot.me_d2c_ack_flush_immediate != new_hot.me_d2c_ack_flush_immediate
|| old_hot.me_quota_soft_overshoot_bytes != new_hot.me_quota_soft_overshoot_bytes
|| old_hot.me_d2c_frame_buf_shrink_threshold_bytes
!= new_hot.me_d2c_frame_buf_shrink_threshold_bytes
|| old_hot.direct_relay_copy_buf_c2s_bytes != new_hot.direct_relay_copy_buf_c2s_bytes || old_hot.direct_relay_copy_buf_c2s_bytes != new_hot.direct_relay_copy_buf_c2s_bytes
|| old_hot.direct_relay_copy_buf_s2c_bytes != new_hot.direct_relay_copy_buf_s2c_bytes || old_hot.direct_relay_copy_buf_s2c_bytes != new_hot.direct_relay_copy_buf_s2c_bytes
{ {
info!( info!(
"config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} me_quota_soft_overshoot_bytes={} me_d2c_frame_buf_shrink_threshold_bytes={} direct_buf_c2s={} direct_buf_s2c={}", "config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} direct_buf_c2s={} direct_buf_s2c={}",
new_hot.me_d2c_flush_batch_max_frames, new_hot.me_d2c_flush_batch_max_frames,
new_hot.me_d2c_flush_batch_max_bytes, new_hot.me_d2c_flush_batch_max_bytes,
new_hot.me_d2c_flush_batch_max_delay_us, new_hot.me_d2c_flush_batch_max_delay_us,
new_hot.me_d2c_ack_flush_immediate, new_hot.me_d2c_ack_flush_immediate,
new_hot.me_quota_soft_overshoot_bytes,
new_hot.me_d2c_frame_buf_shrink_threshold_bytes,
new_hot.direct_relay_copy_buf_c2s_bytes, new_hot.direct_relay_copy_buf_c2s_bytes,
new_hot.direct_relay_copy_buf_s2c_bytes, new_hot.direct_relay_copy_buf_s2c_bytes,
); );

View File

@ -533,19 +533,6 @@ impl ProxyConfig {
)); ));
} }
if config.general.me_quota_soft_overshoot_bytes > 16 * 1024 * 1024 {
return Err(ProxyError::Config(
"general.me_quota_soft_overshoot_bytes must be within [0, 16777216]".to_string(),
));
}
if !(4096..=16 * 1024 * 1024).contains(&config.general.me_d2c_frame_buf_shrink_threshold_bytes) {
return Err(ProxyError::Config(
"general.me_d2c_frame_buf_shrink_threshold_bytes must be within [4096, 16777216]"
.to_string(),
));
}
if !(4096..=1024 * 1024).contains(&config.general.direct_relay_copy_buf_c2s_bytes) { if !(4096..=1024 * 1024).contains(&config.general.direct_relay_copy_buf_c2s_bytes) {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.direct_relay_copy_buf_c2s_bytes must be within [4096, 1048576]" "general.direct_relay_copy_buf_c2s_bytes must be within [4096, 1048576]"

View File

@ -468,7 +468,7 @@ pub struct GeneralConfig {
pub me_c2me_send_timeout_ms: u64, pub me_c2me_send_timeout_ms: u64,
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue. /// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
/// `0` keeps non-blocking routing; values >0 enable bounded wait for compatibility. /// `0` keeps legacy no-wait behavior.
#[serde(default = "default_me_reader_route_data_wait_ms")] #[serde(default = "default_me_reader_route_data_wait_ms")]
pub me_reader_route_data_wait_ms: u64, pub me_reader_route_data_wait_ms: u64,
@ -489,14 +489,6 @@ pub struct GeneralConfig {
#[serde(default = "default_me_d2c_ack_flush_immediate")] #[serde(default = "default_me_d2c_ack_flush_immediate")]
pub me_d2c_ack_flush_immediate: bool, pub me_d2c_ack_flush_immediate: bool,
/// Additional bytes above strict per-user quota allowed in hot-path soft mode.
#[serde(default = "default_me_quota_soft_overshoot_bytes")]
pub me_quota_soft_overshoot_bytes: u64,
/// Shrink threshold for reusable ME->Client frame assembly buffer.
#[serde(default = "default_me_d2c_frame_buf_shrink_threshold_bytes")]
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
/// Copy buffer size for client->DC direction in direct relay. /// Copy buffer size for client->DC direction in direct relay.
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")] #[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
pub direct_relay_copy_buf_c2s_bytes: usize, pub direct_relay_copy_buf_c2s_bytes: usize,
@ -953,8 +945,6 @@ impl Default for GeneralConfig {
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(), me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(), me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(),
me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(), me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(),
me_quota_soft_overshoot_bytes: default_me_quota_soft_overshoot_bytes(),
me_d2c_frame_buf_shrink_threshold_bytes: default_me_d2c_frame_buf_shrink_threshold_bytes(),
direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(), direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(),
direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(), direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(),
me_warmup_stagger_enabled: default_true(), me_warmup_stagger_enabled: default_true(),

View File

@ -935,462 +935,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
} }
); );
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batches_total Total DC->Client flush batches"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batches_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batches_total {}",
if me_allows_normal {
stats.get_me_d2c_batches_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_frames_total Total DC->Client frames flushed in batches"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_frames_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_total {}",
if me_allows_normal {
stats.get_me_d2c_batch_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_bytes_total Total DC->Client bytes flushed in batches"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_bytes_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_total {}",
if me_allows_normal {
stats.get_me_d2c_batch_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_flush_reason_total DC->Client flush reasons"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_flush_reason_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"queue_drain\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_queue_drain_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"batch_frames\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_batch_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"batch_bytes\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_batch_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"max_delay\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_max_delay_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"ack_immediate\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_ack_immediate_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"close\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_close_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_data_frames_total DC->Client data frames"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_data_frames_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_data_frames_total {}",
if me_allows_normal {
stats.get_me_d2c_data_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_ack_frames_total DC->Client quick-ack frames"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_ack_frames_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_ack_frames_total {}",
if me_allows_normal {
stats.get_me_d2c_ack_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_payload_bytes_total DC->Client payload bytes before transport framing"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_payload_bytes_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_payload_bytes_total {}",
if me_allows_normal {
stats.get_me_d2c_payload_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_write_mode_total DC->Client writer mode selection"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_write_mode_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_write_mode_total{{mode=\"coalesced\"}} {}",
if me_allows_normal {
stats.get_me_d2c_write_mode_coalesced_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_write_mode_total{{mode=\"split\"}} {}",
if me_allows_normal {
stats.get_me_d2c_write_mode_split_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_quota_reject_total DC->Client quota rejects"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_quota_reject_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_quota_reject_total{{stage=\"pre_write\"}} {}",
if me_allows_normal {
stats.get_me_d2c_quota_reject_pre_write_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_quota_reject_total{{stage=\"post_write\"}} {}",
if me_allows_normal {
stats.get_me_d2c_quota_reject_post_write_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_frame_buf_shrink_total DC->Client reusable frame buffer shrink events"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_frame_buf_shrink_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_frame_buf_shrink_total {}",
if me_allows_normal {
stats.get_me_d2c_frame_buf_shrink_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_frame_buf_shrink_bytes_total DC->Client reusable frame buffer bytes released"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_frame_buf_shrink_bytes_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_frame_buf_shrink_bytes_total {}",
if me_allows_normal {
stats.get_me_d2c_frame_buf_shrink_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_frames_bucket_total DC->Client batch frame count buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_frames_bucket_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"1\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_1()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"2_4\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_2_4()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"5_8\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_5_8()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"9_16\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_9_16()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"17_32\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_17_32()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"gt_32\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_gt_32()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_bytes_bucket_total DC->Client batch byte size buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_bytes_bucket_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"0_1k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_0_1k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"1k_4k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_1k_4k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"4k_16k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_4k_16k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"16k_64k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_16k_64k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"64k_128k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_64k_128k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"gt_128k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_gt_128k()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_flush_duration_us_bucket_total DC->Client flush duration buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"0_50\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_0_50()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"51_200\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_51_200()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"201_1000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_201_1000()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"1001_5000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_1001_5000()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"5001_20000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_5001_20000()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"gt_20000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_gt_20000()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_timeout_armed_total DC->Client max-delay timer armed events"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_timeout_armed_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_timeout_armed_total {}",
if me_allows_debug {
stats.get_me_d2c_batch_timeout_armed_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_timeout_fired_total DC->Client max-delay timer fired events"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_timeout_fired_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_timeout_fired_total {}",
if me_allows_debug {
stats.get_me_d2c_batch_timeout_fired_total()
} else {
0
}
);
let _ = writeln!( let _ = writeln!(
out, out,
"# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result" "# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result"
@ -2601,16 +2145,6 @@ mod tests {
stats.increment_relay_idle_hard_close_total(); stats.increment_relay_idle_hard_close_total();
stats.increment_relay_pressure_evict_total(); stats.increment_relay_pressure_evict_total();
stats.increment_relay_protocol_desync_close_total(); stats.increment_relay_protocol_desync_close_total();
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(3);
stats.add_me_d2c_batch_bytes_total(2048);
stats.increment_me_d2c_flush_reason(crate::stats::MeD2cFlushReason::AckImmediate);
stats.increment_me_d2c_data_frames_total();
stats.increment_me_d2c_ack_frames_total();
stats.add_me_d2c_payload_bytes_total(1800);
stats.increment_me_d2c_write_mode(crate::stats::MeD2cWriteMode::Coalesced);
stats.increment_me_d2c_quota_reject_total(crate::stats::MeD2cQuotaRejectStage::PostWrite);
stats.observe_me_d2c_frame_buf_shrink(4096);
stats.increment_user_connects("alice"); stats.increment_user_connects("alice");
stats.increment_user_curr_connects("alice"); stats.increment_user_curr_connects("alice");
stats.add_user_octets_from("alice", 1024); stats.add_user_octets_from("alice", 1024);
@ -2650,17 +2184,6 @@ mod tests {
assert!(output.contains("telemt_relay_idle_hard_close_total 1")); assert!(output.contains("telemt_relay_idle_hard_close_total 1"));
assert!(output.contains("telemt_relay_pressure_evict_total 1")); assert!(output.contains("telemt_relay_pressure_evict_total 1"));
assert!(output.contains("telemt_relay_protocol_desync_close_total 1")); assert!(output.contains("telemt_relay_protocol_desync_close_total 1"));
assert!(output.contains("telemt_me_d2c_batches_total 1"));
assert!(output.contains("telemt_me_d2c_batch_frames_total 3"));
assert!(output.contains("telemt_me_d2c_batch_bytes_total 2048"));
assert!(output.contains("telemt_me_d2c_flush_reason_total{reason=\"ack_immediate\"} 1"));
assert!(output.contains("telemt_me_d2c_data_frames_total 1"));
assert!(output.contains("telemt_me_d2c_ack_frames_total 1"));
assert!(output.contains("telemt_me_d2c_payload_bytes_total 1800"));
assert!(output.contains("telemt_me_d2c_write_mode_total{mode=\"coalesced\"} 1"));
assert!(output.contains("telemt_me_d2c_quota_reject_total{stage=\"post_write\"} 1"));
assert!(output.contains("telemt_me_d2c_frame_buf_shrink_total 1"));
assert!(output.contains("telemt_me_d2c_frame_buf_shrink_bytes_total 4096"));
assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1")); assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1")); assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024")); assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024"));
@ -2722,11 +2245,6 @@ mod tests {
assert!(output.contains("# TYPE telemt_relay_idle_hard_close_total counter")); assert!(output.contains("# TYPE telemt_relay_idle_hard_close_total counter"));
assert!(output.contains("# TYPE telemt_relay_pressure_evict_total counter")); assert!(output.contains("# TYPE telemt_relay_pressure_evict_total counter"));
assert!(output.contains("# TYPE telemt_relay_protocol_desync_close_total counter")); assert!(output.contains("# TYPE telemt_relay_protocol_desync_close_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_batches_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_flush_reason_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_write_mode_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_batch_frames_bucket_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!( assert!(
output output

View File

@ -21,7 +21,7 @@ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay, cutover_stagger_delay,
}; };
use crate::stats::{MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, Stats}; use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter, PooledBuffer}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter, PooledBuffer};
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag}; use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
@ -45,8 +45,6 @@ const C2ME_SEND_TIMEOUT: Duration = Duration::from_millis(50);
const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5); const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5);
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1; const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096; const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
const ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR: usize = 2;
const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024;
#[cfg(test)] #[cfg(test)]
const QUOTA_USER_LOCKS_MAX: usize = 64; const QUOTA_USER_LOCKS_MAX: usize = 64;
#[cfg(not(test))] #[cfg(not(test))]
@ -216,8 +214,6 @@ struct MeD2cFlushPolicy {
max_bytes: usize, max_bytes: usize,
max_delay: Duration, max_delay: Duration,
ack_flush_immediate: bool, ack_flush_immediate: bool,
quota_soft_overshoot_bytes: u64,
frame_buf_shrink_threshold_bytes: usize,
} }
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@ -288,11 +284,6 @@ impl MeD2cFlushPolicy {
.max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN), .max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN),
max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us), max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us),
ack_flush_immediate: config.general.me_d2c_ack_flush_immediate, ack_flush_immediate: config.general.me_d2c_ack_flush_immediate,
quota_soft_overshoot_bytes: config.general.me_quota_soft_overshoot_bytes,
frame_buf_shrink_threshold_bytes: config
.general
.me_d2c_frame_buf_shrink_threshold_bytes
.max(4096),
} }
} }
} }
@ -535,7 +526,6 @@ fn quota_exceeded_for_user(stats: &Stats, user: &str, quota_limit: Option<u64>)
quota_limit.is_some_and(|quota| stats.get_user_total_octets(user) >= quota) quota_limit.is_some_and(|quota| stats.get_user_total_octets(user) >= quota)
} }
#[cfg_attr(not(test), allow(dead_code))]
fn quota_would_be_exceeded_for_user( fn quota_would_be_exceeded_for_user(
stats: &Stats, stats: &Stats,
user: &str, user: &str,
@ -548,76 +538,6 @@ fn quota_would_be_exceeded_for_user(
}) })
} }
fn quota_soft_cap(limit: u64, overshoot: u64) -> u64 {
limit.saturating_add(overshoot)
}
fn quota_exceeded_for_user_soft(
stats: &Stats,
user: &str,
quota_limit: Option<u64>,
overshoot: u64,
) -> bool {
quota_limit.is_some_and(|quota| stats.get_user_total_octets(user) >= quota_soft_cap(quota, overshoot))
}
fn quota_would_be_exceeded_for_user_soft(
stats: &Stats,
user: &str,
quota_limit: Option<u64>,
bytes: u64,
overshoot: u64,
) -> bool {
quota_limit.is_some_and(|quota| {
let cap = quota_soft_cap(quota, overshoot);
let used = stats.get_user_total_octets(user);
used >= cap || bytes > cap.saturating_sub(used)
})
}
fn classify_me_d2c_flush_reason(
flush_immediately: bool,
batch_frames: usize,
max_frames: usize,
batch_bytes: usize,
max_bytes: usize,
max_delay_fired: bool,
) -> MeD2cFlushReason {
if flush_immediately {
return MeD2cFlushReason::AckImmediate;
}
if batch_frames >= max_frames {
return MeD2cFlushReason::BatchFrames;
}
if batch_bytes >= max_bytes {
return MeD2cFlushReason::BatchBytes;
}
if max_delay_fired {
return MeD2cFlushReason::MaxDelay;
}
MeD2cFlushReason::QueueDrain
}
fn observe_me_d2c_flush_event(
stats: &Stats,
reason: MeD2cFlushReason,
batch_frames: usize,
batch_bytes: usize,
flush_duration_us: Option<u64>,
) {
stats.increment_me_d2c_flush_reason(reason);
if batch_frames > 0 || batch_bytes > 0 {
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(batch_frames as u64);
stats.add_me_d2c_batch_bytes_total(batch_bytes as u64);
stats.observe_me_d2c_batch_frames(batch_frames as u64);
stats.observe_me_d2c_batch_bytes(batch_bytes as u64);
}
if let Some(duration_us) = flush_duration_us {
stats.observe_me_d2c_flush_duration_us(duration_us);
}
}
#[cfg(test)] #[cfg(test)]
fn quota_user_lock_test_guard() -> &'static Mutex<()> { fn quota_user_lock_test_guard() -> &'static Mutex<()> {
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new(); static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
@ -854,7 +774,6 @@ where
let mut batch_frames = 0usize; let mut batch_frames = 0usize;
let mut batch_bytes = 0usize; let mut batch_bytes = 0usize;
let mut flush_immediately; let mut flush_immediately;
let mut max_delay_fired = false;
let first_is_downstream_activity = let first_is_downstream_activity =
matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_)); matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_));
@ -867,7 +786,6 @@ where
stats_clone.as_ref(), stats_clone.as_ref(),
&user_clone, &user_clone,
quota_limit, quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(), bytes_me2c_clone.as_ref(),
conn_id, conn_id,
d2c_flush_policy.ack_flush_immediate, d2c_flush_policy.ack_flush_immediate,
@ -883,25 +801,7 @@ where
flush_immediately = immediate; flush_immediately = immediate;
} }
MeWriterResponseOutcome::Close => { MeWriterResponseOutcome::Close => {
let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() {
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await; let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX)) as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(()); return Ok(());
} }
} }
@ -925,7 +825,6 @@ where
stats_clone.as_ref(), stats_clone.as_ref(),
&user_clone, &user_clone,
quota_limit, quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(), bytes_me2c_clone.as_ref(),
conn_id, conn_id,
d2c_flush_policy.ack_flush_immediate, d2c_flush_policy.ack_flush_immediate,
@ -941,27 +840,7 @@ where
flush_immediately |= immediate; flush_immediately |= immediate;
} }
MeWriterResponseOutcome::Close => { MeWriterResponseOutcome::Close => {
let flush_started_at =
if stats_clone.telemetry_policy().me_level.allows_debug() {
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await; let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX))
as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(()); return Ok(());
} }
} }
@ -972,7 +851,6 @@ where
&& batch_frames < d2c_flush_policy.max_frames && batch_frames < d2c_flush_policy.max_frames
&& batch_bytes < d2c_flush_policy.max_bytes && batch_bytes < d2c_flush_policy.max_bytes
{ {
stats_clone.increment_me_d2c_batch_timeout_armed_total();
match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await { match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await {
Ok(Some(next)) => { Ok(Some(next)) => {
let next_is_downstream_activity = let next_is_downstream_activity =
@ -986,7 +864,6 @@ where
stats_clone.as_ref(), stats_clone.as_ref(),
&user_clone, &user_clone,
quota_limit, quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(), bytes_me2c_clone.as_ref(),
conn_id, conn_id,
d2c_flush_policy.ack_flush_immediate, d2c_flush_policy.ack_flush_immediate,
@ -1002,30 +879,7 @@ where
flush_immediately |= immediate; flush_immediately |= immediate;
} }
MeWriterResponseOutcome::Close => { MeWriterResponseOutcome::Close => {
let flush_started_at = if stats_clone
.telemetry_policy()
.me_level
.allows_debug()
{
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await; let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX))
as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(()); return Ok(());
} }
} }
@ -1049,7 +903,6 @@ where
stats_clone.as_ref(), stats_clone.as_ref(),
&user_clone, &user_clone,
quota_limit, quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(), bytes_me2c_clone.as_ref(),
conn_id, conn_id,
d2c_flush_policy.ack_flush_immediate, d2c_flush_policy.ack_flush_immediate,
@ -1065,30 +918,7 @@ where
flush_immediately |= immediate; flush_immediately |= immediate;
} }
MeWriterResponseOutcome::Close => { MeWriterResponseOutcome::Close => {
let flush_started_at = if stats_clone
.telemetry_policy()
.me_level
.allows_debug()
{
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await; let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX))
as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(()); return Ok(());
} }
} }
@ -1098,50 +928,11 @@ where
debug!(conn_id, "ME channel closed"); debug!(conn_id, "ME channel closed");
return Err(ProxyError::Proxy("ME connection lost".into())); return Err(ProxyError::Proxy("ME connection lost".into()));
} }
Err(_) => { Err(_) => {}
max_delay_fired = true;
stats_clone.increment_me_d2c_batch_timeout_fired_total();
}
} }
} }
let flush_reason = classify_me_d2c_flush_reason(
flush_immediately,
batch_frames,
d2c_flush_policy.max_frames,
batch_bytes,
d2c_flush_policy.max_bytes,
max_delay_fired,
);
let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() {
Some(Instant::now())
} else {
None
};
writer.flush().await.map_err(ProxyError::Io)?; writer.flush().await.map_err(ProxyError::Io)?;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX)) as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
flush_reason,
batch_frames,
batch_bytes,
flush_duration_us,
);
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
let shrink_trigger = shrink_threshold
.saturating_mul(ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR);
if frame_buf.capacity() > shrink_trigger {
let cap_before = frame_buf.capacity();
frame_buf.shrink_to(shrink_threshold);
let cap_after = frame_buf.capacity();
let bytes_freed = cap_before.saturating_sub(cap_after) as u64;
stats_clone.observe_me_d2c_frame_buf_shrink(bytes_freed);
}
} }
_ = &mut stop_rx => { _ = &mut stop_rx => {
debug!(conn_id, "ME writer stop signal"); debug!(conn_id, "ME writer stop signal");
@ -1691,7 +1482,6 @@ async fn process_me_writer_response<W>(
stats: &Stats, stats: &Stats,
user: &str, user: &str,
quota_limit: Option<u64>, quota_limit: Option<u64>,
quota_soft_overshoot_bytes: u64,
bytes_me2c: &AtomicU64, bytes_me2c: &AtomicU64,
conn_id: u64, conn_id: u64,
ack_flush_immediate: bool, ack_flush_immediate: bool,
@ -1708,40 +1498,32 @@ where
trace!(conn_id, bytes = data.len(), flags, "ME->C data"); trace!(conn_id, bytes = data.len(), flags, "ME->C data");
} }
let data_len = data.len() as u64; let data_len = data.len() as u64;
if quota_would_be_exceeded_for_user_soft( if let Some(limit) = quota_limit {
stats, let quota_lock = quota_user_lock(user);
user, let _quota_guard = quota_lock.lock().await;
quota_limit, if quota_would_be_exceeded_for_user(stats, user, Some(limit), data_len) {
data_len,
quota_soft_overshoot_bytes,
) {
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
return Err(ProxyError::DataQuotaExceeded { return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(), user: user.to_string(),
}); });
} }
let write_mode =
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf) write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
.await?; .await?;
stats.increment_me_d2c_write_mode(write_mode);
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed); bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64); stats.add_user_octets_to(user, data.len() as u64);
stats.increment_me_d2c_data_frames_total();
stats.add_me_d2c_payload_bytes_total(data.len() as u64);
if quota_exceeded_for_user_soft( if quota_exceeded_for_user(stats, user, Some(limit)) {
stats,
user,
quota_limit,
quota_soft_overshoot_bytes,
) {
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PostWrite);
return Err(ProxyError::DataQuotaExceeded { return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(), user: user.to_string(),
}); });
} }
} else {
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
.await?;
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64);
}
Ok(MeWriterResponseOutcome::Continue { Ok(MeWriterResponseOutcome::Continue {
frames: 1, frames: 1,
@ -1756,7 +1538,6 @@ where
trace!(conn_id, confirm, "ME->C quickack"); trace!(conn_id, confirm, "ME->C quickack");
} }
write_client_ack(client_writer, proto_tag, confirm).await?; write_client_ack(client_writer, proto_tag, confirm).await?;
stats.increment_me_d2c_ack_frames_total();
Ok(MeWriterResponseOutcome::Continue { Ok(MeWriterResponseOutcome::Continue {
frames: 1, frames: 1,
@ -1807,13 +1588,13 @@ async fn write_client_payload<W>(
data: &[u8], data: &[u8],
rng: &SecureRandom, rng: &SecureRandom,
frame_buf: &mut Vec<u8>, frame_buf: &mut Vec<u8>,
) -> Result<MeD2cWriteMode> ) -> Result<()>
where where
W: AsyncWrite + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static,
{ {
let quickack = (flags & RPC_FLAG_QUICKACK) != 0; let quickack = (flags & RPC_FLAG_QUICKACK) != 0;
let write_mode = match proto_tag { match proto_tag {
ProtoTag::Abridged => { ProtoTag::Abridged => {
if !data.len().is_multiple_of(4) { if !data.len().is_multiple_of(4) {
return Err(ProxyError::Proxy(format!( return Err(ProxyError::Proxy(format!(
@ -1828,46 +1609,28 @@ where
if quickack { if quickack {
first |= 0x80; first |= 0x80;
} }
let wire_len = 1usize.saturating_add(data.len());
if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
frame_buf.clear(); frame_buf.clear();
frame_buf.reserve(wire_len); frame_buf.reserve(1 + data.len());
frame_buf.push(first); frame_buf.push(first);
frame_buf.extend_from_slice(data); frame_buf.extend_from_slice(data);
client_writer client_writer
.write_all(frame_buf.as_slice()) .write_all(frame_buf)
.await .await
.map_err(ProxyError::Io)?; .map_err(ProxyError::Io)?;
MeD2cWriteMode::Coalesced
} else {
let header = [first];
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
MeD2cWriteMode::Split
}
} else if len_words < (1 << 24) { } else if len_words < (1 << 24) {
let mut first = 0x7fu8; let mut first = 0x7fu8;
if quickack { if quickack {
first |= 0x80; first |= 0x80;
} }
let lw = (len_words as u32).to_le_bytes(); let lw = (len_words as u32).to_le_bytes();
let wire_len = 4usize.saturating_add(data.len());
if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
frame_buf.clear(); frame_buf.clear();
frame_buf.reserve(wire_len); frame_buf.reserve(4 + data.len());
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]); frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
frame_buf.extend_from_slice(data); frame_buf.extend_from_slice(data);
client_writer client_writer
.write_all(frame_buf.as_slice()) .write_all(frame_buf)
.await .await
.map_err(ProxyError::Io)?; .map_err(ProxyError::Io)?;
MeD2cWriteMode::Coalesced
} else {
let header = [first, lw[0], lw[1], lw[2]];
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
MeD2cWriteMode::Split
}
} else { } else {
return Err(ProxyError::Proxy(format!( return Err(ProxyError::Proxy(format!(
"Abridged frame too large: {}", "Abridged frame too large: {}",
@ -1887,10 +1650,8 @@ where
} else { } else {
0 0
}; };
let (len_val, total) = let (len_val, total) =
compute_intermediate_secure_wire_len(data.len(), padding_len, quickack)?; compute_intermediate_secure_wire_len(data.len(), padding_len, quickack)?;
if total <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
frame_buf.clear(); frame_buf.clear();
frame_buf.reserve(total); frame_buf.reserve(total);
frame_buf.extend_from_slice(&len_val.to_le_bytes()); frame_buf.extend_from_slice(&len_val.to_le_bytes());
@ -1901,32 +1662,13 @@ where
rng.fill(&mut frame_buf[start..]); rng.fill(&mut frame_buf[start..]);
} }
client_writer client_writer
.write_all(frame_buf.as_slice()) .write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
MeD2cWriteMode::Coalesced
} else {
let header = len_val.to_le_bytes();
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
if padding_len > 0 {
frame_buf.clear();
if frame_buf.capacity() < padding_len {
frame_buf.reserve(padding_len);
}
frame_buf.resize(padding_len, 0);
rng.fill(frame_buf.as_mut_slice());
client_writer
.write_all(frame_buf.as_slice())
.await .await
.map_err(ProxyError::Io)?; .map_err(ProxyError::Io)?;
} }
MeD2cWriteMode::Split
} }
}
};
Ok(write_mode) Ok(())
} }
async fn write_client_ack<W>( async fn write_client_ack<W>(

View File

@ -1540,7 +1540,6 @@ async fn process_me_writer_response_ack_obeys_flush_policy() {
&stats, &stats,
"user", "user",
None, None,
0,
&bytes_me2c, &bytes_me2c,
77, 77,
true, true,
@ -1567,7 +1566,6 @@ async fn process_me_writer_response_ack_obeys_flush_policy() {
&stats, &stats,
"user", "user",
None, None,
0,
&bytes_me2c, &bytes_me2c,
77, 77,
false, false,
@ -1608,7 +1606,6 @@ async fn process_me_writer_response_data_updates_byte_accounting() {
&stats, &stats,
"user", "user",
None, None,
0,
&bytes_me2c, &bytes_me2c,
88, 88,
false, false,
@ -1655,7 +1652,6 @@ async fn process_me_writer_response_data_enforces_live_user_quota() {
&stats, &stats,
"quota-user", "quota-user",
Some(12), Some(12),
0,
&bytes_me2c, &bytes_me2c,
89, 89,
false, false,
@ -1704,7 +1700,6 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo
&stats, &stats,
user, user,
Some(1), Some(1),
0,
&bytes_me2c, &bytes_me2c,
91, 91,
false, false,
@ -1722,7 +1717,6 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo
&stats, &stats,
user, user,
Some(1), Some(1),
0,
&bytes_me2c, &bytes_me2c,
92, 92,
false, false,
@ -1771,7 +1765,6 @@ async fn process_me_writer_response_data_does_not_forward_partial_payload_when_r
&stats, &stats,
"partial-quota-user", "partial-quota-user",
Some(4), Some(4),
0,
&bytes_me2c, &bytes_me2c,
90, 90,
false, false,
@ -1977,7 +1970,6 @@ async fn run_quota_race_attempt(
stats, stats,
user, user,
Some(1), Some(1),
0,
bytes_me2c, bytes_me2c,
conn_id, conn_id,
false, false,

View File

@ -26,28 +26,6 @@ enum RouteConnectionGauge {
Middle, Middle,
} }
#[derive(Debug, Clone, Copy)]
pub enum MeD2cFlushReason {
QueueDrain,
BatchFrames,
BatchBytes,
MaxDelay,
AckImmediate,
Close,
}
#[derive(Debug, Clone, Copy)]
pub enum MeD2cWriteMode {
Coalesced,
Split,
}
#[derive(Debug, Clone, Copy)]
pub enum MeD2cQuotaRejectStage {
PreWrite,
PostWrite,
}
#[must_use = "RouteConnectionLease must be kept alive to hold the connection gauge increment"] #[must_use = "RouteConnectionLease must be kept alive to hold the connection gauge increment"]
pub struct RouteConnectionLease { pub struct RouteConnectionLease {
stats: Arc<Stats>, stats: Arc<Stats>,
@ -162,44 +140,6 @@ pub struct Stats {
me_route_drop_queue_full: AtomicU64, me_route_drop_queue_full: AtomicU64,
me_route_drop_queue_full_base: AtomicU64, me_route_drop_queue_full_base: AtomicU64,
me_route_drop_queue_full_high: AtomicU64, me_route_drop_queue_full_high: AtomicU64,
me_d2c_batches_total: AtomicU64,
me_d2c_batch_frames_total: AtomicU64,
me_d2c_batch_bytes_total: AtomicU64,
me_d2c_flush_reason_queue_drain_total: AtomicU64,
me_d2c_flush_reason_batch_frames_total: AtomicU64,
me_d2c_flush_reason_batch_bytes_total: AtomicU64,
me_d2c_flush_reason_max_delay_total: AtomicU64,
me_d2c_flush_reason_ack_immediate_total: AtomicU64,
me_d2c_flush_reason_close_total: AtomicU64,
me_d2c_data_frames_total: AtomicU64,
me_d2c_ack_frames_total: AtomicU64,
me_d2c_payload_bytes_total: AtomicU64,
me_d2c_write_mode_coalesced_total: AtomicU64,
me_d2c_write_mode_split_total: AtomicU64,
me_d2c_quota_reject_pre_write_total: AtomicU64,
me_d2c_quota_reject_post_write_total: AtomicU64,
me_d2c_frame_buf_shrink_total: AtomicU64,
me_d2c_frame_buf_shrink_bytes_total: AtomicU64,
me_d2c_batch_frames_bucket_1: AtomicU64,
me_d2c_batch_frames_bucket_2_4: AtomicU64,
me_d2c_batch_frames_bucket_5_8: AtomicU64,
me_d2c_batch_frames_bucket_9_16: AtomicU64,
me_d2c_batch_frames_bucket_17_32: AtomicU64,
me_d2c_batch_frames_bucket_gt_32: AtomicU64,
me_d2c_batch_bytes_bucket_0_1k: AtomicU64,
me_d2c_batch_bytes_bucket_1k_4k: AtomicU64,
me_d2c_batch_bytes_bucket_4k_16k: AtomicU64,
me_d2c_batch_bytes_bucket_16k_64k: AtomicU64,
me_d2c_batch_bytes_bucket_64k_128k: AtomicU64,
me_d2c_batch_bytes_bucket_gt_128k: AtomicU64,
me_d2c_flush_duration_us_bucket_0_50: AtomicU64,
me_d2c_flush_duration_us_bucket_51_200: AtomicU64,
me_d2c_flush_duration_us_bucket_201_1000: AtomicU64,
me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64,
me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64,
me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64,
me_d2c_batch_timeout_armed_total: AtomicU64,
me_d2c_batch_timeout_fired_total: AtomicU64,
me_writer_pick_sorted_rr_success_try_total: AtomicU64, me_writer_pick_sorted_rr_success_try_total: AtomicU64,
me_writer_pick_sorted_rr_success_fallback_total: AtomicU64, me_writer_pick_sorted_rr_success_fallback_total: AtomicU64,
me_writer_pick_sorted_rr_full_total: AtomicU64, me_writer_pick_sorted_rr_full_total: AtomicU64,
@ -654,215 +594,6 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_me_d2c_batches_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_d2c_batches_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn add_me_d2c_batch_frames_total(&self, frames: u64) {
if self.telemetry_me_allows_normal() {
self.me_d2c_batch_frames_total
.fetch_add(frames, Ordering::Relaxed);
}
}
pub fn add_me_d2c_batch_bytes_total(&self, bytes: u64) {
if self.telemetry_me_allows_normal() {
self.me_d2c_batch_bytes_total
.fetch_add(bytes, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_flush_reason(&self, reason: MeD2cFlushReason) {
if !self.telemetry_me_allows_normal() {
return;
}
match reason {
MeD2cFlushReason::QueueDrain => {
self.me_d2c_flush_reason_queue_drain_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::BatchFrames => {
self.me_d2c_flush_reason_batch_frames_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::BatchBytes => {
self.me_d2c_flush_reason_batch_bytes_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::MaxDelay => {
self.me_d2c_flush_reason_max_delay_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::AckImmediate => {
self.me_d2c_flush_reason_ack_immediate_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::Close => {
self.me_d2c_flush_reason_close_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_d2c_data_frames_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_d2c_data_frames_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_ack_frames_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_d2c_ack_frames_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn add_me_d2c_payload_bytes_total(&self, bytes: u64) {
if self.telemetry_me_allows_normal() {
self.me_d2c_payload_bytes_total
.fetch_add(bytes, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_write_mode(&self, mode: MeD2cWriteMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeD2cWriteMode::Coalesced => {
self.me_d2c_write_mode_coalesced_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cWriteMode::Split => {
self.me_d2c_write_mode_split_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_d2c_quota_reject_total(&self, stage: MeD2cQuotaRejectStage) {
if !self.telemetry_me_allows_normal() {
return;
}
match stage {
MeD2cQuotaRejectStage::PreWrite => {
self.me_d2c_quota_reject_pre_write_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cQuotaRejectStage::PostWrite => {
self.me_d2c_quota_reject_post_write_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn observe_me_d2c_frame_buf_shrink(&self, bytes_freed: u64) {
if !self.telemetry_me_allows_normal() {
return;
}
self.me_d2c_frame_buf_shrink_total
.fetch_add(1, Ordering::Relaxed);
self.me_d2c_frame_buf_shrink_bytes_total
.fetch_add(bytes_freed, Ordering::Relaxed);
}
pub fn observe_me_d2c_batch_frames(&self, frames: u64) {
if !self.telemetry_me_allows_debug() {
return;
}
match frames {
0 => {}
1 => {
self.me_d2c_batch_frames_bucket_1
.fetch_add(1, Ordering::Relaxed);
}
2..=4 => {
self.me_d2c_batch_frames_bucket_2_4
.fetch_add(1, Ordering::Relaxed);
}
5..=8 => {
self.me_d2c_batch_frames_bucket_5_8
.fetch_add(1, Ordering::Relaxed);
}
9..=16 => {
self.me_d2c_batch_frames_bucket_9_16
.fetch_add(1, Ordering::Relaxed);
}
17..=32 => {
self.me_d2c_batch_frames_bucket_17_32
.fetch_add(1, Ordering::Relaxed);
}
_ => {
self.me_d2c_batch_frames_bucket_gt_32
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn observe_me_d2c_batch_bytes(&self, bytes: u64) {
if !self.telemetry_me_allows_debug() {
return;
}
match bytes {
0..=1024 => {
self.me_d2c_batch_bytes_bucket_0_1k
.fetch_add(1, Ordering::Relaxed);
}
1025..=4096 => {
self.me_d2c_batch_bytes_bucket_1k_4k
.fetch_add(1, Ordering::Relaxed);
}
4097..=16_384 => {
self.me_d2c_batch_bytes_bucket_4k_16k
.fetch_add(1, Ordering::Relaxed);
}
16_385..=65_536 => {
self.me_d2c_batch_bytes_bucket_16k_64k
.fetch_add(1, Ordering::Relaxed);
}
65_537..=131_072 => {
self.me_d2c_batch_bytes_bucket_64k_128k
.fetch_add(1, Ordering::Relaxed);
}
_ => {
self.me_d2c_batch_bytes_bucket_gt_128k
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn observe_me_d2c_flush_duration_us(&self, duration_us: u64) {
if !self.telemetry_me_allows_debug() {
return;
}
match duration_us {
0..=50 => {
self.me_d2c_flush_duration_us_bucket_0_50
.fetch_add(1, Ordering::Relaxed);
}
51..=200 => {
self.me_d2c_flush_duration_us_bucket_51_200
.fetch_add(1, Ordering::Relaxed);
}
201..=1000 => {
self.me_d2c_flush_duration_us_bucket_201_1000
.fetch_add(1, Ordering::Relaxed);
}
1001..=5000 => {
self.me_d2c_flush_duration_us_bucket_1001_5000
.fetch_add(1, Ordering::Relaxed);
}
5001..=20_000 => {
self.me_d2c_flush_duration_us_bucket_5001_20000
.fetch_add(1, Ordering::Relaxed);
}
_ => {
self.me_d2c_flush_duration_us_bucket_gt_20000
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_d2c_batch_timeout_armed_total(&self) {
if self.telemetry_me_allows_debug() {
self.me_d2c_batch_timeout_armed_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_batch_timeout_fired_total(&self) {
if self.telemetry_me_allows_debug() {
self.me_d2c_batch_timeout_fired_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) { pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() { if !self.telemetry_me_allows_normal() {
return; return;
@ -1498,142 +1229,6 @@ impl Stats {
pub fn get_me_route_drop_queue_full_high(&self) -> u64 { pub fn get_me_route_drop_queue_full_high(&self) -> u64 {
self.me_route_drop_queue_full_high.load(Ordering::Relaxed) self.me_route_drop_queue_full_high.load(Ordering::Relaxed)
} }
pub fn get_me_d2c_batches_total(&self) -> u64 {
self.me_d2c_batches_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_total(&self) -> u64 {
self.me_d2c_batch_frames_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_total(&self) -> u64 {
self.me_d2c_batch_bytes_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_queue_drain_total(&self) -> u64 {
self.me_d2c_flush_reason_queue_drain_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_batch_frames_total(&self) -> u64 {
self.me_d2c_flush_reason_batch_frames_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_batch_bytes_total(&self) -> u64 {
self.me_d2c_flush_reason_batch_bytes_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_max_delay_total(&self) -> u64 {
self.me_d2c_flush_reason_max_delay_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_ack_immediate_total(&self) -> u64 {
self.me_d2c_flush_reason_ack_immediate_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_close_total(&self) -> u64 {
self.me_d2c_flush_reason_close_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_data_frames_total(&self) -> u64 {
self.me_d2c_data_frames_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_ack_frames_total(&self) -> u64 {
self.me_d2c_ack_frames_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_payload_bytes_total(&self) -> u64 {
self.me_d2c_payload_bytes_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_write_mode_coalesced_total(&self) -> u64 {
self.me_d2c_write_mode_coalesced_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_write_mode_split_total(&self) -> u64 {
self.me_d2c_write_mode_split_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_quota_reject_pre_write_total(&self) -> u64 {
self.me_d2c_quota_reject_pre_write_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_quota_reject_post_write_total(&self) -> u64 {
self.me_d2c_quota_reject_post_write_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_frame_buf_shrink_total(&self) -> u64 {
self.me_d2c_frame_buf_shrink_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_frame_buf_shrink_bytes_total(&self) -> u64 {
self.me_d2c_frame_buf_shrink_bytes_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_1(&self) -> u64 {
self.me_d2c_batch_frames_bucket_1.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_2_4(&self) -> u64 {
self.me_d2c_batch_frames_bucket_2_4.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_5_8(&self) -> u64 {
self.me_d2c_batch_frames_bucket_5_8.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_9_16(&self) -> u64 {
self.me_d2c_batch_frames_bucket_9_16.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_17_32(&self) -> u64 {
self.me_d2c_batch_frames_bucket_17_32
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_gt_32(&self) -> u64 {
self.me_d2c_batch_frames_bucket_gt_32
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_0_1k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_0_1k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_1k_4k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_1k_4k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_4k_16k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_4k_16k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_16k_64k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_16k_64k
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_64k_128k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_64k_128k
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_gt_128k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_gt_128k
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_0_50(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_0_50
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_51_200(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_51_200
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_201_1000(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_201_1000
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_1001_5000(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_1001_5000
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_5001_20000(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_5001_20000
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_gt_20000(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_gt_20000
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 {
self.me_d2c_batch_timeout_armed_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_timeout_fired_total(&self) -> u64 {
self.me_d2c_batch_timeout_fired_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 { pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_success_try_total self.me_writer_pick_sorted_rr_success_try_total
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
@ -2303,83 +1898,9 @@ mod tests {
stats.increment_me_crc_mismatch(); stats.increment_me_crc_mismatch();
stats.increment_me_keepalive_sent(); stats.increment_me_keepalive_sent();
stats.increment_me_route_drop_queue_full(); stats.increment_me_route_drop_queue_full();
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(4);
stats.add_me_d2c_batch_bytes_total(4096);
stats.increment_me_d2c_flush_reason(MeD2cFlushReason::BatchBytes);
stats.increment_me_d2c_write_mode(MeD2cWriteMode::Coalesced);
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
stats.observe_me_d2c_frame_buf_shrink(1024);
stats.observe_me_d2c_batch_frames(4);
stats.observe_me_d2c_batch_bytes(4096);
stats.observe_me_d2c_flush_duration_us(120);
stats.increment_me_d2c_batch_timeout_armed_total();
stats.increment_me_d2c_batch_timeout_fired_total();
assert_eq!(stats.get_me_crc_mismatch(), 0); assert_eq!(stats.get_me_crc_mismatch(), 0);
assert_eq!(stats.get_me_keepalive_sent(), 0); assert_eq!(stats.get_me_keepalive_sent(), 0);
assert_eq!(stats.get_me_route_drop_queue_full(), 0); assert_eq!(stats.get_me_route_drop_queue_full(), 0);
assert_eq!(stats.get_me_d2c_batches_total(), 0);
assert_eq!(stats.get_me_d2c_flush_reason_batch_bytes_total(), 0);
assert_eq!(stats.get_me_d2c_write_mode_coalesced_total(), 0);
assert_eq!(stats.get_me_d2c_quota_reject_pre_write_total(), 0);
assert_eq!(stats.get_me_d2c_frame_buf_shrink_total(), 0);
assert_eq!(stats.get_me_d2c_batch_frames_bucket_2_4(), 0);
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_1k_4k(), 0);
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_51_200(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 0);
}
#[test]
fn test_telemetry_policy_me_normal_blocks_d2c_debug_metrics() {
let stats = Stats::new();
stats.apply_telemetry_policy(TelemetryPolicy {
core_enabled: true,
user_enabled: true,
me_level: MeTelemetryLevel::Normal,
});
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(2);
stats.add_me_d2c_batch_bytes_total(2048);
stats.increment_me_d2c_flush_reason(MeD2cFlushReason::QueueDrain);
stats.observe_me_d2c_batch_frames(2);
stats.observe_me_d2c_batch_bytes(2048);
stats.observe_me_d2c_flush_duration_us(100);
stats.increment_me_d2c_batch_timeout_armed_total();
stats.increment_me_d2c_batch_timeout_fired_total();
assert_eq!(stats.get_me_d2c_batches_total(), 1);
assert_eq!(stats.get_me_d2c_batch_frames_total(), 2);
assert_eq!(stats.get_me_d2c_batch_bytes_total(), 2048);
assert_eq!(stats.get_me_d2c_flush_reason_queue_drain_total(), 1);
assert_eq!(stats.get_me_d2c_batch_frames_bucket_2_4(), 0);
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_1k_4k(), 0);
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_51_200(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 0);
}
#[test]
fn test_telemetry_policy_me_debug_enables_d2c_debug_metrics() {
let stats = Stats::new();
stats.apply_telemetry_policy(TelemetryPolicy {
core_enabled: true,
user_enabled: true,
me_level: MeTelemetryLevel::Debug,
});
stats.observe_me_d2c_batch_frames(7);
stats.observe_me_d2c_batch_bytes(70_000);
stats.observe_me_d2c_flush_duration_us(1400);
stats.increment_me_d2c_batch_timeout_armed_total();
stats.increment_me_d2c_batch_timeout_fired_total();
assert_eq!(stats.get_me_d2c_batch_frames_bucket_5_8(), 1);
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_64k_128k(), 1);
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_1001_5000(), 1);
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 1);
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 1);
} }
#[test] #[test]

View File

@ -126,10 +126,14 @@ pub(crate) async fn reader_loop(
let data = body.slice(12..); let data = body.slice(12..);
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS"); trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed); let data_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
let routed = reg let routed = if data_wait_ms == 0 {
.route_with_timeout(cid, MeResponse::Data { flags, data }, route_wait_ms) reg.route_nowait(cid, MeResponse::Data { flags, data })
.await; .await
} else {
reg.route_with_timeout(cid, MeResponse::Data { flags, data }, data_wait_ms)
.await
};
if !matches!(routed, RouteResult::Routed) { if !matches!(routed, RouteResult::Routed) {
match routed { match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(), RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),