scalabook

Форк
0
/
concurrency.md 
335 строк · 17.5 Кб

Параллелизм

Для написания параллельных приложений на Scala, можно использовать нативный поток Java, но Scala Future предлагает более высокий уровень и идиоматический подход.

Введение

Вот описание Scala Future

из его Scaladoc:

"Future

представляет собой значение, которое может быть или не быть доступным в настоящее время, но будет доступно в какой-то момент, или исключение, если это значение не может быть сделано доступным".

Чтобы продемонстрировать, что это значит, сначала рассмотрим однопоточное программирование. В однопоточном мире результат вызова метода привязывается к переменной следующим образом:

def aShortRunningTask(): Int = 42
val x = aShortRunningTask()

В этом коде значение 42

сразу привязывается к x
.

При работе с Future

процесс назначения выглядит примерно так:

def aLongRunningTask(): Future[Int] = ???
val x = aLongRunningTask()

Но главное отличие в этом случае заключается в том, что, поскольку aLongRunningTask

возвращает неопределенное время, значение x
может быть доступно или недоступно в данный момент, но оно будет доступно в какой-то момент — в будущем.

Другой способ взглянуть на это с точки зрения блокировки. В этом однопоточном примере оператор println

не печатается до тех пор, пока не завершится выполнение aShortRunningTask
:

def aShortRunningTask(): Int =
Thread.sleep(500)
42
val x = aShortRunningTask()
println("Here")

И наоборот, если aShortRunningTask

создается как Future
, оператор println
печатается почти сразу, потому что aShortRunningTask
порождается в другом потоке — он не блокируется.

В этой главе будет рассказано, как использовать Future

, в том числе как запускать несколько Future
параллельно и объединять их результаты в выражении for
. Также будут показаны примеры методов, которые используются для обработки значения Future
после его возврата.

О Future

, важно знать, что они задуманы как одноразовая конструкция "Обработайте это относительно медленное вычисление в каком-нибудь другом потоке и верните мне результат, когда закончите". В отличие от этого, акторы Akka предназначены для работы в течение длительного времени и отвечают на множество запросов в течение своей жизни. В то время как субъект может жить вечно, Future
в конечном итоге содержит результат вычисления, которое выполнялось только один раз.

Пример в REPL

Future

используется для создания временного кармана параллелизма. Например, можно использовать Future
, когда нужно вызвать алгоритм, который выполняется неопределенное количество времени — например, вызов удаленного микросервиса, — поэтому его желательно запустить вне основного потока.

Чтобы продемонстрировать, как это работает, начнем с примера Future

в REPL. Во-первых, вставим необходимые инструкции импорта:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

Теперь можно создать Future

. Для этого примера сначала определим долговременный однопоточный алгоритм:

def longRunningAlgorithm() =
Thread.sleep(10_000)
42

Этот причудливый алгоритм возвращает целочисленное значение 42

после десятисекундной задержки. Теперь вызовем этот алгоритм, поместив его в конструктор Future
и присвоив результат переменной:

scala> val eventualInt = Future(longRunningAlgorithm())
eventualInt: scala.concurrent.Future[Int] = Future(<not completed>)

Вычисления начинают выполняться после вызова longRunningAlgorithm()

. Если сразу проверить значение переменной eventualInt
, то можно увидеть, что Future
еще не завершен:

scala> eventualInt
val res1: scala.concurrent.Future[Int] = Future(<not completed>)

Но если проверить через десять секунд еще раз, то можно увидеть, что оно выполнено успешно:

scala> eventualInt
val res2: scala.concurrent.Future[Int] = Future(Success(42))

Хотя это относительно простой пример, он демонстрирует основной подход: просто создайте новое Future

с помощью своего долговременного алгоритма.

Одна вещь, на которую следует обратить внимание - это то, что ожидаемый результат 42

обернут в Success
, который обернут в Future
. Это ключевая концепция для понимания: значение Future
всегда является экземпляром одного из scala.util.Try
: Success
или Failure
. Поэтому, при работе с результатом Future
, используются обычные методы обработки Try
.

Использование map с Future

Future

имеет метод map
, который используется точно так же, как метод map
для коллекций. Вот как выглядит результат, при вызове map
сразу после создания переменной eventualInt
:

scala> val a = eventualInt.map(_ * 2)
a: scala.concurrent.Future[Int] = Future(<not completed>)

Как показано, для Future

, созданного с помощью longRunningAlgorithm
, первоначальный вывод показывает Future(<not completed>)
. Но если проверить значение a
через десять секунд, то можно увидеть, что оно содержит ожидаемый результат 84
:

scala> a
res1: scala.concurrent.Future[Int] = Future(Success(84))

Еще раз, успешный результат обернут внутри Success

и Future
.

Использование методов обратного вызова с Future

В дополнение к функциям высшего порядка, таким как map

, с Future
также можно использовать методы обратного вызова. Одним из часто используемых методов обратного вызова является onComplete
, принимающий частичную функцию, в которой обрабатываются случаи Success
и Failure
:

eventualInt.onComplete {
case Success(value) => println(s"Got the callback, value = $value")
case Failure(e) => e.printStackTrace
}

Если вставить этот код в REPL, то в конечном итоге придет результат:

Got the callback, value = 42

Другие методы Future

Класс Future

имеет некоторые методы, которые можно найти в классах коллекций Scala, в том числе:

  • filter
  • flatMap
  • map

Методы обратного вызова:

  • onComplete
  • andThen
  • foreach

Другие методы трансформации:

  • fallbackTo
  • recover
  • recoverWith

См. страницу "Futures and Promises" для обсуждения дополнительных методов, доступных для Future

.

Запуск нескольких Future и объединение результатов

Чтобы запустить несколько вычислений параллельно и соединить их результаты после завершения всех Future

, можно использовать выражение for
.

Правильный подход такой:

  1. Запустить вычисления, которые возвращают Future
    результаты
  2. Объединить их результаты в выражении for
  3. Извлечь объединенный результат, используя onComplete
    или аналогичный метод

Пример

Рассмотрим следующий пример. Ключевой момент - сначала запускаются вычисления, возвращающие Future

, а затем они объединяются в выражении for
:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
val startTime = System.currentTimeMillis()
def delta() = System.currentTimeMillis() - startTime
def sleep(millis: Long) = Thread.sleep(millis)
@main def multipleFutures1 =
println(s"creating the futures: ${delta()}")
// (1) запуск вычислений, возвращающих Future
val f1 = Future { sleep(800); 1 } // в конце концов возвращается 1
val f2 = Future { sleep(200); 2 } // в конце концов возвращается 2
val f3 = Future { sleep(400); 3 } // в конце концов возвращается 3
// (2) объединение нескольких Future в выражении `for`
val result =
for
r1 <- f1
r2 <- f2
r3 <- f3
yield
println(s"in the 'yield': ${delta()}")
(r1 + r2 + r3)
// (3) обработка результата
result.onComplete {
case Success(x) =>
println(s"in the Success case: ${delta()}")
println(s"result = $x")
case Failure(e) =>
e.printStackTrace
}
println(s"before the 'sleep(3000)': ${delta()}")
// важно для небольшой параллельной демонстрации: не глушить jvm
sleep(3000)

После запуска этого приложения, вывод выглядит следующим образом:

creating the futures: 1
before the 'sleep(3000)': 2
in the 'yield': 806
in the Success case: 806
result = 6

Как показывает вывод, Future

создаются очень быстро, и всего за две миллисекунды достигается оператор печати непосредственно перед операцией sleep(3000)
в конце метода. Весь этот код выполняется в основном потоке JVM. Затем, через 806 мс, три Future
завершаются, и выполняется код в блоке yield
. Затем код немедленно переходит к успешному завершению в методе onComplete
.

Вывод 806 мс является ключом к тому, чтобы убедиться, что три вычисления выполняются параллельно. Если бы они выполнялись последовательно, общее время составило бы около 1400 мс — сумма времени ожидания трех вычислений. Но поскольку они выполняются параллельно, общее время чуть больше, чем у самого продолжительного вычисления f1

, которое составляет 800 мс.

Обратите внимание, что если бы вычисления выполнялись в выражении for

, они выполнялись бы последовательно, а не параллельно:

// последовательное выполнение (не параллельно!)
for
r1 <- Future { sleep(800); 1 }
r2 <- Future { sleep(200); 2 }
r3 <- Future { sleep(400); 3 }
yield
r1 + r2 + r3

Итак, если необходимо, чтобы вычисления выполнялись параллельно, не забудьте запустить их вне выражения for

.

Метод, возвращающий Future

Было показано, как передавать однопоточный алгоритм в конструктор Future

. Ту же технику можно использовать для создания метода, который возвращает Future
:

// моделируем медленно работающий метод
def slowlyDouble(x: Int, delay: Long): Future[Int] = Future {
sleep(delay)
x * 2
}

Как и в предыдущих примерах, достаточно просто присвоить результат вызова метода новой переменной. Тогда, если сразу проверить результат, то можно увидеть, что он не завершен, но по истечении времени задержки в Future

результат будет выдан:

scala> val f = slowlyDouble(2, 5_000L)
val f: concurrent.Future[Int] = Future(<not completed>)
scala> f
val res0: concurrent.Future[Int] = Future(<not completed>)
scala> f
val res1: concurrent.Future[Int] = Future(Success(4))

Ключевые моменты о Future

Подводя итог, несколько ключевых моментов о Future

:

  • Future
    создается для запуска задач вне основного потока
  • Future
    предназначены для одноразовых, потенциально длительных параллельных задач, которые в конечном итоге возвращают значение; они создают временный карман параллелизма
  • Future
    начинает работать в момент построения
  • преимущество Future
    над потоками заключается в том, что они работают с выражениями for
    и имеют множество методов обратного вызова, упрощающих процесс работы с параллельными потоками
  • при работе с Future
    не нужно беспокоиться о низкоуровневых деталях управления потоками
  • результат Future
    обрабатывается с помощью методов обратного вызова, таких как onComplete
    и andThen
    , или методов преобразования, таких как filter
    , map
    и т.д.
  • значение внутри Future
    всегда является экземпляром одного из типов Try
    : Success
    или Failure
  • при использовании нескольких Future
    для получения одного результата, они объединяются в выражении for

Кроме того, как было видно по операторам import, Scala Future

зависит от ExecutionContext
.

Дополнительные сведения о Future

см. в статье Futures and Promises, в которой обсуждаются futures, promises и execution contexts. В ней также обсуждается, как выражение for
транслируется в операцию flatMap
.


Ссылки:

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.