查看原文
其他

数据库中间件 MyCAT 源码分析 —— SQL ON MongoDB

2017-07-20 王文斌(芋艿) 芋道源码


🙂🙂🙂关注微信公众号有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表

  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址

  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢

  4. 新的源码解析文章实时收到通知。每周更新一篇左右


  • 1. 概述

  • 2. 主流程

  • 3. 查询操作

  • 4. 插入操作

  • 5. 彩蛋


1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

  1. 总体流程,让你有个整体的认识

  2. 查询操作

  3. 插入操作

  4. 彩蛋,😈彩蛋,🙂彩蛋

建议你看过这两篇文章(非必须):

  1. 《MyCAT 源码分析 —— 【单库单表】插入》

  2. 《MyCAT 源码分析 —— 【单库单表】查询》

2. 主流程

  1. MyCATServer 接收 MySQLClient 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDBServer

  2. MyCATServer 接收 MongoDBServer 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQLClient

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。


Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

  1. SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

1、查询 MongoDB

  1. // MongoSQLParser.java

  2. public MongoData query() throws MongoSQLException {

  3.   if (!(statement instanceof SQLSelectStatement)) {

  4.       //return null;

  5.       throw new IllegalArgumentException("not a query sql statement");

  6.   }

  7.   MongoData mongo = new MongoData();

  8.   DBCursor c = null;

  9.   SQLSelectStatement selectStmt = (SQLSelectStatement) statement;

  10.   SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();

  11.   int icount = 0;

  12.   if (sqlSelectQuery instanceof MySqlSelectQueryBlock) {

  13.       MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();

  14.       BasicDBObject fields = new BasicDBObject();

  15.       // 显示(返回)的字段

  16.       for (SQLSelectItem item : mysqlSelectQuery.getSelectList()) {

  17.           //System.out.println(item.toString());

  18.           if (!(item.getExpr() instanceof SQLAllColumnExpr)) {

  19.               if (item.getExpr() instanceof SQLAggregateExpr) {

  20.                   SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();

  21.                   if (expr.getMethodName().equals("COUNT")) { // TODO 待读:count(*)

  22.                       icount = 1;

  23.                       mongo.setField(getExprFieldName(expr), Types.BIGINT);

  24.                   }

  25.                   fields.put(getExprFieldName(expr), 1);

  26.               } else {

  27.                   fields.put(getFieldName(item), 1);

  28.               }

  29.           }

  30.       }

  31.       // 表名

  32.       SQLTableSource table = mysqlSelectQuery.getFrom();

  33.       DBCollection coll = this._db.getCollection(table.toString());

  34.       mongo.setTable(table.toString());

  35.       // WHERE

  36.       SQLExpr expr = mysqlSelectQuery.getWhere();

  37.       DBObject query = parserWhere(expr);

  38.       // GROUP BY

  39.       SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();

  40.       BasicDBObject gbkey = new BasicDBObject();

  41.       if (groupby != null) {

  42.           for (SQLExpr gbexpr : groupby.getItems()) {

  43.               if (gbexpr instanceof SQLIdentifierExpr) {

  44.                   String name = ((SQLIdentifierExpr) gbexpr).getName();

  45.                   gbkey.put(name, Integer.valueOf(1));

  46.               }

  47.           }

  48.           icount = 2;

  49.       }

  50.       // SKIP / LIMIT

  51.       int limitoff = 0;

  52.       int limitnum = 0;

  53.       if (mysqlSelectQuery.getLimit() != null) {

  54.           limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());

  55.           limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());

  56.       }

  57.       if (icount == 1) { // COUNT(*)

  58.           mongo.setCount(coll.count(query));

  59.       } else if (icount == 2) { // MapReduce

  60.           BasicDBObject initial = new BasicDBObject();

  61.           initial.put("num", 0);

  62.           String reduce = "function (obj, prev) { " + "  prev.num++}";

  63.           mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));

  64.       } else {

  65.           if ((limitoff > 0) || (limitnum > 0)) {

  66.               c = coll.find(query, fields).skip(limitoff).limit(limitnum);

  67.           } else {

  68.               c = coll.find(query, fields);

  69.           }

  70.           // order by

  71.           SQLOrderBy orderby = mysqlSelectQuery.getOrderBy();

  72.           if (orderby != null) {

  73.               BasicDBObject order = new BasicDBObject();

  74.               for (int i = 0; i < orderby.getItems().size(); i++) {

  75.                   SQLSelectOrderByItem orderitem = orderby.getItems().get(i);

  76.                   order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));

  77.               }

  78.               c.sort(order);

  79.               // System.out.println(order);

  80.           }

  81.       }

  82.       mongo.setCursor(c);

  83.   }

  84.   return mongo;

  85. }

2、查询条件

  1. // MongoSQLParser.java

  2. private void parserWhere(SQLExpr aexpr, BasicDBObject o) {

  3.   if (aexpr instanceof SQLBinaryOpExpr) {

  4.       SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;

  5.       SQLExpr exprL = expr.getLeft();

  6.       if (!(exprL instanceof SQLBinaryOpExpr)) {

  7.           if (expr.getOperator().getName().equals("=")) {

  8.               o.put(exprL.toString(), getExpValue(expr.getRight()));

  9.           } else {

  10.               String op = "";

  11.               if (expr.getOperator().getName().equals("<")) {

  12.                   op = "$lt";

  13.               } else if (expr.getOperator().getName().equals("<=")) {

  14.                   op = "$lte";

  15.               } else if (expr.getOperator().getName().equals(">")) {

  16.                   op = "$gt";

  17.               } else if (expr.getOperator().getName().equals(">=")) {

  18.                   op = "$gte";

  19.               } else if (expr.getOperator().getName().equals("!=")) {

  20.                   op = "$ne";

  21.               } else if (expr.getOperator().getName().equals("<>")) {

  22.                   op = "$ne";

  23.               }

  24.               parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));

  25.           }

  26.       } else {

  27.           if (expr.getOperator().getName().equals("AND")) {

  28.               parserWhere(exprL, o);

  29.               parserWhere(expr.getRight(), o);

  30.           } else if (expr.getOperator().getName().equals("OR")) {

  31.               orWhere(exprL, expr.getRight(), o);

  32.           } else {

  33.               throw new RuntimeException("Can't identify the operation of  of where");

  34.           }

  35.       }

  36.   }

  37. }

  38. private void orWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob) {

  39.   BasicDBObject xo = new BasicDBObject();

  40.   BasicDBObject yo = new BasicDBObject();

  41.   parserWhere(exprL, xo);

  42.   parserWhere(exprR, yo);

  43.   ob.put("$or", new Object[]{xo, yo});

  44. }

3、解析 MongoDB 数据

  1. // MongoResultSet.java

  2. public MongoResultSet(MongoData mongo, String schema) throws SQLException {

  3.   this._cursor = mongo.getCursor();

  4.   this._schema = schema;

  5.   this._table = mongo.getTable();

  6.   this.isSum = mongo.getCount() > 0;

  7.   this._sum = mongo.getCount();

  8.   this.isGroupBy = mongo.getType();

  9.   if (this.isGroupBy) {

  10.       dblist = mongo.getGrouyBys();

  11.       this.isSum = true;

  12.   }

  13.   if (this._cursor != null) {

  14.       select = _cursor.getKeysWanted().keySet().toArray(new String[0]);

  15.       // 解析 fields

  16.       if (this._cursor.hasNext()) {

  17.           _cur = _cursor.next();

  18.           if (_cur != null) {

  19.               if (select.length == 0) {

  20.                   SetFields(_cur.keySet());

  21.               }

  22.               _row = 1;

  23.           }

  24.       }

  25.       // 设置 fields 类型

  26.       if (select.length == 0) {

  27.           select = new String[]{"_id"};

  28.           SetFieldType(true);

  29.       } else {

  30.           SetFieldType(false);

  31.       }

  32.   } else {

  33.       SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};

  34.       SetFieldType(mongo.getFields());

  35.   }

  36. }

  • 当使用 SELECT* 查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

4、返回数据给 MySQL Client

  1. // JDBCConnection.java

  2. private void ouputResultSet(ServerConnection sc, String sql)

  3.       throws SQLException {

  4.   ResultSet rs = null;

  5.   Statement stmt = null;

  6.   try {

  7.       stmt = con.createStatement();

  8.       rs = stmt.executeQuery(sql);

  9.       // header

  10.       List<FieldPacket> fieldPks = new LinkedList<>();

  11.       ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);

  12.       int colunmCount = fieldPks.size();

  13.       ByteBuffer byteBuf = sc.allocate();

  14.       ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();

  15.       headerPkg.fieldCount = fieldPks.size();

  16.       headerPkg.packetId = ++packetId;

  17.       byteBuf = headerPkg.write(byteBuf, sc, true);

  18.       byteBuf.flip();

  19.       byte[] header = new byte[byteBuf.limit()];

  20.       byteBuf.get(header);

  21.       byteBuf.clear();

  22.       List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());

  23.       for (FieldPacket curField : fieldPks) {

  24.           curField.packetId = ++packetId;

  25.           byteBuf = curField.write(byteBuf, sc, false);

  26.           byteBuf.flip();

  27.           byte[] field = new byte[byteBuf.limit()];

  28.           byteBuf.get(field);

  29.           byteBuf.clear();

  30.           fields.add(field);

  31.       }

  32.       // header eof

  33.       EOFPacket eofPckg = new EOFPacket();

  34.       eofPckg.packetId = ++packetId;

  35.       byteBuf = eofPckg.write(byteBuf, sc, false);

  36.       byteBuf.flip();

  37.       byte[] eof = new byte[byteBuf.limit()];

  38.       byteBuf.get(eof);

  39.       byteBuf.clear();

  40.       this.respHandler.fieldEofResponse(header, fields, eof, this);

  41.       // row

  42.       while (rs.next()) {

  43.           RowDataPacket curRow = new RowDataPacket(colunmCount);

  44.           for (int i = 0; i < colunmCount; i++) {

  45.               int j = i + 1;

  46.               if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {

  47.                   curRow.add(rs.getBytes(j));

  48.               } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||

  49.                       fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte

  50.                   // ensure that do not use scientific notation format

  51.                   BigDecimal val = rs.getBigDecimal(j);

  52.                   curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));

  53.               } else {

  54.                   curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));

  55.               }

  56.           }

  57.           curRow.packetId = ++packetId;

  58.           byteBuf = curRow.write(byteBuf, sc, false);

  59.           byteBuf.flip();

  60.           byte[] row = new byte[byteBuf.limit()];

  61.           byteBuf.get(row);

  62.           byteBuf.clear();

  63.           this.respHandler.rowResponse(row, this);

  64.       }

  65.       fieldPks.clear();

  66.       // row eof

  67.       eofPckg = new EOFPacket();

  68.       eofPckg.packetId = ++packetId;

  69.       byteBuf = eofPckg.write(byteBuf, sc, false);

  70.       byteBuf.flip();

  71.       eof = new byte[byteBuf.limit()];

  72.       byteBuf.get(eof);

  73.       sc.recycle(byteBuf);

  74.       this.respHandler.rowEofResponse(eof, this);

  75.   } finally {

  76.       if (rs != null) {

  77.           try {

  78.               rs.close();

  79.           } catch (SQLException e) {

  80.           }

  81.       }

  82.       if (stmt != null) {

  83.           try {

  84.               stmt.close();

  85.           } catch (SQLException e) {

  86.           }

  87.       }

  88.   }

  89. }

  90. // MongoResultSet.java

  91. @Override

  92. public String getString(String columnLabel) throws SQLException {

  93.   Object x = getObject(columnLabel);

  94.   if (x == null) {

  95.       return null;

  96.   }

  97.   return x.toString();

  98. }

  • 当返回字段值是 Object 时,返回该对象.toString()。例如:

  1. mysql> select * from user order by _id asc;

  2. +--------------------------+------+-------------------------------+

  3. | _id                      | name | profile                       |

  4. +--------------------------+------+-------------------------------+

  5. | 1                        | 123  | { "age" : 1 , "height" : 100} |

4. 插入操作

  1. // MongoSQLParser.java

  2. public int executeUpdate() throws MongoSQLException {

  3.   if (statement instanceof SQLInsertStatement) {

  4.       return InsertData((SQLInsertStatement) statement);

  5.   }

  6.   if (statement instanceof SQLUpdateStatement) {

  7.       return UpData((SQLUpdateStatement) statement);

  8.   }

  9.   if (statement instanceof SQLDropTableStatement) {

  10.       return dropTable((SQLDropTableStatement) statement);

  11.   }

  12.   if (statement instanceof SQLDeleteStatement) {

  13.       return DeleteDate((SQLDeleteStatement) statement);

  14.   }

  15.   if (statement instanceof SQLCreateTableStatement) {

  16.       return 1;

  17.   }

  18.   return 1;

  19. }

  20. private int InsertData(SQLInsertStatement state) {

  21.   if (state.getValues().getValues().size() == 0) {

  22.       throw new RuntimeException("number of  columns error");

  23.   }

  24.   if (state.getValues().getValues().size() != state.getColumns().size()) {

  25.       throw new RuntimeException("number of values and columns have to match");

  26.   }

  27.   SQLTableSource table = state.getTableSource();

  28.   BasicDBObject o = new BasicDBObject();

  29.   int i = 0;

  30.   for (SQLExpr col : state.getColumns()) {

  31.       o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));

  32.       i++;

  33.   }

  34.   DBCollection coll = this._db.getCollection(table.toString());

  35.   coll.insert(o);

  36.   return 1;

  37. }

5. 彩蛋

老铁,看到这里,来一波微信公众号关注吧?!

1、支持多 MongoDB ,并使用 MyCAT 进行分片。

MyCAT 配置:multi_mongodb

2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。

查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。

MyCAT 配置:singlemongodbmysql

3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。

MyCAT 配置:single_mongodb


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存