Merge pull request #200 from telemt/flow

ME Connection lost fixes
This commit is contained in:
Alexey 2026-02-21 16:31:49 +03:00 committed by GitHub
commit d552ae84d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 34 additions and 23 deletions

View File

@ -940,42 +940,39 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
.run() .run()
.await .await
{ {
let peer_closed = match &e { let peer_closed = matches!(
crate::error::ProxyError::Io(ioe) => { &e,
matches!( crate::error::ProxyError::Io(ioe)
if matches!(
ioe.kind(), ioe.kind(),
std::io::ErrorKind::ConnectionReset std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted | std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected | std::io::ErrorKind::NotConnected
) )
} ) || matches!(
&e,
crate::error::ProxyError::Stream( crate::error::ProxyError::Stream(
crate::error::StreamError::Io(ioe), crate::error::StreamError::Io(ioe)
) => { )
matches!( if matches!(
ioe.kind(), ioe.kind(),
std::io::ErrorKind::ConnectionReset std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted | std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected | std::io::ErrorKind::NotConnected
) )
} );
_ => false,
};
if peer_closed { let me_closed = matches!(
debug!( &e,
peer = %peer_addr, crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost"
error = %e,
"Connection closed by peer"
);
} else {
warn!(
peer = %peer_addr,
error = %e,
"Connection closed with error"
); );
match (peer_closed, me_closed) {
(true, _) => debug!(peer = %peer_addr, error = %e, "Connection closed by client"),
(_, true) => warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session"),
_ => warn!(peer = %peer_addr, error = %e, "Connection closed with error"),
} }
} }
}); });

View File

@ -98,6 +98,7 @@ where
}); });
let mut main_result: Result<()> = Ok(()); let mut main_result: Result<()> = Ok(());
let mut client_closed = false;
loop { loop {
match read_client_payload(&mut crypto_reader, proto_tag, frame_limit, &user).await { match read_client_payload(&mut crypto_reader, proto_tag, frame_limit, &user).await {
Ok(Some((payload, quickack))) => { Ok(Some((payload, quickack))) => {
@ -124,6 +125,7 @@ where
} }
Ok(None) => { Ok(None) => {
debug!(conn_id, "Client EOF"); debug!(conn_id, "Client EOF");
client_closed = true;
let _ = me_pool.send_close(conn_id).await; let _ = me_pool.send_close(conn_id).await;
break; break;
} }
@ -135,7 +137,19 @@ where
} }
let _ = stop_tx.send(()); let _ = stop_tx.send(());
let writer_result = me_writer.await.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}")))); let mut writer_result = me_writer
.await
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))));
// When client closes, but ME channel stopped as unregistered - it isnt error
if client_closed {
if matches!(
writer_result,
Err(ProxyError::Proxy(ref msg)) if msg == "ME connection lost"
) {
writer_result = Ok(());
}
}
let result = match (main_result, writer_result) { let result = match (main_result, writer_result) {
(Ok(()), Ok(())) => Ok(()), (Ok(()), Ok(())) => Ok(()),