How to sync data from MySQL to Google BigQuery using Debezium and Kafka Connect


September 9, 2023
Syncing data from a MySQL database to Google BigQuery is a great way to keep your data up to date and easily accessible for analysis. In this article, we will explore the process of setting up Debezium and Kafka Connect to sync data from MySQL to BigQuery, providing you with all the information you need to get started.

Why use Debezium and Kafka Connect?

Debezium and Kafka Connect are open-source platforms that provide a powerful solution for streaming data changes between systems in real time. That real-time interaction allows you to keep your data in sync and accessible for various use cases such as real-time analytics, data warehousing, and data pipeline integrations.

Technology used

Before we go into the details of setting up Debezium and Kafka Connect to sync data from MySQL to BigQuery, it is important to understand the technologies you will be using and how they are connected.

Change data capture

Change data capture (CDC) is a technique for capturing and recording all the changes made to a database over time. This allows for real-time data replication, making it easy to keep multiple systems in sync. CDC does this by detecting row-level changes in database source tables, which are characterized as “insert,” “update,” and “delete” events. CDC then notifies other systems or services that rely on the same data.

Apache Kafka

Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It allows for the storage and processing of streams of records in a fault-tolerant way.

Kafka Connect

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems, using so-called connectors. Kafka connectors are ready-to-use components that can import data from external systems into Kafka topics and export data from Kafka topics into external systems. You can use existing connector implementations for common data sources and sinks or implement your own connectors.

Debezium

Debezium is an open-source platform that allows you to easily stream changes from your MySQL database to other systems using CDC. It works by reading the MySQL binlog to capture data changes in a transactional manner, so you can be sure that you’re always working with the most up-to-date data. By using Debezium, you can capture the changes made to the MySQL database and stream them to Kafka. Kafka Connect can then consume the change data and load it into BigQuery. (You will see what a full Debezium change event looks like later in this tutorial.)

BigQuery setup

    Creating a BigQuery project and dataset:

      In the Google Cloud Console, navigate to the BigQuery page and create a new project (see “Creating and managing projects” in the Google Cloud Resource Manager documentation). We will name it “mysql-bigquery” for this tutorial. Within the project, create a new dataset (see “Creating datasets” in the BigQuery documentation). We will name it “debezium” for this tutorial. Note that the BigQuery sink connector will automatically create tables in the dataset that match the structure of the MySQL tables being monitored.
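      If you prefer the command line, the same step can be sketched with the Google Cloud SDK. This assumes the gcloud and bq CLIs are installed and authenticated; also note that project IDs are globally unique, so “mysql-bigquery” may need adjusting:

       # Create the project and dataset used in this tutorial
       $ gcloud projects create mysql-bigquery
       $ bq mk --dataset mysql-bigquery:debezium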

    Creating a GCP service account with BigQuery editor role:

      In the Google Cloud Console, navigate to the IAM & Admin page and create a new service account (see “Creating and managing service accounts” in the Google Cloud IAM documentation). Give the service account a name and description, then select the “BigQuery Data Editor” role.
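      The equivalent gcloud sketch (the service account name “debezium-bigquery” is just an example; any name works):

       # Create the service account and grant it the BigQuery Data Editor role
       $ gcloud iam service-accounts create debezium-bigquery --display-name="Debezium BigQuery sink"
       $ gcloud projects add-iam-policy-binding mysql-bigquery \
           --member="serviceAccount:debezium-bigquery@mysql-bigquery.iam.gserviceaccount.com" \
           --role="roles/bigquery.dataEditor"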

    Generating and downloading a key for the service account:

      In the Google Cloud Console, navigate to the IAM & Admin page, find the service account, click the three dots on the right, and select “Create key” (see “Create and manage service account keys” in the Google Cloud IAM documentation). Select JSON as the key type and download the key file. Store the key file securely; you will use it to authenticate the connector in Kafka Connect when accessing the BigQuery dataset.
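      Or, with gcloud (again assuming the example service account from the previous step), you can create the key file directly under the name this tutorial uses later:

       # Generate a JSON key for the service account
       $ gcloud iam service-accounts keys create bigquery-keyfile.json \
           --iam-account=debezium-bigquery@mysql-bigquery.iam.gserviceaccount.com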

Tutorial

To start syncing data from MySQL to BigQuery, we will need the following components:

    Apache ZooKeeper
    Apache Kafka
    A Kafka Connect/Debezium service with the MySQL connector and Google BigQuery connector plugins
    A MySQL database

Start required services

    Let’s start by creating a new directory. Open a terminal and run:

     $ mkdir mysql-to-bigquery
     $ cd mysql-to-bigquery

    Create a plugins directory:

     $ mkdir plugins

    Download the Debezium MySQL connector plugin:

     $ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.1.Final/debezium-connector-mysql-2.1.1.Final-plugin.tar.gz -O mysql-plugin.tar.gz
     $ tar -xzf mysql-plugin.tar.gz -C plugins

    Download the BigQuery connector plugin and put its contents into your plugins directory (in this tutorial we are using version 2.4.3; a scripted way to fetch it is sketched after the listing below). Now your plugins directory should look like this:

     $ ls plugins
     debezium-connector-mysql
     wepay-kafka-connect-bigquery-2.4.3
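    If you want to script this step as well, the commands below are one possibility. The download URL is an assumption based on the name of the 2.4.3 release artifact; verify it against the connector’s releases page before relying on it:

     # Fetch and unpack the BigQuery sink connector (URL unverified; see note above)
     $ wget https://github.com/confluentinc/kafka-connect-bigquery/releases/download/v2.4.3/wepay-kafka-connect-bigquery-2.4.3.zip -O bigquery-plugin.zip
     $ unzip bigquery-plugin.zip -d plugins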

    Create a new file (“docker-compose.yml”) with these configurations:

     version: '2'
     services:
       zookeeper:
         container_name: zookeeper
         image: quay.io/debezium/zookeeper:2.1
         ports:
           - 2181:2181
           - 2888:2888
           - 3888:3888
       kafka:
         container_name: kafka
         image: quay.io/debezium/kafka:2.1
         ports:
           - 9092:9092
         links:
           - zookeeper
         environment:
           - ZOOKEEPER_CONNECT=zookeeper:2181
       mysql:
         container_name: mysql
         image: quay.io/debezium/example-mysql:2.1
         ports:
           - 3306:3306
         environment:
           - MYSQL_ROOT_PASSWORD=debezium
           - MYSQL_USER=mysqluser
           - MYSQL_PASSWORD=mysqlpw
       connect:
         container_name: connect
         image: quay.io/debezium/connect-base:2.1
         volumes:
           - ./plugins:/kafka/connect
         ports:
           - 8083:8083
         links:
           - kafka
           - mysql
         environment:
           - BOOTSTRAP_SERVERS=kafka:9092
           - GROUP_ID=1
           - CONFIG_STORAGE_TOPIC=my_connect_configs
           - OFFSET_STORAGE_TOPIC=my_connect_offsets
           - STATUS_STORAGE_TOPIC=my_connect_statuses

    Let’s start the services:

     $ docker-compose up

    You should see an output similar to the following:

     ...
     2023-01-16 15:48:33,939 INFO   ||  Kafka version: 3.0.0   [org.apache.kafka.common.utils.AppInfoParser]
     ...
     2023-01-16 15:48:34,485 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
     2023-01-16 15:48:34,485 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]

    Check that Debezium is running by calling the Kafka Connect REST API. An empty array in the response means that no connectors are currently registered with Kafka Connect:

     $ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors
     []

    We also have MySQL running with an example database called “inventory.” You can check which tables it contains by running:

     $ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SHOW TABLES;"
     +---------------------+
     | Tables_in_inventory |
     +---------------------+
     | addresses           |
     | customers           |
     | geom                |
     | orders              |
     | products            |
     | products_on_hand    |
     +---------------------+

Let’s check what’s inside the customers table:

 $ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SELECT * FROM customers;"
 +------+------------+-----------+-----------------------+
 | id   | first_name | last_name | email                 |
 +------+------------+-----------+-----------------------+
 | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
 | 1002 | George     | Bailey    | gbailey@foobar.com    |
 | 1003 | Edward     | Walker    | ed@walker.com         |
 | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
 +------+------------+-----------+-----------------------+

Configure Debezium to start syncing MySQL to Kafka

Now let’s configure Debezium to start syncing the inventory database with Kafka.

    Create a new file (“register-mysql.json”) with these configurations (You can find information about these configuration properties in the Debezium documentation):

     {` { "name": "inventory-connector-mysql", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "root", "database.password": "debezium", "database.server.id": "184054", "topic.prefix": "debezium", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } } `} 

    Register a MySQL connector:

     $ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

    Verify that “inventory-connector-mysql” is included in the list of connectors:

     $ curl -H "Accept:application/json" localhost:8083/connectors/
     ["inventory-connector-mysql"]
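    You can also ask the standard Kafka Connect REST API for the connector’s health; the connector and its task should both report a RUNNING state:

     $ curl -s -H "Accept:application/json" localhost:8083/connectors/inventory-connector-mysql/status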

    You can now see database contents in Kafka. To see topics, run:

     $ docker exec -it kafka bash bin/kafka-topics.sh --list --bootstrap-server kafka:9092
     ...
     debezium.inventory.addresses
     debezium.inventory.customers
     ...

Let’s check debezium.inventory.addresses:

 $ docker exec -it kafka bash bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic debezium.inventory.addresses --from-beginning
 {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debezium.inventory.addresses.Envelope","version":1},"payload":{"before":null,"after":{"id":10,"customer_id":1001,"street":"3183 Moore Avenue","city":"Euless","state":"Texas","zip":"76036","type":"SHIPPING"},"source":{"version":"2.1.1.Final","connector":"mysql","name":"debezium","ts_ms":1673446748000,"snapshot":"first","db":"inventory","sequence":null,"table":"addresses","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1673446748425,"transaction":null}}
 ...
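Each event embeds its own schema, which makes the output verbose; the row data itself lives under payload.after. If you have jq installed on the host (an assumption, it is not part of these containers), you can extract just the row state. Note that the -it flag is dropped so the output can be piped:

 $ docker exec kafka bash bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic debezium.inventory.addresses --from-beginning | jq '.payload.after'
 {
   "id": 10,
   "customer_id": 1001,
   "street": "3183 Moore Avenue",
   "city": "Euless",
   "state": "Texas",
   "zip": "76036",
   "type": "SHIPPING"
 }
 ...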

For more information on Debezium events, see the Debezium documentation.

Configure Kafka Connect to start syncing data to Google BigQuery

Before you start configuring the BigQuery connector, move the Google BigQuery service account key file (see the BigQuery setup section above) to your working directory and name it “bigquery-keyfile.json”.

    Once you have the key file, copy it to the Connect container:

     $ docker cp bigquery-keyfile.json connect:/bigquery-keyfile.json

    Now create a new file (“register-bigquery.json”) with these configurations (you can find information about these configuration properties in the BigQuery connector documentation):

     {` { "name": "inventory-connector-bigquery", "config": { "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", "tasks.max": "1", "consumer.auto.offset.reset": "earliest", "topics.regex": "debezium.inventory.*", "sanitizeTopics": "true", "autoCreateTables": "true", "keyfile": "/bigquery-keyfile.json", "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever", "project": "mysql-bigquery", "defaultDataset": "debezium", "allBQFieldsNullable": true, "allowNewBigQueryFields": true, "transforms": "regexTopicRename,extractAfterData", "transforms.regexTopicRename.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.regexTopicRename.regex": "debezium.inventory.(.*)", "transforms.regexTopicRename.replacement": "$1", "transforms.extractAfterData.type": "io.debezium.transforms.ExtractNewRecordState" } } `} 

    To register the BigQuery connector, run:

     $ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-bigquery.json

    Verify that “inventory-connector-bigquery” is included in the list of connectors:

     $ curl -H "Accept:application/json" localhost:8083/connectors/
     ["inventory-connector-mysql","inventory-connector-bigquery"]
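One way to confirm that the sink connector has started creating tables is to list the dataset’s contents, assuming the bq CLI from the Google Cloud SDK is installed and authenticated:

 $ bq ls mysql-bigquery:debezium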

In your BigQuery dataset, you will now be able to see tables matching those in MySQL, and you can select data from your customers table. (The emails in the example data are for illustration purposes only and do not correspond to real individuals.) To watch a change propagate end to end, create a new entry in the MySQL customers table:

 $ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "INSERT INTO customers VALUES(1005, \"Tom\", \"Addams\", \"tom.addams@mailer.net\");"
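To check the result from the command line, you can query the table with the bq CLI (again an optional tool; the sink writes in batches, so allow a few seconds):

 $ bq query --use_legacy_sql=false 'SELECT id, first_name, last_name, email FROM `mysql-bigquery.debezium.customers` WHERE id = 1005'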

You will see that the new entry has automatically synced to BigQuery.

Conclusion

You should now have a clear understanding of the benefits of syncing data from MySQL to BigQuery using Debezium and Kafka Connect. With the detailed tutorial in this article, you will be able to set up and configure Debezium and Kafka Connect yourself. As a reminder, it’s important to test and monitor the pipeline to ensure that data is being synced as expected and to troubleshoot any issues that may arise; one simple check is sketched after the resource list below. For more information on Debezium and Kafka Connect, you can refer to the following resources:

    Debezium documentation
    Kafka Connect documentation
    BigQuery connector documentation
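As an example of such monitoring: a sink connector consumes through an ordinary Kafka consumer group (named “connect-” plus the connector name by default), so you can watch its lag with the standard consumer-groups tool. A lag that keeps growing means BigQuery is falling behind the MySQL changes:

 $ docker exec -it kafka bash bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-inventory-connector-bigquery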
