Grakn - база данных для AI. Это распределенная база знаний, разработанная специально для обработки сложных данных в системе, ориентированной на знания, - для чего традиционные технологии баз данных просто неадекватны.

Чтобы их внутренние знания были самыми актуальными и актуальными, системы искусственного интеллекта всегда жаждут новых обновленных данных. Поэтому беспрепятственная работа с потоковыми данными полезна для построения систем, ориентированных на знания. В этом сообщении блога мы рассмотрим, как транслировать общедоступные твиты в распределенную базу знаний Grakn.

Случаи повседневного использования

Во многих публикациях в блогах, которые мы рассмотрели до сих пор, мы встречались с данными, охватывающими все, от фармацевтических биологических целей до финансовых инструментов, которые могут быть менее доступны для среднего разработчика, и варианты использования, которые могут быть пугающими, для более начинающие разработчики. Grakn оптимизирован для работы со сложными данными и вариантами использования и предлагает возможности, с которыми существующие базы данных просто не могут конкурировать.

Тем не менее, с помощью Grakn можно легко создать множество вариантов повседневного использования. Для большего количества начинающих пользователей или для пользователей с менее сложными вариантами использования освоение Grakn будет огромным преимуществом для их навыков разработки.

Поэтому, хотя потоковая передача актуальна для большинства сложных случаев использования аналитики данных, мы решили, что было бы целесообразно использовать набор данных, который был бы хорошо знаком всем пользователям: данные социальных сетей.

В частности, мы рассмотрим использование Grakn для потоковой передачи данных Twitter.

Авария

Поскольку в этом блоге будет большая глубина (и длина) учебников, он разделен на серию из трех частей.

1-е сообщение: передача данных в базу знаний

Будут рассмотрены ключевые концепции, такие как получение, вставка и запрос данных. Этот пост будет самым длинным. К концу поста вы узнаете об этих концепциях:

  • Определение простой онтологии Grakn с помощью Java API
  • Потоковая передача публичных твитов в приложение с помощью библиотеки Twitter4J
  • Вставка твитов в граф знаний с помощью Grakn's Graph API

2-е сообщение: Запрос базы знаний.

  • Мы сосредоточимся в основном на выполнении простых запросов, например, группировке твитов по пользователям и отображении совокупного количества твитов на пользователя, а также рассмотрим несколько более сложных запросов, чтобы подчеркнуть аналитические возможности Grakn.

3-е сообщение: Оптимизация

  • Мы рассмотрим возможность добавления пакетной вставки, которая обеспечивает лучшую производительность, особенно при использовании больших объемов данных.

Хорошо, давай потянем!

(Нет необходимости) Установить Grakn

Если вы еще не загрузили и не установили Grakn, мы не будем подробно останавливаться на этом здесь, так как вам не нужно для целей этого руководства! JAR будет загружен автоматически, поскольку он использует только график в памяти, и поэтому все является самодостаточным.

Тем не менее, если вы не знакомы с Grakn и его языком запросов, Graql, мы рекомендуем проверить этот вводный пост вместе с объяснением некоторых ключевых терминов. Они охватывают основные концепции Grakn, которые являются основополагающими для использования платформы. И, если вы готовы загрузить и установить Grakn, ознакомьтесь с нашим руководством по установке.

Наконец, если у вас возникнут какие-либо вопросы при знакомстве с Grakn, пожалуйста, присоединяйтесь к нашему Slack или оставьте сообщение на нашем дискуссионном форуме.

Регистрация вашего собственного приложения Twitter

На сегодняшний день вам потребуются действительные учетные данные для вызова практически каждой конечной точки в Twitter API. Следовательно, вы должны уже иметь приложение Twitter (или зарегистрировать новое), прежде чем продолжить.

Вы можете зарегистрировать собственное приложение в Twitter Application Management. Как только вы это сделаете, вы сможете получить учетные данные, перейдя на вкладку Ключи и токены доступа. В частности, нам важны Consumer Key, Consumer Secret, Access Token и Access Token Secret.

Приступая к работе: Начальная загрузка скелетного проекта

Давайте начнем новый проект maven!

Нажмите командную строку и выполните следующую команду, чтобы сгенерировать новый проект maven:

mvn archetype:generate \
  -DgroupId=ai.grakn \
  -DartifactId=twitterexample \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false

Теперь, когда у вас есть базовая структура проекта и pom.xml, давайте приступим к их настройке в соответствии с нашими потребностями. В раздел <build> мы добавим две вещи:

  1. maven-compiler-plugin конфигурация для включения лямбда-выражения и других отличных функций Java 8
  2. maven-shade-plugin конфигурация, которая указывает на наш главный класс для создания толстого JAR.

Затем перейдите к разделу <dependencies> и убедитесь, что у вас есть все необходимые зависимости, то есть grakn-graph, grakn-graql, twitter4j-core и twitter4j-stream.

Вы можете увидеть полное определение pom.xml здесь.

Главный класс

Давайте начнем с определения класса Main внутри пакета ai.grakn.twitterexample. Помимо учетных данных Twitter, он содержит несколько важных настроек Grakn.

Во-первых, мы решили использовать граф в памяти для простоты - работа с графом в памяти освобождает нас от необходимости настраивать дистрибутив Grakn на локальной машине. График в памяти не предназначен для хранения данных и будет утерян после завершения выполнения программы. Во-вторых, график будет храниться в пространстве ключей с именем twitter-example.

package ai.grakn.twitterexample;
import ai.grakn.Grakn;
import ai.grakn.GraknSession;
public class Main {

}

Затем мы определяем объект GraknSession в main(). Включение его в конструкцию try-with-resource - хорошая практика, чтобы не забыть закрыть сеанс вызовом session.close().

public static void main(String[] args) {
 try (GraknSession session = Grakn.session(graphImplementation, keyspace)) {
 // our code will go here
 }
}

После этого еще один не менее важный объект для работы с графиком - GraknGraph. После выполнения желаемых операций мы не должны забывать о фиксации. Для удобства определим вспомогательный метод, который открывает GraknGraph в режиме записи и фиксирует его после выполнения функции fn. Мы будем использовать эту функцию в различных местах на протяжении всего урока.

public class GraknTweetOntologyHelper {
  public static void withGraknGraph(GraknSession session, Consumer<GraknGraph> fn) {
    GraknGraph graphWriter = session.open(GraknTxType.WRITE);
    fn.accept(graphWriter);
    graphWriter.commit();
  }
}

Мы решили опустить обработку исключений, чтобы не усложнять учебник. Однако в производственном коде это будет очень важно, и о нем нельзя забывать.

Определение онтологии

Давайте определимся с онтологией. В случае, если вы пропустили ознакомительные ссылки выше и немного устали от концепции онтологии, рекомендуется прочитать сообщение Что такое онтология?, Прежде чем продолжить.

Поскольку нас в основном интересует как твит, так и его автор, давайте зафиксируем эти концепции, определив две сущности: user и tweet.

Сущность пользователя будет содержать фактическое имя пользователя в ресурсе с именем screen_name, а сущность твита будет содержать твит пользователя в другом ресурсе с именем text. Мы также определим идентификатор ресурса для id.

Затем мы определим две роли - posts и posted_by, чтобы указать, что пользователь публикует твит, и аналогично, твит публикуется пользователем. Мы свяжем эти две роли отношением, называемым user-tweet-relation.

Структуру можно резюмировать следующим графиком:

С этим набором давайте определим новый метод initTweetOntology внутри GraknTweetOntologyHelper класса и определим там нашу онтологию.

public class GraknTweetOntologyHelper {
  public static void initTweetOntology(GraknGraph graknGraph) {
  }
}

Начнем с определения наших ресурсов:

// resources
ResourceType idType = graknGraph.putResourceType(“identifier”, ResourceType.DataType.STRING);
ResourceType textType = graknGraph.putResourceType(“text”, ResourceType.DataType.STRING);
ResourceType screenNameType = graknGraph.putResourceType(“screen_name”, ResourceType.DataType.STRING);

Сущности:

// entities
EntityType tweetType = graknGraph.putEntityType(“tweet”);
EntityType userType = graknGraph.putEntityType(“user”);

Роли и отношения:

// roles
RoleType postsType = graknGraph.putRoleType(“posts”);
RoleType postedByType = graknGraph.putRoleType(“posted_by”);
// relations
RelationType userTweetRelationType = graknGraph.putRelationType(“user-tweet-relation”).relates(postsType).relates(postedByType);

И, наконец, правильно распределите ресурсы и роли.

// resource and relation assignments
tweetType.resource(idType);
tweetType.resource(textType);
userType.resource(screenNameType);
userType.plays(postsType);
tweetType.plays(postedByType);

Теперь вызовите метод в main, чтобы онтология была создана в начале приложения.

public static void main(String[] args) {
  try (GraknSession session = Grakn.session(graphImplementation, keyspace)) {
    withGraknGraph(session, graknGraph -> initTweetOntology(graknGraph)); // initialize ontology
  }
}

Потоковая передача данных из Twitter

Теперь, когда мы закончили создание онтологии, давайте разработаем код для прослушивания общедоступного потока твитов.

Определите новый метод listenToTwitterStreamAsync и поместите его в класс с именем AsyncTweetStreamProcessorHelper. Помимо принятия настроек учетных данных Twitter, нам также потребуется предоставить обратный вызов onTweetReceived, который будет вызываться всякий раз, когда приложение получит новый твит. Далее мы будем использовать этот обратный вызов для хранения, запроса и отображения твитов по мере их поступления.

public class AsyncTweetStreamProcessorHelper {
  public static TwitterStream listenToTwitterStreamAsync(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, BiConsumer<String, String> onTweetReceived) {
    final String DEFAULT_LANGUAGE = “en”;
    // …
  }
}

Первое, что нам нужно сделать здесь, это создать объект Configuration из настроек учетных данных Twitter. Давайте напишем для этого специальный метод и назовем его createTwitterConfiguration. После этого используйте этот метод для создания объекта Configuration, который нам понадобится в listenToTwitterStreamAsync.

public class AsyncTweetStreamProcessorHelper {
  public static TwitterStream listenToTwitterStreamAsync(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, BiConsumer<String, String> onTweetReceived) {
    final String DEFAULT_LANGUAGE = “en”;
    Configuration conf = createTwitterConfiguration(consumerKey, consumerSecret, accessToken, accessTokenSecret);
      // …
    }
    private static Configuration createTwitterConfiguration(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret) {
      return new ConfigurationBuilder()
        .setDebugEnabled(false)
        .setOAuthConsumerKey(consumerKey)
        .setOAuthConsumerSecret(consumerSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
        .build();
  }
}

Затем мы создадим частный класс TweetListener и заставим его реализовать интерфейс StatusListener из Twitter4J. В этом интерфейсе есть несколько методов, которые мы можем переопределить в зависимости от того, что мы хотим получать от Twitter. Поскольку нас интересуют только обновления твитов и ничего больше, нам нужно переопределить только один метод onStatus.

Конструктор нашего TweetListener класса принимает обратный вызов onStatusReceived, который будет выполняться каждый раз, когда мы получаем новый твит.

Когда мы закончим определение класса, вернемся к listenToTwitterStreamAsync и создадим его экземпляр. Мы также создадим экземпляры двух других классов, TwitterStreamFactory и TwitterStream. Теперь мы можем начать слушать Twitter, вызвав метод sample. Мы предоставили “en”, что означает, что нас интересуют только английские твиты.

public class AsyncTweetStreamProcessorHelper {
  public static TwitterStream listenToTwitterStreamAsync(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, BiConsumer<String, String> onTweetReceived) {
    final String DEFAULT_LANGUAGE = “en”;
    Configuration conf = createTwitterConfiguration(consumerKey, consumerSecret, accessToken, accessTokenSecret);
    TweetListener tweetListener = new TweetListener(onTweetReceived);
    TwitterStreamFactory twitterStreamFactory = new TwitterStreamFactory(conf);
    TwitterStream twitterStreamSingleton = twitterStreamFactory.getInstance();
    twitterStreamSingleton.addListener(tweetListener);
    twitterStreamSingleton.sample(DEFAULT_LANGUAGE);
    return twitterStreamSingleton;
  }
}
// An implementation which implements twitter4j’s StatusListener
class TweetListener implements StatusListener {
  public TweetListener(BiConsumer<String, String> onStatusReceived) {
    this.onStatusReceived = onStatusReceived;
  }
  public void onStatus(Status status) {
    onStatusReceived.accept(status.getUser().getScreenName(), status.getText());
  }
  public void onException(Exception ex) {
    ex.printStackTrace();
  }
  // a bunch of empty event handler implementations, we’re not using them
  public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
  public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
  public void onScrubGeo(long lat, long long_) {}
  public void onStallWarning(StallWarning stallWarning) {}
  private BiConsumer<String, String> onStatusReceived;
}

Давайте завершим этот раздел, добавив вызов listenToTwitterStreamAsync в main.

public static void main(String[] args) {
  try (GraknSession session = Grakn.session(graphImplementation, keyspace)) {
    withGraknGraph(session, graknGraph -> initTweetOntology(graknGraph)); // initialize ontology
    listenToTwitterStreamAsync(consumerKey, consumerSecret, accessToken, accessTokenSecret, (screenName, tweet) -> {
      // TODO: do something upon receiving a new tweet
    });
  }
}

Вставка твитов в сеть знаний

На данный момент наша небольшая программа уже имеет четко определенную онтологию и способна прослушивать входящие твиты. Однако нам еще предстоит решить, что именно мы будем с ними делать. В этом разделе мы узнаем, как:

  1. Вставить входящий твит в граф знаний
  2. Вставьте пользователя, разместившего твит, только один раз - мы не хотим вставлять одного и того же пользователя дважды
  3. Поддерживайте связь между твитом и пользователем

Мы будем использовать API-интерфейс графа для вставки данных в граф, потому что он легкий и эффективный.

Вставить твит

Чтобы вставить твит, мы должны создать сущность твита и текстовый ресурс для хранения текстовых данных твита, прежде чем связывать указанный ресурс с сущностью.

Давайте сделаем это с помощью нового метода. Он примет один String и вставит его в граф знаний перед возвратом Entity указанного твита.

Обратите внимание на то, как нам нужно получить EntityTypes и ResourceTypes сущности и ресурса, которые нас интересуют - они нам нужны для выполнения фактической вставки.

public static Entity insertTweet(GraknGraph graknGraph, String tweet) {
  EntityType tweetEntityType = graknGraph.getEntityType(“tweet”);
  ResourceType tweetResouceType = graknGraph.getResourceType(“text”);
  Entity tweetEntity = tweetEntityType.addEntity();
  Resource tweetResource = tweetResouceType.putResource(tweet);
  return tweetEntity.resource(tweetResource);
}

Вставить пользователя

Помимо твита, мы также хотим сохранить, кто опубликовал твит. Семантика, которую нам нужно обеспечить, - это вставить конкретного пользователя только один раз, то есть нет смысла сохранять одного и того же пользователя дважды.

Поэтому давайте добавим метод проверки, сохраняли ли мы ранее определенного пользователя. Мы будем использовать Optional<T> из Java 8, где мы возвращаем Entity объект этого пользователя, только если он существует в графе знаний. В противном случае будет возвращено Optional.empty().

public static Optional<Entity> findUser(QueryBuilder queryBuilder, String user) {
  MatchQuery findUser = queryBuilder.match(var(“x”).isa(“user”).has(“screen_name”, user)).limit(1);
  Iterator<Concept> concepts = findUser.get(“x”).iterator();
  if (concepts.hasNext()) {
    Entity entity = concepts.next().asEntity();
    return Optional.of(entity);
  }
  else return Optional.empty();
}

И следующий способ вставки user. Этот очень похож на тот, который мы сделали для вставки твита.

public static Entity insertUser(GraknGraph graknGraph, String user) {
  EntityType userEntityType = graknGraph.getEntityType(“user”);
  ResourceType userResourceType =  graknGraph.getResourceType(“screen_name”);
  Entity userEntity = userEntityType.addEntity();
  Resource userResource = userResourceType.putResource(user);
  return userEntity.resource(userResource);
}

И, наконец, напишите функцию для вставки пользователя, только если его еще нет в графе знаний.

public static Entity insertUserIfNotExist(GraknGraph graknGraph, String screenName) {
  QueryBuilder qb = graknGraph.graql();
  return findUser(qb, screenName).orElse(insertUser(graknGraph, screenName));
}

Связь твита с пользователем

Мы почти закончили работу с полной функцией вставки твитов! Осталось только одно - связать объект твита с объектом пользователя. В конце концов, сохранение этой связи имеет решающее значение.

Следующая функция создаст связь между пользователем и указанным нами твитом.

public static Relation insertUserTweetRelation(GraknGraph graknGraph, Entity user, Entity tweet) {
  RelationType userTweetRelationType = graknGraph.getRelationType(“user-tweet-relation”);
  RoleType postsType = graknGraph.getRoleType(“posts”);
  RoleType postedByType = graknGraph.getRoleType(“posted_by”);
  Relation userTweetRelation = userTweetRelationType.addRelation()
    .addRolePlayer(postsType, user)
    .addRolePlayer(postedByType, tweet);
  return userTweetRelation;
}

Завершение вставки твита

Наконец, давайте подведем итоги, определив функцию, единственная ответственность которой заключается в выполнении всех методов, которые мы определили выше.

public static Relation insertUserTweet(GraknGraph graknGraph, String screenName, String tweet) {
  Entity tweetEntity = insertTweet(graknGraph, tweet);
  Entity userEntity = insertUserIfNotExist(graknGraph, screenName);
  return insertUserTweetRelation(graknGraph, userEntity, tweetEntity);
}

Давайте добавим метод, который мы только что определили, к методу main, как показано ниже.

public static void main(String[] args) {
  try (GraknSession session = Grakn.session(graphImplementation, keyspace)) {
    withGraknGraph(session, graknGraph -> initTweetOntology(graknGraph)); // initialize ontology
    listenToTwitterStreamAsync(consumerKey, consumerSecret, accessToken, accessTokenSecret, (screenName, tweet) -> {
    withGraknGraph(session, graknGraph -> insertUserTweet(graknGraph, screenName, tweet)); // insert tweet
  });
 }

Заключение

Мы закончили с функцией вставки твитов! В следующей публикации этой серии мы рассмотрим, как можно использовать Graql для запроса информации из данных.

Если вам понравилась эта статья, пожалуйста, найдите время, чтобы попасть в сердце рекомендаций ниже, чтобы другие тоже могли ее найти. Если у вас есть какие-либо вопросы или комментарии, свяжитесь с нами ниже или через канал нашего сообщества Slack.

Узнайте больше на https://grakn.ai

Изображение предоставлено: Stream от Krondol находится под лицензией CC BY 2.0