Multiple Message Processors and Twitter Streaming API

kkleva

We are creating a facade to work with the Twitter Streaming API. Our requirement is an API that can start/stop the stream and, while it's running, emit requests to store each tweet to our syslog server. The issue we are running into is that the Twitter Streaming API only allows a single connection to the service. However, since our Node.js proxies are spread across multiple message processors (MPs) / routers, there is no shared context about which one is currently holding the streamed connection. So when it comes to disconnecting or stopping the stream, we don't really know which MP is currently connected.

So for example, if we create a stream, only a single Node.js server will start the connection and begin feeding the tweets into syslog. Rather than having the client hold the connection open, we let the Node.js server on Apigee keep running.

As mentioned, this presents an issue when we go to stop the stream. Since only one MP currently has an open connection, we may need to send up to three 'stop' commands before we hit the MP that is holding it.

Is there any way to deal with this?

1 ACCEPTED SOLUTION

Kris, I LOVE THIS QUESTION.

And I'm completely clear on what you're asking.

To solve your problem, you need a way to synchronize or communicate between the different MPs in the environment. One MP starts doing something, and you'd like a control command sent to ANY of the other MPs, to affect the one that is working.

I've used the Apigee Edge cache as the shared infrastructure that allows inter-process communication.

When a control command arrives, the receiving MP writes it into the Edge cache, using a cache key that is specific to your nodejs listener thing. Call it "twitter-stream-listener-state" or something like that. Write a "running" or "stopped" string to that cache item. The control might look something like this:

// Assumes an Express app with a body parser enabled (so request.body is
// populated); 'cache' and gStatusCacheKey are set up below via apigee-access.
app.post('/control', function(request, response) {
  var payload,
      // post body parameter, or query param
      action = request.body.action || request.query.action,
      putCallback = function(e) {
        if (e) {
          payload = { error: true, cacheException: e.toString() };
          response.status(500).send(JSON.stringify(payload, null, 2) + "\n");
          return;
        }
        payload = { status: "ok" };
        response.status(200).send(JSON.stringify(payload, null, 2) + "\n");
      };

  response.header('Content-Type', 'application/json');

  if (action !== "stop" && action !== "start") {
    payload = { error: "unsupported request (action)" };
    response.status(400).send(JSON.stringify(payload, null, 2) + "\n");
    return;
  }

  // There is a 2x2 matrix of possibilities:
  //   state  = {stopped, running}
  //   action = {stop, start}
  // A different response is required for each combination.
  cache.get(gStatusCacheKey, function(e, value) {
    if (e) {
      payload = { error: true, cacheFail: true, cacheException: e.toString() };
      response.status(500).send(JSON.stringify(payload, null, 2) + "\n");
      return;
    }

    if (value === "stopped") {
      if (action === "stop") {
        // nothing to do... send a 400.
        payload = { error: "already stopped" };
        response.status(400).send(JSON.stringify(payload, null, 2) + "\n");
      }
      else {
        // action === "start"
        console.log('starting...');
        cache.put(gStatusCacheKey, "running", 8640000, putCallback);
      }
    }
    else {
      // state is "running" (or not yet set)
      if (action === "stop") {
        console.log('stopping...');
        cache.put(gStatusCacheKey, "stopped", 8640000, putCallback);
      }
      else {
        // action === "start"
        // nothing to do, send a 400.
        payload = { error: "already running" };
        response.status(400).send(JSON.stringify(payload, null, 2) + "\n");
      }
    }
  });
});
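
Driving it is then just a matter of sending something like POST /control?action=stop (or action=start) to the proxy; whichever MP happens to receive the request writes the new state into the shared cache.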

In this way, ANY MP can receive the control command and write the data to the cache. Now, how to get that information?

In the MP that is actually listening, you'd need to periodically pause and read that cache item, using the same cache key. This works if you do it in the callback of whatever is streaming stuff to you, or in a function that constantly re-invokes itself. It looks kinda like this:

// Assumes sleepTimeBetweenCyclesInMs is defined; a kickoff sketch follows below.
function runIteration() {
  try {
    cache.get(gStatusCacheKey, function(e, value) {
      if (e) {
        console.log('cannot retrieve status, presumed running.');
      }
      if (e || value !== "stopped") {
        // the agent is running
        // do work in here...
      }
      // else {
      //   // agent is stopped. do nothing during this cycle
      // }
    });
  }
  catch (exc1) {
    console.log("Exception: " + exc1);
    console.log(exc1.stack);
  }
  // schedule the next cycle regardless of the outcome of this one
  setTimeout(runIteration, sleepTimeBetweenCyclesInMs);
}

According to the above logic, the nodejs code in the MP runs perpetually, invoking this "runIteration" function. Inside that function it checks the cached status, and only when that status is not "stopped" does the MP do any actual work.
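
To kick the whole thing off you just pick an interval and invoke the function once; after that it reschedules itself. A minimal sketch (the interval value here is only an illustration):

var sleepTimeBetweenCyclesInMs = 5000; // illustrative: re-check the cached state every 5 seconds
runIteration();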

By the way, to get the cache object, you need something like this:

var apigee = require('apigee-access'),
    cache = apigee.getCache(undefined, {scope: 'application'}),
    gStatusCacheKey = 'twitter-stream-listener-state';
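
The scope: 'application' option is what makes this work as shared state: every MP running the same deployed proxy resolves this cache to the same underlying entries, so a value written by any one MP is visible to all of them.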

Attached is an API proxy that includes a nodejs agent that runs forever like this. It does not use the Twitter Streaming API, but I'll bet you will be able to adapt it to your purposes.

This thread was the huge leg up we needed to design our solution.

We ended up using the cache to hold the stream connection state. In addition, we added a GET resource that lets us retrieve the current connection status. To verify that the stream is still flowing and the connection remains open, we use health.apigee.com to monitor that the GET always returns "connected".
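
In spirit, that GET resource is just a read of the same cache key. A minimal sketch, with the route name and the exact "connected" / "disconnected" strings being our own naming rather than anything prescribed:

app.get('/status', function(request, response) {
  response.header('Content-Type', 'application/json');
  cache.get(gStatusCacheKey, function(e, value) {
    if (e) {
      response.status(500).send(JSON.stringify({ error: e.toString() }, null, 2) + "\n");
      return;
    }
    // report "connected" only while the cached state says the stream is running
    var status = (value === "running") ? "connected" : "disconnected";
    response.status(200).send(JSON.stringify({ status: status }, null, 2) + "\n");
  });
});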

If the status ever comes back as "disconnected", a Slack alert goes out to the team. Then we just restart the connection and validate that the GET status returns "connected" again.

Using this approach we've been able to maintain an open stream of tweets for days to weeks. Most of the times the connection was disrupted, the cause was another user interrupting the stream and forgetting to reconnect it. ;0)