Kafka Connect Reference Guide

Configuration Examples

First, specify the common required attributes, including the input topic from which the messages will be consumed:

connector.class=com.privitar.agrotera.dataflow.kafka.PrivitarSinkConnector
tasks.max=2
topics=<my-topic> 

Then, specify the destination topic to which the anonymized records will be produced. The Kafka broker can be the same one that hosts the input topic, or a different one:

dest.bootstrap.servers=kafka-broker:9092
dest.topics=<my-destination-topic> 

Next, add the required parameters to connect to the Privitar Policy Manager using Basic Authentication:

privitar.authentication=basic
privitar.publisherUrl=https://privitar-policy-manager:8080
privitar.publisherUsername=<myUsername>
privitar.publisherPassword=<mySecretPassword>
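
If you prefer not to store the password in plain text, Kafka Connect's standard config providers can externalize it. A minimal sketch, assuming the FileConfigProvider is enabled on the worker; the file path and key name below are placeholders:

Worker configuration:

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

Connector configuration:

privitar.publisherPassword=${file:/etc/kafka/secrets/privitar.properties:publisherPassword}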

Alternatively, you can add the required parameters to connect to the Privitar Policy Manager using Mutual TLS:

privitar.authentication=mutualTls
privitar.tlsClientCertificatePath=<myClientPath>
privitar.tlsClientCertificatePassword=<myClientPassword>
privitar.tlsTrustedCertificateAuthorityCertificatePath=<myTrustedPath>

Then, if you are using Confluent Schema Registry and Avro, you will have to set up the Kafka Connect converter for the record value and the Privitar Data Flow Job ID associated with the record schema:

value.privitar.enabled=true
value.privitar.jobId=c21a
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8083
dest.value.converter=io.confluent.connect.avro.AvroConverter
dest.value.converter.schema.registry.url=http://schema-registry:8083

Similarly, if you are using Confluent Schema Registry and Avro but plan to process multiple different Avro schema types on the given topic, you will have to set up the Kafka Connect converter for the record value and the Avro schema name to Job ID mapping for each schema type. You will also need to configure a subject name strategy on the destination converter that supports different schema types on the same topic (read more about subject name strategies in the Schema Registry documentation); the resulting subject names are illustrated after the example below.

value.privitar.enabled=true
value.privitar.schemaToJobIdMapping=com.privitar.SchemaName1:3uhfkd,com.privitar.SchemaName2:4uj3ld
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8083
dest.value.converter=io.confluent.connect.avro.AvroConverter
dest.value.converter.schema.registry.url=http://schema-registry:8083
dest.value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
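
For example, with TopicRecordNameStrategy the subject name combines the destination topic with the fully qualified Avro record name, so the two schemas above are registered under separate subjects and can evolve independently while sharing one topic:

<my-destination-topic>-com.privitar.SchemaName1
<my-destination-topic>-com.privitar.SchemaName2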

Alternatively, if you are using JSON without a schema registry, you can set up a Kafka Connect converter with JSON as below:

value.privitar.enabled=true 
value.privitar.jobId=c21a
value.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter.schemas.enable=false 
dest.value.converter=org.apache.kafka.connect.json.JsonConverter 
dest.value.converter.schemas.enable=false
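
With schemas.enable=false, the JsonConverter expects plain JSON values with no embedded schema/payload envelope. A hypothetical record value (the field names are placeholders for whatever your Privitar Table defines) would simply look like:

{"firstName": "Jane", "email": "jane@example.com"}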

If your messages have a record key, you can, for example, set up a Kafka Connect converter for String and associate it with a different Privitar Data Flow Job ID. Note that if you are using StringConverter or LongConverter, you will have to create a Privitar Table with a single field (e.g. field1) and set the "singleFieldName" property as below.

key.privitar.enabled=true 
key.privitar.jobId=4e5c
key.schema.singleFieldName=field1
key.converter=org.apache.kafka.connect.storage.StringConverter 
dest.key.converter=org.apache.kafka.connect.storage.StringConverter
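
If the record key is numeric rather than a string, the same pattern applies with the LongConverter; a sketch in which only the converter classes change and the jobId and singleFieldName settings stay as above:

key.converter=org.apache.kafka.connect.converters.LongConverter
dest.key.converter=org.apache.kafka.connect.converters.LongConverter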

In a development environment, you might want to log failure records in the Kafka Connect logs (your Connect process will need to log Privitar code at least at INFO level to see the failures: log4j.category.com.privitar=INFO):

dest.errors.handler=debug_log
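
That logging level is typically set in the Connect worker's Log4j configuration; a minimal sketch, assuming a stock Log4j 1.x properties file (the file name and location may differ in your distribution):

# connect-log4j.properties
log4j.category.com.privitar=INFO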

In a production environment, you should prefer to send failure records to a dead letter queue. The first two settings below enable the Kafka Connect built-in DLQ and the last two enable the handler specific to the Privitar plugin. You should additionally specify the transforms to be applied; typically they are the same as the input transforms.* settings, but with the *.target.type setting targeting the original input type (e.g. a JSON string) rather than the Java type (e.g. Date), as sketched after this example:

errors.dlq.enable=true 
errors.dlq.topic.name=my-dead-letter-queue 
dest.errors.handler=dead_letter_queue 
dest.errors.dlq.topic.name=my-dead-letter-queue 
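
As an illustration only: the dest.errors.transforms.* prefix, the ts alias, and the created_at field below are assumptions rather than confirmed property names, so check the plugin reference for the exact spelling. The idea is that the input transform parses a date string into a Java Date for processing, while the error-path transform converts the field back to its original string form before the failed record is written to the DLQ:

# Input transform: parse a (hypothetical) created_at string into a java.util.Date
transforms=ts
transforms.ts.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ts.field=created_at
transforms.ts.format=yyyy-MM-dd
transforms.ts.target.type=Date

# Error-path transform: convert the field back to its original string form
dest.errors.transforms=ts
dest.errors.transforms.ts.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
dest.errors.transforms.ts.field=created_at
dest.errors.transforms.ts.format=yyyy-MM-dd
dest.errors.transforms.ts.target.type=string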

To unmask values, all the configuration options remain the same, but the connector class property should be set to:

connector.class=com.privitar.agrotera.dataflow.kafka.UnmaskPrivitarSinkConnector
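
For reference, these properties are usually deployed by posting them as the config object of a JSON payload to the Kafka Connect REST API. A minimal sketch in which the worker URL and connector name are placeholders and the config is abbreviated to the first example above:

curl -X POST http://connect-worker:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "privitar-sink",
        "config": {
          "connector.class": "com.privitar.agrotera.dataflow.kafka.PrivitarSinkConnector",
          "tasks.max": "2",
          "topics": "<my-topic>",
          "dest.bootstrap.servers": "kafka-broker:9092",
          "dest.topics": "<my-destination-topic>"
        }
      }'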