Dec 18, 2024
4 min read
Rust,
Pingora,

Nginx 的竞争者: Pingora 的使用

Pingora 的使用

Pingora 是由 Cloudflare 公司开发的类似 Nginx 的产品。据悉,Pingora 历经了在 Cloudflare 的实战检验,在数年多的时间里,它每秒能够处理 4000 多万个互联网请求。

尽管 Cloudflare 一直将 Pingora 与 Nginx 进行对标,但对于使用者而言,它们之间根本不具可比性,毕竟 Nginx 是一款软件,而 Pingora 只是一个库。所以,倘若你只是单纯地想用 Pingora 来替代 Nginx,恐怕会大失所望。我觉得你可能想找的是类似于 https://github.com/vicanso/pingap 这样的基于 Pingora 的上层应用。

不过从另一个方面来讲,正因为它是一个库,所以它的自由度要大得多。这就意味着我们可以将应用和类 Nginx 的功能打包在一起使用(当然,牺牲的是便捷性)。

一个常见例子是,一般前端如 react 或者vue 等单面应用,打完包后得到的静态页面,会直接丢给nginx 部署入口直接指向index文件就行。如果存在路由,nginx 只需要类似的下面的配置:

location / {
  try_files $uri $uri/ /index.html;
}

因为pingora 不是开箱即用的,因此我们需要手动在代码中实现上面的逻辑。

首先,我们需要在项目中添加pingora 依赖:

[dependencies]
pingora = { version = "0.4.0", features = ["lb"] }

最好我们能内嵌前端资源文件到二进制包,需要使用下面的库:

[dependencies]
rust-embed = "8.5.0"

创建一个嵌入资源结构体:

// asset.rs

use rust_embed::Embed;

#[derive(Embed)]
#[folder = "../dist"]
pub struct Asset;

其中 dist 为前端打包后的静态文件目录。

接下来我们可以实现反向代理逻辑了。我们需要定义一个代理服务器结构体:

pub(crate) struct ServerProxy {
    pub port: u16,
    pub target_host: String,
    lb: Arc<LoadBalancer<RoundRobin>>,
}

上面除了端口和目标主机名外,还定义了一个 lb 负载均衡器。因为前端一般会有后端的请求接口,我们直接在这里做一些负载均衡。

实现一个初始化工厂函数:

/// 创建新的负载均衡器实例
/// 
/// # 参数
/// * `port`: 监听的端口号
/// * `up_streams`: 上游服务器的地址列表
/// * `target_host`: 目标主机的地址
/// 
/// # 返回
/// 返回一个初始化后的负载均衡器实例
pub fn new(port: u16, up_streams: Vec<String>, target_host: String) -> Self {
    // 创建负载均衡器实例
    let mut lb = LoadBalancer::try_from_iter(up_streams).unwrap();
    
    // 添加健康检查
    let hc = TcpHealthCheck::new();
    lb.set_health_check(hc);
    // 设置健康检查的频率为每3秒一次
    lb.health_check_frequency = Some(std::time::Duration::from_secs(3));
    
    // 启动健康检查的后台服务
    let background = background_service("health check", lb);
    let lb = background.task();

    // 构建负载均衡器实例
    Self {
        port,
        lb,
        target_host,
    }
}

这里直接为负载均衡器添加健康检查。

我们需要为 ServerProxy 实现 ProxyHttp trait 。

#[async_trait]
impl ProxyHttp for ServerProxy {
    type CTX = ();
    fn new_ctx(&self) {}

    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        // 限流
        Ok(false)
    }
    async fn proxy_upstream_filter(
        &self,
        session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        // todo

        Ok(false)
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        //todo
    }
}

request_filter 方法的主要职责是在请求到达时进行初步处理,例如解析、验证、速率限制、访问控制等。如果在此阶段已经生成并发送了响应,则返回 Ok(true) 结束处理;否则返回 Ok(false) 继续后续处理。默认实现中,该方法不做任何处理并返回 Ok(false),允许开发者根据需要自定义具体的处理逻辑。

proxy_upstream_filter 方法的主要职责是在缓存未命中后,决定请求是否应该继续转发到上游服务器。这个方法可以用于执行延迟检查,例如速率限制或访问控制,从而提高性能和安全性。默认实现中,请求总是继续转发,但可以通过自定义实现来添加更复杂的逻辑。

upstream_peer 方法的主要职责是根据当前会话和上下文信息,决定请求应该被转发到哪个上游服务器,并返回相应的 HttpPeer 对象。这个方法在代理服务器中非常重要,因为它决定了流量的路由方向。通过实现该方法,可以灵活地控制请求的转发逻辑,例如基于不同的条件选择不同的上游服务器。

他们的触发顺序是这样的: request_filter→proxy_upstream_filter→upstream_peer。 当前,pingora 的生命周期函数并不止是这三个,更多的生命周期函数可以看看这个流程图: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/phase.md

request_filter 直接做一些限速逻辑,用来防止被大流量攻击,这里不表,直接跳过。在proxy_upstream_filter我们希望,当处理请求路径以 /api 开头的时候,直接跳过到下一个处理,其他路径都视为前端请求路由路径:

// 处理上游代理的过滤逻辑
// 该函数根据请求路径和方法决定是否继续处理请求,以及如何处理
async fn proxy_upstream_filter(
    &self,
    session: &mut Session,
    _ctx: &mut Self::CTX,
) -> Result<bool>
where
    Self::CTX: Send + Sync,
{
    // 获取请求路径和方法
    let path = session.as_ref().req_header().uri.path();
    let method = session.as_ref().req_header().method.as_ref();

    // 优先处理 server, 直接放行
    if path.starts_with("/api") {
        info!("request path: {},method: {}", path, method);
        return Ok(true);
    }

    // 其他的目前还没有更多的路由,默认走前端页面
    // 实现以下功能
    // location / {
    //     try_files $uri $uri/ /index.html;
    //  }
    let start_path = path.strip_prefix('/').unwrap_or_default();
    let send_body = session.req_header().method != Method::HEAD;
    let content = match Asset::get(start_path) {
        Some(content) => bytes::Bytes::copy_from_slice(&content.data),
        None => {
            let path = "index.html";
            Asset::get(path)
                .map(|b| bytes::Bytes::copy_from_slice(&b.data))
                .unwrap_or(bytes::Bytes::from_static(b"404 Not Found"))
        }
    };

    // 构造响应头
    let header = web_response(path, content.len())?;
    // 写入响应头
    session
        .write_response_header(Box::new(header), !send_body)
        .await?;
    // 根据请求方法决定是否发送响应体
    if send_body {
        session.write_response_body(Some(content), true).await?;
    }

    Ok(false)
}

upstream_peer 是直接处理负载均衡的地方:

// 选择上游服务器并创建 HttpPeer 实例
async fn upstream_peer(
    &self,
    _session: &mut Session,
    _ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
    // let path = session.req_header().uri.path();
    // 优先处理 server

    // 使用负载均衡策略选择上游服务器
    let upstream = self.lb.select(b"", 256).unwrap();

    // 记录日志,输出选定的上游服务器信息
    info!("upstream peer: {upstream:?}");

    // 创建并返回 HttpPeer 实例
    let peer = Box::new(HttpPeer::new(
        upstream,
        false,
        self.target_host.to_owned(),
    ));
    Ok(peer)
}

注意:我这里是比较简陋的写法,事实上,需要根据更复杂的权重和一定的策略来选择上游服务器。

最后为 ServerProxy 实现一个注册实例方法:

/// 将当前实例注册为服务到指定的服务器上
///
/// # Parameters
///
/// * `server`: 一个可变引用,指向服务器实例,以便将HTTP代理服务添加到该服务器
pub fn into_service(self, server: &mut Server) {
    // 根据实例的端口属性构建监听地址
    let addr = format!("0.0.0.0:{}", self.port);
    
    // 创建HTTP代理服务,这里传入服务器配置和当前实例作为参数
    let mut service = http_proxy_service(&server.configuration, self);
    
    // 为HTTP代理服务添加TCP监听地址
    service.add_tcp(&addr);
    
    // 将创建的HTTP代理服务添加到服务器中
    server.add_service(service);
    
    // 记录日志,表明代理服务正在监听的地址
    info!("PP listening on {}", addr);
}

使用

我们来看看怎么使用:

fn main() {

    // 初始化日志系统,设置日志格式为紧凑型,并限定最高日志级别为INFO
    tracing_subscriber::fmt()
        .event_format(format().compact())
        .with_max_level(Level::INFO)
        .init();
    
    // 创建并初始化服务器对象
    let mut pp_server = Server::new(Some(Opt::parse_args())).unwrap();
    pp_server.bootstrap();

    // 根据配置信息构建服务器地址字符串
    let server1 = format!("127.0.0.1:{}", 1200);
    let server2 = format!("127.0.0.1:{}", 1201);

    // 定义上游服务器地址列表
    let up_streams = vec![server,server2];

    // 定义目标主机地址
    let target_host = "127.0.0.1".to_owned();
    // 创建代理对象
    let pp = ServerProxy::new(
        3000,
        up_streams,
        target_host,
    );

    // 将代理对象转换为服务并注册到服务器对象中
    pp.into_service(&mut pp_server);

    // 记录日志:代理服务即将启动
    info!("starting...");
    // 运行服务器,进入事件循环
    pp_server.run_forever();
}

最后启动服务,就得到了一个 Nginx + 前端项目+负载均衡器功能的组合体了。

最后

想必大家也都看到了,对于小型项目或者不太需要高度自定义的项目而言,直接使用 pingora 相比使用 nginx 并没有什么优势。甚至上述这些功能,如果不考虑太高的并发量,完全可以在 web 框架内部实现。

如果想要一个纯 rust 实现的功能直接对标 nginx 的实现,可以看看下面的这个 pingap 或者 proksi。

pingap: https://github.com/vicanso/pingap

proksi: github.com/luizfonseca/proksi

pingora: https://github.com/cloudflare/pingora