Mass Ingestion

Kafka source properties

When you define Kafka as the source of a streaming ingestion task, you must configure the mandatory Kafka source properties on the Source tab. Optionally, provide a comma-separated list of consumer configuration properties.
The following table describes the mandatory Kafka source properties:
Connection
  Name of the Kafka source connection.

Connection Type
  The Kafka connection type. The connection type is populated automatically based on the connection that you select.

Topic
  Kafka source topic name, or a Java-supported regular expression for the Kafka source topic name pattern, to read the events from.
  You can either enter the topic name manually or fetch the metadata of the Kafka connection. To select the metadata of the Kafka connection, perform the following actions:
    1. Click Select. The Select Source Object dialog box appears, showing all the topics or topic patterns available in the Kafka broker.
    2. Select the topic and click OK.
  When you add a new Kafka source topic to a streaming ingestion job that is in the Up and Running state, redeploy the job immediately to avoid data loss from the new topics.
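As a rough illustration of how a topic name pattern selects topics, the sketch below filters a list of topic names with a regular expression. The topic names and the pattern are hypothetical, Python's `re` module stands in for Java's regular expression engine (the simple pattern shown behaves the same in both), and whole-name matching is an assumption about how the pattern is applied.

```python
import re

# Hypothetical topic names; a real list would come from the Kafka broker.
topics = ["orders.eu", "orders.us", "payments.eu", "orders_archive"]

# Hypothetical pattern: every topic whose name starts with "orders."
# The backslash makes the dot literal, so "orders_archive" does not match.
topic_pattern = re.compile(r"orders\..*")

# Assumption: the pattern must match the whole topic name.
matching_topics = [name for name in topics if topic_pattern.fullmatch(name)]

print(matching_topics)  # ['orders.eu', 'orders.us']
```

Testing a pattern against known topic names in this way can help catch an unescaped dot or an overly broad wildcard before the pattern is entered in the Topic property.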
Consumer Configuration Properties
In the Advanced Properties section of the Source tab, in Consumer Configuration Properties, you can provide a comma-separated list of optional consumer configuration properties. Specify the values as key-value pairs.
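A minimal sketch of assembling such a list is shown below. The helper `build_consumer_properties` and the `group.id` value are hypothetical and not part of the product. Joining the pairs with a bare comma and no surrounding spaces is also an assumption.

```python
def build_consumer_properties(props: dict) -> str:
    """Join property names and values into a comma-separated key=value list."""
    parts = []
    for key, value in props.items():
        key = str(key).strip()
        if not key:
            raise ValueError("property name must not be empty")
        parts.append(f"{key}={value}")
    return ",".join(parts)


consumer_properties = build_consumer_properties({
    "group.id": "orders-ingestion-group",  # hypothetical consumer group name
    "auto.offset.reset": "earliest",
    "max.poll.records": 100000,
})

print(consumer_properties)
# group.id=orders-ingestion-group,auto.offset.reset=earliest,max.poll.records=100000
```

The resulting string is the kind of value that would be entered in the Consumer Configuration Properties field. Note that a value containing a comma would need separate care, because the comma also separates the pairs.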
The following table describes the consumer configuration properties that you can configure for Kafka sources:
group.id
  Specifies the name of the consumer group that the Kafka consumer belongs to. If group.id doesn't exist when you construct the Kafka consumer, the task creates the consumer group automatically. This property is auto-generated. You can override this property. Default is key1=value1, key2=value2.

auto.offset.reset
  Specifies the behavior of the consumer when there is no committed position or when an offset is out of range.
  You can use the following types of auto offset reset:
    • Earliest. Resets the offset position to the beginning of the topic.
    • Latest. Resets the offset position to the latest position of the topic.
    • None.
  When you read data from a Kafka topic or use a topic pattern, and the offset of the last checkpoint is deleted during message recovery, provide the following property to recover the messages from the next available offset:
    auto.offset.reset=earliest
  Otherwise, the streaming ingestion task reads data from the latest offset available.
message-demarcator
  The Kafka source receives messages in batches. You can contain all Kafka messages for a given topic and partition in a single batch. This property lets you provide a string to use as a demarcator between multiple Kafka messages. If you don't provide a value, each Kafka message is triggered as a single event.
  You can use the following delimiters as demarcators:
    • New line. Separates the new content with a new line feed. Enter the following value to use a new line as a message demarcator:
      message-demarcator=${literal('&#10;'):unescapeXml()}
    • Comma. Separates the new content with a comma. Enter the following value to use a comma as a message demarcator:
      message-demarcator=${literal(','):unescapeXml()}
    • Semicolon. Separates the new content with a semicolon. Enter the following value to use a semicolon as a message demarcator:
      message-demarcator=${literal('&#59;'):unescapeXml()}
    • Tab. Separates the new content with a tab. Enter the following value to use a tab as a message demarcator:
      message-demarcator=${literal('&#9;'):unescapeXml()}
max.poll.records
  Specifies the maximum number of records returned in a single call to poll. For example, max.poll.records=100000
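To make the effect of message-demarcator more concrete, the sketch below joins a few messages with a newline demarcator and splits them apart again. It only models the idea and is not the product's implementation: the sample messages are hypothetical, and Python's `html.unescape` stands in for the `unescapeXml()` step that turns a character reference such as `&#10;` into the actual character.

```python
import html

# Character references resolved to plain characters.
# html.unescape is a stand-in for unescapeXml() in this sketch.
newline = html.unescape("&#10;")
semicolon = html.unescape("&#59;")
tab = html.unescape("&#9;")

# Hypothetical messages read from one topic partition.
messages = ['{"id": 1}', '{"id": 2}', '{"id": 3}']

# With a demarcator, the batch is carried as one event whose
# messages are separated by the demarcator string.
batched_event = newline.join(messages)

# A downstream reader can split the event back into messages,
# provided no message contains the demarcator itself.
recovered = batched_event.split(newline)

print(recovered == messages)  # True
```

Choosing a demarcator that cannot occur inside a message body matters here: newline-delimited JSON works only when each JSON document is written on a single line.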
