How to sync data from MySQL to Google BigQuery using Debezium and Kafka Connect
{`
# Create a working directory for the project
$ mkdir mysql-to-bigquery
$ cd mysql-to-bigquery
`}
{`
# Directory that will hold the Kafka Connect plugins (mounted into the connect container)
$ mkdir plugins
`}
{`
# Download the Debezium MySQL source connector plugin and extract it into plugins/
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.1.Final/debezium-connector-mysql-2.1.1.Final-plugin.tar.gz -O mysql-plugin.tar.gz
$ tar -xzf mysql-plugin.tar.gz -C plugins
# NOTE: the BigQuery sink connector (wepay-kafka-connect-bigquery-2.4.3) must also be
# extracted into plugins/ (presumably downloaded from Confluent Hub; TODO confirm) -
# that download step is not shown here.
`}
{`
$ ls plugins
debezium-connector-mysql  wepay-kafka-connect-bigquery-2.4.3
`}
{`
# docker-compose.yml: ZooKeeper, Kafka, the Debezium example MySQL database,
# and a Kafka Connect worker that loads the connector plugins from ./plugins
version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: quay.io/debezium/zookeeper:2.1
    ports:
      # HOST:CONTAINER mappings are quoted so YAML never reads them as numbers
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
  kafka:
    container_name: kafka
    image: quay.io/debezium/kafka:2.1
    ports:
      - "9092:9092"
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    container_name: mysql
    image: quay.io/debezium/example-mysql:2.1
    ports:
      - "3306:3306"
    environment:
      # root password is what the Debezium MySQL connector logs in with
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    container_name: connect
    image: quay.io/debezium/connect-base:2.1
    volumes:
      # mount the extracted connector plugins into the worker's plugin directory
      - ./plugins:/kafka/connect
    ports:
      - "8083:8083"
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      # Kafka Connect worker group id and its internal config/offset/status topics
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
`}
{`
# Start ZooKeeper, Kafka, MySQL and Kafka Connect
$ docker-compose up
`}
{`
...
2023-01-16 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
...
2023-01-16 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-01-16 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
`}
{`
# No connectors are registered yet
$ curl -H "Accept:application/json" localhost:8083/connectors/
[]
`}
{`
$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SHOW TABLES;"
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
`}
{`
$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SELECT * FROM customers;"
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
`}
{`
{
  "name": "inventory-connector-mysql",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "debezium",
    "database.server.id": "184054",
    "topic.prefix": "debezium",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
}
`}
{`
# Register the Debezium MySQL source connector; the request body is read from
# register-mysql.json (the connector's name and config)
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
`}
{`
$ curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector-mysql"]
`}
{`
# List topics; change events land in debezium.inventory.<table>
$ docker exec -it kafka bash bin/kafka-topics.sh --list --bootstrap-server kafka:9092
...
debezium.inventory.addresses
debezium.inventory.customers
...
`}
{`
# Read the addresses change-event topic from the beginning. Each message carries a
# "schema" and a "payload"; the one below was emitted during the initial snapshot
# ("snapshot":"first", "op":"r") for address id 10.
$ docker exec -it kafka bash bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic debezium.inventory.addresses --from-beginning
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debezium.inventory.addresses.Envelope","version":1},"payload":{"before":null,"after":{"id":10,"customer_id":1001,"street":"3183 Moore Avenue","city":"Euless","state":"Texas","zip":"76036","type":"SHIPPING"},"source":{"version":"2.1.1.Final","connector":"mysql","name":"debezium","ts_ms":1673446748000,"snapshot":"first","db":"inventory","sequence":null,"table":"addresses","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1673446748425,"transaction":null}}
...
`}
{`
# Copy the Google Cloud credentials key file into the connect container, at the
# path the BigQuery sink's "keyfile" setting points to
$ docker cp ./bigquery-keyfile.json connect:/bigquery-keyfile.json
`}
{`
{
  "name": "inventory-connector-bigquery",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "consumer.override.auto.offset.reset": "earliest",
    "topics.regex": "debezium[.]inventory[.].*",
    "sanitizeTopics": "true",
    "autoCreateTables": "true",
    "keyfile": "/bigquery-keyfile.json",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
    "project": "mysql-bigquery",
    "defaultDataset": "debezium",
    "allBQFieldsNullable": "true",
    "allowNewBigQueryFields": "true",
    "transforms": "regexTopicRename,extractAfterData",
    "transforms.regexTopicRename.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.regexTopicRename.regex": "debezium[.]inventory[.](.*)",
    "transforms.regexTopicRename.replacement": "$1",
    "transforms.extractAfterData.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
`}
{`
# Register the BigQuery sink connector; the request body is read from
# register-bigquery.json (the connector's name and config)
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-bigquery.json
`}
{`
$ curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector-mysql","inventory-connector-bigquery"]
`}
{`
# Insert a new customer; single-quoted SQL literals work regardless of MySQL's ANSI_QUOTES mode
$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "INSERT INTO customers (id, first_name, last_name, email) VALUES (1005, 'Tom', 'Addams', 'tom.addams@mailer.net');"
`}
The post "How to sync data from MySQL to Google BigQuery using Debezium and Kafka Connect" first appeared on NordVPN.
developers development Engineering engineering blog mysql NordVPN expertise