本文共 2730 字,大约阅读时间需要 9 分钟。
加速器定义:
package com.daxinimport java.io.Fileimport org.apache.flink.api.common.accumulators.IntCounterimport org.apache.flink.api.common.functions.RichMapFunctionimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.configuration.Configuration/** * Created by Daxin on 2017/4/18. */object Acc { def main(args: Array[String]) { val numLines = new IntCounter() val env = ExecutionEnvironment.getExecutionEnvironment val data = env.fromElements("1", "2", "5") //TODO RichMapFunction函数可以访问RuntimeContext val wm = data.map(new RichMapFunction[String, String]() { override def open(parameters: Configuration): Unit = {//初始化方法,初始化时候完成加速器的定义 getRuntimeContext.addAccumulator("daxinCounter", numLines) } override def map(value: String): String = { numLines.add(2) "111" } }) val file = new File("C:\\logs\\flink") if (file.exists()) { file.delete() } wm.writeAsText("C:\\logs\\flink") //可以正常执行 // wm.printToErr() //异常:No new data sinks have been defined since the last execution. //println(env.getExecutionPlan()) val rs = env.execute() val counter: Int = rs.getAccumulatorResult("daxinCounter") //println("counter : " + counter) }}
上面就是加速器使用的一个简单Demo,重点是: RichMapFunction,RichMapFunction的声明如下:
public abstract class RichMapFunction接下来看一下RichFunction声明:extends AbstractRichFunction implements MapFunction
/** * An base interface for all rich user-defined functions. This class defines methods for * the life cycle of the functions, as well as methods to access the context in which the functions * are executed. */@Publicpublic interface RichFunction extends Function
RichFunction函数是用户定义的函数的一个接口,这个函数有声明周期方法如下,同时这个接口的方法可以访问函数执行的上下文!
void open(Configuration parameters) throws Exception; //函数初始化方法。初始化操作需在此完成void close() throws Exception;
获取上下文方法:
RuntimeContext getRuntimeContext();IterationRuntimeContext getIterationRuntimeContext();
如果需要在函数中使用加速器,就需要在函数上下文中注册加速器,然后在函数中使用。故需要使用RichMapFunction的getRuntimeContext方法获取上下文!
转载地址:http://ufjlf.baihongyu.com/