1) Scheduler initialization.
The scheduler drives the execution of a Spark Streaming application; users can tune its behavior through the relevant configuration parameters, as sketched below.
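As a minimal illustration of such tuning (the configuration keys are real Spark Streaming parameters; the values and the application name are illustrative only, not recommendations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values; tune them against the actual workload.
val conf = new SparkConf()
  .setAppName("TunedStreamingApp")                      // hypothetical app name
  .set("spark.streaming.blockInterval", "200ms")        // how often received data is chunked into blocks
  .set("spark.streaming.backpressure.enabled", "true")  // let the scheduler adapt the ingestion rate to load
  .set("spark.streaming.receiver.maxRate", "10000")     // cap on records per second for each receiver

// The batch interval itself is supplied when the StreamingContext is created.
val ssc = new StreamingContext(conf, Seconds(1))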
2) The receivers for the input streams are wrapped in an RDD and distributed across the cluster, after which each receiver in the set is started.
For different data sources, Spark Streaming provides different receivers. Each receiver, running on its own node, can be thought of as a dedicated process that ingests one portion of the stream data as input. The classic NetworkWordCount example below shows a socket receiver in use (Scala version):
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc').
    // Note that no replication in the storage level is fine only when running
    // locally; replication is necessary in a distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
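To try this locally, start a Netcat server in one terminal with nc -lk 9999, then launch the example with localhost 9999 as its arguments; the word counts of whatever is typed into nc are printed once per batch on the driver's console.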
Java version:
package org.apache.spark.examples.streaming;

import scala.Tuple2;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.regex.Pattern;

public final class JavaNetworkWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();

    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    // Create a JavaReceiverInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc').
    // Note that no replication in the storage level is fine only when running
    // locally; replication is necessary in a distributed scenario for fault tolerance.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
  }
}
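The socket stream used above is one of the built-in receivers. To make the receiver abstraction that the scheduler distributes and starts more concrete, here is a hedged sketch of a custom receiver following the pattern in Spark's custom-receiver guide (the class name and connection details are illustrative):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // onStart must not block: spawn a thread that reads and stores data.
  def onStart(): Unit = {
    new Thread("Custom Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to clean up here: the receiving thread checks isStopped() itself.
  def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)              // hand each record to Spark for storage/replication
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: Throwable => restart("Error receiving data", e)
    }
  }
}

Such a receiver would be plugged in with ssc.receiverStream(new CustomSocketReceiver(host, port)), which yields a DStream just as socketTextStream does.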
Previously, I submitted the Spark job to YARN in yarn-cluster mode: YARN launches the job, and an executor on one of the worker nodes listens on the given port and receives the data.
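For reference, such a submission might look like the following (the jar path is illustrative):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class org.apache.spark.examples.streaming.NetworkWordCount \
  /path/to/spark-examples.jar \
  <hostname> <port>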