技术开发 频道

结合使用Hadoop与Couchbase Server

  将数据从 Couchbase Server 导入 Hadoop

  尽管该场景不是我们这里将直接处理的场景,但需要注意我们可从 Couchbase Server 将数据导入 Hadoop。如果您在 Couchbase Server 中加载了大量数据,并希望利用 Hadoop 来处理和简化它,这可能很有用。为此,您可以使用以下命令,从 Couchbase Server 将整个数据集加载到 HDFS 中的一个 Hadoop 文件中:$ sqoop import --connect http://192.168.0.71:8091/pools --table cbdata。

  这里提供的 URL 是 Couchbase Server 桶池 (bucket pool) 的位置。这里指定的表实际上是 HDFS 中将存储数据的目录的名称。

  数据本身被存储为来自 Couchbase Server 的信息的一种键/值转储形式。在 Couchbase Server 2.0 中,这意味着数据是使用惟一文档 ID 写出的,包含记录的 JSON 值。

  将 JSON 数据写入 Hadoop MapReduce

  要在 Hadoop 与 Couchbase Server 之间交换信息,需要使用一种通用语言来表达这些信息,在本例中使用的是 JSON(参见 清单 5)。

  清单 5. 在 Hadoop MapReduce 中输出 JSON

package org.mcslp;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import com.google.gson.*;
public class WordCount {
public static class Map extends MapReduceBase implements Mapper
Text, Text, IntWritable
> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector
IntWritable
> output, Reporter reporter) throws IOException {
String line
= value.toString();
StringTokenizer tokenizer
= new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer
IntWritable, Text, Text
> {
class wordRecord {
private String word;
private int count;
wordRecord() {
}
}
public void reduce(Text key,
Iterator values,
OutputCollector output,
Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum
+= values.next().get();
}
wordRecord word
= new wordRecord();
word.word
= key.toString();;
word.count
= sum;
Gson json
= new Gson();
System.out.println(json.toJson(word));
output.collect(key,
new Text(json.toJson(word)));
}
}
public static void main(String[] args) throws Exception {
JobConf conf
= new JobConf(WordCount.class);
conf.setJobName(
"wordcount");
conf.setOutputKeyClass(Text.
class);
conf.setOutputValueClass(IntWritable.
class);
conf.setMapperClass(Map.
class);
conf.setReducerClass(Reduce.
class);
conf.setInputFormat(TextInputFormat.
class);
conf.setOutputFormat(TextOutputFormat.
class);
FileInputFormat.setInputPaths(conf,
new Path(args[0]));
FileOutputFormat.setOutputPath(conf,
new Path(args[1]));
JobClient.runJob(conf);
}
}

  该代码是 Hadoop 发行版所提供的字数示例的修改版。

  此版本使用 Google Gson 库从处理过程的精减阶段写入 JSON 信息。为了方便起见,我们使用了一个新类 (wordRecord),它由 Gson 转换为一条 JSON 记录,这种记录是 Couchbase Server 逐个文档地处理和解析内容所需的格式。

  请注意,我们没有为 Hadoop 定义一个 Combiner 类。这将阻止 Hadoop 尝试重新精减该信息,该操作在当前的代码中会失败,因为我们的精减阶段仅接收该单词和一位数,并输出一个 JSON 值。对于辅助的精减/组合阶段,我们需要解析 JSON 输入或定义一个新 Combiner 类,以便输出信息的 JSON 版本。这稍微简化了定义。

  要在 Hadoop 中使用此代码,首先需要将 Google Gson 库复制到 Hadoop 目录中 (/usr/lib/hadoop/lib)。然后重新启动 Hadoop,以确保 Hadoop 已经正确识别出该库。

  接下来,将您的代码编译到一个目录中: $ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar:./google-gson-2.2.1/gson-2.2.1.jar -d wordcount_classes WordCount.java 。

  现在为您的库创建一个 jar 文件: $ jar -cvf wordcount.jar -C wordcount_classes/。

  完成此过程后,您可以将一些文本文件复制到某个目录中,然后使用此 jar 文件将这些文本文件处理为许多独立的单词,创建一条 JSON 记录来包含每个单词和计数。例如,要在一些 Project Gutenberg 文本上处理此数据: $ hadoop jar wordcount.jar org.mcslp.WordCount /user/mc/gutenberg /user/mc/gutenberg-output。

  这将在我们的目录中生成已由 Hadoop 内的 MapReduce 函数统计的单词列表。

  将数据从 Hadoop 导出到 Couchbase Server

  要从 Hadoop 取回数据并导入 Couchbase Server 中,则需要使用 Sqoop 导出该数据: $ sqoop export --connect http://10.2.1.55:8091/pools --table ignored --export-dir gutenberg-output。

  此示例中忽略了 --table 参数,但 --export-dir 是要导出的信息所在的目录的名称。

0
相关文章