Kafka Connect Reference Guide

Configuration Options

The connector can be configured with the default SinkConnector settings and the following additional settings.

Note

In Privacy Platform v3.1.1 and earlier, the properties that now start with privitar.*, value.privitar.* and key.privitar.* started with anonymiser.*, value.anonymiser.* and key.anonymiser.*. From 3.2.0, values can be unmasked in a Data Flow Job by using the separate UnmaskPrivitarSinkConnector connector class.

The following sections describe the configuration options that are available.

The platform

The following configuration options are available for the platform.

key.privitar.enabled

Set to true to apply the Data Flow Job to the record keys.

key.privitar.jobId

The unique ID of the Data Flow Job to apply to the record keys.

key.privitar.schemaToJobIdMapping

A mapping of fully qualified Avro schema names to Data Flow Job IDs provided by the Privitar Policy Manager. The format is:

<fully-qualified-schema-name>:<job-id>

in a comma-separated list. For example:

com.privitar.SchemaName1:3uhfkd,com.privitar.SchemaName2:4uj3ld

value.privitar.enabled

Set to True if you want to enable the Data Flow Job on the record values.

value.privitar.jobId

The unique ID of the Data Flow Job to apply to the record values.

value.privitar.schemaToJobIdMapping

A mapping of fully qualified Avro schema names to Data Flow Job IDs provided by the Privitar Policy Manager. The format is:

<fully-qualified-schema-name>:<job-id>

in a comma-separated list. For example:

com.privitar.SchemaName1:3uhfkd,com.privitar.SchemaName2:4uj3ld

privitar.publisherUrl

The platform host and port. For example:

http://localhost:8080

privitar.publisherUsername

privitar.publisherPassword

The username and password of the API user.

The API user must have a role with the Run Data Flow permission for Masking jobs or Unmasking jobs in the team that the job is defined in.

For more information about configuring users, see Configuring users.

privitar.authentication

The method used to authenticate with the Privitar Policy Manager. Possible values are mutualTls and basic.

The default setting is basic authentication.

privitar.tlsClientCertificatePath

The location of the certificate file used for authenticating with the Privitar Policy Manager.

privitar.tlsClientCertificatePassword

The password for the TLS client certificate file.

privitar.tlsTrustedCertificateAuthorityCertificatePath

The location of the TLS CA certificate file used for authenticating with the Privitar Policy Manager.

privitar.tlsHostnameVerification

Set to true (the default) to enable hostname verification for outgoing connections to the Privitar Policy Manager.

Leave this property enabled in most cases. Disabling hostname verification degrades the overall security of TLS because there is no guarantee of the server's identity.
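
For illustration, a minimal connector configuration excerpt that applies a single Data Flow Job to record values over basic authentication; the job ID and credentials are placeholders:

value.privitar.enabled=true
value.privitar.jobId=3uhfkd
privitar.publisherUrl=http://localhost:8080
privitar.authentication=basic
privitar.publisherUsername=api-user
privitar.publisherPassword=changeme

With mutual TLS instead, the authentication-related lines would become the following; the file paths and password are again placeholders:

privitar.authentication=mutualTls
privitar.tlsClientCertificatePath=/etc/privitar/client-cert.p12
privitar.tlsClientCertificatePassword=changeme
privitar.tlsTrustedCertificateAuthorityCertificatePath=/etc/privitar/ca-cert.pem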

Connector converters

The following configuration options are available for the Privitar Connector converters.

dest.key.converter

The converter class used to serialise the record key before it is sent out.

dest.value.converter

The converter class used to serialise the record value before it is sent out.
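
For example, to serialise the anonymised records as JSON, both destination converters could point at Kafka's built-in JSON converter:

dest.key.converter=org.apache.kafka.connect.json.JsonConverter
dest.value.converter=org.apache.kafka.connect.json.JsonConverter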

Processing Guarantee

The following configuration options control the connector's processing guarantee.

processing.guarantee

The processing guarantee that should be used. Possible values are exactly_once (default) and at_least_once. Note that exactly-once processing requires, by default, a cluster of at least three brokers, which is the recommended setting for production; for development you can relax this by adjusting the broker setting transaction.state.log.replication.factor.

transactional.id.prefix

The prefix the connector uses to generate the transactional.id when processing.guarantee=exactly_once. See the Kafka documentation for details on how to choose a transactional.id.
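
A sketch of an exactly-once configuration, with a placeholder prefix:

processing.guarantee=exactly_once
transactional.id.prefix=privitar-connector-

For a single-broker development cluster only, the broker can be configured with transaction.state.log.replication.factor=1 and transaction.state.log.min.isr=1 so that the transaction state log fits on one broker.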

Error Handling

The following configuration options are available for error handling.

dest.errors.handler

The failure handler to use. Possible values are:

  • none (default)

  • dead_letter_queue

  • debug_log

Note that debug_log should only be chosen with non-confidential test data, since it exposes the data that is sent. This handler covers failures in the Processor, Destination Transformations, Destination Converter and Producer stages. To cover failures in the earlier stages (Consumer, Converter and Transformations), you must use the Kafka Connect built-in error-handling properties (errors.*).

dest.errors.dlq.topic.name

The topic name used when the error handler is dead_letter_queue. Failed records are sent to this topic. This applies to the following connector stages: Processor, Destination Transformations, Destination Converter and Producer. Errors in stages before these are not forwarded to this DLQ; to cover those stages, use the Kafka Connect built-in errors.deadletterqueue.topic.name property.

dest.errors.transforms

Aliases for the transformations applied to records sent to the DLQ. Similar to the 'transforms' setting provided by Kafka Connect.

dest.errors.transforms.*

The configuration of the transformation applied before sending the failed input record to the DLQ. The configuration is similar to the 'transforms' setting from Kafka Connect.
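
For illustration, routing failures from the connector stages listed above to a dead letter queue; the topic name is a placeholder:

dest.errors.handler=dead_letter_queue
dest.errors.dlq.topic.name=privitar-failed-records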

Advanced Connector Settings

The following advanced connector settings are available for the platform.

value.schema.singleFieldName

The field name in the platform schema to use when the record value is a simple String or Long (and not an object with multiple fields).

key.schema.singleFieldName

The field name in the platform schema to use when the record key is a simple String or Long (and not an object with multiple fields).

dest.key.schema.name

The fully qualified name of the schema used for record keys (e.g. with Avro, the namespace and name of a record, such as `com.record.namespace.RecordName`). Default is the same schema name as the input record. This property is only valid in conjunction with the key.privitar.jobId property.

dest.value.schema.name

The fully qualified name of the schema used for record values (e.g. with Avro, the namespace and name of a record, such as `com.record.namespace.RecordName`). Default is the same schema name as the input record. This property is only valid in conjunction with the value.privitar.jobId property.

dest.key.schema.name.mapping

The mapping of the fully qualified names of the input schemas to the desired fully qualified names of the output schemas used for record keys. Default is the same schema name as the input record. This property is only valid in conjunction with the key.privitar.schemaToJobIdMapping property. The format is:

<fully-qualified-input-schema-name>:<fully-qualified-output-schema-name>

in a comma-separated list. For example:

com.privitar.SchemaName1:com.privitar.deidentified.SchemaName1,com.privitar.SchemaName2:com.privitar.deidentified.SchemaName2

dest.value.schema.name.mapping

The mapping of the fully qualified names of the input schemas to the desired fully qualified names of the output schemas used for record values. Default is the same schema name as the input record. This property is only valid in conjunction with the value.privitar.schemaToJobIdMapping property. The format is:

<fully-qualified-input-schema-name>:<fully-qualified-output-schema-name>

in a comma-separated list. For example:

com.privitar.SchemaName1:com.privitar.deidentified.SchemaName1,com.privitar.SchemaName2:com.privitar.deidentified.SchemaName2

dest.transforms

Aliases for the transformations applied to records. Similar to the 'transforms' setting provided by Kafka Connect.

dest.transforms.*

The configuration of the transformations applied before the anonymised record is sent. The configuration is similar to the 'transforms' setting from Kafka Connect.
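
For illustration, masking record values per schema while renaming the output schemas; the schema names and job ID are placeholders:

value.privitar.enabled=true
value.privitar.schemaToJobIdMapping=com.privitar.SchemaName1:3uhfkd
dest.value.schema.name.mapping=com.privitar.SchemaName1:com.privitar.deidentified.SchemaName1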

Advanced Privacy Platform Settings

The following advanced settings are available for the platform.

privitar.maxCacheWeightBytes

The maximum size (in bytes) that can be used by cached tokens.

privitar.maxBatchSize

Incoming records will be processed in batches no larger than this size.

privitar.numConcurrentBatches

The maximum number of batches that can be processed in parallel.

privitar.tokenVault.kerberosKeytabPath

The location of the Kerberos keytab used for connecting to an HBase token vault.
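
For illustration, a throughput-tuning sketch; the values shown are placeholders rather than recommendations:

privitar.maxCacheWeightBytes=104857600
privitar.maxBatchSize=500
privitar.numConcurrentBatches=4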