From: Simon Marchi
To: gdb-patches@sourceware.org
Cc: Simon Marchi
Subject: [PATCH 5/6] gdbsupport: add async parallel_for_each version
Date: Mon, 5 May 2025 16:15:29 -0400
Message-ID: <20250505201548.184917-5-simon.marchi@efficios.com>
In-Reply-To: <20250505201548.184917-1-simon.marchi@efficios.com>
References: <20250505201548.184917-1-simon.marchi@efficios.com>

From: Simon Marchi

I would like to use gdb::parallel_for_each to implement the parallelism
of the DWARF unit indexing.  However, the existing implementation of
gdb::parallel_for_each is blocking, which doesn't work with the model
used by the DWARF indexer, which is asynchronous and callback-based.
Add an asynchronous version of gdb::parallel_for_each that will be
suitable for this task.  This new version accepts a callback that is
invoked when the parallel for each is complete.

This function uses the same strategy as gdb::task_group to invoke the
"done" callback: worker threads have a shared_ptr reference to some
object.  The last worker thread to drop its reference causes the object
to be deleted, which invokes the callback.

Unlike for the sync version of gdb::parallel_for_each, it's not possible
to keep any state in the calling thread's stack, because that disappears
immediately after starting the workers.  So all the state is kept in
that same shared object.

There is a limitation that the sync version doesn't have, regarding the
arguments you can pass to the worker objects: it's not possible to rely
on references.  There are more details in a comment in the code.

It would be possible to implement the sync version of
gdb::parallel_for_each on top of the async version, but I decided not to
do it to avoid the unnecessary dynamic allocation of the shared object,
and to avoid adding the limitations on passing references I mentioned
just above.  But if we judge that it would be an acceptable cost to
avoid the duplication, we could do it.

Add a self test for the new function.

Change-Id: I6173defb1e09856d137c1aa05ad51cbf521ea0b0
---
 gdb/unittests/parallel-for-selftests.c |  23 ++++
 gdbsupport/parallel-for.h              | 141 ++++++++++++++++++++++++-
 2 files changed, 159 insertions(+), 5 deletions(-)

diff --git a/gdb/unittests/parallel-for-selftests.c b/gdb/unittests/parallel-for-selftests.c
index 6b6b557ba99e..2f9313e32858 100644
--- a/gdb/unittests/parallel-for-selftests.c
+++ b/gdb/unittests/parallel-for-selftests.c
@@ -109,8 +109,31 @@ test_parallel_for_each ()

   const std::vector for_each_functions
     {
+      /* Test gdb::parallel_for_each.  */
       [] (int start, int end, foreach_callback_t callback)
       { gdb::parallel_for_each<1, int, test_worker> (start, end, callback); },
+
+      /* Test gdb::parallel_for_each_async.  */
+      [] (int start, int end, foreach_callback_t callback)
+      {
+        bool done_flag = false;
+        std::condition_variable cv;
+        std::mutex mtx;
+
+        gdb::parallel_for_each_async<1, int, test_worker> (start, end,
+          [&mtx, &done_flag, &cv] ()
+            {
+              std::lock_guard lock (mtx);
+              done_flag = true;
+              cv.notify_one();
+            }, callback);
+
+        /* Wait for the async parallel-for to complete.  */
+        std::unique_lock lock (mtx);
+        cv.wait (lock, [&done_flag] () { return done_flag; });
+      },
+
+      /* Test gdb::sequential_for_each.  */
       [] (int start, int end, foreach_callback_t callback)
       { gdb::sequential_for_each (start, end, callback); },
     };
diff --git a/gdbsupport/parallel-for.h b/gdbsupport/parallel-for.h
index c5a15d3235a7..090eeaaec672 100644
--- a/gdbsupport/parallel-for.h
+++ b/gdbsupport/parallel-for.h
@@ -25,6 +25,10 @@
 #include "gdbsupport/thread-pool.h"
 #include "gdbsupport/work-queue.h"

+/* If enabled, print debug info about the inner workings of the parallel for
+   each functions.  */
+const bool parallel_for_each_debug = false;
+
 namespace gdb
 {

@@ -37,7 +41,10 @@ namespace gdb
    thread state.

    Worker threads call Worker::operator() repeatedly until the queue is
-   empty.  */
+   empty.
+
+   This function is synchronous, meaning that it blocks and returns once the
+   processing is complete.  */

 template<std::size_t min_batch_size, typename RandomIt, typename Worker,
          typename... WorkerArgs>
@@ -45,10 +52,6 @@ void
 parallel_for_each (const RandomIt first, const RandomIt last,
                    WorkerArgs &&...worker_args)
 {
-  /* If enabled, print debug info about how the work is distributed across
-     the threads.  */
-  const bool parallel_for_each_debug = false;
-
   gdb_assert (first <= last);

   if (parallel_for_each_debug)
@@ -116,6 +119,134 @@ sequential_for_each (RandomIt first, RandomIt last, WorkerArgs &&...worker_args)
   Worker (std::forward<WorkerArgs> (worker_args)...) (first, last);
 }

+namespace detail
+{
+
+/* Type to hold the state shared between threads of
+   gdb::parallel_for_each_async.  */
+
+template<std::size_t min_batch_size, typename RandomIt, typename... WorkerArgs>
+struct pfea_state
+{
+  pfea_state (RandomIt first, RandomIt last, std::function<void ()> &&done,
+              WorkerArgs &&...worker_args)
+    : first (first),
+      last (last),
+      worker_args_tuple (std::forward_as_tuple
+                           (std::forward<WorkerArgs> (worker_args))...),
+      queue (first, last),
+      m_done (std::move (done))
+  {}
+
+  DISABLE_COPY_AND_ASSIGN (pfea_state);
+
+  /* This gets called by the last worker thread that drops its reference on
+     the shared state, thus when the processing is complete.  */
+  ~pfea_state ()
+  {
+    if (m_done)
+      m_done ();
+  }
+
+  /* The interval to process.  */
+  const RandomIt first, last;
+
+  /* Tuple of arguments to pass when constructing the user's worker object.
+
+     Use std::decay_t to avoid storing references to the caller's local
+     variables.  If we didn't use it and the caller passed an lvalue `foo *`,
+     we would store it as a reference to `foo *`, thus storing a reference to
+     the caller's local variable.
+
+     The downside is that it's not possible to pass arguments by reference,
+     callers need to pass pointers or std::reference_wrappers.  */
+  std::tuple<std::decay_t<WorkerArgs>...> worker_args_tuple;
+
+  /* Work queue that worker threads pull work items from.  */
+  work_queue<RandomIt, min_batch_size> queue;
+
+private:
+  /* Callable called when the parallel-for is done.  */
+  std::function<void ()> m_done;
+};
+
+} /* namespace detail */
+
+/* A "parallel-for" implementation using a shared work queue.  Work items get
+   popped in batches from the queue and handed out to worker threads.
+
+   Batch sizes are proportional to the number of remaining items in the queue,
+   but always greater or equal to MIN_BATCH_SIZE.
+
+   The DONE callback is invoked when processing is done.
+
+   Each worker thread instantiates an object of type Worker, forwarding ARGS to
+   its constructor.  The Worker object can be used to keep some per-worker
+   thread state.  This version does not support passing references as arguments
+   to the worker.  Use std::reference_wrapper or pointers instead.
+
+   Worker threads call Worker::operator() repeatedly until the queue is
+   empty.
+
+   This function is asynchronous.  An arbitrary worker thread will call the DONE
+   callback when processing is done.  */
+
+template<std::size_t min_batch_size, typename RandomIt, typename Worker,
+         typename... WorkerArgs>
+void
+parallel_for_each_async (const RandomIt first, const RandomIt last,
+                         std::function<void ()> &&done,
+                         WorkerArgs &&...worker_args)
+{
+  gdb_assert (first <= last);
+
+  if (parallel_for_each_debug)
+    {
+      debug_printf ("Parallel for: n elements: %zu\n",
+                    static_cast<std::size_t> (last - first));
+      debug_printf ("Parallel for: min batch size: %zu\n", min_batch_size);
+    }
+
+  const size_t n_worker_threads
+    = std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);
+
+  /* The state shared between all worker threads.  All worker threads get a
+     reference on the shared pointer through the lambda below.  The last worker
+     thread to drop its reference will cause this object to be destroyed, which
+     will call the DONE callback.  */
+  using state_t = detail::pfea_state<min_batch_size, RandomIt, WorkerArgs...>;
+  auto state
+    = std::make_shared<state_t> (first, last, std::move (done),
+                                 std::forward<WorkerArgs> (worker_args)...);
+
+  /* The worker thread task.  */
+  auto task = [state] ()
+    {
+      /* Instantiate the user-defined worker.  */
+      auto worker = std::make_from_tuple<Worker> (state->worker_args_tuple);
+
+      for (;;)
+        {
+          const auto [batch_first, batch_last] = state->queue.pop_batch ();
+
+          if (batch_first == batch_last)
+            break;
+
+          if (parallel_for_each_debug)
+            debug_printf ("Processing %zu items, range [%zu, %zu[\n",
+                          static_cast<std::size_t> (batch_last - batch_first),
+                          static_cast<std::size_t> (batch_first - state->first),
+                          static_cast<std::size_t> (batch_last - state->first));
+
+          worker (batch_first, batch_last);
+        }
+    };
+
+  /* Start N_WORKER_THREADS tasks.  */
+  for (int i = 0; i < n_worker_threads; ++i)
+    gdb::thread_pool::g_thread_pool->post_task (task);
+}
+
 }

 #endif /* GDBSUPPORT_PARALLEL_FOR_H */
-- 
2.49.0
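
For readers who want to see the "last reference invokes the callback"
strategy from the commit message in isolation, here is a minimal
standalone sketch.  It is not the code from this patch and does not use
gdb's thread pool or work queue.  The names shared_state, worker,
sum_async and demo are hypothetical, plain std::thread stands in for
gdb::thread_pool, and an atomic counter stands in for the work queue.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

/* Hypothetical shared state.  The destructor runs the "done" callback, so
   the callback fires when the last shared_ptr owner goes away.  */
struct shared_state
{
  shared_state (int n, std::atomic<long> *result, std::function<void ()> done)
    : n (n), result (result), m_done (std::move (done))
  {}

  shared_state (const shared_state &) = delete;
  shared_state &operator= (const shared_state &) = delete;

  ~shared_state ()
  {
    if (m_done)
      m_done ();
  }

  const int n;                  /* Items are the integers in [0, n).  */
  std::atomic<int> next {0};    /* Next item to hand out.  */
  std::atomic<long> *result;    /* Owned by the caller, outlives the state.  */

private:
  std::function<void ()> m_done;
};

/* Worker body.  STATE is taken by value, so this thread's reference is
   dropped when the function returns, after its last item is processed.  */
static void
worker (std::shared_ptr<shared_state> state)
{
  for (;;)
    {
      const int i = state->next.fetch_add (1);
      if (i >= state->n)
        break;

      state->result->fetch_add (i);
    }
}

/* Start N_THREADS workers summing [0, n) into *RESULT and return without
   waiting.  DONE runs on whichever thread drops the last reference.  The
   threads are returned only so that this sketch can join them at exit.  */
std::vector<std::thread>
sum_async (int n, int n_threads, std::atomic<long> *result,
           std::function<void ()> done)
{
  auto state = std::make_shared<shared_state> (n, result, std::move (done));

  std::vector<std::thread> threads;
  for (int i = 0; i < n_threads; ++i)
    threads.emplace_back (worker, state);

  return threads;
}

/* Wait for DONE with a condition variable, like the self test does.  */
long
demo ()
{
  std::atomic<long> result {0};
  std::mutex mtx;
  std::condition_variable cv;
  bool done_flag = false;

  std::vector<std::thread> threads
    = sum_async (100, 4, &result, [&] ()
        {
          std::lock_guard<std::mutex> lock (mtx);
          done_flag = true;
          cv.notify_one ();
        });

  {
    std::unique_lock<std::mutex> lock (mtx);
    cv.wait (lock, [&] () { return done_flag; });
  }

  for (std::thread &t : threads)
    t.join ();

  return result.load ();
}
```

Calling demo () returns 4950, the sum of the integers from 0 to 99.
Note that nothing the callback needs lives in sum_async's stack frame:
the caller passes a pointer to its own result object, which is the same
constraint the commit message describes for worker arguments.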