功能:
将mysql指定表的数据导入到mongodb的指定表,导入过程保证数据不丢失,如果数据有更新也会重新再导。
要求:
源表必须要有两个字段:id:主键 mmm_ts:最后更新时间戳
require 'rubygems'
require 'mongo'
require 'active_record'
mongo_server =
{:local =>
{:server => "localhost", :port => 27017}
}
mysql_server =
{:local =>
{
:adapter => 'mysql',
:host => 'localhost',
:username => 'root',
:password => '$$$',
:encoding => 'utf8'
} ,
:main_db =>
{
:adapter => 'mysql',
:host => 'mydb.com',
:username => 'abc',
:password => 'cba',
:encoding => 'utf8'
}
}
#从mysql指定的表中将数据导入到mongodb中
#源表必须有两个字段:id,自增, mmm_ts, 时间戳
#导数据的时候先按上次操作截止的时间戳获取指定数量的数据,然后再将数据存入到目标库中(如果id相同则更新数据)
#导入完毕更新截止点
class MMM
def initialize(mysql_server, mongo_server, operation_type)
@mysql_server= mysql_server
@mongo_server = mongo_server
@operation_type = operation_type
end
def migrate(source_db, source_table, process_items_each_time, target_db, target_table)
mongodb = Mongo::Connection.new(@mongo_server[:server], @mongo_server[:port]).db(target_db)
mongo_collection = mongodb.collection(target_table)
@mysql_server[:database] = source_db
ActiveRecord::Base.establish_connection(@mysql_server)
model = Class.new(ActiveRecord::Base) do
set_table_name source_table
end
operation_point = OperationPoint.new(mongodb)
opt = operation_point.get(@operation_type)
if opt == nil
opt = 0
mongo_collection.create_index("id", :unique => true)
else
opt = opt["point"].getlocal()
end
cnt = model.count( :conditions => ["mmm_ts=?", opt])
#防止在过多mmm_ts字段的值都一样的情况下无法进行到下一步,这里一定要避免大量mmm_ts字段的值相同
process_items_each_time = cnt + 1 if cnt >= process_items_each_time
records = model.find(:all, :conditions => ["mmm_ts>=?", opt], :order => "mmm_ts", :limit => process_items_each_time)
puts("#{records.length} records read from mysql")
records.each do |record|
record_hash = {}
model.column_names.each do |column|
record_hash[column] = record.read_attribute(column)
end
mongo_collection.update({:id => record.id}, record_hash, :upsert => true)
opt = record.mmm_ts
end
operation_point.save(@operation_type, opt)
puts("#{records.length} records saved to mongodb")
end
end
#操作点
class OperationPoint
def initialize(db)
@table = db.collection("operation_point")
@table.create_index("operation", :unique => true)
end
def save(operation, point)
@table.update({:operation => operation}, {:operation => operation, :point => point}, :upsert => true)
end
def get(operation)
return @table.find_one({:operation => operation})
end
end
mmm = MMM.new(mysql_server[:main_db], mongo_server[:local], "export_mdb")
while (true) do
begin
mmm.migrate("mdb_production", "bet_plans", 3000, "mdb_production", "export_mdb")
rescue Exception => ex
puts ex.message
puts ex.backtrace.join("\n")
sleep(30)
ensure
sleep(0.001)
end
end
分享到:
相关推荐
logstash抽取mongodb 和 mysql 的全套 ruby脚本,包含mongodb.rb、jdbc.rb、pipeline.rb、mongodb.conf、jdbc.conf
简单的 S3 备份一个简单的 Ruby 脚本,用于将 MySQL 数据库表、MongoDB 数据库、完整目录和单个文件组备份到 Amazon S3(简单存储服务)。 使用步骤: 设置 Amazon S3 账户: : 安装 aws/s3 Ruby gem: : 将 ...
VPS安装程序适用于Ubuntu 16.04 VPS(虚拟专用服务器)的自动安装程序脚本特征安装和配置bash(带有主题) 安装和配置oh-my-zsh(带有主题) 安装MySQL 5.7 安装PostgreSQL 9.6 安装MongoDB 3.6 安装Memcached 安装...
即时通讯平台。 Go中的后端。 客户端:Swift iOS,Java Android,JS webapp,可编写脚本的命令行; chatbots Tinode即时消息...持久存储RethinkDB,MySQL和MongoDB(实验性)。 还存在第三方不受支持的DynamoDB适配器
开发设置 动机 设置新的开发人员计算机可能是临时的,手动的且耗时的过程。... 通用数据存储:MySQL,PostgreSQL,MongoDB,Redis和Elasticsearch Javascript Web开发:Node.js,JSHint和更少 Android开发:Java,An
看看我或者打个招呼。 目录 编程语言/框架/平台 安卓 . AngularJS 主干JS C++ C C♯ 。网 Clojure 咖啡脚本 CSS 姜戈 EmberJS 二郎 高朗 HTML 离子 IOS Java JavaScript KnockoutJS 较少的 Lisp 节点 目标-C PHP ...
看看我或者打个招呼。 目录 编程语言/框架/平台 安卓 . AngularJS 主干JS C++ C C♯ 。网 Clojure 咖啡脚本 CSS 姜戈 EmberJS 二郎 高朗 HTML 离子 IOS Java JavaScript KnockoutJS 较少的 Lisp 节点 目标-C PHP ...
对于您的NodeJS,Python,PHP,ASP.NET,Ruby,MySQL,MongoDB,Postgres,WordPress(等)应用程序, CapRover是一个非常易于使用的应用程序/数据库部署和Web服务器管理器! 它以其简单易用的界面背后的底层功能...
java bitset 源码 后端开发者知识图谱 大体了解,按需精通 语言 ...Laravel JAVA Go JavaScript(TypeScript) ...服务器脚本、科学计算、...之外不是一个层面的东西。 Session、Cookie JWT Oauth SSO 测试 单元测试 mock TDD
永久存储是 ,MySQL或MongoDB中的任何一种。 还存在第三方不受支持的。 可以通过编写自定义适配器来支持其他数据库。 Tinode不是XMPP / Jabber。 它与XMPP不兼容。 它是XMPP的替代品。 从表面上看,它很像开源的...