博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink关于加速器的使用
阅读量:2055 次
发布时间:2019-04-28

本文共 2730 字,大约阅读时间需要 9 分钟。

加速器定义:

 Accumulators collect distributed statistics or aggregates in a from user functions and operators. Each parallel instance creates and updates its own accumulator object, and the different parallel instances of the accumulator are later merged. merged by the system at the end of the job. The result can be obtained from the result of a job execution, or from the web runtime monitor. The accumulators are inspired by the Hadoop/MapReduce counters. The type added to the accumulator might differ from the type returned. This is the case e.g. for a set-accumulator: We add single objects, but the result is a set of objects.
这段英文定义很容易理解,故不翻译!
直接上代码:

package com.daxinimport java.io.Fileimport org.apache.flink.api.common.accumulators.IntCounterimport org.apache.flink.api.common.functions.RichMapFunctionimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.configuration.Configuration/**  * Created by Daxin on 2017/4/18.  */object Acc {  def main(args: Array[String]) {    val numLines = new IntCounter()    val env = ExecutionEnvironment.getExecutionEnvironment    val data = env.fromElements("1", "2", "5")    //TODO RichMapFunction函数可以访问RuntimeContext    val wm = data.map(new RichMapFunction[String, String]() {      override def open(parameters: Configuration): Unit = {//初始化方法,初始化时候完成加速器的定义        getRuntimeContext.addAccumulator("daxinCounter", numLines)      }      override def map(value: String): String = {        numLines.add(2)        "111"      }    })    val file = new File("C:\\logs\\flink")    if (file.exists()) {      file.delete()    }    wm.writeAsText("C:\\logs\\flink") //可以正常执行    // wm.printToErr()  //异常:No new data sinks have been defined since the last execution.    //println(env.getExecutionPlan())    val rs = env.execute()    val counter: Int = rs.getAccumulatorResult("daxinCounter")    //println("counter : " + counter)  }}

上面就是加速器使用的一个简单Demo,重点是: RichMapFunction,RichMapFunction的声明如下:

public abstract class RichMapFunction
extends AbstractRichFunction implements MapFunction
接下来看一下RichFunction声明:

/** * An base interface for all rich user-defined functions. This class defines methods for * the life cycle of the functions, as well as methods to access the context in which the functions * are executed. */@Publicpublic interface RichFunction extends Function

RichFunction函数是用户定义的函数的一个接口,这个函数有声明周期方法如下,同时这个接口的方法可以访问函数执行的上下文!

void open(Configuration parameters) throws Exception; //函数初始化方法。初始化操作需在此完成void close() throws Exception;

获取上下文方法:

RuntimeContext getRuntimeContext();IterationRuntimeContext getIterationRuntimeContext();

如果需要在函数中使用加速器,就需要在函数上下文中注册加速器,然后在函数中使用。故需要使用RichMapFunction的getRuntimeContext方法获取上下文!

转载地址:http://ufjlf.baihongyu.com/

你可能感兴趣的文章
让 Linux 防火墙新秀 nftables 为你的 VPS 保驾护航
查看>>
Istio 1.4 部署指南
查看>>
贫苦家庭用户的 Envoy xDS 控制平面
查看>>
Kubernetes Pod 网络精髓:pause 容器详解
查看>>
Docker 技术鼻祖 Linux Namespace 入门系列:Namespace API
查看>>
使用 ebpf 深入分析容器网络 dup 包问题
查看>>
Kubelet 中的 “PLEG is not healthy” 到底是个什么鬼?
查看>>
超详细的网络抓包神器 Tcpdump 使用指南
查看>>
从 Kubernetes 资源控制到开放应用模型,控制器的进化之旅
查看>>
从此以后运维与开发过上了没羞没臊的性福生活
查看>>
教你如何优雅地魔改 Grafana 主题,太实用了!
查看>>
让我们来看看回到单体的 Istio 到底该怎么部署
查看>>
超详细的网络抓包神器 tcpdump 使用指南
查看>>
iTerm2 都不会用,还敢自称老司机?(上)
查看>>
两个奇技淫巧,将 Docker 镜像体积减小 99%
查看>>
Istio 1.5 部署指南修正版
查看>>
不要轻易使用 Alpine 镜像来构建 Docker 镜像,有坑!
查看>>
Kubectl exec 背后到底发生了什么?
查看>>
程序员涨薪宝典
查看>>
什么?终止一个容器竟然用了 10 秒钟,这不能忍!
查看>>