Create a sink connector from Apache Kafka® to Google Cloud Storage#
The Apache Kafka Connect® Google Cloud Storage (GCS) sink connector by Aiven enables you to move data from an Aiven for Apache Kafka® cluster to a Google Cloud Storage bucket for long term storage. The full connector documentation is available in the dedicated GitHub repository.
Note
You can check the full set of available parameters and configuration options in the connector’s documentation.
Prerequisites#
To setup the Google Cloud Storage sink connector by Aiven, you need an Aiven for Apache Kafka® service with Apache Kafka Connect enabled or a dedicated Aiven for Apache Kafka Connect cluster.
Furthermore you need to follow the steps to prepare the GCP account and GCS sink and collect the following information about the target GCS bucket upfront:
GCS_NAME
: The name of the GCS bucketGCS_CREDENTIALS
: The Google service account JSON service key created during the prerequisite phase
Warning
The GCS sink connector accepts the GCS_CREDENTIALS
JSON service key as string, therefore all "
symbols within it must be escaped \"
.
The GCS_CREDENTIALS
parameter should be in the format {\"type\": \"service_account\",\"project_id\": \"XXXXXX\", ...}
Additionally, any \n
symbols contained in the private_key
field need to be escaped (by substituting with \\n
)
Setup an GCS sink connector with Aiven Console#
The following example demonstrates how to setup an Apache Kafka Connect® GCS sink connector using the Aiven Console.
Define an Apache Kafka Connect® configuration file#
Define the connector configurations in a file (we’ll refer to it with the name gcs_sink.json
) with the following content:
{
"name": "my-gcs-connector",
"connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "TOPIC_NAME",
"gcs.credentials.json": "GCS_CREDENTIALS",
"gcs.bucket.name": "GCS_NAME",
"file.name.prefix": "my-custom-prefix/",
"file.compression.type": "gzip",
"format.output.type": "jsonl",
"format.output.fields": "value,offset"
}
The configuration file contains the following entries:
name
: The connector nametopics
: The list of Apache Kafka® topics to sink to the GCS bucketkey.converter
andvalue.converter
: Data converters, depending on the topic data format. Check the GitHub repository documentation for more informationgcs.credentials.json
: The Google service account JSON service key as JSON stringgcs.bucket.name
: The name of the GCS bucketfile.name.prefix
: The file name prefixfile.compression.type
: The type of compression to use when creating the fileformat.output.type
: The format used to store the message valuesformat.output.fields
: The message fields to be included in the target file
Tip
You can define GCS sink connector naming and data formats by setting the dedicated parameters.
Check out the GitHub repository parameters documentation for the full list of configuration options.
Create a Kafka Connect connector with the Aiven Console#
To create a Kafka Connect connector, follow these steps:
Log in to the Aiven Console and select the Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service where the connector needs to be defined.
Select Connectors from the left sidebar.
Select Create New Connector, the button is enabled only for services with Kafka Connect enabled.
Select Google Cloud Storage sink.
In the Common tab, locate the Connector configuration text box and select on Edit.
Paste the connector configuration (stored in the
gcs_sink.json
file) in the form.Select Apply.
Note
The Aiven Console parses the configuration file and fills the relevant UI fields. You can review the UI fields across the various tab and change them if necessary. The changes will be reflected in JSON format in the Connector configuration text box.
After all the settings are correctly configured, select Create connector.
Verify the connector status under the Connectors screen.
Verify the presence of the data in the target GCS bucket.
Note
You can also create connectors using the Aiven CLI command.
Example: define a GCS sink connector#
The example creates an GCS sink connector with the following properties:
connector name:
my_gcs_sink
source topics:
test
target GCS bucket name:
my-test-bucket
target Google service key:
{\"type\": \"service_account\", \"project_id\": \XXXXXXXXX\", ..}
name prefix:
my-custom-prefix/
data compression:
gzip
message data format:
jsonl
fields to include in the message:
value, offset
number of messages per file: 1
The connector configuration is the following:
{
"name": "my_gcs_sink",
"connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "test",
"gcs.credentials.json": "{\"type\": \"service_account\", \"project_id\": \XXXXXXXXX\", ..}",
"gcs.bucket.name": "my-test-bucket",
"file.name.prefix": "my-custom-prefix/",
"file.compression.type": "gzip",
"file.max.records": "1",
"format.output.type": "jsonl",
"format.output.fields": "value,offset"
}