WebRTC 源码阅读笔记之 TrendlineEstimator

Table of Contents

Trendline

趋势线估算器所实现的接口称为 DelayIncreaseDetectorInterface, 意谓延迟增长检测器.

每一个反馈的 PacketResult 都会用来更新这个趋势线, 这个趋势线的斜率反应了网络延迟的变化,
可以从趋势线的斜率和自适应的阈值可能得出带宽的使用状况:

  • kBwNormal: 带宽使用正常
  • kBwUnderusing: 带宽使用不足
  • kBwOverusing: 带宽使用过度

class TrendlineEstimator : public DelayIncreaseDetectorInterface {
 public:
  TrendlineEstimator(const WebRtcKeyValueConfig* key_value_config,
                     NetworkStatePredictor* network_state_predictor);

  ~TrendlineEstimator() override;
/*
* 用新的采样来更新估算器, 这个 delta 是指包组中第一个包和最后一个包之间的发送时间,或接收时间差
* Update the estimator with a new sample. The deltas should represent deltas
* between timestamp groups as defined by the InterArrival class.
* parameters: 
*   recv_delta_ms: received time delta between the first packet and last packet of a group
*   send_delta_ms: send time delta between the first packet and last packet of a group 
*   send_time_ms: packet 的发送时间
*   arrival_time_ms:  packet 的到达时间
*   packet_size: packet 的大小
*   calculated_deltas: 要不要计算 deltas 
*/
  void Update(double recv_delta_ms,
              double send_delta_ms,
              int64_t send_time_ms,
              int64_t arrival_time_ms,
              size_t packet_size,
              bool calculated_deltas) override;
  //如果接收的样本累积到了一定数量(window_size),就来更新趋势线
  void UpdateTrendline(double recv_delta_ms,
                       double send_delta_ms,
                       int64_t send_time_ms,
                       int64_t arrival_time_ms,
                       size_t packet_size);
  //获取网络带宽的使用状态:正常,使用过度,还是使用不足
  BandwidthUsage State() const override;
 //趋势线的 x 轴就是 arrival_time_ms(其实要减小第一个包的到达时间), y 轴是 smoothed_delay_ms (平滑过的延迟时间差)
  struct PacketTiming {
    PacketTiming(double arrival_time_ms,
                 double smoothed_delay_ms,
                 double raw_delay_ms)
        : arrival_time_ms(arrival_time_ms),
          smoothed_delay_ms(smoothed_delay_ms),
          raw_delay_ms(raw_delay_ms) {}
    double arrival_time_ms;
    double smoothed_delay_ms;
    double raw_delay_ms;
  };

 private:
  friend class GoogCcStatePrinter;
  //根据趋势,和可调节的阈值来检测网络状况
  void Detect(double trend, double ts_delta, int64_t now_ms);
  //根据当前计算出来的趋势(增益过的), 适时地修正阈值, 以提高其合理性和敏感度, 固定不变的阈值可能反应迟缓, 可能会被 TCP 的拥塞控制给饿死
  void UpdateThreshold(double modified_offset, int64_t now_ms);

  // Parameters.
  TrendlineEstimatorSettings settings_;
  const double smoothing_coef_;
  const double threshold_gain_;
  // Used by the existing threshold.
  int num_of_deltas_;
  // Keep the arrival times small by using the change from the first packet.
  int64_t first_arrival_time_ms_;
  // Exponential backoff filtering.
  double accumulated_delay_;
  double smoothed_delay_;
  // Linear least squares regression.
  std::deque<PacketTiming> delay_hist_;

  const double k_up_;
  const double k_down_;
  double overusing_time_threshold_;
  double threshold_;
  double prev_modified_trend_;
  int64_t last_update_ms_;
  double prev_trend_;
  double time_over_using_;
  int overuse_counter_;
  BandwidthUsage hypothesis_;
  BandwidthUsage hypothesis_predicted_;
  NetworkStatePredictor* network_state_predictor_;

  RTC_DISALLOW_COPY_AND_ASSIGN(TrendlineEstimator);
};

配置类决定了

  • 要不要将样本排序
  • 要不要根据从开始的 7 个包(beginning_packets=7)到结束的 7 个包(end_packets=7)的最小延迟之间的差计算出最大斜率来限制趋势线的斜率的最大值
  • 窗口大小也就是用不计算趋势线斜率的队列长度, 默认为20
struct TrendlineEstimatorSettings {
  static constexpr char kKey[] = "WebRTC-Bwe-TrendlineEstimatorSettings";
  static constexpr unsigned kDefaultTrendlineWindowSize = 20;

  TrendlineEstimatorSettings() = delete;
  explicit TrendlineEstimatorSettings(
      const WebRtcKeyValueConfig* key_value_config);

  // Sort the packets in the window. Should be redundant,
  // but then almost no cost.
  bool enable_sort = false;

  // Cap the trendline slope based on the minimum delay seen
  // in the beginning_packets and end_packets respectively.
  bool enable_cap = false;
  unsigned beginning_packets = 7;
  unsigned end_packets = 7;
  double cap_uncertainty = 0.0;
  //用来计算趋势线的窗口中的包的个数,默认为20
  // Size (in packets) of the window. 
  unsigned window_size = kDefaultTrendlineWindowSize;

  std::unique_ptr<StructParametersParser> Parser();
};

细节很多, 根据最小二乘法算出斜率, 将斜率乘以一个增益,再和 threshold 进行比较, 以决定是否使用不足, 还是使用过度.

TrendlineEstimator::TrendlineEstimator(
    const WebRtcKeyValueConfig* key_value_config,
    NetworkStatePredictor* network_state_predictor)
    : settings_(key_value_config),
      smoothing_coef_(kDefaultTrendlineSmoothingCoeff),
      threshold_gain_(kDefaultTrendlineThresholdGain),
      num_of_deltas_(0),
      first_arrival_time_ms_(-1),
      accumulated_delay_(0),
      smoothed_delay_(0),
      delay_hist_(),
      k_up_(0.0087),//阈值向上调整的系数
      k_down_(0.039), //阈值向下调整的系数
      overusing_time_threshold_(kOverUsingTimeThreshold),//= 10;
      threshold_(12.5),
      prev_modified_trend_(NAN),
      last_update_ms_(-1),
      prev_trend_(0.0),
      time_over_using_(-1),
      overuse_counter_(0),
      hypothesis_(BandwidthUsage::kBwNormal),
      hypothesis_predicted_(BandwidthUsage::kBwNormal),
      network_state_predictor_(network_state_predictor) {
  RTC_LOG(LS_INFO)
      << "Using Trendline filter for delay change estimation with settings "
      << settings_.Parser()->Encode() << " and "
      << (network_state_predictor_ ? "injected" : "no")
      << " network state predictor";
}

以最新样本更新趋势线

这里用到了EWMA对累积的延迟梯度做平滑, EWMA(Exponentially Weighted Moving Average ) 指数加权移动平滑法(Exponential Smoothing)是在移动平均法基础上发展起来的一种时间序列分析预测法.


void TrendlineEstimator::UpdateTrendline(double recv_delta_ms,
                                         double send_delta_ms,
                                         int64_t send_time_ms,
                                         int64_t arrival_time_ms,
                                         size_t packet_size) {
  const double delta_ms = recv_delta_ms - send_delta_ms;
  ++num_of_deltas_;
  //使用最大不超过1000个样本
  num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax);
  if (first_arrival_time_ms_ == -1)
    first_arrival_time_ms_ = arrival_time_ms;
  //指数退避过滤,就是EWMA 指数加权平均,权重为 0.9
  // Exponential backoff filter.
  accumulated_delay_ += delta_ms;
  BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms,
                        accumulated_delay_);
  smoothed_delay_ = smoothing_coef_ * smoothed_delay_ +
                    (1 - smoothing_coef_) * accumulated_delay_;
  BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms,
                        smoothed_delay_);

  RTC_LOG(LS_INFO) << "[BWE_TRACE] accumulated_delay_ms, " << arrival_time_ms << ", " << accumulated_delay_;
  RTC_LOG(LS_INFO) << "[BWE_TRACE] smoothed_delay_ms, " << arrival_time_ms << ", " << smoothed_delay_;

  // Maintain packet window
  delay_hist_.emplace_back(
      static_cast<double>(arrival_time_ms - first_arrival_time_ms_),
      smoothed_delay_, accumulated_delay_);
  if (settings_.enable_sort) {
    for (size_t i = delay_hist_.size() - 1;
         i > 0 &&
         delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms;
         --i) {
      std::swap(delay_hist_[i], delay_hist_[i - 1]);
    }
  }
  if (delay_hist_.size() > settings_.window_size)
    delay_hist_.pop_front();

  // Simple linear regression.
  double trend = prev_trend_;
  if (delay_hist_.size() == settings_.window_size) {
    // Update trend_ if it is possible to fit a line to the data. The delay
    // trend can be seen as an estimate of (send_rate - capacity)/capacity.
    // 0 < trend < 1   ->  the delay increases, queues are filling up
    //   trend == 0    ->  the delay does not change
    //   trend < 0     ->  the delay decreases, queues are being emptied
    trend = LinearFitSlope(delay_hist_).value_or(trend);
    if (settings_.enable_cap) {
      absl::optional<double> cap = ComputeSlopeCap(delay_hist_, settings_);
      // We only use the cap to filter out overuse detections, not
      // to detect additional underuses.
      if (trend >= 0 && cap.has_value() && trend > cap.value()) {
        trend = cap.value();
      }
    }
  }
  BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend);
  RTC_LOG(LS_INFO) << "[BWE_TRACE] trendline_slope, " << arrival_time_ms << ", " << trend;

  Detect(trend, send_delta_ms, arrival_time_ms);
}

队列中的样本累积到了20个,就开始计算斜率

最小二乘法, 公式如下

absl::optional<double> LinearFitSlope(
    const std::deque<TrendlineEstimator::PacketTiming>& packets) {
  RTC_DCHECK(packets.size() >= 2);
  // Compute the "center of mass".
  double sum_x = 0;
  double sum_y = 0;
  for (const auto& packet : packets) {
    sum_x += packet.arrival_time_ms;
    sum_y += packet.smoothed_delay_ms;
  }
  double x_avg = sum_x / packets.size();
  double y_avg = sum_y / packets.size();
  // Compute the slope k = \sum (x_i-x_avg)(y_i-y_avg) / \sum (x_i-x_avg)^2
  double numerator = 0;
  double denominator = 0;
  for (const auto& packet : packets) {
    double x = packet.arrival_time_ms;
    double y = packet.smoothed_delay_ms;
    numerator += (x - x_avg) * (y - y_avg);
    denominator += (x - x_avg) * (x - x_avg);
  }
  if (denominator == 0)
    return absl::nullopt;
  return numerator / denominator;
}

检测是否带宽使用状态

void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) {
  if (num_of_deltas_ < 2) {
    hypothesis_ = BandwidthUsage::kBwNormal;
    return;
  }
  //这个斜率要乘以样本的数量(最小为 kMinNumDeltas = 60), 再乘以一个增益值 threshold_gain_(默认为kDefaultTrendlineThresholdGain = 4), 要不然太小了, 比较起来不方便

  const double modified_trend =
      std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_;
  prev_modified_trend_ = modified_trend;
  BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend);
  BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_);

  RTC_LOG(LS_INFO) << "[BWE_TRACE] T, " << now_ms << ", " << modified_trend;
  RTC_LOG(LS_INFO) << "[BWE_TRACE] threshold, " << now_ms << ", " << threshold_;
  //如果增益过的斜率大于 threshold, 再检查过度使用的时间和次数
  if (modified_trend > threshold_) {
    if (time_over_using_ == -1) {
      // Initialize the timer. Assume that we've been
      // over-using half of the time since the previous
      // sample.
      time_over_using_ = ts_delta / 2;
    } else {
      // Increment timer
      time_over_using_ += ts_delta;
    }
    overuse_counter_++;
    //如果过度使用的时间差 > 过度使用的阈值, 并且过度使用的次数大于1
    if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) {
      if (trend >= prev_trend_) {
        time_over_using_ = 0;
        overuse_counter_ = 0;
        hypothesis_ = BandwidthUsage::kBwOverusing;
      }
    }
  } else if (modified_trend < -threshold_) {
    time_over_using_ = -1;
    overuse_counter_ = 0;
    hypothesis_ = BandwidthUsage::kBwUnderusing;
  } else {
    time_over_using_ = -1;
    overuse_counter_ = 0;
    hypothesis_ = BandwidthUsage::kBwNormal;
  }
  prev_trend_ = trend;
  UpdateThreshold(modified_trend, now_ms);
}   

修改 Threshold

为避免路由队列过小或由于并发的TCP flow 竞争所造成的饥饿, 这个阈值的设置很关键. 阈值如果太小会对于网络的瞬时干扰过于敏感, 如果太大则会反应太迟钝, 很难设置一个合适的值. GCC v2 采用了一种在 GCC v1 中定义的自适应的阈值 Adaptive threshold


constexpr double kMaxAdaptOffsetMs = 15.0;
constexpr double kOverUsingTimeThreshold = 10;
constexpr int kMinNumDeltas = 60;
constexpr int kDeltaCounterMax = 1000;

void TrendlineEstimator::UpdateThreshold(double modified_trend,
                                         int64_t now_ms) {
  if (last_update_ms_ == -1)
    last_update_ms_ = now_ms;
  //如果 modified_trend 比当前的阈值的差值超过 15, 这很可能是一个突发情况,不作修改
  if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) {
    // Avoid adapting the threshold to big latency spikes, caused e.g.,
    // by a sudden capacity drop.
    last_update_ms_ = now_ms;
    return;
  }
  //预设的调整系数  k_up_(0.0087),  k_down_(0.039),
  const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_;
  const int64_t kMaxTimeDeltaMs = 100;
  //取当前与上次修改之间的样本数, 不大于100
  int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs);
  //threshold_ 加上或减去调整的值
  threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms;
  //确保 threshold_ 在 6 和 600 之间
  threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f);
  last_update_ms_ = now_ms;
}

DelayBasedBwe

  • 基于延迟的带宽估算器, 其中的 video_delaydetector 及 audio_delaydetector 就是 TrendlineEstimator
video_delay_detector_(new TrendlineEstimator(key_value_config_, network_state_predictor_)),
audio_delay_detector_(new TrendlineEstimator(key_value_config_, network_state_predictor_)),

InterArrivalDelta


// Helper class to compute the inter-arrival time delta and the size delta
// between two send bursts. This code is branched from
// modules/remote_bitrate_estimator/inter_arrival.
class InterArrivalDelta {
 public:
  // After this many packet groups received out of order InterArrival will
  // reset, assuming that clocks have made a jump.
  static constexpr int kReorderedResetThreshold = 3;
  static constexpr TimeDelta kArrivalTimeOffsetThreshold =
      TimeDelta::Seconds(3);

  // A send time group is defined as all packets with a send time which are at
  // most send_time_group_length older than the first timestamp in that
  // group.
  explicit InterArrivalDelta(TimeDelta send_time_group_length);

  InterArrivalDelta() = delete;
  InterArrivalDelta(const InterArrivalDelta&) = delete;
  InterArrivalDelta& operator=(const InterArrivalDelta&) = delete;

  // This function returns true if a delta was computed, or false if the current
  // group is still incomplete or if only one group has been completed.
  // `send_time` is the send time.
  // `arrival_time` is the time at which the packet arrived.
  // `packet_size` is the size of the packet.
  // `timestamp_delta` (output) is the computed send time delta.
  // `arrival_time_delta_ms` (output) is the computed arrival-time delta.
  // `packet_size_delta` (output) is the computed size delta.
  bool ComputeDeltas(Timestamp send_time,
                     Timestamp arrival_time,
                     Timestamp system_time,
                     size_t packet_size,
                     TimeDelta* send_time_delta,
                     TimeDelta* arrival_time_delta,
                     int* packet_size_delta);

 private:
  struct SendTimeGroup {
    SendTimeGroup()
        : size(0),
          first_send_time(Timestamp::MinusInfinity()),
          send_time(Timestamp::MinusInfinity()),
          first_arrival(Timestamp::MinusInfinity()),
          complete_time(Timestamp::MinusInfinity()),
          last_system_time(Timestamp::MinusInfinity()) {}

    bool IsFirstPacket() const { return complete_time.IsInfinite(); }

    size_t size;
    Timestamp first_send_time;
    Timestamp send_time;
    Timestamp first_arrival;
    Timestamp complete_time;
    Timestamp last_system_time;
  };

  // Returns true if the last packet was the end of the current batch and the
  // packet with `send_time` is the first of a new batch.
  bool NewTimestampGroup(Timestamp arrival_time, Timestamp send_time) const;

  bool BelongsToBurst(Timestamp arrival_time, Timestamp send_time) const;

  void Reset();

  const TimeDelta send_time_group_length_;
  SendTimeGroup current_timestamp_group_;
  SendTimeGroup prev_timestamp_group_;
  int num_consecutive_reordered_packets_;
};

这个类用来计算两次发送高峰之间的 "到达时间差 inter-arrival time" 和 "大小差 size delta", 方法为

  bool NewTimestampGroup(Timestamp arrival_time, Timestamp send_time) const;

  bool BelongsToBurst(Timestamp arrival_time, Timestamp send_time) const;

如何判断哪些包属于一个包组 group, 方法为 NewTimestampGroup 和 BelongsToBurst

constexpr TimeDelta kSendTimeGroupLength = TimeDelta::Millis(5);

// Assumes that `timestamp` is not reordered compared to
// `current_timestamp_group_`.
bool InterArrivalDelta::NewTimestampGroup(Timestamp arrival_time,
                                          Timestamp send_time) const {
  if (current_timestamp_group_.IsFirstPacket()) {
    return false;
  } else if (BelongsToBurst(arrival_time, send_time)) {
    return false;
  } else {
    return send_time - current_timestamp_group_.first_send_time >
           send_time_group_length_;
  }
}

bool InterArrivalDelta::BelongsToBurst(Timestamp arrival_time,
                                       Timestamp send_time) const {
  RTC_DCHECK(current_timestamp_group_.complete_time.IsFinite());
  TimeDelta arrival_time_delta =
      arrival_time - current_timestamp_group_.complete_time;
  TimeDelta send_time_delta = send_time - current_timestamp_group_.send_time;
  if (send_time_delta.IsZero())
    return true;
  TimeDelta propagation_delta = arrival_time_delta - send_time_delta;
  if (propagation_delta < TimeDelta::Zero() &&
      arrival_time_delta <= kBurstDeltaThreshold &&
      arrival_time - current_timestamp_group_.first_arrival < kMaxBurstDuration)
    return true;
  return false;
}

Log

[034:772][47868] (rtp_transport_controller_send.cc:627): Creating fallback congestion controller
[034:772][28552] (peer_connection.cc:1827): Changing IceConnectionState 1 => 2
[034:772][48504] (rtp_transport_controller_send.cc:370): SignalNetworkState Up
[034:772][47868] (loss_based_bwe_v2.cc:113): The configuration does not specify that the estimator should be enabled, disabling it.
[034:772][47868] (alr_experiment.cc:79): Using ALR experiment settings: pacing factor: 1, max pacer queue length: 2875, ALR bandwidth usage percent: 80, ALR start budget level percent: 40, ALR end budget level percent: -60, ALR experiment group ID: 3
[034:772][47868] (trendline_estimator.cc:185): Using Trendline filter for delay change estimation with settings sort:false,cap:false,beginning_packets:7,end_packets:7,cap_uncertainty:0,window_size:20 and no network state predictor
[034:772][47868] (trendline_estimator.cc:185): Using Trendline filter for delay change estimation with settings sort:false,cap:false,beginning_packets:7,end_packets:7,cap_uncertainty:0,window_size:20 and no network state predictor
[034:772][47868] (aimd_rate_control.cc:112): Using aimd rate control with back off factor 0.85
[034:772][47868] (delay_based_bwe.cc:88): Initialized DelayBasedBwe with separate audio overuse detectionenabled:false,packet_threshold:10,time_threshold:1 s and alr limited backoff disabled
[034:772][47868] (delay_based_bwe.cc:301): BWE Setting start bitrate to: 300 kbps
[034:772][47868] (goog_cc_network_control.cc:648): [BWE_TRACE] fraction_loss, 538940584,0
[034:772][47868] (goog_cc_network_control.cc:649): [BWE_TRACE] rtt_ms, 538940584,0
[034:772][47868] (goog_cc_network_control.cc:650): [BWE_TRACE] Target_bitrate_kbps, 538940584,300
[034:772][47868] (probe_controller.cc:280): Measured bitrate: 300000 Minimum to probe further: 1260000
[034:772][47868] (goog_cc_network_control.cc:711): bwe 538940584 pushback_target_bps=300000 estimate_bps=300000
[034:772][47868] (bitrate_allocator.cc:394): Current BWE 300000
[034:772][14136] (pacing_controller.cc:229): bwe:pacer_updated pacing_kbps=330 padding_budget_kbps=0
[034:772][14136] (bitrate_prober.cc:114): Probe cluster (bitrate:min bytes:min packets): (900000:1688:5)
[034:772][14136] (bitrate_prober.cc:114): Probe cluster (bitrate:min bytes:min packets): (1800000:3375:5)

Comments |0|

Legend *) Required fields are marked
**) You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>
Category: Uncategorized