Translate

неділя, 31 січня 2016 р.

Apache Spark for Newbie


Вступ (Лінивим можна пропустити)

    Врамках Big Data тренінгу я почав вивчати роботу з Apach Spark computing framework. В інтернеті є достатня кількість прикладів побудови пайплайнів на різних мовах програмування (наприклад тут), проте я ніде не зміг найти хорошого гайду чи прикладу як написати проект з "нуля" і запустити його виконання на кластері. Усі що я знаходив приклади використовували локальний (тестовий) режим мастера , таким чином ніякої дистрибуції логіки не відбувалося і відчути що таке Apache Spark в повній мірі мені не вдавалося. До дня коли я прослухав тренінг людини, що працювала з цим інструментом в продакшені, саме він вніс ясність в те як Apache Spark працює (дякую тобі Тарас Матяшовський). Отже, після того як я в цьому розібрався я вирішив залишити ці знання на просторах Java User Group для всіх кому цікаво і для самого себе, щоб не забути ;-) Також, надіюся з заголовку статті зрозуміло що тут буде йтись лише про поверхневе ознайомлення з спарком і основна мета це зробити усе правильно. Тут буде використаний Stand Alone Spark Master який не потрібно використувувати в продакшені, для цого ви може використати Hadoop Yarn.

Опис задачі

    Задача взята з документації ApacheSpark - підрахунок слів у текстовому файлі. Проте зробимо це по законам жанру:
1. Дистрибутивна бібліотека (саме та яка розлетиться по воркерах кластеру і буде робити усю роботу);
2. Spark Driver (іншими словами це і буде наш клієнт через який ми доступатимемось до мастера і де ми будемо будувати наші пайплайни);
3. Web application (Наша веб аплікація - ендпоінти які ми будемо викликати щоб виконати ту чи іншу роботу, ну і щоб Spark Application UI був доступний і ми змогли подивитись статистику виконання роботи).

    Надіюся усе зрозуміло , проте якщо й ні , то далі постараюся усе розписати як найкраще. Поїхали !

Створення архітектури проектів

    Архітектура нашого проекту буде складатися з трьох модулів, а саме :
1) Distributed JAR
2) Spark Driver
3) Web Application
Загальна, архітектура зображена нижче :

    Для збірки проектів я буду використовувати Apache Maven, і почнемо з того що створемо батьківський проект який назвемо "spark-for-newbie" і додамо туди pom.xml файл. Батьківський pom.xml файл повинен включати усі вище згадані модулі :

<modules>
        <module>distributed-library</module>
        <module>web-api</module>
        <module>spark-driver</module>
</modules>

    Тепер підключаємо до нього основні залежності :
- Залежності на наші бібліотечні модулі :
<dependency>
        <groupId>org.ar.spark.newbie</groupId>
        <artifactId>spark-client</artifactId>
        <version>${project.version}</version>
</dependency>
<dependency>
        <groupId>org.ar.spark.newbie</groupId>
        <artifactId>distributed-library</artifactId>
        <version>${project.version}</version>
</dependency>

- Також залежність на останню (на момент публікації) версію Apache Spark :
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Тепер створюємо наші модулі в середині батьківського проекту:

distributed-library
    Додаємо до цього проекту лише залежність на Apache Spark , так як ця бібліотека міститиме класи функцій з його пакету, що будуть виконуватись на різних воркерах нашого кластеру.
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <scope>provided</scope>
</dependency>

spark-driver
    Цей проект також буде бібліотекою яку ми використаємо у наступному web-api модулі. Він міститиме Spark Conext та залежність на функції, що будуть виконувати логіку нашого обчислення, тому нам потрібні наступні залежності :
<dependency>
      <groupId>org.ar.spark.newbie</groupId>
      <artifactId>distributed-library</artifactId>
</dependency>
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
</dependency>

web-api
    Даний модуль буде нашою основною аплікацією, він повинен містити залежність на наш Spark Driver а також на дуже цікаву бібліотеку "Java Spark", завдяки якій ми зробимо REST Endpoint всього в одну лінійку коду. Spark Java, немає нічого спілького з Big Data та Apache Spark, також вона заслуговує окремої статті щоб описати її можливості.
<dependency>
      <groupId>org.ar.spark.newbie</groupId>
      <artifactId>spark-driver</artifactId>
      <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
      <groupId>com.sparkjava</groupId>
      <artifactId>spark-core</artifactId>
      <version>2.3</version>
</dependency>

Розробка дистрибутивної бібліотеки (distributed-library)

    Для того, щоб підрахувати кількість слів у текстовому файлі ми повинні зробити наступний пайплайн :


    Отже нам треба буде написати 4 функції, 3 map і 1 reduce а також клас в який ми будемо мапити key-value рузультати і повернемо результати з воркера назад в аплікацію.

    SeparateWordLinesFunction.java - її задача отримати на вхід об'єкт типу String котрий є вичитаною лінією тексту з текстового файлу і розбити на окремі слова , тобто повернути список об'єктів String. Для цього, клас функції робитиме наступне :

public class SeparateWordLinesFunction implements FlatMapFunction<String, String> {
  @Override
  public Iterable<String> call(String s) throws Exception {
    return Arrays.asList(s.split(" "));
  }
};

    MapWordsToKeyValueFunction.java - повинна замапити кожне слово до лічіильника, який по замовчуванню буде "1", таким чином ми отримаємо key-value структуру даних. Для того, щоб тримати таку структуру буде використано об'єкт Tuple2 з пакету Scala бібліотеки Apache Spark : 

public class MapWordsToKeyValueFunction implements PairFunction<String, String, Integer> {
  @Override
  public Tuple2<String, Integer> call(String s) throws Exception {
    return new Tuple2<String, Integer>(s, 1);
  }
};

    ReduceKeyValueWordsByKey.java - reduce операція по ключу нашої структури, тут ми повинні вирішити що будемо робити з значенням і в нашому випадку ми його просумуємо :

public class ReduceKeyValueWordsByKey implements Function2<Integer, Integer, Integer> {
  @Override
  public Integer call(Integer intVal1, Integer intVal2) throws Exception {
    return intVal1 + intVal2;
  }
}

    MapKeyValueWordsToWrapperObject.java - функція що перетворить key-value структуру в більш лояльний для подальшої роботи об'єкт :

public class MapKeyValueWordsToWrapperObject implements Function<Tuple2<String,Integer>, WordResult> {
  @Override
  public WordResult call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
    return new WordResult(stringIntegerTuple2._1, stringIntegerTuple2._2);
  }
}

   WordsResult.java - звичайний POJO об'єкт котрий містить поля для самого слова і його лічильника. Потрібно лише імплементувати інтерфейс Serializable (для надання можливості серіалізувати його при передачі по мережі) і особисто від себе я перевизначив метод toString так як буду виводи об'єкти напряму до користувача.

Розробка  Spark Driver бібліотеки (spark-driver)

    Тут ми створемо SparkContext через який отримаємо доступ до мастера і задекларуємо наш пайплайн.

    Створення  SparkContext :
Для цього необідно створити SparkConf - конфігураційних об'єкт якому ми передамо назву нашої аплікації, шлях до мастера та шлях до скомпільованого JAR файлу який Spark розповсюдить на воркери де будуть виконуватись етапи нашого пайплайну :

private String[] distributedJars = new String[]{"/<path_to_workspace>/distributed-library/target/distributed-library-1.0-SNAPSHOT.jar"};

private JavaSparkContext sparkContext;

public SparkDriver(){
    SparkConf sparkConf = new SparkConf().setAppName("SparkForNewbie")
.setJars(distributedJars).setMaster("spark://127.0.0.1:7077");
    this.sparkContext = new JavaSparkContext(sparkConf);
}

    Імплементація Pipeline :

public List<WordResult> countWordsFromFile(String filePath){
        JavaRDD<String> words = sparkContext.textFile(filePath);
        return words
            .flatMap(new SeparateWordLinesFunction())
            .mapToPair(new MapWordsToKeyValueFunction())
            .reduceByKey(new ReduceKeyValueWordsByKey())
            .map(new MapKeyValueWordsToWrapperObject())
            .collect();
}

    З імплементації пайплайну чітко видно нашу попередню схему , де я показував як він виглядатиме. Усі функції підтянуті через залежність і знаходяться в distributed-library.

Розробка Веб аплікації

    Цей модуль потрібен нам саме для взаємодії нашого проекту з кінцевим користувачем. Тут ми зробимо енд-поінт який користувач зможе викликати в браузері і запустити на виконання Spark Pipeline, проте найцікавіше в цьому модулі це використання Spark Java, за допомогою якої ми реалізуємо енд-поінт :

get("/data", (request, response) -> dataService.countWords())

    Угу, саме так, 1 рядок коду і при старті web-api Spark Java розгорне Embedded Jetty і реалізує "/data" енд-поінт.  Далі, при виклику нашого енд-поінту викликаємо сервіс який містить в собі інстанс SparkDriver класу і виконує наш пайплайн передаючи шлях до піддослідного файлу:

public class DataService {

  private static final String TEXT_FILE_PATH = "/workspace/projects/spark-for-newbie/test.txt";
  private SparkDriver sparkDriver = new SparkDriver();

  public List<WordResult> countWords(){
    return sparkDriver.countWordsFromFile(TEXT_FILE_PATH);
  }

}

    Ось і все , усі модулі нашого проекту розписані , і можна переходити до тестування.

Тестування

Запуск Apache Master (stand alone version)
    Для цього качаємо дистрибутив Apache Spark (я викачував spark-1.6.0-bin-hadoop2.6.tgz) і йдемо у папку "conf" що в середині.
Тут потрібно змінити розширення файлу "spark-env.sh.template" на "spark-env.sh", зайти в середину і додати кілька рядків для конфігурації :

export SPARK_LOCAL_IP=127.0.0.1

export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8082

export SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_CORES=2
export SPARK_WORKER_WEBUI_PORT=8083

(Опис усіх цих та інших можливих опцій присутні у цьомуж файлі)

    Тепер йдемо назад і переходимо у папку "sbin", тут виконуємо "./start-master.sh " і переходимо в браузері по шляху "http://localhost:8082/", тут має бути розгорутий ЮІ нашого мастера :


    Далі запускаємо воркер (так, так це не кластер адже все робиться на одній машині, але спарк про це не знає, він має все що необхідно для роботи у розподіленому режимі - мастер а мастер має воркер). Виконуємо "/start-slave.sh 127.0.0.1:7077" і переходимо по шляху "http://localhost:8083/" :

    Тепер у нас є все щоб запустити нашу аплікацію, йдемо в мейн метод модуля web-api та запускаємо його. В логах розгортання проекту ви повинні побачити як спарк розгорає ЮІ нашої аплікації :


    Тут буде відображатися найцікавіша інформація виконання нашого пайплайну.

    Переходимо по цьому шляху і повинні побачити :


    Щож, тепер усе готово, відкриваємо нову вкладку в браузері і переходимо на наш енд-поінт :

http://localhost:4567/data
    
    Якщо ж все було зроблено правильно ви повинні побачити результат нашого обчислення :



    P.S.

    Якщо ж у вас щось не вийшло або не спрацювало, ви можете знайти сорси мого проекту тут . Також ділюся з вами сорсами проекту Тараса, котрі я використовував в довідкових цілях коли знайомився з Apache Spark, вдачі !

 

понеділок, 18 січня 2016 р.

JUG Meetup, Different flavors of polymorphism in Scala



Dear Friends, 

We would like to invite you to the next JUG Meetup, which will occur this Wednesday, January 20th, 19:00!

This time, we have a great speaker, from Odesa! 

Boris Trofimov, Software Architect @Sigma Software
Co-founder of Odessa JUG. Professional software engineer with 10+ years of experience in wide range of expertises from mobile and enterprise to big data and high load distributed applications. Passionate follower of Scala-ble architectures.

Topic of the talk:


 "Different flavors of polymorphism in Scala" або  "Такой разный полиморфизм в Scala".

Topic description:


"Working with Scala can be compared to experiencing the "forth dimension".
Many of the features of Scala are unique and provide ways to look at application development in a new way.
Polymorphism in Scala is multifaceted and this is going to be our topic.

If you feel like you need a new view on some of the things you do everyday, if you are interested in scala and want to have a good time - come and join us!

Venue:

15 Horodotska St.
Lviv

Sponsor:

This event is supported by ELEKS!



The event is completely free, but requires registration, here: