Stream is closing in twitter4j (Hadoop - Flume)

By : kidding_nana
Date : November 20 2020, 11:01 PM
The Twitter streaming API has been experiencing problems since the attack on Dyn on the 21st of October. These have been reported on the Twitter developer forum, and Twitter staff are aware of the issue: https://twittercommunity.com/t/issues-reported-with-streams-since-10-21/76429.
There is not yet a clear answer as to the cause, but the observed behaviour is that connections to any of Twitter's streaming endpoints get closed after short periods of time, normally (although not always) without receiving any data. Sometimes the connection gets closed whilst a response is being sent, resulting in invalid JSON.
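On the client side there is little to do beyond watching for the disconnects and letting the stream reconnect. A minimal sketch, assuming twitter4j 3.x/4.x (the credentials are placeholders; twitter4j normally retries dropped connections itself with backoff):
code :
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;

public class StreamWatcher {
    public static void main(String[] args) {
        ConfigurationBuilder cb = new ConfigurationBuilder()
            .setOAuthConsumerKey("xxxxx")          // placeholder credentials
            .setOAuthConsumerSecret("xxxxx")
            .setOAuthAccessToken("xxxxx")
            .setOAuthAccessTokenSecret("xxxxx");
        TwitterStream stream = new TwitterStreamFactory(cb.build()).getInstance();

        // Log connection life-cycle events so the short-lived
        // connections described above become visible.
        stream.addConnectionLifeCycleListener(new ConnectionLifeCycleListener() {
            public void onConnect()    { System.out.println("connected"); }
            public void onDisconnect() { System.out.println("disconnected"); }
            public void onCleanUp()    { System.out.println("cleaned up"); }
        });
        stream.addListener(new StatusAdapter() {
            @Override public void onStatus(Status status) {
                System.out.println(status.getText());
            }
            @Override public void onException(Exception ex) {
                // invalid JSON from a half-closed stream surfaces here
                ex.printStackTrace();
            }
        });
        stream.sample(); // open a streaming connection
    }
}
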
Detect closing stream of twitter4j

By : Rajendra IWT Develop
Date : March 29 2020, 07:55 AM
The static keyword means that the m_hadoopFileSystem variable will be shared by all instances of the class; however, it does not prevent you from assigning the variable multiple times. Currently you will get a new HadoopFileSystem each time you instantiate a TweetHADOOPDAO object, overwriting the reference to any existing HadoopFileSystem objects.
If you truly need to ensure only one instance is created, take a look at using the singleton pattern.
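A minimal sketch of that approach, using the initialization-on-demand holder idiom (HadoopFileSystem is the class from the question):
code :
public class HadoopFileSystemHolder {

    private HadoopFileSystemHolder() { }

    private static class Holder {
        // created once, the first time getInstance() is called
        static final HadoopFileSystem INSTANCE = new HadoopFileSystem();
    }

    public static HadoopFileSystem getInstance() {
        return Holder.INSTANCE;
    }
}
TweetHADOOPDAO would then call HadoopFileSystemHolder.getInstance() instead of constructing its own HadoopFileSystem, so every instance shares the same object.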
How to prevent hadoop stream from closing?

By : 飞奔的茶叶蛋
Date : March 29 2020, 07:55 AM
As I can infer from the comments, you can probably do this in each map task to make things easy. I saw you do the following to pre-create some idle threads; you can move this code into the configure() method, which Hadoop calls once before map() runs:
code :
if (once) {
   // note: assign into the array slot, otherwise the new thread is not
   // stored back into Threads (assuming Threads is an MSLiteThread[])
   for (int i = 0; i < Threads.length; i++) {
      System.out.println("created thread");
      Threads[i] = new MSLiteThread(pile);
      Threads[i].start();
   }
   once = false;
}
Moved into configure(), the thread creation runs exactly once per map task:

public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void configure(JobConf job) {
       // Threads and pile are fields from the question's code
       for (int i = 0; i < Threads.length; i++) {
         System.out.println("created thread");
         Threads[i] = new MSLiteThread(pile);
         Threads[i].start();
       }
    }

    @Override
    public void map(LongWritable key, Text value,
       OutputCollector<Text, Text> output, Reporter reporter) {
       // per-record work goes here
    }
}

Alternatively, if you want to fire one thread per URL and block map() until they all finish, use a CountDownLatch:

public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

        String url = value.toString();
        StringTokenizer urls = new StringTokenizer(url);
        Config.LoggerProvider = LoggerProvider.DISABLED;

        // set the CountDownLatch to urls.countTokens() to block on that many threads
        final CountDownLatch latch = new CountDownLatch(urls.countTokens());
        while (urls.hasMoreTokens()) {
            try {
                String currentUrl = urls.nextToken();
                // create a thread and fire it for the current URL
                Thread thread = new Thread(
                    new URLProcessingThread(new URL(currentUrl), latch));
                thread.start();
            } catch (Exception e) {
                e.printStackTrace();
                latch.countDown(); // keep the count consistent if this URL fails
            }
        }

        try {
            latch.await(); // wait for all URL threads to complete execution
            // sleep here for some time if you wish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
import java.net.URL;
import java.util.concurrent.CountDownLatch;

// a Runnable, so it is wrapped in a Thread above before being started
public class URLProcessingThread implements Runnable {
    private final CountDownLatch latch;
    private final URL url;

    public URLProcessingThread(URL url, CountDownLatch latch) {
       this.latch = latch;
       this.url = url;
    }

    @Override
    public void run() {
         // process url here;
         // after everything finishes, decrement the latch
         latch.countDown(); // reduce the CountDownLatch count by 1
    }
}
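
As an aside (not part of the original answer), an ExecutorService can replace the hand-rolled threads and latch, since invokeAll() already blocks until every submitted task finishes; the URL-processing body below is a placeholder:
code :
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UrlBatchProcessor {
    // Processes every whitespace-separated URL in 'line' and returns
    // only when all of them are done, like the latch-based code above.
    public static void processAll(String line) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(16);
        List<Callable<Void>> tasks = new ArrayList<>();
        StringTokenizer urls = new StringTokenizer(line);
        while (urls.hasMoreTokens()) {
            final String currentUrl = urls.nextToken();
            tasks.add(() -> {
                // fetch/process currentUrl here (placeholder)
                return null;
            });
        }
        pool.invokeAll(tasks); // blocks until every task completes
        pool.shutdown();
    }
}
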
Authentication Error Twitter4J at Flume

By : Brian Roundy
Date : March 29 2020, 07:55 AM
Have you added your consumer key and secret in TwitterSourceConstants.java? If you have done this, you should revert that edit. The credentials have to be in the flume.conf file. There is an example at twitter_flume; look in the Flume sources folder for the flume.conf file.
Hope it helps
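For illustration, the credentials would sit in flume.conf roughly like this (the agent and source names are placeholders; the property names are the ones the Twitter source reads):
code :
TwitterAgent.sources.Twitter.consumerKey = <your consumer key>
TwitterAgent.sources.Twitter.consumerSecret = <your consumer secret>
TwitterAgent.sources.Twitter.accessToken = <your access token>
TwitterAgent.sources.Twitter.accessTokenSecret = <your access token secret>
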
Flume not processing keywords from Twitter source with flume-ng with Hadoop 2.5 cdh5.3

By : Narendra
Date : March 29 2020, 07:55 AM
You are missing the ".sources" property of the agent; Flume-ng cannot work without knowing its source. You are missing the following line:
code :
TwitterAgent.sources = Twitter

For reference, the full configuration:

TwitterAgent.sources = Twitter
TwitterAgent.sinks = HDFS
TwitterAgent.channels = MemChannel
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

#TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel

TwitterAgent.sources.Twitter.consumerKey = xxxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxxxx
TwitterAgent.sources.Twitter.accessToken = xxxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxxx

TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://uat.cloudera:8020/user/root/flume/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 10
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10
TwitterAgent.channels.MemChannel.transactionCapacity = 10

Start the agent with:

flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/sample.conf -n TwitterAgent -Dflume.root.logger=INFO,console

The log output then shows the source connecting and documents being processed:
2015-09-25 13:44:18,045 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.twitter.TwitterSource.start(TwitterSource.java:139)] Twitter source Twitter started.
2015-09-25 13:44:18,045 (Twitter Stream consumer-1[initializing]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Establishing connection.
2015-09-25 13:44:19,931 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Connection established.
2015-09-25 13:44:19,931 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Receiving status stream.
2015-09-25 13:44:20,283 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:58)] Serializer = TEXT, UseRawLocalFileSystem = false
2015-09-25 13:44:20,557 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:261)] Creating hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860284.tmp
2015-09-25 13:44:22,435 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 100 docs
2015-09-25 13:44:25,383 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 200 docs
2015-09-25 13:44:28,178 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 300 docs
2015-09-25 13:44:30,505 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:413)] Closing hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860284.tmp
2015-09-25 13:44:30,506 (hdfs-HDFS-call-runner-2) [INFO - org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:339)] Close tries incremented
2015-09-25 13:44:30,526 (hdfs-HDFS-call-runner-3) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:673)] Renaming hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860284.tmp to hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860284
2015-09-25 13:44:30,607 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:261)] Creating hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860285.tmp
2015-09-25 13:44:31,157 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 400 docs
2015-09-25 13:44:33,330 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 500 docs
2015-09-25 13:44:36,131 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 600 docs
2015-09-25 13:44:38,298 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 700 docs
2015-09-25 13:44:40,465 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 800 docs
2015-09-25 13:44:41,158 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:413)] Closing hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860285.tmp
2015-09-25 13:44:41,158 (hdfs-HDFS-call-runner-6) [INFO - org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:339)] Close tries incremented
2015-09-25 13:44:41,166 (hdfs-HDFS-call-runner-7) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:673)] Renaming hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860285.tmp to hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860285
2015-09-25 13:44:41,230 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:261)] Creating hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860286.tmp
2015-09-25 13:44:43,238 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 900 docs
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 1,000 docs
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:300)] Total docs indexed: 1,000, total skipped docs: 0
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:302)]     35 docs/second
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:304)] Run took 28 seconds and processed:
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:306)]     0.009 MB/sec sent to index
2015-09-25 13:44:46,119 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:308)]     0.255 MB text sent to index
2015-09-25 13:44:46,119 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:310)] There were 0 exceptions ignored:
^C2015-09-25 13:44:46,666 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:79)] Stopping lifecycle supervisor 10
2015-09-25 13:44:46,673 (agent-shutdown-hook) [INFO - org.apache.flume.source.twitter.TwitterSource.stop(TwitterSource.java:150)] Twitter source Twitter stopping...
Error: Could not find or load main class org.apache.flume.node.Application - Install flume on hadoop version 1.2.1

By : Boylife MJ Thailand
Date : March 29 2020, 07:55 AM
The question: I have built a Hadoop cluster with one master node and one slave, and I want to set up Flume on the master machine to collect all the logs of the cluster. However, when I install Flume from the tarball I always get: Error: Could not find or load main class org.apache.flume.node.Application.
It is basically because of FLUME_HOME. Set it and run the agent again.
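A plausible sketch, assuming the tarball was unpacked to /usr/local/flume (the path and agent name are hypothetical; adjust them to your install):
code :
# point FLUME_HOME at the unpacked tarball and put its bin directory on the PATH
export FLUME_HOME=/usr/local/flume
export PATH=$FLUME_HOME/bin:$PATH
# the flume-ng script builds its classpath from FLUME_HOME, so the agent should now start
flume-ng agent -n agent1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/flume.conf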