Elasticsearch

Как настроить интеграцию данных PostgreSQL в индексы Elasticsearch

"Максимальная полнота данных в документе Elasticsearch упорядочит и ускорит поиск."
Александр Романюк
автор, инженер-проектировщик систем мониторинга
Зачем мне интегрировать PostgreSQL и Elasticsearch?
Обогащать и наполнять контекстом документы
Ответ на вопрос зачем — дан выше. А мы поговорим о том, как это сделать. Хотя эта статья и основана на работе с PostgreSQL, но, используя jdbc-адаптер, данные можно извлекать из любой реляционной БД, которая поддерживает эту технологию. А делать это мы будем при помощи Logstash.

Logstash — это инструмент для интеграции различных источников данных с открытым исходным кодом. Logstash может брать данные из нескольких источников, преобразовывать их и помещать в хранилище. Первоначально, он использовался для сбора и обработки логов, но, со временем, его возможности стали сильно выходит за рамки только этого варианта использования.

Для начала установим Logstash при помощи dnf. Для этого мы создадим репозиторий в каталоге /etc/yum.repos.d с именем logstash. repo и вставляем следующий текст в файл.

[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
После этого, установим его. Обращаю внимание пользователей из России: сделать это вы сможете только при включенном VPN, т.к. Elastic ограничил возможность работы с дистрибутивами с российских IP-адресов.

dnf install logstash
У Logstash множество плагинов, которые позволяют извлекать данные из определенных источников. jdbc — это Java API для доступа к базам данных и выполнения запросов. Oracle, PostgreSQL и MySQL совместимы с jdbc, и можно свободно использовать этот адаптер для извлечения данных из перечисленных БД. Теперь установим jdbc-плагин:

/usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
Также понадобится библиотека jdbc для чтения данных из PostgreSQL. Загрузить последнюю версию jdbc 4.2 можно отсюда. Скачанный пакет нужно поместить в эту папку:

/usr/share/logstash/logstash-core/lib/jars
Теперь можно использовать Logstash для синхронизации данных реляционной БД PostgreSQL с Elasticsearch. Следом создадим файлы конфигурации Logstash, чтобы указать входные данные в качестве запроса и куда записать в качестве вывода. После этой настройки Logstash начнет синхронизировать данные. В папке /etc/logstash есть файл logstash.yml для управления общими настройками Logstash. Подробности конфигурирования можно посмотреть здесь. Установим следующую настройку, чтобы можно было использовать escape-символы в запросах к PostgreSQL из Logstash.

config.support_escapes: true
Давайте представим, что у нас есть такая таблица в PostgreSQL и что мы хотим синхронизировать данные из неё с Elasticsearch:
USERS
UserID int
UserName varchar
Email varchar
CreatedAt timestamp
Сначала создадим файл конфигурации в /etc/logstash/conf.d с именем usersync.conf:

input {
  jdbc {
     jdbc_connection_string => "jdbc:postgresql://localhost:5432/testdatabase"
     jdbc_user => "db_user"
     jdbc_password => "db_password"
     jdbc_driver_class => "org.postgresql.Driver"
     statement => "SELECT * from public.\"Users\""
 }
}
output {
  elasticsearch {
    hosts => [“http://localhost:9200"]
    index => “users”
    document_id => “users_%{userid}”
    doc_as_upsert => true
    #user => “es_user”
    #password => “es_password”
 }
}
В разделе input конфигурации указываем строку подключения, имя пользователя, пароль и запрос, который будет выполняться для синхронизации. Этот запрос может быть простым или сложным комплексным запросом.

В разделе output конфигурации определяем адрес Elasticsearch и указываем в какой индекс Elasticsearch синхронизировать данные. Можете использовать существующий индекс или попросить Logstash создать новый. Чтобы предотвратить дублирование данных, нужно установить определённый document_id для Elasticsearch. Например, вы можно использовать PK (primary key) в качестве document_id для обновления существующих данных, если исходные данные вдруг изменяются. Также вы можно объединить несколько полей или строк в качестве document_id. В этом примере document_id установлен как tablename_PK (users_{userid}). doc_as_upsert означает, что Elasticsearch создаст новый документ, если документа с таким document_id не существует, в противном случае выполнит обновление существующего документа. Более подробно о выводе в Elasticsearch можно узнать по ссылке.

Можно создать несколько файлов конфигурации для каждого процесса синхронизации. Чтобы не путать эти процессы синхронизации, их нужно разделить их на отдельные конвейеры. Конвейеры Logstash — это изолированные пакеты процессов. Они могут иметь разные входы и выходы, а также иметь разные параметры долговечности. Для каждого файла конфигурации можно создать отдельный конвейер. Пайплайнами нужно управлять в файле pipelines.yml, расположенном в /etc/logstash. Пример конфигурации:

- pipeline.id: users-pipeline
 path.config: “/etc/logstash/conf.d/usersync.conf"
 
- pipeline.id: orders-pipeline
 path.config: "/etc/logstash/conf.d/logstash-ordersync.conf"
Теперь можно запускать синхронизацию. Перезапустим сервис Logstash:

systemctl enable logstash --now
После этого Logstash периодически будет выполнять синхронизацию и создавать новые документы.

Спасибо за внимание!
Что дальше

Приглашаем на семинар-инструктаж по Elastic Stack 8

25-27 марта 2024
Другие наши статьи об Elastic
Статья на Хабре
Статья на Хабре
Статья на Хабре
Статья на Хабре
Есть вопросы или предложения?
Вы можете написать здесь и при необходимости приложить файлы.
Нажимая на кнопку, вы даете согласие на обработку персональных данных и соглашаетесь c политикой конфиденциальности.