sparklyr: An R Interface to Apache Spark (Part 1)

This post is my notes from working through the home page of https://spark.rstudio.com/; it gave me a first look at sparklyr.

Apache Spark is a big-data processing engine; I won't pretend to understand it in any depth.

Installation

install.packages("sparklyr")
library(sparklyr)
# Install a local copy of Spark 2.3.2:
# spark_install(version = "2.3.2")
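If you are not sure which version to install, sparklyr can list the releases it knows how to download; a quick sketch (spark_available_versions() is part of sparklyr):

# List the Spark versions that spark_install() can fetch
spark_available_versions()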

Connecting to Spark

Here we create a local Spark instance and connect to it.

sc <- spark_connect(master = "local")
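If you need to tune the connection, you can pass a config object instead of the default call above. A minimal sketch for local mode (spark_config() is sparklyr's standard mechanism; the 2 GB driver-memory value is just an illustration):

conf <- spark_config()
conf$`sparklyr.shell.driver-memory` <- "2G"  # give the local driver 2 GB
sc <- spark_connect(master = "local", config = conf)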

Using dplyr

First, copy some datasets into the Spark cluster:

library(nycflights13)
library(Lahman)
library(dplyr)

iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
flights_tbl <- copy_to(sc, flights, overwrite = TRUE)
batting_tbl <- copy_to(sc, Batting, "batting")

Manipulating the data:

flights_tbl %>%
  filter(dep_delay == 2)
# # Source: spark<?> [?? x 19]
#     year month   day dep_time sched_dep_time dep_delay
#    <int> <int> <int>    <int>          <int>     <dbl>
#  1  2013     1     1      517            515         2
#  2  2013     1     1      542            540         2
#  3  2013     1     1      702            700         2
#  4  2013     1     1      715            713         2
#  5  2013     1     1      752            750         2
#  6  2013     1     1      917            915         2
#  7  2013     1     1      932            930         2
#  8  2013     1     1     1028           1026         2
#  9  2013     1     1     1042           1040         2
# 10  2013     1     1     1231           1229         2
# # … with more rows, and 13 more variables: arr_time <int>,
# #   sched_arr_time <int>, arr_delay <dbl>, carrier <chr>,
# #   flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
# #   air_time <dbl>, distance <dbl>, hour <dbl>,
# #   minute <dbl>, time_hour <dttm>
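These dplyr verbs are translated into Spark SQL and executed lazily on the cluster. To inspect the SQL that gets generated, show_query() (re-exported by dplyr, implemented by dbplyr) works on Spark tables; a quick sketch:

flights_tbl %>%
  filter(dep_delay == 2) %>%
  show_query()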

Plotting the flight delay data:

library(ggplot2)

delay <- flights_tbl %>%
  group_by(tailnum) %>%
  summarise(count = n(),
            dist = mean(distance),
            delay = mean(arr_delay)) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect()  # collect() pulls the aggregated result back into R

delay %>%
  ggplot(aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)

Window functions from dplyr are supported as well:

batting_tbl %>%
  select(playerID, yearID, teamID, G, AB:H) %>%
  arrange(playerID, yearID, teamID) %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0)

# # Source: spark<?> [?? x 7]
# # Groups: playerID
# # Ordered by: playerID, yearID, teamID
#    playerID  yearID teamID     G    AB     R     H
#    <chr>      <int> <chr>  <int> <int> <int> <int>
#  1 aaronha01   1959 ML1      154   629   116   223
#  2 aaronha01   1963 ML1      161   631   121   201
#  3 abadfe01    2012 HOU       37     7     0     1
#  4 abbated01   1905 BSN      153   610    70   170
#  5 abbated01   1904 BSN      154   579    76   148
#  6 abbeych01   1894 WAS      129   523    95   164
#  7 abbeych01   1895 WAS      132   511   102   141
#  8 abbotji01   1999 MIL       20    21     0     2
#  9 abnersh01   1992 CHA       97   208    21    58
# 10 abnersh01   1990 SDN       91   184    17    45
# # … with more rows

Using SQL

library(DBI)
iris_preview <- dbGetQuery(
  sc,
  "SELECT * FROM iris LIMIT 10"
) %>%
  as_tibble() %>%
  print()

# # A tibble: 10 x 5
#    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
#           <dbl>       <dbl>        <dbl>       <dbl> <chr>
#  1          5.1         3.5          1.4         0.2 setosa
#  2          4.9         3            1.4         0.2 setosa
#  3          4.7         3.2          1.3         0.2 setosa
#  4          4.6         3.1          1.5         0.2 setosa
#  5          5           3.6          1.4         0.2 setosa
#  6          5.4         3.9          1.7         0.4 setosa
#  7          4.6         3.4          1.4         0.3 setosa
#  8          5           3.4          1.5         0.2 setosa
#  9          4.4         2.9          1.4         0.2 setosa
# 10          4.9         3.1          1.5         0.1 setosa
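Any Spark SQL statement can be sent this way. For example, a sketch of an aggregate over the flights table registered earlier:

airport_counts <- dbGetQuery(
  sc,
  "SELECT origin, count(*) AS n FROM flights GROUP BY origin"
)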

Machine learning

# Fit a linear regression to the mtcars dataset: predict fuel
# consumption (mpg) from vehicle weight (wt) and cylinder count (cyl).
mtcars_tbl <- copy_to(sc, mtcars)

# Split the data into training and test sets
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5,
                test = 0.5,
                seed = 1099)

# Fit a linear model to the training set
fit <- partitions$training %>%
  ml_linear_regression(
    response = "mpg",
    features = c("wt", "cyl")
  )

# Formula: mpg ~ wt + cyl

# Coefficients:
# (Intercept)          wt         cyl
#   33.499452   -2.818463   -0.923187

summary(fit)

# Deviance Residuals:
#    Min     1Q Median     3Q    Max
# -1.752 -1.134 -0.499  1.296  2.282

# Coefficients:
# (Intercept)          wt         cyl
#   33.499452   -2.818463   -0.923187

# R-Squared: 0.8274
# Root Mean Squared Error: 1.422
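To see how the model does on the held-out half, sparklyr can score and evaluate the test partition. A sketch using only the objects above (ml_predict() and ml_regression_evaluator() are part of sparklyr; "prediction" is Spark ML's default output column):

# Score the test partition with the fitted model
pred <- ml_predict(fit, partitions$test)

# Root mean squared error on the test data
ml_regression_evaluator(pred, label_col = "mpg",
                        prediction_col = "prediction",
                        metric_name = "rmse")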

Reading and writing data

temp_csv <- tempfile(fileext = ".csv")
temp_parquet <- tempfile(fileext = ".parquet")
temp_json <- tempfile(fileext = ".json")

# Write the iris table out as CSV, then read it back
spark_write_csv(iris_tbl, temp_csv)
iris_csv_tbl <- spark_read_csv(sc, "iris_csv", temp_csv)

# The same round trip in Parquet and JSON
spark_write_parquet(iris_tbl, temp_parquet)
iris_parquet_tbl <- spark_read_parquet(sc, "iris_parquet", temp_parquet)

spark_write_json(iris_tbl, temp_json)
iris_json_tbl <- spark_read_json(sc, "iris_json", temp_json)
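By default spark_read_csv() eagerly caches the table in memory; for large files it can instead be registered lazily. A sketch (memory is a documented argument of spark_read_csv(); the table name here is illustrative):

# Register the CSV without caching it in memory
iris_lazy_tbl <- spark_read_csv(sc, "iris_csv_lazy", temp_csv, memory = FALSE)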

Listing tables in the database

src_tbls(sc)

# [1] "batting" "flights" "iris" "mtcars"

Distributed computation

You can use spark_apply() to run arbitrary R code across the cluster. For example, we can apply rgamma() over the iris table:

spark_apply(
  iris_tbl,
  function(data) {
    data[1:4] + rgamma(1, 2)
  }
)

# # Source: spark<?> [?? x 4]
#    Sepal_Length Sepal_Width Petal_Length Petal_Width
#           <dbl>       <dbl>        <dbl>       <dbl>
#  1         6.65        5.05         2.95        1.75
#  2         6.45        4.55         2.95        1.75
#  3         6.25        4.75         2.85        1.75
#  4         6.15        4.65         3.05        1.75
#  5         6.55        5.15         2.95        1.75
#  6         6.95        5.45         3.25        1.95
#  7         6.15        4.95         2.95        1.85
#  8         6.55        4.95         3.05        1.75
#  9         5.95        4.45         2.95        1.75
# 10         6.45        4.65         3.05        1.65
# # … with more rows
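The function passed to spark_apply() runs once per partition and must return a data frame. A sketch that makes the partitioning visible (the column name n is illustrative):

# Count the rows each worker sees in its partition
spark_apply(
  iris_tbl,
  function(data) data.frame(n = nrow(data)),
  names = "n"
)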

You can also apply a function by group:

spark_apply(
  iris_tbl,
  function(e) broom::tidy(lm(Petal_Width ~ Petal_Length, e)),
  names = c("term", "estimate", "std.error", "statistic", "p.value"),
  group_by = "Species"
)

# # Source: spark<?> [?? x 6]
#   Species    term      estimate std.error statistic  p.value
#   <chr>      <chr>        <dbl>     <dbl>     <dbl>    <dbl>
# 1 versicolor (Interce…  -0.0843    0.161     -0.525 6.02e- 1
# 2 versicolor Petal_Le…   0.331     0.0375     8.83  1.27e-11
# 3 virginica  (Interce…   1.14      0.379      2.99  4.34e- 3
# 4 virginica  Petal_Le…   0.160     0.0680     2.36  2.25e- 2
# 5 setosa     (Interce…  -0.0482    0.122     -0.396 6.94e- 1
# 6 setosa     Petal_Le…   0.201     0.0826     2.44  1.86e- 2

Extensions

The dplyr and machine-learning interfaces that sparklyr uses internally are also available to extension packages. Because Spark is a general-purpose cluster computing system, there are many potential applications for extensions (for example, interfaces to custom machine learning pipelines, interfaces to third-party Spark packages, and so on).

Here is a simple example that wraps Spark's text-file line counting in an R function:

tempfile <- tempfile(fileext = ".csv")  # note: this shadows base::tempfile()
write.csv(flights, tempfile, row.names = FALSE, na = "")

count_lines <- function(sc, path) {
  spark_context(sc) %>%
    invoke("textFile", path, 1L) %>%
    invoke("count")
}

count_lines(sc, tempfile)

# [1] 336777
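Here spark_context(sc) returns a reference to the underlying JVM SparkContext, and invoke() calls methods on that Java object: textFile() builds an RDD over the file with a single partition, and count() returns its number of lines.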

Table utilities

# Cache a table in memory with:
tbl_cache(sc, "batting")
# and unload it from memory with:
tbl_uncache(sc, "batting")

Connection utilities

# Open the Spark web console with:
spark_web(sc)

# View the log with spark_log():
spark_log(sc, n = 10)

Disconnecting

spark_disconnect(sc)
