admin

SQL ON MongoDB实现原理

admin 运维技术 2022-11-19 404浏览 0

1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。

吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

  1. 总体流程,让你有个整体的认识
  2. 查询操作
  3. 插入操作
  4. 彩蛋,😈彩蛋,🙂彩蛋

    建议你看过这两篇文章(_非必须_):

    1. 《MyCAT源码分析 —— 【单库单表】插入》
    2. 《MyCAT源码分析 —— 【单库单表】查询》

      2. 主流程

      SQL ON MongoDB实现原理

      1. MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDB Server。
      2. MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client。

        这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。

        SQL ON MongoDB实现原理

        Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

        MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

        SQL ON MongoDB实现原理

        是不是熟悉的味道。不得不说 JDBC 规范的精妙。

        3. 查询操作

        SELECTid,nameFROMuserWHEREname>''ORDERBY_idDESC;
        

        SQL ON MongoDB实现原理

        看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

        1)、查询 MongoDB

        //MongoSQLParser.java
        publicMongoDataquery()throwsMongoSQLException{
        if(!(statementinstanceofSQLSelectStatement)){
        //returnnull;
        thrownewIllegalArgumentException("notaquerysqlstatement");
        }
        MongoDatamongo=newMongoData();
        DBCursorc=null;
        SQLSelectStatementselectStmt=(SQLSelectStatement)statement;
        SQLSelectQuerysqlSelectQuery=selectStmt.getSelect().getQuery();
        inticount=0;
        if(sqlSelectQueryinstanceofMySqlSelectQueryBlock){
        MySqlSelectQueryBlockmysqlSelectQuery=(MySqlSelectQueryBlock)selectStmt.getSelect().getQuery();
        
        BasicDBObjectfields=newBasicDBObject();
        
        //显示(返回)的字段
        for(SQLSelectItemitem:mysqlSelectQuery.getSelectList()){
        //System.out.println(item.toString());
        if(!(item.getExpr()instanceofSQLAllColumnExpr)){
        if(item.getExpr()instanceofSQLAggregateExpr){
        SQLAggregateExprexpr=(SQLAggregateExpr)item.getExpr();
        if(expr.getMethodName().equals("COUNT")){//TODO待读:count(*)
        icount=1;
        mongo.setField(getExprFieldName(expr),Types.BIGINT);
        }
        fields.put(getExprFieldName(expr),1);
        }else{
        fields.put(getFieldName(item),1);
        }
        }
        
        }
        
        //表名
        SQLTableSourcetable=mysqlSelectQuery.getFrom();
        DBCollectioncoll=this._db.getCollection(table.toString());
        mongo.setTable(table.toString());
        
        //WHERE
        SQLExprexpr=mysqlSelectQuery.getWhere();
        DBObjectquery=parserWhere(expr);
        
        //GROUPBY
        SQLSelectGroupByClausegroupby=mysqlSelectQuery.getGroupBy();
        BasicDBObjectgbkey=newBasicDBObject();
        if(groupby!=null){
        for(SQLExprgbexpr:groupby.getItems()){
        if(gbexprinstanceofSQLIdentifierExpr){
        Stringname=((SQLIdentifierExpr)gbexpr).getName();
        gbkey.put(name,Integer.valueOf(1));
        }
        }
        icount=2;
        }
        
        //SKIP/LIMIT
        intlimitoff=0;
        intlimitnum=0;
        if(mysqlSelectQuery.getLimit()!=null){
        limitoff=getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
        limitnum=getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
        }
        if(icount==1){//COUNT(*)
        mongo.setCount(coll.count(query));
        }elseif(icount==2){//MapReduce
        BasicDBObjectinitial=newBasicDBObject();
        initial.put("num",0);
        Stringreduce="function(obj,prev){"+"prev.num++}";
        mongo.setGrouyBy(coll.group(gbkey,query,initial,reduce));
        }else{
        if((limitoff>0)||(limitnum>0)){
        c=coll.find(query,fields).skip(limitoff).limit(limitnum);
        }else{
        c=coll.find(query,fields);
        }
        
        //orderby
        SQLOrderByorderby=mysqlSelectQuery.getOrderBy();
        if(orderby!=null){
        BasicDBObjectorder=newBasicDBObject();
        for(inti=0;i<orderby.getItems().size();i++){
        SQLSelectOrderByItemorderitem=orderby.getItems().get(i);
        order.put(orderitem.getExpr().toString(),getSQLExprToAsc(orderitem.getType()));
        }
        c.sort(order);
        //System.out.println(order);
        }
        }
        mongo.setCursor(c);
        }
        returnmongo;
        }
        

        2)、查询条件

        //MongoSQLParser.java
        privatevoidparserWhere(SQLExpraexpr,BasicDBObjecto){
        if(aexprinstanceofSQLBinaryOpExpr){
        SQLBinaryOpExprexpr=(SQLBinaryOpExpr)aexpr;
        SQLExprexprL=expr.getLeft();
        if(!(exprLinstanceofSQLBinaryOpExpr)){
        if(expr.getOperator().getName().equals("=")){
        o.put(exprL.toString(),getExpValue(expr.getRight()));
        }else{
        Stringop="";
        if(expr.getOperator().getName().equals("<")){
        op="$lt";
        }elseif(expr.getOperator().getName().equals("<=")){
        op="$lte";
        }elseif(expr.getOperator().getName().equals(">")){
        op="$gt";
        }elseif(expr.getOperator().getName().equals(">=")){
        op="$gte";
        }elseif(expr.getOperator().getName().equals("!=")){
        op="$ne";
        }elseif(expr.getOperator().getName().equals("<>")){
        op="$ne";
        }
        parserDBObject(o,exprL.toString(),op,getExpValue(expr.getRight()));
        }
        }else{
        if(expr.getOperator().getName().equals("AND")){
        parserWhere(exprL,o);
        parserWhere(expr.getRight(),o);
        }elseif(expr.getOperator().getName().equals("OR")){
        orWhere(exprL,expr.getRight(),o);
        }else{
        thrownewRuntimeException("Can'tidentifytheoperationofofwhere");
        }
        }
        }
        }
        
        privatevoidorWhere(SQLExprexprL,SQLExprexprR,BasicDBObjectob){
        BasicDBObjectxo=newBasicDBObject();
        BasicDBObjectyo=newBasicDBObject();
        parserWhere(exprL,xo);
        parserWhere(exprR,yo);
        ob.put("$or",newObject[]{xo,yo});
        }
        

        3)、解析 MongoDB 数据

        //MongoResultSet.java
        publicMongoResultSet(MongoDatamongo,Stringschema)throwsSQLException{
        this._cursor=mongo.getCursor();
        this._schema=schema;
        this._table=mongo.getTable();
        this.isSum=mongo.getCount()>0;
        this._sum=mongo.getCount();
        this.isGroupBy=mongo.getType();
        
        if(this.isGroupBy){
        dblist=mongo.getGrouyBys();
        this.isSum=true;
        }
        if(this._cursor!=null){
        select=_cursor.getKeysWanted().keySet().toArray(newString[0]);
        //解析fields
        if(this._cursor.hasNext()){
        _cur=_cursor.next();
        if(_cur!=null){
        if(select.length==0){
        SetFields(_cur.keySet());
        }
        _row=1;
        }
        }
        //设置fields类型
        if(select.length==0){
        select=newString[]{"_id"};
        SetFieldType(true);
        }else{
        SetFieldType(false);
        }
        }else{
        SetFields(mongo.getFields().keySet());//newString[]{"COUNT(*)"};
        SetFieldType(mongo.getFields());
        }
        }
        

        当使用 SELECT * 查询字段时,fields 使用***条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

        4)、返回数据给 MySQL Client

        //JDBCConnection.java
        privatevoidouputResultSet(ServerConnectionsc,Stringsql)
        throwsSQLException{
        ResultSetrs=null;
        Statementstmt=null;
        
        try{
        stmt=con.createStatement();
        rs=stmt.executeQuery(sql);
        
        //header
        List<FieldPacket>fieldPks=newLinkedList<>();
        ResultSetUtil.resultSetToFieldPacket(sc.getCharset(),fieldPks,rs,this.isSpark);
        intcolunmCount=fieldPks.size();
        ByteBufferbyteBuf=sc.allocate();
        ResultSetHeaderPacketheaderPkg=newResultSetHeaderPacket();
        headerPkg.fieldCount=fieldPks.size();
        headerPkg.packetId=++packetId;
        byteBuf=headerPkg.write(byteBuf,sc,true);
        byteBuf.flip();
        byte[]header=newbyte[byteBuf.limit()];
        byteBuf.get(header);
        byteBuf.clear();
        List<byte[]>fields=newArrayList<byte[]>(fieldPks.size());
        for(FieldPacketcurField:fieldPks){
        curField.packetId=++packetId;
        byteBuf=curField.write(byteBuf,sc,false);
        byteBuf.flip();
        byte[]field=newbyte[byteBuf.limit()];
        byteBuf.get(field);
        byteBuf.clear();
        fields.add(field);
        }
        //headereof
        EOFPacketeofPckg=newEOFPacket();
        eofPckg.packetId=++packetId;
        byteBuf=eofPckg.write(byteBuf,sc,false);
        byteBuf.flip();
        byte[]eof=newbyte[byteBuf.limit()];
        byteBuf.get(eof);
        byteBuf.clear();
        this.respHandler.fieldEofResponse(header,fields,eof,this);
        
        //row
        while(rs.next()){
        RowDataPacketcurRow=newRowDataPacket(colunmCount);
        for(inti=0;i<colunmCount;i++){
        intj=i+1;
        if(MysqlDefs.isBianry((byte)fieldPks.get(i).type)){
        curRow.add(rs.getBytes(j));
        }elseif(fieldPks.get(i).type==MysqlDefs.FIELD_TYPE_DECIMAL||
        fieldPks.get(i).type==(MysqlDefs.FIELD_TYPE_NEW_DECIMAL-256)){//fieldtypeisunsignedbyte
        //ensurethatdonotusescientificnotationformat
        BigDecimalval=rs.getBigDecimal(j);
        curRow.add(StringUtil.encode(val!=null?val.toPlainString():null,sc.getCharset()));
        }else{
        curRow.add(StringUtil.encode(rs.getString(j),sc.getCharset()));
        }
        }
        curRow.packetId=++packetId;
        byteBuf=curRow.write(byteBuf,sc,false);
        byteBuf.flip();
        byte[]row=newbyte[byteBuf.limit()];
        byteBuf.get(row);
        byteBuf.clear();
        this.respHandler.rowResponse(row,this);
        }
        fieldPks.clear();
        //roweof
        eofPckg=newEOFPacket();
        eofPckg.packetId=++packetId;
        byteBuf=eofPckg.write(byteBuf,sc,false);
        byteBuf.flip();
        eof=newbyte[byteBuf.limit()];
        byteBuf.get(eof);
        sc.recycle(byteBuf);
        this.respHandler.rowEofResponse(eof,this);
        }finally{
        if(rs!=null){
        try{
        rs.close();
        }catch(SQLExceptione){
        }
        }
        if(stmt!=null){
        try{
        stmt.close();
        }catch(SQLExceptione){
        }
        }
        }
        }
        
        //MongoResultSet.java
        @Override
        publicStringgetString(StringcolumnLabel)throwsSQLException{
        Objectx=getObject(columnLabel);
        if(x==null){
        returnnull;
        }
        returnx.toString();
        }
        

        当返回字段值是 Object 时,返回该对象.toString()。例如:

        mysql>select*fromuserorderby_idasc;
        
        +--------------------------+------+-------------------------------+
        
        |_id|name|profile|
        
        +--------------------------+------+-------------------------------+
        
        |1|123|{"age":1,"height":100}|
        

        4. 插入操作

        SQL ON MongoDB实现原理

        //MongoSQLParser.java
        publicintexecuteUpdate()throwsMongoSQLException{
        if(statementinstanceofSQLInsertStatement){
        returnInsertData((SQLInsertStatement)statement);
        }
        if(statementinstanceofSQLUpdateStatement){
        returnUpData((SQLUpdateStatement)statement);
        }
        if(statementinstanceofSQLDropTableStatement){
        returndropTable((SQLDropTableStatement)statement);
        }
        if(statementinstanceofSQLDeleteStatement){
        returnDeleteDate((SQLDeleteStatement)statement);
        }
        if(statementinstanceofSQLCreateTableStatement){
        return1;
        }
        return1;
        }
        
        privateintInsertData(SQLInsertStatementstate){
        if(state.getValues().getValues().size()==0){
        thrownewRuntimeException("numberofcolumnserror");
        }
        if(state.getValues().getValues().size()!=state.getColumns().size()){
        thrownewRuntimeException("numberofvaluesandcolumnshavetomatch");
        }
        SQLTableSourcetable=state.getTableSource();
        BasicDBObjecto=newBasicDBObject();
        inti=0;
        for(SQLExprcol:state.getColumns()){
        o.put(getFieldName2(col),getExpValue(state.getValues().getValues().get(i)));
        i++;
        }
        DBCollectioncoll=this._db.getCollection(table.toString());
        coll.insert(o);
        return1;
        }
        

        5. 彩蛋

        1)、支持多 MongoDB ,并使用 MyCAT 进行分片。

        MyCAT 配置:multi_mongodb

        2)、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。

        查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。

        MyCAT 配置:single_mongodb_mysql

        3)、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。

        MyCAT 配置:single_mongodb

继续浏览有关 MongoDB 的文章
发表评论