简易python爬虫–爬取冷笑话

今天闲来无事,就想接触一下爬虫的基本知识,搞大数据,没有数据是万万不可的,而数据的来源,有很大一部分是通过网络爬虫来抓取的。

关于爬虫,似乎python的框架会比较多,比如scrapy。不过刚开始接触,用python一些原生的库就可以完成一些简单的小爬虫程序了,就比如我刚刚写的冷笑话爬虫。

在这之前,得大概搞清楚爬虫的工作原理,其实也很简单。首先是对你的目标网站发出http请求,目标网站会返回一个html文档给你,也就是我们俗称的网页,原本,这些文档都是由一大堆标签构成的,但浏览器配合上css,js,就形成了我们看到的各种各样的网页。但我们的爬虫不需要这些外衣,我们只需要内容,所以爬虫接下来的工作是处理这个html文档,提取我们要的信息,然后继续爬取下一个网页,如此往复循环。

不多说,先上我写的一段python脚本,爬取<我们爱讲冷笑话>的随机页面,网址为:http://lengxiaohua.com/random,这个页面每次刷新都有20则不同的随机冷笑话,我的程序功能是:输入数字1到20看对应的笑话,输入new重新爬取另外20则新笑话,输入quit退出程序。代码如下:

#-*- coding:utf-8 -*-
__author__ = '李鹏飞'
import urllib2
import re

class randomJoke:

    #初始化方法
    def __init__(self):
        self.url = 'http://lengxiaohua.com/random'
        self.user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
        #初始化headers
        self.headers = { 'User-Agent' : self.user_agent }
        #笑话内容
        self.content = []

    #获取网页源代码
    def getSourceCode(self):
        try:
            request = urllib2.Request(url = self.url, headers=self.headers)
            response = urllib2.urlopen(request)
            sourceCode = response.read().decode('utf-8')
            return sourceCode
        except urllib2.URLError, e:
            if hasattr(e,"reason"):
                print u"网络错误...",e.reason
                return None

    #获取笑话
    def setContent(self):
        sourceCode = self.getSourceCode()
        if not sourceCode:
            print('获取网页内容失败~!')
            quit()
        pattern = re.compile(' <pre.*?js="joke_summary".*?"first_char">(.*?)</span>(.*?)</pre>

.*?class="user_info">.*?<a.*?>(.*?)</a>.*?

(.*?)

',re.S)
        items = re.findall(pattern,sourceCode)
        self.content = items
        print u"已经爬取源代码...正在解析源代码..."

    #返回笑话
    def getContent(self):
        return self.content

    #打印一则笑话
    def printAJoke(self,number):
        joke = self.content[number]
        print u"作者:%s" %(joke[2])
        print u'发表于:'+ joke[3]
        #item[0]和item[1]组成完整的内容
        print joke[0]+joke[1]

randomJoke = randomJoke()
notQuit = True
print u"你好,这里是随机笑话!"
print u"---------------------"
randomJoke.setContent()
print u"...."
print u"笑话池已经装满,20/20"
print u"输入1到20看笑话~~,输入quit退出,输入new重新爬取新笑话"
while notQuit:
    input = raw_input()
    if input == "quit" :
        print u"bye!"
        notQuit = False
    elif input == "new":
        randomJoke.setContent()  #重新抓取笑话内容
        print u"...."
        print u"笑话池已经装满,20/20"
        print u"输入1到20看笑话~~,输入quit退出,输入new重新爬取新笑话"
    else:
        input = int(input)
        randomJoke.printAJoke(input-1)
        print u"--------------------------------------------------"
        print u"输入1到20看笑话~~,输入quit退出,输入new重新爬取新笑话"
print u"您已经成功退出!"
quit()

保存这段脚本,运行,效果如下:

运行冷笑话爬虫

说明一下,有些笑话是带图片的,而我没有抓取图片,仅仅抓取了作者,发表日期,和内容。

下面对程序稍微进行一下解释:

引入了urllib2re两个模块,urllib2是用来发送http请求的,re是正则表达式的相关模块。

randomJoke是程序的主要部分,下面就所有的方法,进行简单的说明。

__init__方法

位置:9-15行

这个是初始化方法,定义了四个变量,url为目标网址;user_agent是我们发送http请求的时候伪造的头部的一个客户端信息;headers是头部信息,这里只把user_agent信息加进去就行了;content[]是一个列表,用来存放笑话内容。

getSourceCode方法

位置:18-27行

这个方法是用来获取网页的源代码的。在第20行,发起一个http请求,接着读取返回的源代码,并return源代码。

setContent方法

位置:30-44行

这个方法是比较关键的一个方法,主要是利用正则表达式匹配源代码,并把我们需要的内容爬取下来。第35-41行代码就是正则表达式的匹配模式,我们要抓取网页中的20个笑话,其中的.*?是匹配任意字符,多一个括号,即(.*?)表示匹配的内容是我们需要的内容,在这个片段当中,出现了4(.*?),因此,程序的第36行,item表示的是一个列表,这个列表的形式是这样的:[[[item1-1],[item1-2],[item1-3],[item1-4]],[…],[…],…[[item20-1],[item20-2],[item20-3],[item20-4]]]。即这个列表有20个元素(代表这个网页的20个笑话),每个元素又是一个列表–列表中有4个元素(代表爬取的4个内容,对应前面说的4个(.*?)。这里附上其中的一小部分网页源代码,对应着网页的html源代码,我们就能感受到这个简单的正则表达式的作用了:


<pre js="joke_summary"><span class="first_char">来</span>表哥家吃饭,小侄子一边看西游记一边手舞足蹈上蹦下跳学猴哥。
我哥:“傻儿子,你是孙悟空吗?”
侄子:“正是,你孙爷爷在此。”
打到现在还在哭。</pre>
</div>
<div class="para_info clearfix">
<div class="user_info">
                        <img src="http://joke-image.b0.upaiyun.com/img/user/cover/630347-299927_40x40.jpg" class="left corner4px" height="40" width="40"/>
                        <a href="/user/630347">离愁▍ Feast aw</a>
                        <p>15天前</p>

一共有20个类似的html片段,正则表达式的作用就是把笑话内容,作者和发表日期提取出来。

printAJoke方法

位置:51-56行

打印一则笑话,在这里我们更能感受到content[]的结构了,因为printAJoke这个方法需要传一个number参数进来,范围是1到20,我们就可以读取对应的笑话了,比如content[0]代表第一则笑话,和java中的数组类似。

53行以后就是程序运行的基本逻辑了,比较简单,就不多解释了。

就这样,一个简单的爬虫就完成了!

hbase的基本概念以及shell命令的基本用法【转载】

作为入门,这篇文章写得挺好的,在文章的后半部分,是hbase的shell的简单使用,我们可以通过shell来更好的了解一下hbase,实践出真知。

简介

HBase是一个分布式的、面向列的开源数据库,源于google的一篇论文《bigtable:一个结构化数据的分布式存储系统》。HBase是Google Bigtable的开源实现,它利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协同服务。

HBase的表结构

HBase以表的形式存储数据。表有行和列组成。列划分为若干个列族/列簇(column family)。

Row Key

column-family1

column-family2

column-family3

column1

column2

column1

column2

column3

column1

key1

t1:abc

t2:def

t4:lookfor404

t3:hello

t2:world

key2

t3:abc

t1:lipengfei

t4:java

t3:hello

t2:hadoop

t3:hbase

key3

t2:dfadfasd

t1:dfdasddsf

t2:hi

t1:fine

如上图所示,key1,key2,key3是三条记录的唯一的row key值,column-family1,column-family2,column-family3是三个列族,每个列族下又包括几列。比如column-family1这个列族下包括两列,名字是column1和column2,t1:abc,t2:def是由row key1和column-family1-column1唯一确定的一个单元cell。这个cell中有两个数据,abc和def。两个值的时间戳不一样,分别是t1,t2, hbase会返回最新时间的值给请求者。

这些名词的具体含义如下:

 (1)Row Key

与nosql数据库们一样,row key是用来检索记录的主键。访问hbase table中的行,只有三种方式:

          1.通过单个row key访问

          2.通过row key的range

          3.全表扫描

Row key行键 (Row key)可以是任意字符串(最大长度是 64KB,实际应用中长度一般为 10-100bytes),在hbase内部,row key保存为字节数组。

存储时,数据按照Row key的字典序(byte order)排序存储。设计key时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性)

注意:

字典序对int排序的结果是1,10,100,11,12,13,14,15,16,17,18,19,2,20,21,…,9,91,92,93,94,95,96,97,98,99。要保持整形的自然序,行键必须用0作左填充。

行的一次读写是原子操作 (不论一次读写多少列)。这个设计决策能够使用户很容易的理解程序在对同一个行进行并发更新操作时的行为。

(2)列族 column family

hbase表中的每个列,都归属与某个列族。列族是表的chema的一部分(而列不是),必须在使用表之前定义。列名都以列族作为前缀。例如courses:history , courses:math 都属于 courses 这个列族。

访问控制、磁盘和内存的使用统计都是在列族层面进行的。实际应用中,列族上的控制权限能帮助我们管理不同类型的应用:我们允许一些应用可以添加新的基本数据、一些应用可以读取基本数据并创建继承的列族、一些应用则只允许浏览数据(甚至可能因为隐私的原因不能浏览所有数据)。

(3) 单元 Cell

HBase中通过row和columns确定的为一个存贮单元称为cell。由{row key, column( =<family> + <label>), version} 唯一确定的单元。cell中的数据是没有类型的,全部是字节码形式存贮。

(4) 时间戳 timestamp

每个cell都保存着同一份数据的多个版本。版本通过时间戳来索引。时间戳的类型是 64位整型。时间戳可以由hbase(在数据写入时自动 )赋值,此时时间戳是精确到毫秒的当前系统时间。时间戳也可以由客户显式赋值。如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。每个cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。

为了避免数据存在过多版本造成的的管理 (包括存贮和索引)负担,hbase提供了两种数据版本回收方式。一是保存数据的最后n个版本,二是保存最近一段时间内的版本(比如最近七天)。用户可以针对每个列族进行设置。

HBase shell的基本用法

hbase提供了一个shell的终端给用户交互。通过执行 help get 可以看到命令的帮助信息。进入shell很简单,切换目录到hbase的bin下,运行命令:
./hbase shell
以网上的一个学生成绩表的例子来演示hbase的用法。

name

grade

course

math

art

zkb

5

97

87

baoniu

4

89

80

这里grade对于表来说是一个列,course对于表来说是一个列族,这个列族由两个列组成math和art,当然我们可以根据我们的需要在course中建立更多的列族,如computer,physics等相应的列添加入course列族。图中需要注意的是90这个值,列族下面的列也是可以没有名字的。

(1) 建立一个表格scores  具有两个列族grade 和courese

键入命令:
create 'scores','grade', 'course'

(2) 查看当前HBase中具有哪些表

键入命令:
list

(3) 查看表的构造

键入命令:
describe 'scores'

(4) 加入一行数据,行名称为zkb 列族为grade,列名为””(空),值为5

键入命令:
put 'scores','zkb','grade:','5'

(5) 给zkb这一行的数据的列族course添加一列<math,97>

键入命令:
put 'scores','zkb','course:math','97'

(6) 给zkb这一行的数据的列族course添加一列<art,87>

键入命令:
put'scores','zkb','course:art','87'

(7) 加入一行数据,行名称为baoniu 列族为grade,列名为””(空), 值为4

键入命令:
put'scores','baoniu','grade:','4'

(8) 给baoniu这一行的数据的列族course添加一列<math,89>

键入命令:
put'scores','baoniu','course:math','89'

(9) 给Jerry这一行的数据的列族course添加一列<art,80>

键入命令:
put'scores','baoniu','course:art','80'

(10) 查看scores表中zkb的相关数据

键入命令:
get'scores','zkb'

(11) 查看scores表中所有数据
注意:scan命令可以指定startrow,stoprow来scan多个row,例如:scan 'user_test',{COLUMNS =>'info:username',LIMIT =>10, STARTROW => 'test',STOPROW=>'test2'}

键入命令:
scan'scores'

(12) 查看scores表中所有数据courses列族的所有数据

键入命令:
scan'scores',{COLUMNS => 'course'}

(13) 删除scores表

键入命令:
disable'scores'
drop'scores'

总结下,hbase shell常用的操作命令有create,describe,disable,drop,list,scan,put,get,delete,deleteall,count,status等,通过help可以看到详细的用法。

转自http://blog.csdn.net/smcwwh/article/details/7468672,有些许修改,感谢原作者!

hbase的配置以及遇到的问题

这几天在熟悉学习hbase,记录一下配置过程,以及出现的问题。

首先,hbase是什么?先来一段摘抄:

HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)所提供的分布式数据存储一样,HBase在Hadoop之上提供了类似于Bigtable的能力。HBase是Apache的Hadoop项目的子项目。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。

反正这款数据库在大数据领域是比较出名的,nosql、面向列,都是它的特点。

在安装之前,我们首先最好查看一下hbase和hadoop对应的版本是否兼容。官方文档地址:http://hbase.apache.org/book.html

看到“4. Basic Prerequisites”,往下拉到hadoop那里,有一个表格,表明了hadoop和hbase的兼容情况:

HBase-0.94.x HBase-0.98.x (Support for Hadoop 1.1+ is deprecated.) HBase-1.0.x (Hadoop 1.x is NOT supported) HBase-1.1.x HBase-1.2.x

Hadoop-1.0.x

X

X

X

X

X

Hadoop-1.1.x

S

NT

X

X

X

Hadoop-0.23.x

S

X

X

X

X

Hadoop-2.0.x-alpha

NT

X

X

X

X

Hadoop-2.1.0-beta

NT

X

X

X

X

Hadoop-2.2.0

NT

S

NT

NT

NT

Hadoop-2.3.x

NT

S

NT

NT

NT

Hadoop-2.4.x

NT

S

S

S

S

Hadoop-2.5.x

NT

S

S

S

S

Hadoop-2.6.0

X

X

X

X

X

Hadoop-2.6.1+

NT

NT

NT

NT

S

Hadoop-2.7.0

X

X

X

X

X

Hadoop-2.7.1+

NT

NT

NT

NT

S

我下载的是hbase-0.98.16.1-hadoop2-bin.tar.gz。

下面讲一下配置:

分布式配置

解压文件:
tar -zxvf  hbase-0.98.16.1-hadoop2-bin.tar.gz -C /env/

切换到 hbase目录下的conf目录,编辑hbase-env.sh
vim hbase-env.sh
修改以下两行,并把原有的#去掉,第一行换上你jdk的目录,第二行表示使用hbase自带的zookeeper。

export JAVA_HOME=/usr/java/jdk1.7.0_79/
export HBASE_MANAGES_ZK=true

再修改hbase-site.xml
vim hbase-site.xml
改为以下内容,其中的主机名不要搞错,要在/etc/hosts里面有相应的映射。

<configuration>

  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://main:49002/hbase</value>
    <description>The directory shared byRegionServers.
    </description>
  </property>

  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>

    <property>
      <name>hbase.zookeeper.property.clientPort</name>
      <value>2222</value>
    </property>

    <property>
      <name>hbase.zookeeper.quorum</name>
      <value>main,slave1,slave2</value>
    </property>

    <property>
      <name>hbase.zookeeper.property.dataDir</name>
      <value>/home/hadoop/zookeeper</value>
    </property>

</configuration>

修改regionservers,我把slave1和slave2加进去
vim regionservers
内容修改为

slave1
slave2

一个节点占一行,保存。
把配置好的hbase复制到另外两台主机上

scp -r /env/hbase-0.98.16.1-hadoop2/ root@slave1:/env/
scp -r /env/hbase-0.98.16.1-hadoop2/ root@slave2:/env/

先启动hadoop的hdfs,然后再启动hbase即可。
在bin目录下,输入以下命令
./start-hbase.sh

不过,我是在虚拟机上弄的分布式环境,在运行hbase的shell的时候,HMaster老是自动退出,查看日志,看到是zookeeper的错误,错误如下:

zookeeper.ClientCnxn: Opening socket connection to server main/192.168.254.100:2299. Will not attempt to authenticate using SASL (unknown error)
2016-01-11 05:23:12,267 WARN [main-SendThread(main:2299)] zookeeper.ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused

有点郁闷,找了好多解决方法,都不行,后来转念一想,既然是想熟悉一下hbase,运行一下它的shell,就不搞那么复杂了,重新配个单机版,单机版的配置非常简单。

单机版配置

和分布式配置的步骤基本一致,只需要把hbase-site.xml改为简易版的就行,内容如下:

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>/home/hbase</value>
    <description>The directory shared byRegionServers.
    </description>
  </property>
</configuration>

改为单机版之后,首先还是得运行hadoop的hdfs,然后再运行./start-hbase.sh,此时jps一下,看到hadoop的namenode启动了,hbase的Hmaster也启动了,此时再运行./hbase shell就正常了。下一篇转一篇文章,学习一下如何直接在shell操作数据库。

啊啊啊赶在12点断网之前。

在java中使用dom4j进行xml文件的解析

之前在做文本挖掘实验的时候,千辛万苦要到这份duc2002的语料库之后,就要对它进行处理了,它是存在xml文件里的。

一直在用java进行相关的处理,所以这次也不例外,java中,关于xml文件解析的框架有很多,在网上也看了很多,最终选择了dom4j这个框架。下面就简单记录一下使用的过程。

首先去下载,我下载的是dom4j1.6.1。下载地址:http://sourceforge.net/projects/dom4j/files/

下载之后解压就行。然后在工程的buildpath里面添加一个external jars,选择刚才解压之后根目录下的dom4j-1.6.1.jar文件。

我的这个xml文件的层级比较多,对照我要需要的内容,这里就列举一下一部分的内容:

<corpus>
	<cluster cid="d061j">
		<title>d061j</title>
		<topics>
		  <topic>d061j</topic>
		</topics>
		<queries/>
		<documents>
		  <document docid="AP880911-0016">
			<title> Hurricane Gilbert Heads Toward Dominican Coast</title>
			<text>
				<s sid="9"> Hurricane Gilbert swept toward the Dominican Republic Sunday, and the Civil Defense alerted its heavily populated south coast to prepare for high winds, heavy rains and high seas.</s>
				<s sid="10"> The storm was approaching from the southeast with sustained winds of 75 mph gusting to 92 mph.</s>
				<s sid="11"> ``There is no need for alarm,'' Civil Defense Director Eugenio Cabral said in a television alert shortly before midnight Saturday.</s>
				<s sid="12"> Cabral said residents of the province of Barahona should closely follow Gilbert's movement.</s>
				<s sid="13"> An estimated 100,000 people live in the province, including 70,000 in the city of Barahona, about 125 miles west of Santo Domingo.</s>
				<s sid="14"> Tropical Storm Gilbert formed in the eastern Caribbean and strengthened into a hurricane Saturday night.</s>
				<s sid="15"> The National Hurricane Center in Miami reported its position at 2 a.m. Sunday at latitude 16.1 north, longitude 67.5 west, about 140 miles south of Ponce, Puerto Rico, and 200 miles southeast of Santo Domingo.</s>
				<s sid="16"> The National Weather Service in San Juan, Puerto Rico, said Gilbert was moving westward at 15 mph with a ``broad area of cloudiness and heavy weather'' rotating around the center of the storm.</s>
				<s sid="17"> The weather service issued a flash flood watch for Puerto Rico and the Virgin Islands until at least 6 p.m. Sunday.</s>
				<s sid="18"> Strong winds associated with the Gilbert brought coastal flooding, strong southeast winds and up to 12 feet feet to Puerto Rico's south coast.</s>
				<s sid="19"> There were no reports of casualties.</s>
				<s sid="20"> San Juan, on the north coast, had heavy rains and gusts Saturday, but they subsided during the night.</s>
				<s sid="21"> On Saturday, Hurricane Florence was downgraded to a tropical storm and its remnants pushed inland from the U.S. Gulf Coast.</s>
				<s sid="22"> Residents returned home, happy to find little damage from 80 mph winds and sheets of rain.</s>
				<s sid="23"> Florence, the sixth named storm of the 1988 Atlantic storm season, was the second hurricane.</s>
				<s sid="24"> The first, Debby, reached minimal hurricane strength briefly before hitting the Mexican coast last month .</s>
			</text>
		  </document>
		</documents>
	</cluster>
</corpus>

每个cluster有自己的title,接下来,documents是和cluster同级的,ducuments里面又有若干document,每个document下有title,text,text下有p标签,p标签下又有各个句子,标签是s。

我的任务是,将每一篇document的内容输出到一个文本文件,命名格式为:”cl”+cid+”-“+”docid”+docid,其中cid和docid分别是cluster和document的一个属性。

PS:我把文本文件duc2002.xml已经复制到本项目的scr下了。

下面贴上源代码:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.util.Iterator;

import org.dom4j.Attribute;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

public class MyTest {
	public static void main(String[] args) throws Exception {
		SAXReader reader = new SAXReader();
		Document thedocument = reader.read(new File("./src/duc2002.xml"));
		Element root = thedocument.getRootElement();
		System.out.println(root);
		Iterator it = root.elementIterator();
		int count = 0;
		for (Iterator i = root.elementIterator("cluster"); i.hasNext();) {
			Element cluster = (Element) i.next();
			System.out.println("cluster id:"
					+ cluster.attribute("cid").getText()); // cluster的id
			Element documents = cluster.element("documents");
			for (Iterator j = documents.elementIterator("document"); j
					.hasNext();) {
				Element document = (Element) j.next();
				Element title = document.element("title");
				for (Iterator k = document.element("models").elementIterator(
						"model"); k.hasNext();) {
					Element model = (Element) k.next();
				}
				File file = new File("F:\\xmltest\\" + "cl"
						+ cluster.attribute("cid").getText() + "-" + "docid"
						+ document.attribute("docid").getText() + ".txt");
				file.createNewFile();
				BufferedWriter bw = new BufferedWriter(new FileWriter(file,
						true));
				bw.write("title:" + title.getText());
				bw.newLine();

				for (Iterator l = document.element("text").element("p")
						.elementIterator("s"); l.hasNext();) {
					Element sentence = (Element) l.next();
					bw.write(sentence.getText().trim());
					bw.newLine();
				}
				bw.close();
			}
			count++;
			System.out.println(count + "finished!");
		}
		
	}
}

这段程序在F盘的xmltest文件夹下不断生成文本文件,我就拿列举的那部分内容生成的文本文件做例子,输出结果是这样的:

xml文件输出为txt结果

对照程序,再简单整理一下dom4j基本的xml解析功能:

  • 第13-15行,初始化解析器,读入xml文件,获得根节点
  • 第19行,用root.elementIterator("cluster")来迭代获得root节点下的cluster
  • 第20行,用Element cluster = (Element) i.next();来获得每一个cluster,如果要获得属性,比如xml文件第2行的cid,可以用第22行的cluster.attribute("cid").getText());
  • 用element方法获得子节点,如23行的Element documents = cluster.element("documents");
  • 获得节点的内容,直接用getText()方法,如第38行,获得<title>的内容

hadoop官方mapreduce简单例子-WordMean

上一篇写了MapReduce的经典例子–WordCount,为了进一步理解和熟悉一下MapReduce这个框架,这次再来看看官方给出的另外一个例子:WordMean

WordMean,Mean我们都知道,是平均数的意思。所以很显然,这个程序是用来统计单词的平均字符数的。

先来跑一下。

在《centos+虚拟机配置hadoop2.5.2-mapreduce-wordcount例子》里面,我已经在hdfs里面新建了一个input文件夹,并且在里面放置了一个test.txt,内容如下:

test.txt的内容
事实上,无论是WordCount还是WordMean,都不仅仅是只处理一个文件,为了验证这一点,我在跑WordMean之前,建多一个文件。

先在linux下新建一个test2.txt

vim /home/txt/test2.txt

内容如下:
test2.txt的内容

将其复制到hdfs下的input文件夹内:

hadoop fs -put /home/txt/text2.txt input

准备工作完毕,现在跑一下程序。仍然是这个jar文件:/hadoop/share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-2.5.2-sources.jar

输出我们放在名为wordmean-output的文件夹下(命令行的最后一个参数),具体运行命令如下:

hadoop jar share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-2.5.2-sources.jar org.apache.hadoop.examples.WordMean input wordmean-output

运行过程中,控制台最后几行信息输出如下:

Bytes Read=173
File Output Format Counters
Bytes Written=20
The mean is: 5.653846153846154

最后一行显示,the mean is 5.653846153846154,即单词的平均字符数。

我手工统计了一下,一共有26个单词,合计147个字符,147/26,确实是这个数。为了进一步确认,到wordmean-output那里看一下输出结果。

hadoop fs -cat wordmean-output/part-r-00000

结果如下,看来我手工也没数错哈哈:
WORDMEAN输出结果

跑完程序了,把源代码贴一下:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Charsets;

public class WordMean extends Configured implements Tool {

  private double mean = 0;

  private final static Text COUNT = new Text("count");
  private final static Text LENGTH = new Text("length");
  private final static LongWritable ONE = new LongWritable(1);

  
  public static class WordMeanMapper extends
      Mapper<Object, Text, Text, LongWritable> {

    private LongWritable wordLen = new LongWritable();

    
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        String string = itr.nextToken();
        this.wordLen.set(string.length());
        context.write(LENGTH, this.wordLen);
        context.write(COUNT, ONE);
      }
    }
  }


  public static class WordMeanReducer extends
      Reducer<Text, LongWritable, Text, LongWritable> {

    private LongWritable sum = new LongWritable();

    
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {

      int theSum = 0;
      for (LongWritable val : values) {
        theSum += val.get();
      }
      sum.set(theSum);
      context.write(key, sum);
    }
  }

  private double readAndCalcMean(Path path, Configuration conf)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(path, "part-r-00000");

    if (!fs.exists(file))
      throw new IOException("Output not found!");

    BufferedReader br = null;

    // average = total sum / number of elements;
    try {
      br = new BufferedReader(new InputStreamReader(fs.open(file), Charsets.UTF_8));

      long count = 0;
      long length = 0;

      String line;
      while ((line = br.readLine()) != null) {
        StringTokenizer st = new StringTokenizer(line);

        // grab type
        String type = st.nextToken();

        // differentiate
        if (type.equals(COUNT.toString())) {
          String countLit = st.nextToken();
          count = Long.parseLong(countLit);
        } else if (type.equals(LENGTH.toString())) {
          String lengthLit = st.nextToken();
          length = Long.parseLong(lengthLit);
        }
      }

      double theMean = (((double) length) / ((double) count));
      System.out.println("The mean is: " + theMean);
      return theMean;
    } finally {
      if (br != null) {
        br.close();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordMean(), args);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: wordmean <in> <out>");
      return 0;
    }

    Configuration conf = getConf();

    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "word mean");
    job.setJarByClass(WordMean.class);
    job.setMapperClass(WordMeanMapper.class);
    job.setCombinerClass(WordMeanReducer.class);
    job.setReducerClass(WordMeanReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputpath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputpath);
    boolean result = job.waitForCompletion(true);
    mean = readAndCalcMean(outputpath, conf);

    return (result ? 0 : 1);
  }

  /**
   * Only valuable after run() called.
   * 
   * @return Returns the mean value.
   */
  public double getMean() {
    return mean;
  }
}

 

简单分析一下,大概的过程如下:

map过程

看第43和44行,可以知道map输出了两个键值对,一个是<“length”,单词的长度>,一个是<“count”,1>即单词的个数。

reduce过程

代码56至66行。对已经处理好的键值对进行最终处理,分别处理<“length”,<单词的长度>>,和<“count”,<1,1,1,1,1…>>,做的是同样的处理–累加。

readAndCalcMean方法

第68行到111行,主要是读取输出的文件,计算平均数。如上文所示,文件里面的内容如下:

WORDMEAN输出结果

所以这个方法,就是用来读取26和147两个数字,作除法,然后输出到屏幕。具体代码写的很清楚,就不细读了。

hadoop官方mapreduce简单例子-WordCount代码解读

WordCount经常在我们初学mapreduce的时候,被作为最简单的例子来讲解,这次我们就从官方给出的源码入手,看看是怎么回事。

所谓Mapreduce,就是Map(映射)”和”Reduce(归约)。map过程,就是把一组<key,value>映射成新的<key,value>;reduce过程,就是把map过程产生的一些列<key,value>,其中那些key相同的,归约成<key,<values>>来处理,一个key对应一组value。

有了这点认识,我们来看看hadoop的mapreduce源码范例:wordcount。

这个程序用来统计一个文本里面各个单词出现的个数。具体怎么运行可以参看我之前写的《centos+虚拟机配置hadoop2.5.2-mapreduce-wordcount例子

下面分析一下WordCount源码:

Map过程:

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

关于mapper类的要点:

  • maps将输入的键值对输出为中间键值对,输出不一定是和输入是同样类型的,一个输入键值对可以输出多个键值对,也可能不输出。
  • hadoop的Map-Reduce框架对每一个InputSplit键值对都生成一个map任务,这些键值对是由InputFormat生成的。
  • 框架首先调用setup函数,随后是map函数(InputSplit产生的每一个键值对都调用一次),最后调用finally函数。在wordcount中,只重写了map函数。
  • mapper的输出被分成了不同组,供每个reducer来处理,我们可以通过实现抽象类Partitioner来控制排序和分组。

TokenizerMapper类的简单解析:

由于继承mapper类,因此要实现map()方法。
其中有这么一行代码:

StringTokenizer itr = new StringTokenizer(value.toString());

value是传进来的值,对于这个例子而言,是一行的文本,将一行分割成一个又一个单词,然后,用一个while循环迭代,生成新的(key,value)。代码如下:

while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
}

其中的one这个变量,是在刚开始声明的:

private final static IntWritable one = new IntWritable(1);

用来计数,这里是1,所以,输出的(key,value)都是这样的形式:(“单词”,1)。可供之后处理。

Reduce过程:

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

关于reducer类的要点:

  • reducer将中间输出键值对中那些键相同的合并,值为集合。
  • reduce主要有三个阶段:
    1. shuffle洗牌:reducer将排序好的输出从每个mapper里面复制出来,整个过程用HTTP来通信。
    2. sort排序:框架将reducer的具有相同键的输入合并排序,因为不同的mapper可能有相同的键,shuffle过程和sort过程是同时进行的。
    3. Reduce归约:在reduce阶段,每个key传进来,reduce方法都被调用一次,进行归约。

IntSumReducer类的简单解析:

重写了reduce方法,由于之前已经有map过程了,因此此时传进来的键值对的形式是<key,>,即value不是一个值,而是值的集合。用一个for循环,即可遍历某个key里面的所有值:

for (IntWritable val : values) {
    sum += val.get();
}

这个for循环将某个key对应的所有的value累加,即某单词出现次数的累加。
然后把结果输出:

result.set(sum);
context.write(key, result);

其中的result在之前声明了:

private IntWritable result = new IntWritable();

是整型,所以之后context.write(key,result)就把某单词出现的次数输出了。

Mapreduce的整个过程:

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

要点:

  • Configuration conf = new Configuration(); 这句新建了一个配置对象conf,可以配置mapreduce的一些参数,这里没有对配置作过多的调整。
  • Job job = new Job(conf, “word count”); 这句新建了一个job,用于控制整个工作流程,接下来的6个set函数:
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

它们分别是:
1.设置工作的类名,这里是WordCount
2.设置mapper的类:TokenizerMapper.class)
3.设置combiner的类:IntSumReducer.class
4.设置reducer的类:IntSumReducer.class,和combiner是一样的。
5.设置输出key的格式,text类型
6.设置输出value的格式,int类型。

  • 接着就是设置输入输出路径,都由命令行参数来指定
  • 最后调用job.waitForCompletion(true) 来开始工作。

最后附上完整源代码:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

windows和eclipse下maven的安装与配置

作为Apache旗下的项目,maven是一款著名的软件项目管理工具,通过它,可以解决很多项目流程的问题。就拿hadoop的官方源代码来说,它的项目也是基于maven构建的。以下介绍来自百度百科:

Maven这个单词来自于意第绪语,意为知识的积累,最早在Jakata Turbine项目中它开始被用来试图简化构建过程。当时有很多项目,它们的Ant build文件仅有细微的差别,而JAR文件都由CVS来维护。于是Maven创始者开始了Maven这个项目,该项目的清晰定义包括,一种很方便的发布项目信息的方式,以及一种在多个项目中共享JAR的方式。

介绍就不深入了,主要记录一下windows和eclipse下maven的安装与配置。

 

windows下maven的安装与配置

首先要注意的是,安装maven之前,本机安装的jdk的版本最好在1.7以及以上,否则可能会出现一些问题。

官网下载:http://maven.apache.org/download.cgi

找到新的稳定版本下载即可,我下载的是apache-maven-3.3.9.tar.gz。

解压到你想安装的地方,我放在了D盘。D:\apache-maven-3.3.9。其实到了这一步,基本就快完成了。

接下来只要配置环境变量即可。

在path里加入maven的bin 目录D:\apache-maven-3.3.9\bin  即可

为maven添加环境变量

配置完毕,在cmd下输入命令:mvn -version

查看maven的版本,如果出现以下提示,则说明配置成功。

查看maven版本

 

在eclipse安装maven插件

最简单的方法:下载最新的eclipse,下载最新的eclipse,下载最新的eclipse!

我拿旧版本的eclipse对照着网上的教程,无论是通过eclipse的在线应用商店安装,还是下载离线插件,都安装失败了!当然如果你感兴趣,还是可以找那些教程安装的。安装的插件如下:

eclipse的maven插件

过程就不再详述了,还是那句话,新版本里面已经预装了,能省事就省事,毕竟使用这个工具才是我们的目的。

有了这个插件,还要配置一下,让eclipse里面的maven和我们刚才在windows下安装的maven版本一致。步骤如下

在eclipse的菜单栏依次点击windows–preference

找到maven–Installations

把刚才安装的maven的目录 add进去即可。

配置成功!

eclipese中查看源代码出现Source not found怎么解决

今天在eclipse上,想查看一些开源框架的源代码,在按下ctrl点击左键之后,出现了Source not found的提示,如下图:

source-not-found提示

这个问题怎么解决呢?

其实是这样的。在查看自己写的函数的时候,ctrl当然没有问题,但是我们引用别人写的包,就有问题了。比如我这个工程,引入了3个external jar,并且在程序里用到了它们封装的类和方法:

external-jars我们是看不到这些类和方法的源代码的,因为这些jar文件都是编译好的jar包,所以,我们需要在里面绑定source文件,这样才能查看源代码。

对其中一个jar文件点击右键–properties,第一个选项就是java source attachment,在这里,我们可以绑定和这个jar包对应的源文件。一般来说,开源的项目里,这些源文件的命名都是在后面加多一个-sources。在这里,我需要找到的是hadoop-mapreduce-client-core-2.5.2-sources.jar,一般来说,这个文件会在你引用的jar包目录下的source文件夹下,可以找找看。

比如我找到hadoop-mapreduce-client-core-2.5.2-sources.jar,将其绑定:

source文件

点击ok,绑定完毕。这时候这个jar包图标发生了变化,即有个小的正方形附在上面(对比前面的图):

source绑定完毕其它的jar包也是照葫芦画瓢,绑定完毕。

现在,再使用ctrl+左键,或者对着类或者函数右键–open declaration,都可以直接跳到源代码页面了。

 

centos+虚拟机配置hadoop2.5.2-mapreduce-wordcount例子

配置完hadoop的环境,照例,肯定要跑一个简单的程序测试一下。

似乎提到hadoop,就不得不提mapreduce,作为mapreduce的一个开源实现,hadoop的应用非常广。引用一段文字,大概就能了解一下它们的历史了:

2003年和2004年,Google公司在国际会议上分别发表了两篇关于Google分布式文件系统和MapReduce的论文,公布了Google的GFS和MapReduce的基本原理和主要设计思想。2004年,开源项目Lucene(搜索索引程序库)和Nutch(搜索引擎)的创始人Doug Cutting发现MapReduce正是其所需要的解决大规模Web数据处理的重要技术,因而模仿Google MapReduce,基于Java设计开发了一个称为Hadoop的开源MapReduce并行计算框架和系统。自此,Hadoop成为Apache开源组织下最重要的项目,自其推出后很快得到了全球学术界和工业界的普遍关注,并得到推广和普及应用。

废话不多说,我们来看看怎么跑基于mapreduce的简单例子–wordcount。

首先要格式化namenode。根据你的目录,运行以下的命令

hadoop/bin/hadoop namenode -format

查看信息的倒数几行,如果显示:

Storage directory ~/hadoop-2.5.2/hdfs/name has been successfully formatted

则说明成功了!

接下来启动hdfs,在hadoop文件夹下的sbin文件夹内运行以下命令(根据你当前目录的位置自行调整):

sbin/start-dfs.sh

在主节点运行jps命令,看到namenode进程已经启动:
主节点jps

在子节点键入jps命令,看到datanode进程已经启动:

子节点jps进程

因为之前的配置,所以我们可以在50070端口查看hadoop的工作状态。

比如我可以在浏览器输入:http://192.168.254.100:50070/  (根据你的主节点ip改变网址),可以看到工作状态:

hadoop工作状态

接下来启动yarn,在sbin目录下:

sbin/start-yarn.sh

好了,准备工作完成,接下来,跑程序。

1.在hdfs上创建input目录

如果你的hadoop已经加入环境变量了,你就不需要切换目录到bin下,直接hadoop + 命令就可以了。否则,你得先切换到hadoop的bin目录下,运行以下命令

./hadoop fs -mkdir -p input

2.新建文本
新建一个test.txt

vim /home/test.txt

在里面输入任意内容,单词,然后保存。比如我是这样输入的:

test.txt的内容

将文件复制到hdfs的input目录下:

./hadoop fs -put /home/test.txt input

3.运行wordcount程序

程序在hadoop文件夹下的share下,注意以下命令的路径:

./Hadoop jar share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-2.5.2-sources.jar org.apache.hadoop.examples.WordCount input output

4.观察结果
这是运行过程中的状态:
wordcount工作状态

看看结果生成了什么文件:

./hadoop fs -ls output/

结果生成了两个文件:
wordcount结果
part-r -00000就是我们要的结果。
看看里面的内容:

./hadoop fs -cat output/part-r-00000

里面是计数结果:
wordcount计数结果

 

以上就是wordcount运行的全过程,最后退出hdfs和yarn。先切换到sbin目录下,然后运行以下命令即可:

./stop-dfs.sh
./stop-yarn.sh