Новые технологии распределенного хранения и обработки больших массивов данных о. В. Сухорослов
Скачать 296.71 Kb.
|
1.4. HBase Система HBase [7] входит в состав свободно распространяемой платформы распределенных вычислений Hadoop (см. раздел 2.2). HBase является общедоступным аналогом описанной выше системы BigTable, работающим поверх распределенной файловой системы HDFS. Приведенное ниже описание HBase соответствует текущей на момент написания статьи версии HBase 0.1.2. Поскольку модель данных, архитектура и реализация HBase очень близка к BigTable, остановимся только на характерных особенностях и отличиях HBase от BigTable. 22 HBase использует несколько иную терминологию, чем BigTable. Так, аналог таблетов называется регионом (region), а обслуживающие регионы серверы называются регион-сервером (RegionServer). Внутри регионов данные разбиты на вертикальные семейства столбцов. Каждое семейство внутри региона хранится в отдельной структуре данных, называемой Store и аналогичной SSTable в BigTable. Содержимое Store хранится в нескольких файлах в HDFS, называемых StoreFile. Регион-сервер кэширует последние изменения в оперативной памяти (MemCache) и периодически сохраняет их в новые Store-файлы. Аналогично процедурам сжатия в BigTable, регион-сервер периодически объединяет Store-файлы. В отличие от BigTable, HBase не поддерживает определение прав доступа для семейств столбцов. Главный сервер HBase (Master) управляет назначением регионов серверам и отслеживает их состояние. В отличие от BigTable, HBase не использует подобный Chubby высоконадежный сервис для координации серверов. В случае, если регион- сервер теряет связь с главным сервером, то он автоматически завершает работу и перезапускается. При этом главный сервер перераспределяет обслуживавшиеся данным сервером регионы между другими регион-серверами. В отличие от этого, таблет-сервер BigTable может продолжать обслуживать клиентов после потери соединения с главным сервером. При перезапуске главного сервера, регион-сервера подключаются к нему и передают список обслуживаемых ими регионов. Метаданные и местоположения регионов хранятся в таблице META. В свою очередь, местоположения всех регионов таблицы META хранятся в таблице ROOT, занимающей всегда один регион. Клиенты запрашивают местоположение ROOT- региона у главного сервера, после чего просматривают таблицу META путем взаимодействия с регион-серверами. Местоположения регионов кэшируются клиентом. Как остальные компоненты, входящие в состав платформы Hadoop, HBase реализована на языке Java. Система имеет несколько интерфейсов для клиентских приложений: Java API, REST-интерфейс и доступ по протоколу Thrift. Пользователи могут взаимодействовать с системой через командную оболочку HBase Shell, которая поддерживает SQL-подобный язык HQL. HBase также предоставляет Web-интерфейс, позволяющий пользователям просматривать информацию о системе и хранимых таблицах. 23 2. Модели программирования и технологии распределенной обработки данных В главе рассматриваются модели программирования и технологии, ориентированные на обработку больших массивов данных на распределенных кластерных системах. Центральное место в главе занимает описание модели программирования MapReduce, разработанной в компании Google, и ее открытой реализации Apache Hadoop. В качестве другого подхода к описанию и реализации процессов обработки данных рассматривается технология Microsoft Dryad. 2.1. Модель программирования MapReduce MapReduce [2] – модель программирования и платформа для пакетной обработки больших объемов данных, разработанная и используемая внутри компании Google для широкого круга приложений. Модель MapeReduce отличается простотой и удобством использования, скрывая от пользователя детали организации вычислений в ненадежной распределенной среде. Пользователю достаточно описать процедуру обработки данных в виде двух функций – map и reduce, после чего система автоматически распределяет вычисления по кластеру из большого количества машин, обрабатывает отказы машин, балансирует нагрузку и координирует взаимодействия между машинами для эффективного использования сетевых и дисковых ресурсов. Впервые описание MapReduce было опубликовано в работе [8]. За последние четыре года внутри Google было разработано более 10 тысяч программ для MapReduce. В среднем, каждый день на кластерах Google выполняется около тысячи MapReduce- заданий, обрабатывающих вместе более 20 петабайтов данных [2]. Используемая в Google реализация MapReduce является закрытой технологией, однако существует общедоступная реализация Apache Hadoop (см. раздел 2.2). В рамках MapReduce вычисления принимают на вход и производят на выходе данные, состоящие из множества пар “ключ-значение”. Обозначим входные данные как list(k1,v1) , а выходные – как list(k2,v2). Пользователь описывает вычисления в виде 24 двух функций - map и reduce, близких по смыслу одноименным функциям языка Lisp. Функция map(k1,v1)→list(k2,v2) применяется к каждой паре входных данных и возвращает набор промежуточных пар. Далее реализация MapReduce группирует промежуточные значения v2, связанные с одним ключом k2, и передает эти значения функции reduce. Функция reduce(k2,list(v2))→list(v2) преобразует промежуточные значения в окончательный набор значений для данного ключа. Как правило, это одно агрегированное значение, например, сумма. В качестве примера рассмотрим задачу подсчета числа вхождения каждого слова в большую коллекцию документов. Входные данные можно представить в виде пар < имя документа, содержимое>. Тогда функция map для каждого слова, найденного в содержимом документа, должна возвратить пару <слово, 1>, а функция reduce должна просуммировать все значения для каждого слова, возвратив пары <слово, число вхождений>. Ниже приведен пример реализации данных функций на псевдокоде: map(String key, String value): // key: имя документа // value: содержимое документа for each word w in value: EmitIntermediate(w, “1”); reduce(String key, Iterator values): // key: слово // values: список отметок о вхождении слова int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result)); Обратим внимание на использование итератора в приведенном коде функции reduce. Обусловлено это тем, что обрабатываемые функцией наборы данных могут быть слишком большими для размещения в памяти. 25 Помимо описания функций map и reduce, в спецификацию MapReduce-задания входят пути к входным и выходным файлам, размещаемым в GFS, а также дополнительные конфигурационные параметры. Пользователь формирует объект со спецификацией задания при помощи интерфейса прикладного программирования на языке C++, предоставляемого библиотекой MapReduce. Запуск задания производится с помощью вызова специальной функции MapReduce, которой передается спецификация задания. Реализация MapReduce в Google ориентирована на вычислительную инфраструктуру, состоящую из большого числа недорогих серверов из массовых комплектующих. Как правило, это двухпроцессорные x86-машины с 4-8 Гб оперативной памяти, работающие под управлением Linux. Для соединения машин в кластере используется коммутируемый Gigabit Ethernet. Кластеры состоят из тысяч машин, поэтому постоянно возникают отказы отдельных узлов. Для хранения данных используются недорогие IDE-диски, подключенные к каждой из машин. Данные хранятся под управлением распределенной файловой системы GFS (см. раздел 1.1). Напомним, что GFS использует репликацию для надежного хранения данных. Запуском MapReduce-заданий на кластере управляет планировщик, который отслеживает состояние машин и подбирает группу машин для выполнения задания. Вызовы функции map распределяются между несколькими машинами путем автоматического разбиения входных данных на M частей, размер каждой из которых составляет обычно 16-64 Мб. Полученные порции данных могут обрабатываться параллельно различными машинами. Вызовы reduce распределяются путем разбиения пространства промежуточных ключей на R частей, определяемых с помощью функции разбиения (partitioning function). По умолчанию используется функция hash(k2) mod R. Число частей R и функция разбиения указываются пользователем в спецификации задания. Рассмотрим детально процесс выполнения задания в MapReduce. Запуск задания инициируется блокирующим вызовом внутри кода пользователя библиотеки MapReduce. После чего система разбивает входные файлы на M частей, размер которых контролируется с помощью конфигурационного параметра. Далее MapReduce осуществляет запуск копий программы на машинах кластера. Одна из 26 копий программы играет роль управляющего (master), а остальные являются рабочими (worker). Управляющий процесс осуществляет распределение задач (M map-задач и R reduce- задач) между рабочими. Каждому свободному рабочему процессу назначается map- или reduce-задача. Для каждой из задач управляющий процесс хранит ее состояние (ожидает выполнения, выполняется, завершена) и идентификатор рабочего процесса (для выполняющихся и выполненных заданий). Рабочий процесс, выполняющий map-задачу, считывает содержимое соответствующего фрагмента входных данных из GFS, выделяет из данных пары ключ- значение и передает каждую из пар заданной пользователем функции map. Полученные промежуточные пары буферизуются в памяти и периодически записываются на локальный диск. При записи данные разбиваются на R частей, в соответствии с функцией разбиения. По завершении задачи, местоположения файлов с промежуточными данными передаются управляющему процессу. Для каждой из map- задач управляющий процесс хранит местоположения и размеры R фрагментов с промежуточными данными, полученными задачей. Данная информация периодически обновляется по мере выполнения map-заданий и передается рабочим процессам, выполняющим reduce-задачи. При получении от управляющего процесса информации о готовых промежуточных данных, reduce-процесс считывает данные с локального диска map- процесса при помощи RPC-вызовов. Когда reduce-процесс получил все промежуточные данные для его порции выходных ключей, он производит сортировку значений по ключам. В случае если объем промежуточных данных слишком велик для размещения в памяти, используется внешняя сортировка. После сортировки, reduce-процесс последовательно сканирует промежуточные данные и для каждого встреченного уникального ключа вызывает заданную пользователем функцию reduce, передавая ей ключ и список найденных значений. Результаты вызова reduce записываются в выходной файл в GFS. Данные внутри выходного файла отсортированы по значению ключа. Для каждой reduce-задачи создается отдельный выходной файл. Когда все map- и reduce-задачи выполнены, управляющий процесс завершает выполнение вызова MapReduce в программе пользователя и передает управление следующему за ней коду. Результаты выполнения задания доступны в виде R файлов. 27 Обычно пользователь не объединяет полученные данные в один файл, а передает на вход следующему MappReduce-заданию или другому приложению, которое поддерживает работу с несколькими входными файлами. Рассмотрим обеспечение отказоустойчивости при выполнении задач MapReduce. Управляющий процесс периодически опрашивает рабочие процессы. В случае если в течение определенного времени ответ от рабочего процесса не поступил, управляющий процесс помечает рабочий процесс как отказавший. Все map-задачи, выполненные данным процессом переводятся в состояние “ожидает выполнения” и запускаются повторно на других машинах. Аналогично, все выполняемые отказавшим процессом map- и reduce-задачи переводятся в состояние “ожидает выполнения” и запускаются повторно. Перезапуск map-задач необходим, потому что их результаты хранятся на локальном диске отказавшей машины и, следовательно, становятся недоступны. Перезапуск reduce-задач не требуется, поскольку их результаты сохраняются в GFS. В случае повторного запуска уже выполненного map-задания, все reduce- процессы уведомляются об этом. Те процессы, которые не успели считать результаты предыдущего запуска задачи, будут считывать их у нового map-процесса. В случае если заданные пользователем функции map и reduce являются детерминированными функциями своих входных значений, распределенная реализация MapReduce гарантирует получение такого же результата, что и при последовательном выполнении программы без ошибок. Для этого используется атомарная фиксация результатов map- и reduce-задач. По выполнении map-задачи, рабочий процесс отправляет управляющему процессу сообщение, в котором указывает список R промежуточных файлов. В случае, если управляющий процесс получает сообщение о завершении уже выполненной задачи, он игнорирует данное сообщение. По завершении reduce-задачи, рабочий процесс атомарно переименовывает средствами GFS временный выходной файл в окончательный выходной файл. В случае если reduce- задача выполняется на нескольких машинах, то только результат одной из них будет в итоге записан в окончательный выходной файл. В случае если функции map и/или reduce являются недетерминированными, система предоставляет более слабые гарантии. Утверждается, что результат определенной reduce-задачи R1 будет эквивалентен результату R1, полученному при 28 некотором запуске последовательной недетерминированной программы. При этом результат другой reduce-задачи R2 может соответствовать результату R2, полученному при другом запуске последовательной недетерминированной программы. Подобная ситуация может возникать из-за того, что задачи R1 и R2 использовали результаты двух различных запусков некоторой map-задачи. Количество map- и reduce-задач обычно подбирается таким образом, что оно было гораздо больше числа рабочих машин. В этом случае достигается лучшая балансировка нагрузки между машинами. Кроме того, это позволяет эффективнее обрабатывать отказ map-процесса, поскольку большое число выполненных им задач может быть распределено по всем рабочим машинам. На количество map-заданий также влияет размер получаемых фрагментов входных данных. Идеальным является размер 16-64 Мб, не превышающий размер блока GFS и позволяющий локализовать вычисления рядом с данными. Количество reduce-заданий обычно ограничивается пользователями допустимым числом выходных файлов. Рассмотрим некоторые оптимизации, примененные при реализации MapReduce. При распределении map-задач по машинам в кластере управляющий процесс учитывает то, каким образом входные данные размещены в GFS (см. раздел 1.1). Поскольку данные хранятся в виде нескольких реплик на тех же машинах кластера, то управляющий сервер пытается отправить map-задачу на машину, хранящую соответствующий фрагмент входных данных, или же на машину, наиболее близкую к данным в смысле сетевой топологии. Подобная стратегия позволяет существенно снизить объем данных, передаваемых по сети во время запуска задания, и, тем самым, уменьшить время выполнения задания. Для уменьшения промежуточных данных передаваемых по сети от map- процессов к reduce-процессам, внутри map-задачи может производиться локальная редукция промежуточных данных с одним значением ключа. Для этого, пользователь должен указать в спецификации задания так называемую combiner-функцию, которая имеет такую же семантику, что функция reduce. Часто в качестве combiner-функции указывается reduce. Одной из частых причин увеличения времени выполнения MapReduce-задания является появление “отстающего” процесса, который слишком долго выполняет одну 29 из последних map- или reduce-задач. Подобное поведение может быть обусловлено многими причинами, например, неисправностью жесткого диска или запуском на машине других вычислений. Для устранения подобной проблемы в MapReduce используется следующая стратегия. В конце вычислений, управляющий процесс запускает выполняющиеся задачи на дополнительных машинах. При выполнении задачи на одной из машин, ее выполнение на другой машине приостанавливается. Подобная стратегия значительно уменьшает время выполнения больших заданий. Для отладки MapReduce-программ предусмотрен последовательный режим выполнения задания на локальной машине. Во время выполнения задания на кластере, управляющий процесс предоставляет Web-интерфейс с подробной информацией о статусе выполнения задания, доступом к log-файлам и т.д. В заключение, рассмотрим отличия MapReduce от существующих моделей и систем параллельных вычислений. Модели параллельных вычислений с элементами функционального программирования, позволяющие пользователю формировать программу из примитивов типа map, reduce, scan, sort и т.д., предлагались в академической среде задолго до появления MapReduce (см., например [9]). С этой точки зрения MapReduce можно рассматривать как упрощенную квинтэссенцию данных моделей, ориентированную на решение определенного круга задач по обработке больших массивов данных. Пожалуй, главная заслуга создателей MapReduce заключается в отказоустойчивой реализации вычислений на большом количестве ненадежных машин. В отличие от MapReduce, большинство систем параллельной обработки данных были реализованы на кластерах меньшего масштаба и часто требуют от программиста ручной обработки возникающих отказов. Модель MapReduce накладывает ряд ограничений на программу для того, чтобы автоматизировать распараллеливание, запуск и управление вычислениями на кластере. С одной стороны, это значительно упрощает задачу программиста и практически не требует от него специальной квалификации. С другой стороны, накладываемые системой ограничения не позволяют реализовать в ней решение произвольных задач. Например, в рамках описанной модели нельзя простым образом реализовать операции типа JOIN и SPLIT или организовать взаимодействие между параллельными процессами так, как это делается в технологии MPI. 30 2.2. Платформа Apache Hadoop Платформа распределенных вычислений Hadoop [10] разрабатывается на принципах open source в рамках организации The Apache Software Foundation. Платформа ориентирована на поддержку обработки больших объемов данных и заимствует многие идеи у закрытых технологий Google, таких как MapReduce, GFS и Big Table (см. разделы 2.1, 1.1, 1.3). Hadoop состоит из двух частей: Hadoop Core и HBase. В состав Hadoop Core входят распределенная файловая система HDFS (см. раздел 1.2) и реализация модели MapReduce. HBase содержит реализацию распределенной системы хранения структурированный данных (см. раздел 1.4). Данный раздел посвящен особенностям реализации модели MapReduce в Hadoop. Приведенное описание соответствует текущей на момент написания статьи версии Hadoop 0.17.0. Для реализации вычислений в Hadoop используется архитектура “master- worker ”. В отличие от Google MapReduce, в системе есть один выделенный управляющий процесс (т.н. JobTracker) и множество рабочих процессов (т.н. TaskTracker) , которые осуществляют выполнение всех заданий пользователей. JobTracker принимает задания от приложений, разбивает их на map- и reduce-задачи, распределяет задачи по рабочим процессам, отслеживает выполнение и осуществляет перезапуск задач. TaskTracker запрашивает задачи у управляющего процесса, загружает код и выполняет запуск задачи, уведомляет управляющий процесс о состоянии выполнения задачи и предоставляет доступ к промежуточным данным map-задач. Процессы взаимодействуют с помощью RPC-вызовов, причем все вызовы идут в направлении от рабочего к управляющему процессу, что позволяет уменьшить его зависимость от состояния рабочих процессов. Система реализована на языке Java. Для создания приложений используется прикладной интерфейс программирования на Java. Функции map и reduce описываются в виде классов, реализующих стандартные интерфейсы Mapper и Reducer. Спецификация задания оформляется в виде объекта типа JobConf, содержащего методы 31 для указания классов с map и reduce, форматов входных и выходных данных, путей к входных и выходным данным в HDFS и других параметров задания. Отметим, что, в отличие от Google MapReduce, реализация функции reduce может возвращать пары с произвольными ключами, не совпадающими с переданным на вход функции промежуточным ключом. Аналогично Google MapReduce, пользователь может указать функции разбиения входных данных (Partitioner) и комбинирования промежуточных данных (Combiner), оформленные в виде Java-классов. В состав Hadoop входят часто используемые на практике реализации функций map и reduce, а также функции разбиения. Запуск задания осуществляется с помощью вызова функции runJob или submitJob, каждой из которых передается объект JobConf. В первом случае приложение блокируется до завершения заданий, а во втором случае вызов сразу возвращает управление коду приложения. При запуске задания его спецификация и jar-файл с кодом автоматически размещаются в HDFS, после чего задание направляется JobTracker- процессу. В описании задания пользователь может указать набор дополнительных файлов, копируемых на рабочие узлы перед запуском вычислений. Количество map-задач определяется системой автоматически, исходя из указанного пользователем желаемого числа задач, а также максимального (размер HDFS- блока) и минимального размеров фрагмента входных данных. В общем случае, количество map-задач может не совпадать с указанным пользователем. Количество reduce- задач управляется с помощью параметра задания. По умолчанию используется значение 1. Пользователь может установить число reduce-задач равным 0. В этом случае фаза reduce не проводится, и промежуточные результаты map-задач записываются в выходные файлы в HDFS. Данная возможность полезна в тех случаях, когда не требуется агрегация или сортировка результатов фазы map. Hadoop поддерживает различные форматы входных и выходных данных, включая текстовый файл, двоичный формат со сжатием и таблицы HBase. Пользователь может использовать другие форматы данных путем создания специальных Java-классов для чтения и записи данных. Для отладки и отслеживания статуса выполнения map- и reduce-задач Hadoop позволяет обновлять внутри функций map и reduce строку статуса задачи и значения 32 счетчиков, определенных пользователем. Статус задачи и значения счетчиков отображаются в Web-интерфейсе Hadoop. В заключение отметим, что помимо запуска Java-программ, Hadoop позволяет указать в качестве реализаций map и reduce произвольные программы. Данные программы должны считывать входные данные из потока ввода и записывать результаты в поток вывода. |