Compare commits

...

4 Commits

Author SHA1 Message Date
Horror Proton
2a7d2bb7f3 revert: remove atomic_wait impl temporarily 2024-02-15 10:40:24 +08:00
Horror Proton
834a2b0294 perf: use atomic_wait in Assistant completed_call 2024-02-15 10:31:03 +08:00
Horror Proton
d158150053 perf: unlock m_msg_condvar before notify_all 2024-02-15 10:31:03 +08:00
Horror Proton
075fe5450b refactor: assistant replace thread_idle with enum 2024-02-15 10:31:03 +08:00
2 changed files with 101 additions and 61 deletions

View File

@@ -76,21 +76,22 @@ Assistant::~Assistant()
// which creates empty files with random name on Linux. I have no idea how this could work
ResourceLoader::get_instance().cancel();
m_thread_exit = true;
m_thread_idle = true;
{
// m_thread_exit is locked, in case wait happens after notify
std::unique_lock work_lock { m_mutex };
std::unique_lock call_lock { m_call_mutex };
m_thread_exit = true;
}
{
std::unique_lock<std::mutex> lock(m_call_mutex);
m_call_condvar.notify_all();
}
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condvar.notify_all();
}
{
std::unique_lock<std::mutex> lock(m_msg_mutex);
m_msg_condvar.notify_all();
}
m_run_status = RunStatus::Stopping;
m_call_condvar.notify_all();
m_completed_call_condvar.notify_all();
m_condvar.notify_all();
m_msg_condvar.notify_all();
if (m_working_thread.joinable()) {
m_working_thread.join();
@@ -169,18 +170,15 @@ bool asst::Assistant::ctrl_connect(const std::string& adb_path, const std::strin
std::unique_lock<std::mutex> lock(m_mutex);
// 仍有任务进行connect 前需要 stop
if (!m_thread_idle) {
if (running()) {
return false;
}
m_thread_idle = false;
bool ret = m_ctrler->connect(adb_path, address, config.empty() ? "General" : config);
if (ret) {
m_uuid = m_ctrler->get_uuid();
}
m_thread_idle = true;
return ret;
}
@@ -352,15 +350,12 @@ bool asst::Assistant::start(bool block)
LogTraceFunction;
Log.info("Start |", block ? "block" : "non block");
if (!m_thread_idle) {
return false;
auto old_run_status = RunStatus::Stopped;
while (!m_run_status.compare_exchange_weak(old_run_status, RunStatus::Starting)) {
if (m_thread_exit) return false;
if (old_run_status != RunStatus::Stopped) return false;
}
std::unique_lock<std::mutex> lock;
if (block) { // 外部调用
lock = std::unique_lock<std::mutex>(m_mutex);
}
m_thread_idle = false;
m_running = true;
m_condvar.notify_one();
return true;
@@ -371,11 +366,15 @@ bool Assistant::stop(bool block)
LogTraceFunction;
Log.info("Stop |", block ? "block" : "non block");
m_thread_idle = true;
auto old_run_status = RunStatus::Started;
while (!m_run_status.compare_exchange_weak(old_run_status, RunStatus::Stopping)) {
if (m_thread_exit) return true;
if (old_run_status == RunStatus::Stopping || old_run_status == RunStatus::Stopped) break;
}
std::unique_lock<std::mutex> lock;
std::unique_lock<std::mutex> lock(m_mutex, std::defer_lock);
if (block) { // 外部调用
lock = std::unique_lock<std::mutex>(m_mutex);
lock.lock();
}
m_tasks_list.clear();
@@ -385,7 +384,7 @@ bool Assistant::stop(bool block)
bool asst::Assistant::running() const
{
return m_running;
return m_run_status.load() != RunStatus::Stopped;
}
void Assistant::working_proc()
@@ -393,23 +392,43 @@ void Assistant::working_proc()
LogTraceFunction;
std::vector<TaskId> finished_tasks;
std::unique_lock<std::mutex> lock(m_mutex, std::defer_lock);
while (true) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_thread_exit) {
m_running = false;
if (!lock.owns_lock()) lock.lock();
if (m_thread_exit) [[unlikely]] {
m_run_status = RunStatus::Stopped;
return;
}
if (m_thread_idle || m_tasks_list.empty()) {
auto old_run_status = m_run_status.load();
switch (old_run_status) {
case RunStatus::Stopping: // set status to stopped, block on condvar.wait
// start() stop() would fail, no need to cmpxchg
m_run_status = RunStatus::Stopped;
finished_tasks.clear();
m_thread_idle = true;
m_running = false;
clear_cache();
Log.flush();
[[fallthrough]];
case RunStatus::Stopped:
m_condvar.wait(lock);
continue;
}
m_running = true;
case RunStatus::Starting: // set status to started
m_run_status = RunStatus::Started;
old_run_status = RunStatus::Started;
[[fallthrough]];
case RunStatus::Started: // run task or set status to stopped if there is nothing to runs
if (m_tasks_list.empty()) {
if (!m_run_status.compare_exchange_weak(old_run_status, RunStatus::Stopped)) continue;
finished_tasks.clear();
clear_cache();
Log.flush();
continue;
}
break; //--+
} // | the only way out
// <-+
const auto [id, task_ptr] = m_tasks_list.front();
lock.unlock();
// only one instance of working_proc running, unlock here to allow set_task_param to the running task
@@ -424,20 +443,23 @@ void Assistant::working_proc()
finished_tasks.emplace_back(id);
lock.lock();
if (!m_tasks_list.empty()) {
m_tasks_list.pop_front();
}
for (auto iter = m_tasks_list.begin(); iter != m_tasks_list.end(); ++iter)
if (iter->first == id) {
m_tasks_list.erase(iter);
break;
}
lock.unlock();
auto msg =
m_thread_idle ? AsstMsg::TaskChainStopped : (ret ? AsstMsg::TaskChainCompleted : AsstMsg::TaskChainError);
auto msg = (m_run_status == Assistant::RunStatus::Stopping)
? AsstMsg::TaskChainStopped
: (ret ? AsstMsg::TaskChainCompleted : AsstMsg::TaskChainError);
append_callback(msg, callback_json);
if (m_thread_idle) {
finished_tasks.clear();
if (m_run_status != RunStatus::Started) {
continue;
}
lock.lock();
if (m_tasks_list.empty()) {
callback_json["finished_tasks"] = json::array(finished_tasks);
append_callback(AsstMsg::AllTasksCompleted, callback_json);
@@ -446,10 +468,11 @@ void Assistant::working_proc()
}
const int delay = Config.get_options().task_delay;
lock.lock();
m_condvar.wait_for(lock, std::chrono::milliseconds(delay), [&]() -> bool { return m_thread_idle; });
m_condvar.wait_for(lock, std::chrono::milliseconds(delay),
[this]() -> bool { return m_run_status != RunStatus::Started; });
if (lock.owns_lock()) lock.unlock();
if (m_thread_idle) {
if (m_run_status == RunStatus::Stopping) { // status became 'Stopping' while we were waiting?
append_callback(AsstMsg::TaskChainStopped, callback_json);
}
}
@@ -506,18 +529,22 @@ asst::Assistant::AsyncCallId asst::Assistant::append_async_call(AsyncCallItem::T
bool asst::Assistant::wait_async_id(AsyncCallId id)
{
while (true) {
std::unique_lock<std::mutex> lock(m_completed_call_mutex);
std::unique_lock<std::mutex> lock { m_completed_call_mutex };
if (m_thread_exit) {
return false;
}
auto old = m_completed_call.load();
// 需要保证队列中id一定是有序的
if (id <= m_completed_call) {
if (id <= old) {
return true;
}
#ifdef ASST_USE_ATOMIC_WAIT
m_completed_call.wait(old);
#else
m_completed_call_condvar.wait(lock);
#endif
}
return false;
}
void asst::Assistant::call_proc()
@@ -566,8 +593,9 @@ void asst::Assistant::call_proc()
}
{
std::unique_lock<std::mutex> completed_call_lock(m_completed_call_mutex);
m_completed_call = call_item.id;
std::unique_lock<std::mutex> completed_call_lock(m_completed_call_mutex);
m_completed_call_condvar.notify_all();
}
@@ -606,9 +634,10 @@ void Assistant::append_callback(AsstMsg msg, const json::value& detail)
// 加入回调消息队列,由回调消息线程外抛给外部
Log.info("Assistant::append_callback |", msg, more_detail.to_string());
std::unique_lock<std::mutex> lock(m_msg_mutex);
m_msg_queue.emplace(msg, std::move(more_detail));
{
std::unique_lock<std::mutex> lock(m_msg_mutex);
m_msg_queue.emplace(msg, std::move(more_detail));
}
m_msg_condvar.notify_one();
}

View File

@@ -99,7 +99,7 @@ namespace asst
public:
std::shared_ptr<Controller> ctrler() const { return m_ctrler; }
std::shared_ptr<Status> status() const { return m_status; }
bool need_exit() const { return m_thread_idle; }
bool need_exit() const { return m_run_status.load() == RunStatus::Stopping; }
private:
void append_callback(AsstMsg msg, const json::value& detail);
@@ -156,12 +156,18 @@ namespace asst
std::atomic_bool m_thread_exit = false;
std::list<std::pair<TaskId, std::shared_ptr<InterfaceTask>>> m_tasks_list;
inline static TaskId m_task_id = 0; // 进程级唯一
inline static std::atomic<TaskId> m_task_id = 0; // 进程级唯一
ApiCallback m_callback = nullptr;
void* m_callback_arg = nullptr;
std::atomic_bool m_thread_idle = true;
std::atomic_bool m_running = false;
enum struct RunStatus
{
Stopped = 0,
Starting = 1,
Stopping = 2,
Started = 3,
};
std::atomic<RunStatus> m_run_status = RunStatus::Stopped;
mutable std::mutex m_mutex;
std::condition_variable m_condvar;
@@ -174,9 +180,14 @@ namespace asst
std::mutex m_call_mutex;
std::condition_variable m_call_condvar;
AsyncCallId m_completed_call = 0; // 每个实例有自己独立的执行队列,所以不能静态
std::atomic<AsyncCallId> m_completed_call = 0; // 每个实例有自己独立的执行队列,所以不能静态
// TODO: use atomic_wait instead
std::mutex m_completed_call_mutex;
std::condition_variable m_completed_call_condvar;
// we can use atomic wait on windows, but some
// users seemed to have missing dlls
// ref: https://github.com/MaaAssistantArknights/MaaAssistantArknights/pull/4360
std::thread m_msg_thread;
std::thread m_call_thread;