1. 概述
可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。
本文主要分成四部分:
- 总体流程,让你有个整体的认识
- 查询操作
- 插入操作
- 彩蛋,😈彩蛋,🙂彩蛋
建议你看过这两篇文章(_非必须_):
- 《MyCAT源码分析 —— 【单库单表】插入》
- 《MyCAT源码分析 —— 【单库单表】查询》
2. 主流程
- MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDB Server。
- MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client。
这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。
Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。
MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。
是不是熟悉的味道。不得不说 JDBC 规范的精妙。
3. 查询操作
SELECTid,nameFROMuserWHEREname>''ORDERBY_idDESC;
看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。
1)、查询 MongoDB
//MongoSQLParser.java publicMongoDataquery()throwsMongoSQLException{ if(!(statementinstanceofSQLSelectStatement)){ //returnnull; thrownewIllegalArgumentException("notaquerysqlstatement"); } MongoDatamongo=newMongoData(); DBCursorc=null; SQLSelectStatementselectStmt=(SQLSelectStatement)statement; SQLSelectQuerysqlSelectQuery=selectStmt.getSelect().getQuery(); inticount=0; if(sqlSelectQueryinstanceofMySqlSelectQueryBlock){ MySqlSelectQueryBlockmysqlSelectQuery=(MySqlSelectQueryBlock)selectStmt.getSelect().getQuery(); BasicDBObjectfields=newBasicDBObject(); //显示(返回)的字段 for(SQLSelectItemitem:mysqlSelectQuery.getSelectList()){ //System.out.println(item.toString()); if(!(item.getExpr()instanceofSQLAllColumnExpr)){ if(item.getExpr()instanceofSQLAggregateExpr){ SQLAggregateExprexpr=(SQLAggregateExpr)item.getExpr(); if(expr.getMethodName().equals("COUNT")){//TODO待读:count(*) icount=1; mongo.setField(getExprFieldName(expr),Types.BIGINT); } fields.put(getExprFieldName(expr),1); }else{ fields.put(getFieldName(item),1); } } } //表名 SQLTableSourcetable=mysqlSelectQuery.getFrom(); DBCollectioncoll=this._db.getCollection(table.toString()); mongo.setTable(table.toString()); //WHERE SQLExprexpr=mysqlSelectQuery.getWhere(); DBObjectquery=parserWhere(expr); //GROUPBY SQLSelectGroupByClausegroupby=mysqlSelectQuery.getGroupBy(); BasicDBObjectgbkey=newBasicDBObject(); if(groupby!=null){ for(SQLExprgbexpr:groupby.getItems()){ if(gbexprinstanceofSQLIdentifierExpr){ Stringname=((SQLIdentifierExpr)gbexpr).getName(); gbkey.put(name,Integer.valueOf(1)); } } icount=2; } //SKIP/LIMIT intlimitoff=0; intlimitnum=0; if(mysqlSelectQuery.getLimit()!=null){ limitoff=getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset()); limitnum=getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount()); } if(icount==1){//COUNT(*) mongo.setCount(coll.count(query)); }elseif(icount==2){//MapReduce BasicDBObjectinitial=newBasicDBObject(); initial.put("num",0); Stringreduce="function(obj,prev){"+"prev.num++}"; mongo.setGrouyBy(coll.group(gbkey,query,initial,reduce)); }else{ if((limitoff>0)||(limitnum>0)){ c=coll.find(query,fields).skip(limitoff).limit(limitnum); }else{ c=coll.find(query,fields); } //orderby SQLOrderByorderby=mysqlSelectQuery.getOrderBy(); if(orderby!=null){ BasicDBObjectorder=newBasicDBObject(); for(inti=0;i<orderby.getItems().size();i++){ SQLSelectOrderByItemorderitem=orderby.getItems().get(i); order.put(orderitem.getExpr().toString(),getSQLExprToAsc(orderitem.getType())); } c.sort(order); //System.out.println(order); } } mongo.setCursor(c); } returnmongo; }
2)、查询条件
//MongoSQLParser.java privatevoidparserWhere(SQLExpraexpr,BasicDBObjecto){ if(aexprinstanceofSQLBinaryOpExpr){ SQLBinaryOpExprexpr=(SQLBinaryOpExpr)aexpr; SQLExprexprL=expr.getLeft(); if(!(exprLinstanceofSQLBinaryOpExpr)){ if(expr.getOperator().getName().equals("=")){ o.put(exprL.toString(),getExpValue(expr.getRight())); }else{ Stringop=""; if(expr.getOperator().getName().equals("<")){ op="$lt"; }elseif(expr.getOperator().getName().equals("<=")){ op="$lte"; }elseif(expr.getOperator().getName().equals(">")){ op="$gt"; }elseif(expr.getOperator().getName().equals(">=")){ op="$gte"; }elseif(expr.getOperator().getName().equals("!=")){ op="$ne"; }elseif(expr.getOperator().getName().equals("<>")){ op="$ne"; } parserDBObject(o,exprL.toString(),op,getExpValue(expr.getRight())); } }else{ if(expr.getOperator().getName().equals("AND")){ parserWhere(exprL,o); parserWhere(expr.getRight(),o); }elseif(expr.getOperator().getName().equals("OR")){ orWhere(exprL,expr.getRight(),o); }else{ thrownewRuntimeException("Can'tidentifytheoperationofofwhere"); } } } } privatevoidorWhere(SQLExprexprL,SQLExprexprR,BasicDBObjectob){ BasicDBObjectxo=newBasicDBObject(); BasicDBObjectyo=newBasicDBObject(); parserWhere(exprL,xo); parserWhere(exprR,yo); ob.put("$or",newObject[]{xo,yo}); }
3)、解析 MongoDB 数据
//MongoResultSet.java publicMongoResultSet(MongoDatamongo,Stringschema)throwsSQLException{ this._cursor=mongo.getCursor(); this._schema=schema; this._table=mongo.getTable(); this.isSum=mongo.getCount()>0; this._sum=mongo.getCount(); this.isGroupBy=mongo.getType(); if(this.isGroupBy){ dblist=mongo.getGrouyBys(); this.isSum=true; } if(this._cursor!=null){ select=_cursor.getKeysWanted().keySet().toArray(newString[0]); //解析fields if(this._cursor.hasNext()){ _cur=_cursor.next(); if(_cur!=null){ if(select.length==0){ SetFields(_cur.keySet()); } _row=1; } } //设置fields类型 if(select.length==0){ select=newString[]{"_id"}; SetFieldType(true); }else{ SetFieldType(false); } }else{ SetFields(mongo.getFields().keySet());//newString[]{"COUNT(*)"}; SetFieldType(mongo.getFields()); } }
当使用 SELECT * 查询字段时,fields 使用***条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。
4)、返回数据给 MySQL Client
//JDBCConnection.java privatevoidouputResultSet(ServerConnectionsc,Stringsql) throwsSQLException{ ResultSetrs=null; Statementstmt=null; try{ stmt=con.createStatement(); rs=stmt.executeQuery(sql); //header List<FieldPacket>fieldPks=newLinkedList<>(); ResultSetUtil.resultSetToFieldPacket(sc.getCharset(),fieldPks,rs,this.isSpark); intcolunmCount=fieldPks.size(); ByteBufferbyteBuf=sc.allocate(); ResultSetHeaderPacketheaderPkg=newResultSetHeaderPacket(); headerPkg.fieldCount=fieldPks.size(); headerPkg.packetId=++packetId; byteBuf=headerPkg.write(byteBuf,sc,true); byteBuf.flip(); byte[]header=newbyte[byteBuf.limit()]; byteBuf.get(header); byteBuf.clear(); List<byte[]>fields=newArrayList<byte[]>(fieldPks.size()); for(FieldPacketcurField:fieldPks){ curField.packetId=++packetId; byteBuf=curField.write(byteBuf,sc,false); byteBuf.flip(); byte[]field=newbyte[byteBuf.limit()]; byteBuf.get(field); byteBuf.clear(); fields.add(field); } //headereof EOFPacketeofPckg=newEOFPacket(); eofPckg.packetId=++packetId; byteBuf=eofPckg.write(byteBuf,sc,false); byteBuf.flip(); byte[]eof=newbyte[byteBuf.limit()]; byteBuf.get(eof); byteBuf.clear(); this.respHandler.fieldEofResponse(header,fields,eof,this); //row while(rs.next()){ RowDataPacketcurRow=newRowDataPacket(colunmCount); for(inti=0;i<colunmCount;i++){ intj=i+1; if(MysqlDefs.isBianry((byte)fieldPks.get(i).type)){ curRow.add(rs.getBytes(j)); }elseif(fieldPks.get(i).type==MysqlDefs.FIELD_TYPE_DECIMAL|| fieldPks.get(i).type==(MysqlDefs.FIELD_TYPE_NEW_DECIMAL-256)){//fieldtypeisunsignedbyte //ensurethatdonotusescientificnotationformat BigDecimalval=rs.getBigDecimal(j); curRow.add(StringUtil.encode(val!=null?val.toPlainString():null,sc.getCharset())); }else{ curRow.add(StringUtil.encode(rs.getString(j),sc.getCharset())); } } curRow.packetId=++packetId; byteBuf=curRow.write(byteBuf,sc,false); byteBuf.flip(); byte[]row=newbyte[byteBuf.limit()]; byteBuf.get(row); byteBuf.clear(); this.respHandler.rowResponse(row,this); } fieldPks.clear(); //roweof eofPckg=newEOFPacket(); eofPckg.packetId=++packetId; byteBuf=eofPckg.write(byteBuf,sc,false); byteBuf.flip(); eof=newbyte[byteBuf.limit()]; byteBuf.get(eof); sc.recycle(byteBuf); this.respHandler.rowEofResponse(eof,this); }finally{ if(rs!=null){ try{ rs.close(); }catch(SQLExceptione){ } } if(stmt!=null){ try{ stmt.close(); }catch(SQLExceptione){ } } } } //MongoResultSet.java @Override publicStringgetString(StringcolumnLabel)throwsSQLException{ Objectx=getObject(columnLabel); if(x==null){ returnnull; } returnx.toString(); }
当返回字段值是 Object 时,返回该对象.toString()。例如:
mysql>select*fromuserorderby_idasc; +--------------------------+------+-------------------------------+ |_id|name|profile| +--------------------------+------+-------------------------------+ |1|123|{"age":1,"height":100}|
4. 插入操作
//MongoSQLParser.java publicintexecuteUpdate()throwsMongoSQLException{ if(statementinstanceofSQLInsertStatement){ returnInsertData((SQLInsertStatement)statement); } if(statementinstanceofSQLUpdateStatement){ returnUpData((SQLUpdateStatement)statement); } if(statementinstanceofSQLDropTableStatement){ returndropTable((SQLDropTableStatement)statement); } if(statementinstanceofSQLDeleteStatement){ returnDeleteDate((SQLDeleteStatement)statement); } if(statementinstanceofSQLCreateTableStatement){ return1; } return1; } privateintInsertData(SQLInsertStatementstate){ if(state.getValues().getValues().size()==0){ thrownewRuntimeException("numberofcolumnserror"); } if(state.getValues().getValues().size()!=state.getColumns().size()){ thrownewRuntimeException("numberofvaluesandcolumnshavetomatch"); } SQLTableSourcetable=state.getTableSource(); BasicDBObjecto=newBasicDBObject(); inti=0; for(SQLExprcol:state.getColumns()){ o.put(getFieldName2(col),getExpValue(state.getValues().getValues().get(i))); i++; } DBCollectioncoll=this._db.getCollection(table.toString()); coll.insert(o); return1; }
5. 彩蛋
1)、支持多 MongoDB ,并使用 MyCAT 进行分片。
MyCAT 配置:multi_mongodb
2)、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。
查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。
MyCAT 配置:single_mongodb_mysql
3)、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。
MyCAT 配置:single_mongodb
转载请注明:IT运维空间 » 运维技术 » SQL ON MongoDB实现原理
发表评论