-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathBulkInserter.cs
229 lines (188 loc) · 8.44 KB
/
BulkInserter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/*
* Written by Ronnie Overby
* and part of the Ronnie Overby Grab Bag: https://github.com/ronnieoverby/RonnieOverbyGrabBag
*/
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
// ReSharper disable CheckNamespace
namespace Overby.Data
// ReSharper restore CheckNamespace
{
/// <summary>
/// Event data for one bulk-insert batch: the items in the batch and the DataTable holding their rows.
/// </summary>
/// <typeparam name="T">The type of the items being inserted.</typeparam>
public class BulkInsertEventArgs<T> : EventArgs
{
    /// <summary>A copy of the items passed to the constructor.</summary>
    public T[] Items { get; private set; }

    /// <summary>The DataTable passed to the constructor.</summary>
    public DataTable DataTable { get; set; }

    /// <param name="items">The batch's items; a copy is stored in <see cref="Items"/>.</param>
    /// <param name="dataTable">The table holding the batch's rows.</param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> or <paramref name="dataTable"/> is null.</exception>
    public BulkInsertEventArgs(T[] items, DataTable dataTable)
    {
        if (items == null)
        {
            throw new ArgumentNullException("items");
        }

        if (dataTable == null)
        {
            throw new ArgumentNullException("dataTable");
        }

        // Copy so later changes to the caller's array are not visible through Items.
        var snapshot = new T[items.Length];
        Array.Copy(items, snapshot, items.Length);

        Items = snapshot;
        DataTable = dataTable;
    }
}
/// <summary>
/// Performs buffered bulk inserts into a sql server table using objects instead of DataRows. :)
/// </summary>
/// <remarks>
/// The destination table's schema is read once, on first use, with a "select top 0 *" query and then cached.
/// Each destination column is filled from the public readable property of <typeparamref name="T"/> with exactly
/// the same (case-sensitive) name; columns without such a property are not assigned by this class.
/// </remarks>
/// <typeparam name="T">The type of the objects being inserted.</typeparam>
public class BulkInserter<T> : IDisposable where T : class
{
    private const int DefaultBufferSize = 2000;

    private readonly SqlConnection _connection;
    private readonly int _bufferSize;
    private readonly Lazy<Dictionary<string, Func<T, object>>> _props =
        new Lazy<Dictionary<string, Func<T, object>>>(GetPropertyInformation);
    private readonly Lazy<DataTable> _dt;
    private readonly bool _constructedSqlBulkCopy;
    private readonly SqlBulkCopy _sbc;
    private readonly List<T> _queue = new List<T>();
    private readonly SqlTransaction _tran;

    /// <summary>
    /// Names of destination columns to drop from the DataTable before inserting.
    /// Only honored if set before the first insert, because the schema is read and cached at that point.
    /// Every name must exist in the destination table (DataColumnCollection.Remove throws otherwise).
    /// </summary>
    public string[] RemoveColumns { get; set; }

    /// <summary>Raised for each batch after its rows are built and before they are written to the server.</summary>
    public event EventHandler<BulkInsertEventArgs<T>> PreBulkInsert;

    /// <summary>Raises <see cref="PreBulkInsert"/> with this instance as the sender.</summary>
    public void OnPreBulkInsert(BulkInsertEventArgs<T> e)
    {
        var handler = PreBulkInsert;
        if (handler != null)
        {
            handler(this, e);
        }
    }

    /// <summary>Raised for each batch after it has been written to the server.</summary>
    public event EventHandler<BulkInsertEventArgs<T>> PostBulkInsert;

    /// <summary>Raises <see cref="PostBulkInsert"/> with this instance as the sender.</summary>
    public void OnPostBulkInsert(BulkInsertEventArgs<T> e)
    {
        var handler = PostBulkInsert;
        if (handler != null)
        {
            handler(this, e);
        }
    }

    /// <summary>Maximum number of rows written per SqlBulkCopy.WriteToServer call.</summary>
    public int BufferSize { get { return _bufferSize; } }

    /// <summary>Running total of rows written since construction or the last <see cref="ResetInsertedCount"/>.</summary>
    public int InsertedCount { get; private set; }

    /// <summary>
    /// Optional timeout applied to the SqlBulkCopy before each batch. It is converted to whole seconds, rounding up,
    /// so a positive sub-second value does not become 0 (which SqlBulkCopy treats as "no limit").
    /// </summary>
    public TimeSpan? BulkCopyTimeout { get; set; }

    /// <param name="connection">SqlConnection to use for retrieving the schema of sqlBulkCopy.DestinationTableName</param>
    /// <param name="sqlBulkCopy">SqlBulkCopy to use for bulk insert. It is not closed or disposed by this instance.</param>
    /// <param name="bufferSize">Number of rows to bulk insert at a time. The default is 2000.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connection"/> or <paramref name="sqlBulkCopy"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than 1.</exception>
    /// <remarks>
    /// With this overload the schema query is issued without a transaction.
    /// NOTE(review): if the connection has a pending local transaction, ADO.NET will reject that query.
    /// </remarks>
    public BulkInserter(SqlConnection connection, SqlBulkCopy sqlBulkCopy, int bufferSize = DefaultBufferSize)
    {
        if (connection == null) throw new ArgumentNullException("connection");
        if (sqlBulkCopy == null) throw new ArgumentNullException("sqlBulkCopy");
        if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize", bufferSize, "bufferSize must be at least 1.");

        _bufferSize = bufferSize;
        _connection = connection;
        _sbc = sqlBulkCopy;
        _dt = new Lazy<DataTable>(CreateDataTable);
    }

    /// <param name="connection">SqlConnection to use for retrieving the schema of sqlBulkCopy.DestinationTableName and for bulk insert.</param>
    /// <param name="tableName">The name of the table that rows will be inserted into.</param>
    /// <param name="bufferSize">Number of rows to bulk insert at a time. The default is 2000.</param>
    /// <param name="copyOptions">Options for SqlBulkCopy.</param>
    /// <param name="sqlTransaction">SqlTransaction for SqlBulkCopy and for the schema query.</param>
    /// <remarks>The SqlBulkCopy created here is closed and disposed by <see cref="Dispose"/>.</remarks>
    public BulkInserter(SqlConnection connection, string tableName, int bufferSize = DefaultBufferSize,
        SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default, SqlTransaction sqlTransaction = null)
        : this(connection, new SqlBulkCopy(connection, copyOptions, sqlTransaction) { DestinationTableName = tableName }, bufferSize)
    {
        _tran = sqlTransaction;
        _constructedSqlBulkCopy = true;
    }

    /// <summary>
    /// Performs buffered bulk insert of enumerable items.
    /// </summary>
    /// <remarks>
    /// Items are processed in batches of up to <see cref="BufferSize"/>. For each batch, rows are built,
    /// <see cref="PreBulkInsert"/> is raised, the rows are written, and then <see cref="PostBulkInsert"/> is raised.
    /// This method does not undo batches already written if a later batch fails.
    /// </remarks>
    /// <param name="items">The items to be inserted.</param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="items"/> contains a null element.</exception>
    public void Insert(IEnumerable<T> items)
    {
        if (items == null) throw new ArgumentNullException("items");

        var dataTable = _dt.Value;
        var getters = _props.Value;

        // Pair each destination column with the getter of the property of the same name.
        var columns = new List<KeyValuePair<DataColumn, Func<T, object>>>();
        foreach (DataColumn column in dataTable.Columns)
        {
            Func<T, object> getter;
            if (getters.TryGetValue(column.ColumnName, out getter))
            {
                columns.Add(new KeyValuePair<DataColumn, Func<T, object>>(column, getter));
            }
        }

        foreach (var buffer in Chunk(items, _bufferSize))
        {
            try
            {
                foreach (var item in buffer)
                {
                    if (item == null)
                    {
                        throw new ArgumentException("items must not contain null elements.", "items");
                    }

                    var row = dataTable.NewRow();
                    foreach (var column in columns)
                    {
                        row[column.Key] = column.Value(item) ?? DBNull.Value;
                    }
                    dataTable.Rows.Add(row);
                }

                var bulkInsertEventArgs = new BulkInsertEventArgs<T>(buffer, dataTable);
                OnPreBulkInsert(bulkInsertEventArgs);

                // Applied per batch, after PreBulkInsert, so a handler may still change the timeout.
                if (BulkCopyTimeout.HasValue)
                {
                    _sbc.BulkCopyTimeout = ToWholeSeconds(BulkCopyTimeout.Value);
                }

                _sbc.WriteToServer(dataTable);
                OnPostBulkInsert(bulkInsertEventArgs);
                InsertedCount += dataTable.Rows.Count;
            }
            finally
            {
                // The DataTable is cached and reused. If this batch failed at any point, its rows must not
                // survive into the next Insert/Flush, or they would be written (again) with the next batch.
                dataTable.Clear();
            }
        }
    }

    /// <summary>
    /// Splits <paramref name="source"/> into consecutive chunks of at most <paramref name="bufferSize"/> items.
    /// Each chunk is fully materialized, so chunks may be enumerated in any order, more than once, or partially.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than 1.</exception>
    public static IEnumerable<IEnumerable<T>> Buffer(IEnumerable<T> source, int bufferSize)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize", bufferSize, "bufferSize must be at least 1.");

        return Chunk(source, bufferSize);
    }

    // Lazily yields arrays of up to 'size' consecutive items; the last array may be shorter. Never yields an empty array.
    private static IEnumerable<T[]> Chunk(IEnumerable<T> source, int size)
    {
        // Not presized: callers may pass a very large size to mean "one batch".
        var pending = new List<T>();
        foreach (var item in source)
        {
            pending.Add(item);
            if (pending.Count == size)
            {
                yield return pending.ToArray();
                pending.Clear();
            }
        }

        if (pending.Count > 0)
        {
            yield return pending.ToArray();
        }
    }

    /// <summary>
    /// Queues a single item for bulk insert. When the queue count reaches the buffer size, bulk insert will happen.
    /// Call Flush() to manually bulk insert the currently queued items.
    /// </summary>
    /// <param name="item">The item to be inserted.</param>
    /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
    public void Insert(T item)
    {
        if (item == null) throw new ArgumentNullException("item");

        _queue.Add(item);
        if (_queue.Count >= _bufferSize)
        {
            Flush();
        }
    }

    /// <summary>
    /// Bulk inserts the currently queued items. Does nothing when the queue is empty.
    /// If the insert throws, the items stay queued.
    /// </summary>
    public void Flush()
    {
        if (_queue.Count == 0)
        {
            return;
        }

        Insert(_queue);
        _queue.Clear();
    }

    /// <summary>
    /// Sets the InsertedCount property to zero.
    /// </summary>
    public void ResetInsertedCount()
    {
        InsertedCount = 0;
    }

    // Builds one compiled getter per public readable property of T, keyed by property name.
    private static Dictionary<string, Func<T, object>> GetPropertyInformation()
    {
        return typeof(T).GetProperties()
            // Indexers need arguments and set-only properties have no value to read.
            .Where(p => p.CanRead && p.GetIndexParameters().Length == 0)
            // A property hidden with 'new' can be reported alongside the one hiding it; keep the most derived.
            .OrderByDescending(p => InheritanceDepth(p.DeclaringType))
            .GroupBy(p => p.Name)
            .ToDictionary(g => g.Key, g => CreatePropertyGetter(g.First()));
    }

    // Number of base types above 'type' (System.Object has depth 0).
    private static int InheritanceDepth(Type type)
    {
        var depth = 0;
        for (var current = type.BaseType; current != null; current = current.BaseType)
        {
            depth++;
        }
        return depth;
    }

    // Compiles "i => (object)i.Property". Properties declared on a base class of T are allowed.
    private static Func<T, object> CreatePropertyGetter(PropertyInfo propertyInfo)
    {
        if (propertyInfo.DeclaringType == null || !propertyInfo.DeclaringType.IsAssignableFrom(typeof(T)))
            throw new ArgumentException("The property must be declared on T or one of its base types.", "propertyInfo");

        var instance = Expression.Parameter(typeof(T), "i");
        var property = Expression.Property(instance, propertyInfo);
        var boxed = Expression.TypeAs(property, typeof(object));
        return Expression.Lambda<Func<T, object>>(boxed, instance).Compile();
    }

    // Reads the destination schema into an empty DataTable and drops RemoveColumns from it.
    private DataTable CreateDataTable()
    {
        var dt = new DataTable();
        using (var cmd = _connection.CreateCommand())
        {
            cmd.Transaction = _tran;
            // NOTE(review): the table name is spliced into the SQL text. Identifiers cannot be parameterized and
            // DestinationTableName may already carry schema/bracket quoting, so it is used as-is; it must come
            // from trusted code, never from user input.
            cmd.CommandText = string.Format("select top 0 * from {0}", _sbc.DestinationTableName);
            using (var reader = cmd.ExecuteReader())
            {
                dt.Load(reader);
            }
        }

        var removedAny = false;
        if (RemoveColumns != null)
        {
            foreach (var col in RemoveColumns)
            {
                dt.Columns.Remove(col);
                removedAny = true;
            }
        }

        // Without explicit mappings SqlBulkCopy matches source to destination columns by ordinal. Removing
        // columns shifts the ordinals of the remaining ones, so map them by name instead (the names come
        // straight from the destination schema). Caller-configured mappings are left alone.
        if (removedAny && _sbc.ColumnMappings.Count == 0)
        {
            foreach (DataColumn column in dt.Columns)
            {
                _sbc.ColumnMappings.Add(column.ColumnName, column.ColumnName);
            }
        }

        return dt;
    }

    // SqlBulkCopy.BulkCopyTimeout is whole seconds with 0 meaning "no limit": round up, and clamp instead of overflowing.
    private static int ToWholeSeconds(TimeSpan timeout)
    {
        var seconds = Math.Ceiling(timeout.TotalSeconds);
        return seconds >= int.MaxValue ? int.MaxValue : (int)seconds;
    }

    /// <summary>
    /// Closes and disposes the SqlBulkCopy if this instance created it. Queued items are NOT flushed (call
    /// <see cref="Flush"/> first), and the connection is not closed.
    /// </summary>
    public void Dispose()
    {
        if (_constructedSqlBulkCopy)
        {
            using (_sbc)
            {
                _sbc.Close();
            }
        }
    }
}
}