Introduction

Postgres is eating the database world: it stands out as the Swiss army knife of databases, and more and more developers are adopting PostgreSQL to store their data. But, as always happens when projects grow, the need arises to stream changes from the database to other services. This is where DataBrew Cloud and the open-source Blink come in.

Why would you do that?

Streaming data is not a silver bullet, but it still has a lot of use cases. Here are some of them:

  • Building event-driven architecture
  • Real-time analytics
  • Sharing data with external systems

What are the benefits?

Data streaming from Postgres, also called CDC (Change Data Capture), is the process of reading changes directly from the WAL (write-ahead log) instead of repeatedly querying your data, which can put significant load on the database.

It also means your consumer can be offline for a while and still receive all the changes when it comes back online, because the replication slot keeps the WAL around until the consumer confirms it has read it.

Requirements

Postgres setup

First, let’s make sure your database is ready for CDC by checking your wal_level:

SHOW wal_level;

If the result is not logical, you need to change it to logical:
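
One common way to do this, assuming you have superuser access, is:

ALTER SYSTEM SET wal_level = logical;  -- a server restart is required for this to take effect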

The wal_level parameter controls how much information Postgres writes to the WAL. We want it set to logical, because that makes the database write changes to the WAL in a format that can be decoded and read later.

NATS setup

Make sure you have a NATS server running. You can use the official Docker image:

docker run -p 4222:4222 -ti nats:latest

You should see logs like this:

[1] 2019/05/24 15:42:58.228063 [INF] Starting nats-server version #.#.#
[1] 2019/05/24 15:42:58.228115 [INF] Git commit [#######]
[1] 2019/05/24 15:42:58.228201 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2019/05/24 15:42:58.228740 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/05/24 15:42:58.228765 [INF] Server is ready
[1] 2019/05/24 15:42:58.229003 [INF] Listening for route connections on 0.0.0.0:6222
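
If you have the NATS CLI installed, you can run a quick sanity check by publishing a test message (the subject name here is arbitrary):

nats pub -s nats://127.0.0.1:4222 test.subject "hello"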

If you are going to use DataBrew Cloud, you must make sure your Postgres and NATS are accessible from the internet. You can use a service like ngrok to expose your local services to the internet, or you can deploy them in the cloud.
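
For example, assuming you have ngrok installed and configured, you could expose local Postgres and NATS over TCP tunnels (run each command in its own terminal):

# Postgres
ngrok tcp 5432
# NATS client port
ngrok tcp 4222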

Start with DataBrew Cloud

First, you need to create a new account in DataBrew Cloud or log into an existing one.

Then you need to create a new pipeline. You can do this by clicking on the “New Pipeline” button in the top right corner.

Add Postgres source

First, we must configure our PostgreSQL database as a source for the pipeline. Click on the “Add Connector” button and select “Postgres-CDC” from the list.

Create new Postgres-CDC Connector

Then you need to fill in the connection details for your Postgres database. You need to provide the following information:

Postgres-CDC Connector settings

Once you have filled in all the info, press “Check Connection” to make sure the connection is working.

You will then be asked to choose the table you want to stream changes from. Simply select the one you need to proceed.

Add NATS sink

To create a full pipeline you need to add a sink for the data. In our case, it will be NATS.

Click on the “Add Connector” button and select “NATS” from the list. The flow is essentially the same as with the Postgres-CDC connector: you need to provide the connection details for your NATS server.

Create NATS Connector destination

Provide the connection details and press “Check Connection” to ensure the connection is working.

NATS Connector settings

Creating the pipeline

Once you have both connectors configured, you can press the “Create Pipeline” button to create the pipeline.

Select the previously created Postgres-CDC connection as the source and the NATS connector as the destination. In our case the connection name is “Taxi rides”, as we are going to stream the changes from the “taxi_rides” table.

Select Postgres as pipeline source

Select NATS as pipeline destination

Now it’s time to save and deploy our pipeline. Press the “Save pipeline” button. We are not going to add any processors to our data flow just yet.

Save new pipeline

After you save the pipeline and press the “Deploy” button, you will see the logs of the pipeline execution.

Please keep in mind that the first pipeline deployment may take a few seconds.

Within a few seconds, the logs from the pipeline execution will appear. If everything is correct, you will see something like this:

2024-05-22 22:11:59 INFO Metrics: Component has been loaded
2024-05-22 22:11:59 INFO Source: Loaded driver=postgres_cdc
2024-05-22 22:11:59 INFO Sinks: Loaded driver=nats
2024/05/22 22:11:59 INFO [source]: PostgreSQL-CDC: Create publication for table schemas with query CREATE PUBLICATION pglog_stream_rs_databrew_replication_slot_174_2231 FOR TABLE public.taxi_rides;
2024/05/22 22:11:59 INFO [source]: PostgreSQL-CDC: Created Postgresql publication publication_name=rs_databrew_replication_slot_174_2231
2024/05/22 22:11:59 INFO [source]: PostgreSQL-CDC: System identification result SystemID:=7293538614695768105 Timeline:=1 XLogPos:=E4/5C009318 DBName:=mocks
BEGIN
0
2024/05/22 22:12:00 INFO [source]: PostgreSQL-CDC: Processing database snapshot schema=public
  table=
{TableName:public.taxi_rides Schema:schema:
  │   fields: 11
  │     - _cq_sync_time: type=utf8, nullable
  │     - distance_traveled: type=float64, nullable
  │     - driver_id: type=int32, nullable
  │     - duration: type=int32, nullable
  │     - end_location: type=utf8, nullable
  │     - fare_amount: type=float64, nullable
  │     - log_id: type=int32
  │     - passenger_id: type=int32, nullable
  │     - payment_method: type=utf8, nullable
  │     - start_location: type=utf8, nullable
  │     - timestamp: type=utf8, nullable}
2024/05/22 22:12:00 INFO [source]: PostgreSQL-CDC: Query snapshot batch-size=13500
2024/05/22 22:12:00 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=0
2024/05/22 22:12:05 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=13500
2024/05/22 22:12:08 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=27000
2024/05/22 22:12:09 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=40500
2024-05-22 22:12:09 INFO Stream: Messages stat messages_received=53649 messages_sent=53649 messages_dropped_or_filtered=0
2024/05/22 22:12:09 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=54000
2024/05/22 22:12:10 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=67500
2024/05/22 22:12:11 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=81000
2024/05/22 22:12:11 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=94500
2024/05/22 22:12:12 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=108000
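
If you want to double-check what the connector set up on the Postgres side, you can list the publication and replication slot it created:

SELECT pubname FROM pg_publication;
SELECT slot_name, active FROM pg_replication_slots;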

Start with Open Source Blink

Blink is an open-source project from DataBrew that allows you to stream data from various sources to various destinations.

In this section, we will cover how to start with Blink and stream data from Postgres to NATS.

Assuming you already have Postgres and NATS set up, let’s start with Blink.

You can read more about the installation here - Installing Blink.

Create a new pipeline

Unlike DataBrew Cloud, Blink is a CLI tool: you create a new pipeline by defining the pipeline configuration in a YAML file.

Here is an example of the pipeline configuration for our particular use case. Save it to a file named blink.yaml:

service:
  pipeline_id: 223
source:
  driver: postgres_cdc
  config:
    host: localhost
    slot_name: slot_example_name
    user: postgres
    password: 12345
    port: 5432
    schema: public
    stream_snapshot: false
    snapshot_memory_safety_factor: 0.1
    snapshot_batch_size: 10000
    ssl_required: true
    database: mocks
  stream_schema:
    - stream: public.taxi_rides
      columns:
        - name: log_id
          databrewType: Int32
          nativeConnectorType: integer
          pk: true
          nullable: false
        - name: _cq_sync_time
          databrewType: String
          nativeConnectorType: timestamp without time zone
          pk: false
          nullable: true
        - name: distance_traveled
          databrewType: Float64
          nativeConnectorType: double precision
          pk: false
          nullable: true
        - name: driver_id
          databrewType: Int32
          nativeConnectorType: integer
          pk: false
          nullable: true
        - name: duration
          databrewType: Int32
          nativeConnectorType: integer
          pk: false
          nullable: true
        - name: end_location
          databrewType: String
          nativeConnectorType: text
          pk: false
          nullable: true
        - name: fare_amount
          databrewType: Float64
          nativeConnectorType: double precision
          pk: false
          nullable: true
        - name: passenger_id
          databrewType: Int32
          nativeConnectorType: integer
          pk: false
          nullable: true
        - name: payment_method
          databrewType: String
          nativeConnectorType: text
          pk: false
          nullable: true
        - name: start_location
          databrewType: String
          nativeConnectorType: text
          pk: false
          nullable: true
        - name: timestamp
          databrewType: String
          nativeConnectorType: text
          pk: false
          nullable: true
processors: []
sink:
  driver: nats
  config:
    url: localhost:4222
    subject: taxi_rides
    username: ""
    password: ""

Start the pipeline

If you have Blink installed locally, you can start the pipeline by running the following command:

blink start -c blink.yaml

You should see the following output:

2024/05/22 22:12:41 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=567000
2024/05/22 22:12:42 INFO [source]: PostgreSQL-CDC: Query snapshot:  table=public.taxi_rides columns="[\"_cq_sync_time\" \"distance_traveled\" \"driver_id\" \"duration\" \"end_location\" \"fare_amount\" \"log_id\" \"passenger_id\" \"payment_method\" \"start_location\" \"timestamp\"]" batch-size=13500 offset=580500

The logs above display the data that is being streamed from the snapshot of existing data in the Postgres table.

Your logs will likely differ, as your Postgres table contains different data.

Check the data in NATS

The last step is to check the data in NATS. You can use the NATS CLI tool to inspect the messages on the subject:

nats sub -s nats://127.0.0.1:4222 "taxi_rides"

If you did everything correctly, you should see output like this:

22:17:42 Subscribing on taxi_rides

[#1] Received on "taxi_rides"
[{"_cq_sync_time":null,"distance_traveled":19.15,"driver_id":1,"duration":320,"end_location":"55 Schlimgen Road","fare_amount":309.56,"log_id":8540,"passenger_id":583,"payment_method":"cash","start_location":"87 Fisk Driv","timestamp":"08/18/2022"}]