Apache Spark – это универсальная и высокопроизводительная кластерная вычислительная платформа.
Фреймворк создан для того, чтобы охватить более широкий диапазон рабочих нагрузок, которые прежде требовали создания отдельных распределенных систем, включая приложения пакетной обработки, циклические алгоритмы, интерактивные запросы и потоковую обработку. Поддерживая все эти виды задач с помощью единого механизма, Spark упрощает и удешевляет объединение разных видов обработки, которые часто необходимо выполнять в едином конвейере обработки данных. Кроме того, он уменьшает бремя обслуживания, поддерживая отдельные инструменты.
Apache Spark работает в парадигме резидентных вычислений — обрабатывает данные в оперативной памяти, благодаря чему позволяет получать значительный выигрыш в скорости работы для некоторых классов задач, в частности, возможность многократного доступа к загруженным в память пользовательским данным делает библиотеку привлекательной для алгоритмов машинного обучения.
Проект предоставляет программные интерфейсы для языков Java, Scala, Python, R. Изначально написан на Scala, впоследствии добавлена существенная часть кода на Java для предоставления возможности написания программ непосредственно на Java.
В этом документе мы будем использовать Python.
Состоит из ядра и нескольких расширений, таких как:
Spark SQL - позволяет выполнять SQL-запросы над данными
Spark Streaming - надстройка для обработки потоковых данных
Spark MLlib - набор библиотек машинного обучения
GraphX - предназначено для распределённой обработки графов.
Может работать как в среде кластера Hadoop под управлением YARN, так и без компонентов ядра Hadoop, поддерживает несколько распределённых систем хранения — HDFS, OpenStack Swift, NoSQL-СУБД Cassandra, Amazon S3.
В этой статье описана Standalone установка Apache Spark без привязки к Hadoop
Все, Apache Spark готов к работе
Заходим в систему как пользователь spark
Запускаем:
start-all.sh
2. Работа с Рyspark
Запускаем питоновскую командную строку:
pyspark
Мы попали в командный интерпретатор.
Apache Spark работает в архитектуре master-slave, где master называется “Driver”, а slave называются "Worker”. Когда вы запускаете приложение Spark, драйвер Spark создает контекст, который является точкой входа в ваше приложение, и все операции выполняются на рабочих узлах, а ресурсами управляет диспетчер кластеров.
Spark поддерживает следующие менеджеры кластеров:
Standalone – простой менеджер кластеров, входящий в состав Spark.
Это простейший менеджер, который используется в этой статье
Apache Mesos - это менеджер кластеров, который также может запускать приложения Hadoop MapReduce и PySpark.
Hadoop YARN – менеджер ресурсов в Hadoop 2.
Kubernetes – система для автоматизации развертывания, масштабирования и управления контейнерными приложениями.
PySpark RDD (Resilient Distributed Dataset) - это фундаментальная структура данных PySpark, которая является отказоустойчивой, неизменяемой распределенной коллекцией объектов, что означает, что после создания RDD вы не можете ее изменить. Каждый набор данных в RDD разделен на логические разделы, которые могут быть вычислены на разных узлах кластера.
Для создания RDD нужно прежде создать обьект SparkSession, для этого есть 2 метода:
builder()
newSession()
Внутри сессии создается другой обьект - SparkContext.
Обьектов SparkSession можно создавать сколько угодно, но глобальный обьект SparkContext всегда будет один.
Пример создания сессии:
У RDD все-же есть возможность для апдэйта, при этом будет создаваться другой RDD.
Это называется
RDD transformations
Эти преобразования бывают следующих видов:
flatMap()
map()
reduceByKey()
filter()
sortByKey()
Также у RDD есть акции - actions - которые выполняют какие-то вычисления внутри RDD
и возвращают какой-то результат.
Акции бывают следующих видов:
count()
collect()
first()
max()
reduce()
4. Pyspark DataFrame
DataFrame - это распределенный набор данных, организованный в именованные столбцы. Концептуально он эквивалентен таблице в реляционной базе данных или фрейму данных , но с более богатой оптимизацией. Фреймы данных могут быть созданы из широкого спектра источников, таких как файлы структурированных данных, таблицы в Hive, внешние базы данных или существующие RDDS.
pandas DataFrame отличается от Pyspark DataFrame тем, что выполняется на одной ноде.
DataFrame поддерживает следующие форматы файлов:
csv
text
Avro
Parquet
tsv
xml
и другие.
5. Pyspark SQL
PySpark SQL - один из наиболее часто используемых модулей PySpark, который используется для обработки структурированного столбчатого формата данных. После создания фрейма данных вы можете взаимодействовать с данными, используя синтаксис SQL.
Spark SQL предоставляет собственные SQL-запросы , что означает, что вы можете запускать традиционные ANSI SQL в Spark Dataframe, используя select, where, group by, join, union и т. д.
Сначала нужно создать временную таблицу во фрейме данных с помощью функции createOrReplaceTempView(). После создания к этой таблице можно получить доступ во время SparkSession с помощью sql().
df.createOrReplaceTempView("PERSON_DATA")
df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()
groupDF = spark.sql("SELECT gender, count(*) from PERSON_DATA group by gender")
groupDF.show()
6. Pyspark Streaming
PySpark Streaming - это масштабируемая, высокопроизводительная, отказоустойчивая система потоковой обработки, которая поддерживает как пакетные, так и потоковые рабочие нагрузки. Он используется для обработки данных в реальном времени из таких источников, как файловая система, сокет TCP, Kafka, Twitter, Amazon Kinesis, и это лишь некоторые из них. Обработанные данные могут быть отправлены в базы данных, Kafka, live dashboards и т. д.
Используя readStream.format("socket"), можно читать данные из сокета, задавая в качестве параметров хост, порт:
Возможно также читать данные из Kafka, писать в Kafka в форматах TEXT, CSV, AVRO, JSON:
df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.1.100:9092")
.option("subscribe", "json_topic")
.option("startingOffsets", "earliest") // From starting
.load()
df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.outputMode("append")
.option("kafka.bootstrap.servers", "192.168.1.100:9092")
.option("topic", "josn_data_topic")
.start()
.awaitTermination()
7. Немного теории графов
На самом абстрактном уровне анализ графов применяется для прогнозирования поведения и предписания действий для динамических групп.
Для этого требуется понимание отношений и структуры внутри группы.
Графовые алгоритмы достигают этого понимания путем изучения общей
природы сетей через их соединения. Благодаря такому подходу вы в
результате сможете понять топологию связанных систем и смоделировать
их процессы.
Вот несколько типов задач, в которых используются графовые алгоритмы:
1. изучение путей распространения заболевания или каскадного
транспортного сбоя;
2. выявление наиболее уязвимых или вредоносных компонентов при
сетевой атаке;
3. определение наименее затратного или самого быстрого способа
маршрутизации информации или ресурсов;
4. предсказание недостающие связей в ваших данных;
5. выявление прямого и косвенного влияния в сложной системе;
6. обнаружение невидимых иерархий и зависимостей;
7. прогнозирование, будут ли группы объединяться или распадаться;
8. поиск перегруженных или недогруженных узлов в сетях;
9. выявление сообщества на основе поведения и создание персональных рекомендаций;
10. уменьшение количества ложных срабатываний при обнаружении мошенничества и аномалий;
11. извлечение дополнительных признаков для машинного обучения.
В классической теории графов термин граф приравнивается к простому
(simple), или строгому (strict), графу, где две вершины связывает только
одно отношение. Однако большинство графов реального мира имеют несколько связей между вершинами и даже
связи, замкнутые на себя:
В следующей таблице приведены ключевые обобщенные признаки графов:
Существуют три популярных области анализа, которые используют графовые алгоритмы:
1. Поиск пути
Пути являются основополагающим понятием в анализе графов и графовых алгоритмов.
Поиск кратчайших путей, является одной из наиболее частых задач, решаемых с помощью графовых алгоритмов,
и является предшественником различных типов анализа. Кратчайший
путь – это маршрут движения с наименьшим количеством переходов или
самым низким суммарным весом ребер. Если граф ориентированный, то
это самый короткий маршрут между двумя узлами с учетом разрешенных
направлений.
2. Поиск веса (важности) узла
Это представление о том, какие узлы важнее в сети.
Существуют различные типы алгоритмов , созданных для измерения разных свойств,
например, таких как способность быстро распространять информацию.
3. Поиск связанности узлов.
Связанность – это основная концепция теории графов, которая позволяет проводить сложный сетевой анализ, например поиск сообществ. Большинство реальных сетей имеют подструктуры (часто квазифрактальные)
из более или менее независимых подграфов.
GraphFrames – это библиотека обработки графов для Spark, сменившая GraphX в 2016 году. Библиотека GraphFrames основана на GraphX, но использует объекты DataFrame в
качестве базовой структуры данных. GraphFrames поддерживает языки программирования Java, Scala и Python.
Вершины и ребра графа представлены в виде DataFrame с уникальным
идентификатором для каждой вершины и указанием начальной и конечной вершины для каждого отношения. Мы можем увидеть пример описания вершины DataFrame в табл. 3.1 и описания ребра DataFrame в табл. 3.2:
Граф GraphFrame, основанный на DataFrame из этого примера, будет
иметь две вершины, JFK и SEA, и одно направленное ребро от JFK до SEA.
Таблица DataFrame для вершин должна иметь столбец id – значение в
этом столбце используется для уникальной идентификации каждой вершины. Ребра в таблице DataFrame должны иметь столбцы src и dst – значения в этих столбцах описывают, какие вершины связаны ребром, и должны ссылаться на записи в столбце id таблицы вершин DataFrame.
8. Поиск пути
В следующей таблице приведен обзор алгоритмов нахождения пути и поисковых алгоритмов:
Примеры в этом разделы работают с графом, содержащим подмножество европейской дорожной сети.
Прежде, чем выполнять эти примеры, нужно предварительно загрузить библиотеку graphframes в виде бинарного файла .jar нужной версии, который можно взять по адресу:
https://spark-packages.org/package/graphframes/graphframes
Этот файл нужно положить в каталог
/usr/local/spark/jars
Нам понадобятся две таблицы
Первая таблица находится в файле transport-nodes.csv(вершины). В ней 4 поля - название города, широта, долгота и население:
Вторая таблица - файл transport-relationships.csv (ребра). В ней 4 поля - 2 прилегающих города, тип дороги и расстояние между городами:
Положим файл import_graph.py в каталог ../spark/bin/ , там же лежат два csv-файла.
Выполним этот файл, не заходя в pyspark, с помощью
spark-submit import_graph.py
Переведя сырые данные из csv-файлов в dataframe, теперь можно начать графовый поиск.
Поиск в ширину (breadth first search, BFS) – один из фундаментальных алгоритмов обхода графа. Он начинается с выбранной вершины и исследует
всех ее соседей на расстоянии одного перехода, а затем посещает всех
соседей на расстоянии двух переходов и т. д.
BFS чаще всего используется в качестве основы для других более целенаправленных алгоритмов. Например, алгоритмы кратчайшего пути,
связанных компонентов и центральности применяют алгоритм BFS. Его
также можно использовать для поиска кратчайшего пути между вершинами.
Алгоритм поиска в ширину на платформе Spark находит кратчайший
путь между двумя вершинами по количеству связей (т. е. переходов)
между ними. Вы можете явно назвать вашу целевую вершину или задать критерии, которые должны быть выполнены.
Например, вы можете использовать функцию bfs, чтобы найти первый город среднего размера
(по европейским стандартам) с населением от 100 000 до 300 000 человек. Давайте сначала проверим, в каких местах
население соответствует этим критериям.
Для этого добавим в наш питоновский файл следующий код:
(gf.vertices
.filter("population > 100000 and population < 300000")
.sort('population')
.show())
Найдено всего два города по выбранному критерию - Ипсвич (Ipswich) и Колчестер (Colchester).
Следующий код находит кратчайший путь от Гааги до города среднего размера:
from_expr = "id='Den Haag'"
to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"
result = gf.bfs(from_expr, to_expr)
columns = [column for column in result.columns if not column.startswith("e")]
result.select(columns).show()
Получим вывод - ближайший к Гааге город 100-сячник - это Ипсвич:
Вычислим минимальный кратчайший путь между Амстердамом и Колчестером.
В предыдущем примере в качестве критерия минимальности мы испоьзовали минимальное количество узлов.
Здесь, в отличие от предыдущего примера, в качестве основного критерия мы выбираем минимальное суммарное расстояние между двумя городами. Расстояние - это столбик cost:
Общая длина кратчайшего пути между Амстердамом и Колчестером составляет 347 км, маршрут проходит через Гаагу, Хук-ван-Холланд,Феликстоу и Ипсвич.
Напротив, кратчайший путь с точки зрения количества переходов между городами, который мы разработали с помощью
алгоритма поиска в ширину , пролегает через Иммингем, Донкастер и
Лондон.
+----------+--------+------------------------------------------------------------------------+
|id |distance|path |
+----------+--------+------------------------------------------------------------------------+
|Colchester|347.0 |[Amsterdam, Den Haag, Hoek van Holland, Felixstowe, Ipswich, Colchester]|
+----------+--------+------------------------------------------------------------------------+
Алгоритмы вычисления центральности (centrality algorithm, далее просто
алгоритмы центральности) используются для понимания роли отдельных
узлов в сети и оценки уровня их влияния на сеть. Они полезны, потому
что выявляют наиболее важные узлы и помогают нам понять групповую
динамику, такую как достоверность, доступность, скорость обмена информацией, и мосты между группами. Хотя многие из этих алгоритмов были
изобретены для анализа социальных сетей, с тех пор они нашли применение в других областях.
Алгоритмы центральности применимы к любым графам, но социальные
сети являются наиболее ярким примером динамически меняющегося
влияния и потоков информации. Примеры в этой главе рассматривают
небольшой граф в стиле Твиттера.
Есть одна большая группа пользователей со связями между ними и
меньшая группа, не имеющая связей с большой группой.
Понимание охвата (reach) вершины является справедливой мерой
важности. Скольких других вершин эта вершина может коснуться прямо
сейчас? Степень (degree) вершины – это общее число прямых входящих и
исходящих связей, которыми она обладает. Вы можете интерпретировать
степень как меру непосредственной доступности узла. Например, человек
с высокой степенью в социальной сети будет иметь много непосредственных контактов и с большей вероятностью окажется и в вашей сети тоже.
Средняя степень (average degree) сети – это просто общее количество
связей, деленное на общее количество вершин; это значение может быть
сильно искажено наличием вершин с высокой степенью. Распределение
степеней (degree distribution) – это вероятность того, что случайно выбранная вершина будет иметь определенное число отношений.
Следующий код грузит грузит данные из csv-файлов.
Программа подсчитывает количество всех входящих
и исходящих связей вершины и используется для поиска популярных вершин.
Код позволяет применить алгоритм степенной
центральности к данным из нашего примера социальных связей:
Дуг (Doug) – самый популярный пользователь в нашем графе Твиттера с пятью подписчиками (входящие связи).
Все остальные пользователи в этой части графа подписаны на него, а он – только на одного человека.
Код примера и csv-файлы можно скачать тут