// Examples of creating a DataStreamSource from Flink's built-in sources.
// NOTE(review): `env` is declared outside this snippet — presumably a StreamExecutionEnvironment; confirm.

// 1. Read data from a file (one String element per line of text)
DataStreamSource<String> stream1 = env.readTextFile("input/clicks.csv");
// 2. Read data from a collection
// 2a. A collection of plain Integers
ArrayList<Integer> nums = new ArrayList<>();
nums.add(2);
nums.add(5);
DataStreamSource<Integer> numStream = env.fromCollection(nums);
// 2b. A collection of Event objects
ArrayList<Event> events = new ArrayList<>();
events.add(new Event("Mary", "./home", 1000L));
events.add(new Event("Bob", "./cart", 2000L));
DataStreamSource<Event> stream2 = env.fromCollection(events);
// 3. Read data from elements listed directly as arguments
DataStreamSource<Event> stream3 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
// 4. Read from a socket text stream (host "localhost", port 7777)
DataStreamSource<String> stream4 = env.socketTextStream("localhost", 7777);
- pom文件
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
- addSource
// Kafka consumer configuration; only the broker address is set here.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","hdp1:9092");
// Consume the "clicks" topic as plain strings.
// The consumer is instantiated directly: an empty anonymous subclass (`{ }`) adds nothing
// and, when declared inside an instance method, would capture the enclosing object,
// which then has to be serialized together with the source.
DataStreamSource<String> kafkaStream = env.addSource(
        new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));
// With a custom source function available, call addSource to attach it to the environment
DataStreamSource<Event> stream = env.addSource(new ClickSource());
/**
 * Custom source that emits one randomly generated click {@link Event} per second
 * until {@link #cancel()} is called.
 */
public class ClickSource implements SourceFunction<Event> {
    // Flag controlling the generation loop. It is volatile (and a primitive) because
    // cancel() is normally invoked from a different thread than the one executing run();
    // without volatile the loop is not guaranteed to observe the update.
    private volatile boolean running = true;

    /**
     * Emits events in a loop while the source is running.
     *
     * @param ctx context used to emit the generated events
     * @throws Exception if the sleep between two events is interrupted
     */
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // Values are picked at random from the fixed data sets below
        Random random = new Random();
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    // Current wall-clock time in milliseconds; same value as
                    // Calendar.getInstance().getTimeInMillis() without the allocation
                    System.currentTimeMillis()
            ));
            // Emit one click event per second so the output is easy to observe
            Thread.sleep(1000);
        }
    }

    /** Requests the generation loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        running = false;
    }
}
// Attach CustomSource with a parallelism of 2 and print the resulting stream
env.addSource(new CustomSource()).setParallelism(2).print();
/**
 * Custom parallel source that continuously emits random integers
 * until {@link #cancel()} is called.
 */
public static class CustomSource implements ParallelSourceFunction<Integer> {
    // Flag controlling the generation loop. It is volatile because cancel() is normally
    // invoked from a different thread than the one executing run(); without volatile
    // the loop is not guaranteed to observe the update.
    private volatile boolean running = true;
    private final Random random = new Random();

    /**
     * Emits random integers back-to-back while the source is running.
     *
     * @param sourceContext context used to emit the generated values
     * @throws Exception declared by the interface; nothing in this loop throws a checked exception
     */
    @Override
    public void run(SourceContext<Integer> sourceContext) throws Exception {
        while (running) {
            sourceContext.collect(random.nextInt());
        }
    }

    /** Requests the generation loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        running = false;
    }
}