Changefeeds can stream change data to Amazon MSK clusters (Amazon Managed Streaming for Apache Kafka) using AWS IAM roles or SASL/SCRAM authentication.
In this tutorial, you'll set up an MSK cluster and connect a changefeed with either IAM or SCRAM authentication:
- For IAM authentication, you'll create the MSK cluster with an IAM policy and role. CockroachDB and a Kafka client will assume the IAM role in order to connect to the MSK cluster. Then, you'll set up the Kafka client to consume the changefeed messages and start the changefeed on the CockroachDB cluster.
- For SCRAM authentication, you'll create the MSK cluster and then store your SCRAM credentials in AWS Secrets Manager. You'll set up the Kafka client configuration and consume the changefeed messages from the CockroachDB cluster.
CockroachDB changefeeds also support IAM authentication to MSK Serverless clusters. For a setup guide, refer to Stream a Changefeed to Amazon MSK Serverless.
Before you begin
You'll need:
- An AWS account.
- A CockroachDB Self-Hosted cluster hosted on AWS. You can set up a cluster using Deploy CockroachDB on AWS EC2. You must create instances in the same VPC that the MSK cluster will use in order for the changefeed to authenticate successfully.
- A Kafka client to consume the changefeed messages. You must ensure that your client machine is in the same VPC as the MSK cluster. This tutorial uses a client set up following the AWS MSK guide.
- The `CHANGEFEED` privilege in order to create and manage changefeed jobs (see the example below). Refer to Required privileges for more details.
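For example, on recent CockroachDB versions the privilege can be granted per table. A minimal sketch, assuming a SQL user named `changefeed_user` (a hypothetical name) and the `movr.users` table used later in this tutorial:

```sql
-- Grant the CHANGEFEED privilege on the movr.users table to a
-- hypothetical SQL user named changefeed_user.
GRANT CHANGEFEED ON TABLE movr.users TO changefeed_user;
```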
If you would like to connect a changefeed running on a CockroachDB Dedicated cluster to an Amazon MSK cluster, contact your Cockroach Labs account team.
Select the authentication method that you'll use to connect the changefeed to your MSK cluster:
Step 1. Create an MSK cluster with IAM authentication
- In the AWS Management Console, go to the Amazon MSK console and click Create cluster.
- Select Custom create, name the cluster, and select Provisioned as the cluster type. Click Next.
- Select the VPC for the MSK cluster with the subnets and security group. The VPC selection is important because the MSK cluster must be in the same VPC as the CockroachDB instance and Kafka client machine. Click Next.
- Under Access control methods, select IAM role-based authentication. Click Next.
- Continue to select the required configuration options for your cluster. Click Next.
- Review the cluster details, and then click Create cluster.
- Once the cluster is running, click View client information in the Cluster summary box. Copy the endpoint addresses, which will be similar to `b-1.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9098,b-2.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9098,b-3.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9098`. Click Done to return to the cluster's overview page.
Step 2. Create an IAM policy and role to access the MSK cluster
In this step, you'll create an IAM policy that contains the permissions to interact with the MSK cluster. Then, you'll create an IAM role, which you'll associate with the IAM policy. In a later step, both the CockroachDB cluster and Kafka client machine will use this role to work with the MSK cluster.
- In the AWS Management Console, go to the IAM console, select Policies from the navigation, and then Create Policy.
- Using the JSON tab option, update the policy with the following JSON. These permissions will allow you to connect to the cluster, manage topics, and consume messages. You may want to adjust the permissions to suit your permission model. For more details on the available permissions, refer to the AWS documentation on IAM Access Control for MSK.

  Replace the instances of `arn:aws:kafka:{region}:{account ID}:cluster/{msk-cluster-name}` with the MSK ARN from your cluster's summary page and add `/*` to the end, like the following:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": [
        "arn:aws:kafka:{region}:{account ID}:cluster/{msk-cluster-name}/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "arn:aws:kafka:{region}:{account ID}:cluster/{msk-cluster-name}/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": [
        "arn:aws:kafka:{region}:{account ID}:cluster/{msk-cluster-name}/*"
      ]
    }
  ]
}
```
- Once you have added your policy, add a policy name (for example, `msk-policy`), click Next, and then Create policy.
- Return to the IAM console, select Roles from the navigation, and then Create role.
- Select AWS service for the Trusted entity type. For Use case, select EC2 from the dropdown. Click Next.
- On the Add permissions page, search for the IAM policy (`msk-policy`) you just created. Click Next.
- Name the role (for example, `msk-role`) and click Create role.
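If you prefer to script this step, the same policy and role can be created with the AWS CLI. This is a minimal sketch, assuming the policy JSON above is saved as `msk-policy.json` and an EC2 trust policy (as shown in Step 3) is saved as `trust-policy.json`; the file names are placeholders:

```shell
# Create the IAM policy from the JSON document above.
aws iam create-policy \
  --policy-name msk-policy \
  --policy-document file://msk-policy.json

# Create the role with an EC2 trust policy, then attach the policy to it.
aws iam create-role \
  --role-name msk-role \
  --assume-role-policy-document file://trust-policy.json
aws iam attach-role-policy \
  --role-name msk-role \
  --policy-arn arn:aws:iam::{account ID}:policy/msk-policy
```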
Step 3. Set up the CockroachDB cluster role
In this step, you'll create a role, which contains the `sts:AssumeRole` permission, for the EC2 instance that is running your CockroachDB cluster. The `sts:AssumeRole` permission will allow the EC2 instance to obtain temporary security credentials to access the MSK cluster according to the `msk-policy` permissions. To achieve this, you'll add the EC2 role to the trust relationship of the `msk-role` you created in the previous step.
- Navigate to the IAM console, select Roles from the navigation, and then Create role.
- Select AWS service for the Trusted entity type. For Use case, select EC2 from the dropdown. Click Next.
- On the Add permissions page, click Next.
- Name the role (for example, `ec2-role`) and click Create role.
- Once the role has finished creating, copy the ARN in the Summary section. Click on the Trust relationships tab. You'll find a Trusted entities policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```
- Navigate to the IAM console and search for the role (`msk-role`) you created in Step 2 that contains the MSK policy. Select the role, which will take you to its summary page.
- Click on the Trust relationships tab, and click Edit trust policy. Add the ARN of the EC2 IAM role (`ec2-role`) to the JSON policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com",
        "AWS": "arn:aws:iam::{account ID}:role/{ec2-role}"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

- Once you've updated the policy, click Update policy.
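Alternatively, the trust policy can be updated from the command line. A sketch, assuming the edited JSON above is saved as `msk-role-trust.json` (a placeholder file name):

```shell
# Replace the trust policy on msk-role with the version that adds ec2-role.
aws iam update-assume-role-policy \
  --role-name msk-role \
  --policy-document file://msk-role-trust.json
```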
Step 4. Connect the client to the MSK cluster
In this step, you'll prepare the client to connect to the MSK cluster, create a Kafka topic, and consume messages that the changefeed sends.
- Ensure that your client can connect to the MSK cluster. This tutorial uses an EC2 instance running Kafka as the client. Navigate to the summary page for the client EC2 instance. Click on the Actions dropdown. Click Security, and then select Modify IAM role.
- On the Modify IAM role page, select the role you created for the MSK cluster (`msk-role`) that contains the policy created in Step 2. Click Update IAM role.
- Open a terminal and connect to your Kafka client. Check that the `client.properties` file in your Kafka installation contains the correct SASL and security configuration, like the following:

```
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```
If you need further detail on setting up the Kafka client, refer to the AWS setup guide.
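Note that the `AWS_MSK_IAM` mechanism is not bundled with Apache Kafka; it comes from AWS's `aws-msk-iam-auth` library, which the AWS setup guide places on the client's classpath. A sketch, assuming version 1.1.1 of the library (adjust the version and path for your installation):

```shell
# Download the Amazon MSK IAM auth library and add it to the Kafka classpath.
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
export CLASSPATH=$PWD/aws-msk-iam-auth-1.1.1-all.jar
```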
Move to the directory of your Kafka installation:
```shell
cd kafka_2.12-2.8.1/bin
```
To create a topic, run the following:
```shell
~/kafka_2.12-2.8.1/bin/kafka-topics.sh --bootstrap-server {msk_endpoint} --command-config client.properties --create --topic {users} --partitions {1} --replication-factor {3}
```
Replace:
- `{msk_endpoint}` with your endpoint copied in Step 1.
- `{users}` with your topic name. This tutorial will use the CockroachDB `movr` workload and will run a changefeed on the `movr.users` table.
- `{1}` with the number of partitions you require.
- `{3}` with the replication factor you require.
You will receive confirmation output:
```
Created topic users.
```
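To verify the topic's partition count and replication factor, you can describe it. A quick check, using the same `{msk_endpoint}` placeholder:

```shell
# Describe the topic to confirm its partition and replication settings.
~/kafka_2.12-2.8.1/bin/kafka-topics.sh --bootstrap-server {msk_endpoint} \
  --command-config client.properties --describe --topic users
```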
Step 5. Start the changefeed
In this step, you'll prepare your CockroachDB cluster to start the changefeed.
(Optional) On the EC2 instance running CockroachDB, run the MovR application workload to set up some data for your changefeed.
Create the schema for the workload:
```shell
cockroach workload init movr
```
Then run the workload:
```shell
cockroach workload run movr --duration=1m
```
Start a SQL session. For details on the available flags, refer to the `cockroach sql` page.

```shell
cockroach sql --insecure
```
Note: To set your Enterprise license, refer to the Licensing FAQs page.
Enable the `kv.rangefeed.enabled` cluster setting:

```sql
SET CLUSTER SETTING kv.rangefeed.enabled = true;
```
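You can confirm the setting took effect by reading it back:

```sql
-- Returns true once rangefeeds are enabled.
SHOW CLUSTER SETTING kv.rangefeed.enabled;
```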
To connect the changefeed to the MSK cluster, the URI must contain the following parameters:
- An MSK cluster endpoint prefixed with the `kafka://` scheme, for example: `kafka://b-1.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9098`.
- `tls_enabled` set to `true`.
- `sasl_enabled` set to `true`.
- `sasl_mechanism` set to `AWS_MSK_IAM`.
- `sasl_aws_region` set to the region of the MSK cluster.
- `sasl_aws_iam_role_arn` set to the ARN for the IAM role (`msk-role`) that has the permissions outlined in Step 2.
- `sasl_aws_iam_session_name` set to a string that you specify to identify the session in AWS.
Together, the URI will look similar to the following:

```
'kafka://b-1.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9098/?tls_enabled=true&sasl_enabled=true&sasl_mechanism=AWS_MSK_IAM&sasl_aws_region=us-east-1&sasl_aws_iam_role_arn=arn:aws:iam::{account ID}:role/{msk-role}&sasl_aws_iam_session_name={user-specified session name}'
```
You can either specify the Kafka URI in the `CREATE CHANGEFEED` statement directly, or create an external connection for the MSK URI. An external connection defines a name for the MSK URI and its query parameters:

```sql
CREATE EXTERNAL CONNECTION msk AS 'kafka://b-1.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9098/?tls_enabled=true&sasl_enabled=true&sasl_mechanism=AWS_MSK_IAM&sasl_aws_region=us-east-1&sasl_aws_iam_role_arn=arn:aws:iam::{account ID}:role/{msk-role}&sasl_aws_iam_session_name={user-specified session name}';
```
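If you create the external connection, you can confirm it exists before starting the changefeed. On recent CockroachDB versions, a quick check:

```sql
-- List defined external connections; the msk connection should appear.
SHOW EXTERNAL CONNECTIONS;
```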
Use the `CREATE CHANGEFEED` statement to start the changefeed using either the external connection (`external://`) or the full `kafka://` URI:

```sql
CREATE CHANGEFEED FOR TABLE movr.users INTO 'external://msk' WITH resolved;
```
```
        job_id
-----------------------
  1002677216020987905
```
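For reference, the equivalent statement with the full `kafka://` URI inline, using the placeholder values from the URI shown earlier in this step:

```sql
-- Equivalent statement using the full Kafka URI instead of the external
-- connection; substitute your endpoint, account ID, role, and session name.
CREATE CHANGEFEED FOR TABLE movr.users
  INTO 'kafka://b-1.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9098/?tls_enabled=true&sasl_enabled=true&sasl_mechanism=AWS_MSK_IAM&sasl_aws_region=us-east-1&sasl_aws_iam_role_arn=arn:aws:iam::{account ID}:role/{msk-role}&sasl_aws_iam_session_name={user-specified session name}'
  WITH resolved;
```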
To view a changefeed job, use `SHOW CHANGEFEED JOBS`.
Step 6. Consume the changefeed messages on the client
Return to the terminal that is running the Kafka client. Move to the Kafka installation directory:
```shell
cd kafka_2.12-2.8.1/bin
```
Run the following command to start a consumer. Set `--topic` to the topic you created:

```shell
~/kafka_2.12-2.8.1/bin/kafka-console-consumer.sh --bootstrap-server {msk_endpoint} --consumer.config client.properties --topic users --from-beginning
```
{"after": {"address": "49665 Green Highway", "city": "amsterdam", "credit_card": "0762957951", "id": "10acc68c-5cea-4d32-95db-3254b8a1170e", "name": "Jimmy Gutierrez"}} {"after": {"address": "1843 Brandon Common Apt. 68", "city": "amsterdam", "credit_card": "3414699744", "id": "53d95b9a-abf3-4af2-adc8-92d4ee026327", "name": "James Hunt"}} {"after": {"address": "87351 David Ferry Suite 24", "city": "amsterdam", "credit_card": "7689751883", "id": "58f66df9-e2ef-48bf-bdbe-436e8caa0fae", "name": "Grant Murray"}} {"after": {"address": "35991 Tran Flats", "city": "amsterdam", "credit_card": "6759782818", "id": "6e8d430d-9a3b-4519-b7ab-987d21043f6a", "name": "Mr. Alan Powers"}} {"after": {"address": "65320 Emily Ports", "city": "amsterdam", "credit_card": "7424361516", "id": "74e8c91b-9534-4e40-9d19-f23e14d24114", "name": "Michele Grant"}} {"after": {"address": "85363 Gary Square Apt. 39", "city": "amsterdam", "credit_card": "0267354734", "id": "99d2c816-2216-40f3-b60c-f19bc3e9f455", "name": "Mrs. Wendy Miller"}} {"after": {"address": "68605 Shane Shores Suite 22", "city": "amsterdam", "credit_card": "5913104602", "id": "ae147ae1-47ae-4800-8000-000000000022", "name": "Crystal Sullivan"}} {"after": {"address": "41110 Derrick Walk Apt. 42", "city": "amsterdam", "credit_card": "2866469885", "id": "b3333333-3333-4000-8000-000000000023", "name": "Michael Lewis"}} {"after": {"address": "47781 Robinson Villages Apt. 41", "city": "amsterdam", "credit_card": "6596967781", "id": "b40e7c51-7e68-43f1-a4df-92fa8a05c961", "name": "Abigail Sellers"}} ... {"resolved":"1725982589714395510.0000000000"} ...
Step 1. Create an MSK cluster with SCRAM authentication
- In the AWS Management Console, go to the Amazon MSK console and click Create cluster.
- Select Custom create, name the cluster, and select Provisioned as the cluster type. Click Next.
- Select the VPC for the MSK cluster with the subnets and security group. The VPC selection is important because the MSK cluster must be in the same VPC as the CockroachDB instance and Kafka client machine. Click Next.
- Under Access control methods, select SASL/SCRAM authentication. Click Next.
- Continue to select the required configuration options for your cluster. Click Next.
- Review the cluster details, and then click Create cluster.
- Once the cluster is running, click View client information in the Cluster summary box. Copy the endpoint addresses, which will be similar to `b-1.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9096,b-2.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9096,b-3.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9096`. Click Done to return to the cluster's overview page.
Step 2. Store the SCRAM credentials
In this step, you'll store the SCRAM credentials in AWS Secrets Manager and then associate the secret with the MSK cluster.
- In the AWS Management Console, go to the Amazon Secrets Manager console and click Store a new secret.
- For Secret type, select Other type of secret.
- In the Key/value pairs box, enter the user and password in Plaintext in the same format as the following:

```json
{
  "username": "your_username",
  "password": "your_password"
}
```
- Select or add a new encryption key. (You cannot use the default AWS KMS key with an Amazon MSK cluster.) Click Next.
- Add the Secret name. You must prefix the name with `AmazonMSK_`.
- After selecting any other relevant configuration for your secret, click Store to complete.
- Copy the Secret ARN for your secret on the Secret details page.
- Return to your MSK cluster in the Amazon MSK console.
- Click on the Properties tab and find the Security settings. Under SASL/SCRAM authentication, click Associate secrets and paste the ARN of your secret. Click Associate secrets.
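These console steps can also be scripted with the AWS CLI. A minimal sketch, assuming a customer-managed KMS key and placeholder ARNs:

```shell
# Store the SCRAM credentials as a secret; the name must begin with AmazonMSK_.
aws secretsmanager create-secret \
  --name AmazonMSK_msk_scram \
  --secret-string '{"username":"your_username","password":"your_password"}' \
  --kms-key-id {customer-managed KMS key ARN}

# Associate the secret with the MSK cluster.
aws kafka batch-associate-scram-secret \
  --cluster-arn {MSK cluster ARN} \
  --secret-arn-list {secret ARN}
```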
Step 3. Set up the SCRAM authentication on the client
In this step, you'll configure the Kafka client for SASL/SCRAM authentication and create a Kafka topic.
Open a terminal window and connect to your Kafka client. Check that your `client.properties` file contains the correct SASL/SCRAM and security configuration, including your SASL username and password, like the following:

```
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="your_username" \
  password="your_password";
```
Create an environment variable for your broker endpoints:
```shell
export brokers=b-3.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9096,b-1.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9096,b-2.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9096
```
Move to the directory of your Kafka installation:
```shell
cd kafka_2.12-2.8.1/bin
```
To allow the user to interact with Kafka, grant them permission with the Kafka ACL. Replace `your_username` in the following:

```shell
./kafka-acls.sh --bootstrap-server $brokers --add --allow-principal User:{your_username} --operation All --cluster --command-config client.properties
```
For more details on permissions and ACLs, refer to Use ACLs.
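To check that the ACL was added, you can list the cluster's ACLs. A quick verification, using the same `client.properties`:

```shell
# List the cluster's ACLs to confirm the user's permissions were added.
./kafka-acls.sh --bootstrap-server $brokers --list --command-config client.properties
```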
Create a topic:
```shell
./kafka-topics.sh --create --bootstrap-server $brokers --replication-factor {3} --partitions {1} --topic {users} --command-config client.properties
```
Replace:
- `{users}` with your topic name. This tutorial will use the CockroachDB `movr` workload and will run a changefeed on the `movr.users` table.
- `{3}` with the replication factor you require.
- `{1}` with the number of partitions you require.
You will receive confirmation output:
```
Created topic users.
```
Step 4. Start the changefeed
In this step, you'll prepare your CockroachDB cluster to start the changefeed.
(Optional) On the EC2 instance running CockroachDB, run the MovR application workload to set up some data for your changefeed.
Create the schema for the workload:
```shell
cockroach workload init movr
```
Then run the workload:
```shell
cockroach workload run movr --duration=1m
```
Start a SQL session. For details on the available flags, refer to the `cockroach sql` page.

```shell
cockroach sql --insecure
```
Note: To set your Enterprise license, refer to the Licensing FAQs page.
Enable the `kv.rangefeed.enabled` cluster setting:

```sql
SET CLUSTER SETTING kv.rangefeed.enabled = true;
```
To connect the changefeed to the MSK cluster, the URI in the changefeed statement must contain the following parameters:
- One of the MSK cluster endpoints prefixed with the `kafka://` scheme, for example: `kafka://b-3.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9096`.
- `tls_enabled` set to `true`.
- `sasl_enabled` set to `true`.
- `sasl_mechanism` set to `SCRAM-SHA-512`.
- `sasl_user` set to `your_username`.
- `sasl_password` set to `your_password`.
Together, the URI will look similar to the following:

```
'kafka://b-3.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9096?tls_enabled=true&sasl_enabled=true&sasl_user={your_username}&sasl_password={your_password}&sasl_mechanism=SCRAM-SHA-512'
```
You can either specify the Kafka URI in the `CREATE CHANGEFEED` statement directly, or create an external connection for the MSK URI. An external connection defines a name for the MSK URI and its query parameters:

```sql
CREATE EXTERNAL CONNECTION msk AS 'kafka://b-3.msk-cluster_name.1a2b3c.c4.kafka.us-east-1.amazonaws.com:9096?tls_enabled=true&sasl_enabled=true&sasl_user={your_username}&sasl_password={your_password}&sasl_mechanism=SCRAM-SHA-512';
```
Use the `CREATE CHANGEFEED` statement to start the changefeed using either the external connection (`external://`) or the full `kafka://` URI:

```sql
CREATE CHANGEFEED FOR TABLE movr.users INTO 'external://msk' WITH resolved;
```
```
        job_id
-----------------------
  1002677216020987905
```
To view a changefeed job, use `SHOW CHANGEFEED JOBS`.
Step 5. Consume the changefeed messages
Return to the terminal that is running the Kafka client. Move to the Kafka installation directory:
```shell
cd kafka_2.12-2.8.1/bin
```
Run the following command to start a consumer. Set `--topic` to the topic you created in Step 3:

```shell
./kafka-console-consumer.sh --bootstrap-server $brokers --consumer.config client.properties --topic users --from-beginning
```
{"after": {"address": "49665 Green Highway", "city": "amsterdam", "credit_card": "0762957951", "id": "10acc68c-5cea-4d32-95db-3254b8a1170e", "name": "Jimmy Gutierrez"}} {"after": {"address": "1843 Brandon Common Apt. 68", "city": "amsterdam", "credit_card": "3414699744", "id": "53d95b9a-abf3-4af2-adc8-92d4ee026327", "name": "James Hunt"}} {"after": {"address": "87351 David Ferry Suite 24", "city": "amsterdam", "credit_card": "7689751883", "id": "58f66df9-e2ef-48bf-bdbe-436e8caa0fae", "name": "Grant Murray"}} {"after": {"address": "35991 Tran Flats", "city": "amsterdam", "credit_card": "6759782818", "id": "6e8d430d-9a3b-4519-b7ab-987d21043f6a", "name": "Mr. Alan Powers"}} {"after": {"address": "65320 Emily Ports", "city": "amsterdam", "credit_card": "7424361516", "id": "74e8c91b-9534-4e40-9d19-f23e14d24114", "name": "Michele Grant"}} {"after": {"address": "85363 Gary Square Apt. 39", "city": "amsterdam", "credit_card": "0267354734", "id": "99d2c816-2216-40f3-b60c-f19bc3e9f455", "name": "Mrs. Wendy Miller"}} {"after": {"address": "68605 Shane Shores Suite 22", "city": "amsterdam", "credit_card": "5913104602", "id": "ae147ae1-47ae-4800-8000-000000000022", "name": "Crystal Sullivan"}} {"after": {"address": "41110 Derrick Walk Apt. 42", "city": "amsterdam", "credit_card": "2866469885", "id": "b3333333-3333-4000-8000-000000000023", "name": "Michael Lewis"}} {"after": {"address": "47781 Robinson Villages Apt. 41", "city": "amsterdam", "credit_card": "6596967781", "id": "b40e7c51-7e68-43f1-a4df-92fa8a05c961", "name": "Abigail Sellers"}} ... {"resolved":"1725982589714395510.0000000000"} ...
See also
For more resources, refer to the following:
- Changefeed Sinks page for details on parameters that sinks support.
- Monitor and Debug Changefeeds for details on monitoring the changefeed job.