scalabook
Параллелизм
Параллелизм с помощью чистых функций.
Рассмотрим операцию суммирования чисел:
def sum(ints: Seq[Int]): Int = ints.foldLeft(0)((a, b) => a + b)
Здесь используется последовательное сворачивание, когда мы проходим по всей коллекции элемент за элементом, каждый раз складывая текущий элемент к итоговому результату.
Вместо последовательного сворачивания можно было бы использовать алгоритм "разделяй и властвуй":
def sum(ints: IndexedSeq[Int]): Int = if ints.size <= 1 then ints.headOption.getOrElse(0) else val (l, r) = ints.splitAt(ints.size / 2) sum(l) + sum(r)
Последовательность делится пополам с помощью функции splitAt
, рекурсивно суммируются обе половины,
а затем результаты объединяются.
И в отличие от реализации на основе foldLeft
,
реализацию "разделяй и властвуй" можно распараллелить — две половины можно суммировать параллельно.
Тип данных для параллельных вычислений
Для параллельных вычислений с помощью чистых функций можно определить следующий API:
def unit[A](a: A): Par[A]def fork[A](a: => Par[A]): Par[A]def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
extension [A](pa: Par[A]) def run: A def map2[B, C](pb: Par[B])(f: (A, B) => C): Par[C]
unit
"поднимает" константу до параллельных вычисленийfork
помечает для параллельного вычисления. На самом деле вычисление не произойдет, пока не будет принудительно запущена с помощьюrun
.lazyUnit
заключает свой невычисленный аргумент вPar
и помечает его для параллельного вычисления.run
извлекает значение изPar
, фактически выполняя вычислениеmap2
объединяет результаты двух параллельных вычислений с помощью бинарной функции
Тогда сумму можно реализовать так:
def sum(ints: IndexedSeq[Int]): Par[Int] = if ints.size <= 1 then Par.unit(ints.headOption.getOrElse(0)) else val (l, r) = ints.splitAt(ints.size / 2) Par.map2(sum(l), sum(r))(_ + _)
При этом для параллельных вычислений должен выполняться следующий закон:
unit(x).map(f) == unit(f(x))
Ссылки: