I have spent hours searching the internet for a solution to this problem. I found several discussion threads on the subject, but none of them provides a fix.
When I run the following flume-ng command:
flume-ng agent --conf-file flume.conf --name TwitterAgent -Dflume.root.logger=INFO,console
I get the following error:
17/03/13 05:54:34 INFO twitter4j.TwitterStreamImpl: 401:Authentication credentials (https://dev.twitter.com/pages/auth) were missing or incorrect. Ensure that you have set valid consumer key/secret, access token/secret, and the system clock is in sync.
I know my keys are valid: I tested them by pulling tweets with a Python script, and they work fine. I have also synchronized my system clock with NTP.
Here is my flume.conf file:
# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = twitterSink
# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = <consumerKey>
TwitterAgent.sources.Twitter.consumerSecret = <consumerSecret>
TwitterAgent.sources.Twitter.accessToken = <accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret = <accessTokenSecret>
# Describing/Configuring the sink
TwitterAgent.sinks.twitterSink.type = org.apache.flume.sink.kafka.KafkaSink
TwitterAgent.sinks.twitterSink.kafka.topic = data
TwitterAgent.sinks.twitterSink.kafka.bootstrap.servers = localhost:9092
TwitterAgent.sinks.twitterSink.kafka.flumeBatchSize = 20
TwitterAgent.sinks.twitterSink.kafka.producer.acks = 1
TwitterAgent.sinks.twitterSink.kafka.producer.linger.ms = 1
TwitterAgent.sinks.twitterSink.kafka.producer.compression.type = snappy
# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.twitterSink.channel = MemChannel
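Since the same keys work from Python, one thing worth ruling out is the file itself. As far as I know, Flume reads this file as Java properties, where trailing whitespace after a value is kept as part of the value and a key assigned twice silently takes the last assignment. The following is only a rough sketch of a check for both cases; `lint_properties` is a hypothetical helper of my own, not part of Flume, and the sample values are made up.

```python
from collections import defaultdict


def lint_properties(text):
    """Scan properties-style text for two easy-to-miss problems.

    Returns (duplicates, trailing_ws):
      duplicates  - dict mapping each key assigned more than once to its line numbers
      trailing_ws - list of keys whose value ends in whitespace
    Only simple `key = value` lines are handled (no line continuations).
    """
    seen = defaultdict(list)
    trailing_ws = []
    for lineno, raw in enumerate(text.splitlines(), start=1):
        line = raw.lstrip()
        # Skip blank lines and comments.
        if not line or line.startswith(("#", "!")):
            continue
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.lstrip()  # leading whitespace is ignored by the loader
        seen[key].append(lineno)
        if value != value.rstrip():
            trailing_ws.append(key)
    duplicates = {k: v for k, v in seen.items() if len(v) > 1}
    return duplicates, trailing_ws


# Made-up sample: one key assigned twice, one value with a trailing space.
sample = (
    "TwitterAgent.sinks.twitterSink.channel = channel1\n"
    "TwitterAgent.sources.Twitter.consumerKey = abc123 \n"
    "TwitterAgent.sinks.twitterSink.channel = MemChannel\n"
)
duplicates, trailing_ws = lint_properties(sample)
print("duplicate keys:", duplicates)
print("values with trailing whitespace:", trailing_ws)
```

To run it against the real file, pass `open("flume.conf").read()` instead of `sample`. A clean result does not prove the credentials are accepted; it only rules out these two formatting problems.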
Any help will be greatly appreciated.
Thanks!