Hi.
I’m using the Filtered stream API to get real-time tweets from some keywords and accounts. My code is based on this sample code.
It usually works fine, but the stream is disconnected for “operational reasons” within a few hours every time. When I reconnect to the endpoint I get status code 200, but after that I receive nothing except keep-alive signals, even though I have confirmed that some tweets matched my rules.
How can I solve this problem, so that the reconnection works correctly and I start receiving tweets again?
This is my code.
# Reconnect loop for the filtered stream.
#
# Every way a connection can end (non-200 status, the server closing the
# stream, a transport error, an unexpected exception) goes through the same
# back-off sleep at the bottom of the loop, so the endpoint is never hit in a
# tight loop.
initial_retry_delay = 30   # seconds to wait before the first reconnect attempt
max_retry_delay = 480      # upper bound for the doubled delay, in seconds
retry_delay = initial_retry_delay
while True:
    try:
        params = {'tweet.fields': 'text', 'expansions': 'author_id'}
        with requests.get(
            "https://api.twitter.com/2/tweets/search/stream",
            auth=bearer_oauth,
            stream=True,
            params=params,
            # (connect timeout, read timeout) in seconds. The read timeout makes
            # a silently stalled connection raise instead of blocking forever.
            # NOTE(review): assumes keep-alive lines arrive more often than
            # every 30 s -- confirm against the streaming documentation.
            timeout=(10, 30),
        ) as res:
            print("connect", res.raw.closed, res)
            if res.status_code != requests.codes.ok:
                # Treat a rejected connection like a disconnect so that it is
                # retried only after the back-off delay.
                raise DisconnectedError
            for res_l in res.iter_lines(chunk_size=1024):
                if not res_l:
                    # Empty line: keep-alive signal, nothing to process.
                    continue
                json_response = json.loads(res_l)
                print(datetime.datetime.now(), json_response)
                if "data" not in json_response:
                    continue
                # A real tweet arrived: the connection is healthy again.
                retry_delay = initial_retry_delay
        # iter_lines() returned, i.e. the server closed the stream.
        raise DisconnectedError
    except DisconnectedError:
        print("disconnected")
    except requests.exceptions.ChunkedEncodingError as e:
        print("error")
    except Exception as e:
        # Includes read timeouts and malformed JSON lines.
        print(e)
    # Exponential back-off shared by all failure paths.
    time.sleep(retry_delay)
    retry_delay = min(retry_delay * 2, max_retry_delay)
Thank you.
I tried to debug this problem using JavaScript and encountered the same problem: when the API disconnects the stream, the function “streamConnect” is called again, but no data is received after that.
Therefore I do not think the problem is in my program. Is there any way to solve it?
This is my code. (copy from this sample code)
// Open a realtime stream of Tweets, filtered according to rules
// https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/quick-start
const needle = require('needle');
// The code below reads the bearer token from your environment variables.
// To set environment variables on macOS or Linux, run the export command below from the terminal:
// export BEARER_TOKEN='YOUR-TOKEN'
// BEARER_TOKEN is the variable named in the instructions above; TOKEN is kept
// as a fallback so environments that exported the old name keep working.
const token = process.env.BEARER_TOKEN || process.env.TOKEN;
// Endpoint the stream connection is opened against.
const streamURL = 'https://api.twitter.com/2/tweets/search/stream';
/**
 * Open the filtered stream and keep it alive across disconnections.
 *
 * Every payload received is logged. When the connection ends, either with an
 * ECONNRESET error or because the stream finished without any error, a new
 * connection is opened after an exponentially growing delay. Any other error
 * code is logged and terminates the process.
 *
 * @param {number} retryAttempt - Consecutive failed attempts so far (0 for a fresh start).
 * @returns {object} The stream returned by `needle.get` for this attempt.
 */
function streamConnect(retryAttempt) {
  // Back-off bounds in milliseconds: 1 s, 2 s, 4 s, ... capped at 64 s.
  const BASE_RETRY_DELAY_MS = 1000;
  const MAX_RETRY_DELAY_MS = 64000;

  // Local counter, so the caller-supplied argument is never reassigned.
  let attempt = retryAttempt;
  // The end of one connection may be reported through more than one event;
  // this flag guarantees that only a single reconnect is scheduled for it.
  let closed = false;

  console.log("test");
  const stream = needle.get(streamURL, {
    headers: {
      "User-Agent": "v2FilterStreamJS",
      "Authorization": `Bearer ${token}`
    },
    timeout: 20000
  });

  // Shared handler for every way this connection can end.
  const handleClose = (error) => {
    if (closed) {
      return;
    }
    closed = true;

    if (error && error.code !== 'ECONNRESET') {
      console.log(error.code);
      process.exit(1);
      return;
    }

    // To avoid rate limits the wait time doubles with every consecutive
    // failure. setTimeout takes milliseconds, so the exponent is scaled to
    // seconds and capped.
    const delay = Math.min(BASE_RETRY_DELAY_MS * 2 ** attempt, MAX_RETRY_DELAY_MS);
    setTimeout(() => {
      console.warn("A connection error occurred. Reconnecting...");
      streamConnect(attempt + 1);
    }, delay);
  };

  stream.on('data', data => {
    try {
      const json = JSON.parse(data);
      console.log(json);
      // Only a real tweet payload proves the connection is delivering data;
      // error payloads must not reset the back-off.
      if (json && json.data) {
        attempt = 0;
      }
    } catch (e) {
      if (data.detail === "This stream is currently at the maximum allowed connection limit.") {
        console.log(data.detail);
        process.exit(1);
      } else {
        console.log(data);
        // Keep alive signal received. Do nothing.
      }
    }
  });
  stream.on('err', handleClose);
  // 'done' also fires when the stream finishes without an error (for example
  // when the server ends the connection), which 'err' alone never reports.
  stream.on('done', handleClose);

  return stream;
}
// Entry point: start listening to the stream with the retry counter at zero.
(async function listen() {
  const initialRetryAttempt = 0;
  streamConnect(initialRetryAttempt);
}());