Kafka
Topic
Maxwell writes to a Kafka topic named "maxwell" by default. It is configurable via --kafka_topic. The given topic can be a plain string or a dynamic string, e.g. namespace_%{database}_%{table}, in which case the topic name will be generated from data in the row.
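For example, in config.properties (the namespace_ prefix here is only an illustration):
# static topic name
kafka_topic=maxwell
# or a dynamic topic, generated per-row
kafka_topic=namespace_%{database}_%{table}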
Client version
By default, Maxwell uses the Kafka 1.0.0 client library. The --kafka_version flag lets you choose an alternate library version: 0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1, 0.11.0.1, or 1.0.0. This flag is only available on the command line.
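For example, to select the 0.11.0.1 client (the connection flags are shown only for context; adjust them to your environment):
bin/maxwell --user=maxwell --password=XXXXXX --host=localhost \
  --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_version=0.11.0.1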
- The 0.8.2.2 client is only compatible with brokers running kafka 0.8.
- The 0.10.0.x client is only compatible with brokers 0.10.0.x or later.
- Mixing the 0.10 client with other versions can lead to serious performance impacts. For more details, read about it here.
- The 0.11.0 client can talk to version 0.10.0 or newer brokers.
- The 0.9.0.1 client is not compatible with brokers running kafka 0.8. The exception below will show in logs when that is the case:
ERROR Sender - Uncaught error in kafka producer I/O thread:
SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
Passing options to kafka
Any options present in config.properties that are prefixed with kafka. will be passed into the Kafka producer library (with the kafka. prefix stripped off; see below for examples). We use the "new producer" configuration, as described here:
http://kafka.apache.org/documentation.html#newproducerconfigs
Example kafka configs
Highest throughput
These properties give the highest throughput performance:
kafka.acks = 1
kafka.compression.type = snappy
kafka.retries = 0
Most reliable
For at-least-once delivery, you will want something more like:
kafka.acks = all
kafka.retries = 5 # or some larger number
You will also want to set min.insync.replicas on Maxwell's output topic.
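For example, with a ZooKeeper-based broker setup (as in the topic-creation example further down), the topic setting could be applied like this:
bin/kafka-configs.sh --zookeeper ZK_HOST:2181 --alter --entity-type topics \
  --entity-name maxwell --add-config min.insync.replicas=2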
Key format
Maxwell generates keys for its Kafka messages based upon a MySQL row's primary key, in JSON format:
{ "database":"test_tb","table":"test_tbl","pk.id":4,"pk.part2":"hello"}
This key is designed to co-operate with Kafka's log compaction, which will save the last-known value for a key, allowing Maxwell's Kafka stream to retain the last-known value for a row and act as a source of truth.
Partitioning
Both Kafka and AWS Kinesis support the notion of partitioned streams. Because they like to make our lives hard, Kafka calls its two units "topics" and "partitions", while Kinesis calls them "streams" and "shards". They're the same thing, though. Maxwell is generally configured to write to N partitions/shards on one topic/stream, and how it distributes to those N partitions/shards can be controlled by producer_partition_by.
producer_partition_by gives you a choice of splitting your stream by database, table, primary key, transaction id, column data, or "random". How you choose to partition your stream greatly influences the load and serialization properties of your downstream consumers, so choose carefully. A good rule of thumb is to use the finest-grained partition scheme possible given serialization constraints.
A brief partitioning digression:
If I were building, say, a simple search index of a single table, I might choose to partition by primary key; this gives the best distribution of workload amongst your stream processors while maintaining a strict ordering of updates to any given row.
If I were building something that needed better serialization properties -- let's say I needed to maintain strict ordering between updates that occurred on different tables -- I would drop back to partitioning by table or database. This reduces throughput by a lot, as a single stream consumer node will end up with all the load for a particular table/database, but I'm guaranteed that the updates stay in order.
If you choose to partition by column data (that is, values inside columns in your updates), you must set both:
- producer_partition_columns - a comma-separated list of column names, and
- producer_partition_by_fallback - one of [database, table, primary_key]; this will be used as the partition key when the column does not exist.
When partitioning by column, Maxwell will treat the values of the specified columns as strings, concatenate them, and use that value to partition the data.
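For example, a column-partitioned setup in config.properties might look like this (the column name is illustrative):
producer=kafka
producer_partition_by=column
producer_partition_columns=user_id
producer_partition_by_fallback=database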
Kafka partitioning
A binlog event's partition is determined by the selected hash function and hash string as follows:
HASH_FUNCTION(producer_partition_value) % TOPIC.NUMBER_OF_PARTITIONS
The HASH_FUNCTION is either Java's hashCode or murmurhash3. The default HASH_FUNCTION is hashCode. Murmurhash3 may be selected with the kafka_partition_hash option. The seed value for the murmurhash function is hardcoded to 25342 in the MaxwellKafkaPartitioner class. We tell you this in case you need to reverse engineer where a row will land.
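As a rough illustration (a sketch, not the MaxwellKafkaPartitioner source), the default hashCode-based calculation amounts to:
// Sketch of the default partition calculation, using Java's String.hashCode();
// murmurhash3 with seed 25342 takes its place when kafka_partition_hash selects murmur3.
public class PartitionSketch {
    static int partitionFor(String partitionValue, int numPartitions) {
        // hashCode() may be negative, so take the absolute value of the result
        return Math.abs(partitionValue.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        // e.g. partitioning by database on a 20-partition topic
        System.out.println(partitionFor("test_db", 20));
    }
}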
Maxwell will discover the number of partitions in its kafka topic upon boot. This means that you should pre-create your kafka topics:
bin/kafka-topics.sh --zookeeper ZK_HOST:2181 --create \
--topic maxwell --partitions 20 --replication-factor 2
http://kafka.apache.org/documentation.html#quickstart
Kinesis
AWS Credentials
You will need to obtain an IAM user that has the following permissions for the stream you are planning on producing to:
- "kinesis:PutRecord"
- "kinesis:PutRecords"
- "kinesis:DescribeStream"
Additionally, the producer will need to be able to publish CloudWatch metrics, which requires the following permission applied to the resource `*`:
- "cloudwatch:PutMetricData"
The resulting IAM policy document may look like this:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:DescribeStream"
      ],
      "Resource": "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}
See the AWS docs for the latest examples of which permissions are needed.
The producer uses the DefaultAWSCredentialsProviderChain class to gain AWS credentials. See the AWS docs on how to set up the IAM user with the Default Credential Provider Chain.
Options
Set the output stream in config.properties by setting the kinesis_stream property.
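For example, a minimal Kinesis setup in config.properties might look like this (the stream name is illustrative):
producer=kinesis
kinesis_stream=maxwell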
The producer uses the KPL (Kinesis Producer Library) and its built-in configurations. Copy kinesis-producer-library.properties.example to kinesis-producer-library.properties and configure the properties file to your needs.
You are required to configure the region. For example:
# set explicitly
Region=us-west-2
# or set with an environment variable
Region=$AWS_DEFAULT_REGION
By default, the KPL implements record aggregation, which usually increases producer throughput by allowing you to increase the number of records sent per API call. However, aggregated records are encoded differently (using Google Protocol Buffers) than records that are not aggregated. Therefore, if you are not using the KCL (Kinesis Client Library) to consume records (for example, you are using AWS Lambda), you will need to either disaggregate the records in your consumer (for example, by using the AWS Kinesis Aggregation library) or disable record aggregation in your kinesis-producer-library.properties configuration.
To disable aggregation, add the following to your configuration:
AggregationEnabled=false
Remember: if you disable record aggregation, you will lose the benefit of potentially greater producer throughput.
SQS
AWS Credentials
You will need to obtain an IAM user that has the permission to access the SQS service. The SQS producer also uses DefaultAWSCredentialsProviderChain to get AWS credentials.
See the AWS docs on how to set up the IAM user with the Default Credential Provider Chain.
If you need to use a region or credentials other than the defaults, see the AWS docs.
Options
Set the output queue in config.properties by setting the following properties (an example follows the list):
- sqs_signing_region: the region to use for SigV4 signing of requests, e.g. us-east-1
- sqs_service_endpoint: the service endpoint, either with or without the protocol (e.g. https://sqs.us-west-1.amazonaws.com or sqs.us-west-1.amazonaws.com)
- sqs_queue_uri: the full SQS queue URI from the AWS console, e.g. https://sqs.us-east-1.amazonaws.com/xxxxxxxxxxxx/maxwell
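Putting these together, an illustrative config.properties snippet:
producer=sqs
sqs_signing_region=us-east-1
sqs_service_endpoint=https://sqs.us-east-1.amazonaws.com
sqs_queue_uri=https://sqs.us-east-1.amazonaws.com/xxxxxxxxxxxx/maxwell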
The producer uses the AWS SQS SDK.
SNS
AWS Credentials
You will need to obtain an IAM user that has the permission to access the SNS topic. The SNS producer also uses DefaultAWSCredentialsProviderChain to get AWS credentials.
See the AWS docs on how to set up the IAM user with the Default Credential Provider Chain.
If you need to use a region or credentials other than the defaults, see the AWS docs.
Options
Set the topic ARN in config.properties by setting the sns_topic property to the topic name. FIFO topics should have a .fifo suffix.
Optionally, you can enable sns_attrs to have Maxwell attach various attributes to the message for subscription filtering. (Only database and table are currently supported.)
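For example (the topic value is illustrative):
producer=sns
sns_topic=maxwell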
The producer uses the AWS SNS SDK.
Nats
The configurable properties for Nats are:
- nats_url - defaults to nats://localhost:4222
- nats_subject - defaults to %{database}.%{table}
nats_subject defines the Nats subject hierarchy to write to. Topic substitution is available. All non-alphanumeric characters in the substituted values will be replaced by underscores.
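For example (the subject pattern is illustrative):
producer=nats
nats_url=nats://localhost:4222
nats_subject=maxwell.%{database}.%{table}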
Google Cloud Pub/Sub
In order to publish to Google Cloud Pub/Sub, you will need to obtain an IAM service account that has been granted the roles/pubsub.publisher role.
See the Google Cloud Platform docs for the latest examples of which permissions are needed, as well as how to properly configure service accounts.
Set the output stream in config.properties by setting the pubsub_project_id and pubsub_topic properties. Optionally, configure a dedicated output topic for DDL updates by setting the ddl_pubsub_topic property.
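For example (project and topic names are illustrative):
producer=pubsub
pubsub_project_id=my-gcp-project
pubsub_topic=maxwell
ddl_pubsub_topic=maxwell_ddl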
The producer uses the Google Cloud Java Library for Pub/Sub and uses its built-in configurations.
Google Cloud BigQuery
To stream data into Google Cloud BigQuery, a destination table must first be created in BigQuery, identified as bigquery_project_id.bigquery_dataset.bigquery_table. The schema of the table must match the outputConfig. The column types should be defined as below (a sample table-creation command follows the list):
- database: string
- table: string
- type: string
- ts: integer
- xid: integer
- xoffset: integer
- commit: boolean
- position: string
- gtid: string
- server_id: integer
- primary_key: string
- data: string
- old: string
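One way to create a matching table is with the bq command-line tool; the project, dataset, and table names below are illustrative:
bq mk --table my-gcp-project:maxwell.maxwell_events \
  database:STRING,table:STRING,type:STRING,ts:INTEGER,xid:INTEGER,xoffset:INTEGER,commit:BOOLEAN,position:STRING,gtid:STRING,server_id:INTEGER,primary_key:STRING,data:STRING,old:STRING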
See the Google Cloud Platform docs for the latest examples of which permissions are needed, as well as how to properly configure service accounts.
Set the output stream in config.properties by setting the bigquery_project_id, bigquery_dataset and bigquery_table properties.
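For example (project, dataset, and table names are illustrative):
producer=bigquery
bigquery_project_id=my-gcp-project
bigquery_dataset=maxwell
bigquery_table=maxwell_events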
The producer uses the Google Cloud Java BigQuery Storage library; see the BigQuery Storage Write API documentation.
To use the Storage Write API, you must have bigquery.tables.updateData permissions.
This producer uses the Default Stream with at-least-once semantics, for greater data resiliency and fewer scaling restrictions.
RabbitMQ
To produce messages to RabbitMQ, you will need to specify a host in config.properties with rabbitmq_host. This is the only required property; everything else falls back to a sane default.
The remaining configurable properties are:
- rabbitmq_user - defaults to guest
- rabbitmq_pass - defaults to guest
- rabbitmq_virtual_host - defaults to /
- rabbitmq_handshake_timeout - defaults to 10000
- rabbitmq_exchange - defaults to maxwell
- rabbitmq_exchange_type - defaults to fanout
- rabbitmq_exchange_durable - defaults to false
- rabbitmq_exchange_autodelete - defaults to false
- rabbitmq_routing_key_template - defaults to %db%.%table%. This config controls the routing key, where %db% and %table% are placeholders that will be substituted at runtime.
- rabbitmq_message_persistent - defaults to false
- rabbitmq_declare_exchange - defaults to true
For more details on these options, you are encouraged to read the official RabbitMQ documentation here: https://www.rabbitmq.com/documentation.html
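An illustrative config.properties snippet (the host is a placeholder):
producer=rabbitmq
rabbitmq_host=rabbitmq.example.com
rabbitmq_exchange=maxwell
rabbitmq_routing_key_template=%db%.%table%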
Redis
Choose the type of Redis data structure to write to by setting redis_type to one of: pubsub, xadd, lpush or rpush. The default is pubsub.
redis_key defaults to "maxwell" and supports topic substitution.
Other configurable properties (an example configuration follows the list):
- redis_host - defaults to localhost
- redis_port - defaults to 6379
- redis_auth - defaults to null
- redis_database - defaults to 0
- redis_type - defaults to pubsub
- redis_key - defaults to maxwell
- redis_stream_json_key - defaults to message
- redis_sentinels - doesn't have a default value
- redis_sentinel_master_name - doesn't have a default value
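Putting a few of these together, an illustrative config.properties snippet:
producer=redis
redis_host=localhost
redis_type=xadd
redis_key=maxwell.%{database}.%{table}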
Custom Producer
If none of the producers packaged with Maxwell meet your requirements, a custom producer can be added at runtime.
In order to register your custom producer, you must implement the ProducerFactory interface, which is responsible for creating your custom AbstractProducer. Next, set the custom_producer.factory configuration property to your ProducerFactory's fully qualified class name. Then add the custom ProducerFactory JAR and all its dependencies to the $MAXWELL_HOME/lib directory.
Your custom producer will likely require configuration properties as well. For that, use the custom_producer.* (or CUSTOM_PRODUCER_* if using env-variable configuration) property namespace. Those properties will be available to your producer via MaxwellConfig.customProducerProperties.
Custom producer factory and producer examples can be found here: https://github.com/zendesk/maxwell/tree/master/src/example/com/zendesk/maxwell/example/producerfactory
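For illustration only (the class and package names here are hypothetical, and method signatures may vary between Maxwell versions), a bare-bones factory and producer might look roughly like this:
package com.example.maxwell;

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.ProducerFactory;
import com.zendesk.maxwell.row.RowMap;

// Hypothetical factory: Maxwell instantiates the class named by custom_producer.factory
// and asks it to create the producer.
public class ExampleProducerFactory implements ProducerFactory {
    @Override
    public AbstractProducer createProducer(MaxwellContext context) {
        return new ExampleProducer(context);
    }
}

// Hypothetical producer that writes each row's JSON to stdout.
class ExampleProducer extends AbstractProducer {
    public ExampleProducer(MaxwellContext context) {
        super(context);
    }

    @Override
    public void push(RowMap r) throws Exception {
        System.out.println(r.toJSON(outputConfig));
        // let Maxwell advance its stored binlog position past this row
        this.context.setPosition(r.getNextPosition());
    }
}
It would then be registered in config.properties with:
custom_producer.factory=com.example.maxwell.ExampleProducerFactory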
Topic substitution
Some producers may be given a template string from which they dynamically generate a topic (or whatever their equivalent of a kafka topic is).
Substitutions are enclosed in %{}. The following substitutions are available:
- %{database}
- %{table}
- %{type} (insert/update/delete)
Topic substitution is available in the following producers:
- Kafka, for topics
- Redis, for channels
- Nats, for subject hierarchies