Function bodies 125 total
new function · rust · L49-L70 (22 LOC)src/cert/mod.rs
pub fn new(
ca_cert_path: &str,
ca_key_path: &str,
auto_generate: bool,
ca_validity_days: u32,
cert_validity_days: u32,
) -> Result<Self, CertError> {
let ca_cert_path = Path::new(ca_cert_path);
let ca_key_path = Path::new(ca_key_path);
if ca_cert_path.exists() && ca_key_path.exists() {
info!("Loading existing CA certificate from {:?}", ca_cert_path);
Self::load_ca(ca_cert_path, ca_key_path, cert_validity_days)
} else if auto_generate {
info!("Generating new CA certificate");
Self::generate_ca(ca_cert_path, ca_key_path, ca_validity_days, cert_validity_days)
} else {
Err(CertError::GenerationError(
"CA certificate not found and auto_generate is disabled".to_string(),
))
}
}generate_ca function · rust · L73-L126 (54 LOC)src/cert/mod.rs
fn generate_ca(
ca_cert_path: &Path,
ca_key_path: &Path,
ca_validity_days: u32,
cert_validity_days: u32,
) -> Result<Self, CertError> {
let mut params = CertificateParams::default();
let mut distinguished_name = DistinguishedName::new();
distinguished_name.push(DnType::CommonName, "Rustyman CA");
distinguished_name.push(DnType::OrganizationName, "Rustyman");
distinguished_name.push(DnType::CountryName, "US");
params.distinguished_name = distinguished_name;
params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
params.key_usages = vec![
KeyUsagePurpose::KeyCertSign,
KeyUsagePurpose::CrlSign,
KeyUsagePurpose::DigitalSignature,
];
let now = OffsetDateTime::now_utc();
params.not_before = now;
params.not_after = now + Duration::days(ca_validity_days as i64);
params.alg = &PKCS_ECDSA_P256_SHA256;
// load_ca function · rust · L129-L174 (46 LOC)src/cert/mod.rs
fn load_ca(
ca_cert_path: &Path,
ca_key_path: &Path,
cert_validity_days: u32,
) -> Result<Self, CertError> {
let cert_pem_content = fs::read_to_string(ca_cert_path)?;
let key_pem = fs::read_to_string(ca_key_path)?;
// Load the key pair
let key_pair = KeyPair::from_pem(&key_pem)?;
// Create new CA params with the loaded key
let mut params = CertificateParams::default();
let mut distinguished_name = DistinguishedName::new();
distinguished_name.push(DnType::CommonName, "Rustyman CA");
distinguished_name.push(DnType::OrganizationName, "Rustyman");
distinguished_name.push(DnType::CountryName, "US");
params.distinguished_name = distinguished_name;
params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
params.key_usages = vec![
KeyUsagePurpose::KeyCertSign,
KeyUsagePurpose::CrlSign,
KeyUsagePurpose::DigitalSignature,get_cert_for_domain function · rust · L187-L209 (23 LOC)src/cert/mod.rs
pub async fn get_cert_for_domain(&self, domain: &str) -> Result<Arc<CertifiedKey>, CertError> {
// Check cache first
{
let cache = self.cert_cache.read().await;
if let Some(cert) = cache.get(domain) {
debug!("Using cached certificate for {}", domain);
return Ok(Arc::clone(cert));
}
}
// Generate new certificate
debug!("Generating certificate for {}", domain);
let certified_key = self.generate_cert_for_domain(domain)?;
let certified_key = Arc::new(certified_key);
// Store in cache
{
let mut cache = self.cert_cache.write().await;
cache.insert(domain.to_string(), Arc::clone(&certified_key));
}
Ok(certified_key)
}generate_cert_for_domain function · rust · L212-L251 (40 LOC)src/cert/mod.rs
fn generate_cert_for_domain(&self, domain: &str) -> Result<CertifiedKey, CertError> {
let mut params = CertificateParams::default();
let mut distinguished_name = DistinguishedName::new();
distinguished_name.push(DnType::CommonName, domain);
distinguished_name.push(DnType::OrganizationName, "Rustyman");
params.distinguished_name = distinguished_name;
params.subject_alt_names = vec![SanType::DnsName(domain.try_into().map_err(|e| {
CertError::GenerationError(format!("Invalid domain name: {}", e))
})?)];
params.key_usages = vec![
KeyUsagePurpose::DigitalSignature,
KeyUsagePurpose::KeyEncipherment,
];
params.extended_key_usages = vec![ExtendedKeyUsagePurpose::ServerAuth];
let now = OffsetDateTime::now_utc();
params.not_before = now;
params.not_after = now + Duration::days(self.cert_validity_days as i64);
params.alg = &PKCS_ECDSA_P256_SHAtest_ca_generation function · rust · L273-L289 (17 LOC)src/cert/mod.rs
async fn test_ca_generation() {
let temp_dir = TempDir::new().unwrap();
let ca_cert_path = temp_dir.path().join("ca.crt");
let ca_key_path = temp_dir.path().join("ca.key");
let _ca = CertificateAuthority::new(
ca_cert_path.to_str().unwrap(),
ca_key_path.to_str().unwrap(),
true,
3650,
365,
)
.unwrap();
assert!(ca_cert_path.exists());
assert!(ca_key_path.exists());
}test_domain_cert_generation function · rust · L292-L308 (17 LOC)src/cert/mod.rs
async fn test_domain_cert_generation() {
let temp_dir = TempDir::new().unwrap();
let ca_cert_path = temp_dir.path().join("ca.crt");
let ca_key_path = temp_dir.path().join("ca.key");
let ca = CertificateAuthority::new(
ca_cert_path.to_str().unwrap(),
ca_key_path.to_str().unwrap(),
true,
3650,
365,
)
.unwrap();
let cert = ca.get_cert_for_domain("example.com").await.unwrap();
assert_eq!(cert.cert_chain.len(), 2);
}Open data scored by Repobility · https://repobility.com
default function · rust · L34-L44 (11 LOC)src/config/mod.rs
fn default() -> Self {
Self {
proxy: ProxyConfig::default(),
web_ui: WebUiConfig::default(),
cert: CertConfig::default(),
logging: LoggingConfig::default(),
map_remote: Vec::new(),
map_local: Vec::new(),
header_rules: Vec::new(),
}
}default function · rust · L65-L74 (10 LOC)src/config/mod.rs
fn default() -> Self {
Self {
host: "127.0.0.1".to_string(),
port: 8080,
mitm_enabled: true,
timeout: 30,
max_connections: 1000,
max_websocket_messages: 1000,
}
}default function · rust · L89-L95 (7 LOC)src/config/mod.rs
fn default() -> Self {
Self {
enabled: true,
host: "127.0.0.1".to_string(),
port: 8081,
}
}default function · rust · L114-L122 (9 LOC)src/config/mod.rs
fn default() -> Self {
Self {
ca_cert: "~/.rustyman/ca.crt".to_string(),
ca_key: "~/.rustyman/ca.key".to_string(),
auto_generate: true,
ca_validity_days: 3650,
cert_validity_days: 365,
}
}default function · rust · L139-L146 (8 LOC)src/config/mod.rs
fn default() -> Self {
Self {
level: "info".to_string(),
format: "text".to_string(),
file: None,
log_traffic: false,
}
}expand_paths function · rust · L254-L262 (9 LOC)src/config/mod.rs
pub fn expand_paths(&mut self) {
if let Some(home) = dirs_home() {
self.cert.ca_cert = self.cert.ca_cert.replace('~', &home);
self.cert.ca_key = self.cert.ca_key.replace('~', &home);
if let Some(ref mut file) = self.logging.file {
*file = file.replace('~', &home);
}
}
}test_default_config function · rust · L274-L279 (6 LOC)src/config/mod.rs
fn test_default_config() {
let config = Config::default();
assert_eq!(config.proxy.port, 8080);
assert!(config.proxy.mitm_enabled);
assert!(config.web_ui.enabled);
}test_yaml_serialization function · rust · L282-L287 (6 LOC)src/config/mod.rs
fn test_yaml_serialization() {
let config = Config::default();
let yaml = serde_yaml::to_string(&config).unwrap();
let parsed: Config = serde_yaml::from_str(&yaml).unwrap();
assert_eq!(parsed.proxy.port, config.proxy.port);
}Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
main function · rust · L86-L187 (102 LOC)src/main.rs
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
// Handle subcommands
if let Some(command) = cli.command {
return handle_command(command).await;
}
// Load or create configuration
let mut config = if cli.config.exists() {
info!("Loading configuration from {:?}", cli.config);
Config::load(&cli.config)?
} else {
info!("Using default configuration");
Config::default()
};
// Override config with CLI arguments
if let Some(host) = cli.host {
config.proxy.host = host;
}
if let Some(port) = cli.port {
config.proxy.port = port;
}
if let Some(web_port) = cli.web_port {
if web_port == 0 {
config.web_ui.enabled = false;
} else {
config.web_ui.port = web_port;
}
}
if cli.no_mitm {
config.proxy.mitm_enabled = false;
}
if let Some(ca_cert) = cli.ca_cert {
config.cert.ca_cert = ca_cert.to_stringhandle_command function · rust · L226-L261 (36 LOC)src/main.rs
async fn handle_command(command: Commands) -> anyhow::Result<()> {
match command {
Commands::Init { output } => {
info!("Creating default configuration at {:?}", output);
Config::create_default(&output)?;
println!("Configuration file created: {:?}", output);
println!("\nEdit this file to customize your proxy settings.");
}
Commands::GenCa { cert, key } => {
let cert_path = expand_tilde(&cert);
let key_path = expand_tilde(&key);
info!("Generating CA certificate");
let _ca = CertificateAuthority::new(&cert_path, &key_path, true, 3650, 365)?;
println!("CA certificate generated:");
println!(" Certificate: {}", cert_path);
println!(" Private key: {}", key_path);
println!("\nInstall the certificate in your system/browser to trust HTTPS interception.");
}
Commands::ExportCa { output } => {
lsetup_logging function · rust · L262-L291 (30 LOC)src/main.rs
fn setup_logging(config: &Config) -> anyhow::Result<()> {
let level = match config.logging.level.to_lowercase().as_str() {
"trace" => Level::TRACE,
"debug" => Level::DEBUG,
"info" => Level::INFO,
"warn" => Level::WARN,
"error" => Level::ERROR,
_ => Level::INFO,
};
let filter = EnvFilter::from_default_env()
.add_directive(level.into())
.add_directive("hyper=warn".parse()?)
.add_directive("rustls=warn".parse()?);
if config.logging.format == "json" {
tracing_subscriber::registry()
.with(filter)
.with(fmt::layer().json())
.init();
} else {
tracing_subscriber::registry()
.with(filter)
.with(fmt::layer().with_target(false).with_thread_ids(false))
.init();
}
Ok(())
}expand_tilde function · rust · L292-L300 (9 LOC)src/main.rs
fn expand_tilde(path: &str) -> String {
if path.starts_with('~') {
if let Ok(home) = std::env::var("HOME") {
return path.replacen('~', &home, 1);
}
}
path.to_string()
}handle function · rust · L44-L65 (22 LOC)src/proxy/handler.rs
pub async fn handle(self, stream: TcpStream) -> Result<(), ProxyError> {
let io = TokioIo::new(stream);
let state = Arc::clone(&self.state);
let client_addr = self.client_addr;
ServerBuilder::new()
.preserve_header_case(true)
.serve_connection(
io,
service_fn(move |req| {
let state = Arc::clone(&state);
let client_addr = client_addr;
async move { Self::handle_request(state, client_addr, req).await }
}),
)
.with_upgrades()
.await
.map_err(|e| ProxyError::HttpError(e.to_string()))?;
Ok(())
}handle_request function · rust · L68-L85 (18 LOC)src/proxy/handler.rs
async fn handle_request(
state: Arc<ProxyState>,
client_addr: SocketAddr,
req: Request<Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
let method = req.method().clone();
let uri = req.uri().clone();
debug!("{} {} from {}", method, uri, client_addr);
// Handle CONNECT method for HTTPS
if method == Method::CONNECT {
return Self::handle_connect(state, client_addr, req).await;
}
// Handle regular HTTP request
Self::handle_http_request(state, client_addr, req).await
}handle_connect function · rust · L88-L134 (47 LOC)src/proxy/handler.rs
async fn handle_connect(
state: Arc<ProxyState>,
client_addr: SocketAddr,
req: Request<Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
let host = req.uri().host().unwrap_or("").to_string();
let port = req.uri().port_u16().unwrap_or(443);
info!("CONNECT {}:{} from {}", host, port, client_addr);
// Get MITM enabled setting
let mitm_enabled = {
let config = state.config.read().await;
config.proxy.mitm_enabled
};
tokio::task::spawn(async move {
match hyper::upgrade::on(req).await {
Ok(upgraded) => {
let io = TokioIo::new(upgraded);
if mitm_enabled {
// MITM mode: intercept TLS
if let Err(e) =
Self::handle_mitm_tunnel(state, client_addr, io, &host, port).await
{
error!("handle_mitm_tunnel function · rust · L137-L179 (43 LOC)src/proxy/handler.rs
async fn handle_mitm_tunnel<I>(
state: Arc<ProxyState>,
client_addr: SocketAddr,
upgraded: I,
host: &str,
port: u16,
) -> Result<(), ProxyError>
where
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
// Get certificate for this host
let cert = state.ca.get_cert_for_domain(host).await?;
// Create TLS acceptor with the generated certificate
let tls_acceptor = TlsAcceptor::from_certified_key(&cert)?;
// Accept TLS connection from client
let tls_stream = tls_acceptor.accept(upgraded).await?;
let io = TokioIo::new(tls_stream);
let host = host.to_string();
let state_clone = Arc::clone(&state);
// Serve HTTP/1.1 and HTTP/2 over the TLS connection (auto-detected)
let mut auto_builder = AutoServerBuilder::new(TokioExecutor::new());
auto_builder.http1().preserve_header_case(true);
auto_builder
Repobility · MCP-ready · https://repobility.com
handle_passthrough_tunnel function · rust · L182-L209 (28 LOC)src/proxy/handler.rs
async fn handle_passthrough_tunnel<I>(
upgraded: I,
host: &str,
port: u16,
) -> Result<(), ProxyError>
where
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
{
let target = format!("{}:{}", host, port);
let server = TcpStream::connect(&target).await?;
let (mut client_read, mut client_write) = tokio::io::split(upgraded);
let (mut server_read, mut server_write) = tokio::io::split(server);
let client_to_server = async {
tokio::io::copy(&mut client_read, &mut server_write).await?;
server_write.shutdown().await
};
let server_to_client = async {
tokio::io::copy(&mut server_read, &mut client_write).await?;
client_write.shutdown().await
};
tokio::try_join!(client_to_server, server_to_client)?;
Ok(())
}handle_https_request function · rust · L212-L233 (22 LOC)src/proxy/handler.rs
async fn handle_https_request(
state: Arc<ProxyState>,
client_addr: SocketAddr,
req: Request<Incoming>,
host: &str,
port: u16,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
// Build the full URL
let uri = req.uri();
let path = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/");
let full_url = if port == 443 {
format!("https://{}{}", host, path)
} else {
format!("https://{}:{}{}", host, port, path)
};
debug!("HTTPS {} {} from {}", req.method(), full_url, client_addr);
// Process the request with rules
Self::process_request(state, client_addr, req, &full_url, host, port, true).await
}handle_http_request function · rust · L236-L248 (13 LOC)src/proxy/handler.rs
async fn handle_http_request(
state: Arc<ProxyState>,
client_addr: SocketAddr,
req: Request<Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
let uri = req.uri().clone();
let full_url = uri.to_string();
let host = uri.host().unwrap_or("").to_string();
let port = uri.port_u16().unwrap_or(80);
Self::process_request(state, client_addr, req, &full_url, &host, port, false).await
}forward_request function · rust · L505-L558 (54 LOC)src/proxy/handler.rs
async fn forward_request(
method: &Method,
url: &str,
host: &str,
port: u16,
is_https: bool,
headers: HashMap<String, String>,
body: Option<Bytes>,
) -> Result<Response<Incoming>, ProxyError> {
let uri: Uri = url.parse().map_err(|e| ProxyError::HttpError(format!("Invalid URL: {}", e)))?;
let path = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/");
// Connect to target server
let target = format!("{}:{}", host, port);
let stream = TcpStream::connect(&target)
.await
.map_err(|e| ProxyError::IoError(e))?;
if is_https {
// TLS connection with ALPN for HTTP/2 negotiation
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let mut config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
send_request function · rust · L561-L610 (50 LOC)src/proxy/handler.rs
async fn send_request<I>(
io: TokioIo<I>,
method: &Method,
host: &str,
path: &str,
headers: HashMap<String, String>,
body: Option<Bytes>,
) -> Result<Response<Incoming>, ProxyError>
where
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let (mut sender, conn) = ClientBuilder::new()
.preserve_header_case(true)
.handshake(io)
.await
.map_err(|e| ProxyError::HttpError(e.to_string()))?;
tokio::spawn(async move {
if let Err(e) = conn.await {
error!("Connection error: {}", e);
}
});
let mut req_builder = Request::builder()
.method(method)
.uri(path)
.header("host", host);
for (key, value) in &headers {
// Skip hop-by-hop headers and host (we set it above)
if !is_hop_by_hop_header(key) && key.to_lowercase() send_request_h2 function · rust · L613-L660 (48 LOC)src/proxy/handler.rs
async fn send_request_h2<I>(
io: TokioIo<I>,
method: &Method,
host: &str,
path: &str,
headers: HashMap<String, String>,
body: Option<Bytes>,
) -> Result<Response<Incoming>, ProxyError>
where
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let (mut sender, conn) = Http2ClientBuilder::new(TokioExecutor::new())
.handshake(io)
.await
.map_err(|e| ProxyError::HttpError(e.to_string()))?;
tokio::spawn(async move {
if let Err(e) = conn.await {
error!("HTTP/2 connection error: {}", e);
}
});
let mut req_builder = Request::builder()
.method(method)
.uri(path)
.header("host", host);
for (key, value) in &headers {
if !is_hop_by_hop_header(key) && key.to_lowercase() != "host" {
req_builder = req_builder.header(key, valueis_websocket_upgrade function · rust · L663-L680 (18 LOC)src/proxy/handler.rs
fn is_websocket_upgrade(req: &Request<Incoming>) -> bool {
let upgrade = req
.headers()
.get("upgrade")
.and_then(|v| v.to_str().ok())
.map(|v| v.to_lowercase());
let connection = req
.headers()
.get("connection")
.and_then(|v| v.to_str().ok())
.map(|v| v.to_lowercase());
upgrade.as_deref() == Some("websocket")
&& connection
.as_ref()
.map(|c| c.contains("upgrade"))
.unwrap_or(false)
}handle_websocket_proxy function · rust · L683-L781 (99 LOC)src/proxy/handler.rs
async fn handle_websocket_proxy(
state: Arc<ProxyState>,
client_addr: SocketAddr,
req: Request<Incoming>,
full_url: &str,
host: &str,
port: u16,
is_https: bool,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
// Extract WebSocket headers before consuming the request
let method = req.method().to_string();
let http_version = format!("{:?}", req.version());
let path = req.uri().path().to_string();
let headers: HashMap<String, String> = req
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
.collect();
let ws_key = headers
.get("sec-websocket-key")
.cloned()
.unwrap_or_default();
let ws_protocol = headers.get("sec-websocket-protocol").cloned();
info!("WebSocket upgrade: {} from {}", full_url, client_addr);
// Compute accept key
Repobility · open methodology · https://repobility.com/research/
relay_websocket function · rust · L785-L846 (62 LOC)src/proxy/handler.rs
async fn relay_websocket(
state: Arc<ProxyState>,
upgraded: hyper::upgrade::Upgraded,
full_url: &str,
host: &str,
port: u16,
is_https: bool,
entry_id: &str,
max_messages: usize,
protocol: Option<&str>,
) -> Result<(), ProxyError> {
// Connect to upstream
let upstream_io = Self::connect_upstream_raw(host, port, is_https).await?;
// Build WebSocket URL for upstream
let ws_url = if is_https {
full_url.replacen("https://", "wss://", 1)
} else {
full_url.replacen("http://", "ws://", 1)
};
// Build upstream WebSocket request
let mut request = tungstenite::http::Request::builder()
.uri(&ws_url)
.header("Host", host)
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header(
"Sec-WebSockrelay_websocket_frames function · rust · L849-L929 (81 LOC)src/proxy/handler.rs
async fn relay_websocket_frames<C, U>(
state: Arc<ProxyState>,
client_ws: WebSocketStream<C>,
upstream_ws: WebSocketStream<U>,
entry_id: &str,
max_messages: usize,
) where
C: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
U: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
let (mut client_sink, mut client_stream) = client_ws.split();
let (mut upstream_sink, mut upstream_stream) = upstream_ws.split();
let entry_id_1 = entry_id.to_string();
let entry_id_2 = entry_id.to_string();
let state_1 = Arc::clone(&state);
let state_2 = Arc::clone(&state);
let client_to_upstream = async move {
while let Some(msg) = client_stream.next().await {
match msg {
Ok(msg) => {
let ws_info =
ws_message_to_info(&msg, WebSocketDirection::ClientToServer);
connect_upstream_raw function · rust · L932-L961 (30 LOC)src/proxy/handler.rs
async fn connect_upstream_raw(
host: &str,
port: u16,
is_https: bool,
) -> Result<UpstreamStream, ProxyError> {
let target = format!("{}:{}", host, port);
let stream = TcpStream::connect(&target).await?;
if is_https {
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let connector = TlsConnector::from(Arc::new(config));
let server_name = ServerName::try_from(host.to_string())
.map_err(|e| ProxyError::TlsError(format!("Invalid server name: {}", e)))?;
let tls_stream = connector
.connect(server_name, stream)
.await
.map_err(|e| ProxyError::TlsError(e.to_string()))?;
Ok(UpstreamStpoll_read function · rust · L971-L980 (10 LOC)src/proxy/handler.rs
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match self.get_mut() {
UpstreamStream::Plain(s) => Pin::new(s).poll_read(cx, buf),
UpstreamStream::Tls(s) => Pin::new(s).poll_read(cx, buf),
}
}poll_write function · rust · L984-L993 (10 LOC)src/proxy/handler.rs
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
match self.get_mut() {
UpstreamStream::Plain(s) => Pin::new(s).poll_write(cx, buf),
UpstreamStream::Tls(s) => Pin::new(s).poll_write(cx, buf),
}
}poll_flush function · rust · L994-L1000 (7 LOC)src/proxy/handler.rs
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match self.get_mut() {
UpstreamStream::Plain(s) => Pin::new(s).poll_flush(cx),
UpstreamStream::Tls(s) => Pin::new(s).poll_flush(cx),
}
}poll_shutdown function · rust · L1001-L1007 (7 LOC)src/proxy/handler.rs
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match self.get_mut() {
UpstreamStream::Plain(s) => Pin::new(s).poll_shutdown(cx),
UpstreamStream::Tls(s) => Pin::new(s).poll_shutdown(cx),
}
}ws_message_to_info function · rust · L1013-L1048 (36 LOC)src/proxy/handler.rs
fn ws_message_to_info(
msg: &tungstenite::Message,
direction: WebSocketDirection,
) -> WebSocketMessageInfo {
let (message_type, payload, payload_size) = match msg {
tungstenite::Message::Text(text) => {
(WebSocketMessageType::Text, Some(text.clone()), text.len())
}
tungstenite::Message::Binary(data) => {
let encoded = base64::engine::general_purpose::STANDARD.encode(data);
(WebSocketMessageType::Binary, Some(encoded), data.len())
}
tungstenite::Message::Ping(data) => {
let encoded = base64::engine::general_purpose::STANDARD.encode(data);
(WebSocketMessageType::Ping, Some(encoded), data.len())
}
tungstenite::Message::Pong(data) => {
let encoded = base64::engine::general_purpose::STANDARD.encode(data);
(WebSocketMessageType::Pong, Some(encoded), data.len())
}
tungstenite::Message::Close(frame) => {
let payloadOpen data scored by Repobility · https://repobility.com
test_ws_message_to_info_text function · rust · L1056-L1062 (7 LOC)src/proxy/handler.rs
fn test_ws_message_to_info_text() {
let msg = tungstenite::Message::Text("hello world".into());
let info = ws_message_to_info(&msg, WebSocketDirection::ClientToServer);
assert!(matches!(info.message_type, WebSocketMessageType::Text));
assert_eq!(info.payload_size, 11);
assert!(info.payload.is_some());
}test_ws_message_to_info_binary function · rust · L1065-L1076 (12 LOC)src/proxy/handler.rs
fn test_ws_message_to_info_binary() {
let data = vec![1u8, 2, 3, 4];
let msg = tungstenite::Message::Binary(data.clone().into());
let info = ws_message_to_info(&msg, WebSocketDirection::ServerToClient);
assert!(matches!(info.message_type, WebSocketMessageType::Binary));
assert_eq!(info.payload_size, 4);
// Verify base64 round-trip
let decoded = base64::engine::general_purpose::STANDARD
.decode(info.payload.unwrap())
.unwrap();
assert_eq!(decoded, data);
}test_ws_message_to_info_close function · rust · L1079-L1087 (9 LOC)src/proxy/handler.rs
fn test_ws_message_to_info_close() {
let msg = tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame {
code: tungstenite::protocol::frame::coding::CloseCode::Normal,
reason: "goodbye".into(),
}));
let info = ws_message_to_info(&msg, WebSocketDirection::ClientToServer);
assert!(matches!(info.message_type, WebSocketMessageType::Close));
assert_eq!(info.payload.as_deref(), Some("goodbye"));
}is_hop_by_hop_header function · rust · L1091-L1104 (14 LOC)src/proxy/handler.rs
fn is_hop_by_hop_header(name: &str) -> bool {
let name = name.to_lowercase();
matches!(
name.as_str(),
"connection"
| "keep-alive"
| "proxy-authenticate"
| "proxy-authorization"
| "te"
| "trailer"
| "transfer-encoding"
| "upgrade"
)
}decompress_body function · rust · L1107-L1130 (24 LOC)src/proxy/handler.rs
fn decompress_body(body: &[u8], encoding: &str) -> Option<Vec<u8>> {
match encoding.to_lowercase().as_str() {
"gzip" => {
use std::io::Read;
let mut decoder = flate2::read::GzDecoder::new(body);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed).ok()?;
Some(decompressed)
}
"deflate" => {
use std::io::Read;
let mut decoder = flate2::read::DeflateDecoder::new(body);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed).ok()?;
Some(decompressed)
}
"br" => {
let mut decompressed = Vec::new();
brotli::BrotliDecompress(&mut std::io::Cursor::new(body), &mut decompressed).ok()?;
Some(decompressed)
}
_ => None,
}
}new function · rust · L42-L56 (15 LOC)src/proxy/mod.rs
pub fn new(config: Config, ca: CertificateAuthority, shutdown: CancellationToken) -> Result<Self, ProxyError> {
let rules = RuleEngine::new(
config.map_remote.clone(),
config.map_local.clone(),
config.header_rules.clone(),
)?;
Ok(Self {
config: RwLock::new(config),
ca: Arc::new(ca),
rules: RwLock::new(rules),
traffic: Arc::new(TrafficStore::default()),
shutdown,
})
}reload_rules function · rust · L59-L71 (13 LOC)src/proxy/mod.rs
pub async fn reload_rules(&self) -> Result<(), ProxyError> {
let config = self.config.read().await;
let mut rules = self.rules.write().await;
*rules = RuleEngine::new(
config.map_remote.clone(),
config.map_local.clone(),
config.header_rules.clone(),
)?;
info!("Rules reloaded");
Ok(())
}run function · rust · L90-L133 (44 LOC)src/proxy/mod.rs
pub async fn run(&self, cancel_token: CancellationToken) -> Result<(), ProxyError> {
let config = self.state.config.read().await;
let addr: SocketAddr = format!("{}:{}", config.proxy.host, config.proxy.port)
.parse()
.map_err(|e| ProxyError::IoError(std::io::Error::other(format!("Invalid address: {}", e))))?;
drop(config);
let listener = TcpListener::bind(addr).await?;
info!("Proxy server listening on {}", addr);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Proxy server shutting down...");
break;
}
result = listener.accept() => {
match result {
Ok((stream, client_addr)) => {
let state = Arc::clone(&self.state);
let token = cancel_token.clone();
tokio::spawn(async moMethodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
from_certified_key function · rust · L16-L25 (10 LOC)src/proxy/tls.rs
pub fn from_certified_key(certified_key: &CertifiedKey) -> Result<Self, ProxyError> {
let config = Self::create_server_config(
certified_key.cert_chain.clone(),
certified_key.private_key.clone_key(),
)?;
Ok(Self {
inner: TokioTlsAcceptor::from(Arc::new(config)),
})
}create_server_config function · rust · L28-L40 (13 LOC)src/proxy/tls.rs
fn create_server_config(
cert_chain: Vec<CertificateDer<'static>>,
private_key: PrivateKeyDer<'static>,
) -> Result<ServerConfig, ProxyError> {
let mut config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(cert_chain, private_key)
.map_err(|e| ProxyError::TlsError(format!("Failed to create TLS config: {}", e)))?;
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
Ok(config)
}accept function · rust · L43-L51 (9 LOC)src/proxy/tls.rs
pub async fn accept<IO>(&self, stream: IO) -> Result<tokio_rustls::server::TlsStream<IO>, ProxyError>
where
IO: AsyncRead + AsyncWrite + Unpin,
{
self.inner
.accept(stream)
.await
.map_err(|e| ProxyError::TlsError(format!("TLS accept failed: {}", e)))
}page 1 / 3next ›