{"id":681,"date":"2022-03-29T22:12:57","date_gmt":"2022-03-29T14:12:57","guid":{"rendered":"https:\/\/www.fanyamin.com\/wordpress\/?p=681"},"modified":"2022-03-29T22:12:57","modified_gmt":"2022-03-29T14:12:57","slug":"%e7%ba%bf%e7%a8%8b%e5%88%87%e6%8d%a2%e7%9a%84%e7%bb%8f%e5%85%b8%e6%a8%a1%e5%bc%8f","status":"publish","type":"post","link":"https:\/\/www.fanyamin.com\/wordpress\/?p=681","title":{"rendered":"\u7ebf\u7a0b\u5207\u6362\u7684\u7ecf\u5178\u6a21\u5f0f"},"content":{"rendered":"<p>\u4ee5 WebRTC library \u4e3a\u4f8b\uff0c\u8fd9\u4e24\u767e\u884c\u4ee3\u7801\u6f14\u793a\u4e86\u7ebf\u7a0b\u5207\u6362\u7684\u7ecf\u5178\u6a21\u5f0f\u3002<\/p>\n<p>\u4e00\u4e2a\u7ebf\u7a0b\u4f1a\u6709\u4e00\u4e2a\u5bf9\u5e94\u7684\u4efb\u52a1\u961f\u5217\uff0c\u7ebf\u7a0b\u4f1a\u4e0d\u65ad\u68c0\u67e5\u8fd9\u4e2a\u961f\u5217\uff0c\u5982\u679c\u6709\u4efb\u52a1\u5c31\u6267\u884c\uff0c\u65e0\u4efb\u52a1\u5c31\u7b49\u5f85<\/p>\n<p>\u5f80\u8fd9\u4e2a\u4efb\u52a1\u961f\u5217\u4e2d\u63d0\u4ea4\u4e00\u4e2a\u4efb\u52a1\uff0c\u5c31\u7b49\u4e8e\u4ece\u5f53\u524d\u7ebf\u7a0b\u5207\u6362\u4e86\u5230\u4e86\u8fd9\u4e2a\u4efb\u52a1\u7ebf\u7a0b\u3002<\/p>\n<p>\u5f53\u7136\u8fd9\u4e2a\u5b9e\u73b0\u65b9\u9762\u8fd8\u662f\u6709\u4e9b\u7ec6\u8282\uff0c \u4e3a\u4e86\u7ebf\u7a0b\u5b89\u5168\uff0cGetNextTask \u65f6\u9700\u8981\u52a0\u9501<\/p>\n<pre><code class=\"language-cpp\">\n\/*\n *  Copyright 2018 The WebRTC Project Authors. All rights reserved.\n *\n *  Use of this source code is governed by a BSD-style license\n *  that can be found in the LICENSE file in the root of the source\n *  tree. An additional intellectual property rights grant can be found\n *  in the file PATENTS.  
All contributing project authors may\n *  be found in the AUTHORS file in the root of the source tree.\n *\/\n\n#include &quot;rtc_base\/task_queue_stdlib.h&quot;\n\n#include &lt;string.h&gt;\n\n#include &lt;algorithm&gt;\n#include &lt;map&gt;\n#include &lt;memory&gt;\n#include &lt;queue&gt;\n#include &lt;utility&gt;\n\n#include &quot;absl\/strings\/string_view.h&quot;\n#include &quot;api\/task_queue\/queued_task.h&quot;\n#include &quot;api\/task_queue\/task_queue_base.h&quot;\n#include &quot;rtc_base\/checks.h&quot;\n#include &quot;rtc_base\/event.h&quot;\n#include &quot;rtc_base\/logging.h&quot;\n#include &quot;rtc_base\/platform_thread.h&quot;\n#include &quot;rtc_base\/synchronization\/mutex.h&quot;\n#include &quot;rtc_base\/thread_annotations.h&quot;\n#include &quot;rtc_base\/time_utils.h&quot;\n\nnamespace webrtc {\nnamespace {\n\nrtc::ThreadPriority TaskQueuePriorityToThreadPriority(\n    TaskQueueFactory::Priority priority) {\n  switch (priority) {\n    case TaskQueueFactory::Priority::HIGH:\n      return rtc::ThreadPriority::kRealtime;\n    case TaskQueueFactory::Priority::LOW:\n      return rtc::ThreadPriority::kLow;\n    case TaskQueueFactory::Priority::NORMAL:\n      return rtc::ThreadPriority::kNormal;\n  }\n}\n\nclass TaskQueueStdlib final : public TaskQueueBase {\n public:\n  TaskQueueStdlib(absl::string_view queue_name, rtc::ThreadPriority priority);\n  ~TaskQueueStdlib() override = default;\n\n  void Delete() override;\n  void PostTask(std::unique_ptr&lt;QueuedTask&gt; task) override;\n  void PostDelayedTask(std::unique_ptr&lt;QueuedTask&gt; task,\n                       uint32_t milliseconds) override;\n\n private:\n  using OrderId = uint64_t;\n\n  struct DelayedEntryTimeout {\n    int64_t next_fire_at_ms_{};\n    OrderId order_{};\n\n    bool operator&lt;(const DelayedEntryTimeout&amp; o) const {\n      return std::tie(next_fire_at_ms_, order_) &lt;\n             std::tie(o.next_fire_at_ms_, o.order_);\n    }\n  };\n\n  struct NextTask {\n    bool 
final_task_{false};\n    std::unique_ptr&lt;QueuedTask&gt; run_task_;\n    int64_t sleep_time_ms_{};\n  };\n\n  NextTask GetNextTask();\n\n  void ProcessTasks();\n\n  void NotifyWake();\n\n  \/\/ Indicates if the thread has started.\n  rtc::Event started_;\n\n  \/\/ Signaled whenever a new task is pending.\n  rtc::Event flag_notify_;\n\n  Mutex pending_lock_;\n\n  \/\/ Indicates if the worker thread needs to shutdown now.\n  bool thread_should_quit_ RTC_GUARDED_BY(pending_lock_){false};\n\n  \/\/ Holds the next order to use for the next task to be\n  \/\/ put into one of the pending queues.\n  OrderId thread_posting_order_ RTC_GUARDED_BY(pending_lock_){};\n\n  \/\/ The list of all pending tasks that need to be processed in the\n  \/\/ FIFO queue ordering on the worker thread.\n  std::queue&lt;std::pair&lt;OrderId, std::unique_ptr&lt;QueuedTask&gt;&gt;&gt; pending_queue_\n      RTC_GUARDED_BY(pending_lock_);\n\n  \/\/ The list of all pending tasks that need to be processed at a future\n  \/\/ time based upon a delay. On the off chance the delayed task should\n  \/\/ happen at exactly the same time interval as another task then the\n  \/\/ task is processed based on FIFO ordering. 
std::priority_queue was\n  \/\/ considered but rejected due to its inability to extract the\n  \/\/ std::unique_ptr out of the queue without the presence of a hack.\n  std::map&lt;DelayedEntryTimeout, std::unique_ptr&lt;QueuedTask&gt;&gt; delayed_queue_\n      RTC_GUARDED_BY(pending_lock_);\n\n  \/\/ Contains the active worker thread assigned to processing\n  \/\/ tasks (including delayed tasks).\n  \/\/ Placing this last ensures the thread doesn&#039;t touch uninitialized attributes\n  \/\/ throughout it&#039;s lifetime.\n  rtc::PlatformThread thread_;\n};\n\nTaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name,\n                                 rtc::ThreadPriority priority)\n    : started_(\/*manual_reset=*\/false, \/*initially_signaled=*\/false),\n      flag_notify_(\/*manual_reset=*\/false, \/*initially_signaled=*\/false),\n      thread_(rtc::PlatformThread::SpawnJoinable(\n          [this] {\n            CurrentTaskQueueSetter set_current(this);\n            ProcessTasks();\n          },\n          queue_name,\n          rtc::ThreadAttributes().SetPriority(priority))) {\n  started_.Wait(rtc::Event::kForever);\n}\n\nvoid TaskQueueStdlib::Delete() {\n  RTC_DCHECK(!IsCurrent());\n\n  {\n    MutexLock lock(&amp;pending_lock_);\n    thread_should_quit_ = true;\n  }\n\n  NotifyWake();\n\n  delete this;\n}\n\nvoid TaskQueueStdlib::PostTask(std::unique_ptr&lt;QueuedTask&gt; task) {\n  {\n    MutexLock lock(&amp;pending_lock_);\n    OrderId order = thread_posting_order_++;\n\n    pending_queue_.push(std::pair&lt;OrderId, std::unique_ptr&lt;QueuedTask&gt;&gt;(\n        order, std::move(task)));\n  }\n\n  NotifyWake();\n}\n\nvoid TaskQueueStdlib::PostDelayedTask(std::unique_ptr&lt;QueuedTask&gt; task,\n                                      uint32_t milliseconds) {\n  auto fire_at = rtc::TimeMillis() + milliseconds;\n\n  DelayedEntryTimeout delay;\n  delay.next_fire_at_ms_ = fire_at;\n\n  {\n    MutexLock lock(&amp;pending_lock_);\n    delay.order_ = 
++thread_posting_order_;\n    delayed_queue_[delay] = std::move(task);\n  }\n\n  NotifyWake();\n}\n\nTaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {\n  NextTask result{};\n\n  auto tick = rtc::TimeMillis();\n\n  MutexLock lock(&amp;pending_lock_);\n\n  if (thread_should_quit_) {\n    result.final_task_ = true;\n    return result;\n  }\n\n  if (delayed_queue_.size() &gt; 0) {\n    auto delayed_entry = delayed_queue_.begin();\n    const auto&amp; delay_info = delayed_entry-&gt;first;\n    auto&amp; delay_run = delayed_entry-&gt;second;\n    if (tick &gt;= delay_info.next_fire_at_ms_) {\n      if (pending_queue_.size() &gt; 0) {\n        auto&amp; entry = pending_queue_.front();\n        auto&amp; entry_order = entry.first;\n        auto&amp; entry_run = entry.second;\n        if (entry_order &lt; delay_info.order_) {\n          result.run_task_ = std::move(entry_run);\n          pending_queue_.pop();\n          return result;\n        }\n      }\n\n      result.run_task_ = std::move(delay_run);\n      delayed_queue_.erase(delayed_entry);\n      return result;\n    }\n\n    result.sleep_time_ms_ = delay_info.next_fire_at_ms_ - tick;\n  }\n\n  if (pending_queue_.size() &gt; 0) {\n    auto&amp; entry = pending_queue_.front();\n    result.run_task_ = std::move(entry.second);\n    pending_queue_.pop();\n  }\n\n  return result;\n}\n\nvoid TaskQueueStdlib::ProcessTasks() {\n  started_.Set();\n\n  while (true) {\n    auto task = GetNextTask();\n\n    if (task.final_task_)\n      break;\n\n    if (task.run_task_) {\n      \/\/ process entry immediately then try again\n      QueuedTask* release_ptr = task.run_task_.release();\n      if (release_ptr-&gt;Run())\n        delete release_ptr;\n\n      \/\/ attempt to sleep again\n      continue;\n    }\n\n    if (0 == task.sleep_time_ms_)\n      flag_notify_.Wait(rtc::Event::kForever);\n    else\n      flag_notify_.Wait(task.sleep_time_ms_);\n  }\n}\n\nvoid TaskQueueStdlib::NotifyWake() {\n  \/\/ The queue holds pending 
tasks to complete. Either tasks are to be\n  \/\/ executed immediately or tasks are to be run at some future delayed time.\n  \/\/ For immediate tasks the task queue&#039;s thread is busy running the task and\n  \/\/ the thread will not be waiting on the flag_notify_ event. If no immediate\n  \/\/ tasks are available but a delayed task is pending then the thread will be\n  \/\/ waiting on flag_notify_ with a delayed time-out of the nearest timed task\n  \/\/ to run. If no immediate or pending tasks are available, the thread will\n  \/\/ wait on flag_notify_ until signaled that a task has been added (or the\n  \/\/ thread to be told to shutdown).\n\n  \/\/ In all cases, when a new immediate task, delayed task, or request to\n  \/\/ shutdown the thread is added the flag_notify_ is signaled after. If the\n  \/\/ thread was waiting then the thread will wake up immediately and re-assess\n  \/\/ what task needs to be run next (i.e. run a task now, wait for the nearest\n  \/\/ timed delayed task, or shutdown the thread). If the thread was not waiting\n  \/\/ then the thread will remain signaled to wake up the next time any\n  \/\/ attempt to wait on the flag_notify_ event occurs.\n\n  \/\/ Any immediate or delayed pending task (or request to shutdown the thread)\n  \/\/ must always be added to the queue prior to signaling flag_notify_ to wake\n  \/\/ up the possibly sleeping thread. 
This prevents a race condition where the\n  \/\/ thread is notified to wake up but the task queue&#039;s thread finds nothing to\n  \/\/ do so it waits once again to be signaled where such a signal may never\n  \/\/ happen.\n  flag_notify_.Set();\n}\n\nclass TaskQueueStdlibFactory final : public TaskQueueFactory {\n public:\n  std::unique_ptr&lt;TaskQueueBase, TaskQueueDeleter&gt; CreateTaskQueue(\n      absl::string_view name,\n      Priority priority) const override {\n    return std::unique_ptr&lt;TaskQueueBase, TaskQueueDeleter&gt;(\n        new TaskQueueStdlib(name, TaskQueuePriorityToThreadPriority(priority)));\n  }\n};\n\n}  \/\/ namespace\n\nstd::unique_ptr&lt;TaskQueueFactory&gt; CreateTaskQueueStdlibFactory() {\n  return std::make_unique&lt;TaskQueueStdlibFactory&gt;();\n}\n\n}  \/\/ namespace webrtc\n<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>\u4ee5 WebRTC library \u4e3a\u4f8b\uff0c\u8fd9\u4e24\u767e\u884c\u4ee3\u7801\u6f14\u793a\u4e86\u7ebf\u7a0b\u5207\u6362\u7684\u7ecf\u5178\u6a21\u5f0f\u3002 \u4e00\u4e2a\u7ebf\u7a0b\u4f1a\u6709\u4e00\u4e2a\u5bf9\u5e94\u7684\u4efb\u52a1\u961f\u5217\uff0c\u7ebf\u7a0b\u4f1a\u4e0d\u65ad\u68c0\u67e5\u8fd9\u4e2a\u961f\u5217\uff0c\u5982\u679c\u6709\u4efb\u52a1\u5c31\u6267\u884c\uff0c\u65e0\u4efb\u52a1\u5c31\u7b49\u5f85 \u5f80\u8fd9\u4e2a\u4efb\u52a1\u961f\u5217\u4e2d\u63d0\u4ea4\u4e00\u4e2a\u4efb\u52a1\uff0c\u5c31\u7b49\u4e8e\u4ece\u5f53\u524d\u7ebf\u7a0b\u5207\u6362\u4e86\u5230\u4e86\u8fd9\u4e2a\u4efb\u52a1\u7ebf\u7a0b\u3002 \u5f53\u7136\u8fd9\u4e2a\u5b9e\u73b0\u65b9\u9762\u8fd8\u662f\u6709\u4e9b\u7ec6\u8282\uff0c \u4e3a\u4e86\u7ebf\u7a0b\u5b89\u5168\uff0cGetNextTask \u65f6\u9700\u8981\u52a0\u9501 \/* * Copyright 2018 The WebRTC Project Authors. All rights reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. 
An additional intellectual property rights [&hellip;] <a class=\"read-more\" href=\"https:\/\/www.fanyamin.com\/wordpress\/?p=681\" title=\"Permanent Link to: \u7ebf\u7a0b\u5207\u6362\u7684\u7ecf\u5178\u6a21\u5f0f\">&rarr;Read&nbsp;more<\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-681","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/681"}],"collection":[{"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=681"}],"version-history":[{"count":1,"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/681\/revisions"}],"predecessor-version":[{"id":682,"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/681\/revisions\/682"}],"wp:attachment":[{"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=681"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=681"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.fanyamin.com\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=681"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}