背景

最近要给一个客户交付一套系统,想做成全自动安装,于是调研了各种系统镜像打包工具,目标操作系统是openEuler,打包工具调研了packer,openEuler专用工具(isocut,imageTailor,oemaker),genisoimage(据说是从mkisofs fork来的),因为第一次接触这个方面,某开源软件推荐了packer,
于是在这上面花的时间最多,过年的前后折腾了两周,但是发现坑也最多。

packer

这个工具是hashicorp出品的,文档看着挺“好看”的,但是该说的细节一句都没说,难道官方认为能用这个工具的人都是linux内核大牛吗,对小白极不友好。而且我提了个bug一个多月了他们公司也没有人认领。我的需求是要做一个裸金属的系统镜像,可是官方以各种车轱辘话解释我们就是不做裸金属的builder,github issues上有3个相关的讨论时隔7、8年了都没有解决。虽然有个俄罗斯的哥们提出了一个hack方法,但是只能以live boot方式启动,没有通用性。看来哈西莫多走了之后这公司是每况愈下了?

openEuler专用工具

openEuler官方还算贴心一口气出了三个自定义工具,但是看文档看的眼睛都痛了也没看出有什么区别。而且文档里的错误很多,也是在gitee提了issue后就进入没人管的境地。我只试了isocut,起初还期待满满,上手挺容易的,后来发现配置有的反人性,第三方rpm包除了在ks package中指明外,还需要写入rpmlist中,而且还不能带版本和架构信息,如果我有上百个rpm要装进iso里,光写配置文件就得写到抓狂!而且如果我有不同版本要求,不让写版本号你让我怎么用啊!

genisoimage

真是相见恨晚的一个工具!只用了一天时间就把自定义镜像搞完了。基本的使用方法很简单,下一篇会讲一下新手会遇到的几个坑。

Go是一门天生为服务器程序设计的简洁的语言,因此Go的设计原则聚焦在可扩展性,可读性和并发性,而多态性并不是这门语言的设计初衷,因此就被放在了一边。虽然在2.0版本之前还没有泛型的支持,但是go自带的一些语言特性可以满足一些类似“泛型”的要求,比如内置类型:

  • array
  • slice
  • map
  • chan
    这四种类型可以用任意类型的元素初始化,例如map[yourtype]bool就可以用来实现任意元素的集合。go的一些内置函数也是通用的,比如len()既可以用于string, array, slice, 也可以用于求map的长度。

但是如果golang内置结构和函数不能满足需求,可以从以下几种方法来入手:

  1. 类型断言
    当你想实现一个通用函数的时候,会考虑传入的参数是不固定类型的,go正好提供了interface{}类型,它可以代表任意类型。当你不确定用什么类型合适的时候,用它就对了。
    来个简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Container struct {
elem []interface{}
}

func (this *Container) Put(v interface{}) {
*this = append(*this, elem)
}
// 取出最后一个元素
func (this *Container) Get() interface{} {
ret := (*c)[len(*c)-1]
*c = (*c)[:len(*c)-1]
return ret
}

func assertExample() {
container := &Container{}
container.Put(1)
container.Put("Hello")
_, ok := container.Get().(int);!ok {
fmt.Println("Unable to read an int from container")
}
}

通过接口类型我们把细节完全隐藏了起来,但是我们也把运行时类型检查失败的风险留给了调用者,而且调用者每次都要写 _, ok := YourType.(type);!ok{} 这种类型断言,比较啰嗦。

  1. 反射机制
    反射机制就是在运行时动态的调用对象的方法和属性,Python和Java语言都有实现,golang是通过reflect包来实现的,反射机制允许程序在编译时处理未知类型的对象,这听上去可以解决我们上面的问题,现在修改一下代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    type Container struct {
    elem reflect.Value
    }
    func NewContainer(t reflect.Type) *Container {
    return &Container{
    elem: reflect.MakeSlice(reflect.SliceOf(t), 0, 10),
    }
    }
    func (this *Container) Put(v interface{}) {
    if reflect.ValueOf(val).Type() != c.elem.Type().Elem() {
    panic(fmt.Sprintf("Cannot set a %T into a slice of %s", val, c.elem.Type().Elem()))
    }
    c.elem = reflect.Append(c.elem, reflect.ValueOf(val))
    }
    func (this *Container) Get(regref interface{}) interface{} {
    retref = c.elem.Index(c.elem.Len()-1)
    c.elem = c.elem.Slice(0, c.elem.Len()-1)
    return retref
    }
    func assertExample() {
    a := 0.123456
    nc := NewContainer(reflect.TypeOf(a))
    nc.Put(a)
    nc.Put(1.11)
    nc.Put(2.22)
    b := 0.0
    c := nc.Get(&b)
    fmt.Println(c)
    d := nc.Get(&b)
    fmt.Println(d)
    }

可以看到使用反射的代码量要增加不少,而且要写各种reflect方法的前缀,对于有代码洁癖的人来说是个灾难,也会让程序员效率变慢,因为没有编译时的类型检查,会带来额外的运行时开销。

  1. 使用接口
    接口有个特点是只做定义不管细节实现,可以利用这一特性实现泛型,例如标准库中的sort包就是使用接口实现对任意容器元素排序的例子
    1
    2
    3
    4
    5
    type Interface interface {
    Len() int
    Less(i, j int) bool
    Swap(i, j int)
    }

只要实现接口定义的这三种方法即可对自定义的容器元素进行排序,具体例子可以参考sort包的文档。查看sort包的源码后可以发现代码非常简洁,但是缺点也很明显,使用者需要自己把接口方法重新实现一遍。

  1. 代码生成工具
    代码生成的原理是先写一个mock类型,这个mock只做占位符使用,然后通过转换工具把这个占位符替换成具体类型,已经有很多人写过了这里不再多说,缺点是生成的文件比较大,依赖第三方工具和模板语法。

总之,golang的泛型实现没有一个固定的方法,或者说一个放之四海而皆准的理想方法,程序设计达到一定规模后,总是需要在代码效率,编译器效率和运行效率之间做出一些取舍,因此我们需要知道实现泛型的不同方法,在适当的时候使用合适的那个。

你有没有考虑过,你的goroutines是如何被go的runtime系统调度的?是否尝试理解过为什么在程序中增加了并发,但并没有给它带来更好的性能?go执行跟踪程序可以帮助回答这些疑问,还有其他和其有关性能的问题,例如延迟、竞争和较低的并行效率。

该工具是Go 1.5版本加入的,通过度量go语言特定事件的运行时,例如:

  1. 创建,启动和终止goroutines
  2. 阻塞/非阻塞goroutines(syscalls, channels, locks)
  3. 网络 I/O
  4. Syscalls
  5. 垃圾回收

以上事件的所有数据会被跟踪器收集,而且不会做任何类型的聚合和抽样。这在一些复杂的应用程序中,通过go tool trace命令对其进行分析后可能会产生一个较大的文件。

在引入执行trace程序之前,已经有了pprof内存和CPU分析器,那么为什么它还会被添加到官方的工具链中呢?虽然CPU分析器做了一件很好的工作,告诉你什么函数占用了最多的CPU时间,但它并不能帮助你确定是什么阻止了goroutine运行,或者在可用的OS线程上如何调度goroutines。这正是跟踪器真正起作用的地方。trace设计文档很好地解释了跟踪程序背后的动机以及它是如何被设计和工作的。

Trace概览

让我们从一个简单的“Hello,world”示例开始。在本例中,我们使用runtime/trace包将trace数据写入标准错误输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"os"
"runtime/trace"
)

func main() {
trace.Start(os.Stderr)
defer trace.Stop()
// create new channel of type int
ch := make(chan int)

// start new anonymous goroutine
go func() {
// send 42 to channel
ch <- 42
}()
// read from channel
<-ch
}

这个例子创建了一个无缓冲的channel,初始化一个goroutine,并发送数字42到channel。运行主goroutine时是阻塞的,它会等待另一个goroutines发送一个int数值给channel。

go run main.go 2> trace.out 运行这段代码会发送trace数据到trace.out,之后可以用go tool trace trace.out读取trace。(该程序是个web app,默认启动127.0.0.1地址的一个随机端口,如果需要修改host可以加参数解决,例如 go tool trace --http=':8080' trace.out,译者加)

Tips: go 1.8之前,你同时需要可执行二进制文件和trace数据来分析trace;用go 1.8之后的版本编译的程序,trace数据已经包含了go tool trace命令所有的信息。

运行该命令后,在浏览器窗口打开该地址,它会提供一些选项。每一个都会打开trace的不同视图,涵盖了程序执行的不同信息。

  1. View trace
    最复杂、最强大和交互式的可视化显示了整个程序执行的时间轴。这个视图显示了在每个虚拟处理器上运行着什么,以及什么是被阻塞等待运行的。稍后我们将在这篇文章中深入探讨这个视图。注意它只能在chrome上显示。

  2. Goroutine analysis
    显示了在整个执行过程中,每种类型的goroutines是如何创建的。在选择一种类型之后就可以看到关于这种类型的goroutine的信息。例如,在试图从mutex获取锁、从网络读取、运行等等每个goroutine被阻塞的时间。

  3. Network/Sync/Syscall blocking profile
    这些图表显示了goroutines在这些资源上所花费的时间。它们非常接近pprof上的内存/cpu分析。这是分析锁竞争的最佳选择。

  4. Scheduler latency profiler
    为调度器级别的信息提供计时功能,显示调度在哪里最耗费时间。

View Trace

点击“View Trace”链接,你会看到一个界面,里面充满了关于整个程序执行的信息。

Tips: 右上角的"?"按钮可以获取快捷方式列表,以帮助跟踪trace。

下面的图片突出了最重要的部分,图片下面是对每个部分的说明描述:

  1. Timeline
    显示执行的时间,根据跟踪定位的不同,时间单位可能会发生变化。你可以通过使用键盘快捷键(WASD键,就像视频游戏一样😊)来导航时间轴。
  2. Heap
    在执行期间显示内存分配,这对于发现内存泄漏非常有用,并检查垃圾回收在每次运行时能够释放多少内存。
  3. Goroutines
    在每个时间点显示有多少goroutines在运行,有多少是可运行的(等待被调度的)。大量可运行的goroutines可能显示调度竞争,例如,当程序创建过多的goroutines,会导致调度程序繁忙。
  4. OS Threads
    显示有多少OS线程正在被使用,有多少个被syscalls阻塞。
  5. Virtual Processors
    每个虚拟处理器显示一行。虚拟处理器的数量由GOMAXPROCS环境变量控制(默认为内核数)。
  6. Goroutines and events
    显示在每个虚拟处理器上有什么goroutine在运行。连接goroutines的连线代表事件。在示例图片中,我们可以看到goroutine “G1.runtime.main”衍生出了两个不同的goroutines:G6和G5(前者是负责收集trace数据的goroutine,后者是我们使用“go”关键字启动的那个)。
    每个处理器的第二行可能显示额外的事件,比如syscalls和运行时事件。这还包括goroutine代表运行时所做的一些工作(例如辅助垃圾回收)。
    下图显示了当选择一个goroutine时得到的信息。

    该信息包含:
  • 它的“名称”(Title)
  • 何时开始(Start)
  • 持续时间(Wall Duration)
  • 开始时的栈trace
  • 结束时的栈trace
  • 该goroutine产生的事件

我们可以看到,这个goroutine创造了两个事件:trace goroutine和在channel上发送42的goroutine。

通过点击一个特定的事件(点击图中的一条连线或者在点击goroutine后选择事件),我们可以看到:

  • 事件开始时的栈信息
  • 事件持续时长
  • 事件包含的goroutine

你可以点击这些goroutines来定位跟踪到它们的trace数据。

阻塞概况

从trace中获得的另一个特殊视图是网络/同步/syscall阻塞概况。阻塞概况显示了一个类似于pprof的内存/cpu概况中的图形视图。不同之处在于,这些概况显示每个goroutine在一个特定资源上花费的阻塞时间,而不是显示每个函数分配了多少内存。
下图告诉我们示例代码的“同步阻塞概况”

这告诉我们,我们的主goroutine从一个channel接收花费了12.08微秒。当太多的goroutines在竞争着获取一个资源的锁时,这种类型的图是找到锁竞争的很好的方法。

收集trace

有三种收集trace的方法:

  1. 使用runtime/trace
    这个需要调用trace.Starttrace.Stop,已经在我们的示例程序中讲过。
  2. 使用-trace=<file>测试标志
    用来收集关于被测试代码的trace时比较有用
  3. 使用debug/pprof/trace handler
    这是用来收集运行中的web应用的trace的最好的方法

跟踪一个web应用

想要从一个运行的web应用收集trace, 你需要添加/debug/pprof/trace handler。下面的代码示例展示了如何通过简单地导入net/http/pprof包为http.DefaultServerMux做到这一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"net/http"
_ "net/http/pprof"
)

func main() {
http.Handle("/hello", http.HandlerFunc(helloHandler))

http.ListenAndServe("localhost:8181", http.DefaultServeMux)
}

func helloHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("hello world!"))
}

为了收集trace,我们需要向endpoint发出请求,例如,curl localhost:8181/debug/pprof/trace?seconds=10 > trace.out 此请求将阻塞10秒钟,trace数据将写入文件trace.out。像这样生成的trace可以像我们以前那样查看:go tool trace trace.out

Tips: 请注意,将pprof handlers暴露给Internet是不建议的。推荐的做法是在不同的只绑定到loopback接口的http.Server暴露这些endpoint。这篇[博客](http://mmcloughlin.com/posts/your-pprof-is-showing)讨论该风险,并有代码示例解释如何正确地暴露pprof handler。

在收集trace之前,让我们首先通过wrk来给我们的服务加一些负载:
$ wrk -c 100 -t 10 -d 60s http://localhost:8181/hello

这将使用10个线程的100个连接在60秒内发出请求。当wrk正在运行时,我们可以使用curl localhost:8181/debug/pprof/trace?seconds=5 > trace.out来收集5s的trace数据。这会产生一个5MB的文件(如果我们能够在我的4核CPU机器上生成更多的负载,它就可以快速增长)。
同样,打开trace是由go tool trace命令完成的。当该工具解析文件的整个内容时,这将花费比我们之前的示例花费的时间更长。当它完成时,页面看起来略有不同:

1
2
3
4
5
6
7
8
View trace (0s-2.546634537s)
View trace (2.546634537s-5.00392737s)

Goroutine analysis
Network blocking profile
Synchronization blocking profile
Syscall blocking profile
Scheduler latency profile

为了保证浏览器渲染呈现所有内容,该工具将trace分为两个连续的部分。更复杂的应用或更长的trace可能需要工具将其分割成更多的部分。
点击“View trace(2.546634537-5.00392737)”我们可以看到有很多事情正在发生:

这个特殊的屏幕截图显示了一个GC运行情况,它从1169ms-1170ms开始,在1174ms之后结束。在这段时间里,一个OS线程(PROC 1)运行一个用于GC的goroutine,而其他goroutines则在一些GC阶段中提供辅助(这些步骤显示在goroutine的连线中,并被叫做MARK ASSIST)。在截图的最后,我们可以看到大部分分配的内存都被GC释放了。
另一个特别有用的信息是在“Runnable”状态下的goroutines的数量(在选定的时间内是13):如果这个数字随着时间的推移变得很大,这就意味着我们需要更多的cpu来处理负载。

结论

trace程序是调试并发问题的强大工具。例如,竞争和逻辑冲突。但它并不能解决所有的问题:它并不是用来跟踪哪块代码花费最多CPU时间或分配的最佳工具。go tool pprof更适用于这些用例。
当你想要了解一个耗时程序的行为,并且想知道当每个goroutine不运行时它在做什么,这个工具就会很好地发挥作用。收集trace可能会有一些开销,并且会生成大量的数据用来检查。
不幸的是,官方文档是缺失的,因此需要进行一些试验来尝试和理解trace程序所显示的内容。这也是对官方文档和整个社区作出贡献的机会(e.g 博客文章)。

参考

  1. Go execution tracer (design doc)
  2. Using the go tracer to speed fractal rendering
  3. Go tool trace
  4. Your pprof is showing

原文链接

花了几个月时间把《未来简史》看完了,感慨于作者的博学和出众的思考能力,书中不但对科技做了大胆预测和推断,也提到了很多宗教,神学和政治观点。我最近痴迷历史,恰逢研究生课程讲中国特色社会主义,在老师的推荐下,我也简单翻看了马克思韦伯的《新教伦理与资本主义精神》,有这些做铺垫,看到《未来简史》中对宗教和政治的阐述时便更容易理解了,资本主义制度,绝不是我们教科书上写的那样。我出生的年代正好赶上中国掀起市场经济浪潮,而我的父辈,祖辈,刚好经历了中国历史上最为动乱和飘摇不定的年代,他们没有太多文化,也对社会制度不甚了解,甚至我这一代还是有很多人对自己对社会制度对生活的大环境一无所知,思想守旧,极度缺乏理性思维,不能与时俱进。这是极其可怕的,沉沦的人们急需觉醒,否则思想鸦片将贻害子孙。过去一百多年,中国社会制度既姓过公也姓过私,教科书的洗脑和左的力量压制下,我们放大了社会主义的作用,而故意轻视了资本主义的影响。这里有个人值得提一下,是他强调“我们落后的关键还是我们从五十年代起,不抓经济而抓阶级斗争,搞一大二公的社会主义。我这里不是说社会主义搞错了,但我也不能说我们完全搞对了。”“发展才是硬道理,成天去争论什么资本主义、社会主义有啥意思?你搞得清楚吗?反正我是搞不清楚。”他是名副其实的,真正让大多数中国人一年到头有钱大吃大喝、消遣娱乐、满世界旅游以至于让春节都失去了年味儿的“财神”。中国(经济)在70年代末基本频临崩溃,没有他,我们极可能仍如朝鲜一样饥寒交迫,更惘谈看世界。三十年来不断有把他印到更大面额人民币上去的传闻,但最终他的形象还只是出现在《时代》周刊之类的媒体封面上。人民币大钞从10元涨到了100元,印的还是那个让我们凭票证过年才能吃点儿好的、穿衣服新三年旧三年缝缝补补又三年的大救星。

(引用自华尔街见闻https://wallstreetcn.com/articles/3236961)

昨天一个大新闻,宪法移除国家主席任期不超两届的内容。之前在外媒多次看到过,竟然终于来了。中华民族必将崛起,社会制度可能还会变化,科学和宗教也会继续进化,又是一个戊戌年,不接受新思维,以后的日子大概不会好过。

最近开始学习Go语言,在windows下面用vscode + go插件做开发环境遇到了一些问题:

  1. 代码补全不能用
  2. 代码定义和引用找不到

总结一下主要是go插件的使用,和gopath的设置问题

  1. go插件并没有为你做完所有事情,你需要自己用go get安装必要的外部包,或者点击右下角的Analysis Tools Missing 按钮,vscode可以帮你自动安装,这和命令行执行go get是一样的。
    另外,go get命令在windows下要注意加参数

    1
    go get -u -ldflags -H=windowsgui github.com/nsf/gocode

    这样可以避免windows服务器程序窗口挂起。

  2. gopath可以设置多个:我在本机环境变量中加了两个路径到GOPATH中,也在vscode设置中设置好了,但是只能够在标准库包中找到代码定义和引用的跳转,引用第三方库的时候就找不到了。经过一番研究,是vscode的设置问题,需要在gopath中把两个path都写上,举例如下

    1
    "go.gopath": "C:/path1;D:/path2"

    windows环境变量是用分号分隔,所以这里也用分号,写到一行里。这回vscode就可以跳转到外部包的定义和引用了

最近用airflow做流程调度,遇到了不少跟时间相关的问题。现在整理一下遇到的几个时间,并给出他们在源码中的定义。

模板时间

在models.py get_template_context(self, session=None)函数中定义

1
2
3
4
5
6
7
8
9
10
11
12
ds = self.execution_date.isoformat()[:10]
ts = self.execution_date.isoformat()
yesterday_ds = (self.execution_date - timedelta(1)).isoformat()[:10]
tomorrow_ds = (self.execution_date + timedelta(1)).isoformat()[:10]

prev_execution_date = task.dag.previous_schedule(self.execution_date)
next_execution_date = task.dag.following_schedule(self.execution_date)

ds_nodash = ds.replace('-', '')
ts_nodash = ts.replace('-', '').replace(':', '')
yesterday_ds_nodash = yesterday_ds.replace('-', '')
tomorrow_ds_nodash = tomorrow_ds.replace('-', '')

可见模板时间全部依赖execution_date, execution_date从哪定义的呢?稍后说

使用模板变量是还遇到一个格式化字符串问题:例如我有下面一个模板字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sqoop_inc_cmd = r"""
sqoop import \
--connect 'jdbc:sqlserver://hostname:port;username=USERNAME;password=PASSWORD;database=DBNAME' \
--query "{{ params.query }} and \$CONDITIONS" \
--delete-target-dir \
--target-dir {{ params.targetdir }} \
--hive-import \
--hive-table {{ params.hivetable }} \
--fields-terminated-by '\t' \
--hive-partition-key ds \
--hive-partition-value {{ params.ds}} \
--null-string '\\N' \
--null-non-string '\\N' \
-m 1
"""

如果params.ds这个变量是用bash_operator参数传递的,模板的变量替换会失败。猜测可能是因为ds变量命名问题
改成params.ds_value后,模板变量替换正常。

execution_date

找到airflow web 页面,execution date随处可见,分两种:

  • 一种是scheduled run, dag的execution date 是start date减去一天
  • 另一种是external triggered run, execution date就是当时触发的时间

最近在学习搭建Hadoop分布式系统,折腾一番后发现,利用虚拟机搭建实验环境真是很快。这里介绍一下虚拟机集群常用的网络类型。

Bridge

最初我选择了桥接模式,这种方式是把虚拟机和主机归入一个局域网络中,虚拟机要设置成静态IP,主机和虚拟机之间互相可以访问,虚拟机可以上网。
缺点是虚拟机占用局域网的ip资源,而且如果换了网络环境,需要重新更改ip地址。

Host-only

这种方式与桥接类似。这种方式需要创建一个虚拟以太网,然后主机和虚拟机都利用这块网卡通信。
优点是这个局域网是虚拟的,因此可以在主机上创建多个局域网环境,不依赖物理网络。

先在preference里添加一块host-only网卡,如果安装virtualbox时自动添加了,可以忽略。
检查这块网卡的DHCP server是否启用。若禁用,需要先启用它,否则你的虚拟机不会被分配对应的IP地址。如下图所示

然后在虚拟机的settings->network选项中,启用adapter1,选择NAT,再启用adapter2,选择Host-only mode,保存,启动虚拟机,大功告成。
虚拟机启动后,分别在主机和虚拟机ping检测,成功!

我用的是最新的ubuntu 16.04.2 server,启动虚拟机后检查网卡ls /sys/class/net,会发现网卡名不是eth0,eth1这样了,而是enp0s3,enp0s8,原因是ubuntu 16使用了新的网卡命名规范

认识Confluent

Kafka用得最多的是他的大吞吐量的消息队列,很多公司利用这个特点构建了自己的数据管道系统。数据管道是大数据系统的灵魂。
由LinkedIn创建的Kafka在apache开源后并没有停下脚步,三位作者离开LinkedIn创建了Confluent公司,并且开发并开源了一些新的玩具。
我在工作中遇到一个任务,要为第三方用户构建一个数据查询系统。不能直接拿OLTP数据库来给第三方用,需要把业务数据导入到这个查询系统里,如果能保持时效性就更好了。
研究了一下Kafka的主页,发现了一个新玩具蛮好用的,现在就介绍给大家。

Kafka connect

Kafka connect就是这个新玩具,可以将数据和文件实时导入到Kafka中,然后再从Kafka中导入到下游系统进行存储。下面我会介绍两个例子,把MySQL source数据库导入到sink数据库,
重点是connect配置文件的用法。

快速上手

安装

需要安装Java>=1.7和最新版本Kafka,可以参考官方手册,在这里就不多说了

启动

安装完成后,默认的bin文件以及被copy到了系统路径中,分别启动ZooKeeper, Kafka, Schema Registry,为了方便调试,我都在前台运行

1
2
3
4
5
6
7
8
# Start ZooKeeper.  Run this command in its own terminal.
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

# Start Kafka. Run this command in its own terminal.
$ ./bin/kafka-server-start ./etc/kafka/server.properties

# Start Schema Registry. Run this command in its own terminal.
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

Kafka connect只包含了PostgreSQL和SQLite的JDBC驱动,没有MySQL驱动,我理解可能因为MySQL是商业公司的产品,所以没有包含进来,没有关系,可以自己下载。
下载MySQL驱动,把jar文件解压到/usr/share/java/kafka-connect-jdbc就可以了。

/etc/kafka-connect-jdbc/下面有一些配置文件的模板,先来复制一个source模板,然后在这基础上添加我们自己的参数:

1
2
3
4
5
6
7
8
name=mysql-myprj-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/myprj_online?user=root&password=mypassword
table.whitelist=main_orderinfo
mode=incrementing
incrementing.column.name=right_id
topic.prefix=mysql-

然后再复制一个sink模板,修改如下:

1
2
3
4
5
6
name=sink-mysql-myprj
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=mysql-main_orderinfo
connection.url=jdbc:mysql://localhost:3306/myprj_copy?user=root&password=mypassword
auto.create=true

这里我把同一个节点上的online数据库复制到了另一个copy数据库中。
接下来运行执行standalone模式的worker:

1
2
# 第一个参数是worker的配置文件,后面可以跟无数个connector[connector1, connector2,...]
connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-mysql-myprj.properties /etc/kafka-connect-jdbc/sink-mysql-myprj.properties

然后去查看copy数据库,可以看到数据都已经同步过来了,在source库中插入一条数据,可以看到copy库中也出现了这条记录。

遇到的几个小问题

  • Kafka中包含了多个slf4j,运行时可能会报错,只要删除其中一个即可
  • hostname可能会出现不能识别的问题,这时只要把hostname在/etc/hosts中映射到127.0.0.1即可
  • 运行worker是可以跟着多个connect配置文件的,我之前错误的启动了两个standalone worker,一直报错说REST server找不到,是因为/etc/schema-registry/connect-avro-standalone.properties
    只能对应一个worker, 如果启动多个standalone worker,需要针对worker实例分别对offset.storage.file.filenamerest.port进行配置

第二个例子会深入介绍一下connect配置文件的用法,下一篇再讲吧,happy big data~

最近在看《Spark for python developer》, 在安装spark和集成notebook时遇到一点小问题,记录一下。
直接安装Spark编译后的最新版本,现在notebook已经被jupyter集成,也需要安装。然后需要设置两个环境变量如下

1
2
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.open_browser=False --NotebookApp.ip='*' --NotebookApp.port=8888"

这时再重新运行 ./bin/pyspark会看到控制台上notebook server log输出, 浏览器访问localhost:8888,就可以看到notebook页面了

接下来测试一下,统计shakespeare全集中的单词数量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from operator import add

file_in = sc.textFile('/opt/shakespeare.txt')
# count lines
print('number of lines in file: %s' % file_in.count())
# add up lengths of each line
chars = file_in.map(lambda s: len(s)).reduce(add)
print('number of characters in file: %s' % chars)
# Get words from the input file
words =file_in.flatMap(lambda line: re.split('\W+', line.lower().strip()))
# words of more than 3 characters
words = words.filter(lambda x: len(x) > 3)
# set count 1 per word
words = words.map(lambda w: (w,1))
# reduce phase - sum count all the words
words = words.reduceByKey(add)

Happy Bigdata!

认识Kylin

之前的BI报表开发,都是基于关系型数据库,数据仓库工程师把数据放到了大宽表里。但是关系型数据库无法满足
日益增长的查询需求,而且性能受到了挑战,关系型数据仓库需要向大数据数据仓库转型。
我是从Web开发走过来的,不了解数据仓库的设计逻辑,在网上查了下资料,了解了数据仓库的星型模型和星座模型,
比如这篇星型模型wiki
如果更复杂的可以用雪花模型snowflake schema 雪花模型wiki
大数据的数据仓库,用Hive的比较多,Kylin是大数据的数据仓库OLAP引擎。

使用中的问题

新手最重要的是看懂Learn Kylin里面的example,把每一个细节都看懂,这样就可以设计一个好的Cube.
在这里我先跳过安装的步骤,直接总结使用Kylin两周以来遇到的问题:

设计Model

  • Kylin只支持星型模型,这一点非常重要,如果已经有了Hive表但不是星型模型,可以通过创建Hive View来使用Kylin。
  • Model Diemnsions 需要把所有要用到的Dimensions都选上,如果遗漏了Column,在设计Cube时就找不到了。
  • Measures也是同样,不要遗漏
  • settings中的过滤功能可以过滤掉不需要的数据

设计Cube

  • 设计Cube dimension,可以直接用Auto generation功能生成所需的维度
  • 维度有两种类型:有外键对应的维度表中的column,称为derived维度,意思是由外键派生出来的;其他的都是normal维度。
    接下来在advnced settings中,mandatory 维度意思是每个cube都包含的;Hierarchy意思是分层的维度,例如国家-省份-城市关系
  • 在advanced settings中,rowkeys是自动生成的,但是如果column的值太长,比如ID这种字段,Encoding如果用默认的dict会报错
    Caused by: java.lang.IllegalArgumentException: Too high cardinality is not suitable for dictionary
    改成integer后报错消失,但目前我对rowkeys还是不太理解,我只试过integer。
  • 如果Cube已经build过,对他修改前需要先Purge

查询

Kylin的SQL语法使用了Apache Calcite, 功能还是很强大的,但是使用中也有一些限制:

  • 不支持case when,使用case when的场景就说明聚合时还要对cube的measure结果进行重新分割,不符合Kylin的设计。可以把case when的场景就说明聚合时还要对cube的measure结果进行重新分割,不符合Kylin的设计,
    可以把case when条件写入到列中,或者把条件放到一个lookup table中;如果Hive表改动代价比较大,可以使用Hive视图,性能是不会受到损失的。
0%