`

一个将 MySQL 表导入到 MongoDB 的 Ruby 脚本

阅读更多

功能:

将 MySQL 中指定表的数据导入到 MongoDB 的指定集合。导入过程保证数据不丢失；如果源数据有更新，也会重新导入。

 

要求:

源表必须包含两个字段：id（主键）和 mmm_ts（最后更新时间戳）。

 

 

 

 

require 'rubygems'
require 'timeout'
require 'mongo'
require 'active_record'

# MongoDB hosts, keyed by a short name.
mongo_server = {
  :local => {:server => "localhost", :port => 27017}
}

# ActiveRecord connection specs for the MySQL hosts, keyed by a short name.
# All specs share the adapter and encoding; the database name is not part of
# these specs and is supplied separately when a migration runs.
mysql_common = {:adapter => 'mysql', :encoding => 'utf8'}
mysql_server = {
  :local => mysql_common.merge(
    :host => 'localhost',
    :username => 'root',
    :password => '$$$'
  ),
  :main_db => mysql_common.merge(
    :host => 'mydb.com',
    :username => 'abc',
    :password => 'cba'
  )
}

# Incrementally copies rows from a MySQL table into a MongoDB collection.
#
# The source table must have two columns:
#   id     - auto-increment primary key
#   mmm_ts - timestamp of the row's last update
#
# Each #migrate call reads, ordered by mmm_ts, a batch of rows whose mmm_ts is
# at or after the checkpoint left by the previous call. It upserts them into
# the target collection, matched on id, so updated rows replace their old copy.
# It then moves the checkpoint to the last mmm_ts it copied.
class MMM
  # mysql_server   - ActiveRecord connection spec (Hash). It is not modified;
  #                  :database is merged into a copy on every #migrate call.
  # mongo_server   - Hash with the MongoDB :server host and :port.
  # operation_type - key under which the checkpoint is stored via OperationPoint.
  def initialize(mysql_server, mongo_server, operation_type)
    @mysql_server = mysql_server
    @mongo_server = mongo_server
    @operation_type = operation_type
  end

  # Copies one batch from source_db.source_table (MySQL) into
  # target_db.target_table (MongoDB), then saves the new checkpoint through
  # OperationPoint in target_db.
  #
  # process_items_each_time - batch size. For this call it is raised to one
  #   more than the number of rows sharing the checkpoint timestamp when that
  #   number reaches the batch size, so every call can get past it.
  #
  # Rows whose mmm_ts equals the checkpoint are read again on every call
  # (the query uses >=), so rows that later receive that same timestamp are
  # not missed. The cost is that the newest rows are re-upserted on each call.
  def migrate(source_db, source_table, process_items_each_time, target_db, target_table)
    # One MongoDB connection per call, always closed in the ensure below, so a
    # caller that loops forever does not accumulate open connections.
    connection = Mongo::Connection.new(@mongo_server[:server], @mongo_server[:port])
    mongodb = connection.db(target_db)
    mongo_collection = mongodb.collection(target_table)

    # Merge into a copy rather than writing :database into the caller's hash.
    ActiveRecord::Base.establish_connection(@mysql_server.merge(:database => source_db))
    model = model_for(source_db, source_table)

    operation_point = OperationPoint.new(mongodb)
    saved = operation_point.get(@operation_type)
    if saved.nil?
      # No checkpoint yet: start from mmm_ts >= 0, i.e. the whole table.
      opt = 0
      mongo_collection.create_index("id", :unique => true)
    else
      opt = checkpoint_value(saved["point"])
    end

    # If at least a full batch of rows shares the checkpoint timestamp, grow
    # the batch to one more than that count so it reaches a newer row.
    # Otherwise the checkpoint could never advance. Large numbers of rows
    # sharing one mmm_ts value should still be avoided.
    cnt = model.count(:conditions => ["mmm_ts=?", opt])
    process_items_each_time = cnt + 1 if cnt >= process_items_each_time
    records = model.find(:all, :conditions => ["mmm_ts>=?", opt], :order => "mmm_ts", :limit => process_items_each_time)
    puts("#{records.length} records read from mysql")

    columns = model.column_names
    records.each do |record|
      record_hash = {}
      columns.each { |column| record_hash[column] = record.read_attribute(column) }
      mongo_collection.update({:id => record.id}, record_hash, :upsert => true)
      opt = record.mmm_ts
    end

    # Nothing read means the checkpoint did not move. Skipping the save also
    # keeps the initial 0 placeholder from ever being stored as a checkpoint.
    operation_point.save(@operation_type, opt) unless records.empty?
    puts("#{records.length} records saved to mongodb")
  ensure
    connection.close if connection
  end

  private

  # Returns the ActiveRecord model for db.table, defining it on first use.
  # One class per table is reused because ActiveRecord (2.x @@subclasses,
  # 3.x DescendantsTracker) keeps a reference to every Base subclass. Defining
  # a fresh anonymous class on each call would grow memory in a polling loop.
  def model_for(db, table)
    @models ||= {}
    key = [db, table]
    model = @models[key]
    unless model
      model = Class.new(ActiveRecord::Base) do
        set_table_name table
      end
      @models[key] = model
    end
    # Drop cached column metadata so schema changes are still picked up, as
    # they were when a new class was defined on every call.
    model.reset_column_information
    model
  end

  # Converts a stored checkpoint into a value for the mmm_ts queries.
  # A Time is converted with getlocal. Any other value, such as an integer 0
  # checkpoint already stored in the database, is returned unchanged instead
  # of raising NoMethodError on every subsequent call.
  def checkpoint_value(point)
    point.respond_to?(:getlocal) ? point.getlocal : point
  end
end

# Checkpoint store. For each named operation it remembers the last point an
# import reached, kept as one document per operation in the "operation_point"
# collection of the given MongoDB database.
class OperationPoint
  COLLECTION_NAME = "operation_point"

  # db - MongoDB database handle. A unique index on "operation" ensures each
  #      operation has at most one checkpoint document.
  def initialize(db)
    @collection = db.collection(COLLECTION_NAME)
    @collection.create_index("operation", :unique => true)
  end

  # Inserts or replaces the checkpoint document for +operation+.
  def save(operation, point)
    selector = {:operation => operation}
    document = selector.merge(:point => point)
    @collection.update(selector, document, :upsert => true)
  end

  # Returns the stored checkpoint document for +operation+ as given by
  # find_one (nil when none has been saved).
  def get(operation)
    @collection.find_one(:operation => operation)
  end
end


# Poll forever. Each pass copies one batch of up to 3000 rows from the MySQL
# table mdb_production.bet_plans into the MongoDB collection
# mdb_production.export_mdb, checkpointing under "export_mdb".
mmm = MMM.new(mysql_server[:main_db], mongo_server[:local], "export_mdb")
loop do
  begin
    mmm.migrate("mdb_production", "bet_plans", 3000, "mdb_production", "export_mdb")
  rescue StandardError, Timeout::Error => ex
    # Log, back off for 30 seconds, then retry. Only ordinary errors are
    # caught, plus Timeout::Error, which is not a StandardError on Ruby 1.8.
    # Interrupt (Ctrl-C), other signals and SystemExit therefore stop the script.
    puts ex.message
    puts ex.backtrace.join("\n")
    sleep(30)
  ensure
    # Brief pause between passes.
    sleep(0.001)
  end
end
 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics