Оглавление:

1. Введение в PySpark.

2. Работа с RDD в PySpark.

3. .


Введение в PySpark.

В состав дистрибутива Spark входят интерактивные командные оболочки (shell), позволяющие выполнять специализированные виды анализа. В отличие от большинства других оболочек, позволяющих манипулировать данными на диске и в памяти единственного компьютера, оболочки Spark дают возможность оперировать данными, распределенными по нескольким компьютерам, при этом все сложности, связанные с распределенным доступом, берет на себя Spark.

Так как Spark может загружать данные в память на множестве рабочих узлов, многие распределенные вычисления, даже на массивах данных, занимающих терабайты, распределенных между десятками компьютеров, выполняются всего несколько секунд. Это делает командную оболочку Spark вполне пригодной для исследования данных в интерактивном режиме. Spark предоставляет оболочки для обоих языков, Python и Scala , которые с успехом могут использоваться в кластерах.

Запускать командную оболочку PySpark Shell мы уже научились ЗДЕСЬ.

Не всегда удобно использовать встроенную командную оболочку, поэтому я буду использовать Zeppeline, как настроить интерпретатор описано ЗДЕСЬ.

Примеры в уроках представлены для распределенного кластера с HADOOP, но нет никаких проблем (даже проще) в учебных целях изучать на локальной машине.

В общем случае любое приложение на основе Spark состоит из программы-драйвера (driver program), который запускает различные параллельные операции в кластере. Драйвер содержит функцию main приложения и определяет распределенные наборы данных, а затем применяет к ним различные операции.

Драйвер обращается к Spark посредством объекта SparkContext, представляющего соединение с вычислительным кластером. Командная оболочка Spark автоматически создает объект SparkContext в виде переменной с именем sc. Zeppeline тоже автоматически создает свой контекст, поэтому этой теме не будем уделять особое внимание.

Для выполнения операций драйверы обычно используют несколько узлов (nodes), которые называют исполнителями (executors). Например, если бы некоторая операция с файлом выполнялась в кластере, разные машины могли бы выполнять ее в разных фрагментах файла, так как HDFS - распределенная файловая система. Но как увидим дальше, раскидать по кластеру можно данные из любых источников.

Следует отметить, что в случае использования зависимостей, они должны быть установлены на всех нодах кластера, и версии Python должны совпадать.


Работа с RDD в PySpark.

Набор RDD (Resilient Distributed Datasets) или устойчивый распределенный набор данных – это распределенная коллекция элементов (аналог списка в Python). Собственно, вся работа Spark заключается в создании новых, преобразовании существующих или выполнении операций с наборами RDD. За кулисами Spark автоматически распределяет данные в наборах RDD между компьютерами в кластере и распараллеливает выполнение операций над ними.

Каждый набор RDD делится на множество частей, которые могут обрабатываться разными узлами в кластере. Наборы RDD могут содержать объекты любого типа на Python, Java или Scala, включая экземпляры пользовательских классов.

В общем случае любое приложение на основе Spark состоит из программы-драйвера (driver program), который запускает различные параллельные операции в кластере. Драйвер содержит функцию main приложения и определяет распределенные наборы данных, а затем применяет к ним различные операции. В предыдущих примерах роль драйвера выполняет сама командная оболочка Spark, благодаря чему можно просто вводить желаемые операции. Драйвер обращается к Spark посредством объекта SparkContext, представляющего соединение с вычислительным кластером. Командная оболочка Spark автоматически создает объект SparkContext в виде переменной с именем sc.

Имея объект SparkContext, можно создавать наборы RDD.

После создания набора можно приступать к выполнению разнообразных операций со строками, таких как count().

Для выполнения этих операций драйверы обычно используют несколько узлов (nodes), которые называют исполнителями (executors). Например, если бы операция count() выполнялась в кластере, разные машины могли бы выполнять подсчет строк в разных фрагментах файла.

Закончим с теорией, приступим к практике.

Так как я использую Zeppelin, вопрос создания SparkContext его интерпретатор берет на себя, он уже существует. Однако для подстраховки можно использовать следующую функцию библиотеки SparkContext в Python:

sc = SparkContext.getOrCreate()

Таким образом мы либо создаем, либо используем уже существующий контекст.

Создать RDD можно двумя способами: 1. распараллеливая локальный набор (parallelize), 2. либо считывая уже распределенный набор (textFile).

1.

from pyspark import SparkContext

f = open("/usr/local/spark-2.4.4/logs/log.txt")

sc = SparkContext.getOrCreate()

rdd = sc.parallelize(f)



2.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

rdd = sc.textFile("hdfs://Master:9000//input/log.txt")



Для примера взят произвольный лог-файл Spark, для упрощения он переименован в локальной системе, и потом скопирован в каталог input в HDFS. Считаем дальше в наших примерах, что набор создан, работаем с RDD.

После создания RDD появляется возможность выполнять два вида операций: преобразования (transformations) и действия (actions).

Преобразования создают новые наборы RDD на основе существующих. Примером типичного преобразования может служить фильтрация данных по заданному условию. В следующем примере отфильтруем строки со словом «ERROR» с помощью функции filter.

Действия, напротив, вычисляют результат, не создавая новых наборов RDD, и возвращают его программе-драйверу или сохраняют во внешнем хранилище (например, в HDFS). Примером действия может служить вызов метода count() или collect().

Продолжая пример с log-файлом, можно создать новый набор RDD, хранящий только строки со словом «ERROR». Затем выведем его построчно:

Кратко о том, что здесь происходит:

1. Читаем локальный файл: f = open("/usr/local/spark-2.4.4/logs/log.txt")

2. Создаем или используем контекст: sc = SparkContext.getOrCreate()

3. Создаем распределенный набор RDD: rdd = sc.parallelize(f)

4. Фильтруем (преобразование filter) каждую строку в наборе по слову "ERROR" (о лямбда-функциях (lambda ) гуглим сами): err = rdd.filter(lambda x: "ERROR" in x)

5. Считаем количество строк и выводим (действие count): print (err.count())

6. Собираем весь RDD (collect) и проходи по нему построчно: for line in err.collect():

7. Выводим результат.


Преобразования и действия отличаются способом обработки наборов RDD. Даже при том, что новый набор RDD можно создать в любой момент, Spark откладывает фактическое его создание до момента первого обращения к нему.

Наборы RDD по умолчанию вычисляются фреймворком Spark заново всякий раз, когда выполняется очередное действие. Если предполагается использовать один и тот же набор RDD для выполнения нескольких действий, можно потребовать от Spark сохранить его вызовом метода persist().

После вычисления набора RDD в первый раз Spark сохранит его содержимое в памяти (по частям, на узлах в кластере) и будет использовать при выполнении последующих действий. Имеется также возможность сохранить RDD на диске.

Такое поведение, когда по умолчанию Spark не сохраняет набор, также выглядит необычным, но в этом есть определенный смысл при работе с большими объемами данных: если набор RDD нужен для получения единственного результата и в дальнейшем не будет использоваться, нет смысла напрасно расходовать память.

Комментарии

Комментировать могуть только зарегистрированные пользователи

Перевести страницу (translate page)
Реклама