博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台
阅读量:7028 次
发布时间:2019-06-28

本文共 2926 字,大约阅读时间需要 9 分钟。

一 安装

前文已经表述,本文不赘述直接进入集成应用阶段。

二 启动KafKa

#启动zookeeper[root@bogon kafka_2.11-0.10.2.0]# bin/zookeeper-server-start.sh config/zookeeper.properties#启动kafka服务端[root@bogon kafka_2.11-0.10.2.0]# bin/kafka-server-start.sh config/server.properties

三 配置PostgreSQL

3.1 配置读取权限

Bottled Water会连接到postgresql获取相关数据,连接的账户需要有replication权限,pg中数据库的变化存储在WAL中,至少需要replication权限才能读取WAL。

编辑$PGDATA目录中postgresql.conf和pg_hba.conf文件。

vi $PGDATA/postgresql.conf#编辑内容如下:listen_addresses = '*'port = 5432 wal_level = logical         max_wal_senders = 8wal_keep_segments = 4max_replication_slots = 4
vi $PGDATA/pg_hba.conf#编辑内容如下:# IPv4 local connections:host    all             all             0.0.0.0/0               md5# replication privilege.local   replication     freerep                                trusthost    replication     freerep        127.0.0.1/32            trusthost    replication     freerep        ::1/128                 trust

编辑完保存,重启数据库服务:

pg_ctl restartpsqlpostgres=# CREATE ROLE freerep  WITH REPLICATION PASSWORD 'password' LOGIN;CREATE ROLE

配置完毕!

3.2 Bottled Water使用演示

3.2.1 创建测试库表

创建一个测试数据库,建立测试表

postgres=# create database mcsas;postgres=# \c mcsas;mcsas=# create extension bottledwater;mcsas=# create extension postgis;#赋予public下的表给freerep角色,要创建如下语句,否则建立的表freerep没有读取权限mcsas=# alter default privileges in schema public grant all on tables to freerep;mcsas=# create table gps(gid serial primary key,name text,geom text);mcsas=# create index gps_geom_idx on gps using gist(ST_GeomFromText(geom,4326));

在另一个终端启动bottledwater可执行程序:

source /home/postgres/.bashrccd /opt/bottledwater-pg-master/kafka[root@localhost kafka]# ./bottledwater -d postgres://freerep:password@127.0.0.1/mcsas -b 192.168.43.27:9092 -f json

启动结果如下:

[root@bogon kafka]# ./bottledwater -d postgres://freerep:password@127.0.0.1/mcsas -b 192.168.43.27:9092 -f json[INFO] Writing messages to Kafka in JSON format[INFO] Created replication slot "bottledwater", capturing consistent snapshot "0000DA72-1".INFO:  bottledwater_export: Table public.spatial_ref_sys is keyed by index spatial_ref_sys_pkeyINFO:  bottledwater_export: Table public.mark is keyed by index mark_pkey[INFO] Registering metadata for table spatial_ref_sys (relid 24263)[INFO] Opening Kafka topic "spatial_ref_sys" for table "spatial_ref_sys"[INFO] Storing key schema for table 24263[INFO] Storing row schema for table 24263[INFO] Snapshot complete, streaming changes from 0/AB016F30.

代表启动成功了。

3.2.2 监听数据改变消息

插入数据

mcsas=# insert into gps(name,geom) values ('china','Point(118 32)');INSERT 0 1mcsas=# insert into gps(name,geom) values ('england','Point(118 12)');INSERT 0 1

启动监听topic

bin/kafka-console-consumer.sh --bootstrap-server 192.168.43.27:9092 --topic gps --from-beginning{"gid": {"int": 1}, "name": {"string": "china"}, "geom": {"string": "Point(118 32)"}}{"gid": {"int": 2}, "name": {"string": "england"}, "geom": {"string": "Point(118 12)"}}

每当插入或者更新,收听的消息会源源不断的输出出来,这样,pg与kafka集成就完毕了。

转载地址:http://kxlxl.baihongyu.com/

你可能感兴趣的文章