IT技術互動交流平臺

使用Phoenix將SQL代碼移植至HBase

來源:IT165收集  發布日期:2016-07-14 22:04:47

1.前言

HBase是云計算環境下最重要的NOSQL數據庫,提供了基于Hadoop的數據存儲、索引、查詢,其最大的優點就是可以通過硬件的擴展從而幾乎無限的擴展其存儲和檢索能力。但是HBase與傳統的基于SQL語言的關系數據庫無論從理念還是使用方式上都相去甚遠,以至于要將基于SQL的項目移植到HBase時往往需要重寫整個項目。
為了解決這個問題,很多開源項目提供了HBase的類SQL中間件,意即提供一種在HBase上使用的類SQL語言,使得程序員能夠像使用關系數據庫一樣使用HBase,Apache Phoenix就是其中的一個優秀項目。
本文介紹了如何將基于傳統關系數據庫的程序通過Apache Phoenix移植到基于HBase的云計算平臺上的方法,并詳細講述了該過程中碰到的種種困難。主要內容包括:

HBase及云計算環境的安裝配置; HBase的Java API編程; Phoenix的安裝配置與使用; Squirrel的安裝配置與使用; 使用Phoenix移植SQL代碼至HBase; Phoenix性能調優;

本文的讀者應該是數據庫系統項目的開發人員和維護人員,云計算項目開發人員,最好具有以下基本知識:

linux系統使用常識; Hadoop、Hbase、Zookeeper等云計算環境使用常識; Java編程開發基礎; SQL語言基礎; Oracle、SQLServer或Mysql等關系數據庫使用管理基礎

2. HBase及云計算環境的安裝配置

2.1 環境配置

云計算環境通常安裝在linux或者CentOS等類UNIX操作系統中,本文涉及的軟件至少需要三個,即Hadoop、Hbase和Zookeeper,其版本號如下:

hadoop-2.3.0-cdh5.1.0 zookeeper-3.4.5-cdh5.1.0 hbase-0.98.1-cdh5.1.0
注意:本文使用了云時代的版本5.1.0,由于此類軟件版本眾多,互相之間的兼容性復雜,因此最好統一采用cdh的版本。系統配置如下圖所示:
這里寫圖片描述

系統一共六個節點,即Node1~Node6,hadoop安裝在全部六個節點上,其中Node1和Node2是NameNode,其他是DataNode;ZooKeeper安裝在Node4、Node5和Node6上,其端口使用默認的2181;Hbase安裝在Node1、Node3~Node6上,其眾喎?http://www.bjxfcs.com/pro/pkqt/" target="_blank" class="keylink">QTm9kZTHKx0hNYXN0ZXKjrMbky/vKx0hSZWdpb25TZXJ2ZXKhoyA8YnI+Cr7fzOWyzsr9xeTWw7/J0tSyzr+8xuTL+87EtbWjrLTLtKayu9f2z+rPuMPoyvahoyA8YnI+CjxlbT7XotLio7q/zbuntsux2NDrzai5/Vpvb0tlZXBlctXStb1IYmFzZbXEyOu/2qGjttTT2r/Nu6fAtMu1o6zWu9Do0qrWqrXAWm9vS2VlcGVy1NrExLb5o7vQ6NKqt8POymhiYXNlyrGjrL/Nu6e2y8il1dJab29LZWVwZXKjrFpvb0tlZXBlctTZyKWy6dGvSEJhc2W1xEhNYXN0ZXK6zUhSZWdpb25TZXJ2ZXK1yNDFz6KjrL7fzOXH6b/2vPuhtkhCYXNlyrXVvaG3NjPSs6GjPC9lbT48L3A+Cgo8aDMgaWQ9"22-hbase-shell使用">2.2 HBase Shell使用

環境配置成功后,即可使用HBase Shell對HBase數據庫進行操作,類似于Oracle提供的sqlplus。
登陸任意一個安裝了HBase的服務器,輸入:

hbase shell
list

即可列出該hbase中存儲的所有表格。
創建一個名為test的表格,它帶有一個名為cf的列族,并使用list來查看表格是否被創建,然后插入一些數據:

hbase(main):003:0> create 'test', 'cf'
0 row(s) in 1.2200 seconds
hbase(main):003:0> list
test
1 row(s) in 0.0550 seconds
hbase(main):004:0> put 'test', 'row1', 'cf:a', 'value1'
0 row(s) in 0.0560 seconds
hbase(main):005:0> put 'test', 'row2', 'cf:b', 'value2'
0 row(s) in 0.0370 seconds
hbase(main):006:0> put 'test', 'row3', 'cf:c', 'value3'
0 row(s) in 0.0450 seconds

使用scan來查看test表格中的內容:

hbase(main):007:0> scan 'test'
ROW        COLUMN+CELL
row1       column=cf:a, timestamp=1288380727188, value=value1
row2       column=cf:b, timestamp=1288380738440, value=value2
row3       column=cf:c, timestamp=1288380747365, value=value3
3 row(s) in 0.0590 seconds

得到表中的一行數據:

hbase(main):008:0> get 'test', 'row1'
COLUMN      CELL
cf:a        timestamp=1288380727188, value=value1
1 row(s) in 0.0400 seconds
disable和drop一個表格:
hbase(main):012:0> disable 'test'
0 row(s) in 1.0930 seconds
hbase(main):013:0> drop 'test'
0 row(s) in 0.0770 seconds 

退出shell:

hbase(main):014:0> exit

其他更多具體的命令請參看HBase的手冊或者在線幫助。

3. HBase Java API 編程

使用HBase的Java API進行開發需要掌握HBase的基本理念,推薦閱讀《HBase實戰》一書。
在進行開發的操作系統(例如Windows、Linux或者CentOS)中解壓hbase-0.98.1-cdh5.1.0.tar.gz,得到開發所依賴的所有jar包,位于hbase-0.98.1-cdh5.1.0/lib目錄中。
在開發環境(例如Eclipse、NetBean或者Intellij)中建立工程,導入hbase-0.98.1-cdh5.1.0\lib中的所有jar包。

3.1 關于遠程連接HBase

在給出源代碼之前,先介紹一下遠程連接HBase的問題。從Oracle時代過來的程序員,顯然期望得到數據庫服務器的ip、port和Service Name之類的信息。但是在連接HBase時,你需要的卻是一個或多個ZooKeeper服務器的ip(或者hostname)和port,因為只有它才知曉整個HBase集群的元數據。
顯然,使用hostname比使用ip要顯得習慣更好,因為它帶來了更大的可移植性,因此費一點筆墨講講linux和windows的hostname設置。
在linux下,hostname通過修改/etc/hosts文件來完成,在集群的每臺服務器上加入如下內容:

192.168.1.101  Node1
192.168.1.102  Node2
192.168.1.103  Node3
192.168.1.104  Node4
192.168.1.105  Node5
192.168.1.106  Node6

在各自的/etc/sysconfig/network文件中,將“HOSTNAME=”修改為“HOSTNAME=Node?”(將Node?替換為本服務器的hostname)。
在Windows下(僅測試過Win7 64),修改Windows/System32/drivers/etc/hosts文件,加入:

192.168.1.101  Node1
192.168.1.102  Node2
192.168.1.103  Node3
192.168.1.104  Node4
192.168.1.105  Node5
192.168.1.106  Node6

(不同的windows平臺hosts文件的位置可能不一樣,建議裝一個everything,桌面搜索速度極快)。
其實多種方法都可以連接到ZooKeeper,例如ip加端口:

public static String hbase_svr_ip = "192.168.1.104, 192.168.1.105, 192.168.1.106";
public static String hbase_svr_port = "2181";

或者hostname加端口:

public static String hbase_svr_hostname = "Node4,Node5,Node6";
public static String hbase_svr_port = "2181";

或者將端口直接寫在ip后:

public static String hbase_svr_ip = "192.168.1.104:2181, 192.168.1.105:2181, 192.168.1.106:2181";

或者將端口直接寫在hostname后:

public static String hbase_svr_hostname = "Node4:2181,Node5:2181,Node6:2181";

或者僅使用一個ZooKeeper服務器:

public static String hbase_svr_hostname = "Node4:2181";

具體使用哪種方法就看程序員自己的偏好,也存在某種方法在某些版本中可能無法連接的問題,本文中沒有窮盡測試,但個人認為hostname加端口的方法可能比較穩妥。

3.2 源代碼

本篇給出了使用Java API操作HBase的源代碼,注意要將這幾行替換為實際的ZooKeeper服務器地址、hostname和端口號:

public static String hbase_svr_ip = "192.168.1.104, 192.168.1.105, 192.168.1.106";
public static String hbase_svr_port = "2181";
public static String hbase_svr_hostname = "Node4,Node5,Node6";

代碼功能包括:

遠程連接Hbase數據庫; 創建表; 掃描所有表; 插入數據; 掃描數據; 刪除數據; 刪除表。
package com.wxb;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * @author wxb hbase的基本操作方法
 */
public class HBaseSample {
public static String hbase_svr_ip = "192.168.1.104, 192.168.1.105, 192.168.1.106";
    public static String hbase_svr_port = "2181";
    public static String hbase_svr_hostname = "Node4,Node5,Node6";
    private HConnection connection = null;
    Configuration config = null;

    /**
     * 構造函數,構造一個HBaseSample對象,必須在最后調用close方法來關閉所有的連接,釋放所有的資源
     */
    public HBaseSample() {
        config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", hbase_svr_hostname);
        config.set("hbase.zookeeper.property.clientPort", hbase_svr_port);
        // System.out.println(config.get("hbase.zookeeper.quorum"));
        // System.out.println(config.get("hbase.zookeeper.property.clientPort"));

        try {
            connection = HConnectionManager.createConnection(config);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 釋放資源
     */
    public void close() {
        try {
            if (null != connection) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 創建表格
     * 
     * @param tableName
     * @param columnFarily
     */
    public void createTable(final String tableName, String columnFarily) {
        if (null != config) {
            System.out.println("begin create table...");
            HBaseAdmin admin = null;
            try {
                admin = new HBaseAdmin(config);
                if (admin.tableExists(tableName)) {
                    System.out.println(tableName + " is already exist!");
                } else {
                    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
                    tableDesc.addFamily(new HColumnDescriptor(columnFarily));
                    admin.createTable(tableDesc);
                    System.out.println(tableDesc.toString()
                            + " has been created.");
                }
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("hbase could not connected!");
        }
    }

    /**
     * 向指定表格中添加一行數據
     * 
     * @param table
     * @param key
     * @param family
     * @param col
     * @param dataIn
     * @return
     */
    public boolean addOneRecord(String table, String key, String family,
            String col, byte[] dataIn) {
        if (null != connection) {
            try {
                HTableInterface tb = connection.getTable(table);
                Put put = new Put(key.getBytes());
                put.add(family.getBytes(), col.getBytes(), dataIn);
                tb.put(put);
                System.out.println("put data key = " + key);
                return true;
            } catch (IOException e) {
                System.out.println("put data failed.");
                return false;
            }
        } else {
            System.out.println("hbase could not connected!");
            return false;
        }
    }

    /**
     * 得到hbase中所有的表
     * 
     * @return
     */
    public List<String> getAllTables() {
        List<String> tables = null;
        if (connection != null) {
            try {
                HTableDescriptor[] allTable = connection.listTables();
                if (allTable.length > 0)
                    tables = new ArrayList<String>();
                for (HTableDescriptor hTableDescriptor : allTable) {
                    tables.add(hTableDescriptor.getNameAsString());
                    System.out.println(hTableDescriptor.getNameAsString());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("hbase could not connected!");
        }
        return tables;
    }

    public byte[] getValueWithKey(String tableName, String rowKey,
            String family, String qualifier) {
        byte[] rel = null;
        if (null != connection) {
            try {
                HTableInterface table = connection.getTable(tableName);
                Get get = new Get(rowKey.getBytes());
                get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
                Result result = table.get(get);
                if (!result.isEmpty()) {
                    rel = result.getValue(Bytes.toBytes(family),
                            Bytes.toBytes(qualifier));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("hbase could not connected!");
        }
        return rel;
    }

    /**
     * 從表中刪除一行
     * 
     * @param tableName
     * @param rowKey
     */
    public void deleteWithKey(String tableName, String rowKey) {
        if (null != connection) {
            try {
                HTableInterface table = connection.getTable(tableName);
                Delete delete = new Delete(rowKey.getBytes());
                table.delete(delete);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("hbase could not connected!");
        }
    }

    /**
     * 得到一個表中的所有元素
     * 
     * @param tableName
     */
    public void getAllData(String tableName) {
        if (null != connection) {
            try {
                HTableInterface table = connection.getTable(tableName);
                Scan scan = new Scan();
                ResultScanner rs = table.getScanner(scan);
                for (Result r : rs) {
                    Cell[] cells = r.rawCells();
                    System.out.println("This row have " + cells.length
                            + " cells:");
                    for (Cell cell : cells) {
                        String row = Bytes.toString(CellUtil.cloneRow(cell));
                        String family = Bytes.toString(CellUtil
                                .cloneFamily(cell));
                        String qualifier = Bytes.toString(CellUtil
                                .cloneQualifier(cell));
                        String value = Bytes
                                .toString(CellUtil.cloneValue(cell));
                        System.out.println(String.format("%s:%s:%s:%s", row,
                                family, qualifier, value));
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("hbase could not connected!");
        }
    }

    public void deleteTable(String tableName) {
        if (null != config) {
            System.out.println("begin delete table...");
            HBaseAdmin admin = null;
            try {
                admin = new HBaseAdmin(config);
                if (!admin.tableExists(tableName)) {
                    System.out.println(tableName + " is not exist!");
                } else {
                    admin.disableTable(tableName);
                    admin.deleteTable(tableName);
                    System.out.println(tableName + " has been deleted.");
                }
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("hbase could not connected!");
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        HBaseSample sample = new HBaseSample();
        // 1.create table and insert data
        sample.createTable("student", "fam1");
        sample.addOneRecord("student", "id1", "fam1", "name", "Jack".getBytes());
        sample.addOneRecord("student", "id1", "fam1", "address",
                "HZ".getBytes());

        // 2.list table
        sample.getAllTables();

        // 3.getValue
        byte[] value = sample.getValueWithKey("student", "id1", "fam1",
                "address");
        System.out.println("value = " + Bytes.toString(value));

        // 4.addOneRecord and delete
//      sample.addOneRecord("student", "id2", "fam1", "name", "wxb".getBytes());
//      sample.addOneRecord("student", "id2", "fam1", "address",
//              "here".getBytes());
//      sample.deleteWithKey("student", "id2");

        // 5.scan table
        sample.getAllData("student");

        // 6.delete table
        // sample.deleteTable("student");

        sample.close();
    }
}

4. Phoenix的安裝配置與使用

從上一章可以看出,HBase的基本理念和傳統的關系數據庫是截然不同的,為了使得熟悉SQL的程序員能夠快速使用HBase,使用Apache Phoenix是比較好的辦法。它提供了一組類似于SQL的語法,以及序列、索引、函數等工具,使得將SQL代碼移植至HBase成為可能。

4.1 Phoenix安裝

同其他分布式軟件一樣,Phoenix的安裝也是較為復雜的,且要密切關注其版本兼容性,否則很可能無法正常運行。例如Phoenix4.x版本都有兼容HBase0.98的版本,但是經過兩天的測試才發現不同的Phoenix版本對HBase0.98的小版本號的要求不同。
由于本文使用的是HBase0.98.1,因此只能使用Phoenix4.1.0版本。如果使用的Phoenix版本和HBase版本不兼容,會出現第一次能夠連接HBase,但以后都連接失敗的現象。
Phoenix的具體安裝步驟如下:
第一步:將phoenix-4.1.0-bin.tar.gz拷貝到Node1(HBase的HMaster)的某路徑下,解壓縮,拷貝hadoop2/phoenix-4.1.0-server-hadoop2.jar到HBase的lib目錄下。
第二步:然后用scp(關于scp和ssh的設置請參考網上的其他文章,假設用戶名為hadoop)拷貝到各個regionserver的HBase的lib目錄下:

scp phoenix-4.1.0-server-hadoop2.jar hadoop@Node3:/home/hadoop/hbase-0.98.1-cdh5.1.0/lib/
phoenix-core-4.6.0-HBase-0.98.jar                                                                                                    
scp phoenix-4.1.0-server-hadoop2.jar hadoop@Node4:/home/hadoop/hbase-0.98.1-cdh5.1.0/lib/
phoenix-core-4.6.0-HBase-0.98.jar                                                                                                  
scp phoenix-4.1.0-server-hadoop2.jar hadoop@Node5:/home/hadoop/hbase-0.98.1-cdh5.1.0/lib/
phoenix-core-4.6.0-HBase-0.98.jar
scp phoenix-4.1.0-server-hadoop2.jar hadoop@Node6:/home/hadoop/hbase-0.98.1-cdh5.1.0/lib/
phoenix-core-4.6.0-HBase-0.98.jar            

第三步:在HMaster上重啟hbase(即Node1);
第四步:將phoenix-4.1.0-client-hadoop2.jar加入客戶端的CLASSPATH變量路徑中,修改用戶的.bash_profile文件,同時將此文件拷貝到hbase的lib目錄下。
第五步:測試使用phoenix,輸入命令:

sqlline.py Node4:2181

注意:后面的參數是ZooKeeper的服務器和端口。
出現以下顯示則說明連接成功。

[hadoop@iips25 hadoop2]$bin/sqlline.py Node1:2181
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix:Node4 none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:Node4
16/06/21 08:04:24 WARN impl.MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-phoenix.properties,hadoop-metrics2.properties
Connected to: Phoenix (version 4.1)
Driver: org.apache.phoenix.jdbc.PhoenixDriver (version 4.1)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
59/59 (100%) Done
Done
sqlline version 1.1.2
0: jdbc:phoenix:Node4>

查看數據庫表:(注意,phoenix只能看到自己創建的表,不能看到HBase創建的表)

0: jdbc:phoenix:Node4> !tables
+------------+-------------+------------+------------+------------+------------+---------------------------+----------------+-------------+----------------+--------+
| TABLE_CAT  | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE |  REMARKS   | TYPE_NAME  | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_B |
+------------+-------------+------------+------------+------------+------------+---------------------------+----------------+-------------+----------------+--------+
| null       | SYSTEM      | CATALOG    | SYSTEM TABLE | null       | null       | null                      | null           | null        | false          | null |
| null       | SYSTEM      | SEQUENCE   | SYSTEM TABLE | null       | null       | null                      | null           | null        | false          | null |
+------------+-------------+------------+------------+------------+------------+---------------------------+----------------+-------------+----------------+--------+
0: jdbc:phoenix:Node4>

創建表,并插入數據:

0: jdbc:phoenix:Node4> create table abc(a integer primary key, b integer) ;
No rows affected (1.133 seconds)
0: jdbc:phoenix:Node4> UPSERT INTO abc VALUES (1, 1); 
1 row affected (0.064 seconds)
0: jdbc:phoenix:Node4> UPSERT INTO abc VALUES (2, 2); 
1 row affected (0.009 seconds)
0: jdbc:phoenix:Node4> UPSERT INTO abc VALUES (3, 12); 
1 row affected (0.009 seconds)
0: jdbc:phoenix:Node4> select * from abc;
+------------+------------+
|     A      |     B      |
+------------+------------+
| 1          | 1          |
| 2          | 2          |
| 3          | 12         |
+------------+------------+
3 rows selected (0.082 seconds)
0: jdbc:phoenix:Node4>

創建包含中文的表(注意中文要使用VARCHAR):

create table user ( id integer primary key, name VARCHAR);
upsert into user values ( 2, '測試員2');
upsert into user values ( 1, '測試員1');
select * from user;
+------------+------------+
|     ID     |    NAME    |
+------------+------------+
| 1          | 測試員1        |
| 2          | 測試員2         |

4.2 phoenix配置

在hbase集群每個服務器的hbase-site.xml配置文件中,加入:

<property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>

這是在phoenix中建立索引的先決條件。如果不添加此設置,Phoenix依然可以正常使用,但不能建立索引。

4.3 phoenix語法簡介

phoenix的語法可參考其官方網站,也可下載其“Grammar _ Apache Phoenix.html”網頁。
訪問Phoenix時,可以使用其提供的sqlline.py命令,也可以使用下一章介紹的數據庫圖形界面工具Squirrel,當然也可以通過Phoenix提供的Java API。

4.3.1. 創建表

注意:Phoenix中的表必須有主鍵,這一點和許多關系數據庫不同。因為主鍵是后續很多表操作的必備因素。

CREATE TABLE IF NOT EXISTS MYTABLE (ID INTEGER PRIMARY KEY, NAME VARCHAR, SEX VARCHAR, ADDRESS VARCHAR);

4.3.2. 刪除表

DROP TABLE IF EXISTS MYTABLE;

4.3.3. 插入數據

UPSERT INTO MYTABLE VALUES (1, 'WXB', 'MALE', '010-22222222');

注意phoenix使用UPSERT而不是INSERT。

4.3.4. 刪除數據

DELETE FROM MYTABLE WHERE ID = 1;

4.3.5. 查詢數據

SELECT * FROM MYTABLE WHERE ID=1;

4.3.6. 修改數據

UPSERT INTO MYTABLE VALUES (1, 'WXB', 'MALE', '010-22222222');

可以看到,修改數據與插入數據一樣,都是使用UPSERT語句,若此主鍵對應的行不存在,就插入,否則就修改。這也是為什么Phoenix的表必須有主鍵的原因之一。

4.3.7. 創建序列

Phoenix的序列與Oracle很像,也是先創建,然后調用next得到下一個值。也可以繼續調用current value得到當前序列值,沒有調用next時,不能使用current value。
創建一個序列:

CREATE SEQUENCE IF NOT EXISTS WXB_SEQ START WITH 1000 INCREMENT BY 1 MINVALUE 1000 MAXVALUE 999999999 CYCLE CACHE 30;

其含義基本上與Oracle類似。

4.3.8. 使用序列

序列只能在Select或者Upsert語句中使用,例如在Upsert中使用:

UPSERT INTO MYTABLE VALUES (NEXT VALUE FOR WXB_SEQ, 'WXB', 'MALE', '010-22222222');

讀取序列的當前值時,采用這個語句:

SELECT CURRENT VALUE FOR WXB_SEQ DUALID FROM WXB_DUAL;

然后讀取DUALID就可得到序列的當前值。
這里的WXB_DUAL是我自己創建的一個特殊表,用來模擬Oracle中的Dual表。

CREATE TABLE  IF NOT EXISTS WXB_DUAL (DUALID INTEGER PRIMARY KEY );
UPSERT INTO WXB_DUAL VALUES (1);

4.3.9. 刪除序列

DROP SEQUENCE IF EXISTS WXB_SEQ;

本章至此為止,詳細的操作留待后續再講。

5. 安裝SQuirrel

Squirrel是一個圖形化的數據庫工具,它可以將Phoenix以圖形化的方式展示出來,它可以安裝在windows或linux系統中。

5.1 安裝步驟

第一步:
設置好JDK,JAVA_HOME,CLASSPATH等一系列的環境變量,注意無論是在windows還是在linux下,都需要上面安裝的hbase和phoenix的存放jar包的目錄,并將其設置到CLASSPATH中。windows下的CLASSPATH如下:

%JAVA_HOME%\lib;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar;D:\hbase-0.98.1-cdh5.1.0\lib;D:\phoenix-4.1.0-bin\hadoop2

linux的CLASSPATH如下:

export PHOENIX_HOME=/home/hadoop/phoenix-4.1.0-bin
export CLASSPATH=$PHOENIX_HOME/hadoop2/phoenix-4.1.0-client-hadoop2.jar:$HBASE_HOME/lib/:$CLASSPATH
export PATH=$PHOENIX_HOME/bin:$PATH

第二步:
下載解壓squirrel-sql-snapshot-20160613_2107-standard.jar(最新版本的squirrel安裝包),在命令行中運行java -jar squirrel-sql-snapshot-20160613_2107-standard.jar開始安裝。
第三步:執行如下安裝
1. Remove prior phoenix-[oldversion]-client.jar from the lib directory of SQuirrel, copy phoenix-[newversion]-client.jar to the lib directory (newversion should be compatible with the version of the phoenix server jar used with your HBase installation)
2. Start SQuirrel and add new driver to SQuirrel (Drivers -> New Driver)
3. In Add Driver dialog box, set Name to Phoenix, and set the Example URL to jdbc:phoenix:localhost.
4. Type “org.apache.phoenix.jdbc.PhoenixDriver” into the Class Name textbox and click OK to close this dialog.
5. Switch to Alias tab and create the new Alias (Aliases -> New Aliases)
6. In the dialog box, Name:Any name, Driver: Phoenix, User Name:Anything, Password:Anything
7. Construct URL as follows: jdbc:phoenix:zookeeper quorum server. For example, to connect to a local HBase use: jdbc:phoenix:localhost
8. Press Test (which should succeed if everything is setup correctly) and press OK to close.
9. Now double click on your newly created Phoenix alias and click Connect. Now you are ready to run SQL queries against Phoenix.
注意,我們連接的URL是jdbc:phoenix:Node4,用戶名和密碼隨意即可。連接成功后,如下:
這里寫圖片描述

5.2 使用

安裝完畢后,就可以在Squirrel中執行各種phoenix支持的類SQL語句和觀察數據了,例如在SQL欄中輸入如下語句:

CREATE TABLE IF NOT EXISTS MYTABLE (ID INTEGER PRIMARY KEY, NAME VARCHAR, SEX VARCHAR, ADDRESS VARCHAR);

UPSERT INTO MYTABLE VALUES (1, 'WXB', 'MALE', '010-22222222');

UPSERT INTO MYTABLE VALUES (2, ‘LL’, 'MALE', '010-11111111');

SELECT * FROM MYTABLE;

結果如下:
這里寫圖片描述
使用Squirrel的好處在于可以方便的查看數據庫中的各種對象,以及編輯和執行復雜的phoenix類sql腳本。

6. 使用Phoenix移植SQL代碼至HBase

Phoenix提供了完全適配JDBC的API,程序員可以像操作關系數據庫(例如Oracle)一樣來使用JDBC來操作Phoenix,這也是Phoenix的最大的優勢所在。唯一需要注意的是,提交的SQL語句必須符合Phoenix語法,雖然此語法很類似于SQL,但還是有許多不同之處。

6.1 Phoenix Java Coding

本章給出了一個最基本的Phoenix JDBC源代碼實例,注意其中所引用的所有類幾乎都來自于java.sql.*包,與Oracle唯一的不同是其driver的字符串,該字符串等于前面連接Squirrel的連接字符串,你可以在Squirrel上測試driver字符串是否能夠正確連接。driver字符串一般為jdbc:phoenix:ZooKeeper_hostname:port,例如jdbc:phoenix:Node4,Node5,Node6:2181。但是在端口為默認2181端口時,也可以省略端口號。
編碼之前將phoenix-4.1.0-client-hadoop2.jar加入java項目的依賴Libraries,例子代碼如下:

package com.wxb;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * @author wxb  Phoenix的基本操作方法
 * 
 */
public class PhoenixSample {
    public static String hbase_svr_ip = "192.168.1.104, 192.168.1.105, 192.168.1.106";
    public static String hbase_svr_port = "2181";
    public static String hbase_svr_hostname = "Node4,Node5,Node6";

    /*
     * 所有幾種方式的driver都能夠通過測試: 1.Node4 2.Node4,Node5,Node6 3.Node4:2181
     * 4.Node4,Node5,Node6:2181 5.Node4:2181,Node5:2181,Node6:2181
     * 6.101.60.27.114
     */
    public static String driver = "jdbc:phoenix:" + hbase_svr_hostname;

    public static void createTable(String tableName) {
        System.out.println("create table " + tableName);
        Statement stmt = null;

        try {
            Connection con = DriverManager.getConnection(driver);
            stmt = con.createStatement();

            stmt.executeUpdate("create table  if not exists " + tableName
                    + " (mykey integer not null primary key, mycolumn varchar)");
            con.commit();
            con.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void addRecord(String tableName, String values) {
        Statement stmt = null;

        try {
            Connection con = DriverManager.getConnection(driver);
            stmt = con.createStatement();

            stmt.executeUpdate("upsert into " + tableName + " values ("
                    + values + ")");
            con.commit();
            con.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void deleteRecord(String tableName, String whereClause) {
        Statement stmt = null;

        try {
            Connection con = DriverManager.getConnection(driver);
            stmt = con.createStatement();

            stmt.executeUpdate("delete from " + tableName + " where "
                    + whereClause);
            con.commit();
            con.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void createSequence(String seqName) {
        System.out.println("Create Sequence :" + seqName);
        Statement stmt = null;

        try {
            Connection con = DriverManager.getConnection(driver);
            stmt = con.createStatement();

            stmt.executeUpdate("CREATE SEQUENCE IF NOT EXISTS "
                    + seqName
                    + " START WITH 1000 INCREMENT BY 1 MINVALUE 1000 MAXVALUE 999999999 CYCLE CACHE 30");
            con.commit();
            con.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void dropSequence(String seqName) {
        System.out.println("drop Sequence :" + seqName);
        Statement stmt = null;

        try {
            Connection con = DriverManager.getConnection(driver);
            stmt = con.createStatement();

            stmt.executeUpdate("DROP SEQUENCE IF EXISTS " + seqName);
            con.commit();
            con.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void getAllData(String tableName) {

        System.out.println("Get all data from :" + tableName);
        ResultSet rset = null;

        try {
            Connection con = DriverManager.getConnection(driver);
            PreparedStatement statement = con.prepareStatement("select * from "
                    + tableName);
            rset = statement.executeQuery();
            while (rset.next()) {
                System.out.print(rset.getInt("mykey"));
                System.out.println(" " + rset.getString("mycolumn"));
            }
            statement.close();
            con.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void dropTable(String tableName) {

        Statement stmt = null;

        try {
            Connection con = DriverManager.getConnection(driver);
            stmt = con.createStatement();

            stmt.executeUpdate("drop table  if  exists " + tableName);
            con.commit();
            con.close();
            System.out.println("drop table " + tableName);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        createTable("wxb_test");
        createSequence("WXB_SEQ_ID");

        // 使用了Sequence
        addRecord("wxb_test", "NEXT VALUE FOR WXB_SEQ_ID,'wxb'");
        addRecord("wxb_test", "NEXT VALUE FOR WXB_SEQ_ID,'wjw'");
        addRecord("wxb_test", "NEXT VALUE FOR WXB_SEQ_ID,'wjl'");

        // deleteRecord("wxb_test", " mykey = 1 ");
        getAllData("wxb_test");

        // dropTable("wxb_test");
//      dropSequence("WXB_SEQ_ID");

    }
}

6.2 每個表必須包含一個主鍵

在使用Phoenix時,建立的每個表都必須包含一個主鍵,這與關系數據庫不同。而且每個表的主鍵會自動被索引,這意味著在select語句的where子句中使用主鍵作為條件,會得到最快的查詢速度。關于索引,在后續章節中再詳細介紹。
我的建議是,為每個表創建一個序列,并在插入數據時以序列的值作為主鍵的值。

6.3 JDBC連接池

Phoenix支持用戶自己創建JDBC連接池,可以將基于JDBC連接池的代碼復制過來,把Driver部分修改一番即可。

6.4 中文支持

涉及中文的字段可設置為VARCHAR類型,經測試沒有問題。

6.5 CLOB和BLOB

CLOB和BLOB字段我都設置為VARCHAR類型,經測試存儲400k字節的數據沒有問題,更多的沒有測試。

6.6 復雜的SQL語句

因為本文使用的Phoenix版本不是最新版,因此官網上給出的SQL語法不是完全都能夠支持,例如下面的語句就不能支持:

delete from wxb_senword where swid in (select swid from wxb_rela_sw_group where groupid=1)

因此對于一些復雜的SQL語句,需要先到官網上查詢語法,然后在phoenix中進行測試,測試通過后才能夠在程序中使用。
兩個表的關聯查詢是可行的,語句如下:

SELECT d.swid,d.swname, d.userid, e.groupid FROM wxb_senword d JOIN wxb_rela_sw_group e ON e.swid = d.swid where e.groupid=1;

7. Phoenix性能調優

7.1 代碼移植流程

將基于SQL的java代碼移植到Phoenix其實不難,以Oracle為例,基本流程如下:

將Oracle中的所有表在Phoenix中重新建立一次,沒有主鍵的自己加一個主鍵(并建立對應的序列); 將Oracle中所有的序列、視圖都在Phoenix中重新建立一次; 將程序中的每條SQL語句都翻譯為Phoenix的SQL語句,并測試該語句是否能夠正確運行,若不能,總能找到幾條簡單的語句進行替代。

7.2 Oracle和HBase的性能差異

移植完成后,經過一系列debug,程序總算能夠正常運行了。但是性能問題會變得非常嚴重,這是關系數據庫和HBase之間的設計思路和應用問題域之間的差異造成的。
Oracle的設計思路是盡可能的快速對數據進行操作,但是隨著表中記錄數的不斷增加,查詢性能持續下降。要對Oracle進行硬件擴充會比較困難,而且會在單表一億條左右時(沒有經過本人驗證)碰到性能瓶頸。Oracle的優勢是在表中記錄數不多(幾百萬以內,具體看服務器性能)時擁有極高的查詢速度。
而HBase的優勢是讓單表可以存儲幾乎無限的記錄,并且可以方便的擴充硬件,使得查詢速度可以達到一個穩定的標準。但是其缺點在于表中數據不多時,查詢速度相對較慢。經測試,Phoenix的表在記錄數很少時(數十條),查詢單條數據也需要0.2秒左右(服務器集群配置見前面的章節),而同時單服務器的Oracle查詢這樣的數據僅需30ms左右,相差接近十倍。

7.3 Phoenix索引性能測試

與Oracle相比,Phoenix在性能上還有一個特點就是在沒有索引的情況下,查詢性能下降很快。
例如下表:

CREATE TABLE IF NOT EXISTS WXB_WORD (ID INTEGER PRIMARY KEY, NAME VARCHAR, VALUE DOUBLE, HEAT INTEGER, FOCUSLEVEL INTEGER, USERID INTEGER);

不建立索引的情況下,在前面介紹的集群上進行查詢性能測試,查詢語句如下(確保單條命中):

SELECT * FROM WXB_WORD WHERE NAME=’XXX’;

50萬條記錄,平均單條查詢時間為0.38秒;
100萬條記錄,平均單條查詢時間為0.79秒;
500萬條記錄,平均單條查詢時間為4.31秒;
然而在NAME字段上建立索引后,將表中數據增加到1億條,平均單條查詢時間為0.164秒,可見索引對Phoenix性能的提升作用是無可替代的。

7.4 Phoenix索引簡介

Phoenix中的索引被稱之為Secondary Indexing(二級索引),這是為了和HBase主鍵上的索引區分開。在HBase中,每個表有且僅有一個主鍵的索引,該索引按照字典序進行排序;所有不基于主鍵的查詢都會導致全表掃描,效率非常低下。在Phoenix中,可以對表中的任何一個字段或者幾個字段建立二級索引,該索引實際上是一個獨立的表,表中包含了被索引的列以及建立索引時包含的列(在索引的include語句中包含的列)。當用戶對表進行查詢時,會首先對索引進行查詢,若能夠得到全部的結果,則會直接返回,否則就到原表中進行查詢。
注意,Phoenix的每個表都可以建立多個索引,索引和原表之間的同步由Phoenix保證。但是,索引越多,寫入效率越低。
Phoenix支持兩種類型的索引:可變索引(mutable indexing)和不可變索引(immutable indexing)。在表中數據需要變化時,使用可變索引;當應用場景為“一次寫入,只會追加,永不改變”時使用不可變索引。本文中只使用了可變索引。

7.5 建立索引的方法與語句

在建立索引之前,再次檢查Phoenix的配置,在HBase集群的每個服務器的hbase-site.xml配置文件中,加入:

<property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>

例如:在WXB_WORD表上對NAME字段建立DESC索引,該索引還包含了VALUE字段的值(注意,Phoenix是大小寫不敏感的)。

create index if not exists idx_wxb_word on wxb_word (name desc) include (value) ;

那么這種語句就查詢得特別快:

select name,value from wxb_word where name='AHNHLYPKGYAR_59999';

但是如果查詢語句中還需要知道其他字段的值,例如:

select name,value,userid from wxb_word where name='AHNHLYPKGYAR_59999';

那么,就和沒有索引差不多,因為該索引中沒有包含userid這個字段。
另外需要注意的是:主鍵不需要索引,查詢也非常快,這是由HBASE的特性保證的。
刪除索引語句:

drop index if exists idx_wxb_word on wxb_word;

8. 總結

使用Phoenix將SQL代碼移植到HBase應注意以下幾個問題。
第一,應用場景是否合適?是否需要在單表中存儲幾乎無限的數據,并保證一定的查詢性能?在數據量較少的情景下,Phoenix反而比Oracle的性能差。若要追求最高的性能,可以考慮同時使用關系數據庫和HBase,并自己保證這部分數據的同步。
第二,Phoenix、HBase、Hadoop、ZooKeeper的版本兼容問題。在大部分情況下,開發人員并不能決定HBase、Hadoop和ZooKeeper的版本,因此只能尋找合適的Phoenix版本來適配它們,這將導致你不能使用最新的Phoenix版本。如同本文中寫的一樣,這種情況會導致一些Phoenix SQL語句的特性得不到支持。
第三,注意Phoenix的每個表必須包含一個主鍵(其實就是HBase的Primary rowkey),且該主鍵自帶索引,合理設計這個主鍵能夠帶來性能上的提升和查詢的便利。作為從SQL時代過來的程序員,拋棄節約空間的想法;在大數據時代,就是盡可能的用空間換時間。舉個例子,你甚至可以將所有字段以一定的順序和分隔符全部堆到主鍵上。
第四,移植代碼時,將所有SQL語句一一翻譯為對應的Phoenix語句即可。注意參考Phoenix主頁上的語法介紹,并一一進行測試。Phoenix對JDBC的支持很好,諸如連接池一類的特性可以原封不動的照搬。但若原來的程序使用了針對SQL語句的中間件之類的技術,請恕我也不知如何處理。
第五,一定要對Phoenix的表建立二級索引,索引中盡可能包含所有需要查詢的字段。索引會導致數據插入速度變慢,但會帶來巨大的性能提升。

Tag標簽: 代碼  
  • 專題推薦

About IT165 - 廣告服務 - 隱私聲明 - 版權申明 - 免責條款 - 網站地圖 - 網友投稿 - 聯系方式
本站內容來自于互聯網,僅供用于網絡技術學習,學習中請遵循相關法律法規
香港最快开奖现场直播结果