前言
记录spark-submit提交Spark程序出现的一个异常,以供第一次出现这种异常且不知道原因,该怎么解决的的同学参考。
1、异常信息
1 | Exception in thread "main" org.apache.spark.SparkException: Application application_1529650293575_0148 finished with failed status |
记录spark-submit提交Spark程序出现的一个异常,以供第一次出现这种异常且不知道原因,该怎么解决的的同学参考。
1 | Exception in thread "main" org.apache.spark.SparkException: Application application_1529650293575_0148 finished with failed status |
本文总结如何将DataFrame按某列降序排序,因为Spark默认的排序方式为升序,而降序的用法和java语言等又不一样,所以需要特地总结记录一下其用法。
1 | val data = Array((7, 2, 3), (1, 8, 6), (4, 5, 9)) |
1 | +----+----+----+ |
1 | df.orderBy("col2").show() |
1 | +----+----+----+ |
本文讲解Spark如何获取当前分区的partitionId,这是一位群友提出的问题,其实只要通过TaskContext.get.partitionId(我是在官网上看到的),下面给出一些示例。
下面的代码主要测试SparkSession,SparkContext创建的rdd和df是否都支持。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package com.dkl.leanring.partition
import org.apache.spark.sql.SparkSession
import org.apache.spark.TaskContext
/**
* 获取当前分区的partitionId
*/
object GetPartitionIdDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("GetPartitionIdDemo").master("local").getOrCreate()
val sc = spark.sparkContext
val data = Seq(1, 2, 3, 4)
// 测试rdd,三个分区
val rdd = sc.parallelize(data, 3)
rdd.foreach(i => {
println("partitionId:" + TaskContext.get.partitionId)
})
import spark.implicits._
// 测试df,三个分区
val df = rdd.toDF("id")
df.show
df.foreach(row => {
println("partitionId:" + TaskContext.get.partitionId)
})
// 测试df,两个分区
val data1 = Array((1, 2), (3, 4))
val df1 = spark.createDataFrame(data1).repartition(2)
df1.show()
df1.foreach(row => {
println("partitionId:" + TaskContext.get.partitionId)
})
}
}
本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实只要按照WordCount的思路,最后输出key的数量即可,所以可以利用SparkStreaming+Kafka 实现基于缓存的实时wordcount程序,这里稍加改动,如果uv数量增加的话就打印uv的数量(key的数量)。
数据是我随机在kafka里生产的几条,用户以空格区分开(因为用的之前单词统计的程序)
首先在kafka建一个程序用到topic:KafkaUV1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic KafkaUV
我的目录为:/spark/dkl/kafka/UV_checkpoint1
hadoop fs -mkdir -p /spark/dkl/kafka/UV_checkpoint
本文记录博主如何设置kafka的offset过期时间并测试其效果
通过修改offsets.retention.minutes的值即可改变kafka offset的过期时间,单位为分钟,改完之后需要重启kafka。具体的配置文件为$KAFKA_HOME/config/server.properties,原生的kafka配置文件里可能没有这个配置项,自己添加上即可,比如设置过期时间为一小时,那么按如下配置即可1
offsets.retention.minutes=60
网上有的博客说官网文档对于这个配置的说明有点错误,将offsets.retention.minutes错写成了offsets.topic.retention.minutes,但是我查看了一下,官方文档上并没有写错,可能是之前的版本写错了,而且很多博客按之前的版本写的,大家注意一下。官网文档地址http://kafka.apache.org/documentation/
因本人用ambari管理大数据集群的各个组件,所以在界面上直接修改kafka的配置,在界面上查看kafka的配置offsets.retention.minutes为86400000,因为kafka offset默认过期时间为一天,那么根据这个86400000来看offsets.retention.minutes的单位为毫秒才对,所以一开始误认为单位为毫秒,所以修改配置后的时间设置的很大,导致一开始测试不成功,经过一点点的验证,发现单位实际上为分钟,而ambari上显示的86400000应该是个bug,因为kafka默认的配置文件里是没有这个配置项的,所以我估计ambari一开始也没有配置只是搜索的时候将其显示为86400000,而并没有真正的生效,只有将这个配置项修改之后,才会生效,并且单位为分钟(看了一下ambari的大部分默认时间单位都是毫秒~)。
后来在官网上看到offsets.retention.minutes的default为1440也证实了这一点。
虽然本人的需求是将默认的一天的时间改长一点,但是时间长了测试太慢,所以将时间改短一点测试效果即可,测试代码见Spark Streamming+Kafka提交offset实现有且仅有一次,经过多次测试,得出结论,在修改重启之后,不管是新增加的topic还是之前的topic,只要是新保存的offset都会生效,而之前保存的offset,比如之前是一天才会删除,那么修改重启后,之前保存的offset还是会一天后才能删掉。
more >>
tag:
缺失模块。
1、请确保node版本大于6.2
2、在博客根目录(注意不是yilia根目录)执行以下命令:
npm i hexo-generator-json-content --save
3、在根目录_config.yml里添加配置:
jsonContent: meta: false pages: false posts: title: true date: true path: true text: false raw: false content: false slug: false updated: false comments: false link: false permalink: false excerpt: false categories: false tags: true