@@ -6,11 +6,14 @@ const {
66 AsyncResource
77} = require ( './helpers/instrument' )
88const shimmer = require ( '../../datadog-shimmer' )
9+ const semver = require ( 'semver' )
910
10- addHook ( { name : 'mysql2' , file : 'lib/connection.js' , versions : [ '>=1' ] } , Connection => {
11+ addHook ( { name : 'mysql2' , file : 'lib/connection.js' , versions : [ '>=1' ] } , ( Connection , version ) => {
1112 const startCh = channel ( 'apm:mysql2:query:start' )
1213 const finishCh = channel ( 'apm:mysql2:query:finish' )
1314 const errorCh = channel ( 'apm:mysql2:query:error' )
15+ const startOuterQueryCh = channel ( 'datadog:mysql2:outerquery:start' )
16+ const shouldEmitEndAfterQueryAbort = semver . intersects ( version , '>=1.3.3' )
1417
1518 shimmer . wrap ( Connection . prototype , 'addCommand' , addCommand => function ( cmd ) {
1619 if ( ! startCh . hasSubscribers ) return addCommand . apply ( this , arguments )
@@ -28,6 +31,76 @@ addHook({ name: 'mysql2', file: 'lib/connection.js', versions: ['>=1'] }, Connec
2831 return asyncResource . bind ( addCommand , this ) . apply ( this , arguments )
2932 } )
3033
34+ shimmer . wrap ( Connection . prototype , 'query' , query => function ( sql , values , cb ) {
35+ if ( ! startOuterQueryCh . hasSubscribers ) return query . apply ( this , arguments )
36+
37+ if ( typeof sql === 'object' ) sql = sql ?. sql
38+
39+ if ( ! sql ) return query . apply ( this , arguments )
40+
41+ const abortController = new AbortController ( )
42+ startOuterQueryCh . publish ( { sql, abortController } )
43+
44+ if ( abortController . signal . aborted ) {
45+ const addCommand = this . addCommand
46+ this . addCommand = function ( cmd ) { return cmd }
47+
48+ let queryCommand
49+ try {
50+ queryCommand = query . apply ( this , arguments )
51+ } finally {
52+ this . addCommand = addCommand
53+ }
54+
55+ cb = queryCommand . onResult
56+
57+ process . nextTick ( ( ) => {
58+ if ( cb ) {
59+ cb ( abortController . signal . reason )
60+ } else {
61+ queryCommand . emit ( 'error' , abortController . signal . reason )
62+ }
63+
64+ if ( shouldEmitEndAfterQueryAbort ) {
65+ queryCommand . emit ( 'end' )
66+ }
67+ } )
68+
69+ return queryCommand
70+ }
71+
72+ return query . apply ( this , arguments )
73+ } )
74+
75+ shimmer . wrap ( Connection . prototype , 'execute' , execute => function ( sql , values , cb ) {
76+ if ( ! startOuterQueryCh . hasSubscribers ) return execute . apply ( this , arguments )
77+
78+ if ( typeof sql === 'object' ) sql = sql ?. sql
79+
80+ if ( ! sql ) return execute . apply ( this , arguments )
81+
82+ const abortController = new AbortController ( )
83+ startOuterQueryCh . publish ( { sql, abortController } )
84+
85+ if ( abortController . signal . aborted ) {
86+ const addCommand = this . addCommand
87+ this . addCommand = function ( cmd ) { return cmd }
88+
89+ let result
90+ try {
91+ result = execute . apply ( this , arguments )
92+ } finally {
93+ this . addCommand = addCommand
94+ }
95+
96+ result ?. onResult ( abortController . signal . reason )
97+
98+ return result
99+ }
100+
101+ return execute . apply ( this , arguments )
102+ } )
103+
31104 return Connection
32105
33106 function bindExecute ( cmd , execute , asyncResource ) {
@@ -79,3 +152,149 @@ addHook({ name: 'mysql2', file: 'lib/connection.js', versions: ['>=1'] }, Connec
79152 } , cmd ) )
80153 }
81154} )
155+
156+ addHook ( { name : 'mysql2' , file : 'lib/pool.js' , versions : [ '>=1' ] } , ( Pool , version ) => {
157+ const startOuterQueryCh = channel ( 'datadog:mysql2:outerquery:start' )
158+ const shouldEmitEndAfterQueryAbort = semver . intersects ( version , '>=1.3.3' )
159+
160+ shimmer . wrap ( Pool . prototype , 'query' , query => function ( sql , values , cb ) {
161+ if ( ! startOuterQueryCh . hasSubscribers ) return query . apply ( this , arguments )
162+
163+ if ( typeof sql === 'object' ) sql = sql ?. sql
164+
165+ if ( ! sql ) return query . apply ( this , arguments )
166+
167+ const abortController = new AbortController ( )
168+ startOuterQueryCh . publish ( { sql, abortController } )
169+
170+ if ( abortController . signal . aborted ) {
171+ const getConnection = this . getConnection
172+ this . getConnection = function ( ) { }
173+
174+ let queryCommand
175+ try {
176+ queryCommand = query . apply ( this , arguments )
177+ } finally {
178+ this . getConnection = getConnection
179+ }
180+
181+ process . nextTick ( ( ) => {
182+ if ( queryCommand . onResult ) {
183+ queryCommand . onResult ( abortController . signal . reason )
184+ } else {
185+ queryCommand . emit ( 'error' , abortController . signal . reason )
186+ }
187+
188+ if ( shouldEmitEndAfterQueryAbort ) {
189+ queryCommand . emit ( 'end' )
190+ }
191+ } )
192+
193+ return queryCommand
194+ }
195+
196+ return query . apply ( this , arguments )
197+ } )
198+
199+ shimmer . wrap ( Pool . prototype , 'execute' , execute => function ( sql , values , cb ) {
200+ if ( ! startOuterQueryCh . hasSubscribers ) return execute . apply ( this , arguments )
201+
202+ if ( typeof sql === 'object' ) sql = sql ?. sql
203+
204+ if ( ! sql ) return execute . apply ( this , arguments )
205+
206+ const abortController = new AbortController ( )
207+ startOuterQueryCh . publish ( { sql, abortController } )
208+
209+ if ( abortController . signal . aborted ) {
210+ if ( typeof values === 'function' ) {
211+ cb = values
212+ }
213+
214+ process . nextTick ( ( ) => {
215+ cb ( abortController . signal . reason )
216+ } )
217+ return
218+ }
219+
220+ return execute . apply ( this , arguments )
221+ } )
222+
223+ return Pool
224+ } )
225+
226+ // PoolNamespace.prototype.query does not exist in mysql2<2.3.0
227+ addHook ( { name : 'mysql2' , file : 'lib/pool_cluster.js' , versions : [ '>=2.3.0' ] } , PoolCluster => {
228+ const startOuterQueryCh = channel ( 'datadog:mysql2:outerquery:start' )
229+ const wrappedPoolNamespaces = new WeakSet ( )
230+
231+ shimmer . wrap ( PoolCluster . prototype , 'of' , of => function ( ) {
232+ const poolNamespace = of . apply ( this , arguments )
233+
234+ if ( startOuterQueryCh . hasSubscribers && ! wrappedPoolNamespaces . has ( poolNamespace ) ) {
235+ shimmer . wrap ( poolNamespace , 'query' , query => function ( sql , values , cb ) {
236+ if ( typeof sql === 'object' ) sql = sql ?. sql
237+
238+ if ( ! sql ) return query . apply ( this , arguments )
239+
240+ const abortController = new AbortController ( )
241+ startOuterQueryCh . publish ( { sql, abortController } )
242+
243+ if ( abortController . signal . aborted ) {
244+ const getConnection = this . getConnection
245+ this . getConnection = function ( ) { }
246+
247+ let queryCommand
248+ try {
249+ queryCommand = query . apply ( this , arguments )
250+ } finally {
251+ this . getConnection = getConnection
252+ }
253+
254+ process . nextTick ( ( ) => {
255+ if ( queryCommand . onResult ) {
256+ queryCommand . onResult ( abortController . signal . reason )
257+ } else {
258+ queryCommand . emit ( 'error' , abortController . signal . reason )
259+ }
260+
261+ queryCommand . emit ( 'end' )
262+ } )
263+
264+ return queryCommand
265+ }
266+
267+ return query . apply ( this , arguments )
268+ } )
269+
270+ shimmer . wrap ( poolNamespace , 'execute' , execute => function ( sql , values , cb ) {
271+ if ( typeof sql === 'object' ) sql = sql ?. sql
272+
273+ if ( ! sql ) return execute . apply ( this , arguments )
274+
275+ const abortController = new AbortController ( )
276+ startOuterQueryCh . publish ( { sql, abortController } )
277+
278+ if ( abortController . signal . aborted ) {
279+ if ( typeof values === 'function' ) {
280+ cb = values
281+ }
282+
283+ process . nextTick ( ( ) => {
284+ cb ( abortController . signal . reason )
285+ } )
286+
287+ return
288+ }
289+
290+ return execute . apply ( this , arguments )
291+ } )
292+
293+ wrappedPoolNamespaces . add ( poolNamespace )
294+ }
295+
296+ return poolNamespace
297+ } )
298+
299+ return PoolCluster
300+ } )
0 commit comments