// This file is part of Eigen, a lightweight C++ template library // for linear algebra. // // Copyright (C) 2023 Charlie Schlosser // // This Source Code Form is subject to the terms of the Mozilla // Public License v. 2.0. If a copy of the MPL was not distributed // with this file, You can obtain one at http://mozilla.org/MPL/2.0/. #ifndef EIGEN_CORE_THREAD_POOL_DEVICE_H #define EIGEN_CORE_THREAD_POOL_DEVICE_H namespace Eigen { // CoreThreadPoolDevice provides an easy-to-understand Device for parallelizing Eigen Core expressions with // Threadpool. Expressions are recursively split evenly until the evaluation cost is less than the threshold for // delegating the task to a thread. /* a / \ / \ / \ / \ / \ / \ / \ a e / \ / \ / \ / \ / \ / \ a c e g / \ / \ / \ / \ / \ / \ / \ / \ a b c d e f g h */ // Each task descends the binary tree to the left, delegates the right task to a new thread, and continues to the // left. This ensures that work is evenly distributed to the thread pool as quickly as possible and minimizes the number // of tasks created during the evaluation. Consider an expression that is divided into 8 chunks. The // primary task 'a' creates tasks 'e' 'c' and 'b', and executes its portion of the expression at the bottom of the // tree. Likewise, task 'e' creates tasks 'g' and 'f', and executes its portion of the expression. struct CoreThreadPoolDevice { using Task = std::function; EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoreThreadPoolDevice(ThreadPool& pool, float threadCostThreshold = 3e-5f) : m_pool(pool) { eigen_assert(threadCostThreshold >= 0.0f && "threadCostThreshold must be non-negative"); m_costFactor = threadCostThreshold; } template EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int calculateLevels(Index size, float cost) const { eigen_assert(cost >= 0.0f && "cost must be non-negative"); Index numOps = size / PacketSize; int actualThreads = numOps < m_pool.NumThreads() ? static_cast(numOps) : m_pool.NumThreads(); float totalCost = static_cast(numOps) * cost; float idealThreads = totalCost * m_costFactor; if (idealThreads < static_cast(actualThreads)) { idealThreads = numext::maxi(idealThreads, 1.0f); actualThreads = numext::mini(actualThreads, static_cast(idealThreads)); } int maxLevel = internal::log2_ceil(actualThreads); return maxLevel; } // MSVC does not like inlining parallelForImpl #if EIGEN_COMP_MSVC && !EIGEN_COMP_CLANG #define EIGEN_PARALLEL_FOR_INLINE #else #define EIGEN_PARALLEL_FOR_INLINE EIGEN_STRONG_INLINE #endif template EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index begin, Index end, UnaryFunctor& f, Barrier& barrier, int level) { while (level > 0) { level--; Index size = end - begin; eigen_assert(size % PacketSize == 0 && "this function assumes size is a multiple of PacketSize"); Index mid = begin + numext::round_down(size >> 1, PacketSize); Task right = [this, mid, end, &f, &barrier, level]() { parallelForImpl(mid, end, f, barrier, level); }; m_pool.Schedule(std::move(right)); end = mid; } for (Index i = begin; i < end; i += PacketSize) f(i); barrier.Notify(); } template EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index outerBegin, Index outerEnd, Index innerBegin, Index innerEnd, BinaryFunctor& f, Barrier& barrier, int level) { while (level > 0) { level--; Index outerSize = outerEnd - outerBegin; if (outerSize > 1) { Index outerMid = outerBegin + (outerSize >> 1); Task right = [this, &f, &barrier, outerMid, outerEnd, innerBegin, innerEnd, level]() { parallelForImpl(outerMid, outerEnd, innerBegin, innerEnd, f, barrier, level); }; m_pool.Schedule(std::move(right)); outerEnd = outerMid; } else { Index innerSize = innerEnd - innerBegin; eigen_assert(innerSize % PacketSize == 0 && "this function assumes innerSize is a multiple of PacketSize"); Index innerMid = innerBegin + numext::round_down(innerSize >> 1, PacketSize); Task right = [this, &f, &barrier, outerBegin, outerEnd, innerMid, innerEnd, level]() { parallelForImpl(outerBegin, outerEnd, innerMid, innerEnd, f, barrier, level); }; m_pool.Schedule(std::move(right)); innerEnd = innerMid; } } for (Index outer = outerBegin; outer < outerEnd; outer++) for (Index inner = innerBegin; inner < innerEnd; inner += PacketSize) f(outer, inner); barrier.Notify(); } #undef EIGEN_PARALLEL_FOR_INLINE template EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index begin, Index end, UnaryFunctor& f, float cost) { Index size = end - begin; int maxLevel = calculateLevels(size, cost); Barrier barrier(1 << maxLevel); parallelForImpl(begin, end, f, barrier, maxLevel); barrier.Wait(); } template EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index outerBegin, Index outerEnd, Index innerBegin, Index innerEnd, BinaryFunctor& f, float cost) { Index outerSize = outerEnd - outerBegin; Index innerSize = innerEnd - innerBegin; Index size = outerSize * innerSize; int maxLevel = calculateLevels(size, cost); Barrier barrier(1 << maxLevel); parallelForImpl(outerBegin, outerEnd, innerBegin, innerEnd, f, barrier, maxLevel); barrier.Wait(); } ThreadPool& m_pool; // costFactor is the cost of delegating a task to a thread // the inverse is used to avoid a floating point division float m_costFactor; }; // specialization of coefficient-wise assignment loops for CoreThreadPoolDevice namespace internal { #ifdef EIGEN_PARSED_BY_DOXYGEN struct Kernel; #endif template struct cost_helper { using SrcEvaluatorType = typename Kernel::SrcEvaluatorType; using DstEvaluatorType = typename Kernel::DstEvaluatorType; using SrcXprType = typename SrcEvaluatorType::XprType; using DstXprType = typename DstEvaluatorType::XprType; static constexpr Index Cost = functor_cost::Cost + functor_cost::Cost; }; template struct dense_assignment_loop_with_device { static constexpr Index XprEvaluationCost = cost_helper::Cost; struct AssignmentFunctor : public Kernel { EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {} EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) { this->assignCoeffByOuterInner(outer, inner); } }; static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) { const Index innerSize = kernel.innerSize(); const Index outerSize = kernel.outerSize(); constexpr float cost = static_cast(XprEvaluationCost); AssignmentFunctor functor(kernel); device.template parallelFor(0, outerSize, 0, innerSize, functor, cost); } }; template struct dense_assignment_loop_with_device { using DstXprType = typename Kernel::DstEvaluatorType::XprType; static constexpr Index XprEvaluationCost = cost_helper::Cost, InnerSize = DstXprType::InnerSizeAtCompileTime; struct AssignmentFunctor : public Kernel { EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {} EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) { copy_using_evaluator_DefaultTraversal_InnerUnrolling::run(*this, outer); } }; static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) { const Index outerSize = kernel.outerSize(); AssignmentFunctor functor(kernel); constexpr float cost = static_cast(XprEvaluationCost) * static_cast(InnerSize); device.template parallelFor(0, outerSize, functor, cost); } }; template struct dense_assignment_loop_with_device { using PacketType = typename Kernel::PacketType; static constexpr Index XprEvaluationCost = cost_helper::Cost, PacketSize = unpacket_traits::size, SrcAlignment = Kernel::AssignmentTraits::SrcAlignment, DstAlignment = Kernel::AssignmentTraits::DstAlignment; struct AssignmentFunctor : public Kernel { EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {} EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) { this->template assignPacketByOuterInner(outer, inner); } }; static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) { const Index innerSize = kernel.innerSize(); const Index outerSize = kernel.outerSize(); const float cost = static_cast(XprEvaluationCost) * static_cast(innerSize); AssignmentFunctor functor(kernel); device.template parallelFor(0, outerSize, 0, innerSize, functor, cost); } }; template struct dense_assignment_loop_with_device { using PacketType = typename Kernel::PacketType; using DstXprType = typename Kernel::DstEvaluatorType::XprType; static constexpr Index XprEvaluationCost = cost_helper::Cost, PacketSize = unpacket_traits::size, SrcAlignment = Kernel::AssignmentTraits::SrcAlignment, DstAlignment = Kernel::AssignmentTraits::DstAlignment, InnerSize = DstXprType::InnerSizeAtCompileTime; struct AssignmentFunctor : public Kernel { EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {} EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) { copy_using_evaluator_innervec_InnerUnrolling::run(*this, outer); } }; static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) { const Index outerSize = kernel.outerSize(); constexpr float cost = static_cast(XprEvaluationCost) * static_cast(InnerSize); AssignmentFunctor functor(kernel); device.template parallelFor(0, outerSize, functor, cost); } }; template struct dense_assignment_loop_with_device { using Scalar = typename Kernel::Scalar; using PacketType = typename Kernel::PacketType; static constexpr Index XprEvaluationCost = cost_helper::Cost, PacketSize = unpacket_traits::size; struct PacketAssignmentFunctor : public Kernel { EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {} EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) { this->template assignPacketByOuterInner(outer, inner); } }; struct ScalarAssignmentFunctor : public Kernel { EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE ScalarAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {} EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) { const Index innerSize = this->innerSize(); const Index packetAccessSize = numext::round_down(innerSize, PacketSize); for (Index inner = packetAccessSize; inner < innerSize; inner++) this->assignCoeffByOuterInner(outer, inner); } }; static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) { const Index outerSize = kernel.outerSize(); const Index innerSize = kernel.innerSize(); const Index packetAccessSize = numext::round_down(innerSize, PacketSize); constexpr float packetCost = static_cast(XprEvaluationCost); const float scalarCost = static_cast(XprEvaluationCost) * static_cast(innerSize - packetAccessSize); PacketAssignmentFunctor packetFunctor(kernel); ScalarAssignmentFunctor scalarFunctor(kernel); device.template parallelFor(0, outerSize, 0, packetAccessSize, packetFunctor, packetCost); device.template parallelFor(0, outerSize, scalarFunctor, scalarCost); }; }; template struct dense_assignment_loop_with_device { static constexpr Index XprEvaluationCost = cost_helper::Cost; struct AssignmentFunctor : public Kernel { EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {} EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) { this->assignCoeff(index); } }; static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) { const Index size = kernel.size(); constexpr float cost = static_cast(XprEvaluationCost); AssignmentFunctor functor(kernel); device.template parallelFor(0, size, functor, cost); } }; template struct dense_assignment_loop_with_device { using Scalar = typename Kernel::Scalar; using PacketType = typename Kernel::PacketType; static constexpr Index XprEvaluationCost = cost_helper::Cost, RequestedAlignment = Kernel::AssignmentTraits::LinearRequiredAlignment, PacketSize = unpacket_traits::size, DstIsAligned = Kernel::AssignmentTraits::DstAlignment >= RequestedAlignment, DstAlignment = packet_traits::AlignedOnScalar ? RequestedAlignment : Kernel::AssignmentTraits::DstAlignment, SrcAlignment = Kernel::AssignmentTraits::JointAlignment; struct AssignmentFunctor : public Kernel { EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {} EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) { this->template assignPacket(index); } }; static constexpr bool UsePacketSegment = Kernel::AssignmentTraits::UsePacketSegment; using head_loop = unaligned_dense_assignment_loop; using tail_loop = unaligned_dense_assignment_loop; static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) { const Index size = kernel.size(); const Index alignedStart = DstIsAligned ? 0 : internal::first_aligned(kernel.dstDataPtr(), size); const Index alignedEnd = alignedStart + numext::round_down(size - alignedStart, PacketSize); head_loop::run(kernel, 0, alignedStart); constexpr float cost = static_cast(XprEvaluationCost); AssignmentFunctor functor(kernel); device.template parallelFor(alignedStart, alignedEnd, functor, cost); tail_loop::run(kernel, alignedEnd, size); } }; } // namespace internal } // namespace Eigen #endif // EIGEN_CORE_THREAD_POOL_DEVICE_H