[Node.js][Twit] Refreshing Twitter Stream with new track terms?


#1

Hello.

I am using Node.js in conjunction with the Twit module and a load of other modules, but I am mainly focused on the Twit module. I have an application that accepts user input as the track terms for the Streaming API, and I want the application to restart the stream with the new track terms once the user changes the search terms and clicks the start button again.

Currently the stream is working fine with user input and I can also detect when the user changes the search terms and clicks the start button again. I just can’t work out how to restart the stream with the new track terms. I am doing this A LOT as I am also going to be using a feature extraction API to grab terms from the tweets received to add even more track terms to the stream so that I can generate more load on the server (I am doing this so that I can scale out the system automatically to more instances). This is my current code AKA my current attempt at solving this problem.

// do some stuff up here to determine if the search terms have changed

// stop the stream, update the terms and start the stream again
stream.stop();
stream.params.track = decodedSearch['terms[]'];
stream.start();

// When we receive a tweet
stream.on('tweet', function(tweet) {
   // do my sentiment analysis and emit the results
});

So that’s my code so far on a basic level. Any suggestions?

NB: This was one of the suggestions I saw online but unfortunately doesn’t work. I also saw a suggestion to create a new stream and stop the old one but that would not be ideal for my system since I have to change the track terms a lot.


#2

I don’t think this is going to scale well (but, if I’m wrong I’d love to know how you end up doing it). The docs mention but do not specify a limit to how often one can make a streaming request.

To answer your question…

stream.stop()
stream = TWIT.stream('statuses/filter', {'track':[term1,term2,etc]})
stream.on('tweet', ...)
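
For anyone following along, here is a slightly fuller sketch of the stop-and-recreate approach above. `restartStream` and `makeFakeClient` are hypothetical names introduced for this example. The fake client only stands in for a Twit instance so the snippet runs without credentials, and its `deliver` method and `stopped` flag are assumptions for illustration, not part of Twit.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: stop the previous stream (if any), open a new one
// with the given terms, and attach the tweet handler exactly once.
// `client` is anything with a Twit-like stream(path, params) method.
function restartStream(client, oldStream, terms, onTweet) {
  if (oldStream) {
    oldStream.stop();
  }
  const next = client.stream('statuses/filter', { track: terms });
  next.on('tweet', onTweet);
  return next;
}

// Stand-in for a Twit instance (assumption, not the real library).
// Each fake stream ignores deliveries after stop() has been called.
function makeFakeClient() {
  const opened = [];
  return {
    opened: opened,
    stream: function (path, params) {
      const s = new EventEmitter();
      s.params = params;
      s.stopped = false;
      s.stop = function () { s.stopped = true; };
      s.deliver = function (tweet) {
        if (!s.stopped) { s.emit('tweet', tweet); }
      };
      opened.push(s);
      return s;
    }
  };
}

const client = makeFakeClient();
const seen = [];
const record = function (tweet) { seen.push(tweet.text); };

// Keep a single variable pointing at the current stream.
let stream = restartStream(client, null, ['node'], record);
stream = restartStream(client, stream, ['javascript'], record);

client.opened[0].deliver({ text: 'from old stream' }); // dropped, stream was stopped
client.opened[1].deliver({ text: 'from new stream' });
console.log(seen); // [ 'from new stream' ]
```

The point of routing every restart through one function is that the old stream is always stopped before the variable is overwritten, and the handler is attached once per stream rather than once per request.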

#3

Yeah well it’s for a university assignment so it should work since this is what we are supposed to use. I just need to create a lot of computational load for the server.

I have tried that already. I already had stream initialised further up

stream = T.stream('statuses/filter', { track: decodedSearch['terms[]'] });

If I try and do it the way you mentioned, it just keeps returning tweets with the previous track terms AS WELL AS the new track terms entered. I don’t know why it does this. I have done some debugging and the track terms change successfully but for some reason it keeps displaying tweets from the previous track terms. I’m not sure if there’s like a ‘pipeline’ of tweets still being processed by the client or if it just isn’t working for some reason.


#4

If you look at the source code for stop() it closes the request and the response objects, as well as removing the event listeners. So, it should work. Is it possible your stream var is not scoped properly (i.e. it is local in scope)?
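
To illustrate the scoping question, here is a minimal sketch that uses a fake stream instead of Twit. `openFakeStream`, `handleLocal`, `handleShared` and `openCount` are hypothetical names made up for this example. It only shows why a handler-local variable can never stop the previous stream, and it is not a claim about what is happening in your code.

```javascript
// Number of fake streams currently open.
let openCount = 0;

// Stand-in for T.stream(...): opening increments the counter,
// stop() decrements it.
function openFakeStream(terms) {
  openCount++;
  return {
    terms: terms,
    stop: function () { openCount--; }
  };
}

// Local scope: `stream` is a fresh, undefined variable on every call,
// so the previous stream is never reachable and never stopped.
function handleLocal(terms) {
  let stream;
  if (stream) { stream.stop(); }
  stream = openFakeStream(terms);
}

// Shared scope: the variable outlives the call, so the previous
// stream can be stopped before it is replaced.
let shared = null;
function handleShared(terms) {
  if (shared) { shared.stop(); }
  shared = openFakeStream(terms);
}

handleLocal(['a']);
handleLocal(['b']);
const leaked = openCount; // both streams are still open

openCount = 0;
handleShared(['a']);
handleShared(['b']);
const kept = openCount; // only the latest stream is open

console.log(leaked, kept); // 2 1
```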


#5

I don’t think it’s local in scope. If this wasn’t a university assignment I would post all of my source code. I’ll try and post as much of it as I can to try and show you the main components of my code. This is basically what it looks like:

var Twit = require('twit');
// Other node modules here

var stream;
// Other variables here

// Create a new Twit object using the required credentials
var T = new Twit({
    consumer_key:         ''
  , consumer_secret:      ''
  , access_token:         ''
  , access_token_secret:  ''
});

// Application handling here

// When the application requests the homepage with the POST method, execute the following code
app.post('/', function(req, res) {
	// Initialise the search variable
	var search = '';
	
	// When data is received
	req.on('data', function(chunk) {
		// Grab the search details
	});

	// Once all of the above data is received
	req.on('end', function() {
		// Write an OK header as a response
		res.writeHead(200, "OK", {'Content-Type': 'text/html'});	
		
		// Decode the search data
		var decodedSearch = querystring.parse(search);
		
		// Declare the stream variable if it is null
		if (stream == null) {
			stream = T.stream('statuses/filter', { track: decodedSearch['terms[]'] });
		}
		
		// If the search terms have been changed
		if (oldSearch != search || decodedSearch.length > trackCount) {
			// Update the oldSearch variable
			oldSearch = search;
			trackCount = decodedSearch.length;
			
			stream.stop();
			stream = T.stream('statuses/filter', { track: decodedSearch['terms[]'] });
			
			// When we receive a tweet
			stream.on('tweet', function(tweet) {
				// Sentiment analysis API stuff
				unirest.post("")
				.header("")
				.header("Content-Type", "application/x-www-form-urlencoded")
				.field("text", tweet.text)
				.end(function (result) {
					// Once we get a result from the sentiment analysis API, emit the results for the client-side to grab
					io.emit('tweet', tweet);
					io.emit('sentiment', result.body);
				});
				
				// Initialise the pos variables here
				
				for (i in taggedTweet) {
					// Grab the word and tag from the selected word here
					
					// If the word is tagged as a proper noun or plural proper noun
					if (tag == "NNP" || tag == "NNPS") {
						// Add the word to the selected words and increase the index
						selectedWords[ind] = word;
						ind++;
					}
					
					// Assign all of the selected words to a unique array of search terms if it doesn't already exist
					for (j in selectedWords) {
						if (decodedSearch['terms[]'].indexOf(selectedWords[j]) == -1) {
							// Determines if the current track terms is a string or an array and acts accordingly
							if (typeof decodedSearch['terms[]'] == 'string') {
								var temp = decodedSearch['terms[]'];
								decodedSearch['terms[]'] = [ temp, selectedWords[j] ];
							} else {
								decodedSearch['terms[]'].push(selectedWords[j]);
							}
						}
					}
				}
			});
		}
		
		// Close the response
		res.end();
	});
});

// Stop the stream once the stop button is clicked
app.post('/stop', function(req, res) {
	stream.stop();
});

This is not all of the code and a lot of the code here is hidden and isn’t necessary. Just don’t want to get in trouble for posting a full assignment online or anything like that. I think that’s all the code you would need to get an understanding of what I am trying to do.
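
As an aside on the string-or-array handling of `terms[]` in the handler above: a small helper could normalise the value once, right after parsing. This is only a sketch, and `toTermList` is a hypothetical name. It relies on Node's built-in `querystring.parse`, which returns a string when a key appears once and an array when it appears several times.

```javascript
const querystring = require('querystring');

// Hypothetical helper: always return the track terms as an array
// with duplicates removed, whatever shape querystring.parse produced.
function toTermList(value) {
  if (value === undefined || value === null || value === '') {
    return [];
  }
  const list = Array.isArray(value) ? value : [value];
  return Array.from(new Set(list));
}

// One occurrence of the key parses to a string.
const one = toTermList(querystring.parse('terms[]=node')['terms[]']);

// Several occurrences parse to an array; the duplicate is dropped.
const many = toTermList(
  querystring.parse('terms[]=node&terms[]=twit&terms[]=node')['terms[]']
);

// A missing key parses to undefined.
const none = toTermList(querystring.parse('')['terms[]']);

console.log(one, many, none);
```

With the terms always in an array, a check such as `terms.length > trackCount` has a real number to compare. The object returned by `querystring.parse` has no `length` property of its own, so `decodedSearch.length` would be `undefined`.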

On the client side I have this:

<script>
	// Declare the Socket.IO variable and start a listener
	var socket = io();
	
	socket.on('tweet', function(msg) {
		$('#tweets').prepend('<li class="list-group-item">' + msg.text + '</li>');
	});
	
	socket.on('sentiment', function(msg) {
		var result = JSON.parse(msg);
		var type = result.type.charAt(0).toUpperCase() + result.type.slice(1);
		$('#sentiment').prepend('<li class="list-group-item">' + type + '</li>');
	});
</script>

So this grabs the values sent by the io.emit calls and displays them to the user on the client side. Hopefully this gives you a better idea of what I am doing.