Unable to deploy a proxy to Apigee Edge with Java Callout (publishing to Kafka topic)

Hi,

I have a simple Apigee proxy with a Java Callout containing a very simple class that publishes messages to a Kafka topic (this is for a proof of concept):

*****************************************************************************************************

package com.aneesh.test;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.apigee.flow.execution.ExecutionContext;
import com.apigee.flow.execution.ExecutionResult;
import com.apigee.flow.execution.spi.Execution;
import com.apigee.flow.message.MessageContext;

public class KafkaTest implements Execution {

	public ExecutionResult execute(MessageContext messageContext, ExecutionContext executionContext) {
		Producer<String, String> producer = null;
		Properties props = null;
		String topicName = "Kafka.Test.Topic";
		try {
			// Producer configuration
			props = new Properties();
			props.put("bootstrap.servers", "<Kafka Server Name>:<Kafka Port>");
			props.put("acks", "all");
			props.put("retries", 0);
			props.put("batch.size", 16384);
			props.put("linger.ms", 1);
			props.put("buffer.memory", 33554432);
			props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			producer = new KafkaProducer<String, String>(props);

			// Publish the request payload to the topic
			String inputMessage = messageContext.getMessage().getContent();
			if (inputMessage != null && inputMessage.length() > 0) {
				producer.send(new ProducerRecord<String, String>(topicName, null, inputMessage));
			} else {
				messageContext.getMessage().setContent("Input message is empty!!");
			}
			return ExecutionResult.SUCCESS;
		} catch (Exception e) {
			messageContext.getMessage().setContent("Error in java class!!" + e.getClass().getName() + ": " + e.getMessage());
			return ExecutionResult.ABORT;
		} finally {
			if (producer != null) {
				try {
					producer.close();
				} catch (Exception e) {
					messageContext.getMessage().setContent("Error in java class!!" + e.getClass().getName() + ": " + e.getMessage());
					return ExecutionResult.ABORT;
				}
			}
		}
	}
}

***************************************************************************************************************

Java Callout policy configuration:

**************************************************************************************************************

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<JavaCallout async="false" continueOnError="false" enabled="true" name="JC_Kafka_Conn">
    <DisplayName>JC_Kafka_Conn</DisplayName>
    <Properties/>
    <ClassName>com.aneesh.test.KafkaTest</ClassName>
    <ResourceURL>java://KafkaTest.jar</ResourceURL>
</JavaCallout>

**************************************************************************************************************

I uploaded my custom jar and the dependent library "kafka-clients-1.0.0.jar", as shown in the screenshot (5943-kafka.jpg).

When I try to deploy the proxy to the Apigee Edge dev environment (cloud), it throws the following error:

Error in deployment for environment dev. The revision is deployed, but traffic cannot flow. com.apigee.kernel.exceptions.spi.UncheckedException{ code = application.bootstrap.FailedToConfigure, message = Configuration failed, associated contexts = []}

Can anybody please help me resolve this?

Thanks,

Aneesh.


The deployment error went away after I implemented the interface "Execution".

But when I invoke the proxy, I get this error:

java.security.AccessControlException: access denied ("javax.management.MBeanServerPermission" "createMBeanServer")

@AneeshAnanthakrishnan There are certain restrictions when using Java callouts, which are mentioned here

The class javax.management.MBeanServerPermission and the method createMBeanServer that you are using in your Java callout are among those that are restricted. We will soon be publishing the set of restricted classes and methods on docs.apigee.com.

If you cannot find an alternative method, then you would need to put in a support ticket to make use of hosted functions.

@Divya Achan

I am not using or calling the class javax.management.MBeanServerPermission or the method createMBeanServer in my class (please refer to the custom Java class in my original post above, which I wrote to publish messages to a Kafka topic).

Apigee is throwing the error here:

producer = new KafkaProducer<String, String>(props);

Thanks,

Aneesh.

Hmm, Yes. I agree, the createMBeanServer seems to not be relevant here.

But the idea is right. There are class restrictions for Java callouts, and apparently you are bumping into one of them. At the time KafkaProducer is instantiated, it seems that Java is attempting to load another class, a dependency, and THAT is failing. I don't know how to determine which class is causing the problem without access to the MP (Message Processor) logs.

BTW, How do you know that "Apigee is throwing here: ..." ? What leads you to that conclusion?

I set the message content via messageContext.getMessage().setContent() in the catch (Exception e) block so that the response shows the last line of code that executed successfully before the error was thrown. Something like this:

public class KafkaTest implements Execution {
	public ExecutionResult execute(MessageContext messageContext, ExecutionContext executionContext) {
		Producer<String, String> producer = null;
		Properties props = null;
		String topicName = "Kafka.Test.Topic";
		String codeLine = "0";
		try {
			props = new Properties();
			codeLine = "1";
			props.put("bootstrap.servers", "<Kafka_Server>:32925");
			codeLine = "2";
			props.put("acks", "all");
			codeLine = "3";
			props.put("retries", 0);
			codeLine = "4";
			props.put("batch.size", 16384);
			codeLine = "5";
			props.put("linger.ms", 1);
			codeLine = "6";
			props.put("buffer.memory", 33554432);
			codeLine = "7";
			props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			codeLine = "8";
			props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			codeLine = "9";
			producer = new KafkaProducer<String, String>(props);
			codeLine = "10";
			String inputMessage = messageContext.getMessage().getContent();
			codeLine = "11";
			if (inputMessage != null && inputMessage.length() > 0) {
				producer.send(new ProducerRecord<String, String>(topicName, null, inputMessage));
				codeLine = "12";
				messageContext.getMessage().setContent("Message published successfully!!");
				codeLine = "13";
				
			} else {
				messageContext.getMessage().setContent("Input message is empty!!");
				codeLine = "14";
			}
			return ExecutionResult.SUCCESS;
		} catch (Exception e) {
			messageContext.getMessage().setContent("Last successful codeLine is :[" + codeLine + "]. Error in java class!!" + e.getClass().getName() + ": " + e.getMessage());
			//return ExecutionResult.ABORT;
			return ExecutionResult.SUCCESS;
		}  finally {
			if (producer != null) {
				try {
					producer.close();
				} catch (Exception e) {
					messageContext.getMessage().setContent("Error in java class!!" + e.getClass().getName() + ": " + e.getMessage());
					//return ExecutionResult.ABORT;
					return ExecutionResult.SUCCESS;
				}
			}
		}
	}
}

When I invoke the proxy, I get the following response:

Last successful codeLine is :[9]. Error in java class!!java.security.AccessControlException: access denied ("javax.management.MBeanServerPermission" "createMBeanServer")

Thanks,
Aneesh.

oh! Whoops! I rescind my previous remark. It is the createMBeanServer() method call as Divya said.

But anyway it's a permissions error. At runtime, the Callout is trying to load, within the KafkaProducer constructor, a class which is disallowed.

My quick look at the KafkaProducer class shows that there is no way to turn off the JmxReporter. In other words, there's no way to avoid the createMBeanServer, which will cause your Java callout to fail. I suppose you could custom-build your own KafkaProducer class, if you wanted to.

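You can find the class that triggers the restricted call by changing the Java callout to print the stack trace instead of the error message:

catch (Exception e) {
	// e.getStackTrace() returns an array, so convert it explicitly; plain concatenation would print only the array reference
	messageContext.getMessage().setContent("Error in java class!!" + e.getClass().getName() + ": " + java.util.Arrays.toString(e.getStackTrace()));
}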
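
Another way to capture the trace, shown here only as a minimal sketch, is to let printStackTrace write into a string buffer, which also includes the exception message and any "Caused by" chain. The class name StackTraceText and the method render are hypothetical names chosen for illustration; only standard JDK classes are used.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper for illustration; not part of the Apigee or Kafka APIs.
class StackTraceText {

    // Renders the exception class, message, frames and any causes as one string.
    static String render(Throwable t) {
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }

    public static void main(String[] args) {
        try {
            // Simulated failure; a real callout would catch the exception thrown by its own code.
            throw new SecurityException("access denied (simulated)");
        } catch (Exception e) {
            System.out.println(render(e));
        }
    }
}
```

Inside a callout, the returned string could be passed to messageContext.getMessage().setContent(...) while debugging.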

If you look at the stack trace, you can see it's the class org.apache.kafka.common.utils.AppInfoParser and its method unregisterAppInfo() that call the class java.lang.management.ManagementFactory and its method getPlatformMBeanServer().

Last successful codeLine is :[9]. Error in java class!!java.security.AccessControlException:
java.security.AccessControlContext.checkPermission(AccessControlContext.java:472)
java.security.AccessController.checkPermission(AccessController.java:884)
java.lang.SecurityManager.checkPermission(SecurityManager.java:549)
com.apigee.securitypolicy.InternalSecurityManager.checkPermission(InternalSecurityManager.java:84)
java.lang.management.ManagementFactory.getPlatformMBeanServer(ManagementFactory.java:465)
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:71)
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1075)
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:431)
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
com.aneesh.test.KafkaTest.execute(KafkaTest.java:38)
com.apigee.steps.javacallout.JavaCalloutStepDefinition$ClassLoadWrappedExecution.execute(JavaCalloutStepDefinition.java:176)
com.apigee.steps.javacallout.JavaCalloutStepDefinition$SecurityWrappedExecution$1.run(JavaCalloutStepDefinition.java:242)
com.apigee.steps.javacallout.JavaCalloutStepDefinition$SecurityWrappedExecution$1.run(JavaCalloutStepDefinition.java:240)
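
Reading the trace by eye works, but the same lookup can be sketched in code: walk the frames from the top and return the first frame below the restricted class. The class RestrictedCallFinder and the method callerOf are hypothetical names for illustration, and the frames in main are copied from the trace above rather than produced by a live failure.

```java
// Hypothetical helper for illustration; not part of the Apigee or Kafka APIs.
class RestrictedCallFinder {

    // Returns the first frame below the frames of targetClass, i.e. the code that called into it,
    // or null if targetClass does not appear or has no caller in the trace.
    static StackTraceElement callerOf(StackTraceElement[] frames, String targetClass) {
        boolean seenTarget = false;
        for (StackTraceElement frame : frames) {
            boolean isTarget = frame.getClassName().equals(targetClass);
            if (seenTarget && !isTarget) {
                return frame;
            }
            seenTarget = seenTarget || isTarget;
        }
        return null;
    }

    public static void main(String[] args) {
        // Frames copied from the trace in this thread (most recent call first).
        StackTraceElement[] frames = {
            new StackTraceElement("java.lang.SecurityManager", "checkPermission", "SecurityManager.java", 549),
            new StackTraceElement("java.lang.management.ManagementFactory", "getPlatformMBeanServer", "ManagementFactory.java", 465),
            new StackTraceElement("org.apache.kafka.common.utils.AppInfoParser", "unregisterAppInfo", "AppInfoParser.java", 71),
            new StackTraceElement("org.apache.kafka.clients.producer.KafkaProducer", "close", "KafkaProducer.java", 1075)
        };
        System.out.println(callerOf(frames, "java.lang.management.ManagementFactory"));
    }
}
```

With a real exception, e.getStackTrace() would supply the frames array.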

If you look at this method here :

https://docs.oracle.com/javase/7/docs/api/java/lang/management/ManagementFactory.html#getPlatformMBe...

You can see that this method returns the platform MBeanServer. On the first call to this method, it first creates the platform MBeanServer by calling the MBeanServerFactory.createMBeanServer method and registers each platform MXBean in this platform MBeanServer with its ObjectName. This method, in subsequent calls, will simply return the initially created platform MBeanServer.

The MessageProcessor's Java security policy kicks in and denies access at the point where this method calls MBeanServerFactory.createMBeanServer.
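
The caching behaviour described above can be checked outside Apigee with a small sketch. It assumes a plain JVM with no restrictive security policy, so the first call is allowed to create the server; under the Message Processor's policy that first call is the one that would be denied. PlatformMBeanServerDemo is a hypothetical class name used only for illustration.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;

// Hypothetical demo class; not part of the Kafka client or the Apigee API.
class PlatformMBeanServerDemo {

    // Returns true when two consecutive lookups yield the same MBeanServer instance.
    static boolean returnsSameInstance() {
        MBeanServer first = ManagementFactory.getPlatformMBeanServer();  // creates the server if none exists yet
        MBeanServer second = ManagementFactory.getPlatformMBeanServer(); // returns the already created server
        return first == second;
    }

    public static void main(String[] args) {
        System.out.println("same instance: " + returnsSameInstance());
    }
}
```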