Configuration Examples
First, specify the common required attributes, including the input topic from which messages will be consumed:
connector.class=com.privitar.agrotera.dataflow.kafka.PrivitarSinkConnector
tasks.max=2
topics=<my-topic>
Then, specify the destination topic to which the anonymized records will be produced. The Kafka broker can be the same one that hosts the input topic, or a different one:
dest.bootstrap.servers=kafka-broker:9092
dest.topics=<my-destination-topic>
Next, add the required parameters to connect to the Privitar Policy Manager using Basic Authentication:
privitar.authentication=basic
privitar.publisherUrl=https://privitar-policy-manager:8080
privitar.publisherUsername=<myUsername>
privitar.publisherPassword=<mySecretPassword>
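Rather than storing the password in plain text in the connector configuration, you can externalize it with Kafka Connect's standard config provider mechanism. The snippet below is a minimal sketch: the file path /etc/kafka/privitar-secrets.properties and the key publisherPassword are placeholder names, not values from the plugin documentation:

# Worker configuration: enable the built-in FileConfigProvider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Connector configuration: reference the secret instead of embedding it
privitar.publisherPassword=${file:/etc/kafka/privitar-secrets.properties:publisherPassword}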
Alternatively, you can add the required parameters to connect to the Privitar Policy Manager using Mutual TLS:
privitar.authentication=mutualTls
privitar.tlsClientCertificatePath=<myClientPath>
privitar.tlsClientCertificatePassword=<myClientPassword>
privitar.tlsTrustedCertificateAuthorityCertificatePath=<myTrustedPath>
Then, if you are using Confluent Schema Registry and Avro, set up the Kafka Connect converter for the record value and the Privitar Data Flow Job ID associated with the record schema:
value.privitar.enabled=true
value.privitar.jobId=c21a
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8083
dest.value.converter=io.confluent.connect.avro.AvroConverter
dest.value.converter.schema.registry.url=http://schema-registry:8083
Similarly, if you are using Confluent Schema Registry and Avro but plan to process multiple Avro schema types on the same topic, set up the Kafka Connect converter for the record value and an Avro schema name to Job ID mapping for each schema type. You will also need to configure a subject name strategy on the destination converter that supports different schema types for the same topic (read more about the Schema Registry's subject name strategies).
value.privitar.enabled=true
value.privitar.schemaToJobIdMapping=com.privitar.SchemaName1:3uhfkd,com.privitar.SchemaName2:4uj3ld
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8083
dest.value.converter=io.confluent.connect.avro.AvroConverter
dest.value.converter.schema.registry.url=http://schema-registry:8083
dest.value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
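For reference, TopicRecordNameStrategy derives the Schema Registry subject from both the topic and the fully qualified record name, so the destination converter above would register subjects along the lines of the following (assuming the destination topic is <my-destination-topic>):

<my-destination-topic>-com.privitar.SchemaName1
<my-destination-topic>-com.privitar.SchemaName2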
Alternatively, if you are using JSON without a schema registry, you can set up the Kafka Connect JSON converter as below:
value.privitar.enabled=true
value.privitar.jobId=c21a
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
dest.value.converter=org.apache.kafka.connect.json.JsonConverter
dest.value.converter.schemas.enable=false
If your messages have a record key, you can, for example, set up the Kafka Connect String converter for the key and associate it with a different Privitar Data Flow Job ID. Note that if you are using StringConverter or LongConverter, you will have to create a Privitar Table with a single field (e.g. field1) and set the "singleFieldName" property as below.
key.privitar.enabled=true
key.privitar.jobId=4e5c
key.schema.singleFieldName=field1
key.converter=org.apache.kafka.connect.storage.StringConverter
dest.key.converter=org.apache.kafka.connect.storage.StringConverter
In a development environment, you may want to log failed records in the Kafka Connect logs (your Connect process must be logging Privitar code at least at INFO level for failures to appear: log4j.category.com.privitar=INFO):
dest.errors.handler=debug_log
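For example, in a typical Kafka Connect distribution this logger can be enabled by adding the line below to the worker's Log4j configuration file (commonly config/connect-log4j.properties; the exact location depends on your installation):

# Log Privitar classes at INFO level so failed records appear in the Connect log
log4j.category.com.privitar=INFO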
In a production environment, you should instead send failed records to a dead letter queue. The first two settings below enable the Kafka Connect built-in DLQ, and the last two enable the handler specific to the Privitar plugin. You should additionally specify the transforms to be applied; typically they are the same as the input transforms.* settings, but with the *.target.type setting targeting the original input type (e.g. a JSON string) rather than the Java type (e.g. Date), as in the sketch that follows the example below:
errors.dlq.enable=true
errors.dlq.topic.name=my-dead-letter-queue
dest.errors.handler=dead_letter_queue
dest.errors.dlq.topic.name=my-dead-letter-queue
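As a sketch of the transforms mentioned above, suppose the input pipeline parses a string field into a java.util.Date with the standard TimestampConverter SMT; the dead letter queue handler would then need an equivalent transform converting the value back to its original string form. The dest.errors.transforms prefix, the transform aliases, and the field name below are illustrative assumptions, not settings taken from the plugin documentation:

# Input transform: parse the createdAt string into a Date before anonymization
transforms=parseDate
transforms.parseDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.parseDate.field=createdAt
transforms.parseDate.format=yyyy-MM-dd
transforms.parseDate.target.type=Date

# Hypothetical DLQ transform: convert the Date back to the original string representation
dest.errors.transforms=formatDate
dest.errors.transforms.formatDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
dest.errors.transforms.formatDate.field=createdAt
dest.errors.transforms.formatDate.format=yyyy-MM-dd
dest.errors.transforms.formatDate.target.type=string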
To unmask values, all the configuration options remain the same, but the connector class property should be set to:
connector.class=com.privitar.agrotera.dataflow.kafka.UnmaskPrivitarSinkConnector
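Putting it together, on a distributed Kafka Connect cluster these properties are typically submitted as JSON to the Connect REST API. The sketch below assembles a minimal masking connector from the Basic Authentication and JSON converter options shown above; the Connect host, topic names, credentials, and Job ID are placeholders:

curl -X POST -H "Content-Type: application/json" http://kafka-connect:8083/connectors -d '{
  "name": "privitar-sink",
  "config": {
    "connector.class": "com.privitar.agrotera.dataflow.kafka.PrivitarSinkConnector",
    "tasks.max": "2",
    "topics": "my-topic",
    "dest.bootstrap.servers": "kafka-broker:9092",
    "dest.topics": "my-destination-topic",
    "privitar.authentication": "basic",
    "privitar.publisherUrl": "https://privitar-policy-manager:8080",
    "privitar.publisherUsername": "myUsername",
    "privitar.publisherPassword": "mySecretPassword",
    "value.privitar.enabled": "true",
    "value.privitar.jobId": "c21a",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "dest.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "dest.value.converter.schemas.enable": "false"
  }
}'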