Kafka integration?

Not applicable

What would it take to make Apigee logging behave as a Kafka message producer? Has anyone attempted this?

Solved
1 ACCEPTED SOLUTION

Not applicable
So this is an option, but unless you are a Logstash shop it's a HUGE workaround (in which case you should file a feature request!).

In Logstash 1.5+ there is an output plugin called "kafka".

For my specific requirements I need to put certain fields on the bus, so I started by having Apigee output all of my data in key:value format with spaces between the pairs. I did this with a MessageLogging policy configured like so:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<MessageLogging async="false" continueOnError="false" enabled="true" name="kafka-output">
    <DisplayName>kafka-output</DisplayName>
    <File>
        <Message>val1:{key1} val2:{key2} val3:{key3}</Message>
        <FileName>kafka.log</FileName>
        <FileRotationOptions rotateFileOnStartup="false">
            <FileRotationType>SIZE</FileRotationType>
            <MaxFileSizeInMB>100</MaxFileSizeInMB>
            <MaxFilesToRetain>10</MaxFilesToRetain>
        </FileRotationOptions>
    </File>
</MessageLogging>
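Each request then appends one line to kafka.log. Assuming key1, key2, and key3 resolve to the (made-up) values foo, bar, and baz, the line looks like:

val1:foo val2:bar val3:baz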

I'm outputting my data to a file and telling Logstash to read it with a file input:

input {
  file {
    path => [ "/opt/apigee4/var/log/apigee/message-processor/messagelogging/*/*/*/*/kafka-output/*.log" ]
    add_field => { "component" => "KAFKA" }
  }
}

Note that I'm also adding a field called "component". You don't need to do this, but I'm up to ... 15 different kinds of logs right now which I have to manage with conditionals (a sketch follows below). I'll let you think about that, but I leave it out of the rest of the example.
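For anyone juggling multiple log types the same way, here is a minimal sketch of what one of those conditionals looks like - it just gates the kv parsing (shown in full later) on the "component" field:

filter {
  # only parse lines that came from the Apigee kafka-output logs
  if [component] == "KAFKA" {
    kv { value_split => ":" }
  }
}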

You can send data to a Kafka bus with something like this in the Logstash output:

output {
	kafka {
		topic_id => "<your-topic-id>"
		broker_list => "<your-server-address-and-port>"
		request_required_acks => 1
	}
}

You are going to want to massage your data a bit before you ship it, though, because Logstash adds a few things that you don't want:

filter {
    kv {
      value_split => ":"
    }
    mutate {
      remove_field => [ "message", "@version", "@timestamp", "path", "tags" ]
      rename => [ "host", "machine" ]
    }
}

To see what the events look like, you can temporarily set your output to this:

output {
    stdout { codec => json }
}

Make sure you test your config with this command:

/opt/logstash/bin/logstash -f <path-to-test-config> --configtest

If you want to watch the output live, run Logstash in the foreground:

/opt/logstash/bin/logstash -f <path-to-test-config>

This got me a nice JSON message to put on my Kafka queue. I am going to send some test data in a couple of minutes.
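For reference, after the kv and mutate filters the event that lands on the queue looks roughly like this (the values are invented for illustration; "machine" is the renamed "host" field):

{"val1":"foo","val2":"bar","val3":"baz","component":"KAFKA","machine":"mp-node-1"}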

Quick note: I'll provide example files for everything if you need a quick start - I just don't have them prepared and scrubbed right now.


3 REPLIES

Not applicable

Believe it or not, I am right in the middle of working on this very problem! We have chosen (for now) to log data in an arbitrary format and use Logstash (latest version) to push the transformed data to Kafka. In the future we may write directly to it, but this method has the advantage of letting us backlog data locally if something goes wrong with the message queue. I am hoping to test everything next week, but right now I am working on how to get some extra data out of Apigee - things that our Kafka workflow requires, like "hostname" and "total transaction time".
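A sketch of the <Message> element I am experimenting with for that - treat the variable names as a starting point rather than gospel, since availability depends on your deployment and on where in the flow the policy runs:

<Message>host:{system.pod.name} start:{client.received.start.timestamp} end:{client.sent.end.timestamp}</Message>

Total transaction time would then be computed downstream from the difference of the two timestamps.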


val1:{key1} val2:{key2} val3:{key3} - yeah, sorry, I've been doing this a lot lately.