Elasticsearch

Как настроить интеграцию данных PostgreSQL в индексы Elasticsearch

"Максимальная полнота данных в документе Elasticsearch упорядочит и ускорит поиск."
Александр Романюк
автор, инженер-проектировщик систем мониторинга
Зачем мне интегрировать PostgreSQL и Elasticsearch?
Обогащать и наполнять контекстом документы
Ответ на вопрос зачем — дан выше. А мы поговорим о том, как это сделать. Хотя эта статья и основана на работе с PostgreSQL, но, используя jdbc-адаптер, данные можно извлекать из любой реляционной БД, которая поддерживает эту технологию. А делать это мы будем при помощи Logstash.

Logstash — это инструмент для интеграции различных источников данных с открытым исходным кодом. Logstash может брать данные из нескольких источников, преобразовывать их и помещать в хранилище. Первоначально, он использовался для сбора и обработки логов, но, со временем, его возможности стали сильно выходит за рамки только этого варианта использования.

Для начала установим Logstash при помощи dnf. Для этого мы создадим репозиторий в каталоге /etc/yum.repos.d с именем logstash. repo и вставляем следующий текст в файл.

[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
После этого, установим его. Обращаю внимание пользователей из России: сделать это вы сможете только при включенном VPN, т.к. Elastic ограничил возможность работы с дистрибутивами с российских IP-адресов.

dnf install logstash
У Logstash множество плагинов, которые позволяют извлекать данные из определенных источников. jdbc — это Java API для доступа к базам данных и выполнения запросов. Oracle, PostgreSQL и MySQL совместимы с jdbc, и можно свободно использовать этот адаптер для извлечения данных из перечисленных БД. Теперь установим jdbc-плагин:

/usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
Также понадобится библиотека jdbc для чтения данных из PostgreSQL. Загрузить последнюю версию jdbc 4.2 можно отсюда. Скачанный пакет нужно поместить в эту папку:

/usr/share/logstash/logstash-core/lib/jars
Теперь можно использовать Logstash для синхронизации данных реляционной БД PostgreSQL с Elasticsearch. Следом создадим файлы конфигурации Logstash, чтобы указать входные данные в качестве запроса и куда записать в качестве вывода. После этой настройки Logstash начнет синхронизировать данные. В папке /etc/logstash есть файл logstash.yml для управления общими настройками Logstash. Подробности конфигурирования можно посмотреть здесь. Установим следующую настройку, чтобы можно было использовать escape-символы в запросах к PostgreSQL из Logstash.

config.support_escapes: true
Давайте представим, что у нас есть такая таблица в PostgreSQL и что мы хотим синхронизировать данные из неё с Elasticsearch:
USERS
UserID int
UserName varchar
Email varchar
CreatedAt timestamp
Сначала создадим файл конфигурации в /etc/logstash/conf.d с именем usersync.conf:

input {
  jdbc {
     jdbc_connection_string => "jdbc:postgresql://localhost:5432/testdatabase"
     jdbc_user => "db_user"
     jdbc_password => "db_password"
     jdbc_driver_class => "org.postgresql.Driver"
     statement => "SELECT * from public.\"Users\""
 }
}
output {
  elasticsearch {
    hosts => [“http://localhost:9200"]
    index => “users”
    document_id => “users_%{userid}”
    doc_as_upsert => true
    #user => “es_user”
    #password => “es_password”
 }
}
В разделе input конфигурации указываем строку подключения, имя пользователя, пароль и запрос, который будет выполняться для синхронизации. Этот запрос может быть простым или сложным комплексным запросом.

В разделе output конфигурации определяем адрес Elasticsearch и указываем в какой индекс Elasticsearch синхронизировать данные. Можете использовать существующий индекс или попросить Logstash создать новый. Чтобы предотвратить дублирование данных, нужно установить определённый document_id для Elasticsearch. Например, вы можно использовать PK (primary key) в качестве document_id для обновления существующих данных, если исходные данные вдруг изменяются. Также вы можно объединить несколько полей или строк в качестве document_id. В этом примере document_id установлен как tablename_PK (users_{userid}). doc_as_upsert означает, что Elasticsearch создаст новый документ, если документа с таким document_id не существует, в противном случае выполнит обновление существующего документа. Более подробно о выводе в Elasticsearch можно узнать по ссылке.

Можно создать несколько файлов конфигурации для каждого процесса синхронизации. Чтобы не путать эти процессы синхронизации, их нужно разделить их на отдельные конвейеры. Конвейеры Logstash — это изолированные пакеты процессов. Они могут иметь разные входы и выходы, а также иметь разные параметры долговечности. Для каждого файла конфигурации можно создать отдельный конвейер. Пайплайнами нужно управлять в файле pipelines.yml, расположенном в /etc/logstash. Пример конфигурации:

- pipeline.id: users-pipeline
 path.config: “/etc/logstash/conf.d/usersync.conf"
 
- pipeline.id: orders-pipeline
 path.config: "/etc/logstash/conf.d/logstash-ordersync.conf"
Теперь можно запускать синхронизацию. Перезапустим сервис Logstash:

systemctl enable logstash --now
После этого Logstash периодически будет выполнять синхронизацию и создавать новые документы.

Спасибо за внимание!
Что дальше

Приглашаем наши тренинги по Zabbix, OpenSearch, ElasticSearch

Максимум знаний за короткое время
Другие наши статьи об Elastic
Статья на Хабре
Статья на Хабре
Статья на Хабре
Статья на Хабре
Есть вопросы или предложения?
Вы можете написать здесь и при необходимости приложить файлы.
Нажимая на кнопку, вы даете согласие на обработку персональных данных и соглашаетесь c политикой конфиденциальности.