在之前只知道SqlServer支持数据批量插入,殊不知道Oracle、SQLite和MySql也是支持的,不过Oracle需要使用Orace.DataAccess驱动,今天就贴出几种数据库的批量插入解决方法。
首先说一下,IProvider里有一个用于实现批量插入的插件服务接口IBatcherProvider,此接口在前一篇文章中已经提到过了。
///
<summary>
///
提供数据批量处理的方法。
///
</summary>
public
interface
IBatcherProvider : IProviderService
{
///
<summary>
///
将
<see cref="DataTable"/>
的数据批量插入到数据库中。
///
</summary>
///
<param name="dataTable">
要批量插入的
<see cref="DataTable"/>
。
</param>
///
<param name="batchSize">
每批次写入的数据量。
</param>
void
Insert(DataTable dataTable,
int
batchSize =
10000
);
}
一、SqlServer数据批量插入
SqlServer的批量插入很简单,使用SqlBulkCopy就可以,以下是该类的实现:
///
<summary>
///
为 System.Data.SqlClient 提供的用于批量操作的方法。
///
</summary>
public
sealed
class
MsSqlBatcher : IBatcherProvider
{
///
<summary>
///
获取或设置提供者服务的上下文。
///
</summary>
public
ServiceContext ServiceContext {
get
;
set
; }
///
<summary>
///
将
<see cref="DataTable"/>
的数据批量插入到数据库中。
///
</summary>
///
<param name="dataTable">
要批量插入的
<see cref="DataTable"/>
。
</param>
///
<param name="batchSize">
每批次写入的数据量。
</param>
public
void
Insert(DataTable dataTable,
int
batchSize =
10000
)
{
Checker.ArgumentNull(dataTable,
"
dataTable
"
);
if
(dataTable.Rows.Count ==
0
)
{
return
;
}
using
(
var
connection =
(SqlConnection)ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
//
给表名加上前后导符
var
tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>
(), dataTable.TableName);
using
(
var
bulk =
new
SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity,
null
)
{
DestinationTableName
=
tableName,
BatchSize
=
batchSize
})
{
//
循环所有列,为bulk添加映射
dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !
c.AutoIncrement);
bulk.WriteToServer(dataTable);
bulk.Close();
}
}
catch
(Exception exp)
{
throw
new
BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
}
SqlBulkCopy的ColumnMappings中列的名称受大小写敏感限制,因此在构造DataTable的时候应请注意列名要与表一致。
以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置SqlBulkCopyOptions.UseInternalTransaction。
二、Oracle数据批量插入
System.Data.OracleClient不支持批量插入,因此只能使用Oracle.DataAccess组件来作为提供者。
///
<summary>
///
Oracle.Data.Access 组件提供的用于批量操作的方法。
///
</summary>
public
sealed
class
OracleAccessBatcher : IBatcherProvider
{
///
<summary>
///
获取或设置提供者服务的上下文。
///
</summary>
public
ServiceContext ServiceContext {
get
;
set
; }
///
<summary>
///
将
<see cref="DataTable"/>
的数据批量插入到数据库中。
///
</summary>
///
<param name="dataTable">
要批量插入的
<see cref="DataTable"/>
。
</param>
///
<param name="batchSize">
每批次写入的数据量。
</param>
public
void
Insert(DataTable dataTable,
int
batchSize =
10000
)
{
Checker.ArgumentNull(dataTable,
"
dataTable
"
);
if
(dataTable.Rows.Count ==
0
)
{
return
;
}
using
(
var
connection =
ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
using
(
var
command =
ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if
(command ==
null
)
{
throw
new
BatcherException(
new
ArgumentException(
"
command
"
));
}
command.Connection
=
connection;
command.CommandText
=
GenerateInserSql(ServiceContext.Database, command, dataTable);
command.ExecuteNonQuery();
}
}
catch
(Exception exp)
{
throw
new
BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
///
<summary>
///
生成插入数据的sql语句。
///
</summary>
///
<param name="database"></param>
///
<param name="command"></param>
///
<param name="table"></param>
///
<returns></returns>
private
string
GenerateInserSql(IDatabase database, DbCommand command, DataTable table)
{
var
names =
new
StringBuilder();
var
values =
new
StringBuilder();
//
将一个DataTable的数据转换为数组的数组
var
data =
table.ToArray();
//
设置ArrayBindCount属性
command.GetType().GetProperty(
"
ArrayBindCount
"
).SetValue(command, table.Rows.Count,
null
);
var
syntax = database.Provider.GetService<ISyntaxProvider>
();
for
(
var
i =
0
; i < table.Columns.Count; i++
)
{
var
column =
table.Columns[i];
var
parameter =
database.Provider.DbProviderFactory.CreateParameter();
if
(parameter ==
null
)
{
continue
;
}
parameter.ParameterName
=
column.ColumnName;
parameter.Direction
=
ParameterDirection.Input;
parameter.DbType
=
column.DataType.GetDbType();
parameter.Value
=
data[i];
if
(names.Length >
0
)
{
names.Append(
"
,
"
);
values.Append(
"
,
"
);
}
names.AppendFormat(
"
{0}
"
, DbUtility.FormatByQuote(syntax, column.ColumnName));
values.AppendFormat(
"
{0}{1}
"
, syntax.ParameterPrefix, column.ColumnName);
command.Parameters.Add(parameter);
}
return
string
.Format(
"
INSERT INTO {0}({1}) VALUES ({2})
"
, DbUtility.FormatByQuote(syntax, table.TableName), names, values);
}
}
以上最重要的一步,就是将DataTable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环 Columns将后数组作为Parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。
三、SQLite数据批量插入
SQLite的批量插入只需开启事务就可以了,这个具体的原理不得而知。
public
sealed
class
SQLiteBatcher : IBatcherProvider
{
///
<summary>
///
获取或设置提供者服务的上下文。
///
</summary>
public
ServiceContext ServiceContext {
get
;
set
; }
///
<summary>
///
将
<see cref="DataTable"/>
的数据批量插入到数据库中。
///
</summary>
///
<param name="dataTable">
要批量插入的
<see cref="DataTable"/>
。
</param>
///
<param name="batchSize">
每批次写入的数据量。
</param>
public
void
Insert(DataTable dataTable,
int
batchSize =
10000
)
{
Checker.ArgumentNull(dataTable,
"
dataTable
"
);
if
(dataTable.Rows.Count ==
0
)
{
return
;
}
using
(
var
connection =
ServiceContext.Database.CreateConnection())
{
DbTransaction transcation
=
null
;
try
{
connection.TryOpen();
transcation
=
connection.BeginTransaction();
using
(
var
command =
ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if
(command ==
null
)
{
throw
new
BatcherException(
new
ArgumentException(
"
command
"
));
}
command.Connection
=
connection;
command.CommandText
=
GenerateInserSql(ServiceContext.Database, dataTable);
if
(command.CommandText ==
string
.Empty)
{
return
;
}
var
flag =
new
AssertFlag();
dataTable.EachRow(row
=>
{
var
first =
flag.AssertTrue();
ProcessCommandParameters(dataTable, command, row, first);
command.ExecuteNonQuery();
});
}
transcation.Commit();
}
catch
(Exception exp)
{
if
(transcation !=
null
)
{
transcation.Rollback();
}
throw
new
BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
private
void
ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row,
bool
first)
{
for
(
var
c =
0
; c < dataTable.Columns.Count; c++
)
{
DbParameter parameter;
//
首次创建参数,是为了使用缓存
if
(first)
{
parameter
=
ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();
parameter.ParameterName
=
dataTable.Columns[c].ColumnName;
command.Parameters.Add(parameter);
}
else
{
parameter
=
command.Parameters[c];
}
parameter.Value
=
row[c];
}
}
///
<summary>
///
生成插入数据的sql语句。
///
</summary>
///
<param name="database"></param>
///
<param name="table"></param>
///
<returns></returns>
private
string
GenerateInserSql(IDatabase database, DataTable table)
{
var
syntax = database.Provider.GetService<ISyntaxProvider>
();
var
names =
new
StringBuilder();
var
values =
new
StringBuilder();
var
flag =
new
AssertFlag();
table.EachColumn(column
=>
{
if
(!
flag.AssertTrue())
{
names.Append(
"
,
"
);
values.Append(
"
,
"
);
}
names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName));
values.AppendFormat(
"
{0}{1}
"
, syntax.ParameterPrefix, column.ColumnName);
});
return
string
.Format(
"
INSERT INTO {0}({1}) VALUES ({2})
"
, DbUtility.FormatByQuote(syntax, table.TableName), names, values);
}
}
四、MySql数据批量插入
///
<summary>
///
为 MySql.Data 组件提供的用于批量操作的方法。
///
</summary>
public
sealed
class
MySqlBatcher : IBatcherProvider
{
///
<summary>
///
获取或设置提供者服务的上下文。
///
</summary>
public
ServiceContext ServiceContext {
get
;
set
; }
///
<summary>
///
将
<see cref="DataTable"/>
的数据批量插入到数据库中。
///
</summary>
///
<param name="dataTable">
要批量插入的
<see cref="DataTable"/>
。
</param>
///
<param name="batchSize">
每批次写入的数据量。
</param>
public
void
Insert(DataTable dataTable,
int
batchSize =
10000
)
{
Checker.ArgumentNull(dataTable,
"
dataTable
"
);
if
(dataTable.Rows.Count ==
0
)
{
return
;
}
using
(
var
connection =
ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
using
(
var
command =
ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if
(command ==
null
)
{
throw
new
BatcherException(
new
ArgumentException(
"
command
"
));
}
command.Connection
=
connection;
command.CommandText
=
GenerateInserSql(ServiceContext.Database, command, dataTable);
if
(command.CommandText ==
string
.Empty)
{
return
;
}
command.ExecuteNonQuery();
}
}
catch
(Exception exp)
{
throw
new
BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
///
<summary>
///
生成插入数据的sql语句。
///
</summary>
///
<param name="database"></param>
///
<param name="command"></param>
///
<param name="table"></param>
///
<returns></returns>
private
string
GenerateInserSql(IDatabase database, DbCommand command, DataTable table)
{
var
names =
new
StringBuilder();
var
values =
new
StringBuilder();
var
types =
new
List<DbType>
();
var
count =
table.Columns.Count;
var
syntax = database.Provider.GetService<ISyntaxProvider>
();
table.EachColumn(c
=>
{
if
(names.Length >
0
)
{
names.Append(
"
,
"
);
}
names.AppendFormat(
"
{0}
"
, DbUtility.FormatByQuote(syntax, c.ColumnName));
types.Add(c.DataType.GetDbType());
});
var
i =
0
;
foreach
(DataRow row
in
table.Rows)
{
if
(i >
0
)
{
values.Append(
"
,
"
);
}
values.Append(
"
(
"
);
for
(
var
j =
0
; j < count; j++
)
{
if
(j >
0
)
{
values.Append(
"
,
"
);
}
var
isStrType =
IsStringType(types[j]);
var
parameter =
CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j);
if
(parameter !=
null
)
{
values.Append(parameter.ParameterName);
command.Parameters.Add(parameter);
}
else
if
(isStrType)
{
values.AppendFormat(
"
'{0}'
"
, row[j]);
}
else
{
values.Append(row[j]);
}
}
values.Append(
"
)
"
);
i
++
;
}
return
string
.Format(
"
INSERT INTO {0}({1}) VALUES {2}
"
, DbUtility.FormatByQuote(syntax, table.TableName), names, values);
}
///
<summary>
///
判断是否为字符串类别。
///
</summary>
///
<param name="dbType"></param>
///
<returns></returns>
private
bool
IsStringType(DbType dbType)
{
return
dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType ==
DbType.StringFixedLength;
}
///
<summary>
///
创建参数。
///
</summary>
///
<param name="provider"></param>
///
<param name="isStrType"></param>
///
<param name="dbType"></param>
///
<param name="value"></param>
///
<param name="parPrefix"></param>
///
<param name="row"></param>
///
<param name="col"></param>
///
<returns></returns>
private
DbParameter CreateParameter(IProvider provider,
bool
isStrType, DbType dbType,
object
value,
char
parPrefix,
int
row,
int
col)
{
//
如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数
if
((isStrType && value.ToString().IndexOf(
'
\'
'
) != -
1
) || dbType ==
DbType.DateTime)
{
var
name =
string
.Format(
"
{0}p_{1}_{2}
"
, parPrefix, row, col);
var
parameter =
provider.DbProviderFactory.CreateParameter();
parameter.ParameterName
=
name;
parameter.Direction
=
ParameterDirection.Input;
parameter.DbType
=
dbType;
parameter.Value
=
value;
return
parameter;
}
return
null
;
}
}
MySql的批量插入,是将值全部写在语句的values里,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。
五、测试
接下来写一个测试用例来看一下使用批量插入的效果。
[Test]
public
void
TestBatchInsert()
{
Console.WriteLine(TimeWatcher.Watch(()
=>
InvokeTest(database
=>
{
var
table =
new
DataTable(
"
Batcher
"
);
table.Columns.Add(
"
Id
"
,
typeof
(
int
));
table.Columns.Add(
"
Name1
"
,
typeof
(
string
));
table.Columns.Add(
"
Name2
"
,
typeof
(
string
));
table.Columns.Add(
"
Name3
"
,
typeof
(
string
));
table.Columns.Add(
"
Name4
"
,
typeof
(
string
));
//
构造100000条数据
for
(
var
i =
0
; i <
100000
; i++
)
{
table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString());
}
//
获取 IBatcherProvider
var
batcher = database.Provider.GetService<IBatcherProvider>
();
if
(batcher ==
null
)
{
Console.WriteLine(
"
不支持批量插入。
"
);
}
else
{
batcher.Insert(table);
}
//
输出batcher表的数据量
var
sql =
new
SqlCommand(
"
SELECT COUNT(1) FROM Batcher
"
);
Console.WriteLine(
"
当前共有 {0} 条数据
"
, database.ExecuteScalar(sql));
})));
}

